Skip to main content

command_stream/
pipeline.rs

1//! Pipeline execution support
2//!
3//! This module provides pipeline functionality similar to the JavaScript
4//! `$.process-runner-pipeline.mjs` module. It allows chaining commands
5//! together with the output of one command becoming the input of the next.
6//!
7//! ## Usage
8//!
9//! ```rust,no_run
10//! use command_stream::{Pipeline, run};
11//!
12//! #[tokio::main]
13//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
14//!     // Create a pipeline
15//!     let result = Pipeline::new()
16//!         .add("echo hello world")
17//!         .add("grep world")
18//!         .add("wc -l")
19//!         .run()
20//!         .await?;
21//!
22//!     println!("Output: {}", result.stdout);
23//!     Ok(())
24//! }
25//! ```
26
27use std::collections::HashMap;
28use std::path::PathBuf;
29use std::process::Stdio;
30use tokio::io::{AsyncReadExt, AsyncWriteExt};
31use tokio::process::Command;
32
33use crate::trace::trace_lazy;
34use crate::{CommandResult, Result, RunOptions, StdinOption};
35
/// A pipeline of commands to be executed sequentially.
///
/// Each command's stdout is piped to the next command's stdin. A pipeline is
/// assembled with the consuming builder methods (`add`, `stdin`, `cwd`, `env`,
/// `mirror_output`, `capture_output`) and executed with `run`.
#[derive(Debug, Clone)]
pub struct Pipeline {
    /// Commands in the pipeline, in execution order.
    commands: Vec<String>,
    /// Initial stdin content fed to the first command (optional).
    stdin: Option<String>,
    /// Working directory applied to every command (optional).
    cwd: Option<PathBuf>,
    /// Environment variables applied to every command (optional).
    env: Option<HashMap<String, String>>,
    /// Whether to mirror output to the parent's stdout/stderr.
    mirror: bool,
    /// Whether to capture output.
    // NOTE(review): no code visible in this file reads this flag — confirm
    // whether `run` is expected to honour it.
    capture: bool,
}
54
55impl Default for Pipeline {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl Pipeline {
62    /// Create a new empty pipeline
63    pub fn new() -> Self {
64        Pipeline {
65            commands: Vec::new(),
66            stdin: None,
67            cwd: None,
68            env: None,
69            mirror: true,
70            capture: true,
71        }
72    }
73
74    /// Add a command to the pipeline
75    pub fn add(mut self, command: impl Into<String>) -> Self {
76        self.commands.push(command.into());
77        self
78    }
79
80    /// Set the initial stdin content for the first command
81    pub fn stdin(mut self, content: impl Into<String>) -> Self {
82        self.stdin = Some(content.into());
83        self
84    }
85
86    /// Set the working directory for all commands
87    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
88        self.cwd = Some(path.into());
89        self
90    }
91
92    /// Set environment variables for all commands
93    pub fn env(mut self, env: HashMap<String, String>) -> Self {
94        self.env = Some(env);
95        self
96    }
97
98    /// Set whether to mirror output to stdout/stderr
99    pub fn mirror_output(mut self, mirror: bool) -> Self {
100        self.mirror = mirror;
101        self
102    }
103
104    /// Set whether to capture output
105    pub fn capture_output(mut self, capture: bool) -> Self {
106        self.capture = capture;
107        self
108    }
109
110    /// Execute the pipeline and return the result
111    pub async fn run(self) -> Result<CommandResult> {
112        if self.commands.is_empty() {
113            return Ok(CommandResult {
114                stdout: String::new(),
115                stderr: "No commands in pipeline".to_string(),
116                code: 1,
117            });
118        }
119
120        trace_lazy("Pipeline", || {
121            format!("Running pipeline with {} commands", self.commands.len())
122        });
123
124        let mut current_stdin = self.stdin.clone();
125        let mut last_result = CommandResult {
126            stdout: String::new(),
127            stderr: String::new(),
128            code: 0,
129        };
130        let mut accumulated_stderr = String::new();
131
132        for (i, cmd_str) in self.commands.iter().enumerate() {
133            let is_last = i == self.commands.len() - 1;
134
135            trace_lazy("Pipeline", || {
136                format!(
137                    "Executing command {}/{}: {}",
138                    i + 1,
139                    self.commands.len(),
140                    cmd_str
141                )
142            });
143
144            // Check if this is a virtual command
145            let first_word = cmd_str.split_whitespace().next().unwrap_or("");
146            if crate::commands::are_virtual_commands_enabled() {
147                if let Some(result) = self
148                    .try_virtual_command(first_word, cmd_str, &current_stdin)
149                    .await
150                {
151                    if result.code != 0 {
152                        return Ok(CommandResult {
153                            stdout: result.stdout,
154                            stderr: accumulated_stderr + &result.stderr,
155                            code: result.code,
156                        });
157                    }
158                    current_stdin = Some(result.stdout.clone());
159                    accumulated_stderr.push_str(&result.stderr);
160                    last_result = result;
161                    continue;
162                }
163            }
164
165            // Execute via shell
166            let shell = find_available_shell();
167            let mut cmd = Command::new(&shell.cmd);
168            for arg in &shell.args {
169                cmd.arg(arg);
170            }
171            cmd.arg(cmd_str);
172
173            // Configure stdio
174            cmd.stdin(Stdio::piped());
175            cmd.stdout(Stdio::piped());
176            cmd.stderr(Stdio::piped());
177
178            // Set working directory. Fall back to a valid directory when the
179            // inherited working directory has been deleted (issue #44).
180            if let Some(cwd) = crate::resolve_spawn_cwd(self.cwd.as_ref()) {
181                cmd.current_dir(cwd);
182            }
183
184            // Set environment
185            if let Some(ref env_vars) = self.env {
186                for (key, value) in env_vars {
187                    cmd.env(key, value);
188                }
189            }
190
191            // Spawn the process
192            let mut child = cmd.spawn()?;
193
194            // Write stdin if available
195            if let Some(ref stdin_content) = current_stdin {
196                if let Some(mut stdin) = child.stdin.take() {
197                    let content = stdin_content.clone();
198                    tokio::spawn(async move {
199                        let _ = stdin.write_all(content.as_bytes()).await;
200                        let _ = stdin.shutdown().await;
201                    });
202                }
203            }
204
205            // Read stdout
206            let mut stdout_content = String::new();
207            if let Some(mut stdout) = child.stdout.take() {
208                stdout.read_to_string(&mut stdout_content).await?;
209            }
210
211            // Read stderr
212            let mut stderr_content = String::new();
213            if let Some(mut stderr) = child.stderr.take() {
214                stderr.read_to_string(&mut stderr_content).await?;
215            }
216
217            // Mirror output if enabled and this is the last command
218            if is_last && self.mirror {
219                if !stdout_content.is_empty() {
220                    print!("{}", stdout_content);
221                }
222                if !stderr_content.is_empty() {
223                    eprint!("{}", stderr_content);
224                }
225            }
226
227            // Wait for the process
228            let status = child.wait().await?;
229            let code = status.code().unwrap_or(-1);
230
231            accumulated_stderr.push_str(&stderr_content);
232
233            if code != 0 {
234                return Ok(CommandResult {
235                    stdout: stdout_content,
236                    stderr: accumulated_stderr,
237                    code,
238                });
239            }
240
241            // Set up stdin for next command
242            current_stdin = Some(stdout_content.clone());
243            last_result = CommandResult {
244                stdout: stdout_content,
245                stderr: String::new(),
246                code,
247            };
248        }
249
250        Ok(CommandResult {
251            stdout: last_result.stdout,
252            stderr: accumulated_stderr,
253            code: last_result.code,
254        })
255    }
256
257    /// Try to execute a virtual command
258    async fn try_virtual_command(
259        &self,
260        cmd_name: &str,
261        full_cmd: &str,
262        stdin: &Option<String>,
263    ) -> Option<CommandResult> {
264        let parts: Vec<&str> = full_cmd.split_whitespace().collect();
265        let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
266
267        let ctx = crate::commands::CommandContext {
268            args,
269            stdin: stdin.clone(),
270            cwd: self.cwd.clone(),
271            env: self.env.clone(),
272            output_tx: None,
273            is_cancelled: None,
274        };
275
276        match cmd_name {
277            "echo" => Some(crate::commands::echo(ctx).await),
278            "pwd" => Some(crate::commands::pwd(ctx).await),
279            "cd" => Some(crate::commands::cd(ctx).await),
280            "true" => Some(crate::commands::r#true(ctx).await),
281            "false" => Some(crate::commands::r#false(ctx).await),
282            "sleep" => Some(crate::commands::sleep(ctx).await),
283            "cat" => Some(crate::commands::cat(ctx).await),
284            "ls" => Some(crate::commands::ls(ctx).await),
285            "mkdir" => Some(crate::commands::mkdir(ctx).await),
286            "rm" => Some(crate::commands::rm(ctx).await),
287            "touch" => Some(crate::commands::touch(ctx).await),
288            "cp" => Some(crate::commands::cp(ctx).await),
289            "mv" => Some(crate::commands::mv(ctx).await),
290            "basename" => Some(crate::commands::basename(ctx).await),
291            "dirname" => Some(crate::commands::dirname(ctx).await),
292            "env" => Some(crate::commands::env(ctx).await),
293            "exit" => Some(crate::commands::exit(ctx).await),
294            "which" => Some(crate::commands::which(ctx).await),
295            "yes" => Some(crate::commands::yes(ctx).await),
296            "seq" => Some(crate::commands::seq(ctx).await),
297            "test" => Some(crate::commands::test(ctx).await),
298            _ => None,
299        }
300    }
301}
302
/// Shell configuration: the program to launch and the arguments that must
/// precede the command string.
#[derive(Debug, Clone)]
struct ShellConfig {
    /// Shell executable (e.g. `/bin/sh` or `cmd.exe`).
    cmd: String,
    /// Arguments passed before the command string (e.g. `-c` or `/c`).
    args: Vec<String>,
}
309
310/// Find an available shell
311fn find_available_shell() -> ShellConfig {
312    let is_windows = cfg!(windows);
313
314    if is_windows {
315        ShellConfig {
316            cmd: "cmd.exe".to_string(),
317            args: vec!["/c".to_string()],
318        }
319    } else {
320        let shells = [
321            ("/bin/sh", "-c"),
322            ("/usr/bin/sh", "-c"),
323            ("/bin/bash", "-c"),
324        ];
325
326        for (cmd, arg) in shells {
327            if std::path::Path::new(cmd).exists() {
328                return ShellConfig {
329                    cmd: cmd.to_string(),
330                    args: vec![arg.to_string()],
331                };
332            }
333        }
334
335        ShellConfig {
336            cmd: "/bin/sh".to_string(),
337            args: vec!["-c".to_string()],
338        }
339    }
340}
341
/// Extension trait that adds a `.pipe()` method to `ProcessRunner`.
///
/// Calling `.pipe()` consumes the value and yields a [`PipelineBuilder`] that
/// can be extended with further commands and then executed.
pub trait PipelineExt {
    /// Pipe the output of this command to another command.
    ///
    /// `command` is the command string of the next pipeline stage.
    fn pipe(self, command: impl Into<String>) -> PipelineBuilder;
}
347
348impl PipelineExt for crate::ProcessRunner {
349    fn pipe(self, command: impl Into<String>) -> PipelineBuilder {
350        PipelineBuilder {
351            first: self,
352            additional: vec![command.into()],
353        }
354    }
355}
356
/// Builder for piping commands together, starting from an existing
/// `ProcessRunner`.
pub struct PipelineBuilder {
    /// The runner executed as the first stage of the pipeline.
    first: crate::ProcessRunner,
    /// Command strings of the following stages, in execution order.
    additional: Vec<String>,
}
362
363impl PipelineBuilder {
364    /// Add another command to the pipeline
365    pub fn pipe(mut self, command: impl Into<String>) -> Self {
366        self.additional.push(command.into());
367        self
368    }
369
370    /// Execute the pipeline
371    pub async fn run(mut self) -> Result<CommandResult> {
372        // First, run the initial command
373        let first_result = self.first.run().await?;
374
375        if first_result.code != 0 {
376            return Ok(first_result);
377        }
378
379        // Then run the rest as a pipeline
380        let mut current_stdin = Some(first_result.stdout);
381        let mut accumulated_stderr = first_result.stderr;
382        let mut last_result = CommandResult {
383            stdout: String::new(),
384            stderr: String::new(),
385            code: 0,
386        };
387
388        for cmd_str in &self.additional {
389            let mut runner = crate::ProcessRunner::new(
390                cmd_str.clone(),
391                RunOptions {
392                    stdin: StdinOption::Content(current_stdin.take().unwrap_or_default()),
393                    mirror: false,
394                    capture: true,
395                    ..Default::default()
396                },
397            );
398
399            let result = runner.run().await?;
400            accumulated_stderr.push_str(&result.stderr);
401
402            if result.code != 0 {
403                return Ok(CommandResult {
404                    stdout: result.stdout,
405                    stderr: accumulated_stderr,
406                    code: result.code,
407                });
408            }
409
410            current_stdin = Some(result.stdout.clone());
411            last_result = result;
412        }
413
414        Ok(CommandResult {
415            stdout: last_result.stdout,
416            stderr: accumulated_stderr,
417            code: last_result.code,
418        })
419    }
420}