matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cba::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use tokio::sync::watch::{Receiver, Sender, channel};
13use tokio::task::JoinHandle;
14
15use super::AppendOnly;
16use crate::config::PreviewerConfig;
17use crate::event::EventSender;
18use crate::message::Event;
19use crate::preview::Preview;
20
/// Commands consumed by [`Previewer::run`] through its watch channel.
///
/// Only the most recent value in the channel is observed by the previewer.
#[derive(Debug, Default, strum_macros::Display, Clone)]
pub enum PreviewMessage {
    /// Kill the running preview (if any), clear the stored output and override text, then run
    /// the given command with the given environment variables.
    Run(String, EnvVars),
    /// Show the given text as an override; the running preview command is left alone.
    Set(Text<'static>),
    /// Remove the override text; the running preview command is left alone.
    Unset,
    /// Kill the running preview (if any) and clear the stored output and override text.
    ///
    /// This is the default and also the initial value of the channel created by
    /// [`Previewer::new`].
    #[default]
    Stop,
    /// Make the previewer ignore every message other than `Pause`/`Unpause`.
    Pause,
    /// Resume normal message handling after a `Pause`.
    Unpause,
}
31
/// Runs preview commands and publishes their output for a [`Preview`] to display.
#[derive(Debug)]
pub struct Previewer {
    /// The receiver for [`PreviewMessage`]s.
    rx: Receiver<PreviewMessage>,
    /// Storage for preview command output.
    lines: AppendOnly<Line<'static>>,
    /// Storage for the preview string override.
    string: Arc<Mutex<Option<Text<'static>>>>,
    /// Flag which is set to true whenever the state changes
    /// and which the viewer can toggle after receiving the current state.
    changed: Arc<AtomicBool>,

