matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::watch::{Receiver, Sender, channel};
13use tokio::task::JoinHandle;
14
15use super::AppendOnly;
16use crate::config::PreviewerConfig;
17use crate::event::EventSender;
18use crate::message::Event;
19use crate::preview::Preview;
20
21#[derive(Debug, Default, strum_macros::Display, Clone)]
22pub enum PreviewMessage {
23 Run(String, EnvVars),
24 Set(Text<'static>),
25 Unset,
26 #[default]
27 Stop,
28 Pause,
29 Unpause,
30}
31
32#[derive(Debug)]
33pub struct Previewer {
34 /// The reciever for for [`PreviewMessage`]'s.
35 rx: Receiver<PreviewMessage>,
36 /// storage for preview command output
37 lines: AppendOnly<Line<'static>>,
38 /// storage for preview string override
39 string: Arc<Mutex<Option<Text<'static>>>>,
40 /// Flag which is set to true whenever the state changes
41 /// and which the viewer can toggle after receiving the current state
42 changed: Arc<AtomicBool>,
43
44 /// Incremented on every new run / kill to invalidate old feeders
45 version: Arc<AtomicUsize>,
46 paused: bool,
47 /// Maintain a queue of child processes to improve cleanup reliability
48 procs: Vec<Child>,
49 /// The currently executing child process
50 current: Option<(Child, JoinHandle<bool>)>,
51 pub config: PreviewerConfig,
52 /// Event loop controller
53 // We only use it to send [`ControlEvent::Event`]
54 event_controller_tx: Option<EventSender>,
55}
56
57impl Previewer {
58 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
59 let (tx, rx) = channel(PreviewMessage::Stop);
60
61 let new = Self {
62 rx,
63 lines: AppendOnly::new(),
64 string: Default::default(),
65 changed: Default::default(),
66 version: Arc::new(AtomicUsize::new(0)),
67 paused: false,
68
69 procs: Vec::new(),
70 current: None,
71 config,
72 event_controller_tx: None,
73 };
74
75 (new, tx)
76 }
77
78 pub fn view(&self) -> Preview {
79 Preview::new(
80 self.lines.clone(),
81 self.string.clone(),
82 self.changed.clone(),
83 )
84 }
85
86 pub fn set_string(&self, s: Text<'static>) {
87 if let Ok(mut guard) = self.string.lock() {
88 *guard = Some(s);
89 self.changed.store(true, Ordering::Release);
90 }
91 }
92
93 pub fn clear_string(&self) {
94 if let Ok(mut guard) = self.string.lock() {
95 *guard = None;
96 self.changed.store(true, Ordering::Release);
97 }
98 }
99
100 pub fn has_string(&self) -> bool {
101 let guard = self.string.lock();
102 guard.is_ok_and(|s| s.is_some())
103 }
104
105 pub async fn run(mut self) -> Result<(), Vec<Child>> {
106 while self.rx.changed().await.is_ok() {
107 if !self.procs.is_empty() {
108 debug!("procs: {:?}", self.procs);
109 }
110
111 {
112 let m = &*self.rx.borrow();
113 match m {
114 PreviewMessage::Pause => {
115 self.paused = true;
116 continue;
117 }
118 PreviewMessage::Unpause => {
119 self.paused = false;
120 continue;
121 }
122 _ if self.paused => {
123 continue;
124 }
125 PreviewMessage::Set(s) => {
126 self.set_string(s.clone());
127 // don't kill the underlying
128 continue;
129 }
130 PreviewMessage::Unset => {
131 self.clear_string();
132 continue;
133 }
134 _ => {}
135 }
136 }
137
138 self.dispatch_kill();
139 self.clear_string();
140 self.lines.clear();
141
142 match &*self.rx.borrow() {
143 PreviewMessage::Run(cmd, variables) => {
144 if let Some(mut child) = Command::from_script(cmd)
145 .envs(variables.iter().cloned())
146 .stdout(Stdio::piped())
147 .stdin(Stdio::null())
148 .stderr(Stdio::null())
149 .detach()
150 ._spawn()
151 {
152 if let Some(stdout) = child.stdout.take() {
153 self.changed.store(true, Ordering::Relaxed);
154
155 let lines = self.lines.clone();
156 let cmd = cmd.clone();
157 let version = self.version.clone();
158 let _version = version.load(Ordering::Acquire);
159
160 // false => needs refresh (i.e. invalid utf-8)
161 let handle = tokio::spawn(async move {
162 let mut reader = BufReader::new(stdout);
163 let mut leftover = Vec::new();
164 let mut buf = [0u8; 8192];
165
166 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
167 if n == 0 {
168 break;
169 }
170
171 leftover.extend_from_slice(&buf[..n]);
172
173 let valid_up_to = match std::str::from_utf8(&leftover) {
174 Ok(_) => leftover.len(),
175 Err(e) => e.valid_up_to(),
176 };
177
178 let split_at = leftover[..valid_up_to]
179 .iter()
180 .rposition(|&b| b == b'\n' || b == b'\r')
181 .map(|pos| pos + 1)
182 .unwrap_or(valid_up_to);
183
184 let (valid_bytes, rest) = leftover.split_at(split_at);
185
186 match valid_bytes.into_text() {
187 Ok(text) => {
188 for line in text {
189 // re-check before pushing
190 if version.load(Ordering::Acquire) != _version {
191 return true;
192 }
193 lines.push(line);
194 }
195 }
196 Err(e) => {
197 if self.config.try_lossy {
198 for bytes in valid_bytes.split(|b| *b == b'\n') {
199 if version.load(Ordering::Acquire) != _version {
200 return true;
201 }
202 let line =
203 String::from_utf8_lossy(bytes).into_owned();
204 lines.push(Line::from(line));
205 }
206 } else {
207 error!("Error displaying {cmd}: {:?}", e);
208 return false;
209 }
210 }
211 }
212
213 leftover = rest.to_vec();
214 }
215
216 if !leftover.is_empty()
217 && version.load(Ordering::Acquire) == _version
218 {
219 match leftover.into_text() {
220 Ok(text) => {
221 for line in text {
222 if version.load(Ordering::Acquire) != _version {
223 return true;
224 }
225 lines.push(line);
226 }
227 }
228 Err(e) => {
229 if self.config.try_lossy {
230 for bytes in leftover.split(|b| *b == b'\n') {
231 if version.load(Ordering::Acquire) != _version {
232 return true;
233 }
234 let line =
235 String::from_utf8_lossy(bytes).into_owned();
236 lines.push(Line::from(line));
237 }
238 } else {
239 error!("Error displaying {cmd}: {:?}", e);
240 return false;
241 }
242 }
243 }
244 }
245
246 true
247 });
248 self.current = Some((child, handle))
249 } else {
250 error!("Failed to get stdout of preview command: {cmd}")
251 }
252 }
253 }
254 PreviewMessage::Stop => {}
255 _ => unreachable!(),
256 }
257
258 self.prune_procs();
259 }
260
261 let ret = self.cleanup_procs();
262 if ret.is_empty() { Ok(()) } else { Err(ret) }
263 }
264
265 fn dispatch_kill(&mut self) {
266 if let Some((mut child, old)) = self.current.take() {
267 // invalidate all existing feeders
268 self.version.fetch_add(1, Ordering::AcqRel);
269
270 let _ = child.kill();
271 self.procs.push(child);
272 let mut old = Box::pin(old); // pin it to heap
273
274 match old.as_mut().now_or_never() {
275 Some(Ok(result)) => {
276 if !result {
277 self.send(Event::Refresh)
278 }
279 }
280 None => {
281 old.abort(); // still works because `AbortHandle` is separate
282 }
283 _ => {}
284 }
285 }
286 }
287
288 fn send(&self, event: Event) {
289 if let Some(ref tx) = self.event_controller_tx {
290 let _ = tx.send(event);
291 }
292 }
293
294 pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
295 self.event_controller_tx = Some(event_controller_tx)
296 }
297
298 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
299 // also, maybe don't want this delaying exit?
300 fn cleanup_procs(mut self) -> Vec<Child> {
301 let total_timeout = Duration::from_secs(1);
302 let start = Instant::now();
303
304 self.procs.retain_mut(|child| {
305 loop {
306 match child.try_wait() {
307 Ok(Some(_)) => return false,
308 Ok(None) => {
309 if start.elapsed() >= total_timeout {
310 error!("Child failed to exit in time: {:?}", child);
311 return true;
312 } else {
313 thread::sleep(Duration::from_millis(10));
314 }
315 }
316 Err(e) => {
317 error!("Error waiting on child: {e}");
318 return true;
319 }
320 }
321 }
322 });
323
324 self.procs
325 }
326
327 fn prune_procs(&mut self) {
328 self.procs.retain_mut(|child| match child.try_wait() {
329 Ok(None) => true,
330 Ok(Some(_)) => false,
331 Err(e) => {
332 warn!("Error waiting on child: {e}");
333 true
334 }
335 });
336 }
337}
338
339// ---------- NON ANSI VARIANT
340// let reader = BufReader::new(stdout);
341// if self.config.try_lossy {
342// for line_result in reader.split(b'\n') {
343// match line_result {
344// Ok(bytes) => {
345// let line =
346// String::from_utf8_lossy(&bytes).into_owned();
347// lines.push(Line::from(line));
348// }
349// Err(e) => error!("Failed to read line: {:?}", e),
350// }
351// }
352// } else {
353// for line_result in reader.lines() {
354// match line_result {
355// Ok(line) => lines.push(Line::from(line)),
356// Err(e) => {
357// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
358// error!("Error displaying {cmd}: {:?}", e);
359// break;
360// }
361// }
362// }
363// }
364
365// trait Resettable: Default {
366// fn reset(&mut self) {}
367// }
368// impl<T> Resettable for AppendOnly<T> {
369// fn reset(&mut self) {
370// self.clear();
371// }
372// }
373
374// use std::ops::{Deref, DerefMut};
375
376// #[derive(Debug)]
377// struct Queue<V: Resettable> {
378// entries: Vec<(String, V)>,
379// order: Vec<usize>, // indices ordered by recency (0 = most recent)
380// }
381
382// impl<V: Resettable> Queue<V> {
383// pub fn new(len: usize) -> Self {
384// Self {
385// entries: (0..len)
386// .map(|_| (String::default(), V::default()))
387// .collect(),
388// order: vec![len; len],
389// }
390// }
391
392// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
393// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
394// if order_idx == self.entries.len() {
395// return None
396// }
397// if self.entries[entries_idx].0 == key {
398// return Some((order_idx, entries_idx));
399// }
400// }
401// None
402// }
403
404// /// Try to get a key; if found, move it to the top.
405// /// If not found, replace the oldest, clear its vec, set new key.
406// pub fn try_get(&mut self, key: &str) -> bool {
407// let n = self.entries.len();
408
409// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
410// self.order.copy_within(0..order_idx, 1);
411// self.order[0] = idx;
412// true
413// } else {
414// let order_idx = (0..n)
415// .rfind(|&i| self.order[i] < n)
416// .map(|i| i + 1)
417// .unwrap_or(0);
418
419// let idx = if self.order[order_idx] < self.entries.len() {
420// order_idx
421// } else {
422// *self.order.last().unwrap()
423// };
424
425// // shift and insert at front
426// self.order.copy_within(0..order_idx, 1);
427// self.order[0] = idx;
428
429// // reset and assign new key
430// let (ref mut k, ref mut v) = self.entries[idx];
431// *k = key.to_owned();
432// v.reset();
433
434// false
435// }
436// }
437// }
438
439// impl<V: Resettable> Deref for Queue<V> {
440// type Target = V;
441// fn deref(&self) -> &Self::Target {
442// &self.entries[self.order[0]].1
443// }
444// }
445
446// impl<V: Resettable> DerefMut for Queue<V> {
447// fn deref_mut(&mut self) -> &mut Self::Target {
448// &mut self.entries[self.order[0]].1
449// }
450// }
451
452// impl<V: Resettable> Default for Queue<V> {
453// fn default() -> Self {
454// Self::new(1)
455// }
456// }