Skip to main content

command_stream/
pipeline.rs

1//! Pipeline execution support
2//!
3//! This module provides pipeline functionality similar to the JavaScript
4//! `$.process-runner-pipeline.mjs` module. It allows chaining commands
5//! together with the output of one command becoming the input of the next.
6//!
7//! ## Usage
8//!
9//! ```rust,no_run
10//! use command_stream::{Pipeline, run};
11//!
12//! #[tokio::main]
13//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
14//!     // Create a pipeline
15//!     let result = Pipeline::new()
16//!         .add("echo hello world")
17//!         .add("grep world")
18//!         .add("wc -l")
19//!         .run()
20//!         .await?;
21//!
22//!     println!("Output: {}", result.stdout);
23//!     Ok(())
24//! }
25//! ```
26
27use std::collections::HashMap;
28use std::path::PathBuf;
29use std::process::Stdio;
30use tokio::io::{AsyncReadExt, AsyncWriteExt};
31use tokio::process::Command;
32
33use crate::trace::trace_lazy;
34use crate::{CommandResult, Result, RunOptions, StdinOption};
35
/// An ordered list of shell commands that are run one after another.
///
/// The captured stdout of each command is handed to the following command
/// as its stdin. Instances are assembled with the chainable builder methods
/// and executed with `run`.
#[derive(Clone, Debug)]
pub struct Pipeline {
    /// Command lines, in execution order.
    commands: Vec<String>,
    /// Text supplied to the first command's stdin, if any.
    stdin: Option<String>,
    /// Directory the commands are run in, if overridden.
    cwd: Option<PathBuf>,
    /// Extra environment variables applied to the commands, if any.
    env: Option<HashMap<String, String>>,
    /// When `true`, output is echoed to the parent's stdout/stderr.
    mirror: bool,
    /// Output-capture flag set through `capture_output`.
    capture: bool,
}
54
55impl Default for Pipeline {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl Pipeline {
62    /// Create a new empty pipeline
63    pub fn new() -> Self {
64        Pipeline {
65            commands: Vec::new(),
66            stdin: None,
67            cwd: None,
68            env: None,
69            mirror: true,
70            capture: true,
71        }
72    }
73
74    /// Add a command to the pipeline
75    pub fn add(mut self, command: impl Into<String>) -> Self {
76        self.commands.push(command.into());
77        self
78    }
79
80    /// Set the initial stdin content for the first command
81    pub fn stdin(mut self, content: impl Into<String>) -> Self {
82        self.stdin = Some(content.into());
83        self
84    }
85
86    /// Set the working directory for all commands
87    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
88        self.cwd = Some(path.into());
89        self
90    }
91
92    /// Set environment variables for all commands
93    pub fn env(mut self, env: HashMap<String, String>) -> Self {
94        self.env = Some(env);
95        self
96    }
97
98    /// Set whether to mirror output to stdout/stderr
99    pub fn mirror_output(mut self, mirror: bool) -> Self {
100        self.mirror = mirror;
101        self
102    }
103
104    /// Set whether to capture output
105    pub fn capture_output(mut self, capture: bool) -> Self {
106        self.capture = capture;
107        self
108    }
109
110    /// Execute the pipeline and return the result
111    pub async fn run(self) -> Result<CommandResult> {
112        if self.commands.is_empty() {
113            return Ok(CommandResult {
114                stdout: String::new(),
115                stderr: "No commands in pipeline".to_string(),
116                code: 1,
117            });
118        }
119
120        trace_lazy("Pipeline", || {
121            format!("Running pipeline with {} commands", self.commands.len())
122        });
123
124        let mut current_stdin = self.stdin.clone();
125        let mut last_result = CommandResult {
126            stdout: String::new(),
127            stderr: String::new(),
128            code: 0,
129        };
130        let mut accumulated_stderr = String::new();
131
132        for (i, cmd_str) in self.commands.iter().enumerate() {
133            let is_last = i == self.commands.len() - 1;
134
135            trace_lazy("Pipeline", || {
136                format!(
137                    "Executing command {}/{}: {}",
138                    i + 1,
139                    self.commands.len(),
140                    cmd_str
141                )
142            });
143
144            // Check if this is a virtual command
145            let first_word = cmd_str.split_whitespace().next().unwrap_or("");
146            if crate::commands::are_virtual_commands_enabled() {
147                if let Some(result) = self
148                    .try_virtual_command(first_word, cmd_str, &current_stdin)
149                    .await
150                {
151                    if result.code != 0 {
152                        return Ok(CommandResult {
153                            stdout: result.stdout,
154                            stderr: accumulated_stderr + &result.stderr,
155                            code: result.code,
156                        });
157                    }
158                    current_stdin = Some(result.stdout.clone());
159                    accumulated_stderr.push_str(&result.stderr);
160                    last_result = result;
161                    continue;
162                }
163            }
164
165            // Execute via shell
166            let shell = find_available_shell();
167            let mut cmd = Command::new(&shell.cmd);
168            for arg in &shell.args {
169                cmd.arg(arg);
170            }
171            cmd.arg(cmd_str);
172
173            // Configure stdio
174            cmd.stdin(Stdio::piped());
175            cmd.stdout(Stdio::piped());
176            cmd.stderr(Stdio::piped());
177
178            // Set working directory
179            if let Some(ref cwd) = self.cwd {
180                cmd.current_dir(cwd);
181            }
182
183            // Set environment
184            if let Some(ref env_vars) = self.env {
185                for (key, value) in env_vars {
186                    cmd.env(key, value);
187                }
188            }
189
190            // Spawn the process
191            let mut child = cmd.spawn()?;
192
193            // Write stdin if available
194            if let Some(ref stdin_content) = current_stdin {
195                if let Some(mut stdin) = child.stdin.take() {
196                    let content = stdin_content.clone();
197                    tokio::spawn(async move {
198                        let _ = stdin.write_all(content.as_bytes()).await;
199                        let _ = stdin.shutdown().await;
200                    });
201                }
202            }
203
204            // Read stdout
205            let mut stdout_content = String::new();
206            if let Some(mut stdout) = child.stdout.take() {
207                stdout.read_to_string(&mut stdout_content).await?;
208            }
209
210            // Read stderr
211            let mut stderr_content = String::new();
212            if let Some(mut stderr) = child.stderr.take() {
213                stderr.read_to_string(&mut stderr_content).await?;
214            }
215
216            // Mirror output if enabled and this is the last command
217            if is_last && self.mirror {
218                if !stdout_content.is_empty() {
219                    print!("{}", stdout_content);
220                }
221                if !stderr_content.is_empty() {
222                    eprint!("{}", stderr_content);
223                }
224            }
225
226            // Wait for the process
227            let status = child.wait().await?;
228            let code = status.code().unwrap_or(-1);
229
230            accumulated_stderr.push_str(&stderr_content);
231
232            if code != 0 {
233                return Ok(CommandResult {
234                    stdout: stdout_content,
235                    stderr: accumulated_stderr,
236                    code,
237                });
238            }
239
240            // Set up stdin for next command
241            current_stdin = Some(stdout_content.clone());
242            last_result = CommandResult {
243                stdout: stdout_content,
244                stderr: String::new(),
245                code,
246            };
247        }
248
249        Ok(CommandResult {
250            stdout: last_result.stdout,
251            stderr: accumulated_stderr,
252            code: last_result.code,
253        })
254    }
255
256    /// Try to execute a virtual command
257    async fn try_virtual_command(
258        &self,
259        cmd_name: &str,
260        full_cmd: &str,
261        stdin: &Option<String>,
262    ) -> Option<CommandResult> {
263        let parts: Vec<&str> = full_cmd.split_whitespace().collect();
264        let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
265
266        let ctx = crate::commands::CommandContext {
267            args,
268            stdin: stdin.clone(),
269            cwd: self.cwd.clone(),
270            env: self.env.clone(),
271            output_tx: None,
272            is_cancelled: None,
273        };
274
275        match cmd_name {
276            "echo" => Some(crate::commands::echo(ctx).await),
277            "pwd" => Some(crate::commands::pwd(ctx).await),
278            "cd" => Some(crate::commands::cd(ctx).await),
279            "true" => Some(crate::commands::r#true(ctx).await),
280            "false" => Some(crate::commands::r#false(ctx).await),
281            "sleep" => Some(crate::commands::sleep(ctx).await),
282            "cat" => Some(crate::commands::cat(ctx).await),
283            "ls" => Some(crate::commands::ls(ctx).await),
284            "mkdir" => Some(crate::commands::mkdir(ctx).await),
285            "rm" => Some(crate::commands::rm(ctx).await),
286            "touch" => Some(crate::commands::touch(ctx).await),
287            "cp" => Some(crate::commands::cp(ctx).await),
288            "mv" => Some(crate::commands::mv(ctx).await),
289            "basename" => Some(crate::commands::basename(ctx).await),
290            "dirname" => Some(crate::commands::dirname(ctx).await),
291            "env" => Some(crate::commands::env(ctx).await),
292            "exit" => Some(crate::commands::exit(ctx).await),
293            "which" => Some(crate::commands::which(ctx).await),
294            "yes" => Some(crate::commands::yes(ctx).await),
295            "seq" => Some(crate::commands::seq(ctx).await),
296            "test" => Some(crate::commands::test(ctx).await),
297            _ => None,
298        }
299    }
300}
301
/// Describes how to invoke a shell: the program plus the leading arguments
/// that must precede the command string.
#[derive(Clone, Debug)]
struct ShellConfig {
    /// Shell executable (path or program name).
    cmd: String,
    /// Arguments placed before the command line (e.g. `-c` or `/c`).
    args: Vec<String>,
}
308
309/// Find an available shell
310fn find_available_shell() -> ShellConfig {
311    let is_windows = cfg!(windows);
312
313    if is_windows {
314        ShellConfig {
315            cmd: "cmd.exe".to_string(),
316            args: vec!["/c".to_string()],
317        }
318    } else {
319        let shells = [
320            ("/bin/sh", "-c"),
321            ("/usr/bin/sh", "-c"),
322            ("/bin/bash", "-c"),
323        ];
324
325        for (cmd, arg) in shells {
326            if std::path::Path::new(cmd).exists() {
327                return ShellConfig {
328                    cmd: cmd.to_string(),
329                    args: vec![arg.to_string()],
330                };
331            }
332        }
333
334        ShellConfig {
335            cmd: "/bin/sh".to_string(),
336            args: vec!["-c".to_string()],
337        }
338    }
339}
340
341/// Extension trait to add `.pipe()` method to ProcessRunner
342pub trait PipelineExt {
343    /// Pipe the output of this command to another command
344    fn pipe(self, command: impl Into<String>) -> PipelineBuilder;
345}
346
347impl PipelineExt for crate::ProcessRunner {
348    fn pipe(self, command: impl Into<String>) -> PipelineBuilder {
349        PipelineBuilder {
350            first: self,
351            additional: vec![command.into()],
352        }
353    }
354}
355
356/// Builder for piping commands together
357pub struct PipelineBuilder {
358    first: crate::ProcessRunner,
359    additional: Vec<String>,
360}
361
362impl PipelineBuilder {
363    /// Add another command to the pipeline
364    pub fn pipe(mut self, command: impl Into<String>) -> Self {
365        self.additional.push(command.into());
366        self
367    }
368
369    /// Execute the pipeline
370    pub async fn run(mut self) -> Result<CommandResult> {
371        // First, run the initial command
372        let first_result = self.first.run().await?;
373
374        if first_result.code != 0 {
375            return Ok(first_result);
376        }
377
378        // Then run the rest as a pipeline
379        let mut current_stdin = Some(first_result.stdout);
380        let mut accumulated_stderr = first_result.stderr;
381        let mut last_result = CommandResult {
382            stdout: String::new(),
383            stderr: String::new(),
384            code: 0,
385        };
386
387        for cmd_str in &self.additional {
388            let mut runner = crate::ProcessRunner::new(
389                cmd_str.clone(),
390                RunOptions {
391                    stdin: StdinOption::Content(current_stdin.take().unwrap_or_default()),
392                    mirror: false,
393                    capture: true,
394                    ..Default::default()
395                },
396            );
397
398            let result = runner.run().await?;
399            accumulated_stderr.push_str(&result.stderr);
400
401            if result.code != 0 {
402                return Ok(CommandResult {
403                    stdout: result.stdout,
404                    stderr: accumulated_stderr,
405                    code: result.code,
406                });
407            }
408
409            current_stdin = Some(result.stdout.clone());
410            last_result = result;
411        }
412
413        Ok(CommandResult {
414            stdout: last_result.stdout,
415            stderr: accumulated_stderr,
416            code: last_result.code,
417        })
418    }
419}