matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{EnvVars, spawn_script};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Stdio};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20use crate::preview::Preview;
21
22#[derive(Debug, Display, Clone)]
23pub enum PreviewMessage {
24 Run(String, EnvVars),
25 Set(Text<'static>),
26 Unset,
27 Stop,
28}
29
30#[derive(Debug)]
31pub struct Previewer {
32 rx: Receiver<PreviewMessage>,
33 lines: AppendOnly<Line<'static>>, // append-only buffer
34 string: Arc<Mutex<Option<Text<'static>>>>,
35 changed: Arc<AtomicBool>,
36
37 procs: Vec<Child>,
38 current: Option<(Child, JoinHandle<bool>)>,
39 pub config: PreviewerConfig,
40 event_controller_tx: Option<UnboundedSender<Event>>,
41}
42
43impl Previewer {
44 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45 let (tx, rx) = channel(PreviewMessage::Stop);
46
47 let new = Self {
48 rx,
49 lines: AppendOnly::new(),
50 string: Default::default(),
51 changed: Default::default(),
52
53 procs: Vec::new(),
54 current: None,
55 config,
56 event_controller_tx: None,
57 };
58
59 (new, tx)
60 }
61
62 pub fn view(&self) -> Preview {
63 Preview::new(
64 self.lines.clone(),
65 self.string.clone(),
66 self.changed.clone(),
67 )
68 }
69
70 pub fn set_string(&self, s: Text<'static>) {
71 let mut guard = self.string.lock().unwrap();
72 *guard = Some(s);
73 self.changed.store(true, Ordering::Release);
74 }
75
76 pub fn clear_string(&self) {
77 let mut guard = self.string.lock().unwrap();
78 *guard = None;
79 self.changed.store(true, Ordering::Release);
80 }
81
82 pub fn has_string(&self) -> bool {
83 let guard = self.string.lock().unwrap();
84 guard.is_some()
85 }
86
87 pub async fn run(mut self) -> Result<(), Vec<Child>> {
88 while self.rx.changed().await.is_ok() {
89 if !self.procs.is_empty() {
90 debug!("procs: {:?}", self.procs);
91 }
92
93 {
94 let m = &*self.rx.borrow();
95 if let PreviewMessage::Set(s) = m {
96 self.set_string(s.clone());
97 // don't kill the underlying
98 continue;
99 } else if matches!(m, PreviewMessage::Unset) {
100 self.clear_string();
101 continue;
102 }
103 }
104
105 self.dispatch_kill();
106 self.clear_string();
107 self.lines.clear();
108
109 match &*self.rx.borrow() {
110 PreviewMessage::Run(cmd, variables) => {
111 if let Some(mut child) = spawn_script(
112 cmd,
113 variables.iter().cloned(),
114 Stdio::null(),
115 Stdio::piped(),
116 Stdio::null(),
117 ) {
118 if let Some(stdout) = child.stdout.take() {
119 self.changed.store(true, Ordering::Relaxed);
120 let lines = self.lines.clone();
121 let cmd = cmd.clone();
122 let handle = tokio::spawn(async move {
123 let mut reader = BufReader::new(stdout);
124 let mut leftover = Vec::new();
125 let mut buf = [0u8; 8192];
126
127 // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
128 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
129 if n == 0 {
130 break;
131 }
132
133 leftover.extend_from_slice(&buf[..n]);
134
135 let valid_up_to = match std::str::from_utf8(&leftover) {
136 Ok(_) => leftover.len(),
137 Err(e) => e.valid_up_to(),
138 };
139
140 let split_at = leftover[..valid_up_to]
141 .iter()
142 .rposition(|&b| b == b'\n' || b == b'\r')
143 .map(|pos| pos + 1)
144 .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
145
146 let (valid_bytes, rest) = leftover.split_at(split_at);
147
148 match valid_bytes.into_text() {
149 Ok(text) => {
150 for line in text {
151 lines.push(line);
152 }
153 }
154 Err(e) => {
155 if self.config.try_lossy {
156 for bytes in valid_bytes.split(|b| *b == b'\n') {
157 let line =
158 String::from_utf8_lossy(bytes).into_owned();
159 lines.push(Line::from(line));
160 }
161 } else {
162 error!("Error displaying {cmd}: {:?}", e);
163 return false;
164 }
165 }
166 }
167
168 leftover = rest.to_vec();
169 }
170
171 // handle any remaining bytes
172 if !leftover.is_empty() {
173 match leftover.into_text() {
174 Ok(text) => {
175 for line in text {
176 lines.push(line);
177 }
178 }
179 Err(e) => {
180 if self.config.try_lossy {
181 for bytes in leftover.split(|b| *b == b'\n') {
182 let line =
183 String::from_utf8_lossy(bytes).into_owned();
184 lines.push(Line::from(line));
185 }
186 } else {
187 error!("Error displaying {cmd}: {:?}", e);
188 return false;
189 }
190 }
191 }
192 }
193 true
194 });
195 self.current = Some((child, handle))
196 } else {
197 error!("Failed to get stdout of preview command: {cmd}")
198 }
199 }
200 }
201 PreviewMessage::Stop => {}
202 _ => unreachable!(),
203 }
204
205 self.prune_procs();
206 }
207
208 let ret = self.cleanup_procs();
209 if ret.is_empty() { Ok(()) } else { Err(ret) }
210 }
211
212 fn dispatch_kill(&mut self) {
213 if let Some((mut child, old)) = self.current.take() {
214 let _ = child.kill();
215 self.procs.push(child);
216 let mut old = Box::pin(old); // pin it to heap
217
218 match old.as_mut().now_or_never() {
219 Some(Ok(result)) => {
220 if !result && let Some(ref tx) = self.event_controller_tx {
221 let _ = tx.send(Event::Refresh);
222 }
223 }
224 None => {
225 old.abort(); // still works because `AbortHandle` is separate
226 }
227 _ => {}
228 }
229 }
230 }
231
232 pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
233 self.event_controller_tx = Some(event_controller_tx)
234 }
235
236 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
237 // also, maybe don't want this delaying exit?
238 fn cleanup_procs(mut self) -> Vec<Child> {
239 let total_timeout = Duration::from_secs(1);
240 let start = Instant::now();
241
242 self.procs.retain_mut(|child| {
243 loop {
244 match child.try_wait() {
245 Ok(Some(_)) => return false,
246 Ok(None) => {
247 if start.elapsed() >= total_timeout {
248 error!("Child failed to exit in time: {:?}", child);
249 return true;
250 } else {
251 thread::sleep(Duration::from_millis(10));
252 }
253 }
254 Err(e) => {
255 error!("Error waiting on child: {e}");
256 return true;
257 }
258 }
259 }
260 });
261
262 self.procs
263 }
264
265 fn prune_procs(&mut self) {
266 self.procs.retain_mut(|child| match child.try_wait() {
267 Ok(None) => true,
268 Ok(Some(_)) => false,
269 Err(e) => {
270 warn!("Error waiting on child: {e}");
271 true
272 }
273 });
274 }
275}
276
277// ---------- NON ANSI VARIANT
278// let reader = BufReader::new(stdout);
279// if self.config.try_lossy {
280// for line_result in reader.split(b'\n') {
281// match line_result {
282// Ok(bytes) => {
283// let line =
284// String::from_utf8_lossy(&bytes).into_owned();
285// lines.push(Line::from(line));
286// }
287// Err(e) => error!("Failed to read line: {:?}", e),
288// }
289// }
290// } else {
291// for line_result in reader.lines() {
292// match line_result {
293// Ok(line) => lines.push(Line::from(line)),
294// Err(e) => {
295// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
296// error!("Error displaying {cmd}: {:?}", e);
297// break;
298// }
299// }
300// }
301// }
302
303// trait Resettable: Default {
304// fn reset(&mut self) {}
305// }
306// impl<T> Resettable for AppendOnly<T> {
307// fn reset(&mut self) {
308// self.clear();
309// }
310// }
311
312// use std::ops::{Deref, DerefMut};
313
314// #[derive(Debug)]
315// struct Queue<V: Resettable> {
316// entries: Vec<(String, V)>,
317// order: Vec<usize>, // indices ordered by recency (0 = most recent)
318// }
319
320// impl<V: Resettable> Queue<V> {
321// pub fn new(len: usize) -> Self {
322// Self {
323// entries: (0..len)
324// .map(|_| (String::default(), V::default()))
325// .collect(),
326// order: vec![len; len],
327// }
328// }
329
330// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
331// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
332// if order_idx == self.entries.len() {
333// return None
334// }
335// if self.entries[entries_idx].0 == key {
336// return Some((order_idx, entries_idx));
337// }
338// }
339// None
340// }
341
342// /// Try to get a key; if found, move it to the top.
343// /// If not found, replace the oldest, clear its vec, set new key.
344// pub fn try_get(&mut self, key: &str) -> bool {
345// let n = self.entries.len();
346
347// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
348// self.order.copy_within(0..order_idx, 1);
349// self.order[0] = idx;
350// true
351// } else {
352// let order_idx = (0..n)
353// .rfind(|&i| self.order[i] < n)
354// .map(|i| i + 1)
355// .unwrap_or(0);
356
357// let idx = if self.order[order_idx] < self.entries.len() {
358// order_idx
359// } else {
360// *self.order.last().unwrap()
361// };
362
363// // shift and insert at front
364// self.order.copy_within(0..order_idx, 1);
365// self.order[0] = idx;
366
367// // reset and assign new key
368// let (ref mut k, ref mut v) = self.entries[idx];
369// *k = key.to_owned();
370// v.reset();
371
372// false
373// }
374// }
375// }
376
377// impl<V: Resettable> Deref for Queue<V> {
378// type Target = V;
379// fn deref(&self) -> &Self::Target {
380// &self.entries[self.order[0]].1
381// }
382// }
383
384// impl<V: Resettable> DerefMut for Queue<V> {
385// fn deref_mut(&mut self) -> &mut Self::Target {
386// &mut self.entries[self.order[0]].1
387// }
388// }
389
390// impl<V: Resettable> Default for Queue<V> {
391// fn default() -> Self {
392// Self::new(1)
393// }
394// }