Skip to main content

docker_wrapper/
stream.rs

1//! Streaming support for Docker command output.
2//!
3//! This module provides functionality to stream output from long-running Docker
4//! commands in real-time, rather than waiting for completion.
5
6use crate::error::Result;
7use crate::tracing_compat::{debug, info, info_span, trace, warn, Instrument};
8use async_trait::async_trait;
9use std::process::Stdio;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command as TokioCommand;
12use tokio::sync::mpsc;
13
/// One line of text produced by a streaming command, tagged with the pipe
/// it was read from.
///
/// The payload is the line as yielded by the line reader, i.e. without its
/// trailing line terminator.
#[derive(Clone, Debug)]
pub enum OutputLine {
    /// A line read from the child's standard output.
    Stdout(String),
    /// A line read from the child's standard error.
    Stderr(String),
}
22
/// Final outcome of a streamed command, available once the process has exited.
#[derive(Clone, Debug)]
pub struct StreamResult {
    /// Exit code reported by the process.
    pub exit_code: i32,
    /// `true` when the process reported success (exit code 0).
    pub success: bool,
    /// Everything written to stdout, when it was captured.
    pub stdout: Option<String>,
    /// Everything written to stderr, when it was captured.
    pub stderr: Option<String>,
}

impl StreamResult {
    /// Returns `true` when the command finished successfully.
    ///
    /// This simply reports the `success` field.
    #[must_use]
    pub fn is_success(&self) -> bool {
        matches!(self, Self { success: true, .. })
    }
}
43
/// Trait for commands that support streaming output.
///
/// Implementors must be `Send + Sync`. Two delivery styles are offered: a
/// caller-supplied callback ([`StreamableCommand::stream`]) and a channel
/// receiver ([`StreamableCommand::stream_channel`]).
#[async_trait]
pub trait StreamableCommand: Send + Sync {
    /// Run the command with streaming output.
    ///
    /// `handler` receives each [`OutputLine`]; it must be `Send + 'static`.
    /// The returned [`StreamResult`] describes how the command finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails to spawn or encounters an I/O error
    async fn stream<F>(&self, handler: F) -> Result<StreamResult>
    where
        F: FnMut(OutputLine) + Send + 'static;

