Skip to main content

matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cba::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::watch::{Receiver, Sender, channel};
13use tokio::task::JoinHandle;
14
15use super::AppendOnly;
16use crate::config::PreviewerConfig;
17use crate::event::EventSender;
18use crate::message::Event;
19use crate::preview::Preview;
20
/// Commands sent to a [`Previewer`] through its watch channel.
///
/// The previewer's `run` loop reacts to the value currently stored in the
/// channel each time it is notified of a change.
#[derive(Debug, Default, strum_macros::Display, Clone)]
pub enum PreviewMessage {
    /// Kill the current preview command, clear the preview, and run this
    /// script with the given environment variables.
    Run(String, EnvVars),
    /// Override the preview with fixed text; the running command is left alone.
    Set(Text<'static>),
    /// Remove the text override installed by `Set`.
    Unset,
    /// Kill the current preview command and clear the preview.
    /// This is also the initial value of the channel.
    #[default]
    Stop,
    /// Make the previewer ignore every message except `Pause`/`Unpause`.
    Pause,
    /// Resume normal message handling after `Pause`.
    Unpause,
}
31
/// Runs preview commands and exposes their output through a shared [`Preview`] view.
#[derive(Debug)]
pub struct Previewer {
    /// The receiver for [`PreviewMessage`]s.
    rx: Receiver<PreviewMessage>,
    /// Storage for preview command output.
    lines: AppendOnly<Line<'static>>,
    /// Storage for the preview string override.
    string: Arc<Mutex<Option<Text<'static>>>>,
    /// Flag which is set to true whenever the state changes
    /// and which the viewer can toggle after receiving the current state.
    changed: Arc<AtomicBool>,

