// matchmaker/preview/previewer.rs

use ansi_to_tui::IntoText;
use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
use futures::FutureExt;
use log::{debug, error, warn};
use ratatui::text::{Line, Text};
use std::io::BufReader;
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::watch::{Receiver, Sender, channel};
use tokio::task::JoinHandle;

use super::AppendOnly;
use crate::config::PreviewerConfig;
use crate::event::EventSender;
use crate::message::Event;
use crate::preview::Preview;

21#[derive(Debug, Default, strum_macros::Display, Clone)]
22pub enum PreviewMessage {
23    Run(String, EnvVars),
24    Set(Text<'static>),
25    Unset,
26    #[default]
27    Stop,
28    Pause,
29    Unpause,
30}
31
32#[derive(Debug)]
33pub struct Previewer {
34    /// The reciever for for [`PreviewMessage`]'s.
35    rx: Receiver<PreviewMessage>,
36    /// storage for preview command output
37    lines: AppendOnly<Line<'static>>,
38    /// storage for preview string override
39    string: Arc<Mutex<Option<Text<'static>>>>,
40    /// Flag which is set to true whenever the state changes
41    /// and which the viewer can toggle after receiving the current state
42    changed: Arc<AtomicBool>,
43
44    /// Incremented on every new run / kill to invalidate old feeders
45    version: Arc<AtomicUsize>,
46    paused: bool,
47    /// Maintain a queue of child processes to improve cleanup reliability
48    procs: Vec<Child>,
49    /// The currently executing child process
50    current: Option<(Child, JoinHandle<bool>)>,
51    pub config: PreviewerConfig,
52    /// Event loop controller
53    // We only use it to send [`ControlEvent::Event`]
54    event_controller_tx: Option<EventSender>,
55}
56
57impl Previewer {
58    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
59        let (tx, rx) = channel(PreviewMessage::Stop);
60
61        let new = Self {
62            rx,
63            lines: AppendOnly::new(),
64            string: Default::default(),
65            changed: Default::default(),
66            version: Arc::new(AtomicUsize::new(0)),
67            paused: false,
68
69            procs: Vec::new(),
70            current: None,
71            config,
72            event_controller_tx: None,
73        };
74
75        (new, tx)
76    }
77
78    pub fn view(&self) -> Preview {
79        Preview::new(
80            self.lines.clone(),
81            self.string.clone(),
82            self.changed.clone(),
83        )
84    }
85
86    pub fn set_string(&self, s: Text<'static>) {
87        if let Ok(mut guard) = self.string.lock() {
88            *guard = Some(s);
89            self.changed.store(true, Ordering::Release);
90        }
91    }
92
93    pub fn clear_string(&self) {
94        if let Ok(mut guard) = self.string.lock() {
95            *guard = None;
96            self.changed.store(true, Ordering::Release);
97        }
98    }
99
100    pub fn has_string(&self) -> bool {
101        let guard = self.string.lock();
102        guard.is_ok_and(|s| s.is_some())
103    }
104
105    pub async fn run(mut self) -> Result<(), Vec<Child>> {
106        while self.rx.changed().await.is_ok() {
107            if !self.procs.is_empty() {
108                debug!("procs: {:?}", self.procs);
109            }
110
111            {
112                let m = &*self.rx.borrow();
113                match m {
114                    PreviewMessage::Pause => {
115                        self.paused = true;
116                        continue;
117                    }
118                    PreviewMessage::Unpause => {
119                        self.paused = false;
120                        continue;
121                    }
122                    _ if self.paused => {
123                        continue;
124                    }
125                    PreviewMessage::Set(s) => {
126                        self.set_string(s.clone());
127                        // don't kill the underlying
128                        continue;
129                    }
130                    PreviewMessage::Unset => {
131                        self.clear_string();
132                        continue;
133                    }
134                    _ => {}
135                }
136            }
137
138            self.dispatch_kill();
139            self.clear_string();
140            self.lines.clear();
141
142            match &*self.rx.borrow() {
143                PreviewMessage::Run(cmd, variables) => {
144                    if let Some(mut child) = Command::from_script(cmd)
145                        .envs(variables.iter().cloned())
146                        .stdout(Stdio::piped())
147                        .stdin(Stdio::null())
148                        .stderr(Stdio::null())
149                        .detach()
150                        ._spawn()
151                    {
152                        if let Some(stdout) = child.stdout.take() {
153                            self.changed.store(true, Ordering::Relaxed);
154
155                            let lines = self.lines.clone();
156                            let cmd = cmd.clone();
157                            let version = self.version.clone();
158                            let _version = version.load(Ordering::Acquire);
159
160                            // false => needs refresh (i.e. invalid utf-8)
161                            let handle = tokio::spawn(async move {
162                                let mut reader = BufReader::new(stdout);
163                                let mut leftover = Vec::new();
164                                let mut buf = [0u8; 8192];
165
166                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
167                                    if n == 0 {
168                                        break;
169                                    }
170
171                                    leftover.extend_from_slice(&buf[..n]);
172
173                                    let valid_up_to = match std::str::from_utf8(&leftover) {
174                                        Ok(_) => leftover.len(),
175                                        Err(e) => e.valid_up_to(),
176                                    };
177
178                                    let split_at = leftover[..valid_up_to]
179                                        .iter()
180                                        .rposition(|&b| b == b'\n' || b == b'\r')
181                                        .map(|pos| pos + 1)
182                                        .unwrap_or(valid_up_to);
183
184                                    let (valid_bytes, rest) = leftover.split_at(split_at);
185
186                                    match valid_bytes.into_text() {
187                                        Ok(text) => {
188                                            for line in text {
189                                                // re-check before pushing
190                                                if version.load(Ordering::Acquire) != _version {
191                                                    return true;
192                                                }
193                                                lines.push(line);
194                                            }
195                                        }
196                                        Err(e) => {
197                                            if self.config.try_lossy {
198                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
199                                                    if version.load(Ordering::Acquire) != _version {
200                                                        return true;
201                                                    }
202                                                    let line =
203                                                        String::from_utf8_lossy(bytes).into_owned();
204                                                    lines.push(Line::from(line));
205                                                }
206                                            } else {
207                                                error!("Error displaying {cmd}: {:?}", e);
208                                                return false;
209                                            }
210                                        }
211                                    }
212
213                                    leftover = rest.to_vec();
214                                }
215
216                                if !leftover.is_empty()
217                                    && version.load(Ordering::Acquire) == _version
218                                {
219                                    match leftover.into_text() {
220                                        Ok(text) => {
221                                            for line in text {
222                                                if version.load(Ordering::Acquire) != _version {
223                                                    return true;
224                                                }
225                                                lines.push(line);
226                                            }
227                                        }
228                                        Err(e) => {
229                                            if self.config.try_lossy {
230                                                for bytes in leftover.split(|b| *b == b'\n') {
231                                                    if version.load(Ordering::Acquire) != _version {
232                                                        return true;
233                                                    }
234                                                    let line =
235                                                        String::from_utf8_lossy(bytes).into_owned();
236                                                    lines.push(Line::from(line));
237                                                }
238                                            } else {
239                                                error!("Error displaying {cmd}: {:?}", e);
240                                                return false;
241                                            }
242                                        }
243                                    }
244                                }
245
246                                true
247                            });
248                            self.current = Some((child, handle))
249                        } else {
250                            error!("Failed to get stdout of preview command: {cmd}")
251                        }
252                    }
253                }
254                PreviewMessage::Stop => {}
255                _ => unreachable!(),
256            }
257
258            self.prune_procs();
259        }
260
261        let ret = self.cleanup_procs();
262        if ret.is_empty() { Ok(()) } else { Err(ret) }
263    }
264
265    fn dispatch_kill(&mut self) {
266        if let Some((mut child, old)) = self.current.take() {
267            // invalidate all existing feeders
268            self.version.fetch_add(1, Ordering::AcqRel);
269
270            let _ = child.kill();
271            self.procs.push(child);
272            let mut old = Box::pin(old); // pin it to heap
273
274            match old.as_mut().now_or_never() {
275                Some(Ok(result)) => {
276                    if !result {
277                        self.send(Event::Refresh)
278                    }
279                }
280                None => {
281                    old.abort(); // still works because `AbortHandle` is separate
282                }
283                _ => {}
284            }
285        }
286    }
287
288    fn send(&self, event: Event) {
289        if let Some(ref tx) = self.event_controller_tx {
290            let _ = tx.send(event);
291        }
292    }
293
294    pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
295        self.event_controller_tx = Some(event_controller_tx)
296    }
297
298    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
299    // also, maybe don't want this delaying exit?
300    fn cleanup_procs(mut self) -> Vec<Child> {
301        let total_timeout = Duration::from_secs(1);
302        let start = Instant::now();
303
304        self.procs.retain_mut(|child| {
305            loop {
306                match child.try_wait() {
307                    Ok(Some(_)) => return false,
308                    Ok(None) => {
309                        if start.elapsed() >= total_timeout {
310                            error!("Child failed to exit in time: {:?}", child);
311                            return true;
312                        } else {
313                            thread::sleep(Duration::from_millis(10));
314                        }
315                    }
316                    Err(e) => {
317                        error!("Error waiting on child: {e}");
318                        return true;
319                    }
320                }
321            }
322        });
323
324        self.procs
325    }
326
327    fn prune_procs(&mut self) {
328        self.procs.retain_mut(|child| match child.try_wait() {
329            Ok(None) => true,
330            Ok(Some(_)) => false,
331            Err(e) => {
332                warn!("Error waiting on child: {e}");
333                true
334            }
335        });
336    }
337}

