matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{EnvVars, spawn_script};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Stdio};
8use std::sync::{Arc, Mutex};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::preview::Preview;
19use crate::config::PreviewerConfig;
20use crate::message::Event;
21
/// Commands delivered to a [`Previewer`] through its watch channel.
///
/// Because the transport is a watch channel, only the most recently sent
/// message is guaranteed to be observed by the previewer.
#[derive(Debug, Display, Clone)]
pub enum PreviewMessage {
    /// Kill the current preview command, clear the display, then spawn the
    /// given command string with the given environment variables and stream
    /// its stdout into the line buffer.
    Run(String, EnvVars),
    /// Store the given text in the previewer's string slot; the running
    /// preview command is left untouched.
    Set(Text<'static>),
    /// Clear the string slot; the running preview command is left untouched.
    Unset,
    /// Kill the current preview command and clear both the string slot and the
    /// line buffer without starting anything new. Also the channel's initial value.
    Stop,
}
29
/// Background worker that runs preview commands and exposes their output.
///
/// It is driven by [`PreviewMessage`]s received in [`Previewer::run`]; the
/// shared state it fills is handed to the UI through [`Previewer::view`].
#[derive(Debug)]
pub struct Previewer {
    /// Receiving half of the watch channel created in [`Previewer::new`].
    rx: Receiver<PreviewMessage>,
    lines: AppendOnly<Line<'static>>, // append-only buffer
    /// Text installed by `PreviewMessage::Set` and removed by `Unset`, `Run`, and `Stop`.
    string: Arc<Mutex<Option<Text<'static>>>>,
    /// Set to `true` whenever `string` is replaced/cleared or a new command starts.
    changed: Arc<AtomicBool>,

