// matchmaker/spawn/preview.rs

1use ansi_to_tui::IntoText;
2use futures::FutureExt;
3use log::{debug, error, warn};
4use ratatui::text::{Line, Text};
5use tokio::sync::mpsc::UnboundedSender;
6use std::io::{BufReader};
7use std::process::{Child, Stdio};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::{AppendOnly, EnvVars, spawn};
17use crate::config::PreviewerConfig;
18use crate::message::Event;
19
/// Message sent to a [`Previewer`] through its watch channel.
///
/// `Previewer::run` kills whatever preview is currently running each time a
/// new message arrives, before acting on the message itself.
#[derive(Debug, Display, Clone)]
pub enum PreviewMessage {
    /// Start a new preview: the command string handed to `spawn`, and the
    /// variables passed alongside it (presumably environment variables, going
    /// by the `EnvVars` name — confirm against `spawn`).
    Run(String, EnvVars),
    /// Start nothing new. Because the running preview is killed on every
    /// message, this effectively stops it. Also the channel's initial value
    /// (see `Previewer::new`).
    Stop,
}
25
/// Runs preview commands on request and collects their stdout as styled lines.
///
/// Driven by [`PreviewMessage`]s received on `rx`; see `Previewer::run`.
#[derive(Debug)]
pub struct Previewer {
    /// Watch receiver delivering the most recent [`PreviewMessage`].
    rx: Receiver<PreviewMessage>,
    /// Output of the current preview. A clone is handed to every
    /// [`PreviewerView`] and to the task that reads the child's stdout.
    lines: AppendOnly<Line<'static>>, // append-only buffer
    /// Children that have been killed but not yet observed to exit.
    procs: Vec<Child>,
    /// The running preview process and the task reading its stdout.
    /// The task resolves to `false` when output could not be displayed.
    current: Option<(Child, JoinHandle<bool>)>,
    /// Set to `true` when a new preview starts; reset by `PreviewerView::changed`.
    changed: Arc<AtomicBool>,
    /// Previewer settings; `try_lossy` selects the lossy fallback used when
    /// output cannot be converted to text.
    pub config: PreviewerConfig,
    /// Optional channel installed by `connect_controller`; receives
    /// `Event::Refresh` when a killed preview's reader had reported failure.
    controller_tx: Option<UnboundedSender<Event>>
}
37
/// Read-side handle onto a [`Previewer`]'s output, created by `Previewer::view`.
#[derive(Debug)]
pub struct PreviewerView {
    /// Clone of the previewer's line buffer.
    lines: AppendOnly<Line<'static>>,
    /// Clone of the previewer's "new preview started" flag.
    changed: Arc<AtomicBool>,
}
43
44impl PreviewerView {
45    pub fn results(&self) -> Text<'_> {
46        let output = self.lines.read().unwrap(); // acquire read lock
47        Text::from_iter(output.iter().map(|(_, line)| line.clone()))
48    }
49    
50    pub fn len(&self) -> usize {
51        let output = self.lines.read().unwrap();
52        output.count()
53        // todo: handle overflow possibility
54    }
55    
56    pub fn changed(&self) -> bool {
57        self.changed.swap(false, Ordering::Relaxed)
58    }
59}
60
61impl Previewer {
62    pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
63        let (tx, rx) = channel(PreviewMessage::Stop);
64        
65        let lines = AppendOnly::new();
66        
67        let new = Self {
68            rx,
69            lines: lines.clone(),
70            procs: Vec::new(),
71            current: None,
72            changed: Default::default(),
73            config,
74            controller_tx: None
75        };
76        
77        (new, tx)
78    }
79    
80    pub fn view(&self) -> PreviewerView {
81        PreviewerView {
82            lines: self.lines.clone(),
83            changed: self.changed.clone(),
84        }
85    }
86    
87    pub async fn run(mut self) -> Result<(), Vec<Child>> {
88        while self.rx.changed().await.is_ok() {
89            if !self.procs.is_empty() {
90                debug!("procs: {:?}", self.procs);
91            }
92            
93            self.dispatch_kill();
94            
95            match &*self.rx.borrow() {
96                PreviewMessage::Run(cmd, variables) => {
97                    self.lines.clear();
98                    if let Some(mut child) = spawn(
99                        &cmd,
100                        variables.iter().cloned(),
101                        Stdio::null(),
102                        Stdio::piped(),
103                        Stdio::null(),
104                    ) {
105                        if let Some(stdout) = child.stdout.take() {
106                            self.changed.store(true, Ordering::Relaxed);
107                            let lines = self.lines.clone();
108                            let cmd = cmd.clone();
109                            let handle = tokio::spawn(async move {                                
110                                let mut reader = BufReader::new(stdout);
111                                let mut leftover = Vec::new();
112                                let mut buf = [0u8; 8192];
113                                
114                                // TODO: want to use buffer over lines (for efficiency?), but partial lines are not handled, and damaged utf-8 still leaks thu somehow
115                                loop {
116                                    let n = if let Ok(x) = std::io::Read::read(&mut reader, &mut buf) {x } else { break };
117                                    if n == 0 { break; }
118                                    
119                                    leftover.extend_from_slice(&buf[..n]);
120                                    
121                                    let valid_up_to = match std::str::from_utf8(&leftover) {
122                                        Ok(_) => leftover.len(),
123                                        Err(e) => e.valid_up_to(),
124                                    };
125                                    
126                                    let split_at = leftover[..valid_up_to]
127                                    .iter()
128                                    .rposition(|&b| b == b'\n' || b == b'\r')
129                                    .map(|pos| pos + 1)
130                                    .unwrap_or(valid_up_to); // todo: problem if line exceeds
131
132                                    let (valid_bytes, rest) = leftover.split_at(split_at);
133                                    
134                                    match valid_bytes.into_text() {
135                                        Ok(text) => {
136                                            for line in text {
137                                                lines.push(line);
138                                            }
139                                        }
140                                        Err(e) => {
141                                            if self.config.try_lossy {
142                                                for bytes in valid_bytes.split(|b| *b == b'\n') {
143                                                    let line =
144                                                    String::from_utf8_lossy(&bytes).into_owned();
145                                                    lines.push(Line::from(line));
146                                                }
147                                            } else {
148                                                error!("Error displaying {cmd}: {:?}", e);
149                                                return false
150                                            }
151                                        }
152                                    }
153                                    
154                                    leftover = rest.to_vec();
155                                }
156                                
157                                // handle any remaining bytes
158                                if !leftover.is_empty() {
159                                    match leftover.into_text() {
160                                        Ok(text) => {
161                                            for line in text {
162                                                lines.push(line);
163                                            }
164                                        }
165                                        Err(e) => {
166                                            if self.config.try_lossy {
167                                                for bytes in leftover.split(|b| *b == b'\n') {
168                                                    let line =
169                                                    String::from_utf8_lossy(&bytes).into_owned();
170                                                    lines.push(Line::from(line));
171                                                }
172                                            } else {
173                                                error!("Error displaying {cmd}: {:?}", e);
174                                                return false
175                                            }
176                                        }
177                                    }
178                                }
179                                true
180                            });
181                            self.current = Some((child, handle))
182                        } else {
183                            error!("Failed to get stdout of preview command: {cmd}")
184                        }
185                    }
186                }
187                
188                PreviewMessage::Stop => {}
189            }
190            
191            self.prune_procs();
192        }
193        
194        let ret = self.cleanup_procs();
195        if ret.is_empty() { Ok(()) } else { Err(ret) }
196    }
197    
198    fn dispatch_kill(&mut self) {
199        if let Some((mut child, old)) = self.current.take() {
200            let _ = child.kill();
201            self.procs.push(child);
202            let mut old = Box::pin(old); // pin it to heap
203            
204            match old.as_mut().now_or_never() {
205                Some(Ok(result)) => {
206                    if !result {
207                        if let Some(ref tx) = self.controller_tx {
208                            let _ = tx.send(Event::Refresh);
209                        }
210                    }
211                }
212                None => {
213                    old.abort(); // still works because `AbortHandle` is separate
214                }
215                _ => {}
216            }
217            
218        }
219    }
220    
221    pub fn connect_controller(&mut self, controller_tx: UnboundedSender<Event>) {
222        self.controller_tx = Some(controller_tx)
223        
224    }
225    
226    // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
227    fn cleanup_procs(mut self) -> Vec<Child> {
228        let total_timeout = Duration::from_secs(1);
229        let start = Instant::now();
230        
231        self.procs.retain_mut(|child| {
232            loop {
233                match child.try_wait() {
234                    Ok(Some(_)) => return false,
235                    Ok(None) => {
236                        if start.elapsed() >= total_timeout {
237                            error!("Child failed to exit in time: {:?}", child);
238                            return true;
239                        } else {
240                            thread::sleep(Duration::from_millis(10));
241                        }
242                    }
243                    Err(e) => {
244                        error!("Error waiting on child: {e}");
245                        return true;
246                    }
247                }
248            }
249        });
250        
251        self.procs
252    }
253    
254    fn prune_procs(&mut self) {
255        self.procs.retain_mut(|child| match child.try_wait() {
256            Ok(None) => true,
257            Ok(Some(_)) => false,
258            Err(e) => {
259                warn!("Error waiting on child: {e}");
260                true
261            }
262        });
263    }
264}
265
266
267// ---------- NON ANSI VARIANT
268// let reader = BufReader::new(stdout);
269// if self.config.try_lossy {
270// for line_result in reader.split(b'\n') {
271//     match line_result {
272//         Ok(bytes) => {
273//             let line =
274//             String::from_utf8_lossy(&bytes).into_owned();
275//             lines.push(Line::from(line));
276//         }
277//         Err(e) => error!("Failed to read line: {:?}", e),
278//     }
279// }
280// } else {
281//     for line_result in reader.lines() {
282//         match line_result {
283//             Ok(line) => lines.push(Line::from(line)),
284//             Err(e) => {
285//                 // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
286//                 error!("Error displaying {cmd}: {:?}", e);
287//                 break;
288//             }
289//         }
290//     }
291// }