// ---------- NON ANSI VARIANT
// let reader = BufReader::new(stdout);
// if self.config.try_lossy {
// for line_result in reader.split(b'\n') {
//     match line_result {
//         Ok(bytes) => {
//             let line =
//             String::from_utf8_lossy(&bytes).into_owned();
//             lines.push(Line::from(line));
//         }
//         Err(e) => error!("Failed to read line: {:?}", e),
//     }
// }
// } else {
//     for line_result in reader.lines() {
//         match line_result {
//             Ok(line) => lines.push(Line::from(line)),
//             Err(e) => {
//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
//                 error!("Error displaying {cmd}: {:?}", e);
//                 break;
//             }
//         }
//     }
// }

// trait Resettable: Default {
//     fn reset(&mut self) {}
// }
// impl<T> Resettable for AppendOnly<T> {
//     fn reset(&mut self) {
//         self.clear();
//     }
// }

// use std::ops::{Deref, DerefMut};

// #[derive(Debug)]
// struct Queue<V: Resettable> {
//     entries: Vec<(String, V)>,
//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
// }

// impl<V: Resettable> Queue<V> {
//     pub fn new(len: usize) -> Self {
//         Self {
//             entries: (0..len)
//             .map(|_| (String::default(), V::default()))
//             .collect(),
//             order: vec![len; len],
//         }
//     }