    /// Run the command with streaming output via a channel.
    ///
    /// Returns the receiving half of a channel of [`OutputLine`]s together
    /// with the command's [`StreamResult`].
    ///
    /// NOTE(review): because the final result is returned alongside the
    /// receiver, implementations presumably run the command to completion
    /// before returning — confirm against the implementors.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails to spawn or encounters an I/O error
    async fn stream_channel(&self) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)>;
}
63
64/// Stream handler utilities
65pub struct StreamHandler;
66
67impl StreamHandler {
68    /// Print output lines to stdout/stderr
69    pub fn print() -> impl FnMut(OutputLine) {
70        move |line| match line {
71            OutputLine::Stdout(s) => println!("{s}"),
72            OutputLine::Stderr(s) => eprintln!("{s}"),
73        }
74    }
75
76    /// Collect output while also calling another handler
77    pub fn tee<F>(mut handler: F) -> impl FnMut(OutputLine) -> (Vec<String>, Vec<String>)
78    where
79        F: FnMut(&OutputLine),
80    {
81        let mut stdout_lines = Vec::new();
82        let mut stderr_lines = Vec::new();
83
84        move |line| {
85            handler(&line);
86            match line {
87                OutputLine::Stdout(s) => stdout_lines.push(s),
88                OutputLine::Stderr(s) => stderr_lines.push(s),
89            }
90            (stdout_lines.clone(), stderr_lines.clone())
91        }
92    }
93
94    /// Filter lines by pattern
95    pub fn filter(pattern: String) -> impl FnMut(OutputLine) -> Option<String> {
96        move |line| {
97            let text = match &line {
98                OutputLine::Stdout(s) | OutputLine::Stderr(s) => s,
99            };
100            if text.contains(&pattern) {
101                Some(text.clone())
102            } else {
103                None
104            }
105        }
106    }
107
108    /// Log output lines with a prefix
109    pub fn with_prefix(prefix: String) -> impl FnMut(OutputLine) {
110        move |line| match line {
111            OutputLine::Stdout(s) => println!("{prefix}: {s}"),
112            OutputLine::Stderr(s) => eprintln!("{prefix} (error): {s}"),
113        }
114    }
115}
116
117/// Internal helper to spawn a streaming command.
118///
119/// `command_name` is a short label (e.g. "run", "logs", "build") used for
120/// tracing spans; it's purely diagnostic and not passed to the child process.
121pub(crate) async fn stream_command(
122    cmd: TokioCommand,
123    handler: impl FnMut(OutputLine) + Send + 'static,
124    command_name: &'static str,
125) -> Result<StreamResult> {
126    let span = info_span!("docker.stream", command = command_name, mode = "handler",);
127    stream_command_inner(cmd, handler, command_name)
128        .instrument(span)
129        .await
130}
131
132#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
133async fn stream_command_inner(
134    mut cmd: TokioCommand,
135    mut handler: impl FnMut(OutputLine) + Send + 'static,
136    command_name: &'static str,
137) -> Result<StreamResult> {
138    cmd.stdout(Stdio::piped());
139    cmd.stderr(Stdio::piped());
140
141    let started_at = std::time::Instant::now();
142
143    let mut child = cmd.spawn().map_err(|e| {
144        warn!(command = command_name, error = %e, "failed to spawn streaming command");
145        crate::error::Error::custom(format!("Failed to spawn command: {e}"))
146    })?;
147
148    let stdout = child
149        .stdout
150        .take()
151        .ok_or_else(|| crate::error::Error::custom("Failed to capture stdout"))?;
152    let stderr = child
153        .stderr
154        .take()
155        .ok_or_else(|| crate::error::Error::custom("Failed to capture stderr"))?;
156
157    let stdout_reader = BufReader::new(stdout);
158    let stderr_reader = BufReader::new(stderr);
159    let mut stdout_lines = stdout_reader.lines();
160    let mut stderr_lines = stderr_reader.lines();
161
162    let mut stdout_accumulator = Vec::new();
163    let mut stderr_accumulator = Vec::new();
164
165    loop {
166        tokio::select! {
167            line = stdout_lines.next_line() => {
168                match line {
169                    Ok(Some(text)) => {
170                        debug!(stream = "stdout", line = %text, "stream line");
171                        stdout_accumulator.push(text.clone());
172                        handler(OutputLine::Stdout(text));
173                    }
174                    Ok(None) => break,
175                    Err(e) => {
176                        return Err(crate::error::Error::custom(
177                            format!("Error reading stdout: {e}")
178                        ));
179                    }
180                }
181            }
182            line = stderr_lines.next_line() => {
183                match line {
184                    Ok(Some(text)) => {
185                        debug!(stream = "stderr", line = %text, "stream line");
186                        stderr_accumulator.push(text.clone());
187                        handler(OutputLine::Stderr(text));
188                    }
189                    Ok(None) => break,
190                    Err(e) => {
191                        return Err(crate::error::Error::custom(
192                            format!("Error reading stderr: {e}")
193                        ));
194                    }
195                }
196            }
197        }
198    }
199
200    let status = child
201        .wait()
202        .await
203        .map_err(|e| crate::error::Error::custom(format!("Failed to wait for command: {e}")))?;
204
205    let exit_code = status.code().unwrap_or(-1);
206    let success = status.success();
207    let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
208
209    #[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
210    if success {
211        info!(
212            command = command_name,
213            exit_code = exit_code,
214            duration_ms = duration_ms,
215            stdout_lines = stdout_accumulator.len(),
216            stderr_lines = stderr_accumulator.len(),
217            "stream command completed"
218        );
219    } else {
220        warn!(
221            command = command_name,
222            exit_code = exit_code,
223            duration_ms = duration_ms,
224            stdout_lines = stdout_accumulator.len(),
225            stderr_lines = stderr_accumulator.len(),
226            "stream command exited non-zero"
227        );
228    }
229
230    trace!(command = command_name, "stream finished");
231
232    Ok(StreamResult {
233        exit_code,
234        success,
235        stdout: Some(stdout_accumulator.join("\n")),
236        stderr: Some(stderr_accumulator.join("\n")),
237    })
238}
239
240/// Internal helper to spawn a streaming command with channel output.
241///
242/// `command_name` is a short diagnostic label used for tracing spans.
243pub(crate) async fn stream_command_channel(
244    cmd: TokioCommand,
245    command_name: &'static str,
246) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)> {
247    let span = info_span!("docker.stream", command = command_name, mode = "channel",);
248    stream_command_channel_inner(cmd, command_name)
249        .instrument(span)
250        .await
251}
252
253#[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
254async fn stream_command_channel_inner(
255    mut cmd: TokioCommand,
256    command_name: &'static str,
257) -> Result<(mpsc::Receiver<OutputLine>, StreamResult)> {
258    let (tx, rx) = mpsc::channel(100);
259    let started_at = std::time::Instant::now();
260
261    cmd.stdout(Stdio::piped());
262    cmd.stderr(Stdio::piped());
263
264    let mut child = cmd.spawn().map_err(|e| {
265        warn!(command = command_name, error = %e, "failed to spawn streaming command");
266        crate::error::Error::custom(format!("Failed to spawn command: {e}"))
267    })?;
268
269    let stdout = child
270        .stdout
271        .take()
272        .ok_or_else(|| crate::error::Error::custom("Failed to capture stdout"))?;
273    let stderr = child
274        .stderr
275        .take()
276        .ok_or_else(|| crate::error::Error::custom("Failed to capture stderr"))?;
277
278    let tx_clone = tx.clone();
279
280    // Spawn task to read stdout
281    let stdout_task = tokio::spawn(async move {
282        let reader = BufReader::new(stdout);
283        let mut reader_lines = reader.lines();
284        let mut lines = Vec::new();
285        while let Ok(Some(line)) = reader_lines.next_line().await {
286            debug!(stream = "stdout", line = %line, "stream line");
287            lines.push(line.clone());
288            let _ = tx.send(OutputLine::Stdout(line)).await;
289        }
290        lines
291    });
292
293    // Spawn task to read stderr
294    let stderr_task = tokio::spawn(async move {
295        let reader = BufReader::new(stderr);
296        let mut reader_lines = reader.lines();
297        let mut lines = Vec::new();
298        while let Ok(Some(line)) = reader_lines.next_line().await {
299            debug!(stream = "stderr", line = %line, "stream line");
300            lines.push(line.clone());
301            let _ = tx_clone.send(OutputLine::Stderr(line)).await;
302        }
303        lines
304    });
305
306    // Wait for both tasks and the process
307    let status_future = child.wait();
308    let (stdout_lines, stderr_lines, status) =
309        tokio::join!(stdout_task, stderr_task, status_future);
310
311    let stdout_lines = stdout_lines.unwrap_or_default();
312    let stderr_lines = stderr_lines.unwrap_or_default();
313    let status = status
314        .map_err(|e| crate::error::Error::custom(format!("Failed to wait for command: {e}")))?;
315
316    let exit_code = status.code().unwrap_or(-1);
317    let success = status.success();
318    let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
319
320    #[cfg_attr(not(feature = "tracing"), allow(clippy::if_same_then_else))]
321    if success {
322        info!(
323            command = command_name,
324            exit_code = exit_code,
325            duration_ms = duration_ms,
326            stdout_lines = stdout_lines.len(),
327            stderr_lines = stderr_lines.len(),
328            "stream command completed"
329        );
330    } else {
331        warn!(
332            command = command_name,
333            exit_code = exit_code,
334            duration_ms = duration_ms,
335            stdout_lines = stdout_lines.len(),
336            stderr_lines = stderr_lines.len(),
337            "stream command exited non-zero"
338        );
339    }
340
341    Ok((
342        rx,
343        StreamResult {
344            exit_code,
345            success,
346            stdout: Some(stdout_lines.join("\n")),
347            stderr: Some(stderr_lines.join("\n")),
348        },
349    ))
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant hands back exactly the text it was built with.
    #[test]
    fn test_output_line() {
        let stdout = OutputLine::Stdout("test".to_string());
        let stderr = OutputLine::Stderr("error".to_string());

        if let OutputLine::Stdout(text) = stdout {
            assert_eq!(text, "test");
        } else {
            panic!("Wrong variant");
        }

        if let OutputLine::Stderr(text) = stderr {
            assert_eq!(text, "error");
        } else {
            panic!("Wrong variant");
        }
    }

    /// A hand-built result exposes its fields and success flag unchanged.
    #[test]
    fn test_stream_result() {
        let result = StreamResult {
            exit_code: 0,
            success: true,
            stdout: Some("output".to_string()),
            stderr: None,
        };

        assert!(result.is_success());
        assert_eq!(result.exit_code, 0);
        assert_eq!(result.stdout.as_deref(), Some("output"));
        assert_eq!(result.stderr, None);
    }

    /// The filter handler yields matching lines and rejects the rest.
    #[test]
    fn test_stream_handler_filter() {
        let mut filter = StreamHandler::filter("error".to_string());

        let hit = filter(OutputLine::Stdout(
            "this contains error message".to_string(),
        ));
        assert_eq!(hit.as_deref(), Some("this contains error message"));

        let miss = filter(OutputLine::Stdout("normal message".to_string()));
        assert_eq!(miss, None);
    }
}
395
396        let result2 = filter(OutputLine::Stdout("normal message".to_string()));
397        assert!(result2.is_none());
398    }
399}