Skip to main content

matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::watch::{Receiver, Sender, channel};
13use tokio::task::JoinHandle;
14
15use super::AppendOnly;
16use crate::config::PreviewerConfig;
17use crate::event::EventSender;
18use crate::message::Event;
19use crate::preview::Preview;
20
21#[derive(Debug, strum_macros::Display, Clone)]
22pub enum PreviewMessage {
23    Run(String, EnvVars),
24    Set(Text<'static>),
25    Unset,
26    Stop,
27}
28
29#[derive(Debug)]
30pub struct Previewer {
31    /// The reciever for for [`PreviewMessage`]'s.
32    rx: Receiver<PreviewMessage>,
33    /// storage for preview command output
34    lines: AppendOnly<Line<'static>>,
35    /// storage for preview string override
36    string: Arc<Mutex<Option<Text<'static>>>>,
37    /// Flag which is set to true whenever the state changes
38    /// and which the viewer can toggle after receiving the current state
39    changed: Arc<AtomicBool>,
40
41    /// Incremented on every new run / kill to invalidate old feeders
42    version: Arc<AtomicUsize>,
43    /// Maintain a queue of child processes to improve cleanup reliability
44    procs: Vec<Child>,
45    /// The currently executing child process
46    current: Option<(Child, JoinHandle<bool>)>,
47    pub config: PreviewerConfig,
48    /// Event loop controller
49    // We only use it to send [`ControlEvent::Event`]
50    event_controller_tx: Option<EventSender>,
51}
52
53impl Previewer {
54    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
55        let (tx, rx) = channel(PreviewMessage::Stop);
56
57        let new = Self {
58            rx,
59            lines: AppendOnly::new(),
60            string: Default::default(),
61            changed: Default::default(),
62            version: Arc::new(AtomicUsize::new(0)),
63
64            procs: Vec::new(),
65            current: None,
66            config,
67            event_controller_tx: None,
68        };
69
70        (new, tx)
71    }
72
73    pub fn view(&self) -> Preview {
74        Preview::new(
75            self.lines.clone(),
76            self.string.clone(),
77            self.changed.clone(),
78        )
79    }
80
81    pub fn set_string(&self, s: Text<'static>) {
82        if let Ok(mut guard) = self.string.lock() {
83            *guard = Some(s);
84            self.changed.store(true, Ordering::Release);
85        }
86    }
87
88    pub fn clear_string(&self) {
89        if let Ok(mut guard) = self.string.lock() {
90            *guard = None;
91            self.changed.store(true, Ordering::Release);
92        }
93    }
94
95    pub fn has_string(&self) -> bool {
96        let guard = self.string.lock();
97        guard.is_ok_and(|s| s.is_some())
98    }
99
100    pub async fn run(mut self) -> Result<(), Vec<Child>> {
101        while self.rx.changed().await.is_ok() {
102            if !self.procs.is_empty() {
103                debug!("procs: {:?}", self.procs);
104            }
105
106            {
107                let m = &*self.rx.borrow();
108                if let PreviewMessage::Set(s) = m {
109                    self.set_string(s.clone());
110                    // don't kill the underlying
111                    continue;
112                } else if matches!(m, PreviewMessage::Unset) {
113                    self.clear_string();
114                    continue;
115                }
116            }
117
118            self.dispatch_kill();
119            self.clear_string();
120            self.lines.clear();
121
122            match &*self.rx.borrow() {
123                PreviewMessage::Run(cmd, variables) => {
124                    if let Some(mut child) = Command::from_script(cmd)
125                        .envs(variables.iter().cloned())
126                        .stdout(Stdio::piped())
127                        .stdin(Stdio::null())
128                        .stderr(Stdio::null())
129                        .detach()
130                        ._spawn()
131                    {
132                        if let Some(stdout) = child.stdout.take() {
133                            self.changed.store(true, Ordering::Relaxed);
134
135                            let lines = self.lines.clone();
136                            let cmd = cmd.clone();
137                            let version = self.version.clone();
138                            let _version = version.load(Ordering::Acquire);
139
140                            // false => needs refresh (i.e. invalid utf-8)
141                            let handle = tokio::spawn(async move {
142                                let mut reader = BufReader::new(stdout);
143                                let mut leftover = Vec::new();
144                                let mut buf = [0u8; 8192];
145
146                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
147                                    if n == 0 {
148                                        break;
149                                    }
150
151                                    leftover.extend_from_slice(&buf[..n]);
152
153                                    let valid_up_to = match std::str::from_utf8(&leftover) {
154                                        Ok(_) => leftover.len(),
155                                        Err(e) => e.valid_up_to(),
156                                    };
157
158                                    let split_at = leftover[..valid_up_to]
159                                        .iter()
160                                        .rposition(|&b| b == b'\n' || b == b'\r')
161                                        .map(|pos| pos + 1)
162                                        .unwrap_or(valid_up_to);
163
164                                    let (valid_bytes, rest) = leftover.split_at(split_at);
165
166                                    match valid_bytes.into_text() {
167                                        Ok(text) => {
168                                            for line in text {
169                                                // re-check before pushing
170                                                if version.load(Ordering::Acquire) != _version {
171                                                    return true;
172                                                }
173                                                lines.push(line);
174                                            }
175                                        }
176                                        Err(e) => {
177                                            if self.config.try_lossy {
178                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
179                                                    if version.load(Ordering::Acquire) != _version {
180                                                        return true;
181                                                    }
182                                                    let line =
183                                                        String::from_utf8_lossy(bytes).into_owned();
184                                                    lines.push(Line::from(line));
185                                                }
186                                            } else {
187                                                error!("Error displaying {cmd}: {:?}", e);
188                                                return false;
189                                            }
190                                        }
191                                    }
192
193                                    leftover = rest.to_vec();
194                                }
195
196                                if !leftover.is_empty()
197                                    && version.load(Ordering::Acquire) == _version
198                                {
199                                    match leftover.into_text() {
200                                        Ok(text) => {
201                                            for line in text {
202                                                if version.load(Ordering::Acquire) != _version {
203                                                    return true;
204                                                }
205                                                lines.push(line);
206                                            }
207                                        }
208                                        Err(e) => {
209                                            if self.config.try_lossy {
210                                                for bytes in leftover.split(|b| *b == b'\n') {
211                                                    if version.load(Ordering::Acquire) != _version {
212                                                        return true;
213                                                    }
214                                                    let line =
215                                                        String::from_utf8_lossy(bytes).into_owned();
216                                                    lines.push(Line::from(line));
217                                                }
218                                            } else {
219                                                error!("Error displaying {cmd}: {:?}", e);
220                                                return false;
221                                            }
222                                        }
223                                    }
224                                }
225
226                                true
227                            });
228                            self.current = Some((child, handle))
229                        } else {
230                            error!("Failed to get stdout of preview command: {cmd}")
231                        }
232                    }
233                }
234                PreviewMessage::Stop => {}
235                _ => unreachable!(),
236            }
237
238            self.prune_procs();
239        }
240
241        let ret = self.cleanup_procs();
242        if ret.is_empty() { Ok(()) } else { Err(ret) }
243    }
244
245    fn dispatch_kill(&mut self) {
246        if let Some((mut child, old)) = self.current.take() {
247            // invalidate all existing feeders
248            self.version.fetch_add(1, Ordering::AcqRel);
249
250            let _ = child.kill();
251            self.procs.push(child);
252            let mut old = Box::pin(old); // pin it to heap
253
254            match old.as_mut().now_or_never() {
255                Some(Ok(result)) => {
256                    if !result {
257                        self.send(Event::Refresh)
258                    }
259                }
260                None => {
261                    old.abort(); // still works because `AbortHandle` is separate
262                }
263                _ => {}
264            }
265        }
266    }
267
268    fn send(&self, event: Event) {
269        if let Some(ref tx) = self.event_controller_tx {
270            let _ = tx.send(event);
271        }
272    }
273
274    pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
275        self.event_controller_tx = Some(event_controller_tx)
276    }
277
    // TODO: This would be cleaner with `tokio::process::Child`, but is that worth a conversion? It is unclear whether the previewer benefits from yielding control while waiting for output, since the runtime is multithreaded anyway.
    // Also, we may not want this wait to delay exit.
