matchmaker/preview/previewer.rs
1use ansi_to_tui::IntoText;
2use cli_boilerplate_automation::broc::{EnvVars, spawn_script};
3use futures::FutureExt;
4use log::{debug, error, warn};
5use ratatui::text::{Line, Text};
6use std::io::BufReader;
7use std::process::{Child, Stdio};
8use std::sync::{Arc, Mutex};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::watch::{Receiver, Sender, channel};
15use tokio::task::JoinHandle;
16
17use super::AppendOnly;
18use crate::preview::Preview;
19use crate::config::PreviewerConfig;
20use crate::message::Event;
21
22#[derive(Debug, Display, Clone)]
23pub enum PreviewMessage {
24 Run(String, EnvVars),
25 Set(Text<'static>),
26 Unset,
27 Stop,
28}
29
30#[derive(Debug)]
31pub struct Previewer {
32 rx: Receiver<PreviewMessage>,
33 lines: AppendOnly<Line<'static>>, // append-only buffer
34 string: Arc<Mutex<Option<Text<'static>>>>,
35 changed: Arc<AtomicBool>,
36
37 procs: Vec<Child>,
38 current: Option<(Child, JoinHandle<bool>)>,
39 pub config: PreviewerConfig,
40 event_controller_tx: Option<UnboundedSender<Event>>,
41}
42
43impl Previewer {
44 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
45 let (tx, rx) = channel(PreviewMessage::Stop);
46
47 let new = Self {
48 rx,
49 lines: AppendOnly::new(),
50 string: Default::default(),
51 changed: Default::default(),
52
53 procs: Vec::new(),
54 current: None,
55 config,
56 event_controller_tx: None,
57 };
58
59 (new, tx)
60 }
61
62 pub fn view(&self) -> Preview {
63 Preview::new(self.lines.clone(), self.string.clone(), self.changed.clone())
64 }
65
66 pub fn set_string(&self, s: Text<'static>) {
67 let mut guard = self.string.lock().unwrap();
68 *guard = Some(s);
69 self.changed.store(true, Ordering::Release);
70 }
71
72 pub fn clear_string(&self) {
73 let mut guard = self.string.lock().unwrap();
74 *guard = None;
75 self.changed.store(true, Ordering::Release);
76 }
77
78 pub fn has_string(&self) -> bool {
79 let guard = self.string.lock().unwrap();
80 guard.is_some()
81 }
82
83 pub async fn run(mut self) -> Result<(), Vec<Child>> {
84 while self.rx.changed().await.is_ok() {
85 if !self.procs.is_empty() {
86 debug!("procs: {:?}", self.procs);
87 }
88
89 {
90 let m = &*self.rx.borrow();
91 if let PreviewMessage::Set(s) = m {
92 self.set_string(s.clone());
93 // don't kill the underlying
94 continue;
95 } else if matches!(m, PreviewMessage::Unset) {
96 self.clear_string();
97 continue;
98 }
99 }
100
101 self.dispatch_kill();
102 self.clear_string();
103 self.lines.clear();
104
105 match &*self.rx.borrow() {
106 PreviewMessage::Run(cmd, variables) => {
107 if let Some(mut child) = spawn_script(
108 cmd,
109 variables.iter().cloned(),
110 Stdio::null(),
111 Stdio::piped(),
112 Stdio::null(),
113 ) {
114 if let Some(stdout) = child.stdout.take() {
115 self.changed.store(true, Ordering::Relaxed);
116 let lines = self.lines.clone();
117 let cmd = cmd.clone();
118 let handle = tokio::spawn(async move {
119 let mut reader = BufReader::new(stdout);
120 let mut leftover = Vec::new();
121 let mut buf = [0u8; 8192];
122
123 // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
124 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
125 if n == 0 {
126 break;
127 }
128
129 leftover.extend_from_slice(&buf[..n]);
130
131 let valid_up_to = match std::str::from_utf8(&leftover) {
132 Ok(_) => leftover.len(),
133 Err(e) => e.valid_up_to(),
134 };
135
136 let split_at = leftover[..valid_up_to]
137 .iter()
138 .rposition(|&b| b == b'\n' || b == b'\r')
139 .map(|pos| pos + 1)
140 .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
141
142 let (valid_bytes, rest) = leftover.split_at(split_at);
143
144 match valid_bytes.into_text() {
145 Ok(text) => {
146 for line in text {
147 lines.push(line);
148 }
149 }
150 Err(e) => {
151 if self.config.try_lossy {
152 for bytes in valid_bytes.split(|b| *b == b'\n') {
153 let line =
154 String::from_utf8_lossy(bytes).into_owned();
155 lines.push(Line::from(line));
156 }
157 } else {
158 error!("Error displaying {cmd}: {:?}", e);
159 return false;
160 }
161 }
162 }
163
164 leftover = rest.to_vec();
165 }
166
167 // handle any remaining bytes
168 if !leftover.is_empty() {
169 match leftover.into_text() {
170 Ok(text) => {
171 for line in text {
172 lines.push(line);
173 }
174 }
175 Err(e) => {
176 if self.config.try_lossy {
177 for bytes in leftover.split(|b| *b == b'\n') {
178 let line =
179 String::from_utf8_lossy(bytes).into_owned();
180 lines.push(Line::from(line));
181 }
182 } else {
183 error!("Error displaying {cmd}: {:?}", e);
184 return false;
185 }
186 }
187 }
188 }
189 true
190 });
191 self.current = Some((child, handle))
192 } else {
193 error!("Failed to get stdout of preview command: {cmd}")
194 }
195 }
196 }
197 PreviewMessage::Stop => {
198 }
199 _ => unreachable!()
200 }
201
202 self.prune_procs();
203 }
204
205 let ret = self.cleanup_procs();
206 if ret.is_empty() { Ok(()) } else { Err(ret) }
207 }
208
209 fn dispatch_kill(&mut self) {
210 if let Some((mut child, old)) = self.current.take() {
211 let _ = child.kill();
212 self.procs.push(child);
213 let mut old = Box::pin(old); // pin it to heap
214
215 match old.as_mut().now_or_never() {
216 Some(Ok(result)) => {
217 if !result && let Some(ref tx) = self.event_controller_tx {
218 let _ = tx.send(Event::Refresh);
219 }
220 }
221 None => {
222 old.abort(); // still works because `AbortHandle` is separate
223 }
224 _ => {}
225 }
226 }
227 }
228
229 pub fn connect_controller(&mut self, event_controller_tx: UnboundedSender<Event>) {
230 self.event_controller_tx = Some(event_controller_tx)
231 }
232
233 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
234 // also, maybe don't want this delaying exit?
235 fn cleanup_procs(mut self) -> Vec<Child> {
236 let total_timeout = Duration::from_secs(1);
237 let start = Instant::now();
238
239 self.procs.retain_mut(|child| {
240 loop {
241 match child.try_wait() {
242 Ok(Some(_)) => return false,
243 Ok(None) => {
244 if start.elapsed() >= total_timeout {
245 error!("Child failed to exit in time: {:?}", child);
246 return true;
247 } else {
248 thread::sleep(Duration::from_millis(10));
249 }
250 }
251 Err(e) => {
252 error!("Error waiting on child: {e}");
253 return true;
254 }
255 }
256 }
257 });
258
259 self.procs
260 }
261
262 fn prune_procs(&mut self) {
263 self.procs.retain_mut(|child| match child.try_wait() {
264 Ok(None) => true,
265 Ok(Some(_)) => false,
266 Err(e) => {
267 warn!("Error waiting on child: {e}");
268 true
269 }
270 });
271 }
272}
273
274// ---------- NON ANSI VARIANT
275// let reader = BufReader::new(stdout);
276// if self.config.try_lossy {
277// for line_result in reader.split(b'\n') {
278// match line_result {
279// Ok(bytes) => {
280// let line =
281// String::from_utf8_lossy(&bytes).into_owned();
282// lines.push(Line::from(line));
283// }
284// Err(e) => error!("Failed to read line: {:?}", e),
285// }
286// }
287// } else {
288// for line_result in reader.lines() {
289// match line_result {
290// Ok(line) => lines.push(Line::from(line)),
291// Err(e) => {
292// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
293// error!("Error displaying {cmd}: {:?}", e);
294// break;
295// }
296// }
297// }
298// }
299
300// trait Resettable: Default {
301// fn reset(&mut self) {}
302// }
303// impl<T> Resettable for AppendOnly<T> {
304// fn reset(&mut self) {
305// self.clear();
306// }
307// }
308
309// use std::ops::{Deref, DerefMut};
310
311// #[derive(Debug)]
312// struct Queue<V: Resettable> {
313// entries: Vec<(String, V)>,
314// order: Vec<usize>, // indices ordered by recency (0 = most recent)
315// }
316
317// impl<V: Resettable> Queue<V> {
318// pub fn new(len: usize) -> Self {
319// Self {
320// entries: (0..len)
321// .map(|_| (String::default(), V::default()))
322// .collect(),
323// order: vec![len; len],
324// }
325// }
326
327// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
328// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
329// if order_idx == self.entries.len() {
330// return None
331// }
332// if self.entries[entries_idx].0 == key {
333// return Some((order_idx, entries_idx));
334// }
335// }
336// None
337// }
338
339// /// Try to get a key; if found, move it to the top.
340// /// If not found, replace the oldest, clear its vec, set new key.
341// pub fn try_get(&mut self, key: &str) -> bool {
342// let n = self.entries.len();
343
344// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
345// self.order.copy_within(0..order_idx, 1);
346// self.order[0] = idx;
347// true
348// } else {
349// let order_idx = (0..n)
350// .rfind(|&i| self.order[i] < n)
351// .map(|i| i + 1)
352// .unwrap_or(0);
353
354// let idx = if self.order[order_idx] < self.entries.len() {
355// order_idx
356// } else {
357// *self.order.last().unwrap()
358// };
359
360// // shift and insert at front
361// self.order.copy_within(0..order_idx, 1);
362// self.order[0] = idx;
363
364// // reset and assign new key
365// let (ref mut k, ref mut v) = self.entries[idx];
366// *k = key.to_owned();
367// v.reset();
368
369// false
370// }
371// }
372// }
373
374// impl<V: Resettable> Deref for Queue<V> {
375// type Target = V;
376// fn deref(&self) -> &Self::Target {
377// &self.entries[self.order[0]].1
378// }
379// }
380
381// impl<V: Resettable> DerefMut for Queue<V> {
382// fn deref_mut(&mut self) -> &mut Self::Target {
383// &mut self.entries[self.order[0]].1
384// }
385// }
386
387// impl<V: Resettable> Default for Queue<V> {
388// fn default() -> Self {
389// Self::new(1)
390// }
391// }