    /// While true, `run` ignores every message except `Pause`/`Unpause`.
    paused: bool,
    /// Maintain a queue of child processes to improve cleanup reliability.
    procs: Vec<Child>,
    /// The currently executing child process, paired with the task reading its stdout.
    current: Option<(Child, JoinHandle<bool>)>,
    pub config: PreviewerConfig,
    /// Event loop controller.
    // We only use it to send [`ControlEvent::Event`]
    event_controller_tx: Option<EventSender>,
}
54
55impl Previewer {
56 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
57 let (tx, rx) = channel(PreviewMessage::Stop);
58
59 let new = Self {
60 rx,
61 lines: AppendOnly::new(),
62 string: Default::default(),
63 changed: Default::default(),
64 paused: false,
65
66 procs: Vec::new(),
67 current: None,
68 config,
69 event_controller_tx: None,
70 };
71
72 (new, tx)
73 }
74
75 pub fn view(&self) -> Preview {
76 Preview::new(
77 self.lines.clone(),
78 self.string.clone(),
79 self.changed.clone(),
80 )
81 }
82
83 pub fn set_string(&self, s: Text<'static>) {
84 if let Ok(mut guard) = self.string.lock() {
85 *guard = Some(s);
86 self.changed.store(true, Ordering::Release);
87 }
88 }
89
90 pub fn clear_string(&self) {
91 if let Ok(mut guard) = self.string.lock() {
92 *guard = None;
93 self.changed.store(true, Ordering::Release);
94 }
95 }
96
97 pub fn has_string(&self) -> bool {
98 let guard = self.string.lock();
99 guard.is_ok_and(|s| s.is_some())
100 }
101
102 pub async fn run(mut self) -> Result<(), Vec<Child>> {
103 while self.rx.changed().await.is_ok() {
104 if !self.procs.is_empty() {
105 debug!("procs: {:?}", self.procs);
106 }
107
108 {
109 let m = &*self.rx.borrow();
110 match m {
111 PreviewMessage::Pause => {
112 self.paused = true;
113 continue;
114 }
115 PreviewMessage::Unpause => {
116 self.paused = false;
117 continue;
118 }
119 _ if self.paused => {
120 continue;
121 }
122 PreviewMessage::Set(s) => {
123 self.set_string(s.clone());
124 // don't kill the underlying
125 continue;
126 }
127 PreviewMessage::Unset => {
128 self.clear_string();
129 continue;
130 }
131 _ => {}
132 }
133 }
134
135 self.dispatch_kill();
136 self.clear_string();
137 self.lines.clear();
138
139 match &*self.rx.borrow() {
140 PreviewMessage::Run(cmd, variables) => {
141 // we need the child handle
142 if let Some(mut child) = Command::from_script(cmd)
143 .envs(variables.iter().cloned())
144 .stdout(Stdio::piped())
145 .stdin(Stdio::null())
146 .stderr(Stdio::null())
147 .detach()
148 ._spawn()
149 {
150 if let Some(stdout) = child.stdout.take() {
151 self.changed.store(true, Ordering::Relaxed);
152
153 let lines = self.lines.clone();
154 let guard = self.lines.read();
155 let cmd = cmd.clone();
156
157 // false => needs refresh (i.e. invalid utf-8)
158 let handle = tokio::spawn(async move {
159 let mut reader = BufReader::new(stdout);
160 let mut leftover = Vec::new();
161 let mut buf = [0u8; 8192];
162
163 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
164 if n == 0 {
165 break;
166 }
167
168 leftover.extend_from_slice(&buf[..n]);
169
170 let valid_up_to = match std::str::from_utf8(&leftover) {
171 Ok(_) => leftover.len(),
172 Err(e) => e.valid_up_to(),
173 };
174
175 let split_at = leftover[..valid_up_to]
176 .iter()
177 .rposition(|&b| b == b'\n' || b == b'\r')
178 .map(|pos| pos + 1)
179 .unwrap_or(valid_up_to);
180
181 let (valid_bytes, rest) = leftover.split_at(split_at);
182
183 match valid_bytes.into_text() {
184 Ok(text) => {
185 for line in text {
186 // re-check before pushing
187 if lines.is_expired(&guard) {
188 return true;
189 }
190 guard.push(line);
191 }
192 }
193 Err(e) => {
194 if self.config.try_lossy {
195 for bytes in valid_bytes.split(|b| *b == b'\n') {
196 if lines.is_expired(&guard) {
197 return true;
198 }
199 let line =
200 String::from_utf8_lossy(bytes).into_owned();
201 guard.push(Line::from(line));
202 }
203 } else {
204 error!("Error displaying {cmd}: {:?}", e);
205 return false;
206 }
207 }
208 }
209
210 leftover = rest.to_vec();
211 }
212
213 if !leftover.is_empty() && !lines.is_expired(&guard) {
214 match leftover.into_text() {
215 Ok(text) => {
216 for line in text {
217 if lines.is_expired(&guard) {
218 return true;
219 }
220 guard.push(line);
221 }
222 }
223 Err(e) => {
224 if self.config.try_lossy {
225 for bytes in leftover.split(|b| *b == b'\n') {
226 if lines.is_expired(&guard) {
227 return true;
228 }
229 let line =
230 String::from_utf8_lossy(bytes).into_owned();
231 guard.push(Line::from(line));
232 }
233 } else {
234 error!("Error displaying {cmd}: {:?}", e);
235 return false;
236 }
237 }
238 }
239 }
240
241 true
242 });
243 self.current = Some((child, handle))
244 } else {
245 error!("Failed to get stdout of preview command: {cmd}")
246 }
247 }
248 }
249 PreviewMessage::Stop => {}
250 _ => unreachable!(),
251 }
252
253 self.prune_procs();
254 }
255
256 let ret = self.cleanup_procs();
257 if ret.is_empty() { Ok(()) } else { Err(ret) }
258 }
259
260 fn dispatch_kill(&mut self) {
261 if let Some((mut child, old)) = self.current.take() {
262 let _ = child.kill();
263 self.procs.push(child);
264 let mut old = Box::pin(old); // pin it to heap
265
266 match old.as_mut().now_or_never() {
267 Some(Ok(result)) => {
268 if !result {
269 self.send(Event::Refresh)
270 }
271 }
272 None => {
273 old.abort(); // still works because `AbortHandle` is separate
274 }
275 _ => {}
276 }
277 }
278 }
279
280 fn send(&self, event: Event) {
281 if let Some(ref tx) = self.event_controller_tx {
282 let _ = tx.send(event);
283 }
284 }
285
286 pub fn connect_controller(&mut self, event_controller_tx: EventSender) {
287 self.event_controller_tx = Some(event_controller_tx)
288 }
289
290 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
291 // also, maybe don't want this delaying exit?
292 fn cleanup_procs(mut self) -> Vec<Child> {
293 let total_timeout = Duration::from_secs(1);
294 let start = Instant::now();
295
296 self.procs.retain_mut(|child| {
297 loop {
298 match child.try_wait() {
299 Ok(Some(_)) => return false,
300 Ok(None) => {
301 if start.elapsed() >= total_timeout {
302 error!("Child failed to exit in time: {:?}", child);
303 return true;
304 } else {
305 thread::sleep(Duration::from_millis(10));
306 }
307 }
308 Err(e) => {
309 error!("Error waiting on child: {e}");
310 return true;
311 }
312 }
313 }
314 });
315
316 self.procs
317 }
318
319 fn prune_procs(&mut self) {
320 self.procs.retain_mut(|child| match child.try_wait() {
321 Ok(None) => true,
322 Ok(Some(_)) => false,
323 Err(e) => {
324 warn!("Error waiting on child: {e}");
325 true
326 }
327 });
328 }
329}
330
331// ---------- NON ANSI VARIANT
332// let reader = BufReader::new(stdout);
333// if self.config.try_lossy {
334// for line_result in reader.split(b'\n') {
335// match line_result {
336// Ok(bytes) => {
337// let line =
338// String::from_utf8_lossy(&bytes).into_owned();
339// lines.push(Line::from(line));
340// }
341// Err(e) => error!("Failed to read line: {:?}", e),
342// }
343// }
344// } else {
345// for line_result in reader.lines() {
346// match line_result {
347// Ok(line) => lines.push(Line::from(line)),
348// Err(e) => {
349// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
350// error!("Error displaying {cmd}: {:?}", e);
351// break;
352// }
353// }
354// }
355// }
356
357// trait Resettable: Default {
358// fn reset(&mut self) {}
359// }
360// impl<T> Resettable for AppendOnly<T> {
361// fn reset(&mut self) {
362// self.clear();
363// }
364// }
365
366// use std::ops::{Deref, DerefMut};
367
368// #[derive(Debug)]
369// struct Queue<V: Resettable> {
370// entries: Vec<(String, V)>,
371// order: Vec<usize>, // indices ordered by recency (0 = most recent)
372// }
373
374// impl<V: Resettable> Queue<V> {
375// pub fn new(len: usize) -> Self {
376// Self {
377// entries: (0..len)
378// .map(|_| (String::default(), V::default()))
379// .collect(),
380// order: vec![len; len],
381// }
382// }
383
384// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
385// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
386// if order_idx == self.entries.len() {
387// return None
388// }
389// if self.entries[entries_idx].0 == key {
390// return Some((order_idx, entries_idx));
391// }
392// }
393// None
394// }
395
396// /// Try to get a key; if found, move it to the top.
397// /// If not found, replace the oldest, clear its vec, set new key.
398// pub fn try_get(&mut self, key: &str) -> bool {
399// let n = self.entries.len();
400
401// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
402// self.order.copy_within(0..order_idx, 1);
403// self.order[0] = idx;
404// true
405// } else {
406// let order_idx = (0..n)
407// .rfind(|&i| self.order[i] < n)
408// .map(|i| i + 1)
409// .unwrap_or(0);
410
411// let idx = if self.order[order_idx] < self.entries.len() {
412// order_idx
413// } else {
414// *self.order.last().unwrap()
415// };
416
417// // shift and insert at front
418// self.order.copy_within(0..order_idx, 1);
419// self.order[0] = idx;
420
421// // reset and assign new key
422// let (ref mut k, ref mut v) = self.entries[idx];
423// *k = key.to_owned();
424// v.reset();
425
426// false
427// }
428// }
429// }
430
431// impl<V: Resettable> Deref for Queue<V> {
432// type Target = V;
433// fn deref(&self) -> &Self::Target {
434// &self.entries[self.order[0]].1
435// }
436// }
437
438// impl<V: Resettable> DerefMut for Queue<V> {
439// fn deref_mut(&mut self) -> &mut Self::Target {
440// &mut self.entries[self.order[0]].1
441// }
442// }
443
444// impl<V: Resettable> Default for Queue<V> {
445// fn default() -> Self {
446// Self::new(1)
447// }
448// }