matchmaker/proc/previewer.rs
1use ansi_to_tui::IntoText;
2use futures::FutureExt;
3use log::{debug, error, warn};
4use ratatui::text::{Line, Text};
5use std::io::BufReader;
6use std::process::{Child, Stdio};
7use std::sync::{Arc, Mutex};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::thread;
10use std::time::{Duration, Instant};
11use strum_macros::Display;
12use tokio::sync::mpsc::UnboundedSender;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::{AppendOnly, EnvVars, spawn};
17use crate::proc::Preview;
18use crate::config::PreviewerConfig;
19use crate::message::Event;
20
21#[derive(Debug, Display, Clone)]
22pub enum PreviewMessage {
23 Run(String, EnvVars),
24 Set(Text<'static>),
25 Unset,
26 Stop,
27}
28
29#[derive(Debug)]
30pub struct Previewer {
31 rx: Receiver<PreviewMessage>,
32 lines: AppendOnly<Line<'static>>, // append-only buffer
33 string: Arc<Mutex<Option<Text<'static>>>>,
34 changed: Arc<AtomicBool>,
35
36 procs: Vec<Child>,
37 current: Option<(Child, JoinHandle<bool>)>,
38 pub config: PreviewerConfig,
39 controller_tx: Option<UnboundedSender<Event>>,
40}
41
42impl Previewer {
43 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
44 let (tx, rx) = channel(PreviewMessage::Stop);
45
46 let new = Self {
47 rx,
48 lines: AppendOnly::new(),
49 string: Default::default(),
50 changed: Default::default(),
51
52 procs: Vec::new(),
53 current: None,
54 config,
55 controller_tx: None,
56 };
57
58 (new, tx)
59 }
60
61 pub fn view(&self) -> Preview {
62 Preview::new(self.lines.clone(), self.string.clone(), self.changed.clone())
63 }
64
65 pub fn set_string(&self, s: Text<'static>) {
66 let mut guard = self.string.lock().unwrap();
67 *guard = Some(s);
68 self.changed.store(true, Ordering::Release);
69 }
70
71 pub fn clear_string(&self) {
72 let mut guard = self.string.lock().unwrap();
73 *guard = None;
74 self.changed.store(true, Ordering::Release);
75 }
76
77 pub fn has_string(&self) -> bool {
78 let guard = self.string.lock().unwrap();
79 guard.is_some()
80 }
81
82 pub async fn run(mut self) -> Result<(), Vec<Child>> {
83 while self.rx.changed().await.is_ok() {
84 if !self.procs.is_empty() {
85 debug!("procs: {:?}", self.procs);
86 }
87
88 {
89 let m = &*self.rx.borrow();
90 if let PreviewMessage::Set(s) = m {
91 self.set_string(s.clone());
92 // don't kill the underlying
93 continue;
94 } else if matches!(m, PreviewMessage::Unset) {
95 self.clear_string();
96 continue;
97 }
98 }
99
100 self.dispatch_kill();
101 self.clear_string();
102 self.lines.clear();
103
104 match &*self.rx.borrow() {
105 PreviewMessage::Run(cmd, variables) => {
106 if let Some(mut child) = spawn(
107 cmd,
108 variables.iter().cloned(),
109 Stdio::null(),
110 Stdio::piped(),
111 Stdio::null(),
112 ) {
113 if let Some(stdout) = child.stdout.take() {
114 self.changed.store(true, Ordering::Relaxed);
115 let lines = self.lines.clone();
116 let cmd = cmd.clone();
117 let handle = tokio::spawn(async move {
118 let mut reader = BufReader::new(stdout);
119 let mut leftover = Vec::new();
120 let mut buf = [0u8; 8192];
121
122 // TODO: want to use buffer over lines (for efficiency?), but damaged utf-8 still leaks thu somehow
123 while let Ok(n) = std::io::Read::read(&mut reader, &mut buf) {
124 if n == 0 {
125 break;
126 }
127
128 leftover.extend_from_slice(&buf[..n]);
129
130 let valid_up_to = match std::str::from_utf8(&leftover) {
131 Ok(_) => leftover.len(),
132 Err(e) => e.valid_up_to(),
133 };
134
135 let split_at = leftover[..valid_up_to]
136 .iter()
137 .rposition(|&b| b == b'\n' || b == b'\r')
138 .map(|pos| pos + 1)
139 .unwrap_or(valid_up_to); // todo: fix this artificial break if line exceeds
140
141 let (valid_bytes, rest) = leftover.split_at(split_at);
142
143 match valid_bytes.into_text() {
144 Ok(text) => {
145 for line in text {
146 lines.push(line);
147 }
148 }
149 Err(e) => {
150 if self.config.try_lossy {
151 for bytes in valid_bytes.split(|b| *b == b'\n') {
152 let line =
153 String::from_utf8_lossy(bytes).into_owned();
154 lines.push(Line::from(line));
155 }
156 } else {
157 error!("Error displaying {cmd}: {:?}", e);
158 return false;
159 }
160 }
161 }
162
163 leftover = rest.to_vec();
164 }
165
166 // handle any remaining bytes
167 if !leftover.is_empty() {
168 match leftover.into_text() {
169 Ok(text) => {
170 for line in text {
171 lines.push(line);
172 }
173 }
174 Err(e) => {
175 if self.config.try_lossy {
176 for bytes in leftover.split(|b| *b == b'\n') {
177 let line =
178 String::from_utf8_lossy(bytes).into_owned();
179 lines.push(Line::from(line));
180 }
181 } else {
182 error!("Error displaying {cmd}: {:?}", e);
183 return false;
184 }
185 }
186 }
187 }
188 true
189 });
190 self.current = Some((child, handle))
191 } else {
192 error!("Failed to get stdout of preview command: {cmd}")
193 }
194 }
195 }
196 PreviewMessage::Stop => {
197 }
198 _ => unreachable!()
199 }
200
201 self.prune_procs();
202 }
203
204 let ret = self.cleanup_procs();
205 if ret.is_empty() { Ok(()) } else { Err(ret) }
206 }
207
208 fn dispatch_kill(&mut self) {
209 if let Some((mut child, old)) = self.current.take() {
210 let _ = child.kill();
211 self.procs.push(child);
212 let mut old = Box::pin(old); // pin it to heap
213
214 match old.as_mut().now_or_never() {
215 Some(Ok(result)) => {
216 if !result && let Some(ref tx) = self.controller_tx {
217 let _ = tx.send(Event::Refresh);
218 }
219 }
220 None => {
221 old.abort(); // still works because `AbortHandle` is separate
222 }
223 _ => {}
224 }
225 }
226 }
227
228 pub fn connect_controller(&mut self, controller_tx: UnboundedSender<Event>) {
229 self.controller_tx = Some(controller_tx)
230 }
231
232 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
233 // also, maybe don't want this delaying exit?
234 fn cleanup_procs(mut self) -> Vec<Child> {
235 let total_timeout = Duration::from_secs(1);
236 let start = Instant::now();
237
238 self.procs.retain_mut(|child| {
239 loop {
240 match child.try_wait() {
241 Ok(Some(_)) => return false,
242 Ok(None) => {
243 if start.elapsed() >= total_timeout {
244 error!("Child failed to exit in time: {:?}", child);
245 return true;
246 } else {
247 thread::sleep(Duration::from_millis(10));
248 }
249 }
250 Err(e) => {
251 error!("Error waiting on child: {e}");
252 return true;
253 }
254 }
255 }
256 });
257
258 self.procs
259 }
260
261 fn prune_procs(&mut self) {
262 self.procs.retain_mut(|child| match child.try_wait() {
263 Ok(None) => true,
264 Ok(Some(_)) => false,
265 Err(e) => {
266 warn!("Error waiting on child: {e}");
267 true
268 }
269 });
270 }
271}
272
273// ---------- NON ANSI VARIANT
274// let reader = BufReader::new(stdout);
275// if self.config.try_lossy {
276// for line_result in reader.split(b'\n') {
277// match line_result {
278// Ok(bytes) => {
279// let line =
280// String::from_utf8_lossy(&bytes).into_owned();
281// lines.push(Line::from(line));
282// }
283// Err(e) => error!("Failed to read line: {:?}", e),
284// }
285// }
286// } else {
287// for line_result in reader.lines() {
288// match line_result {
289// Ok(line) => lines.push(Line::from(line)),
290// Err(e) => {
291// // todo: don't know why that even with an explicit ratatui clear, garbage sometimes stays on the screen
292// error!("Error displaying {cmd}: {:?}", e);
293// break;
294// }
295// }
296// }
297// }
298
299// trait Resettable: Default {
300// fn reset(&mut self) {}
301// }
302// impl<T> Resettable for AppendOnly<T> {
303// fn reset(&mut self) {
304// self.clear();
305// }
306// }
307
308// use std::ops::{Deref, DerefMut};
309
310// #[derive(Debug)]
311// struct Queue<V: Resettable> {
312// entries: Vec<(String, V)>,
313// order: Vec<usize>, // indices ordered by recency (0 = most recent)
314// }
315
316// impl<V: Resettable> Queue<V> {
317// pub fn new(len: usize) -> Self {
318// Self {
319// entries: (0..len)
320// .map(|_| (String::default(), V::default()))
321// .collect(),
322// order: vec![len; len],
323// }
324// }
325
326// fn find_key_pos(&self, key: &str) -> Option<(usize, usize)> {
327// for (order_idx, &entries_idx) in self.order.iter().enumerate() {
328// if order_idx == self.entries.len() {
329// return None
330// }
331// if self.entries[entries_idx].0 == key {
332// return Some((order_idx, entries_idx));
333// }
334// }
335// None
336// }
337
338// /// Try to get a key; if found, move it to the top.
339// /// If not found, replace the oldest, clear its vec, set new key.
340// pub fn try_get(&mut self, key: &str) -> bool {
341// let n = self.entries.len();
342
343// if !key.is_empty() && let Some((order_idx, idx)) = self.find_key_pos(key) {
344// self.order.copy_within(0..order_idx, 1);
345// self.order[0] = idx;
346// true
347// } else {
348// let order_idx = (0..n)
349// .rfind(|&i| self.order[i] < n)
350// .map(|i| i + 1)
351// .unwrap_or(0);
352
353// let idx = if self.order[order_idx] < self.entries.len() {
354// order_idx
355// } else {
356// *self.order.last().unwrap()
357// };
358
359// // shift and insert at front
360// self.order.copy_within(0..order_idx, 1);
361// self.order[0] = idx;
362
363// // reset and assign new key
364// let (ref mut k, ref mut v) = self.entries[idx];
365// *k = key.to_owned();
366// v.reset();
367
368// false
369// }
370// }
371// }
372
373// impl<V: Resettable> Deref for Queue<V> {
374// type Target = V;
375// fn deref(&self) -> &Self::Target {
376// &self.entries[self.order[0]].1
377// }
378// }
379
380// impl<V: Resettable> DerefMut for Queue<V> {
381// fn deref_mut(&mut self) -> &mut Self::Target {
382// &mut self.entries[self.order[0]].1
383// }
384// }
385
386// impl<V: Resettable> Default for Queue<V> {
387// fn default() -> Self {
388// Self::new(1)
389// }
390// }