Skip to main content

bare_script/proc/
pipeline.rs

1//! Async pipeline support for chaining commands together.
2//!
3//! This module provides a type-safe way to create asynchronous command pipelines
4//! where the output of one command is piped to the input of the next.
5
6use std::process::Stdio;
7
8use tokio::process::Command;
9
10use crate::error::{ScriptError, ScriptResult};
11use crate::output::Output;
12
13/// A builder for creating asynchronous command pipelines.
14///
15/// # Example
16///
17/// ```rust,ignore
18/// use bare_script::proc::Pipeline;
19///
20/// #[tokio::main]
21/// async fn main() -> Result<(), bare_script::ScriptError> {
22///     let output = Pipeline::new("echo")
23///         .arg("hello world")
24///         .pipe("grep")
25///         .arg("hello")
26///         .execute()
27///         .await?;
28///     
29///     assert!(output.success());
30///     Ok(())
31/// }
32/// ```
33#[derive(Debug)]
34pub struct Pipeline {
35    commands: Vec<Command>,
36}
37
38impl Pipeline {
39    /// Creates a new pipeline starting with the specified program.
40    pub fn new<S: AsRef<std::ffi::OsStr>>(program: S) -> Self {
41        let mut cmd = Command::new(program);
42        let _ = cmd.stdin(Stdio::piped());
43        Self {
44            commands: vec![cmd],
45        }
46    }
47
48    /// Adds an argument to the last command in the pipeline.
49    pub fn arg<S: AsRef<std::ffi::OsStr>>(mut self, arg: S) -> Self {
50        if let Some(cmd) = self.commands.last_mut() {
51            let _ = cmd.arg(arg);
52        }
53        self
54    }
55
56    /// Adds multiple arguments to the last command in the pipeline.
57    pub fn args<I, S>(mut self, args: I) -> Self
58    where
59        I: IntoIterator<Item = S>,
60        S: AsRef<std::ffi::OsStr>,
61    {
62        if let Some(cmd) = self.commands.last_mut() {
63            let _ = cmd.args(args);
64        }
65        self
66    }
67
68    /// Adds a new command to the pipeline, piping the output of the previous
69    /// command to the stdin of this command.
70    pub fn pipe<S: AsRef<std::ffi::OsStr>>(mut self, program: S) -> Self {
71        // Configure the previous command's stdout to be piped
72        if let Some(prev_cmd) = self.commands.last_mut() {
73            let _ = prev_cmd.stdout(Stdio::piped());
74        }
75
76        // Create the new command with piped stdin
77        let mut cmd = Command::new(program);
78        let _ = cmd.stdin(Stdio::piped());
79        self.commands.push(cmd);
80
81        self
82    }
83
84    /// Sets an environment variable for all commands in the pipeline.
85    pub fn env<K, V>(mut self, key: K, value: V) -> Self
86    where
87        K: AsRef<std::ffi::OsStr>,
88        V: AsRef<std::ffi::OsStr>,
89    {
90        for cmd in &mut self.commands {
91            let _ = cmd.env(key.as_ref(), value.as_ref());
92        }
93        self
94    }
95
96    /// Sets the working directory for all commands in the pipeline.
97    pub fn current_dir<D>(mut self, dir: D) -> Self
98    where
99        D: AsRef<std::path::Path>,
100    {
101        for cmd in &mut self.commands {
102            let _ = cmd.current_dir(dir.as_ref());
103        }
104        self
105    }
106
107    /// Configures the pipeline to capture the final output.
108    pub fn capture_output(mut self) -> Self {
109        if let Some(cmd) = self.commands.last_mut() {
110            let _ = cmd.stdout(Stdio::piped());
111            let _ = cmd.stderr(Stdio::piped());
112        }
113        self
114    }
115
116    /// Executes the pipeline asynchronously and returns the output of the final command.
117    ///
118    /// Note: For async pipelines, commands are executed sequentially, with each
119    /// command's stdout passed to the next command's stdin.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if any command in the pipeline fails to execute.
124    pub async fn execute(&mut self) -> ScriptResult<Output> {
125        if self.commands.is_empty() {
126            return Err(ScriptError::PipelineEmpty);
127        }
128
129        let num_commands = self.commands.len();
130        let mut previous_output: Option<Vec<u8>> = None;
131
132        for i in 0..num_commands {
133            let cmd = &mut self.commands[i];
134
135            // Configure stdin to pipe if there's previous output
136            if previous_output.is_some() {
137                let _ = cmd.stdin(Stdio::piped());
138            }
139
140            if i == num_commands - 1 {
141                // Last command - execute and capture output
142                // If there's previous output, we need to use spawn + wait_with_output
143                if let Some(ref prev_stdout) = previous_output {
144                    let mut child = cmd.spawn().map_err(ScriptError::IoError)?;
145
146                    // Write previous output to stdin
147                    if let Some(ref mut stdin) = child.stdin {
148                        use tokio::io::AsyncWriteExt;
149                        let _unused = stdin.write_all(prev_stdout).await;
150                        drop(_unused);
151                    }
152
153                    let out = child
154                        .wait_with_output()
155                        .await
156                        .map_err(ScriptError::IoError)?;
157                    return Ok(Output::new(out.stdout, out.stderr, out.status));
158                } else {
159                    let output = cmd.output().await.map_err(ScriptError::IoError)?;
160                    return Ok(Output::new(output.stdout, output.stderr, output.status));
161                }
162            } else {
163                // Not the last command - execute and capture output to pass to next
164                let output = cmd.output().await.map_err(ScriptError::IoError)?;
165                previous_output = Some(output.stdout);
166            }
167        }
168
169        Err(ScriptError::PipelineError(
170            "Pipeline execution failed".into(),
171        ))
172    }
173}
174
175impl Default for Pipeline {
176    fn default() -> Self {
177        Self::new("")
178    }
179}