matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc::UnboundedSender;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::AppendOnly;
17use crate::config::PreviewerConfig;
18use crate::message::Event;
19use crate::preview::Preview;
20
/// Control messages consumed by [`Previewer::run`] through its watch channel.
#[derive(Debug, strum_macros::Display, Clone)]
pub enum PreviewMessage {
    /// Kill the running preview (if any), clear the display state, and spawn
    /// the given script with the given environment variables.
    Run(String, EnvVars),
    /// Show the given text as a string override; the running preview command
    /// is left untouched.
    Set(Text<'static>),
    /// Remove the string override; the running preview command is left
    /// untouched.
    Unset,
    /// Kill the running preview (if any) and clear the display state without
    /// starting a new command. Also used as the channel's initial value.
    Stop,
}
28
/// Background worker that runs preview commands and publishes their output.
///
/// It is driven by [`PreviewMessage`]s received on `rx`; consumers read the
/// results through the [`Preview`] returned by [`Previewer::view`].
#[derive(Debug)]
pub struct Previewer {
    /// The receiver for [`PreviewMessage`]s.
    rx: Receiver<PreviewMessage>,
    /// Storage for preview command output.
    lines: AppendOnly<Line<'static>>,
    /// Storage for the preview string override.
    string: Arc<Mutex<Option<Text<'static>>>>,
    /// Flag which is set to true whenever the state changes
    /// and which the viewer can toggle after receiving the current state.
    changed: Arc<AtomicBool>,

