Skip to main content

command_stream/
stream.rs

1//! Streaming and async iteration support
2//!
3//! This module provides async streaming capabilities similar to JavaScript's
4//! async iterators and stream handling in `$.stream-utils.mjs`.
5//!
6//! It mirrors the JavaScript implementation's behavior for issue #155:
7//!
8//!   1. The stream yields an explicit `OutputChunk::Exit(code)` when the
9//!      process exits, so consumers can observe the exit code from inside the
10//!      loop.
11//!   2. The stream does not hang forever when the process has exited but a
12//!      grandchild keeps the stdio pipes open (the readers are drained with a
13//!      grace period and then aborted).
14//!   3. The process can be stopped from inside the loop via
15//!      [`OutputStream::kill`] / [`OutputStream::kill_with`], and abandoning the
16//!      stream (e.g. `break`) also stops the process.
17//!   4. The stop signal is configurable via
18//!      [`StreamingRunner::kill_signal`] (default `SIGTERM`), just like the
19//!      JavaScript `killSignal` option.
20//!
21//! ## Usage
22//!
23//! ```rust,no_run
24//! use command_stream::{StreamingRunner, OutputChunk};
25//!
26//! #[tokio::main]
27//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
28//!     let runner = StreamingRunner::new("yes hello");
29//!
30//!     // Stream output as it arrives
31//!     let mut stream = runner.stream();
32//!     let mut count = 0;
33//!     while let Some(chunk) = stream.next().await {
34//!         match chunk {
35//!             OutputChunk::Stdout(data) => {
36//!                 print!("{}", String::from_utf8_lossy(&data));
37//!                 count += 1;
38//!                 if count >= 5 {
39//!                     // Stop the process from inside the loop.
40//!                     stream.kill();
41//!                 }
42//!             }
43//!             OutputChunk::Stderr(data) => {
44//!                 eprint!("{}", String::from_utf8_lossy(&data));
45//!             }
46//!             OutputChunk::Exit(code) => {
47//!                 println!("Process exited with code: {}", code);
48//!                 break;
49//!             }
50//!         }
51//!     }
52//!
53//!     Ok(())
54//! }
55//! ```
56
57use std::collections::HashMap;
58use std::path::PathBuf;
59use std::process::Stdio;
60use std::time::Duration;
61use tokio::io::BufReader;
62use tokio::process::Command;
63use tokio::sync::mpsc;
64
65use crate::trace::trace_lazy;
66use crate::{CommandResult, Result};
67
/// Grace period, in milliseconds, used when a runner is created without an
/// explicit [`StreamingRunner::exit_pump_grace_ms`] value. It bounds how long
/// the stdio pipes keep being drained once the process has exited before any
/// reader that is still blocked gets aborted. Same default as the JavaScript
/// `exitPumpGrace` option.
const DEFAULT_EXIT_PUMP_GRACE_MS: u64 = 100;

