Skip to main content

command_stream/
lib.rs

1//! # command-stream
2//!
3//! Modern shell command execution library with streaming, async iteration, and event support.
4//!
5//! This library provides a Rust equivalent to the JavaScript command-stream library,
6//! offering powerful shell command execution with streaming capabilities.
7//!
8//! ## Features
9//!
10//! - Async command execution with tokio
11//! - Streaming output via async iterators
12//! - Event-based output handling (on, once, emit)
13//! - Virtual commands for common operations (cat, ls, mkdir, etc.)
14//! - Shell operator support (&&, ||, ;, |)
15//! - Pipeline support with `.pipe()` method and `Pipeline` builder
16//! - Global state management for shell settings
17//! - `cmd!` macro for ergonomic command creation (similar to JS `$` tagged template literals)
18//! - Cross-platform support
19//!
20//! ## Module Organization
21//!
22//! The codebase follows a modular architecture similar to the JavaScript implementation:
23//!
24//! - `ansi` - ANSI escape code handling utilities
25//! - `commands` - Virtual command implementations
26//! - `events` - Event emitter for stream events
27//! - `macros` - The `cmd!` macro for ergonomic command creation
28//! - `pipeline` - Pipeline execution support
29//! - `quote` - Shell quoting utilities
30//! - `shell_parser` - Shell command parsing
31//! - `state` - Global state management
32//! - `stream` - Async streaming and iteration support
33//! - `trace` - Logging and tracing utilities
34//! - `utils` - Command results and virtual command helpers
35//!
36//! ## Quick Start
37//!
38//! ```rust,no_run
39//! use command_stream::{run, cmd};
40//!
41//! #[tokio::main]
42//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
43//!     // Execute a simple command
44//!     let result = run("echo hello world").await?;
45//!     println!("{}", result.stdout);
46//!
47//!     // Using the cmd! macro (similar to JS $ tagged template)
48//!     let name = "world";
49//!     let result = cmd!("echo hello {}", name).await?;
50//!     println!("{}", result.stdout);
51//!
52//!     // Using pipelines
53//!     use command_stream::Pipeline;
54//!     let result = Pipeline::new()
55//!         .add("echo hello world")
56//!         .add("grep world")
57//!         .run()
58//!         .await?;
59//!
60//!     Ok(())
61//! }
62//! ```
63
64// Modular utility modules (following JavaScript modular pattern)
65pub mod ansi;
66pub mod events;
67#[doc(hidden)]
68pub mod macros;
69pub mod pipeline;
70pub mod quote;
71pub mod state;
72pub mod stream;
73pub mod trace;
74
75// Core modules
76pub mod commands;
77pub mod shell_parser;
78pub mod utils;
79
80use std::collections::HashMap;
81use std::path::PathBuf;
82use std::process::Stdio;
83use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
84use tokio::process::{Child, Command};
85use tokio::sync::mpsc;
86
87pub use commands::{CommandContext, StreamChunk};
88pub use shell_parser::{needs_real_shell, parse_shell_command, ParsedCommand};
89pub use utils::{CommandResult, VirtualUtils};
90
91// Re-export modular utilities at crate root for convenient access
92pub use ansi::{AnsiConfig, AnsiUtils};
93pub use events::{EventData, EventType, StreamEmitter};
94pub use pipeline::{Pipeline, PipelineBuilder, PipelineExt};
95pub use quote::quote;
96pub use state::{
97    get_shell_settings, global_state, reset_global_state, set_shell_option, unset_shell_option,
98    GlobalState, ShellSettings,
99};
100pub use stream::{AsyncIterator, IntoStream, OutputChunk, OutputStream, StreamingRunner};
101pub use trace::trace;
102
103/// Error type for command-stream operations
104#[derive(Debug, thiserror::Error)]
105pub enum Error {
106    #[error("IO error: {0}")]
107    Io(#[from] std::io::Error),
108
109    #[error("Command failed with exit code {code}: {message}")]
110    CommandFailed { code: i32, message: String },
111
112    #[error("Command not found: {0}")]
113    CommandNotFound(String),
114
115    #[error("Parse error: {0}")]
116    ParseError(String),
117
118    #[error("Cancelled")]
119    Cancelled,
120}
121
122/// Result type for command-stream operations
123pub type Result<T> = std::result::Result<T, Error>;
124
125/// Options for command execution
126#[derive(Debug, Clone)]
127pub struct RunOptions {
128    /// Mirror output to parent stdout/stderr
129    pub mirror: bool,
130    /// Capture output in result
131    pub capture: bool,
132    /// Standard input handling
133    pub stdin: StdinOption,
134    /// Working directory
135    pub cwd: Option<PathBuf>,
136    /// Environment variables
137    pub env: Option<HashMap<String, String>>,
138    /// Interactive mode (TTY forwarding)
139    pub interactive: bool,
140    /// Enable shell operator parsing
141    pub shell_operators: bool,
142    /// Enable tracing for this command
143    pub trace: bool,
144}
145
146impl Default for RunOptions {
147    fn default() -> Self {
148        RunOptions {
149            mirror: true,
150            capture: true,
151            stdin: StdinOption::Inherit,
152            cwd: None,
153            env: None,
154            interactive: false,
155            shell_operators: true,
156            trace: true,
157        }
158    }
159}
160
161/// Standard input options
162#[derive(Debug, Clone)]
163pub enum StdinOption {
164    /// Inherit from parent process
165    Inherit,
166    /// Pipe (allow writing to stdin)
167    Pipe,
168    /// Provide string content
169    Content(String),
170    /// Null device
171    Null,
172}
173
174/// A running or completed process
175pub struct ProcessRunner {
176    command: String,
177    options: RunOptions,
178    child: Option<Child>,
179    result: Option<CommandResult>,
180    started: bool,
181    finished: bool,
182    cancelled: bool,
183    output_tx: Option<mpsc::Sender<StreamChunk>>,
184    output_rx: Option<mpsc::Receiver<StreamChunk>>,
185}
186
187impl ProcessRunner {
188    /// Create a new process runner
189    pub fn new(command: impl Into<String>, options: RunOptions) -> Self {
190        let (tx, rx) = mpsc::channel(1024);
191        ProcessRunner {
192            command: command.into(),
193            options,
194            child: None,
195            result: None,
196            started: false,
197            finished: false,
198            cancelled: false,
199            output_tx: Some(tx),
200            output_rx: Some(rx),
201        }
202    }
203
204    /// Start the process
205    pub async fn start(&mut self) -> Result<()> {
206        if self.started {
207            return Ok(());
208        }
209        self.started = true;
210
211        utils::trace_lazy("ProcessRunner", || {
212            format!("Starting command: {}", self.command)
213        });
214
215        // Check if this is a virtual command
216        let first_word = self.command.split_whitespace().next().unwrap_or("");
217        if let Some(result) = self.try_virtual_command(first_word).await {
218            self.result = Some(result);
219            self.finished = true;
220            return Ok(());
221        }
222
223        // Parse command for shell operators (for future use with virtual command pipelines)
224        let _parsed = if self.options.shell_operators && !needs_real_shell(&self.command) {
225            parse_shell_command(&self.command)
226        } else {
227            None
228        };
229
230        // Execute via real shell if needed
231        let shell = find_available_shell();
232
233        let mut cmd = Command::new(&shell.cmd);
234        for arg in &shell.args {
235            cmd.arg(arg);
236        }
237        cmd.arg(&self.command);
238
239        // Configure stdin
240        match &self.options.stdin {
241            StdinOption::Inherit => {
242                cmd.stdin(Stdio::inherit());
243            }
244            StdinOption::Pipe => {
245                cmd.stdin(Stdio::piped());
246            }
247            StdinOption::Content(_) => {
248                cmd.stdin(Stdio::piped());
249            }
250            StdinOption::Null => {
251                cmd.stdin(Stdio::null());
252            }
253        }
254
255        // Configure stdout/stderr
256        if self.options.capture || self.options.mirror {
257            cmd.stdout(Stdio::piped());
258            cmd.stderr(Stdio::piped());
259        } else {
260            cmd.stdout(Stdio::inherit());
261            cmd.stderr(Stdio::inherit());
262        }
263
264        // Set working directory
265        if let Some(ref cwd) = self.options.cwd {
266            cmd.current_dir(cwd);
267        }
268
269        // Set environment
270        if let Some(ref env_vars) = self.options.env {
271            for (key, value) in env_vars {
272                cmd.env(key, value);
273            }
274        }
275
276        // Spawn the process
277        let child = cmd.spawn()?;
278        self.child = Some(child);
279
280        Ok(())
281    }
282
283    /// Run the process to completion
284    pub async fn run(&mut self) -> Result<CommandResult> {
285        self.start().await?;
286
287        if let Some(result) = &self.result {
288            return Ok(result.clone());
289        }
290
291        let mut child = self.child.take().ok_or_else(|| {
292            Error::Io(std::io::Error::new(
293                std::io::ErrorKind::Other,
294                "Process not started",
295            ))
296        })?;
297
298        // Handle stdin content if provided
299        if let StdinOption::Content(ref content) = self.options.stdin {
300            if let Some(mut stdin) = child.stdin.take() {
301                let content = content.clone();
302                tokio::spawn(async move {
303                    let _ = stdin.write_all(content.as_bytes()).await;
304                    let _ = stdin.shutdown().await;
305                });
306            }
307        }
308
309        // Collect output
310        let mut stdout_content = String::new();
311        let mut stderr_content = String::new();
312
313        if let Some(stdout) = child.stdout.take() {
314            let mut reader = BufReader::new(stdout).lines();
315            while let Ok(Some(line)) = reader.next_line().await {
316                if self.options.mirror {
317                    println!("{}", line);
318                }
319                stdout_content.push_str(&line);
320                stdout_content.push('\n');
321            }
322        }
323
324        if let Some(stderr) = child.stderr.take() {
325            let mut reader = BufReader::new(stderr).lines();
326            while let Ok(Some(line)) = reader.next_line().await {
327                if self.options.mirror {
328                    eprintln!("{}", line);
329                }
330                stderr_content.push_str(&line);
331                stderr_content.push('\n');
332            }
333        }
334
335        let status = child.wait().await?;
336        let code = status.code().unwrap_or(-1);
337
338        let result = CommandResult {
339            stdout: stdout_content,
340            stderr: stderr_content,
341            code,
342        };
343
344        self.result = Some(result.clone());
345        self.finished = true;
346
347        Ok(result)
348    }
349
350    /// Try to execute as a virtual command
351    async fn try_virtual_command(&self, cmd_name: &str) -> Option<CommandResult> {
352        if !commands::are_virtual_commands_enabled() {
353            return None;
354        }
355
356        // Parse args from command string
357        let parts: Vec<&str> = self.command.split_whitespace().collect();
358        let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
359
360        let ctx = CommandContext {
361            args,
362            stdin: match &self.options.stdin {
363                StdinOption::Content(s) => Some(s.clone()),
364                _ => None,
365            },
366            cwd: self.options.cwd.clone(),
367            env: self.options.env.clone(),
368            output_tx: self.output_tx.clone(),
369            is_cancelled: None,
370        };
371
372        match cmd_name {
373            "echo" => Some(commands::echo(ctx).await),
374            "pwd" => Some(commands::pwd(ctx).await),
375            "cd" => Some(commands::cd(ctx).await),
376            "true" => Some(commands::r#true(ctx).await),
377            "false" => Some(commands::r#false(ctx).await),
378            "sleep" => Some(commands::sleep(ctx).await),
379            "cat" => Some(commands::cat(ctx).await),
380            "ls" => Some(commands::ls(ctx).await),
381            "mkdir" => Some(commands::mkdir(ctx).await),
382            "rm" => Some(commands::rm(ctx).await),
383            "touch" => Some(commands::touch(ctx).await),
384            "cp" => Some(commands::cp(ctx).await),
385            "mv" => Some(commands::mv(ctx).await),
386            "basename" => Some(commands::basename(ctx).await),
387            "dirname" => Some(commands::dirname(ctx).await),
388            "env" => Some(commands::env(ctx).await),
389            "exit" => Some(commands::exit(ctx).await),
390            "which" => Some(commands::which(ctx).await),
391            "yes" => Some(commands::yes(ctx).await),
392            "seq" => Some(commands::seq(ctx).await),
393            "test" => Some(commands::test(ctx).await),
394            _ => None,
395        }
396    }
397
398    /// Kill the process
399    pub fn kill(&mut self) -> Result<()> {
400        self.cancelled = true;
401        if let Some(ref mut child) = self.child {
402            child.start_kill()?;
403        }
404        Ok(())
405    }
406
407    /// Check if the process is finished
408    pub fn is_finished(&self) -> bool {
409        self.finished
410    }
411
412    /// Get the result if available
413    pub fn result(&self) -> Option<&CommandResult> {
414        self.result.as_ref()
415    }
416
417    /// Get the command string
418    pub fn command(&self) -> &str {
419        &self.command
420    }
421
422    /// Get the options
423    pub fn options(&self) -> &RunOptions {
424        &self.options
425    }
426}
427
428/// Shell configuration
429#[derive(Debug, Clone)]
430struct ShellConfig {
431    cmd: String,
432    args: Vec<String>,
433}
434
435/// Find an available shell
436fn find_available_shell() -> ShellConfig {
437    let is_windows = cfg!(windows);
438
439    if is_windows {
440        // Windows shells
441        let shells = [
442            ("cmd.exe", vec!["/c"]),
443            ("powershell.exe", vec!["-Command"]),
444        ];
445
446        for (cmd, args) in shells {
447            if which::which(cmd).is_ok() {
448                return ShellConfig {
449                    cmd: cmd.to_string(),
450                    args: args.into_iter().map(String::from).collect(),
451                };
452            }
453        }
454
455        ShellConfig {
456            cmd: "cmd.exe".to_string(),
457            args: vec!["/c".to_string()],
458        }
459    } else {
460        // Unix shells
461        let shells = [
462            ("/bin/sh", vec!["-c"]),
463            ("/usr/bin/sh", vec!["-c"]),
464            ("/bin/bash", vec!["-c"]),
465            ("sh", vec!["-c"]),
466        ];
467
468        for (cmd, args) in shells {
469            if std::path::Path::new(cmd).exists() || which::which(cmd).is_ok() {
470                return ShellConfig {
471                    cmd: cmd.to_string(),
472                    args: args.into_iter().map(String::from).collect(),
473                };
474            }
475        }
476
477        ShellConfig {
478            cmd: "/bin/sh".to_string(),
479            args: vec!["-c".to_string()],
480        }
481    }
482}
483
484/// Execute a command and return the result
485///
486/// This is the main entry point for simple command execution.
487/// Named `run` instead of `$` since `$` is not a valid Rust identifier.
488pub async fn run(command: impl Into<String>) -> Result<CommandResult> {
489    let mut runner = ProcessRunner::new(command, RunOptions::default());
490    runner.run().await
491}
492
493/// Alias for `run` function - for JavaScript-like API feel
494/// Since `$` is not valid in Rust, this provides a similar short name
495pub use run as execute;
496
497/// Execute a command with custom options
498pub async fn exec(command: impl Into<String>, options: RunOptions) -> Result<CommandResult> {
499    let mut runner = ProcessRunner::new(command, options);
500    runner.run().await
501}
502
503/// Create a new process runner without starting it
504pub fn create(command: impl Into<String>, options: RunOptions) -> ProcessRunner {
505    ProcessRunner::new(command, options)
506}
507
508/// Execute a command synchronously (blocking)
509pub fn run_sync(command: impl Into<String>) -> Result<CommandResult> {
510    let rt = tokio::runtime::Runtime::new()?;
511    rt.block_on(run(command))
512}
513
514// Tests are located in tests/ directory for better organization