matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::watch::{Receiver, Sender, channel};
13use tokio::task::JoinHandle;
14
15use super::AppendOnly;
16use crate::config::PreviewerConfig;
17use crate::event::EventSender;
18use crate::message::Event;
19use crate::preview::Preview;
20
21#[derive(Debug, strum_macros::Display, Clone)]
22pub enum PreviewMessage {
23 Run(String, EnvVars),
24 Set(Text<'static>),
25 Unset,
26 Stop,
27}
28
29#[derive(Debug)]
30pub struct Previewer {
31 /// The reciever for for [`PreviewMessage`]'s.
32 rx: Receiver<PreviewMessage>,
33 /// storage for preview command output
34 lines: AppendOnly<Line<'static>>,
35 /// storage for preview string override
36 string: Arc<Mutex<Option<Text<'static>>>>,
37 /// Flag which is set to true whenever the state changes
38 /// and which the viewer can toggle after receiving the current state
39 changed: Arc<AtomicBool>,
40
41 /// Incremented on every new run / kill to invalidate old feeders
42 version: Arc<AtomicUsize>,
43 /// Maintain a queue of child processes to improve cleanup reliability
44 procs: Vec<Child>,
45 /// The currently executing child process
46 current: Option<(Child, JoinHandle<bool>)>,
47 pub config: PreviewerConfig,
48 /// Event loop controller
49 // We only use it to send [`ControlEvent::Event`]
50 event_controller_tx: Option<EventSender>,
51}
52
53impl Previewer {
54 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
55 let (tx, rx) = channel(PreviewMessage::Stop);
56
57 let new = Self {
58 rx,
59 lines: AppendOnly::new(),
60 string: Default::default(),
61 changed: Default::default(),
62 version: Arc::new(AtomicUsize::new(0)),
63
64 procs: Vec::new(),
65 current: None,
66 config,
67 event_controller_tx: None,
68 };
69
70 (new, tx)
71 }
72
73 pub fn view(&self) -> Preview {
74 Preview::new(
75 self.lines.clone(),
76 self.string.clone(),
77 self.changed.clone(),
78 )
79 }
80
81 pub fn set_string(&self, s: Text<'static>) {
82 if let Ok(mut guard) = self.string.lock() {
83 *guard = Some(s);
84 self.changed.store(true, Ordering::Release);
85 }
86 }
87
88 pub fn clear_string(&self) {
89 if let Ok(mut guard) = self.string.lock() {
90 *guard = None;
91 self.changed.store(true, Ordering::Release);
92 }
93 }
94
95 pub fn has_string(&self) -> bool {
96 let guard = self.string.lock();
97 guard.is_ok_and(|s| s.is_some())
98 }
99
100 pub async fn run(mut self) -> Result<(), Vec<Child>> {
101 while self.rx.changed().await.is_ok() {
102 if !self.procs.is_empty() {
103 debug!("procs: {:?}", self.procs);
104 }
105
106 {
107 let m = &*self.rx.borrow();
108 if let PreviewMessage::Set(s) = m {
109 self.set_string(s.clone());
110 // don't kill the underlying
111 continue;
112 } else if matches!(m, PreviewMessage::Unset) {
113 self.clear_string();
114 continue;
115 }
116 }
117
118 self.dispatch_kill();
119 self.clear_string();
120 self.lines.clear();
121
122 match &*self.rx.borrow() {
123 PreviewMessage::Run(cmd, variables) => {
124 if let Some(mut child) = Command::from_script(cmd)
125 .envs(variables.iter().cloned())
126 .stdout(Stdio::piped())
127 .stdin(Stdio::null())
128 .stderr(Stdio::null())
129 .detach()
130 ._spawn()
131 {
132 if let Some(stdout) = child.stdout.take() {
133 self.changed.store(true, Ordering::Relaxed);
134
135 let lines = self.lines.clone();
136 let cmd = cmd.clone();
137 let version = self.version.clone();
138 let _version = version.load(Ordering::Acquire);
139
140 // false => needs refresh (i.e. invalid utf-8)
141 let handle = tokio::spawn(async move {
142 let mut reader = BufReader::new(stdout);
143 let mut leftover = Vec::new();
144 let mut buf = [0u8; 8192];
145
146 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
147 if n == 0 {
148 break;
149 }
150
151 leftover.extend_from_slice(&buf[..n]);
152
153 let valid_up_to = match std::str::from_utf8(&leftover) {
154 Ok(_) => leftover.len(),
155 Err(e) => e.valid_up_to(),
156 };
157
158 let split_at = leftover[..valid_up_to]
159 .iter()
160 .rposition(|&b| b == b'\n' || b == b'\r')
161 .map(|pos| pos + 1)
162 .unwrap_or(valid_up_to);
163
164 let (valid_bytes, rest) = leftover.split_at(split_at);
165
166 match valid_bytes.into_text() {
167 Ok(text) => {
168 for line in text {
169 // re-check before pushing
170 if version.load(Ordering::Acquire) != _version {
171 return true;
172 }
173 lines.push(line);
174 }
175 }
176 Err(e) => {
177 if self.config.try_lossy {
178 for bytes in valid_bytes.split(|b| *b == b'\n') {
179 if version.load(Ordering::Acquire) != _version {
180 return true;
181 }
182 let line =
183 String::from_utf8_lossy(bytes).into_owned();
184 lines.push(Line::from(line));
185 }
186 } else {
187 error!("Error displaying {cmd}: {:?}", e);
188 return false;
189 }
190 }
191 }
192
193 leftover = rest.to_vec();
194 }
195
196 if !leftover.is_empty()
197 && version.load(Ordering::Acquire) == _version
198 {
199 match leftover.into_text() {
200 Ok(text) => {
201 for line in text {
202 if version.load(Ordering::Acquire) != _version {
203 return true;
204 }
205 lines.push(line);
206 }
207 }
208 Err(e) => {
209 if self.config.try_lossy {
210 for bytes in leftover.split(|b| *b == b'\n') {
211 if version.load(Ordering::Acquire) != _version {
212 return true;
213 }
214 let line =
215 String::from_utf8_lossy(bytes).into_owned();
216 lines.push(Line::from(line));
217 }
218 } else {
219 error!("Error displaying {cmd}: {:?}", e);
220 return false;
221 }
222 }
223 }
224 }
225
226 true
227 });
228 self.current = Some((child, handle))
229 } else {
230 error!("Failed to get stdout of preview command: {cmd}")
231 }
232 }
233 }
234 PreviewMessage::Stop => {}
235 _ => unreachable!(),
236 }
237
238 self.prune_procs();
239 }
240
241 let ret = self.cleanup_procs();
242 if ret.is_empty() { Ok(()) } else { Err(ret) }
243 }
244
245 fn dispatch_kill(&mut self) {
246 if let Some((mut child, old)) = self.current.take() {
247 // invalidate all existing feeders
248 self.version.fetch_add(1, Ordering::AcqRel);
249
250 let _ = child.kill();
251 self.procs.push(child);
252 let mut old = Box::pin(old); // pin it to heap
253
254 match old.as_mut().now_or_never() {
255 Some(Ok(result)) => {
256 if !result {
257 self.send(Event::Refresh)
258 }
259 }
260 None => {
261 old.abort(); // still works because `AbortHandle` is separate
262 }
263 _ => {}
264 }
265 }
266 }
267
268 fn send(&self, event: Event) {
269 if let Some(ref tx) = self.event_controller_tx {
270 let _ = tx.send(event);
271 }
272 }
273
274 pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
275 self.event_controller_tx = Some(event_controller_tx)
276 }
277
278 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
279 // also, maybe don't want this delaying exit?
280 fn cleanup_procs(mut self) -> Vec<Child> {
281 let total_timeout = Duration::from_secs(1);
282 let start = Instant::now();
283
284 self.procs.retain_mut(|child| {
285 loop {
286 match child.try_wait() {
287 Ok(Some(_)) => return false,
288 Ok(None) => {
289 if start.elapsed() >= total_timeout {
290 error!("Child failed to exit in time: {:?}", child);
291 return true;
292 } else {
293 thread::sleep(Duration::from_millis(10));
294 }
295 }
296 Err(e) => {
297 error!("Error waiting on child: {e}");
298 return true;
299 }
300 }
301 }
302 });
303
304 self.procs
305 }
306
307 fn prune_procs(&mut self) {
308 self.procs.retain_mut(|child| match child.try_wait() {
309 Ok(None) => true,
310 Ok(Some(_)) => false,
311 Err(e) => {
312 warn!("Error waiting on child: {e}");
313 true
314 }
315 });
316 }
317}
318
319// ---------- NON ANSI VARIANT
320// let reader = BufReader::new(stdout);
321// if self.config.try_lossy {
322// for line_result in reader.split(b'\n') {
323// match line_result {
324// Ok(bytes) => {
325// let line =
326// String::from_utf8_lossy(&bytes).into_owned();
327// lines.push(Line::from(line));
328// }
329// Err(e) => error!("Failed to read line: {:?}", e),
330// }
331// }
332// } else {
333// for line_result in reader.lines() {
334// match line_result {
335// Ok(line) => lines.push(Line::from(line)),
336// Err(e) => {
337// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
338// error!("Error displaying {cmd}: {:?}", e);
339// break;
340// }
341// }
342// }
343// }
344
345// trait Resettable: Default {
346// fn reset(&mut self) {}
347// }
348// impl<T> Resettable for AppendOnly<T> {
349// fn reset(&mut self) {
350// self.clear();
351// }
352// }
353
354// use std::ops::{Deref, DerefMut};
355
356// #[derive(Debug)]
357// struct Queue<V: Resettable> {
358// entries: Vec<(String, V)>,
359// order: Vec<usize>, // indices ordered by recency (0 = most recent)
360// }
361
362// impl<V: Resettable> Queue<V> {
363// pub fn new(len: usize) -> Self {
364// Self {
365// entries: (0..len)
366// .map(|_| (String::default(), V::default()))
367// .collect(),
368// order: vec![len; len],
369// }
370// }
371
372// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
373// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
374// if order_idx == self.entries.len() {
375// return None
376// }
377// if self.entries[entries_idx].0 == key {
378// return Some((order_idx, entries_idx));
379// }
380// }
381// None
382// }
383
384// /// Try to get a key; if found, move it to the top.
385// /// If not found, replace the oldest, clear its vec, set new key.
386// pub fn try_get(&mut self, key: &str) -> bool {
387// let n = self.entries.len();
388
389// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
390// self.order.copy_within(0..order_idx, 1);
391// self.order[0] = idx;
392// true
393// } else {
394// let order_idx = (0..n)
395// .rfind(|&i| self.order[i] < n)
396// .map(|i| i + 1)
397// .unwrap_or(0);
398
399// let idx = if self.order[order_idx] < self.entries.len() {
400// order_idx
401// } else {
402// *self.order.last().unwrap()
403// };
404
405// // shift and insert at front
406// self.order.copy_within(0..order_idx, 1);
407// self.order[0] = idx;
408
409// // reset and assign new key
410// let (ref mut k, ref mut v) = self.entries[idx];
411// *k = key.to_owned();
412// v.reset();
413
414// false
415// }
416// }
417// }
418
419// impl<V: Resettable> Deref for Queue<V> {
420// type Target = V;
421// fn deref(&self) -> &Self::Target {
422// &self.entries[self.order[0]].1
423// }
424// }
425
426// impl<V: Resettable> DerefMut for Queue<V> {
427// fn deref_mut(&mut self) -> &mut Self::Target {
428// &mut self.entries[self.order[0]].1
429// }
430// }
431
432// impl<V: Resettable> Default for Queue<V> {
433// fn default() -> Self {
434// Self::new(1)
435// }
436// }