/// Signal name used to stop a process when the caller did not choose one.
const DEFAULT_KILL_SIGNAL: &str = "SIGTERM";
75
/// A chunk of output from a streaming process.
///
/// `Stdout` and `Stderr` carry raw bytes exactly as they were read from the
/// corresponding pipe (no UTF-8 validation is performed), and `Exit` carries
/// the numeric exit code reported for the process.
///
/// The type is comparable (`PartialEq`/`Eq`) so chunks can be checked directly
/// with `assert_eq!` or `==` in consumers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputChunk {
    /// Bytes read from the process's stdout pipe.
    Stdout(Vec<u8>),
    /// Bytes read from the process's stderr pipe.
    Stderr(Vec<u8>),
    /// Exit code of the process.
    Exit(i32),
}
86
/// A streaming process runner that allows async iteration over output.
///
/// This is a builder: configure it with the chained setters and then consume
/// it with `stream()` or `collect()`. It is `Clone` so one configuration can
/// be reused for several runs, and `Debug` for diagnostics (note that the
/// debug output includes the configured environment and stdin content).
#[derive(Debug, Clone)]
pub struct StreamingRunner {
    /// Command line handed to the shell.
    command: String,
    /// Working directory for the child; `None` leaves it unset.
    cwd: Option<PathBuf>,
    /// Environment variables to set on the child; `None` sets none.
    env: Option<HashMap<String, String>>,
    /// Text written to the child's stdin; `None` connects stdin to null.
    stdin_content: Option<String>,
    /// Signal name used when the process is stopped without an explicit one.
    kill_signal: String,
    /// Milliseconds to keep draining the pipes after the process has exited.
    exit_pump_grace_ms: u64,
}
96
97impl StreamingRunner {
98    /// Create a new streaming runner
99    pub fn new(command: impl Into<String>) -> Self {
100        StreamingRunner {
101            command: command.into(),
102            cwd: None,
103            env: None,
104            stdin_content: None,
105            kill_signal: DEFAULT_KILL_SIGNAL.to_string(),
106            exit_pump_grace_ms: DEFAULT_EXIT_PUMP_GRACE_MS,
107        }
108    }
109
110    /// Set the working directory
111    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
112        self.cwd = Some(path.into());
113        self
114    }
115
116    /// Set environment variables
117    pub fn env(mut self, env: HashMap<String, String>) -> Self {
118        self.env = Some(env);
119        self
120    }
121
122    /// Set stdin content
123    pub fn stdin(mut self, content: impl Into<String>) -> Self {
124        self.stdin_content = Some(content.into());
125        self
126    }
127
128    /// Configure the signal used to stop the process when it is killed without
129    /// an explicit signal — i.e. [`OutputStream::kill`] or abandoning the
130    /// stream. Mirrors the JavaScript `killSignal` option (default `SIGTERM`).
131    ///
132    /// The reported exit code follows the conventional `128 + signal` mapping
133    /// (e.g. `SIGTERM` => 143, `SIGINT` => 130, `SIGKILL` => 137).
134    pub fn kill_signal(mut self, signal: impl Into<String>) -> Self {
135        self.kill_signal = signal.into();
136        self
137    }
138
139    /// Configure the grace period (in milliseconds) to keep draining the stdio
140    /// pipes after the process exits before aborting lingering readers. Mirrors
141    /// the JavaScript `exitPumpGrace` option (default 100ms).
142    pub fn exit_pump_grace_ms(mut self, ms: u64) -> Self {
143        self.exit_pump_grace_ms = ms;
144        self
145    }
146
147    /// Start the process and return a stream of output chunks
148    pub fn stream(mut self) -> OutputStream {
149        let (tx, rx) = mpsc::channel(1024);
150        // Unbounded so a synchronous Drop can request a kill without awaiting.
151        let (kill_tx, kill_rx) = mpsc::unbounded_channel::<String>();
152
153        // Spawn the process handling task
154        let command = self.command.clone();
155        let cwd = self.cwd.take();
156        let env = self.env.take();
157        let stdin_content = self.stdin_content.take();
158        let grace = self.exit_pump_grace_ms;
159        let kill_signal = self.kill_signal.clone();
160
161        tokio::spawn(async move {
162            if let Err(e) =
163                run_streaming_process(command, cwd, env, stdin_content, grace, tx.clone(), kill_rx)
164                    .await
165            {
166                trace_lazy("StreamingRunner", || format!("Error: {}", e));
167            }
168        });
169
170        OutputStream {
171            rx,
172            kill_tx,
173            kill_signal,
174            killed: false,
175        }
176    }
177
178    /// Run to completion and collect all output
179    pub async fn collect(self) -> Result<CommandResult> {
180        let mut stdout = Vec::new();
181        let mut stderr = Vec::new();
182        let mut exit_code = 0;
183
184        let mut stream = self.stream();
185        while let Some(chunk) = stream.rx.recv().await {
186            match chunk {
187                OutputChunk::Stdout(data) => stdout.extend(data),
188                OutputChunk::Stderr(data) => stderr.extend(data),
189                OutputChunk::Exit(code) => exit_code = code,
190            }
191        }
192
193        Ok(CommandResult {
194            stdout: String::from_utf8_lossy(&stdout).to_string(),
195            stderr: String::from_utf8_lossy(&stderr).to_string(),
196            code: exit_code,
197        })
198    }
199}
200
201/// Stream of output chunks from a process
202pub struct OutputStream {
203    rx: mpsc::Receiver<OutputChunk>,
204    kill_tx: mpsc::UnboundedSender<String>,
205    kill_signal: String,
206    killed: bool,
207}
208
209impl OutputStream {
210    /// Receive the next chunk
211    pub async fn next(&mut self) -> Option<OutputChunk> {
212        self.rx.recv().await
213    }
214
215    /// Stop the process using the configured kill signal (default `SIGTERM`).
216    ///
217    /// This can be called from inside the consumption loop to stop a
218    /// long-running or endless process; a terminating `OutputChunk::Exit` is
219    /// still delivered afterwards.
220    pub fn kill(&mut self) {
221        let signal = self.kill_signal.clone();
222        self.kill_with(&signal);
223    }
224
225    /// Stop the process using an explicit signal, overriding the configured
226    /// kill signal for this call.
227    pub fn kill_with(&mut self, signal: &str) {
228        if self.killed {
229            return;
230        }
231        self.killed = true;
232        trace_lazy("OutputStream", || format!("kill | signal={}", signal));
233        // Best effort: the task may have already finished, in which case the
234        // receiver is gone and the send fails harmlessly.
235        let _ = self.kill_tx.send(signal.to_string());
236    }
237
238    /// Collect all remaining output into vectors
239    pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
240        let mut stdout = Vec::new();
241        let mut stderr = Vec::new();
242        let mut exit_code = 0;
243
244        while let Some(chunk) = self.rx.recv().await {
245            match chunk {
246                OutputChunk::Stdout(data) => stdout.extend(data),
247                OutputChunk::Stderr(data) => stderr.extend(data),
248                OutputChunk::Exit(code) => exit_code = code,
249            }
250        }
251
252        (stdout, stderr, exit_code)
253    }
254
255    /// Collect stdout only, discarding stderr
256    pub async fn collect_stdout(mut self) -> Vec<u8> {
257        let mut stdout = Vec::new();
258
259        while let Some(chunk) = self.rx.recv().await {
260            if let OutputChunk::Stdout(data) = chunk {
261                stdout.extend(data);
262            }
263        }
264
265        stdout
266    }
267}
268
269impl Drop for OutputStream {
270    fn drop(&mut self) {
271        // Abandoning the stream (e.g. `break`-ing out of the loop) must stop the
272        // process, matching the JavaScript iterator's `finally` cleanup. If the
273        // process already finished this is a harmless no-op.
274        if !self.killed {
275            let _ = self.kill_tx.send(self.kill_signal.clone());
276        }
277    }
278}
279
280/// Run a streaming process and send output to the channel
281async fn run_streaming_process(
282    command: String,
283    cwd: Option<PathBuf>,
284    env: Option<HashMap<String, String>>,
285    stdin_content: Option<String>,
286    exit_pump_grace_ms: u64,
287    tx: mpsc::Sender<OutputChunk>,
288    mut kill_rx: mpsc::UnboundedReceiver<String>,
289) -> Result<()> {
290    trace_lazy("StreamingRunner", || format!("Starting: {}", command));
291
292    let shell = find_available_shell();
293    let mut cmd = Command::new(&shell.cmd);
294    for arg in &shell.args {
295        cmd.arg(arg);
296    }
297    cmd.arg(&command);
298
299    // Configure stdio
300    if stdin_content.is_some() {
301        cmd.stdin(Stdio::piped());
302    } else {
303        cmd.stdin(Stdio::null());
304    }
305    cmd.stdout(Stdio::piped());
306    cmd.stderr(Stdio::piped());
307
308    // Run the child in its own process group so we can signal the whole group
309    // (parent + grandchildren), matching the JavaScript implementation.
310    #[cfg(unix)]
311    cmd.process_group(0);
312
313    // Set working directory
314    if let Some(ref cwd) = cwd {
315        cmd.current_dir(cwd);
316    }
317
318    // Set environment
319    if let Some(ref env_vars) = env {
320        for (key, value) in env_vars {
321            cmd.env(key, value);
322        }
323    }
324
325    // Spawn the process
326    let mut child = cmd.spawn()?;
327
328    // Write stdin if needed
329    if let Some(content) = stdin_content {
330        if let Some(mut stdin) = child.stdin.take() {
331            use tokio::io::AsyncWriteExt;
332            let _ = stdin.write_all(content.as_bytes()).await;
333            let _ = stdin.shutdown().await;
334        }
335    }
336
337    // Spawn stdout reader
338    let stdout = child.stdout.take();
339    let tx_stdout = tx.clone();
340    let stdout_handle = stdout.map(|stdout| {
341        tokio::spawn(async move {
342            let mut reader = BufReader::new(stdout);
343            let mut buf = vec![0u8; 8192];
344            loop {
345                use tokio::io::AsyncReadExt;
346                match reader.read(&mut buf).await {
347                    Ok(0) => break,
348                    Ok(n) => {
349                        if tx_stdout
350                            .send(OutputChunk::Stdout(buf[..n].to_vec()))
351                            .await
352                            .is_err()
353                        {
354                            break;
355                        }
356                    }
357                    Err(_) => break,
358                }
359            }
360        })
361    });
362
363    // Spawn stderr reader
364    let stderr = child.stderr.take();
365    let tx_stderr = tx.clone();
366    let stderr_handle = stderr.map(|stderr| {
367        tokio::spawn(async move {
368            let mut reader = BufReader::new(stderr);
369            let mut buf = vec![0u8; 8192];
370            loop {
371                use tokio::io::AsyncReadExt;
372                match reader.read(&mut buf).await {
373                    Ok(0) => break,
374                    Ok(n) => {
375                        if tx_stderr
376                            .send(OutputChunk::Stderr(buf[..n].to_vec()))
377                            .await
378                            .is_err()
379                        {
380                            break;
381                        }
382                    }
383                    Err(_) => break,
384                }
385            }
386        })
387    });
388
389    // Wait for the process to exit OR for a kill request — crucially we do NOT
390    // wait for the readers first. If a grandchild keeps the pipe open the
391    // readers would never finish, so waiting on them before the exit (as the
392    // old implementation did) would hang forever (issue #155).
393    let pid = child.id();
394    let code;
395    tokio::select! {
396        status = child.wait() => {
397            code = status_to_code(status?);
398        }
399        maybe_signal = kill_rx.recv() => {
400            // A kill was requested (explicit kill()/kill_with() or the stream
401            // being dropped). Stop the process group with the requested signal.
402            let signal = maybe_signal.unwrap_or_else(|| DEFAULT_KILL_SIGNAL.to_string());
403            trace_lazy("StreamingRunner", || format!("Kill requested | signal={}", signal));
404            if let Some(pid) = pid {
405                send_signal_to_process(pid, &signal);
406            }
407            // Give it a brief moment to exit on the requested signal, then
408            // escalate to a forceful kill so it always terminates.
409            if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), child.wait())
410                .await
411                .is_err()
412            {
413                let _ = child.start_kill();
414                let _ = child.wait().await;
415            }
416            // Report the conventional 128 + signal code for the requested
417            // signal, matching the JavaScript implementation.
418            code = 128 + signal_number(&signal);
419        }
420    }
421
422    // The process has exited. Give the readers a short grace period to flush any
423    // buffered output, then abort any that are still blocked on an inherited
424    // open pipe so we don't hang.
425    let stdout_abort = stdout_handle.as_ref().map(|h| h.abort_handle());
426    let stderr_abort = stderr_handle.as_ref().map(|h| h.abort_handle());
427    let drain = async {
428        if let Some(handle) = stdout_handle {
429            let _ = handle.await;
430        }
431        if let Some(handle) = stderr_handle {
432            let _ = handle.await;
433        }
434    };
435    if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), drain)
436        .await
437        .is_err()
438    {
439        // A reader is still blocked on an inherited open pipe — abort it so the
440        // exit chunk is delivered without waiting for the grandchild.
441        if let Some(abort) = stdout_abort {
442            abort.abort();
443        }
444        if let Some(abort) = stderr_abort {
445            abort.abort();
446        }
447    }
448
449    // Send exit code (always — even if a reader was aborted).
450    let _ = tx.send(OutputChunk::Exit(code)).await;
451
452    trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
453
454    Ok(())
455}
456
/// Translate an `ExitStatus` into a single integer.
///
/// A regular exit yields its own code. On Unix, a process terminated by a
/// signal yields `128 + signal`. When neither is available the result is `-1`.
fn status_to_code(status: std::process::ExitStatus) -> i32 {
    #[cfg(unix)]
    let from_signal = {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|sig| 128 + sig)
    };
    #[cfg(not(unix))]
    let from_signal: Option<i32> = None;

    // A real exit code always wins over the signal-derived value.
    status.code().or(from_signal).unwrap_or(-1)
}
472
/// Map a signal name to its numeric value for the `128 + signal` exit-code
/// convention. Unknown names fall back to `SIGTERM`.
///
/// `SIGHUP`, `SIGINT`, `SIGQUIT`, `SIGKILL` and `SIGTERM` use the same numbers
/// on every mainstream Unix. `SIGUSR1`/`SIGUSR2` do not: Linux uses 10/12 on
/// its common architectures while macOS and the BSDs use 30/31, so those two
/// are selected per target to stay in line with what the OS itself reports.
fn signal_number(signal: &str) -> i32 {
    let bsd_numbering = cfg!(any(
        target_os = "macos",
        target_os = "ios",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly"
    ));

    match signal {
        "SIGHUP" => 1,
        "SIGINT" => 2,
        "SIGQUIT" => 3,
        "SIGKILL" => 9,
        "SIGUSR1" => if bsd_numbering { 30 } else { 10 },
        "SIGUSR2" => if bsd_numbering { 31 } else { 12 },
        // `SIGTERM` itself and every unrecognised name.
        _ => 15,
    }
}
487
488/// Send a signal to a process and its process group (best effort).
489#[cfg(unix)]
490fn send_signal_to_process(pid: u32, signal: &str) {
491    use nix::sys::signal::{kill, Signal};
492    use nix::unistd::Pid;
493
494    let sig = match signal {
495        "SIGHUP" => Signal::SIGHUP,
496        "SIGINT" => Signal::SIGINT,
497        "SIGQUIT" => Signal::SIGQUIT,
498        "SIGKILL" => Signal::SIGKILL,
499        "SIGUSR1" => Signal::SIGUSR1,
500        "SIGUSR2" => Signal::SIGUSR2,
501        "SIGTERM" => Signal::SIGTERM,
502        _ => Signal::SIGTERM,
503    };
504
505    // Signal the process itself.
506    let _ = kill(Pid::from_raw(pid as i32), sig);
507    // Signal the whole process group (negative pid) to reach grandchildren.
508    let _ = kill(Pid::from_raw(-(pid as i32)), sig);
509}
510
511/// On non-Unix platforms there is no signal delivery; the forceful
512/// `start_kill()` escalation in the caller handles termination.
513#[cfg(not(unix))]
514fn send_signal_to_process(_pid: u32, _signal: &str) {}
515
/// Describes how to invoke a shell: the program to run plus the arguments
/// that must precede the command string.
#[derive(Clone, Debug)]
struct ShellConfig {
    /// Shell executable (path or program name).
    cmd: String,
    /// Arguments placed before the command string (e.g. `-c`).
    args: Vec<String>,
}
522
523/// Find an available shell
524fn find_available_shell() -> ShellConfig {
525    let is_windows = cfg!(windows);
526
527    if is_windows {
528        ShellConfig {
529            cmd: "cmd.exe".to_string(),
530            args: vec!["/c".to_string()],
531        }
532    } else {
533        let shells = [
534            ("/bin/sh", "-c"),
535            ("/usr/bin/sh", "-c"),
536            ("/bin/bash", "-c"),
537        ];
538
539        for (cmd, arg) in shells {
540            if std::path::Path::new(cmd).exists() {
541                return ShellConfig {
542                    cmd: cmd.to_string(),
543                    args: vec![arg.to_string()],
544                };
545            }
546        }
547
548        ShellConfig {
549            cmd: "/bin/sh".to_string(),
550            args: vec!["-c".to_string()],
551        }
552    }
553}
554
555/// Async iterator trait for output streams
556#[async_trait::async_trait]
557pub trait AsyncIterator {
558    type Item;
559
560    /// Get the next item from the iterator
561    async fn next(&mut self) -> Option<Self::Item>;
562}
563
564#[async_trait::async_trait]
565impl AsyncIterator for OutputStream {
566    type Item = OutputChunk;
567
568    async fn next(&mut self) -> Option<Self::Item> {
569        self.rx.recv().await
570    }
571}
572
573/// Extension trait to convert ProcessRunner into a stream
574pub trait IntoStream {
575    /// Convert into an output stream
576    fn into_stream(self) -> OutputStream;
577}
578
579impl IntoStream for crate::ProcessRunner {
580    fn into_stream(self) -> OutputStream {
581        let streaming = StreamingRunner::new(self.command().to_string());
582        streaming.stream()
583    }
584}