    /// Incremented on every new run / kill to invalidate old feeders.
    version: Arc<AtomicUsize>,
    /// Maintain a queue of child processes to improve cleanup reliability.
    procs: Vec<Child>,
    /// The currently executing child process, paired with the task that feeds
    /// its stdout into `lines`.
    current: Option<(Child, JoinHandle<bool>)>,
    /// Previewer settings; `try_lossy` is the only field read in this file.
    pub config: PreviewerConfig,
    /// Event loop controller
    // todo: lowpri: does this want to be a render loop controller instead
    event_controller_tx: Option<UnboundedSender<Event>>,
}
52
53impl Previewer {
54 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
55 let (tx, rx) = channel(PreviewMessage::Stop);
56
57 let new = Self {
58 rx,
59 lines: AppendOnly::new(),
60 string: Default::default(),
61 changed: Default::default(),
62 version: Arc::new(AtomicUsize::new(0)),
63
64 procs: Vec::new(),
65 current: None,
66 config,
67 event_controller_tx: None,
68 };
69
70 (new, tx)
71 }
72
73 pub fn view(&self) -> Preview {
74 Preview::new(
75 self.lines.clone(),
76 self.string.clone(),
77 self.changed.clone(),
78 )
79 }
80
81 pub fn set_string(&self, s: Text<'static>) {
82 let mut guard = self.string.lock().unwrap();
83 *guard = Some(s);
84 self.changed.store(true, Ordering::Release);
85 }
86
87 pub fn clear_string(&self) {
88 let mut guard = self.string.lock().unwrap();
89 *guard = None;
90 self.changed.store(true, Ordering::Release);
91 }
92
93 pub fn has_string(&self) -> bool {
94 let guard = self.string.lock().unwrap();
95 guard.is_some()
96 }
97
98 pub async fn run(mut self) -> Result<(), Vec<Child>> {
99 while self.rx.changed().await.is_ok() {
100 if !self.procs.is_empty() {
101 debug!("procs: {:?}", self.procs);
102 }
103
104 {
105 let m = &*self.rx.borrow();
106 if let PreviewMessage::Set(s) = m {
107 self.set_string(s.clone());
108 // don't kill the underlying
109 continue;
110 } else if matches!(m, PreviewMessage::Unset) {
111 self.clear_string();
112 continue;
113 }
114 }
115
116 self.dispatch_kill();
117 self.clear_string();
118 self.lines.clear();
119
120 match &*self.rx.borrow() {
121 PreviewMessage::Run(cmd, variables) => {
122 if let Some(mut child) = Command::from_script(cmd)
123 .envs(variables.iter().cloned())
124 .stdout(Stdio::piped())
125 .stdin(Stdio::null())
126 .stderr(Stdio::null())
127 .detach()
128 ._spawn()
129 {
130 if let Some(stdout) = child.stdout.take() {
131 self.changed.store(true, Ordering::Relaxed);
132
133 let lines = self.lines.clone();
134 let cmd = cmd.clone();
135 let version = self.version.clone();
136 let _version = version.load(Ordering::Acquire);
137
138 // false => needs refresh (i.e. invalid utf-8)
139 let handle = tokio::spawn(async move {
140 let mut reader = BufReader::new(stdout);
141 let mut leftover = Vec::new();
142 let mut buf = [0u8; 8192];
143
144 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
145 if n == 0 {
146 break;
147 }
148
149 leftover.extend_from_slice(&buf[..n]);
150
151 let valid_up_to = match std::str::from_utf8(&leftover) {
152 Ok(_) => leftover.len(),
153 Err(e) => e.valid_up_to(),
154 };
155
156 let split_at = leftover[..valid_up_to]
157 .iter()
158 .rposition(|&b| b == b'\n' || b == b'\r')
159 .map(|pos| pos + 1)
160 .unwrap_or(valid_up_to);
161
162 let (valid_bytes, rest) = leftover.split_at(split_at);
163
164 match valid_bytes.into_text() {
165 Ok(text) => {
166 for line in text {
167 // re-check before pushing
168 if version.load(Ordering::Acquire) != _version {
169 return true;
170 }
171 lines.push(line);
172 }
173 }
174 Err(e) => {
175 if self.config.try_lossy {
176 for bytes in valid_bytes.split(|b| *b == b'\n') {
177 if version.load(Ordering::Acquire) != _version {
178 return true;
179 }
180 let line =
181 String::from_utf8_lossy(bytes).into_owned();
182 lines.push(Line::from(line));
183 }
184 } else {
185 error!("Error displaying {cmd}: {:?}", e);
186 return false;
187 }
188 }
189 }
190
191 leftover = rest.to_vec();
192 }
193
194 if !leftover.is_empty()
195 && version.load(Ordering::Acquire) == _version
196 {
197 match leftover.into_text() {
198 Ok(text) => {
199 for line in text {
200 if version.load(Ordering::Acquire) != _version {
201 return true;
202 }
203 lines.push(line);
204 }
205 }
206 Err(e) => {
207 if self.config.try_lossy {
208 for bytes in leftover.split(|b| *b == b'\n') {
209 if version.load(Ordering::Acquire) != _version {
210 return true;
211 }
212 let line =
213 String::from_utf8_lossy(bytes).into_owned();
214 lines.push(Line::from(line));
215 }
216 } else {
217 error!("Error displaying {cmd}: {:?}", e);
218 return false;
219 }
220 }
221 }
222 }
223
224 true
225 });
226 self.current = Some((child, handle))
227 } else {
228 error!("Failed to get stdout of preview command: {cmd}")
229 }
230 }
231 }
232 PreviewMessage::Stop => {}
233 _ => unreachable!(),
234 }
235
236 self.prune_procs();
237 }
238
239 let ret = self.cleanup_procs();
240 if ret.is_empty() { Ok(()) } else { Err(ret) }
241 }
242
243 fn dispatch_kill(&mut self) {
244 if let Some((mut child, old)) = self.current.take() {
245 // invalidate all existing feeders
246 self.version.fetch_add(1, Ordering::AcqRel);
247
248 let _ = child.kill();
249 self.procs.push(child);
250 let mut old = Box::pin(old); // pin it to heap
251
252 match old.as_mut().now_or_never() {
253 Some(Ok(result)) => {
254 if !result && let Some(ref tx) = self.event_controller_tx {
255 let _ = tx.send(Event::Refresh);
256 }
257 }
258 None => {
259 old.abort(); // still works because `AbortHandle` is separate
260 }
261 _ => {}
262 }
263 }
264 }
265
266 pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
267 self.event_controller_tx = Some(event_controller_tx)
268 }
269
270 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
271 // also, maybe don't want this delaying exit?
272 fn cleanup_procs(mut self) -> Vec<Child> {
273 let total_timeout = Duration::from_secs(1);
274 let start = Instant::now();
275
276 self.procs.retain_mut(|child| {
277 loop {
278 match child.try_wait() {
279 Ok(Some(_)) => return false,
280 Ok(None) => {
281 if start.elapsed() >= total_timeout {
282 error!("Child failed to exit in time: {:?}", child);
283 return true;
284 } else {
285 thread::sleep(Duration::from_millis(10));
286 }
287 }
288 Err(e) => {
289 error!("Error waiting on child: {e}");
290 return true;
291 }
292 }
293 }
294 });
295
296 self.procs
297 }
298
299 fn prune_procs(&mut self) {
300 self.procs.retain_mut(|child| match child.try_wait() {
301 Ok(None) => true,
302 Ok(Some(_)) => false,
303 Err(e) => {
304 warn!("Error waiting on child: {e}");
305 true
306 }
307 });
308 }
309}
310
311// ---------- NON ANSI VARIANT
312// let reader = BufReader::new(stdout);
313// if self.config.try_lossy {
314// for line_result in reader.split(b'\n') {
315// match line_result {
316// Ok(bytes) => {
317// let line =
318// String::from_utf8_lossy(&bytes).into_owned();
319// lines.push(Line::from(line));
320// }
321// Err(e) => error!("Failed to read line: {:?}", e),
322// }
323// }
324// } else {
325// for line_result in reader.lines() {
326// match line_result {
327// Ok(line) => lines.push(Line::from(line)),
328// Err(e) => {
329// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
330// error!("Error displaying {cmd}: {:?}", e);
331// break;
332// }
333// }
334// }
335// }
336
337// trait Resettable: Default {
338// fn reset(&mut self) {}
339// }
340// impl<T> Resettable for AppendOnly<T> {
341// fn reset(&mut self) {
342// self.clear();
343// }
344// }
345
346// use std::ops::{Deref, DerefMut};
347
348// #[derive(Debug)]
349// struct Queue<V: Resettable> {
350// entries: Vec<(String, V)>,
351// order: Vec<usize>, // indices ordered by recency (0 = most recent)
352// }
353
354// impl<V: Resettable> Queue<V> {
355// pub fn new(len: usize) -> Self {
356// Self {
357// entries: (0..len)
358// .map(|_| (String::default(), V::default()))
359// .collect(),
360// order: vec![len; len],
361// }
362// }
363
364// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
365// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
366// if order_idx == self.entries.len() {
367// return None
368// }
369// if self.entries[entries_idx].0 == key {
370// return Some((order_idx, entries_idx));
371// }
372// }
373// None
374// }
375
376// /// Try to get a key; if found, move it to the top.
377// /// If not found, replace the oldest, clear its vec, set new key.
378// pub fn try_get(&mut self, key: &str) -> bool {
379// let n = self.entries.len();
380
381// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
382// self.order.copy_within(0..order_idx, 1);
383// self.order[0] = idx;
384// true
385// } else {
386// let order_idx = (0..n)
387// .rfind(|&i| self.order[i] < n)
388// .map(|i| i + 1)
389// .unwrap_or(0);
390
391// let idx = if self.order[order_idx] < self.entries.len() {
392// order_idx
393// } else {
394// *self.order.last().unwrap()
395// };
396
397// // shift and insert at front
398// self.order.copy_within(0..order_idx, 1);
399// self.order[0] = idx;
400
401// // reset and assign new key
402// let (ref mut k, ref mut v) = self.entries[idx];
403// *k = key.to_owned();
404// v.reset();
405
406// false
407// }
408// }
409// }
410
411// impl<V: Resettable> Deref for Queue<V> {
412// type Target = V;
413// fn deref(&self) -> &Self::Target {
414// &self.entries[self.order[0]].1
415// }
416// }
417
418// impl<V: Resettable> DerefMut for Queue<V> {
419// fn deref_mut(&mut self) -> &mut Self::Target {
420// &mut self.entries[self.order[0]].1
421// }
422// }
423
424// impl<V: Resettable> Default for Queue<V> {
425// fn default() -> Self {
426// Self::new(1)
427// }
428// }