matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{CommandExt, EnvVars};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Command, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20use crate::preview::Preview;
21
22#[derive(Debug, Display, Clone)]
23pub enum PreviewMessage {
24 Run(String, EnvVars),
25 Set(Text<'static>),
26 Unset,
27 Stop,
28}
29
30#[derive(Debug)]
31pub struct Previewer {
32 rx: Receiver<PreviewMessage>,
33 lines: AppendOnly<Line<'static>>, // append-only buffer
34 string: Arc<Mutex<Option<Text<'static>>>>,
35 changed: Arc<AtomicBool>,
36
37 procs: Vec<Child>,
38 current: Option<(Child, JoinHandle<bool>)>,
39 pub config: PreviewerConfig,
40 event_controller_tx: Option<UnboundedSender<Event>>,
41}
42
43impl Previewer {
44 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45 let (tx, rx) = channel(PreviewMessage::Stop);
46
47 let new = Self {
48 rx,
49 lines: AppendOnly::new(),
50 string: Default::default(),
51 changed: Default::default(),
52
53 procs: Vec::new(),
54 current: None,
55 config,
56 event_controller_tx: None,
57 };
58
59 (new, tx)
60 }
61
62 pub fn view(&self) -> Preview {
63 Preview::new(
64 self.lines.clone(),
65 self.string.clone(),
66 self.changed.clone(),
67 )
68 }
69
70 pub fn set_string(&self, s: Text<'static>) {
71 let mut guard = self.string.lock().unwrap();
72 *guard = Some(s);
73 self.changed.store(true, Ordering::Release);
74 }
75
76 pub fn clear_string(&self) {
77 let mut guard = self.string.lock().unwrap();
78 *guard = None;
79 self.changed.store(true, Ordering::Release);
80 }
81
82 pub fn has_string(&self) -> bool {
83 let guard = self.string.lock().unwrap();
84 guard.is_some()
85 }
86
87 pub async fn run(mut self) -> Result<(), Vec<Child>> {
88 while self.rx.changed().await.is_ok() {
89 if !self.procs.is_empty() {
90 debug!("procs: {:?}", self.procs);
91 }
92
93 {
94 let m = &*self.rx.borrow();
95 if let PreviewMessage::Set(s) = m {
96 self.set_string(s.clone());
97 // don't kill the underlying
98 continue;
99 } else if matches!(m, PreviewMessage::Unset) {
100 self.clear_string();
101 continue;
102 }
103 }
104
105 self.dispatch_kill();
106 self.clear_string();
107 self.lines.clear();
108
109 match &*self.rx.borrow() {
110 PreviewMessage::Run(cmd, variables) => {
111 if let Some(mut child) = Command::from_script(cmd)
112 .envs(variables.iter().cloned())
113 .stdout(Stdio::piped())
114 .stdin(Stdio::null())
115 .stderr(Stdio::null())
116 .detach()
117 ._spawn()
118 {
119 if let Some(stdout) = child.stdout.take() {
120 self.changed.store(true, Ordering::Relaxed);
121 let lines = self.lines.clone();
122 let cmd = cmd.clone();
123 let handle = tokio::spawn(async move {
124 let mut reader = BufReader::new(stdout);
125 let mut leftover = Vec::new();
126 let mut buf = [0u8; 8192];
127
128 // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
129 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
130 if n == 0 {
131 break;
132 }
133
134 leftover.extend_from_slice(&buf[..n]);
135
136 let valid_up_to = match std::str::from_utf8(&leftover) {
137 Ok(_) => leftover.len(),
138 Err(e) => e.valid_up_to(),
139 };
140
141 let split_at = leftover[..valid_up_to]
142 .iter()
143 .rposition(|&b| b == b'\n' || b == b'\r')
144 .map(|pos| pos + 1)
145 .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
146
147 let (valid_bytes, rest) = leftover.split_at(split_at);
148
149 match valid_bytes.into_text() {
150 Ok(text) => {
151 for line in text {
152 lines.push(line);
153 }
154 }
155 Err(e) => {
156 if self.config.try_lossy {
157 for bytes in valid_bytes.split(|b| *b == b'\n') {
158 let line =
159 String::from_utf8_lossy(bytes).into_owned();
160 lines.push(Line::from(line));
161 }
162 } else {
163 error!("Error displaying {cmd}: {:?}", e);
164 return false;
165 }
166 }
167 }
168
169 leftover = rest.to_vec();
170 }
171
172 // handle any remaining bytes
173 if !leftover.is_empty() {
174 match leftover.into_text() {
175 Ok(text) => {
176 for line in text {
177 lines.push(line);
178 }
179 }
180 Err(e) => {
181 if self.config.try_lossy {
182 for bytes in leftover.split(|b| *b == b'\n') {
183 let line =
184 String::from_utf8_lossy(bytes).into_owned();
185 lines.push(Line::from(line));
186 }
187 } else {
188 error!("Error displaying {cmd}: {:?}", e);
189 return false;
190 }
191 }
192 }
193 }
194 true
195 });
196 self.current = Some((child, handle))
197 } else {
198 error!("Failed to get stdout of preview command: {cmd}")
199 }
200 }
201 }
202 PreviewMessage::Stop => {}
203 _ => unreachable!(),
204 }
205
206 self.prune_procs();
207 }
208
209 let ret = self.cleanup_procs();
210 if ret.is_empty() { Ok(()) } else { Err(ret) }
211 }
212
213 fn dispatch_kill(&mut self) {
214 if let Some((mut child, old)) = self.current.take() {
215 let _ = child.kill();
216 self.procs.push(child);
217 let mut old = Box::pin(old); // pin it to heap
218
219 match old.as_mut().now_or_never() {
220 Some(Ok(result)) => {
221 if !result && let Some(ref tx) = self.event_controller_tx {
222 let _ = tx.send(Event::Refresh);
223 }
224 }
225 None => {
226 old.abort(); // still works because `AbortHandle` is separate
227 }
228 _ => {}
229 }
230 }
231 }
232
233 pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
234 self.event_controller_tx = Some(event_controller_tx)
235 }
236
237 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
238 // also, maybe don't want this delaying exit?
239 fn cleanup_procs(mut self) -> Vec<Child> {
240 let total_timeout = Duration::from_secs(1);
241 let start = Instant::now();
242
243 self.procs.retain_mut(|child| {
244 loop {
245 match child.try_wait() {
246 Ok(Some(_)) => return false,
247 Ok(None) => {
248 if start.elapsed() >= total_timeout {
249 error!("Child failed to exit in time: {:?}", child);
250 return true;
251 } else {
252 thread::sleep(Duration::from_millis(10));
253 }
254 }
255 Err(e) => {
256 error!("Error waiting on child: {e}");
257 return true;
258 }
259 }
260 }
261 });
262
263 self.procs
264 }
265
266 fn prune_procs(&mut self) {
267 self.procs.retain_mut(|child| match child.try_wait() {
268 Ok(None) => true,
269 Ok(Some(_)) => false,
270 Err(e) => {
271 warn!("Error waiting on child: {e}");
272 true
273 }
274 });
275 }
276}
277
278// ---------- NON ANSI VARIANT
279// let reader = BufReader::new(stdout);
280// if self.config.try_lossy {
281// for line_result in reader.split(b'\n') {
282// match line_result {
283// Ok(bytes) => {
284// let line =
285// String::from_utf8_lossy(&bytes).into_owned();
286// lines.push(Line::from(line));
287// }
288// Err(e) => error!("Failed to read line: {:?}", e),
289// }
290// }
291// } else {
292// for line_result in reader.lines() {
293// match line_result {
294// Ok(line) => lines.push(Line::from(line)),
295// Err(e) => {
296// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
297// error!("Error displaying {cmd}: {:?}", e);
298// break;
299// }
300// }
301// }
302// }
303
304// trait Resettable: Default {
305// fn reset(&mut self) {}
306// }
307// impl<T> Resettable for AppendOnly<T> {
308// fn reset(&mut self) {
309// self.clear();
310// }
311// }
312
313// use std::ops::{Deref, DerefMut};
314
315// #[derive(Debug)]
316// struct Queue<V: Resettable> {
317// entries: Vec<(String, V)>,
318// order: Vec<usize>, // indices ordered by recency (0 = most recent)
319// }
320
321// impl<V: Resettable> Queue<V> {
322// pub fn new(len: usize) -> Self {
323// Self {
324// entries: (0..len)
325// .map(|_| (String::default(), V::default()))
326// .collect(),
327// order: vec![len; len],
328// }
329// }
330
331// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
332// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
333// if order_idx == self.entries.len() {
334// return None
335// }
336// if self.entries[entries_idx].0 == key {
337// return Some((order_idx, entries_idx));
338// }
339// }
340// None
341// }
342
343// /// Try to get a key; if found, move it to the top.
344// /// If not found, replace the oldest, clear its vec, set new key.
345// pub fn try_get(&mut self, key: &str) -> bool {
346// let n = self.entries.len();
347
348// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
349// self.order.copy_within(0..order_idx, 1);
350// self.order[0] = idx;
351// true
352// } else {
353// let order_idx = (0..n)
354// .rfind(|&i| self.order[i] < n)
355// .map(|i| i + 1)
356// .unwrap_or(0);
357
358// let idx = if self.order[order_idx] < self.entries.len() {
359// order_idx
360// } else {
361// *self.order.last().unwrap()
362// };
363
364// // shift and insert at front
365// self.order.copy_within(0..order_idx, 1);
366// self.order[0] = idx;
367
368// // reset and assign new key
369// let (ref mut k, ref mut v) = self.entries[idx];
370// *k = key.to_owned();
371// v.reset();
372
373// false
374// }
375// }
376// }
377
378// impl<V: Resettable> Deref for Queue<V> {
379// type Target = V;
380// fn deref(&self) -> &Self::Target {
381// &self.entries[self.order[0]].1
382// }
383// }
384
385// impl<V: Resettable> DerefMut for Queue<V> {
386// fn deref_mut(&mut self) -> &mut Self::Target {
387// &mut self.entries[self.order[0]].1
388// }
389// }
390
391// impl<V: Resettable> Default for Queue<V> {
392// fn default() -> Self {
393// Self::new(1)
394// }
395// }