matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20use crate::preview::Preview;
21
/// Commands delivered to a [`Previewer`] through its watch channel.
///
/// `Stop` doubles as the channel's initial value (see `Previewer::new`).
#[derive(Debug, Display, Clone)]
pub enum PreviewMessage {
    /// Kill whatever is currently running, then spawn the given script with
    /// the given environment variables and collect its stdout as preview lines.
    Run(String, EnvVars),
    /// Store a fixed text override; the running preview process is left alone.
    Set(Text<'static>),
    /// Remove the text override; the running preview process is left alone.
    Unset,
    /// Kill whatever is currently running and clear the preview without
    /// starting anything new.
    Stop,
}
29
/// Background worker that turns [`PreviewMessage`]s into preview content.
///
/// It owns the receiving end of the message channel, the preview processes it
/// spawns, and the buffers that are handed out to readers via `view`.
#[derive(Debug)]
pub struct Previewer {
    /// Receiving end of the watch channel created in `new`.
    rx: Receiver<PreviewMessage>,
    /// Lines decoded from the current preview command's stdout.
    lines: AppendOnly<Line<'static>>, // append-only buffer
    /// Optional text override written by `set_string` / `clear_string`.
    string: Arc<Mutex<Option<Text<'static>>>>,
    /// Flag raised whenever the override changes or a new command starts.
    changed: Arc<AtomicBool>,

