matchmaker/preview/
previewer.rs

1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{EnvVars, spawn_script};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20use crate::preview::Preview;
21
22#[derive(Debug, Display, Clone)]
23pub enum PreviewMessage {
24    Run(String, EnvVars),
25    Set(Text<'static>),
26    Unset,
27    Stop,
28}
29
30#[derive(Debug)]
31pub struct Previewer {
32    rx: Receiver<PreviewMessage>,
33    lines: AppendOnly<Line<'static>>, // append-only buffer
34    string: Arc<Mutex<Option<Text<'static>>>>,
35    changed: Arc<AtomicBool>,
36
37    procs: Vec<Child>,
38    current: Option<(Child, JoinHandle<bool>)>,
39    pub config: PreviewerConfig,
40    event_controller_tx: Option<UnboundedSender<Event>>,
41}
42
43impl Previewer {
44    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45        let (tx, rx) = channel(PreviewMessage::Stop);
46
47        let new = Self {
48            rx,
49            lines: AppendOnly::new(),
50            string: Default::default(),
51            changed: Default::default(),
52
53            procs: Vec::new(),
54            current: None,
55            config,
56            event_controller_tx: None,
57        };
58
59        (new, tx)
60    }
61
62    pub fn view(&self) -> Preview {
63        Preview::new(
64            self.lines.clone(),
65            self.string.clone(),
66            self.changed.clone(),
67        )
68    }
69
70    pub fn set_string(&self, s: Text<'static>) {
71        let mut guard = self.string.lock().unwrap();
72        *guard = Some(s);
73        self.changed.store(true, Ordering::Release);
74    }
75
76    pub fn clear_string(&self) {
77        let mut guard = self.string.lock().unwrap();
78        *guard = None;
79        self.changed.store(true, Ordering::Release);
80    }
81
82    pub fn has_string(&self) -> bool {
83        let guard = self.string.lock().unwrap();
84        guard.is_some()
85    }
86
87    pub async fn run(mut self) -> Result<(), Vec<Child>> {
88        while self.rx.changed().await.is_ok() {
89            if !self.procs.is_empty() {
90                debug!("procs: {:?}", self.procs);
91            }
92
93            {
94                let m = &*self.rx.borrow();
95                if let PreviewMessage::Set(s) = m {
96                    self.set_string(s.clone());
97                    // don't kill the underlying
98                    continue;
99                } else if matches!(m, PreviewMessage::Unset) {
100                    self.clear_string();
101                    continue;
102                }
103            }
104
105            self.dispatch_kill();
106            self.clear_string();
107            self.lines.clear();
108
109            match &*self.rx.borrow() {
110                PreviewMessage::Run(cmd, variables) => {
111                    if let Some(mut child) = spawn_script(
112                        cmd,
113                        variables.iter().cloned(),
114                        Stdio::null(),
115                        Stdio::piped(),
116                        Stdio::null(),
117                    ) {
118                        if let Some(stdout) = child.stdout.take() {
119                            self.changed.store(true, Ordering::Relaxed);
120                            let lines = self.lines.clone();
121                            let cmd = cmd.clone();
122                            let handle = tokio::spawn(async move {
123                                let mut reader = BufReader::new(stdout);
124                                let mut leftover = Vec::new();
125                                let mut buf = [0u8; 8192];
126
127                                // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
128                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
129                                    if n == 0 {
130                                        break;
131                                    }
132
133                                    leftover.extend_from_slice(&buf[..n]);
134
135                                    let valid_up_to = match std::str::from_utf8(&leftover) {
136                                        Ok(_) => leftover.len(),
137                                        Err(e) => e.valid_up_to(),
138                                    };
139
140                                    let split_at = leftover[..valid_up_to]
141                                        .iter()
142                                        .rposition(|&b| b == b'\n' || b == b'\r')
143                                        .map(|pos| pos + 1)
144                                        .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
145
146                                    let (valid_bytes, rest) = leftover.split_at(split_at);
147
148                                    match valid_bytes.into_text() {
149                                        Ok(text) => {
150                                            for line in text {
151                                                lines.push(line);
152                                            }
153                                        }
154                                        Err(e) => {
155                                            if self.config.try_lossy {
156                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
157                                                    let line =
158                                                        String::from_utf8_lossy(bytes).into_owned();
159                                                    lines.push(Line::from(line));
160                                                }
161                                            } else {
162                                                error!("Error displaying {cmd}: {:?}", e);
163                                                return false;
164                                            }
165                                        }
166                                    }
167
168                                    leftover = rest.to_vec();
169                                }
170
171                                // handle any remaining bytes
172                                if !leftover.is_empty() {
173                                    match leftover.into_text() {
174                                        Ok(text) => {
175                                            for line in text {
176                                                lines.push(line);
177                                            }
178                                        }
179                                        Err(e) => {
180                                            if self.config.try_lossy {
181                                                for bytes in leftover.split(|b| *b == b'\n') {
182                                                    let line =
183                                                        String::from_utf8_lossy(bytes).into_owned();
184                                                    lines.push(Line::from(line));
185                                                }
186                                            } else {
187                                                error!("Error displaying {cmd}: {:?}", e);
188                                                return false;
189                                            }
190                                        }
191                                    }
192                                }
193                                true
194                            });
195                            self.current = Some((child, handle))
196                        } else {
197                            error!("Failed to get stdout of preview command: {cmd}")
198                        }
199                    }
200                }
201                PreviewMessage::Stop => {}
202                _ => unreachable!(),
203            }
204
205            self.prune_procs();
206        }
207
208        let ret = self.cleanup_procs();
209        if ret.is_empty() { Ok(()) } else { Err(ret) }
210    }
211
212    fn dispatch_kill(&mut self) {
213        if let Some((mut child, old)) = self.current.take() {
214            let _ = child.kill();
215            self.procs.push(child);
216            let mut old = Box::pin(old); // pin it to heap
217
218            match old.as_mut().now_or_never() {
219                Some(Ok(result)) => {
220                    if !result && let Some(ref tx) = self.event_controller_tx {
221                        let _ = tx.send(Event::Refresh);
222                    }
223                }
224                None => {
225                    old.abort(); // still works because `AbortHandle` is separate
226                }
227                _ => {}
228            }
229        }
230    }
231
232    pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
233        self.event_controller_tx = Some(event_controller_tx)
234    }
235
236    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
237    // also, maybe don't want this delaying exit?
238    fn cleanup_procs(mut self) -> Vec<Child> {
239        let total_timeout = Duration::from_secs(1);
240        let start = Instant::now();
241
242        self.procs.retain_mut(|child| {
243            loop {
244                match child.try_wait() {
245                    Ok(Some(_)) => return false,
246                    Ok(None) => {
247                        if start.elapsed() >= total_timeout {
248                            error!("Child failed to exit in time: {:?}", child);
249                            return true;
250                        } else {
251                            thread::sleep(Duration::from_millis(10));
252                        }
253                    }
254                    Err(e) => {
255                        error!("Error waiting on child: {e}");
256                        return true;
257                    }
258                }
259            }
260        });
261
262        self.procs
263    }
264
265    fn prune_procs(&mut self) {
266        self.procs.retain_mut(|child| match child.try_wait() {
267            Ok(None) => true,
268            Ok(Some(_)) => false,
269            Err(e) => {
270                warn!("Error waiting on child: {e}");
271                true
272            }
273        });
274    }
275}
276
277// ---------- NON ANSI VARIANT
278// let reader = BufReader::new(stdout);
279// if self.config.try_lossy {
280// for line_result in reader.split(b'\n') {
281//     match line_result {
282//         Ok(bytes) => {
283//             let line =
284//             String::from_utf8_lossy(&bytes).into_owned();
285//             lines.push(Line::from(line));
286//         }
287//         Err(e) => error!("Failed to read line: {:?}", e),
288//     }
289// }
290// } else {
291//     for line_result in reader.lines() {
292//         match line_result {
293//             Ok(line) => lines.push(Line::from(line)),
294//             Err(e) => {
295//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
296//                 error!("Error displaying {cmd}: {:?}", e);
297//                 break;
298//             }
299//         }
300//     }
301// }
302
303// trait Resettable: Default {
304//     fn reset(&mut self) {}
305// }
306// impl<T> Resettable for AppendOnly<T> {
307//     fn reset(&mut self) {
308//         self.clear();
309//     }
310// }
311
312// use std::ops::{Deref, DerefMut};
313
314// #[derive(Debug)]
315// struct Queue<V: Resettable> {
316//     entries: Vec<(String, V)>,
317//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
318// }
319
320// impl<V: Resettable> Queue<V> {
321//     pub fn new(len: usize) -> Self {
322//         Self {
323//             entries: (0..len)
324//             .map(|_| (String::default(), V::default()))
325//             .collect(),
326//             order: vec![len; len],
327//         }
328//     }
329
330//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
331//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
332//             if order_idx == self.entries.len() {
333//                 return None
334//             }
335//             if self.entries[entries_idx].0 == key {
336//                 return Some((order_idx, entries_idx));
337//             }
338//         }
339//         None
340//     }
341
342//     /// Try to get a key; if found, move it to the top.
343//     /// If not found, replace the oldest, clear its vec, set new key.
344//     pub fn try_get(&mut self, key: &str) -> bool {
345//         let n = self.entries.len();
346
347//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
348//             self.order.copy_within(0..order_idx, 1);
349//             self.order[0] = idx;
350//             true
351//         } else {
352//             let order_idx = (0..n)
353//             .rfind(|&i| self.order[i] < n)
354//             .map(|i| i + 1)
355//             .unwrap_or(0);
356
357//             let idx = if self.order[order_idx] < self.entries.len() {
358//                 order_idx
359//             } else {
360//                 *self.order.last().unwrap()
361//             };
362
363//             // shift and insert at front
364//             self.order.copy_within(0..order_idx, 1);
365//             self.order[0] = idx;
366
367//             // reset and assign new key
368//             let (ref mut k, ref mut v) = self.entries[idx];
369//             *k = key.to_owned();
370//             v.reset();
371
372//             false
373//         }
374//     }
375// }
376
377// impl<V: Resettable> Deref for Queue<V> {
378//     type Target = V;
379//     fn deref(&self) -> &Self::Target {
380//         &self.entries[self.order[0]].1
381//     }
382// }
383
384// impl<V: Resettable> DerefMut for Queue<V> {
385//     fn deref_mut(&mut self) -> &mut Self::Target {
386//         &mut self.entries[self.order[0]].1
387//     }
388// }
389
390// impl<V: Resettable> Default for Queue<V> {
391//     fn default() -> Self {
392//         Self::new(1)
393//     }
394// }