Skip to main content

command_stream/
stream.rs

1//! Streaming and async iteration support
2//!
3//! This module provides async streaming capabilities similar to JavaScript's
4//! async iterators and stream handling in `$.stream-utils.mjs`.
5//!
6//! ## Usage
7//!
8//! ```rust,no_run
9//! use command_stream::{StreamingRunner, OutputChunk};
10//!
11//! #[tokio::main]
12//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
13//!     let runner = StreamingRunner::new("yes hello");
14//!
15//!     // Stream output as it arrives
16//!     let mut stream = runner.stream();
17//!     let mut count = 0;
18//!     while let Some(chunk) = stream.next().await {
19//!         match chunk {
20//!             OutputChunk::Stdout(data) => {
21//!                 print!("{}", String::from_utf8_lossy(&data));
22//!                 count += 1;
23//!                 if count >= 5 {
24//!                     break;
25//!                 }
26//!             }
27//!             OutputChunk::Stderr(data) => {
28//!                 eprint!("{}", String::from_utf8_lossy(&data));
29//!             }
30//!             OutputChunk::Exit(code) => {
31//!                 println!("Process exited with code: {}", code);
32//!                 break;
33//!             }
34//!         }
35//!     }
36//!
37//!     Ok(())
38//! }
39//! ```
40
41use std::collections::HashMap;
42use std::path::PathBuf;
43use std::process::Stdio;
44use tokio::io::BufReader;
45use tokio::process::Command;
46use tokio::sync::mpsc;
47
48use crate::trace::trace_lazy;
49use crate::{CommandResult, Result};
50
/// One unit of output produced by a streaming process.
///
/// `Stdout` and `Stderr` carry the raw bytes exactly as they were read from
/// the child's pipes (no UTF-8 validation is performed). `Exit` carries the
/// child's exit code.
///
/// The type is comparable by value so that callers and tests can match on
/// concrete chunks with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputChunk {
    /// Raw bytes read from the child's standard output.
    Stdout(Vec<u8>),
    /// Raw bytes read from the child's standard error.
    Stderr(Vec<u8>),
    /// Exit code reported for the child process.
    Exit(i32),
}
61
62/// A streaming process runner that allows async iteration over output
63pub struct StreamingRunner {
64    command: String,
65    cwd: Option<PathBuf>,
66    env: Option<HashMap<String, String>>,
67    stdin_content: Option<String>,
68}
69
70impl StreamingRunner {
71    /// Create a new streaming runner
72    pub fn new(command: impl Into<String>) -> Self {
73        StreamingRunner {
74            command: command.into(),
75            cwd: None,
76            env: None,
77            stdin_content: None,
78        }
79    }
80
81    /// Set the working directory
82    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
83        self.cwd = Some(path.into());
84        self
85    }
86
87    /// Set environment variables
88    pub fn env(mut self, env: HashMap<String, String>) -> Self {
89        self.env = Some(env);
90        self
91    }
92
93    /// Set stdin content
94    pub fn stdin(mut self, content: impl Into<String>) -> Self {
95        self.stdin_content = Some(content.into());
96        self
97    }
98
99    /// Start the process and return a stream of output chunks
100    pub fn stream(mut self) -> OutputStream {
101        let (tx, rx) = mpsc::channel(1024);
102
103        // Spawn the process handling task
104        let command = self.command.clone();
105        let cwd = self.cwd.take();
106        let env = self.env.take();
107        let stdin_content = self.stdin_content.take();
108
109        tokio::spawn(async move {
110            if let Err(e) =
111                run_streaming_process(command, cwd, env, stdin_content, tx.clone()).await
112            {
113                trace_lazy("StreamingRunner", || format!("Error: {}", e));
114            }
115        });
116
117        OutputStream { rx }
118    }
119
120    /// Run to completion and collect all output
121    pub async fn collect(self) -> Result<CommandResult> {
122        let mut stdout = Vec::new();
123        let mut stderr = Vec::new();
124        let mut exit_code = 0;
125
126        let mut stream = self.stream();
127        while let Some(chunk) = stream.rx.recv().await {
128            match chunk {
129                OutputChunk::Stdout(data) => stdout.extend(data),
130                OutputChunk::Stderr(data) => stderr.extend(data),
131                OutputChunk::Exit(code) => exit_code = code,
132            }
133        }
134
135        Ok(CommandResult {
136            stdout: String::from_utf8_lossy(&stdout).to_string(),
137            stderr: String::from_utf8_lossy(&stderr).to_string(),
138            code: exit_code,
139        })
140    }
141}
142
143/// Stream of output chunks from a process
144pub struct OutputStream {
145    rx: mpsc::Receiver<OutputChunk>,
146}
147
148impl OutputStream {
149    /// Receive the next chunk
150    pub async fn next(&mut self) -> Option<OutputChunk> {
151        self.rx.recv().await
152    }
153
154    /// Collect all remaining output into vectors
155    pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
156        let mut stdout = Vec::new();
157        let mut stderr = Vec::new();
158        let mut exit_code = 0;
159
160        while let Some(chunk) = self.rx.recv().await {
161            match chunk {
162                OutputChunk::Stdout(data) => stdout.extend(data),
163                OutputChunk::Stderr(data) => stderr.extend(data),
164                OutputChunk::Exit(code) => exit_code = code,
165            }
166        }
167
168        (stdout, stderr, exit_code)
169    }
170
171    /// Collect stdout only, discarding stderr
172    pub async fn collect_stdout(mut self) -> Vec<u8> {
173        let mut stdout = Vec::new();
174
175        while let Some(chunk) = self.rx.recv().await {
176            if let OutputChunk::Stdout(data) = chunk {
177                stdout.extend(data);
178            }
179        }
180
181        stdout
182    }
183}
184
185/// Run a streaming process and send output to the channel
186async fn run_streaming_process(
187    command: String,
188    cwd: Option<PathBuf>,
189    env: Option<HashMap<String, String>>,
190    stdin_content: Option<String>,
191    tx: mpsc::Sender<OutputChunk>,
192) -> Result<()> {
193    trace_lazy("StreamingRunner", || format!("Starting: {}", command));
194
195    let shell = find_available_shell();
196    let mut cmd = Command::new(&shell.cmd);
197    for arg in &shell.args {
198        cmd.arg(arg);
199    }
200    cmd.arg(&command);
201
202    // Configure stdio
203    if stdin_content.is_some() {
204        cmd.stdin(Stdio::piped());
205    } else {
206        cmd.stdin(Stdio::null());
207    }
208    cmd.stdout(Stdio::piped());
209    cmd.stderr(Stdio::piped());
210
211    // Set working directory
212    if let Some(ref cwd) = cwd {
213        cmd.current_dir(cwd);
214    }
215
216    // Set environment
217    if let Some(ref env_vars) = env {
218        for (key, value) in env_vars {
219            cmd.env(key, value);
220        }
221    }
222
223    // Spawn the process
224    let mut child = cmd.spawn()?;
225
226    // Write stdin if needed
227    if let Some(content) = stdin_content {
228        if let Some(mut stdin) = child.stdin.take() {
229            use tokio::io::AsyncWriteExt;
230            let _ = stdin.write_all(content.as_bytes()).await;
231            let _ = stdin.shutdown().await;
232        }
233    }
234
235    // Spawn stdout reader
236    let stdout = child.stdout.take();
237    let tx_stdout = tx.clone();
238    let stdout_handle = if let Some(stdout) = stdout {
239        Some(tokio::spawn(async move {
240            let mut reader = BufReader::new(stdout);
241            let mut buf = vec![0u8; 8192];
242            loop {
243                use tokio::io::AsyncReadExt;
244                match reader.read(&mut buf).await {
245                    Ok(0) => break,
246                    Ok(n) => {
247                        if tx_stdout
248                            .send(OutputChunk::Stdout(buf[..n].to_vec()))
249                            .await
250                            .is_err()
251                        {
252                            break;
253                        }
254                    }
255                    Err(_) => break,
256                }
257            }
258        }))
259    } else {
260        None
261    };
262
263    // Spawn stderr reader
264    let stderr = child.stderr.take();
265    let tx_stderr = tx.clone();
266    let stderr_handle = if let Some(stderr) = stderr {
267        Some(tokio::spawn(async move {
268            let mut reader = BufReader::new(stderr);
269            let mut buf = vec![0u8; 8192];
270            loop {
271                use tokio::io::AsyncReadExt;
272                match reader.read(&mut buf).await {
273                    Ok(0) => break,
274                    Ok(n) => {
275                        if tx_stderr
276                            .send(OutputChunk::Stderr(buf[..n].to_vec()))
277                            .await
278                            .is_err()
279                        {
280                            break;
281                        }
282                    }
283                    Err(_) => break,
284                }
285            }
286        }))
287    } else {
288        None
289    };
290
291    // Wait for readers to complete
292    if let Some(handle) = stdout_handle {
293        let _ = handle.await;
294    }
295    if let Some(handle) = stderr_handle {
296        let _ = handle.await;
297    }
298
299    // Wait for process to exit
300    let status = child.wait().await?;
301    let code = status.code().unwrap_or(-1);
302
303    // Send exit code
304    let _ = tx.send(OutputChunk::Exit(code)).await;
305
306    trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
307
308    Ok(())
309}
310
/// Program and leading arguments used to run a command line through a shell.
#[derive(Debug, Clone)]
struct ShellConfig {
    /// Shell executable to launch.
    cmd: String,
    /// Arguments placed before the command string.
    args: Vec<String>,
}