    /// Children that were killed but have not been reaped yet.
    procs: Vec<Child>,
    /// The running preview child together with the task reading its stdout;
    /// the task resolves to `false` when the output could not be displayed.
    current: Option<(Child, JoinHandle<bool>)>,
    /// Previewer settings (this file reads `try_lossy`).
    pub config: PreviewerConfig,
    /// Optional channel on which `Event::Refresh` is sent after a failed preview.
    event_controller_tx: Option<UnboundedSender<Event>>,
}
42
43impl Previewer {
44    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45        let (tx, rx) = channel(PreviewMessage::Stop);
46
47        let new = Self {
48            rx,
49            lines: AppendOnly::new(),
50            string: Default::default(),
51            changed: Default::default(),
52
53            procs: Vec::new(),
54            current: None,
55            config,
56            event_controller_tx: None,
57        };
58
59        (new, tx)
60    }
61
62    pub fn view(&self) -> Preview {
63        Preview::new(
64            self.lines.clone(),
65            self.string.clone(),
66            self.changed.clone(),
67        )
68    }
69
70    pub fn set_string(&self, s: Text<'static>) {
71        let mut guard = self.string.lock().unwrap();
72        *guard = Some(s);
73        self.changed.store(true, Ordering::Release);
74    }
75
76    pub fn clear_string(&self) {
77        let mut guard = self.string.lock().unwrap();
78        *guard = None;
79        self.changed.store(true, Ordering::Release);
80    }
81
82    pub fn has_string(&self) -> bool {
83        let guard = self.string.lock().unwrap();
84        guard.is_some()
85    }
86
87    pub async fn run(mut self) -> Result<(), Vec<Child>> {
88        while self.rx.changed().await.is_ok() {
89            if !self.procs.is_empty() {
90                debug!("procs: {:?}", self.procs);
91            }
92
93            {
94                let m = &*self.rx.borrow();
95                if let PreviewMessage::Set(s) = m {
96                    self.set_string(s.clone());
97                    // don't kill the underlying
98                    continue;
99                } else if matches!(m, PreviewMessage::Unset) {
100                    self.clear_string();
101                    continue;
102                }
103            }
104
105            self.dispatch_kill();
106            self.clear_string();
107            self.lines.clear();
108
109            match &*self.rx.borrow() {
110                PreviewMessage::Run(cmd, variables) => {
111                    if let Some(mut child) = Command::from_script(cmd)
112                        .envs(variables.iter().cloned())
113                        .stdout(Stdio::piped())
114                        .stdin(Stdio::null())
115                        .stderr(Stdio::null())
116                        .detach()
117                        ._spawn()
118                    {
119                        if let Some(stdout) = child.stdout.take() {
120                            self.changed.store(true, Ordering::Relaxed);
121                            let lines = self.lines.clone();
122                            let cmd = cmd.clone();
123                            let handle = tokio::spawn(async move {
124                                let mut reader = BufReader::new(stdout);
125                                let mut leftover = Vec::new();
126                                let mut buf = [0u8; 8192];
127
128                                // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
129                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
130                                    if n == 0 {
131                                        break;
132                                    }
133
134                                    leftover.extend_from_slice(&buf[..n]);
135
136                                    let valid_up_to = match std::str::from_utf8(&leftover) {
137                                        Ok(_) => leftover.len(),
138                                        Err(e) => e.valid_up_to(),
139                                    };
140
141                                    let split_at = leftover[..valid_up_to]
142                                        .iter()
143                                        .rposition(|&b| b == b'\n' || b == b'\r')
144                                        .map(|pos| pos + 1)
145                                        .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
146
147                                    let (valid_bytes, rest) = leftover.split_at(split_at);
148
149                                    match valid_bytes.into_text() {
150                                        Ok(text) => {
151                                            for line in text {
152                                                lines.push(line);
153                                            }
154                                        }
155                                        Err(e) => {
156                                            if self.config.try_lossy {
157                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
158                                                    let line =
159                                                        String::from_utf8_lossy(bytes).into_owned();
160                                                    lines.push(Line::from(line));
161                                                }
162                                            } else {
163                                                error!("Error displaying {cmd}: {:?}", e);
164                                                return false;
165                                            }
166                                        }
167                                    }
168
169                                    leftover = rest.to_vec();
170                                }
171
172                                // handle any remaining bytes
173                                if !leftover.is_empty() {
174                                    match leftover.into_text() {
175                                        Ok(text) => {
176                                            for line in text {
177                                                lines.push(line);
178                                            }
179                                        }
180                                        Err(e) => {
181                                            if self.config.try_lossy {
182                                                for bytes in leftover.split(|b| *b == b'\n') {
183                                                    let line =
184                                                        String::from_utf8_lossy(bytes).into_owned();
185                                                    lines.push(Line::from(line));
186                                                }
187                                            } else {
188                                                error!("Error displaying {cmd}: {:?}", e);
189                                                return false;
190                                            }
191                                        }
192                                    }
193                                }
194                                true
195                            });
196                            self.current = Some((child, handle))
197                        } else {
198                            error!("Failed to get stdout of preview command: {cmd}")
199                        }
200                    }
201                }
202                PreviewMessage::Stop => {}
203                _ => unreachable!(),
204            }
205
206            self.prune_procs();
207        }
208
209        let ret = self.cleanup_procs();
210        if ret.is_empty() { Ok(()) } else { Err(ret) }
211    }
212
213    fn dispatch_kill(&mut self) {
214        if let Some((mut child, old)) = self.current.take() {
215            let _ = child.kill();
216            self.procs.push(child);
217            let mut old = Box::pin(old); // pin it to heap
218
219            match old.as_mut().now_or_never() {
220                Some(Ok(result)) => {
221                    if !result && let Some(ref tx) = self.event_controller_tx {
222                        let _ = tx.send(Event::Refresh);
223                    }
224                }
225                None => {
226                    old.abort(); // still works because `AbortHandle` is separate
227                }
228                _ => {}
229            }
230        }
231    }
232
233    pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
234        self.event_controller_tx = Some(event_controller_tx)
235    }
236
237    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
238    // also, maybe don't want this delaying exit?
239    fn cleanup_procs(mut self) -> Vec<Child> {
240        let total_timeout = Duration::from_secs(1);
241        let start = Instant::now();
242
243        self.procs.retain_mut(|child| {
244            loop {
245                match child.try_wait() {
246                    Ok(Some(_)) => return false,
247                    Ok(None) => {
248                        if start.elapsed() >= total_timeout {
249                            error!("Child failed to exit in time: {:?}", child);
250                            return true;
251                        } else {
252                            thread::sleep(Duration::from_millis(10));
253                        }
254                    }
255                    Err(e) => {
256                        error!("Error waiting on child: {e}");
257                        return true;
258                    }
259                }
260            }
261        });
262
263        self.procs
264    }
265
266    fn prune_procs(&mut self) {
267        self.procs.retain_mut(|child| match child.try_wait() {
268            Ok(None) => true,
269            Ok(Some(_)) => false,
270            Err(e) => {
271                warn!("Error waiting on child: {e}");
272                true
273            }
274        });
275    }
276}
277
278// ---------- NON ANSI VARIANT
279// let reader = BufReader::new(stdout);
280// if self.config.try_lossy {
281// for line_result in reader.split(b'\n') {
282//     match line_result {
283//         Ok(bytes) => {
284//             let line =
285//             String::from_utf8_lossy(&bytes).into_owned();
286//             lines.push(Line::from(line));
287//         }
288//         Err(e) => error!("Failed to read line: {:?}", e),
289//     }
290// }
291// } else {
292//     for line_result in reader.lines() {
293//         match line_result {
294//             Ok(line) => lines.push(Line::from(line)),
295//             Err(e) => {
296//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
297//                 error!("Error displaying {cmd}: {:?}", e);
298//                 break;
299//             }
300//         }
301//     }
302// }
303
304// trait Resettable: Default {
305//     fn reset(&mut self) {}
306// }
307// impl<T> Resettable for AppendOnly<T> {
308//     fn reset(&mut self) {
309//         self.clear();
310//     }
311// }
312
313// use std::ops::{Deref, DerefMut};
314
315// #[derive(Debug)]
316// struct Queue<V: Resettable> {
317//     entries: Vec<(String, V)>,
318//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
319// }
320
321// impl<V: Resettable> Queue<V> {
322//     pub fn new(len: usize) -> Self {
323//         Self {
324//             entries: (0..len)
325//             .map(|_| (String::default(), V::default()))
326//             .collect(),
327//             order: vec![len; len],
328//         }
329//     }
330
331//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
332//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
333//             if order_idx == self.entries.len() {
334//                 return None
335//             }
336//             if self.entries[entries_idx].0 == key {
337//                 return Some((order_idx, entries_idx));
338//             }
339//         }
340//         None
341//     }
342
343//     /// Try to get a key; if found, move it to the top.
344//     /// If not found, replace the oldest, clear its vec, set new key.
345//     pub fn try_get(&mut self, key: &str) -> bool {
346//         let n = self.entries.len();
347
348//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
349//             self.order.copy_within(0..order_idx, 1);
350//             self.order[0] = idx;
351//             true
352//         } else {
353//             let order_idx = (0..n)
354//             .rfind(|&i| self.order[i] < n)
355//             .map(|i| i + 1)
356//             .unwrap_or(0);
357
358//             let idx = if self.order[order_idx] < self.entries.len() {
359//                 order_idx
360//             } else {
361//                 *self.order.last().unwrap()
362//             };
363
364//             // shift and insert at front
365//             self.order.copy_within(0..order_idx, 1);
366//             self.order[0] = idx;
367
368//             // reset and assign new key
369//             let (ref mut k, ref mut v) = self.entries[idx];
370//             *k = key.to_owned();
371//             v.reset();
372
373//             false
374//         }
375//     }
376// }
377
378// impl<V: Resettable> Deref for Queue<V> {
379//     type Target = V;
380//     fn deref(&self) -> &Self::Target {
381//         &self.entries[self.order[0]].1
382//     }
383// }
384
385// impl<V: Resettable> DerefMut for Queue<V> {
386//     fn deref_mut(&mut self) -> &mut Self::Target {
387//         &mut self.entries[self.order[0]].1
388//     }
389// }
390
391// impl<V: Resettable> Default for Queue<V> {
392//     fn default() -> Self {
393//         Self::new(1)
394//     }
395// }