// matchmaker/spawn/preview.rs
1use ansi_to_tui::IntoText;
2use futures::FutureExt;
3use log::{debug, error, warn};
4use ratatui::text::{Line, Text};
5use tokio::sync::mpsc::UnboundedSender;
6use std::io::{BufReader};
7use std::process::{Child, Stdio};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::thread;
11use std::time::{Duration, Instant};
12use strum_macros::Display;
13use tokio::sync::watch::{Receiver, Sender, channel};
14use tokio::task::JoinHandle;
15
16use super::{AppendOnly, EnvVars, spawn};
17use crate::config::PreviewerConfig;
18use crate::message::Event;
19
20#[derive(Debug, Display, Clone)]
21pub enum PreviewMessage {
22 Run(String, EnvVars),
23 Stop,
24}
25
26#[derive(Debug)]
27pub struct Previewer {
28 rx: Receiver<PreviewMessage>,
29 lines: AppendOnly<Line<'static>>, // append-only buffer
30 procs: Vec<Child>,
31 current: Option<(Child, JoinHandle<bool>)>,
32 changed: Arc<AtomicBool>,
33 pub config: PreviewerConfig,
34 controller_tx: Option<UnboundedSender<Event>>
35
36}
37
38#[derive(Debug)]
39pub struct PreviewerView {
40 lines: AppendOnly<Line<'static>>,
41 changed: Arc<AtomicBool>,
42}
43
44impl PreviewerView {
45 pub fn results(&self) -> Text<'_> {
46 let output = self.lines.read().unwrap(); // acquire read lock
47 Text::from_iter(output.iter().map(|(_, line)| line.clone()))
48 }
49
50 pub fn len(&self) -> usize {
51 let output = self.lines.read().unwrap();
52 output.count()
53 // todo: handle overflow possibility
54 }
55
56 pub fn changed(&self) -> bool {
57 self.changed.swap(false, Ordering::Relaxed)
58 }
59}
60
61impl Previewer {
62 pub fn new(config: PreviewerConfig) -> (Self, Sender<PreviewMessage>) {
63 let (tx, rx) = channel(PreviewMessage::Stop);
64
65 let lines = AppendOnly::new();
66
67 let new = Self {
68 rx,
69 lines: lines.clone(),
70 procs: Vec::new(),
71 current: None,
72 changed: Default::default(),
73 config,
74 controller_tx: None
75 };
76
77 (new, tx)
78 }
79
80 pub fn view(&self) -> PreviewerView {
81 PreviewerView {
82 lines: self.lines.clone(),
83 changed: self.changed.clone(),
84 }
85 }
86
87 pub async fn run(mut self) -> Result<(), Vec<Child>> {
88 while self.rx.changed().await.is_ok() {
89 if !self.procs.is_empty() {
90 debug!("procs: {:?}", self.procs);
91 }
92
93 self.dispatch_kill();
94
95 match &*self.rx.borrow() {
96 PreviewMessage::Run(cmd, variables) => {
97 self.lines.clear();
98 if let Some(mut child) = spawn(
99 &cmd,
100 variables.iter().cloned(),
101 Stdio::null(),
102 Stdio::piped(),
103 Stdio::null(),
104 ) {
105 if let Some(stdout) = child.stdout.take() {
106 self.changed.store(true, Ordering::Relaxed);
107 let lines = self.lines.clone();
108 let cmd = cmd.clone();
109 let handle = tokio::spawn(async move {
110 let mut reader = BufReader::new(stdout);
111 let mut leftover = Vec::new();
112 let mut buf = [0u8; 8192];
113
114 // TODO: want to use buffer over lines (for efficiency?), but partial lines are not handled, and damaged utf-8 still leaks thu somehow
115 loop {
116 let n = if let Ok(x) = std::io::Read::read(&mut reader, &mut buf) {x } else { break };
117 if n == 0 { break; }
118
119 leftover.extend_from_slice(&buf[..n]);
120
121 let valid_up_to = match std::str::from_utf8(&leftover) {
122 Ok(_) => leftover.len(),
123 Err(e) => e.valid_up_to(),
124 };
125
126 let split_at = leftover[..valid_up_to]
127 .iter()
128 .rposition(|&b| b == b'\n' || b == b'\r')
129 .map(|pos| pos + 1)
130 .unwrap_or(valid_up_to); // todo: problem if line exceeds
131
132 let (valid_bytes, rest) = leftover.split_at(split_at);
133
134 match valid_bytes.into_text() {
135 Ok(text) => {
136 for line in text {
137 lines.push(line);
138 }
139 }
140 Err(e) => {
141 if self.config.try_lossy {
142 for bytes in valid_bytes.split(|b| *b == b'\n') {
143 let line =
144 String::from_utf8_lossy(&bytes).into_owned();
145 lines.push(Line::from(line));
146 }
147 } else {
148 error!("Error displaying {cmd}: {:?}", e);
149 return false
150 }
151 }
152 }
153
154 leftover = rest.to_vec();
155 }
156
157 // handle any remaining bytes
158 if !leftover.is_empty() {
159 match leftover.into_text() {
160 Ok(text) => {
161 for line in text {
162 lines.push(line);
163 }
164 }
165 Err(e) => {
166 if self.config.try_lossy {
167 for bytes in leftover.split(|b| *b == b'\n') {
168 let line =
169 String::from_utf8_lossy(&bytes).into_owned();
170 lines.push(Line::from(line));
171 }
172 } else {
173 error!("Error displaying {cmd}: {:?}", e);
174 return false
175 }
176 }
177 }
178 }
179 true
180 });
181 self.current = Some((child, handle))
182 } else {
183 error!("Failed to get stdout of preview command: {cmd}")
184 }
185 }
186 }
187
188 PreviewMessage::Stop => {}
189 }
190
191 self.prune_procs();
192 }
193
194 let ret = self.cleanup_procs();
195 if ret.is_empty() { Ok(()) } else { Err(ret) }
196 }
197
198 fn dispatch_kill(&mut self) {
199 if let Some((mut child, old)) = self.current.take() {
200 let _ = child.kill();
201 self.procs.push(child);
202 let mut old = Box::pin(old); // pin it to heap
203
204 match old.as_mut().now_or_never() {
205 Some(Ok(result)) => {
206 if !result {
207 if let Some(ref tx) = self.controller_tx {
208 let _ = tx.send(Event::Refresh);
209 }
210 }
211 }
212 None => {
213 old.abort(); // still works because `AbortHandle` is separate
214 }
215 _ => {}
216 }
217
218 }
219 }
220
221 pub fn connect_controller(&mut self, controller_tx: UnboundedSender<Event>) {
222 self.controller_tx = Some(controller_tx)
223
224 }
225
226 // todo: This would be cleaner with tokio::Child, but does that merit a conversion? I'm not sure if its worth it for the previewer to yield control while waiting for output cuz we are multithreaded anyways
227 fn cleanup_procs(mut self) -> Vec<Child> {
228 let total_timeout = Duration::from_secs(1);
229 let start = Instant::now();
230
231 self.procs.retain_mut(|child| {
232 loop {
233 match child.try_wait() {
234 Ok(Some(_)) => return false,
235 Ok(None) => {
236 if start.elapsed() >= total_timeout {
237 error!("Child failed to exit in time: {:?}", child);
238 return true;
239 } else {
240 thread::sleep(Duration::from_millis(10));
241 }
242 }
243 Err(e) => {
244 error!("Error waiting on child: {e}");
245 return true;
246 }
247 }
248 }
249 });
250
251 self.procs
252 }
253
254 fn prune_procs(&mut self) {
255 self.procs.retain_mut(|child| match child.try_wait() {
256 Ok(None) => true,
257 Ok(Some(_)) => false,
258 Err(e) => {
259 warn!("Error waiting on child: {e}");
260 true
261 }
262 });
263 }
264}
265
266
// ---------- NON-ANSI VARIANT (commented out) ----------
// let reader = BufReader::new(stdout);
// if self.config.try_lossy {
//     for line_result in reader.split(b'\n') {
//         match line_result {
//             Ok(bytes) => {
//                 let line =
//                     String::from_utf8_lossy(&bytes).into_owned();
//                 lines.push(Line::from(line));
//             }
//             Err(e) => error!("Failed to read line: {:?}", e),
//         }
//     }
// } else {
//     for line_result in reader.lines() {
//         match line_result {
//             Ok(line) => lines.push(Line::from(line)),
//             Err(e) => {
//                 // todo: unclear why garbage sometimes stays on the screen even after an explicit ratatui clear
//                 error!("Error displaying {cmd}: {:?}", e);
//                 break;
//             }
//         }
//     }
// }