    /// Children that have already been sent a kill and still need to be reaped.
    procs: Vec<Child>,
    /// The running preview command together with the task reading its stdout.
    /// The task resolves to `false` when the output could not be displayed.
    current: Option<(Child, JoinHandle<bool>)>,
    /// Previewer settings; `try_lossy` selects the lossy fallback for undecodable output.
    pub config: PreviewerConfig,
    /// Optional channel used to send `Event::Refresh` after a failed preview.
    event_controller_tx: Option<UnboundedSender<Event>>,
}
42
43impl Previewer {
44    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45        let (tx, rx) = channel(PreviewMessage::Stop);
46
47        let new = Self {
48            rx,
49            lines: AppendOnly::new(),
50            string: Default::default(),
51            changed: Default::default(),
52
53            procs: Vec::new(),
54            current: None,
55            config,
56            event_controller_tx: None,
57        };
58
59        (new, tx)
60    }
61
62    pub fn view(&self) -> Preview {
63        Preview::new(self.lines.clone(), self.string.clone(), self.changed.clone())
64    }
65
66    pub fn set_string(&self, s: Text<'static>) {
67        let mut guard = self.string.lock().unwrap();
68        *guard = Some(s);
69        self.changed.store(true, Ordering::Release);
70    }
71
72    pub fn clear_string(&self) {
73        let mut guard = self.string.lock().unwrap();
74        *guard = None;
75        self.changed.store(true, Ordering::Release);
76    }
77
78    pub fn has_string(&self) -> bool {
79        let guard = self.string.lock().unwrap();
80        guard.is_some()
81    }
82
83    pub async fn run(mut self) -> Result<(), Vec<Child>> {
84        while self.rx.changed().await.is_ok() {
85            if !self.procs.is_empty() {
86                debug!("procs: {:?}", self.procs);
87            }
88
89            {
90                let m = &*self.rx.borrow();
91                if let PreviewMessage::Set(s) = m {
92                    self.set_string(s.clone());
93                    // don't kill the underlying
94                    continue;
95                } else if matches!(m, PreviewMessage::Unset) {
96                    self.clear_string();
97                    continue;
98                }
99            }
100
101            self.dispatch_kill();
102            self.clear_string();
103            self.lines.clear();
104
105            match &*self.rx.borrow() {
106                PreviewMessage::Run(cmd, variables) => {
107                    if let Some(mut child) = spawn_script(
108                        cmd,
109                        variables.iter().cloned(),
110                        Stdio::null(),
111                        Stdio::piped(),
112                        Stdio::null(),
113                    ) {
114                        if let Some(stdout) = child.stdout.take() {
115                            self.changed.store(true, Ordering::Relaxed);
116                            let lines = self.lines.clone();
117                            let cmd = cmd.clone();
118                            let handle = tokio::spawn(async move {
119                                let mut reader = BufReader::new(stdout);
120                                let mut leftover = Vec::new();
121                                let mut buf = [0u8; 8192];
122
123                                // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
124                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
125                                    if n == 0 {
126                                        break;
127                                    }
128
129                                    leftover.extend_from_slice(&buf[..n]);
130
131                                    let valid_up_to = match std::str::from_utf8(&leftover) {
132                                        Ok(_) => leftover.len(),
133                                        Err(e) => e.valid_up_to(),
134                                    };
135
136                                    let split_at = leftover[..valid_up_to]
137                                    .iter()
138                                    .rposition(|&b| b == b'\n' || b == b'\r')
139                                    .map(|pos| pos + 1)
140                                    .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
141
142                                    let (valid_bytes, rest) = leftover.split_at(split_at);
143
144                                    match valid_bytes.into_text() {
145                                        Ok(text) => {
146                                            for line in text {
147                                                lines.push(line);
148                                            }
149                                        }
150                                        Err(e) => {
151                                            if self.config.try_lossy {
152                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
153                                                    let line =
154                                                    String::from_utf8_lossy(bytes).into_owned();
155                                                    lines.push(Line::from(line));
156                                                }
157                                            } else {
158                                                error!("Error displaying {cmd}: {:?}", e);
159                                                return false;
160                                            }
161                                        }
162                                    }
163
164                                    leftover = rest.to_vec();
165                                }
166
167                                // handle any remaining bytes
168                                if !leftover.is_empty() {
169                                    match leftover.into_text() {
170                                        Ok(text) => {
171                                            for line in text {
172                                                lines.push(line);
173                                            }
174                                        }
175                                        Err(e) => {
176                                            if self.config.try_lossy {
177                                                for bytes in leftover.split(|b| *b == b'\n') {
178                                                    let line =
179                                                    String::from_utf8_lossy(bytes).into_owned();
180                                                    lines.push(Line::from(line));
181                                                }
182                                            } else {
183                                                error!("Error displaying {cmd}: {:?}", e);
184                                                return false;
185                                            }
186                                        }
187                                    }
188                                }
189                                true
190                            });
191                            self.current = Some((child, handle))
192                        } else {
193                            error!("Failed to get stdout of preview command: {cmd}")
194                        }
195                    }
196                }
197                PreviewMessage::Stop => {
198                }
199                _ => unreachable!()
200            }
201
202            self.prune_procs();
203        }
204
205        let ret = self.cleanup_procs();
206        if ret.is_empty() { Ok(()) } else { Err(ret) }
207    }
208
209    fn dispatch_kill(&mut self) {
210        if let Some((mut child, old)) = self.current.take() {
211            let _ = child.kill();
212            self.procs.push(child);
213            let mut old = Box::pin(old); // pin it to heap
214
215            match old.as_mut().now_or_never() {
216                Some(Ok(result)) => {
217                    if !result && let Some(ref tx) = self.event_controller_tx {
218                        let _ = tx.send(Event::Refresh);
219                    }
220                }
221                None => {
222                    old.abort(); // still works because `AbortHandle` is separate
223                }
224                _ => {}
225            }
226        }
227    }
228
229    pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
230        self.event_controller_tx = Some(event_controller_tx)
231    }
232
    // TODO: This would be cleaner with `tokio::process::Child`, but does that merit a conversion? It is unclear whether it is worth having the previewer yield control while waiting for output, since we are multithreaded anyway.
    // Also, we may not want this wait to delay exit.
