1use crate::error::Result;
7use crate::tracing_compat::{debug, info, info_span, trace, warn, Instrument};
8use async_trait::async_trait;
9use std::process::Stdio;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command as TokioCommand;
12use tokio::sync::mpsc;
13
14#[derive(Debug, Clone)]
16pub enum OutputLine {
17 Stdout(String),
19 Stderr(String),
21}
22
23#[derive(Debug, Clone)]
25pub struct StreamResult {
26 pub exit_code: i32,
28 pub success: bool,
30 pub stdout: Option<String>,
32 pub stderr: Option<String>,
34}
35
36impl StreamResult {
37 #[must_use]
39 pub fn is_success(&self) -> bool {
40 self.success
41 }
42}
43
44#[async_trait]
46pub trait StreamableCommand: Send + Sync {
47 async fn stream<F>(&self, handler: F) -> Result<StreamResult>
53 where
54 F: FnMut(OutputLine) + Send + 'static;
55
56 async fn stream_channel(&self) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)>;
62}
63
64pub struct StreamHandler;
66
67impl StreamHandler {
68 pub fn print() -> impl FnMut(OutputLine) {
70 move |line| match line {
71 OutputLine::Stdout(s) => println!("{s}"),
72 OutputLine::Stderr(s) => eprintln!("{s}"),
73 }
74 }
75
76 pub fn tee<F>(mut handler: F) -> impl FnMut(OutputLine) -> (Vec<String>, Vec<String>)
78 where
79 F: FnMut(&OutputLine),
80 {
81 let mut stdout_lines = Vec::new();
82 let mut stderr_lines = Vec::new();
83
84 move |line| {
85 handler(&line);
86 match line {
87 OutputLine::Stdout(s) => stdout_lines.push(s),
88 OutputLine::Stderr(s) => stderr_lines.push(s),
89 }
90 (stdout_lines.clone(), stderr_lines.clone())
91 }
92 }
93
94 pub fn filter(pattern: String) -> impl FnMut(OutputLine) -> Option<String> {
96 move |line| {
97 let text = match &line {
98 OutputLine::Stdout(s) | OutputLine::Stderr(s) => s,
99 };
100 if text.contains(&pattern) {
101 Some(text.clone())
102 } else {
103 None
104 }
105 }
106 }
107
108 pub fn with_prefix(prefix: String) -> impl FnMut(OutputLine) {
110 move |line| match line {
111 OutputLine::Stdout(s) => println!("{prefix}: {s}"),
112 OutputLine::Stderr(s) => eprintln!("{prefix} (error): {s}"),
113 }
114 }
115}
116
117pub(crate) async fn stream_command(
122 cmd: TokioCommand,
123 handler: impl FnMut(OutputLine) + Send + 'static,
124 command_name: &'static str,
125) -> Result<StreamResult> {
126 let span = info_span!("docker.stream", command = command_name, mode = "handler",);
127 stream_command_inner(cmd, handler, command_name)
128 .instrument(span)
129 .await
130}
131
132#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
133async fn stream_command_inner(
134 mut cmd: TokioCommand,
135 mut handler: impl FnMut(OutputLine) + Send + 'static,
136 command_name: &'static str,
137) -> Result<StreamResult> {
138 cmd.stdout(Stdio::piped());
139 cmd.stderr(Stdio::piped());
140
141 let started_at = std::time::Instant::now();
142
143 let mut child = cmd.spawn().map_err(|e| {
144 warn!(command = command_name, error = %e, "failed to spawn streaming command");
145 crate::error::Error::custom(format!("Failed to spawn command: {e}"))
146 })?;
147
148 let stdout = child
149 .stdout
150 .take()
151 .ok_or_else(|| crate::error::Error::custom("Failed to capture stdout"))?;
152 let stderr = child
153 .stderr
154 .take()
155 .ok_or_else(|| crate::error::Error::custom("Failed to capture stderr"))?;
156
157 let stdout_reader = BufReader::new(stdout);
158 let stderr_reader = BufReader::new(stderr);
159 let mut stdout_lines = stdout_reader.lines();
160 let mut stderr_lines = stderr_reader.lines();
161
162 let mut stdout_accumulator = Vec::new();
163 let mut stderr_accumulator = Vec::new();
164
165 loop {
166 tokio::select! {
167 line = stdout_lines.next_line() => {
168 match line {
169 Ok(Some(text)) => {
170 debug!(stream = "stdout", line = %text, "stream line");
171 stdout_accumulator.push(text.clone());
172 handler(OutputLine::Stdout(text));
173 }
174 Ok(None) => break,
175 Err(e) => {
176 return Err(crate::error::Error::custom(
177 format!("Error reading stdout: {e}")
178 ));
179 }
180 }
181 }
182 line = stderr_lines.next_line() => {
183 match line {
184 Ok(Some(text)) => {
185 debug!(stream = "stderr", line = %text, "stream line");
186 stderr_accumulator.push(text.clone());
187 handler(OutputLine::Stderr(text));
188 }
189 Ok(None) => break,
190 Err(e) => {
191 return Err(crate::error::Error::custom(
192 format!("Error reading stderr: {e}")
193 ));
194 }
195 }
196 }
197 }
198 }
199
200 let status = child
201 .wait()
202 .await
203 .map_err(|e| crate::error::Error::custom(format!("Failed to wait for command: {e}")))?;
204
205 let exit_code = status.code().unwrap_or(-1);
206 let success = status.success();
207 let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
208
209 #[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
210 if success {
211 info!(
212 command = command_name,
213 exit_code = exit_code,
214 duration_ms = duration_ms,
215 stdout_lines = stdout_accumulator.len(),
216 stderr_lines = stderr_accumulator.len(),
217 "stream command completed"
218 );
219 } else {
220 warn!(
221 command = command_name,
222 exit_code = exit_code,
223 duration_ms = duration_ms,
224 stdout_lines = stdout_accumulator.len(),
225 stderr_lines = stderr_accumulator.len(),
226 "stream command exited non-zero"
227 );
228 }
229
230 trace!(command = command_name, "stream finished");
231
232 Ok(StreamResult {
233 exit_code,
234 success,
235 stdout: Some(stdout_accumulator.join("\n")),
236 stderr: Some(stderr_accumulator.join("\n")),
237 })
238}
239
240pub(crate) async fn stream_command_channel(
244 cmd: TokioCommand,
245 command_name: &'static str,
246) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)> {
247 let span = info_span!("docker.stream", command = command_name, mode = "channel",);
248 stream_command_channel_inner(cmd, command_name)
249 .instrument(span)
250 .await
251}
252
253#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
254async fn stream_command_channel_inner(
255 mut cmd: TokioCommand,
256 command_name: &'static str,
257) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)> {
258 let (tx, rx) = mpsc::channel(100);
259 let started_at = std::time::Instant::now();
260
261 cmd.stdout(Stdio::piped());
262 cmd.stderr(Stdio::piped());
263
264 let mut child = cmd.spawn().map_err(|e| {
265 warn!(command = command_name, error = %e, "failed to spawn streaming command");
266 crate::error::Error::custom(format!("Failed to spawn command: {e}"))
267 })?;
268
269 let stdout = child
270 .stdout
271 .take()
272 .ok_or_else(|| crate::error::Error::custom("Failed to capture stdout"))?;
273 let stderr = child
274 .stderr
275 .take()
276 .ok_or_else(|| crate::error::Error::custom("Failed to capture stderr"))?;
277
278 let tx_clone = tx.clone();
279
280 let stdout_task = tokio::spawn(async move {
282 let reader = BufReader::new(stdout);
283 let mut reader_lines = reader.lines();
284 let mut lines = Vec::new();
285 while let Ok(Some(line)) = reader_lines.next_line().await {
286 debug!(stream = "stdout", line = %line, "stream line");
287 lines.push(line.clone());
288 let _ = tx.send(OutputLine::Stdout(line)).await;
289 }
290 lines
291 });
292
293 let stderr_task = tokio::spawn(async move {
295 let reader = BufReader::new(stderr);
296 let mut reader_lines = reader.lines();
297 let mut lines = Vec::new();
298 while let Ok(Some(line)) = reader_lines.next_line().await {
299 debug!(stream = "stderr", line = %line, "stream line");
300 lines.push(line.clone());
301 let _ = tx_clone.send(OutputLine::Stderr(line)).await;
302 }
303 lines
304 });
305
306 let status_future = child.wait();
308 let (stdout_lines, stderr_lines, status) =
309 tokio::join!(stdout_task, stderr_task, status_future);
310
311 let stdout_lines = stdout_lines.unwrap_or_default();
312 let stderr_lines = stderr_lines.unwrap_or_default();
313 let status = status
314 .map_err(|e| crate::error::Error::custom(format!("Failed to wait for command: {e}")))?;
315
316 let exit_code = status.code().unwrap_or(-1);
317 let success = status.success();
318 let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
319
320 #[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
321 if success {
322 info!(
323 command = command_name,
324 exit_code = exit_code,
325 duration_ms = duration_ms,
326 stdout_lines = stdout_lines.len(),
327 stderr_lines = stderr_lines.len(),
328 "stream command completed"
329 );
330 } else {
331 warn!(
332 command = command_name,
333 exit_code = exit_code,
334 duration_ms = duration_ms,
335 stdout_lines = stdout_lines.len(),
336 stderr_lines = stderr_lines.len(),
337 "stream command exited non-zero"
338 );
339 }
340
341 Ok((
342 rx,
343 StreamResult {
344 exit_code,
345 success,
346 stdout: Some(stdout_lines.join("\n")),
347 stderr: Some(stderr_lines.join("\n")),
348 },
349 ))
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[test]
357 fn test_output_line() {
358 let stdout = OutputLine::Stdout("test".to_string());
359 let stderr = OutputLine::Stderr("error".to_string());
360
361 match stdout {
362 OutputLine::Stdout(s) => assert_eq!(s, "test"),
363 OutputLine::Stderr(_) => panic!("Wrong variant"),
364 }
365
366 match stderr {
367 OutputLine::Stderr(s) => assert_eq!(s, "error"),
368 OutputLine::Stdout(_) => panic!("Wrong variant"),
369 }
370 }
371
372 #[test]
373 fn test_stream_result() {
374 let result = StreamResult {
375 exit_code: 0,
376 success: true,
377 stdout: Some("output".to_string()),
378 stderr: None,
379 };
380
381 assert!(result.is_success());
382 assert_eq!(result.exit_code, 0);
383 assert_eq!(result.stdout, Some("output".to_string()));
384 assert!(result.stderr.is_none());
385 }
386
387 #[test]
388 fn test_stream_handler_filter() {
389 let mut filter = StreamHandler::filter("error".to_string());
390
391 let result1 = filter(OutputLine::Stdout(
392 "this contains error message".to_string(),
393 ));
394 assert_eq!(result1, Some("this contains error message".to_string()));
395
396 let result2 = filter(OutputLine::Stdout("normal message".to_string()));
397 assert!(result2.is_none());
398 }
399}