280    fn cleanup_procs(mut self) -> Vec<Child> {
281        let total_timeout = Duration::from_secs(1);
282        let start = Instant::now();
283
284        self.procs.retain_mut(|child| {
285            loop {
286                match child.try_wait() {
287                    Ok(Some(_)) => return false,
288                    Ok(None) => {
289                        if start.elapsed() >= total_timeout {
290                            error!("Child failed to exit in time: {:?}", child);
291                            return true;
292                        } else {
293                            thread::sleep(Duration::from_millis(10));
294                        }
295                    }
296                    Err(e) => {
297                        error!("Error waiting on child: {e}");
298                        return true;
299                    }
300                }
301            }
302        });
303
304        self.procs
305    }
306
307    fn prune_procs(&mut self) {
308        self.procs.retain_mut(|child| match child.try_wait() {
309            Ok(None) => true,
310            Ok(Some(_)) => false,
311            Err(e) => {
312                warn!("Error waiting on child: {e}");
313                true
314            }
315        });
316    }
317}
318
319// ---------- NON ANSI VARIANT
320// let reader = BufReader::new(stdout);
321// if self.config.try_lossy {
322// for line_result in reader.split(b'\n') {
323//     match line_result {
324//         Ok(bytes) => {
325//             let line =
326//             String::from_utf8_lossy(&bytes).into_owned();
327//             lines.push(Line::from(line));
328//         }
329//         Err(e) => error!("Failed to read line: {:?}", e),
330//     }
331// }
332// } else {
333//     for line_result in reader.lines() {
334//         match line_result {
335//             Ok(line) => lines.push(Line::from(line)),
336//             Err(e) => {
337//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
338//                 error!("Error displaying {cmd}: {:?}", e);
339//                 break;
340//             }
341//         }
342//     }
343// }
344
345// trait Resettable: Default {
346//     fn reset(&mut self) {}
347// }
348// impl<T> Resettable for AppendOnly<T> {
349//     fn reset(&mut self) {
350//         self.clear();
351//     }
352// }
353
354// use std::ops::{Deref, DerefMut};
355
356// #[derive(Debug)]
357// struct Queue<V: Resettable> {
358//     entries: Vec<(String, V)>,
359//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
360// }
361
362// impl<V: Resettable> Queue<V> {
363//     pub fn new(len: usize) -> Self {
364//         Self {
365//             entries: (0..len)
366//             .map(|_| (String::default(), V::default()))
367//             .collect(),
368//             order: vec![len; len],
369//         }
370//     }
371
372//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
373//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
374//             if order_idx == self.entries.len() {
375//                 return None
376//             }
377//             if self.entries[entries_idx].0 == key {
378//                 return Some((order_idx, entries_idx));
379//             }
380//         }
381//         None
382//     }
383
384//     /// Try to get a key; if found, move it to the top.
385//     /// If not found, replace the oldest, clear its vec, set new key.
386//     pub fn try_get(&mut self, key: &str) -> bool {
387//         let n = self.entries.len();
388
389//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
390//             self.order.copy_within(0..order_idx, 1);
391//             self.order[0] = idx;
392//             true
393//         } else {
394//             let order_idx = (0..n)
395//             .rfind(|&i| self.order[i] < n)
396//             .map(|i| i + 1)
397//             .unwrap_or(0);
398
399//             let idx = if self.order[order_idx] < self.entries.len() {
400//                 order_idx
401//             } else {
402//                 *self.order.last().unwrap()
403//             };
404
405//             // shift and insert at front
406//             self.order.copy_within(0..order_idx, 1);
407//             self.order[0] = idx;
408
409//             // reset and assign new key
410//             let (ref mut k, ref mut v) = self.entries[idx];
411//             *k = key.to_owned();
412//             v.reset();
413
414//             false
415//         }
416//     }
417// }
418
419// impl<V: Resettable> Deref for Queue<V> {
420//     type Target = V;
421//     fn deref(&self) -> &Self::Target {
422//         &self.entries[self.order[0]].1
423//     }
424// }
425
426// impl<V: Resettable> DerefMut for Queue<V> {
427//     fn deref_mut(&mut self) -> &mut Self::Target {
428//         &mut self.entries[self.order[0]].1
429//     }
430// }
431
432// impl<V: Resettable> Default for Queue<V> {
433//     fn default() -> Self {
434//         Self::new(1)
435//     }
436// }