    /// While true, every message other than `Pause`/`Unpause` is ignored.
    paused: bool,
    /// Maintain a queue of child processes to improve cleanup reliability.
    /// Children land here after being killed and stay until they are reaped.
    procs: Vec<Child>,
    /// The currently executing child process, paired with the task that
    /// streams its stdout. The task resolves to `false` when a refresh is needed.
    current: Option<(Child, JoinHandle<bool>)>,
    /// Previewer settings (e.g. `try_lossy`, consulted when output fails to decode).
    pub config: PreviewerConfig,
    /// Event loop controller
    // We only use it to send [`ControlEvent::Event`]
    event_controller_tx: Option<EventSender>,
}
54
55impl Previewer {
56    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
57        let (tx, rx) = channel(PreviewMessage::Stop);
58
59        let new = Self {
60            rx,
61            lines: AppendOnly::new(),
62            string: Default::default(),
63            changed: Default::default(),
64            paused: false,
65
66            procs: Vec::new(),
67            current: None,
68            config,
69            event_controller_tx: None,
70        };
71
72        (new, tx)
73    }
74
75    pub fn view(&self) -> Preview {
76        Preview::new(
77            self.lines.clone(),
78            self.string.clone(),
79            self.changed.clone(),
80        )
81    }
82
83    pub fn set_string(&self, s: Text<'static>) {
84        if let Ok(mut guard) = self.string.lock() {
85            *guard = Some(s);
86            self.changed.store(true, Ordering::Release);
87        }
88    }
89
90    pub fn clear_string(&self) {
91        if let Ok(mut guard) = self.string.lock() {
92            *guard = None;
93            self.changed.store(true, Ordering::Release);
94        }
95    }
96
97    pub fn has_string(&self) -> bool {
98        let guard = self.string.lock();
99        guard.is_ok_and(|s| s.is_some())
100    }
101
102    pub async fn run(mut self) -> Result<(), Vec<Child>> {
103        while self.rx.changed().await.is_ok() {
104            if !self.procs.is_empty() {
105                debug!("procs: {:?}", self.procs);
106            }
107
108            {
109                let m = &*self.rx.borrow();
110                match m {
111                    PreviewMessage::Pause => {
112                        self.paused = true;
113                        continue;
114                    }
115                    PreviewMessage::Unpause => {
116                        self.paused = false;
117                        continue;
118                    }
119                    _ if self.paused => {
120                        continue;
121                    }
122                    PreviewMessage::Set(s) => {
123                        self.set_string(s.clone());
124                        // don't kill the underlying
125                        continue;
126                    }
127                    PreviewMessage::Unset => {
128                        self.clear_string();
129                        continue;
130                    }
131                    _ => {}
132                }
133            }
134
135            self.dispatch_kill();
136            self.clear_string();
137            self.lines.clear();
138
139            match &*self.rx.borrow() {
140                PreviewMessage::Run(cmd, variables) => {
141                    // we need the child handle
142                    if let Some(mut child) = Command::from_script(cmd)
143                        .envs(variables.iter().cloned())
144                        .stdout(Stdio::piped())
145                        .stdin(Stdio::null())
146                        .stderr(Stdio::null())
147                        .detach()
148                        ._spawn()
149                    {
150                        if let Some(stdout) = child.stdout.take() {
151                            self.changed.store(true, Ordering::Relaxed);
152
153                            let lines = self.lines.clone();
154                            let guard = self.lines.read();
155                            let cmd = cmd.clone();
156
157                            // false => needs refresh (i.e. invalid utf-8)
158                            let handle = tokio::spawn(async move {
159                                let mut reader = BufReader::new(stdout);
160                                let mut leftover = Vec::new();
161                                let mut buf = [0u8; 8192];
162
163                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
164                                    if n == 0 {
165                                        break;
166                                    }
167
168                                    leftover.extend_from_slice(&buf[..n]);
169
170                                    let valid_up_to = match std::str::from_utf8(&leftover) {
171                                        Ok(_) => leftover.len(),
172                                        Err(e) => e.valid_up_to(),
173                                    };
174
175                                    let split_at = leftover[..valid_up_to]
176                                        .iter()
177                                        .rposition(|&b| b == b'\n' || b == b'\r')
178                                        .map(|pos| pos + 1)
179                                        .unwrap_or(valid_up_to);
180
181                                    let (valid_bytes, rest) = leftover.split_at(split_at);
182
183                                    match valid_bytes.into_text() {
184                                        Ok(text) => {
185                                            for line in text {
186                                                // re-check before pushing
187                                                if lines.is_expired(&guard) {
188                                                    return true;
189                                                }
190                                                guard.push(line);
191                                            }
192                                        }
193                                        Err(e) => {
194                                            if self.config.try_lossy {
195                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
196                                                    if lines.is_expired(&guard) {
197                                                        return true;
198                                                    }
199                                                    let line =
200                                                        String::from_utf8_lossy(bytes).into_owned();
201                                                    guard.push(Line::from(line));
202                                                }
203                                            } else {
204                                                error!("Error displaying {cmd}: {:?}", e);
205                                                return false;
206                                            }
207                                        }
208                                    }
209
210                                    leftover = rest.to_vec();
211                                }
212
213                                if !leftover.is_empty() && !lines.is_expired(&guard) {
214                                    match leftover.into_text() {
215                                        Ok(text) => {
216                                            for line in text {
217                                                if lines.is_expired(&guard) {
218                                                    return true;
219                                                }
220                                                guard.push(line);
221                                            }
222                                        }
223                                        Err(e) => {
224                                            if self.config.try_lossy {
225                                                for bytes in leftover.split(|b| *b == b'\n') {
226                                                    if lines.is_expired(&guard) {
227                                                        return true;
228                                                    }
229                                                    let line =
230                                                        String::from_utf8_lossy(bytes).into_owned();
231                                                    guard.push(Line::from(line));
232                                                }
233                                            } else {
234                                                error!("Error displaying {cmd}: {:?}", e);
235                                                return false;
236                                            }
237                                        }
238                                    }
239                                }
240
241                                true
242                            });
243                            self.current = Some((child, handle))
244                        } else {
245                            error!("Failed to get stdout of preview command: {cmd}")
246                        }
247                    }
248                }
249                PreviewMessage::Stop => {}
250                _ => unreachable!(),
251            }
252
253            self.prune_procs();
254        }
255
256        let ret = self.cleanup_procs();
257        if ret.is_empty() { Ok(()) } else { Err(ret) }
258    }
259
260    fn dispatch_kill(&mut self) {
261        if let Some((mut child, old)) = self.current.take() {
262            let _ = child.kill();
263            self.procs.push(child);
264            let mut old = Box::pin(old); // pin it to heap
265
266            match old.as_mut().now_or_never() {
267                Some(Ok(result)) => {
268                    if !result {
269                        self.send(Event::Refresh)
270                    }
271                }
272                None => {
273                    old.abort(); // still works because `AbortHandle` is separate
274                }
275                _ => {}
276            }
277        }
278    }
279
280    fn send(&self, event: Event) {
281        if let Some(ref tx) = self.event_controller_tx {
282            let _ = tx.send(event);
283        }
284    }
285
286    pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
287        self.event_controller_tx = Some(event_controller_tx)
288    }
289
290    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
291    // also, maybe don't want this delaying exit?
292    fn cleanup_procs(mut self) -> Vec<Child> {
293        let total_timeout = Duration::from_secs(1);
294        let start = Instant::now();
295
296        self.procs.retain_mut(|child| {
297            loop {
298                match child.try_wait() {
299                    Ok(Some(_)) => return false,
300                    Ok(None) => {
301                        if start.elapsed() >= total_timeout {
302                            error!("Child failed to exit in time: {:?}", child);
303                            return true;
304                        } else {
305                            thread::sleep(Duration::from_millis(10));
306                        }
307                    }
308                    Err(e) => {
309                        error!("Error waiting on child: {e}");
310                        return true;
311                    }
312                }
313            }
314        });
315
316        self.procs
317    }
318
319    fn prune_procs(&mut self) {
320        self.procs.retain_mut(|child| match child.try_wait() {
321            Ok(None) => true,
322            Ok(Some(_)) => false,
323            Err(e) => {
324                warn!("Error waiting on child: {e}");
325                true
326            }
327        });
328    }
329}
330
331// ---------- NON ANSI VARIANT
332// let reader = BufReader::new(stdout);
333// if self.config.try_lossy {
334// for line_result in reader.split(b'\n') {
335//     match line_result {
336//         Ok(bytes) => {
337//             let line =
338//             String::from_utf8_lossy(&bytes).into_owned();
339//             lines.push(Line::from(line));
340//         }
341//         Err(e) => error!("Failed to read line: {:?}", e),
342//     }
343// }
344// } else {
345//     for line_result in reader.lines() {
346//         match line_result {
347//             Ok(line) => lines.push(Line::from(line)),
348//             Err(e) => {
349//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
350//                 error!("Error displaying {cmd}: {:?}", e);
351//                 break;
352//             }
353//         }
354//     }
355// }
356
357// trait Resettable: Default {
358//     fn reset(&mut self) {}
359// }
360// impl<T> Resettable for AppendOnly<T> {
361//     fn reset(&mut self) {
362//         self.clear();
363//     }
364// }
365
366// use std::ops::{Deref, DerefMut};
367
368// #[derive(Debug)]
369// struct Queue<V: Resettable> {
370//     entries: Vec<(String, V)>,
371//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
372// }
373
374// impl<V: Resettable> Queue<V> {
375//     pub fn new(len: usize) -> Self {
376//         Self {
377//             entries: (0..len)
378//             .map(|_| (String::default(), V::default()))
379//             .collect(),
380//             order: vec![len; len],
381//         }
382//     }
383
384//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
385//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
386//             if order_idx == self.entries.len() {
387//                 return None
388//             }
389//             if self.entries[entries_idx].0 == key {
390//                 return Some((order_idx, entries_idx));
391//             }
392//         }
393//         None
394//     }
395
396//     /// Try to get a key; if found, move it to the top.
397//     /// If not found, replace the oldest, clear its vec, set new key.
398//     pub fn try_get(&mut self, key: &str) -> bool {
399//         let n = self.entries.len();
400
401//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
402//             self.order.copy_within(0..order_idx, 1);
403//             self.order[0] = idx;
404//             true
405//         } else {
406//             let order_idx = (0..n)
407//             .rfind(|&i| self.order[i] < n)
408//             .map(|i| i + 1)
409//             .unwrap_or(0);
410
411//             let idx = if self.order[order_idx] < self.entries.len() {
412//                 order_idx
413//             } else {
414//                 *self.order.last().unwrap()
415//             };
416
417//             // shift and insert at front
418//             self.order.copy_within(0..order_idx, 1);
419//             self.order[0] = idx;
420
421//             // reset and assign new key
422//             let (ref mut k, ref mut v) = self.entries[idx];
423//             *k = key.to_owned();
424//             v.reset();
425
426//             false
427//         }
428//     }
429// }
430
431// impl<V: Resettable> Deref for Queue<V> {
432//     type Target = V;
433//     fn deref(&self) -> &Self::Target {
434//         &self.entries[self.order[0]].1
435//     }
436// }
437
438// impl<V: Resettable> DerefMut for Queue<V> {
439//     fn deref_mut(&mut self) -> &mut Self::Target {
440//         &mut self.entries[self.order[0]].1
441//     }
442// }
443
444// impl<V: Resettable> Default for Queue<V> {
445//     fn default() -> Self {
446//         Self::new(1)
447//     }
448// }