/// Pick the shell used to execute commands.
///
/// On Windows builds this is always `cmd.exe /c`. Elsewhere the first of
/// `/bin/sh`, `/usr/bin/sh`, `/bin/bash` that exists on disk is used with
/// `-c`; if none of them exists, `/bin/sh -c` is returned anyway.
fn find_available_shell() -> ShellConfig {
    // Probed in order of preference on non-Windows targets.
    const CANDIDATES: [&str; 3] = ["/bin/sh", "/usr/bin/sh", "/bin/bash"];

    if cfg!(windows) {
        return ShellConfig {
            cmd: String::from("cmd.exe"),
            args: vec![String::from("/c")],
        };
    }

    let chosen = CANDIDATES
        .iter()
        .copied()
        .find(|candidate| std::path::Path::new(candidate).exists())
        .unwrap_or("/bin/sh");

    ShellConfig {
        cmd: chosen.to_owned(),
        args: vec!["-c".to_owned()],
    }
}
349
350/// Async iterator trait for output streams
351#[async_trait::async_trait]
352pub trait AsyncIterator {
353    type Item;
354
355    /// Get the next item from the iterator
356    async fn next(&mut self) -> Option<Self::Item>;
357}
358
359#[async_trait::async_trait]
360impl AsyncIterator for OutputStream {
361    type Item = OutputChunk;
362
363    async fn next(&mut self) -> Option<Self::Item> {
364        self.rx.recv().await
365    }
366}
367
368/// Extension trait to convert ProcessRunner into a stream
369pub trait IntoStream {
370    /// Convert into an output stream
371    fn into_stream(self) -> OutputStream;
372}
373
374impl IntoStream for crate::ProcessRunner {
375    fn into_stream(self) -> OutputStream {
376        let streaming = StreamingRunner::new(self.command().to_string());
377        streaming.stream()
378    }
379}