235    fn cleanup_procs(mut self) -> Vec<Child> {
236        let total_timeout = Duration::from_secs(1);
237        let start = Instant::now();
238
239        self.procs.retain_mut(|child| {
240            loop {
241                match child.try_wait() {
242                    Ok(Some(_)) => return false,
243                    Ok(None) => {
244                        if start.elapsed() >= total_timeout {
245                            error!("Child failed to exit in time: {:?}", child);
246                            return true;
247                        } else {
248                            thread::sleep(Duration::from_millis(10));
249                        }
250                    }
251                    Err(e) => {
252                        error!("Error waiting on child: {e}");
253                        return true;
254                    }
255                }
256            }
257        });
258
259        self.procs
260    }
261
262    fn prune_procs(&mut self) {
263        self.procs.retain_mut(|child| match child.try_wait() {
264            Ok(None) => true,
265            Ok(Some(_)) => false,
266            Err(e) => {
267                warn!("Error waiting on child: {e}");
268                true
269            }
270        });
271    }
272}
273
274// ---------- NON ANSI VARIANT
275// let reader = BufReader::new(stdout);
276// if self.config.try_lossy {
277// for line_result in reader.split(b'\n') {
278//     match line_result {
279//         Ok(bytes) => {
280//             let line =
281//             String::from_utf8_lossy(&bytes).into_owned();
282//             lines.push(Line::from(line));
283//         }
284//         Err(e) => error!("Failed to read line: {:?}", e),
285//     }
286// }
287// } else {
288//     for line_result in reader.lines() {
289//         match line_result {
290//             Ok(line) => lines.push(Line::from(line)),
291//             Err(e) => {
292//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
293//                 error!("Error displaying {cmd}: {:?}", e);
294//                 break;
295//             }
296//         }
297//     }
298// }
299
300// trait Resettable: Default {
301//     fn reset(&mut self) {}
302// }
303// impl<T> Resettable for AppendOnly<T> {
304//     fn reset(&mut self) {
305//         self.clear();
306//     }
307// }
308
309// use std::ops::{Deref, DerefMut};
310
311// #[derive(Debug)]
312// struct Queue<V: Resettable> {
313//     entries: Vec<(String, V)>,
314//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
315// }
316
317// impl<V: Resettable> Queue<V> {
318//     pub fn new(len: usize) -> Self {
319//         Self {
320//             entries: (0..len)
321//             .map(|_| (String::default(), V::default()))
322//             .collect(),
323//             order: vec![len; len],
324//         }
325//     }
326
327//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
328//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
329//             if order_idx == self.entries.len() {
330//                 return None
331//             }
332//             if self.entries[entries_idx].0 == key {
333//                 return Some((order_idx, entries_idx));
334//             }
335//         }
336//         None
337//     }
338
339//     /// Try to get a key; if found, move it to the top.
340//     /// If not found, replace the oldest, clear its vec, set new key.
341//     pub fn try_get(&mut self, key: &str) -> bool {
342//         let n = self.entries.len();
343
344//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
345//             self.order.copy_within(0..order_idx, 1);
346//             self.order[0] = idx;
347//             true
348//         } else {
349//             let order_idx = (0..n)
350//             .rfind(|&i| self.order[i] < n)
351//             .map(|i| i + 1)
352//             .unwrap_or(0);
353
354//             let idx = if self.order[order_idx] < self.entries.len() {
355//                 order_idx
356//             } else {
357//                 *self.order.last().unwrap()
358//             };
359
360//             // shift and insert at front
361//             self.order.copy_within(0..order_idx, 1);
362//             self.order[0] = idx;
363
364//             // reset and assign new key
365//             let (ref mut k, ref mut v) = self.entries[idx];
366//             *k = key.to_owned();
367//             v.reset();
368
369//             false
370//         }
371//     }
372// }
373
374// impl<V: Resettable> Deref for Queue<V> {
375//     type Target = V;
376//     fn deref(&self) -> &Self::Target {
377//         &self.entries[self.order[0]].1
378//     }
379// }
380
381// impl<V: Resettable> DerefMut for Queue<V> {
382//     fn deref_mut(&mut self) -> &mut Self::Target {
383//         &mut self.entries[self.order[0]].1
384//     }
385// }
386
387// impl<V: Resettable> Default for Queue<V> {
388//     fn default() -> Self {
389//         Self::new(1)
390//     }
391// }