Skip to main content

matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc::UnboundedSender;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::AppendOnly;
17use crate::config::PreviewerConfig;
18use crate::message::Event;
19use crate::preview::Preview;
20
21#[derive(Debug, strum_macros::Display, Clone)]
22pub enum PreviewMessage {
23    Run(String, EnvVars),
24    Set(Text<'static>),
25    Unset,
26    Stop,
27}
28
29#[derive(Debug)]
30pub struct Previewer {
31    /// The reciever for for [`PreviewMessage`]'s.
32    rx: Receiver<PreviewMessage>,
33    /// storage for preview command output
34    lines: AppendOnly<Line<'static>>,
35    /// storage for preview string override
36    string: Arc<Mutex<Option<Text<'static>>>>,
37    /// Flag which is set to true whenever the state changes
38    /// and which the viewer can toggle after receiving the current state
39    changed: Arc<AtomicBool>,
40
41    /// Incremented on every new run / kill to invalidate old feeders
42    version: Arc<AtomicUsize>,
43    /// Maintain a queue of child processes to improve cleanup reliability
44    procs: Vec<Child>,
45    /// The currently executing child process
46    current: Option<(Child, JoinHandle<bool>)>,
47    pub config: PreviewerConfig,
48    /// Event loop controller
49    // todo: lowpri: does this want to be a render loop controller instead
50    event_controller_tx: Option<UnboundedSender<Event>>,
51}
52
53impl Previewer {
54    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
55        let (tx, rx) = channel(PreviewMessage::Stop);
56
57        let new = Self {
58            rx,
59            lines: AppendOnly::new(),
60            string: Default::default(),
61            changed: Default::default(),
62            version: Arc::new(AtomicUsize::new(0)),
63
64            procs: Vec::new(),
65            current: None,
66            config,
67            event_controller_tx: None,
68        };
69
70        (new, tx)
71    }
72
73    pub fn view(&self) -> Preview {
74        Preview::new(
75            self.lines.clone(),
76            self.string.clone(),
77            self.changed.clone(),
78        )
79    }
80
81    pub fn set_string(&self, s: Text<'static>) {
82        let mut guard = self.string.lock().unwrap();
83        *guard = Some(s);
84        self.changed.store(true, Ordering::Release);
85    }
86
87    pub fn clear_string(&self) {
88        let mut guard = self.string.lock().unwrap();
89        *guard = None;
90        self.changed.store(true, Ordering::Release);
91    }
92
93    pub fn has_string(&self) -> bool {
94        let guard = self.string.lock().unwrap();
95        guard.is_some()
96    }
97
98    pub async fn run(mut self) -> Result<(), Vec<Child>> {
99        while self.rx.changed().await.is_ok() {
100            if !self.procs.is_empty() {
101                debug!("procs: {:?}", self.procs);
102            }
103
104            {
105                let m = &*self.rx.borrow();
106                if let PreviewMessage::Set(s) = m {
107                    self.set_string(s.clone());
108                    // don't kill the underlying
109                    continue;
110                } else if matches!(m, PreviewMessage::Unset) {
111                    self.clear_string();
112                    continue;
113                }
114            }
115
116            self.dispatch_kill();
117            self.clear_string();
118            self.lines.clear();
119
120            match &*self.rx.borrow() {
121                PreviewMessage::Run(cmd, variables) => {
122                    if let Some(mut child) = Command::from_script(cmd)
123                        .envs(variables.iter().cloned())
124                        .stdout(Stdio::piped())
125                        .stdin(Stdio::null())
126                        .stderr(Stdio::null())
127                        .detach()
128                        ._spawn()
129                    {
130                        if let Some(stdout) = child.stdout.take() {
131                            self.changed.store(true, Ordering::Relaxed);
132
133                            let lines = self.lines.clone();
134                            let cmd = cmd.clone();
135                            let version = self.version.clone();
136                            let _version = version.load(Ordering::Acquire);
137
138                            // false => needs refresh (i.e. invalid utf-8)
139                            let handle = tokio::spawn(async move {
140                                let mut reader = BufReader::new(stdout);
141                                let mut leftover = Vec::new();
142                                let mut buf = [0u8; 8192];
143
144                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
145                                    if n == 0 {
146                                        break;
147                                    }
148
149                                    leftover.extend_from_slice(&buf[..n]);
150
151                                    let valid_up_to = match std::str::from_utf8(&leftover) {
152                                        Ok(_) => leftover.len(),
153                                        Err(e) => e.valid_up_to(),
154                                    };
155
156                                    let split_at = leftover[..valid_up_to]
157                                        .iter()
158                                        .rposition(|&b| b == b'\n' || b == b'\r')
159                                        .map(|pos| pos + 1)
160                                        .unwrap_or(valid_up_to);
161
162                                    let (valid_bytes, rest) = leftover.split_at(split_at);
163
164                                    match valid_bytes.into_text() {
165                                        Ok(text) => {
166                                            for line in text {
167                                                // re-check before pushing
168                                                if version.load(Ordering::Acquire) != _version {
169                                                    return true;
170                                                }
171                                                lines.push(line);
172                                            }
173                                        }
174                                        Err(e) => {
175                                            if self.config.try_lossy {
176                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
177                                                    if version.load(Ordering::Acquire) != _version {
178                                                        return true;
179                                                    }
180                                                    let line =
181                                                        String::from_utf8_lossy(bytes).into_owned();
182                                                    lines.push(Line::from(line));
183                                                }
184                                            } else {
185                                                error!("Error displaying {cmd}: {:?}", e);
186                                                return false;
187                                            }
188                                        }
189                                    }
190
191                                    leftover = rest.to_vec();
192                                }
193
194                                if !leftover.is_empty()
195                                    && version.load(Ordering::Acquire) == _version
196                                {
197                                    match leftover.into_text() {
198                                        Ok(text) => {
199                                            for line in text {
200                                                if version.load(Ordering::Acquire) != _version {
201                                                    return true;
202                                                }
203                                                lines.push(line);
204                                            }
205                                        }
206                                        Err(e) => {
207                                            if self.config.try_lossy {
208                                                for bytes in leftover.split(|b| *b == b'\n') {
209                                                    if version.load(Ordering::Acquire) != _version {
210                                                        return true;
211                                                    }
212                                                    let line =
213                                                        String::from_utf8_lossy(bytes).into_owned();
214                                                    lines.push(Line::from(line));
215                                                }
216                                            } else {
217                                                error!("Error displaying {cmd}: {:?}", e);
218                                                return false;
219                                            }
220                                        }
221                                    }
222                                }
223
224                                true
225                            });
226                            self.current = Some((child, handle))
227                        } else {
228                            error!("Failed to get stdout of preview command: {cmd}")
229                        }
230                    }
231                }
232                PreviewMessage::Stop => {}
233                _ => unreachable!(),
234            }
235
236            self.prune_procs();
237        }
238
239        let ret = self.cleanup_procs();
240        if ret.is_empty() { Ok(()) } else { Err(ret) }
241    }
242
243    fn dispatch_kill(&mut self) {
244        if let Some((mut child, old)) = self.current.take() {
245            // invalidate all existing feeders
246            self.version.fetch_add(1, Ordering::AcqRel);
247
248            let _ = child.kill();
249            self.procs.push(child);
250            let mut old = Box::pin(old); // pin it to heap
251
252            match old.as_mut().now_or_never() {
253                Some(Ok(result)) => {
254                    if !result && let Some(ref tx) = self.event_controller_tx {
255                        let _ = tx.send(Event::Refresh);
256                    }
257                }
258                None => {
259                    old.abort(); // still works because `AbortHandle` is separate
260                }
261                _ => {}
262            }
263        }
264    }
265
266    pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
267        self.event_controller_tx = Some(event_controller_tx)
268    }
269
270    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
271    // also, maybe don't want this delaying exit?
272    fn cleanup_procs(mut self) -> Vec<Child> {
273        let total_timeout = Duration::from_secs(1);
274        let start = Instant::now();
275
276        self.procs.retain_mut(|child| {
277            loop {
278                match child.try_wait() {
279                    Ok(Some(_)) => return false,
280                    Ok(None) => {
281                        if start.elapsed() >= total_timeout {
282                            error!("Child failed to exit in time: {:?}", child);
283                            return true;
284                        } else {
285                            thread::sleep(Duration::from_millis(10));
286                        }
287                    }
288                    Err(e) => {
289                        error!("Error waiting on child: {e}");
290                        return true;
291                    }
292                }
293            }
294        });
295
296        self.procs
297    }
298
299    fn prune_procs(&mut self) {
300        self.procs.retain_mut(|child| match child.try_wait() {
301            Ok(None) => true,
302            Ok(Some(_)) => false,
303            Err(e) => {
304                warn!("Error waiting on child: {e}");
305                true
306            }
307        });
308    }
309}
310
311// ---------- NON ANSI VARIANT
312// let reader = BufReader::new(stdout);
313// if self.config.try_lossy {
314// for line_result in reader.split(b'\n') {
315//     match line_result {
316//         Ok(bytes) => {
317//             let line =
318//             String::from_utf8_lossy(&bytes).into_owned();
319//             lines.push(Line::from(line));
320//         }
321//         Err(e) => error!("Failed to read line: {:?}", e),
322//     }
323// }
324// } else {
325//     for line_result in reader.lines() {
326//         match line_result {
327//             Ok(line) => lines.push(Line::from(line)),
328//             Err(e) => {
329//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
330//                 error!("Error displaying {cmd}: {:?}", e);
331//                 break;
332//             }
333//         }
334//     }
335// }
336
337// trait Resettable: Default {
338//     fn reset(&mut self) {}
339// }
340// impl<T> Resettable for AppendOnly<T> {
341//     fn reset(&mut self) {
342//         self.clear();
343//     }
344// }
345
346// use std::ops::{Deref, DerefMut};
347
348// #[derive(Debug)]
349// struct Queue<V: Resettable> {
350//     entries: Vec<(String, V)>,
351//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
352// }
353
354// impl<V: Resettable> Queue<V> {
355//     pub fn new(len: usize) -> Self {
356//         Self {
357//             entries: (0..len)
358//             .map(|_| (String::default(), V::default()))
359//             .collect(),
360//             order: vec![len; len],
361//         }
362//     }
363
364//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
365//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
366//             if order_idx == self.entries.len() {
367//                 return None
368//             }
369//             if self.entries[entries_idx].0 == key {
370//                 return Some((order_idx, entries_idx));
371//             }
372//         }
373//         None
374//     }
375
376//     /// Try to get a key; if found, move it to the top.
377//     /// If not found, replace the oldest, clear its vec, set new key.
378//     pub fn try_get(&mut self, key: &str) -> bool {
379//         let n = self.entries.len();
380
381//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
382//             self.order.copy_within(0..order_idx, 1);
383//             self.order[0] = idx;
384//             true
385//         } else {
386//             let order_idx = (0..n)
387//             .rfind(|&i| self.order[i] < n)
388//             .map(|i| i + 1)
389//             .unwrap_or(0);
390
391//             let idx = if self.order[order_idx] < self.entries.len() {
392//                 order_idx
393//             } else {
394//                 *self.order.last().unwrap()
395//             };
396
397//             // shift and insert at front
398//             self.order.copy_within(0..order_idx, 1);
399//             self.order[0] = idx;
400
401//             // reset and assign new key
402//             let (ref mut k, ref mut v) = self.entries[idx];
403//             *k = key.to_owned();
404//             v.reset();
405
406//             false
407//         }
408//     }
409// }
410
411// impl<V: Resettable> Deref for Queue<V> {
412//     type Target = V;
413//     fn deref(&self) -> &Self::Target {
414//         &self.entries[self.order[0]].1
415//     }
416// }
417
418// impl<V: Resettable> DerefMut for Queue<V> {
419//     fn deref_mut(&mut self) -> &mut Self::Target {
420//         &mut self.entries[self.order[0]].1
421//     }
422// }
423
424// impl<V: Resettable> Default for Queue<V> {
425//     fn default() -> Self {
426//         Self::new(1)
427//     }
428// }