bare-script 0.1.1

The type-safe scripting authority for Rust. A framework for building robust shell commands and automation with 'Parse, don't validate' philosophy.
Documentation
//! Async pipeline support for chaining commands together.
//!
//! This module provides a type-safe way to create asynchronous command pipelines
//! where the output of one command is piped to the input of the next.

use std::process::Stdio;

use tokio::process::Command;

use crate::error::{ScriptError, ScriptResult};
use crate::output::Output;

/// A builder for creating asynchronous command pipelines.
///
/// # Example
///
/// ```rust,ignore
/// use bare_script::proc::Pipeline;
///
/// #[tokio::main]
/// async fn main() -> Result<(), bare_script::ScriptError> {
///     let output = Pipeline::new("echo")
///         .arg("hello world")
///         .pipe("grep")
///         .arg("hello")
///         .execute()
///         .await?;
///     
///     assert!(output.success());
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Pipeline {
    commands: Vec<Command>,
}

impl Pipeline {
    /// Creates a new pipeline starting with the specified program.
    pub fn new<S: AsRef<std::ffi::OsStr>>(program: S) -> Self {
        let mut cmd = Command::new(program);
        let _ = cmd.stdin(Stdio::piped());
        Self {
            commands: vec![cmd],
        }
    }

    /// Adds an argument to the last command in the pipeline.
    pub fn arg<S: AsRef<std::ffi::OsStr>>(mut self, arg: S) -> Self {
        if let Some(cmd) = self.commands.last_mut() {
            let _ = cmd.arg(arg);
        }
        self
    }

    /// Adds multiple arguments to the last command in the pipeline.
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: AsRef<std::ffi::OsStr>,
    {
        if let Some(cmd) = self.commands.last_mut() {
            let _ = cmd.args(args);
        }
        self
    }

    /// Adds a new command to the pipeline, piping the output of the previous
    /// command to the stdin of this command.
    pub fn pipe<S: AsRef<std::ffi::OsStr>>(mut self, program: S) -> Self {
        // Configure the previous command's stdout to be piped
        if let Some(prev_cmd) = self.commands.last_mut() {
            let _ = prev_cmd.stdout(Stdio::piped());
        }

        // Create the new command with piped stdin
        let mut cmd = Command::new(program);
        let _ = cmd.stdin(Stdio::piped());
        self.commands.push(cmd);

        self
    }

    /// Sets an environment variable for all commands in the pipeline.
    pub fn env<K, V>(mut self, key: K, value: V) -> Self
    where
        K: AsRef<std::ffi::OsStr>,
        V: AsRef<std::ffi::OsStr>,
    {
        for cmd in &mut self.commands {
            let _ = cmd.env(key.as_ref(), value.as_ref());
        }
        self
    }

    /// Sets the working directory for all commands in the pipeline.
    pub fn current_dir<D>(mut self, dir: D) -> Self
    where
        D: AsRef<std::path::Path>,
    {
        for cmd in &mut self.commands {
            let _ = cmd.current_dir(dir.as_ref());
        }
        self
    }

    /// Configures the pipeline to capture the final output.
    pub fn capture_output(mut self) -> Self {
        if let Some(cmd) = self.commands.last_mut() {
            let _ = cmd.stdout(Stdio::piped());
            let _ = cmd.stderr(Stdio::piped());
        }
        self
    }

    /// Executes the pipeline asynchronously and returns the output of the final command.
    ///
    /// Note: For async pipelines, commands are executed sequentially, with each
    /// command's stdout passed to the next command's stdin.
    ///
    /// # Errors
    ///
    /// Returns an error if any command in the pipeline fails to execute.
    pub async fn execute(&mut self) -> ScriptResult<Output> {
        if self.commands.is_empty() {
            return Err(ScriptError::PipelineEmpty);
        }

        let num_commands = self.commands.len();
        let mut previous_output: Option<Vec<u8>> = None;

        for i in 0..num_commands {
            let cmd = &mut self.commands[i];

            // Configure stdin to pipe if there's previous output
            if previous_output.is_some() {
                let _ = cmd.stdin(Stdio::piped());
            }

            if i == num_commands - 1 {
                // Last command - execute and capture output
                // If there's previous output, we need to use spawn + wait_with_output
                if let Some(ref prev_stdout) = previous_output {
                    let mut child = cmd.spawn().map_err(ScriptError::IoError)?;

                    // Write previous output to stdin
                    if let Some(ref mut stdin) = child.stdin {
                        use tokio::io::AsyncWriteExt;
                        let _unused = stdin.write_all(prev_stdout).await;
                        drop(_unused);
                    }

                    let out = child
                        .wait_with_output()
                        .await
                        .map_err(ScriptError::IoError)?;
                    return Ok(Output::new(out.stdout, out.stderr, out.status));
                } else {
                    let output = cmd.output().await.map_err(ScriptError::IoError)?;
                    return Ok(Output::new(output.stdout, output.stderr, output.status));
                }
            } else {
                // Not the last command - execute and capture output to pass to next
                let output = cmd.output().await.map_err(ScriptError::IoError)?;
                previous_output = Some(output.stdout);
            }
        }

        Err(ScriptError::PipelineError(
            "Pipeline execution failed".into(),
        ))
    }
}

impl Default for Pipeline {
    fn default() -> Self {
        Self::new("")
    }
}