matchmaker/proc/
previewer.rs

1use ansi_to_tui::IntoText;
2use futures::FutureExt;
3use log::{debug, error, warn};
4use ratatui::text::{Line, Text};
5use std::io::BufReader;
6use std::process::{Child, Stdio};
7use std::sync::{Arc, Mutex};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::thread;
10use std::time::{Duration, Instant};
11use strum_macros::Display;
12use tokio::sync::mpsc::UnboundedSender;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::{AppendOnly, EnvVars, spawn};
17use crate::proc::Preview;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20
21#[derive(Debug, Display, Clone)]
22pub enum PreviewMessage {
23    Run(String, EnvVars),
24    Set(Text<'static>),
25    Unset,
26    Stop,
27}
28
29#[derive(Debug)]
30pub struct Previewer {
31    rx: Receiver<PreviewMessage>,
32    lines: AppendOnly<Line<'static>>, // append-only buffer
33    string: Arc<Mutex<Option<Text<'static>>>>,
34    changed: Arc<AtomicBool>,
35
36    procs: Vec<Child>,
37    current: Option<(Child, JoinHandle<bool>)>,
38    pub config: PreviewerConfig,
39    controller_tx: Option<UnboundedSender<Event>>,
40}
41
42impl Previewer {
43    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
44        let (tx, rx) = channel(PreviewMessage::Stop);
45
46        let new = Self {
47            rx,
48            lines: AppendOnly::new(),
49            string: Default::default(),
50            changed: Default::default(),
51
52            procs: Vec::new(),
53            current: None,
54            config,
55            controller_tx: None,
56        };
57
58        (new, tx)
59    }
60
61    pub fn view(&self) -> Preview {
62        Preview::new(self.lines.clone(), self.string.clone(), self.changed.clone())
63    }
64
65    pub fn set_string(&self, s: Text<'static>) {
66        let mut guard = self.string.lock().unwrap();
67        *guard = Some(s);
68        self.changed.store(true, Ordering::Release);
69    }
70
71    pub fn clear_string(&self) {
72        let mut guard = self.string.lock().unwrap();
73        *guard = None;
74        self.changed.store(true, Ordering::Release);
75    }
76
77    pub fn has_string(&self) -> bool {
78        let guard = self.string.lock().unwrap();
79        guard.is_some()
80    }
81
82    pub async fn run(mut self) -> Result<(), Vec<Child>> {
83        while self.rx.changed().await.is_ok() {
84            if !self.procs.is_empty() {
85                debug!("procs: {:?}", self.procs);
86            }
87
88            {
89                let m = &*self.rx.borrow();
90                if let PreviewMessage::Set(s) = m {
91                    self.set_string(s.clone());
92                    // don't kill the underlying
93                    continue;
94                } else if matches!(m, PreviewMessage::Unset) {
95                    self.clear_string();
96                    continue;
97                }
98            }
99
100            self.dispatch_kill();
101            self.clear_string();
102            self.lines.clear();
103
104            match &*self.rx.borrow() {
105                PreviewMessage::Run(cmd, variables) => {
106                    if let Some(mut child) = spawn(
107                        cmd,
108                        variables.iter().cloned(),
109                        Stdio::null(),
110                        Stdio::piped(),
111                        Stdio::null(),
112                    ) {
113                        if let Some(stdout) = child.stdout.take() {
114                            self.changed.store(true, Ordering::Relaxed);
115                            let lines = self.lines.clone();
116                            let cmd = cmd.clone();
117                            let handle = tokio::spawn(async move {
118                                let mut reader = BufReader::new(stdout);
119                                let mut leftover = Vec::new();
120                                let mut buf = [0u8; 8192];
121
122                                // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
123                                while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
124                                    if n == 0 {
125                                        break;
126                                    }
127
128                                    leftover.extend_from_slice(&buf[..n]);
129
130                                    let valid_up_to = match std::str::from_utf8(&leftover) {
131                                        Ok(_) => leftover.len(),
132                                        Err(e) => e.valid_up_to(),
133                                    };
134
135                                    let split_at = leftover[..valid_up_to]
136                                    .iter()
137                                    .rposition(|&b| b == b'\n' || b == b'\r')
138                                    .map(|pos| pos + 1)
139                                    .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
140
141                                    let (valid_bytes, rest) = leftover.split_at(split_at);
142
143                                    match valid_bytes.into_text() {
144                                        Ok(text) => {
145                                            for line in text {
146                                                lines.push(line);
147                                            }
148                                        }
149                                        Err(e) => {
150                                            if self.config.try_lossy {
151                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
152                                                    let line =
153                                                    String::from_utf8_lossy(bytes).into_owned();
154                                                    lines.push(Line::from(line));
155                                                }
156                                            } else {
157                                                error!("Error displaying {cmd}: {:?}", e);
158                                                return false;
159                                            }
160                                        }
161                                    }
162
163                                    leftover = rest.to_vec();
164                                }
165
166                                // handle any remaining bytes
167                                if !leftover.is_empty() {
168                                    match leftover.into_text() {
169                                        Ok(text) => {
170                                            for line in text {
171                                                lines.push(line);
172                                            }
173                                        }
174                                        Err(e) => {
175                                            if self.config.try_lossy {
176                                                for bytes in leftover.split(|b| *b == b'\n') {
177                                                    let line =
178                                                    String::from_utf8_lossy(bytes).into_owned();
179                                                    lines.push(Line::from(line));
180                                                }
181                                            } else {
182                                                error!("Error displaying {cmd}: {:?}", e);
183                                                return false;
184                                            }
185                                        }
186                                    }
187                                }
188                                true
189                            });
190                            self.current = Some((child, handle))
191                        } else {
192                            error!("Failed to get stdout of preview command: {cmd}")
193                        }
194                    }
195                }
196                PreviewMessage::Stop => {
197                }
198                _ => unreachable!()
199            }
200
201            self.prune_procs();
202        }
203
204        let ret = self.cleanup_procs();
205        if ret.is_empty() { Ok(()) } else { Err(ret) }
206    }
207
208    fn dispatch_kill(&mut self) {
209        if let Some((mut child, old)) = self.current.take() {
210            let _ = child.kill();
211            self.procs.push(child);
212            let mut old = Box::pin(old); // pin it to heap
213
214            match old.as_mut().now_or_never() {
215                Some(Ok(result)) => {
216                    if !result && let Some(ref tx) = self.controller_tx {
217                        let _ = tx.send(Event::Refresh);
218                    }
219                }
220                None => {
221                    old.abort(); // still works because `AbortHandle` is separate
222                }
223                _ => {}
224            }
225        }
226    }
227
228    pub fn connect_controller(&mut self, controller_tx: UnboundedSender<Event>) {
229        self.controller_tx = Some(controller_tx)
230    }
231
232    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
233    // also, maybe don't want this delaying exit?
234    fn cleanup_procs(mut self) -> Vec<Child> {
235        let total_timeout = Duration::from_secs(1);
236        let start = Instant::now();
237
238        self.procs.retain_mut(|child| {
239            loop {
240                match child.try_wait() {
241                    Ok(Some(_)) => return false,
242                    Ok(None) => {
243                        if start.elapsed() >= total_timeout {
244                            error!("Child failed to exit in time: {:?}", child);
245                            return true;
246                        } else {
247                            thread::sleep(Duration::from_millis(10));
248                        }
249                    }
250                    Err(e) => {
251                        error!("Error waiting on child: {e}");
252                        return true;
253                    }
254                }
255            }
256        });
257
258        self.procs
259    }
260
261    fn prune_procs(&mut self) {
262        self.procs.retain_mut(|child| match child.try_wait() {
263            Ok(None) => true,
264            Ok(Some(_)) => false,
265            Err(e) => {
266                warn!("Error waiting on child: {e}");
267                true
268            }
269        });
270    }
271}
272
273// ---------- NON ANSI VARIANT
274// let reader = BufReader::new(stdout);
275// if self.config.try_lossy {
276// for line_result in reader.split(b'\n') {
277//     match line_result {
278//         Ok(bytes) => {
279//             let line =
280//             String::from_utf8_lossy(&bytes).into_owned();
281//             lines.push(Line::from(line));
282//         }
283//         Err(e) => error!("Failed to read line: {:?}", e),
284//     }
285// }
286// } else {
287//     for line_result in reader.lines() {
288//         match line_result {
289//             Ok(line) => lines.push(Line::from(line)),
290//             Err(e) => {
291//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
292//                 error!("Error displaying {cmd}: {:?}", e);
293//                 break;
294//             }
295//         }
296//     }
297// }
298
299// trait Resettable: Default {
300//     fn reset(&mut self) {}
301// }
302// impl<T> Resettable for AppendOnly<T> {
303//     fn reset(&mut self) {
304//         self.clear();
305//     }
306// }
307
308// use std::ops::{Deref, DerefMut};
309
310// #[derive(Debug)]
311// struct Queue<V: Resettable> {
312//     entries: Vec<(String, V)>,
313//     order: Vec<usize>, // indices ordered by recency (0 = most recent)
314// }
315
316// impl<V: Resettable> Queue<V> {
317//     pub fn new(len: usize) -> Self {
318//         Self {
319//             entries: (0..len)
320//             .map(|_| (String::default(), V::default()))
321//             .collect(),
322//             order: vec![len; len],
323//         }
324//     }
325
326//     fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
327//         for (order_idx, &entries_idx) in self.order.iter().enumerate() {
328//             if order_idx == self.entries.len() {
329//                 return None
330//             }
331//             if self.entries[entries_idx].0 == key {
332//                 return Some((order_idx, entries_idx));
333//             }
334//         }
335//         None
336//     }
337
338//     /// Try to get a key; if found, move it to the top.
339//     /// If not found, replace the oldest, clear its vec, set new key.
340//     pub fn try_get(&mut self, key: &str) -> bool {
341//         let n = self.entries.len();
342
343//         if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
344//             self.order.copy_within(0..order_idx, 1);
345//             self.order[0] = idx;
346//             true
347//         } else {
348//             let order_idx = (0..n)
349//             .rfind(|&i| self.order[i] < n)
350//             .map(|i| i + 1)
351//             .unwrap_or(0);
352
353//             let idx = if self.order[order_idx] < self.entries.len() {
354//                 order_idx
355//             } else {
356//                 *self.order.last().unwrap()
357//             };
358
359//             // shift and insert at front
360//             self.order.copy_within(0..order_idx, 1);
361//             self.order[0] = idx;
362
363//             // reset and assign new key
364//             let (ref mut k, ref mut v) = self.entries[idx];
365//             *k = key.to_owned();
366//             v.reset();
367
368//             false
369//         }
370//     }
371// }
372
373// impl<V: Resettable> Deref for Queue<V> {
374//     type Target = V;
375//     fn deref(&self) -> &Self::Target {
376//         &self.entries[self.order[0]].1
377//     }
378// }
379
380// impl<V: Resettable> DerefMut for Queue<V> {
381//     fn deref_mut(&mut self) -> &mut Self::Target {
382//         &mut self.entries[self.order[0]].1
383//     }
384// }
385
386// impl<V: Resettable> Default for Queue<V> {
387//     fn default() -> Self {
388//         Self::new(1)
389//     }
390// }