Skip to main content

command_stream/
lib.rs

1//! # command-stream
2//!
3//! Modern shell command execution library with streaming, async iteration, and event support.
4//!
5//! This library provides a Rust equivalent to the JavaScript command-stream library,
6//! offering powerful shell command execution with streaming capabilities.
7//!
8//! ## Features
9//!
10//! - Async command execution with tokio
11//! - Streaming output via async iterators
12//! - Event-based output handling (on, once, emit)
13//! - Virtual commands for common operations (cat, ls, mkdir, etc.)
14//! - Shell operator support (&&, ||, ;, |)
15//! - Pipeline support with `.pipe()` method and `Pipeline` builder
16//! - Global state management for shell settings
17//! - `cmd!` macro for ergonomic command creation (similar to JS `$` tagged template literals)
18//! - Cross-platform support
19//!
20//! ## Module Organization
21//!
22//! The codebase follows a modular architecture similar to the JavaScript implementation:
23//!
24//! - `ansi` - ANSI escape code handling utilities
25//! - `commands` - Virtual command implementations
26//! - `events` - Event emitter for stream events
27//! - `macros` - The `cmd!` macro for ergonomic command creation
28//! - `pipeline` - Pipeline execution support
29//! - `quote` - Shell quoting utilities
30//! - `shell_parser` - Shell command parsing
31//! - `state` - Global state management
32//! - `stream` - Async streaming and iteration support
33//! - `trace` - Logging and tracing utilities
34//! - `utils` - Command results and virtual command helpers
35//!
36//! ## Quick Start
37//!
38//! ```rust,no_run
39//! use command_stream::{run, cmd};
40//!
41//! #[tokio::main]
42//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
43//!     // Execute a simple command
44//!     let result = run("echo hello world").await?;
45//!     println!("{}", result.stdout);
46//!
47//!     // Using the cmd! macro (similar to JS $ tagged template)
48//!     let name = "world";
49//!     let result = cmd!("echo hello {}", name).await?;
50//!     println!("{}", result.stdout);
51//!
52//!     // Using pipelines
53//!     use command_stream::Pipeline;
54//!     let result = Pipeline::new()
55//!         .add("echo hello world")
56//!         .add("grep world")
57//!         .run()
58//!         .await?;
59//!
60//!     Ok(())
61//! }
62//! ```
63
64// Modular utility modules (following JavaScript modular pattern)
65pub mod ansi;
66pub mod events;
67#[doc(hidden)]
68pub mod macros;
69pub mod pipeline;
70pub mod quote;
71pub mod state;
72pub mod stream;
73pub mod trace;
74
75// Core modules
76pub mod commands;
77pub mod shell_parser;
78pub mod utils;
79
80use std::collections::HashMap;
81use std::path::PathBuf;
82use std::process::Stdio;
83use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
84use tokio::process::{Child, Command};
85use tokio::sync::mpsc;
86
87pub use commands::{CommandContext, StreamChunk};
88pub use shell_parser::{needs_real_shell, parse_shell_command, ParsedCommand};
89pub use utils::{CommandResult, VirtualUtils};
90
91// Re-export modular utilities at crate root for convenient access
92pub use ansi::{AnsiConfig, AnsiUtils};
93pub use events::{EventData, EventType, StreamEmitter};
94pub use pipeline::{Pipeline, PipelineBuilder, PipelineExt};
95pub use quote::quote;
96pub use state::{
97    get_shell_settings, global_state, reset_global_state, set_shell_option, unset_shell_option,
98    GlobalState, ShellSettings,
99};
100pub use stream::{AsyncIterator, IntoStream, OutputChunk, OutputStream, StreamingRunner};
101pub use trace::trace;
102
103/// Resolve a working directory that is safe to spawn a child process in.
104///
105/// When no explicit cwd is requested the child normally inherits the parent's
106/// working directory. But if that directory has been deleted or become
107/// inaccessible (the "getcwd() failed" scenario from issue #44), inheriting it
108/// makes the OS-level spawn fail. In that case fall back to a directory that is
109/// known to exist so the command still runs.
110///
111/// Normal behavior is preserved: when an explicit cwd is given, or when the
112/// inherited working directory is valid, this returns the requested value
113/// (`None` meaning "inherit").
114fn resolve_spawn_cwd(cwd: Option<&PathBuf>) -> Option<PathBuf> {
115    // An explicit directory is always honored as-is.
116    if let Some(c) = cwd {
117        return Some(c.clone());
118    }
119
120    // No explicit cwd: we would inherit the parent's working directory. Make
121    // sure that directory is actually usable before relying on inheritance.
122    match std::env::current_dir() {
123        Ok(_) => None,
124        Err(e) => {
125            let fallback = std::env::var_os("HOME")
126                .or_else(|| std::env::var_os("USERPROFILE"))
127                .map(PathBuf::from)
128                .unwrap_or_else(std::env::temp_dir);
129            trace(
130                "ProcessRunner",
131                &format!(
132                    "current_dir() failed ({}); spawning in fallback directory {}",
133                    e,
134                    fallback.display()
135                ),
136            );
137            if fallback.exists() {
138                Some(fallback)
139            } else {
140                Some(std::env::temp_dir())
141            }
142        }
143    }
144}
145
/// Error type for command-stream operations
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (generated by `thiserror`).
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An underlying I/O failure. `#[from]` provides the
    /// `From<std::io::Error>` conversion, so `?` on an `io::Result` inside a
    /// function returning this crate's `Result` produces this variant.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A command finished unsuccessfully; carries the exit `code` and a
    /// descriptive `message`.
    #[error("Command failed with exit code {code}: {message}")]
    CommandFailed { code: i32, message: String },

    /// The named command could not be located.
    #[error("Command not found: {0}")]
    CommandNotFound(String),

    /// A command string could not be parsed; the payload describes why.
    #[error("Parse error: {0}")]
    ParseError(String),

    /// The operation was cancelled before it completed.
    #[error("Cancelled")]
    Cancelled,
}
164
/// Result type for command-stream operations
///
/// Shorthand for `std::result::Result<T, Error>` with this crate's [`Error`]
/// as the error type.
pub type Result<T> = std::result::Result<T, Error>;
167
/// Options for command execution
///
/// Use `RunOptions::default()` and override individual fields with struct
/// update syntax, e.g. `RunOptions { mirror: false, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct RunOptions {
    /// Mirror output to parent stdout/stderr
    ///
    /// When set, `ProcessRunner::run` prints each child stdout line with
    /// `println!` and each stderr line with `eprintln!`.
    pub mirror: bool,
    /// Capture output in result
    ///
    /// The child's stdout/stderr are piped when `capture || mirror` is true;
    /// when both are false the child inherits the parent's streams.
    pub capture: bool,
    /// Standard input handling
    pub stdin: StdinOption,
    /// Working directory
    ///
    /// `None` means "inherit the parent's working directory" (with a
    /// fallback when that directory is no longer usable).
    pub cwd: Option<PathBuf>,
    /// Environment variables
    ///
    /// Each pair is passed to `Command::env` before spawning.
    pub env: Option<HashMap<String, String>>,
    /// Interactive mode (TTY forwarding)
    ///
    /// NOTE(review): not read by the code visible in this file.
    pub interactive: bool,
    /// Enable shell operator parsing
    ///
    /// Gates the `parse_shell_command` call in `ProcessRunner::start`; the
    /// parse result is currently unused there.
    pub shell_operators: bool,
    /// Enable tracing for this command
    ///
    /// NOTE(review): not read by the code visible in this file.
    pub trace: bool,
}
188
189impl Default for RunOptions {
190    fn default() -> Self {
191        RunOptions {
192            mirror: true,
193            capture: true,
194            stdin: StdinOption::Inherit,
195            cwd: None,
196            env: None,
197            interactive: false,
198            shell_operators: true,
199            trace: true,
200        }
201    }
202}
203
/// Standard input options
///
/// Selects how the child's stdin is configured when `ProcessRunner::start`
/// spawns a real process.
#[derive(Debug, Clone)]
pub enum StdinOption {
    /// Inherit from parent process
    Inherit,
    /// Pipe (allow writing to stdin)
    ///
    /// The child's stdin is created with `Stdio::piped()`.
    Pipe,
    /// Provide string content
    ///
    /// For real processes the string is written to a piped stdin and the
    /// pipe is then shut down; for virtual commands it is passed along as
    /// `CommandContext::stdin`.
    Content(String),
    /// Null device
    Null,
}
216
/// A running or completed process
///
/// Created with `ProcessRunner::new` (or the crate-level `create`); nothing
/// executes until `start` or `run` is called.
pub struct ProcessRunner {
    /// The command line as given by the caller.
    command: String,
    /// Execution options supplied at construction.
    options: RunOptions,
    /// Spawned child process; set by `start` for non-virtual commands and
    /// taken out again by `run`.
    child: Option<Child>,
    /// Final result, available once a virtual command or `run` has completed.
    result: Option<CommandResult>,
    /// Set as soon as `start` is entered, making later `start` calls no-ops.
    started: bool,
    /// Set once a result has been recorded.
    finished: bool,
    /// Set by `kill`.
    cancelled: bool,
    /// Sender half of the output channel; cloned into the `CommandContext`
    /// handed to virtual commands.
    output_tx: Option<mpsc::Sender<StreamChunk>>,
    /// Receiver half of the output channel.
    /// NOTE(review): not read by the code visible in this file.
    output_rx: Option<mpsc::Receiver<StreamChunk>>,
}
229
230impl ProcessRunner {
231    /// Create a new process runner
232    pub fn new(command: impl Into<String>, options: RunOptions) -> Self {
233        let (tx, rx) = mpsc::channel(1024);
234        ProcessRunner {
235            command: command.into(),
236            options,
237            child: None,
238            result: None,
239            started: false,
240            finished: false,
241            cancelled: false,
242            output_tx: Some(tx),
243            output_rx: Some(rx),
244        }
245    }
246
247    /// Start the process
248    pub async fn start(&mut self) -> Result<()> {
249        if self.started {
250            return Ok(());
251        }
252        self.started = true;
253
254        utils::trace_lazy("ProcessRunner", || {
255            format!("Starting command: {}", self.command)
256        });
257
258        // Check if this is a virtual command
259        let first_word = self.command.split_whitespace().next().unwrap_or("");
260        if let Some(result) = self.try_virtual_command(first_word).await {
261            self.result = Some(result);
262            self.finished = true;
263            return Ok(());
264        }
265
266        // Parse command for shell operators (for future use with virtual command pipelines)
267        let _parsed = if self.options.shell_operators && !needs_real_shell(&self.command) {
268            parse_shell_command(&self.command)
269        } else {
270            None
271        };
272
273        // Execute via real shell if needed
274        let shell = find_available_shell();
275
276        let mut cmd = Command::new(&shell.cmd);
277        for arg in &shell.args {
278            cmd.arg(arg);
279        }
280        cmd.arg(&self.command);
281
282        // Configure stdin
283        match &self.options.stdin {
284            StdinOption::Inherit => {
285                cmd.stdin(Stdio::inherit());
286            }
287            StdinOption::Pipe => {
288                cmd.stdin(Stdio::piped());
289            }
290            StdinOption::Content(_) => {
291                cmd.stdin(Stdio::piped());
292            }
293            StdinOption::Null => {
294                cmd.stdin(Stdio::null());
295            }
296        }
297
298        // Configure stdout/stderr
299        if self.options.capture || self.options.mirror {
300            cmd.stdout(Stdio::piped());
301            cmd.stderr(Stdio::piped());
302        } else {
303            cmd.stdout(Stdio::inherit());
304            cmd.stderr(Stdio::inherit());
305        }
306
307        // Set working directory. Fall back to a valid directory when the
308        // inherited working directory has been deleted (issue #44).
309        if let Some(cwd) = resolve_spawn_cwd(self.options.cwd.as_ref()) {
310            cmd.current_dir(cwd);
311        }
312
313        // Set environment
314        if let Some(ref env_vars) = self.options.env {
315            for (key, value) in env_vars {
316                cmd.env(key, value);
317            }
318        }
319
320        // Spawn the process
321        let child = cmd.spawn()?;
322        self.child = Some(child);
323
324        Ok(())
325    }
326
327    /// Run the process to completion
328    pub async fn run(&mut self) -> Result<CommandResult> {
329        self.start().await?;
330
331        if let Some(result) = &self.result {
332            return Ok(result.clone());
333        }
334
335        let mut child = self.child.take().ok_or_else(|| {
336            Error::Io(std::io::Error::new(
337                std::io::ErrorKind::Other,
338                "Process not started",
339            ))
340        })?;
341
342        // Handle stdin content if provided
343        if let StdinOption::Content(ref content) = self.options.stdin {
344            if let Some(mut stdin) = child.stdin.take() {
345                let content = content.clone();
346                tokio::spawn(async move {
347                    let _ = stdin.write_all(content.as_bytes()).await;
348                    let _ = stdin.shutdown().await;
349                });
350            }
351        }
352
353        // Collect output
354        let mut stdout_content = String::new();
355        let mut stderr_content = String::new();
356
357        if let Some(stdout) = child.stdout.take() {
358            let mut reader = BufReader::new(stdout).lines();
359            while let Ok(Some(line)) = reader.next_line().await {
360                if self.options.mirror {
361                    println!("{}", line);
362                }
363                stdout_content.push_str(&line);
364                stdout_content.push('\n');
365            }
366        }
367
368        if let Some(stderr) = child.stderr.take() {
369            let mut reader = BufReader::new(stderr).lines();
370            while let Ok(Some(line)) = reader.next_line().await {
371                if self.options.mirror {
372                    eprintln!("{}", line);
373                }
374                stderr_content.push_str(&line);
375                stderr_content.push('\n');
376            }
377        }
378
379        let status = child.wait().await?;
380        let code = status.code().unwrap_or(-1);
381
382        let result = CommandResult {
383            stdout: stdout_content,
384            stderr: stderr_content,
385            code,
386        };
387
388        self.result = Some(result.clone());
389        self.finished = true;
390
391        Ok(result)
392    }
393
394    /// Try to execute as a virtual command
395    async fn try_virtual_command(&self, cmd_name: &str) -> Option<CommandResult> {
396        if !commands::are_virtual_commands_enabled() {
397            return None;
398        }
399
400        // Parse args from command string
401        let parts: Vec<&str> = self.command.split_whitespace().collect();
402        let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
403
404        let ctx = CommandContext {
405            args,
406            stdin: match &self.options.stdin {
407                StdinOption::Content(s) => Some(s.clone()),
408                _ => None,
409            },
410            cwd: self.options.cwd.clone(),
411            env: self.options.env.clone(),
412            output_tx: self.output_tx.clone(),
413            is_cancelled: None,
414        };
415
416        match cmd_name {
417            "echo" => Some(commands::echo(ctx).await),
418            "pwd" => Some(commands::pwd(ctx).await),
419            "cd" => Some(commands::cd(ctx).await),
420            "true" => Some(commands::r#true(ctx).await),
421            "false" => Some(commands::r#false(ctx).await),
422            "sleep" => Some(commands::sleep(ctx).await),
423            "cat" => Some(commands::cat(ctx).await),
424            "ls" => Some(commands::ls(ctx).await),
425            "mkdir" => Some(commands::mkdir(ctx).await),
426            "rm" => Some(commands::rm(ctx).await),
427            "touch" => Some(commands::touch(ctx).await),
428            "cp" => Some(commands::cp(ctx).await),
429            "mv" => Some(commands::mv(ctx).await),
430            "basename" => Some(commands::basename(ctx).await),
431            "dirname" => Some(commands::dirname(ctx).await),
432            "env" => Some(commands::env(ctx).await),
433            "exit" => Some(commands::exit(ctx).await),
434            "which" => Some(commands::which(ctx).await),
435            "yes" => Some(commands::yes(ctx).await),
436            "seq" => Some(commands::seq(ctx).await),
437            "test" => Some(commands::test(ctx).await),
438            _ => None,
439        }
440    }
441
442    /// Kill the process
443    pub fn kill(&mut self) -> Result<()> {
444        self.cancelled = true;
445        if let Some(ref mut child) = self.child {
446            child.start_kill()?;
447        }
448        Ok(())
449    }
450
451    /// Check if the process is finished
452    pub fn is_finished(&self) -> bool {
453        self.finished
454    }
455
456    /// Get the result if available
457    pub fn result(&self) -> Option<&CommandResult> {
458        self.result.as_ref()
459    }
460
461    /// Get the command string
462    pub fn command(&self) -> &str {
463        &self.command
464    }
465
466    /// Get the options
467    pub fn options(&self) -> &RunOptions {
468        &self.options
469    }
470}
471
/// Shell configuration
///
/// Describes how to invoke a shell so that it executes a command string.
#[derive(Debug, Clone)]
struct ShellConfig {
    /// Shell executable (absolute path or bare program name).
    cmd: String,
    /// Arguments placed before the command string (e.g. `-c` or `/c`).
    args: Vec<String>,
}
478
479/// Find an available shell
480fn find_available_shell() -> ShellConfig {
481    let is_windows = cfg!(windows);
482
483    if is_windows {
484        // Windows shells
485        let shells = [
486            ("cmd.exe", vec!["/c"]),
487            ("powershell.exe", vec!["-Command"]),
488        ];
489
490        for (cmd, args) in shells {
491            if which::which(cmd).is_ok() {
492                return ShellConfig {
493                    cmd: cmd.to_string(),
494                    args: args.into_iter().map(String::from).collect(),
495                };
496            }
497        }
498
499        ShellConfig {
500            cmd: "cmd.exe".to_string(),
501            args: vec!["/c".to_string()],
502        }
503    } else {
504        // Unix shells
505        let shells = [
506            ("/bin/sh", vec!["-c"]),
507            ("/usr/bin/sh", vec!["-c"]),
508            ("/bin/bash", vec!["-c"]),
509            ("sh", vec!["-c"]),
510        ];
511
512        for (cmd, args) in shells {
513            if std::path::Path::new(cmd).exists() || which::which(cmd).is_ok() {
514                return ShellConfig {
515                    cmd: cmd.to_string(),
516                    args: args.into_iter().map(String::from).collect(),
517                };
518            }
519        }
520
521        ShellConfig {
522            cmd: "/bin/sh".to_string(),
523            args: vec!["-c".to_string()],
524        }
525    }
526}
527
528/// Execute a command and return the result
529///
530/// This is the main entry point for simple command execution.
531/// Named `run` instead of `$` since `$` is not a valid Rust identifier.
532pub async fn run(command: impl Into<String>) -> Result<CommandResult> {
533    let mut runner = ProcessRunner::new(command, RunOptions::default());
534    runner.run().await
535}
536
537/// Alias for `run` function - for JavaScript-like API feel
538/// Since `$` is not valid in Rust, this provides a similar short name
539pub use run as execute;
540
541/// Execute a command with custom options
542pub async fn exec(command: impl Into<String>, options: RunOptions) -> Result<CommandResult> {
543    let mut runner = ProcessRunner::new(command, options);
544    runner.run().await
545}
546
/// Create a new process runner without starting it
///
/// Equivalent to `ProcessRunner::new(command, options)`. The command is not
/// executed until `start` or `run` is called on the returned runner.
pub fn create(command: impl Into<String>, options: RunOptions) -> ProcessRunner {
    ProcessRunner::new(command, options)
}
551
552/// Execute a command synchronously (blocking)
553pub fn run_sync(command: impl Into<String>) -> Result<CommandResult> {
554    let rt = tokio::runtime::Runtime::new()?;
555    rt.block_on(run(command))
556}
557
558// Tests are located in tests/ directory for better organization