//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
//             if order_idx == self.entries.len() {
//                 return None
//             }
//             if self.entries[entries_idx].0 == key {
//                 return Some((order_idx, entries_idx));
//             }
//         }
//         None
//     }

//     /// Try to get a key; if found, move it to the top.
//     /// If not found, replace the oldest, clear its vec, set new key.
//     pub fn try_get(&mut self, key: &str) -> bool {
//         let n = self.entries.len();

//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
//             self.order.copy_within(0..order_idx, 1);
//             self.order[0] = idx;
//             true
//         } else {
//             let order_idx = (0..n)
//             .rfind(|&i| self.order[i] < n)
//             .map(|i| i + 1)
//             .unwrap_or(0);

//             let idx = if self.order[order_idx] < self.entries.len() {
//                 order_idx
//             } else {
//                 *self.order.last().unwrap()
//             };

//             // shift and insert at front
//             self.order.copy_within(0..order_idx, 1);
//             self.order[0] = idx;

//             // reset and assign new key
//             let (ref mut k, ref mut v) = self.entries[idx];
//             *k = key.to_owned();
//             v.reset();

//             false
//         }
//     }
// }

// impl<V: Resettable> Deref for Queue<V> {
//     type Target = V;
//     fn deref(&self) -> &Self::Target {
//         &self.entries[self.order[0]].1
//     }
// }

// impl<V: Resettable> DerefMut for Queue<V> {
//     fn deref_mut(&mut self) -> &mut Self::Target {
//         &mut self.entries[self.order[0]].1
//     }
// }

// impl<V: Resettable> Default for Queue<V> {
//     fn default() -> Self {
//         Self::new(1)
//     }
// }