bare_script/proc/
pipeline.rs1use std::process::Stdio;
7
8use tokio::process::Command;
9
10use crate::error::{ScriptError, ScriptResult};
11use crate::output::Output;
12
13#[derive(Debug)]
34pub struct Pipeline {
35 commands: Vec<Command>,
36}
37
38impl Pipeline {
39 pub fn new<S: AsRef<std::ffi::OsStr>>(program: S) -> Self {
41 let mut cmd = Command::new(program);
42 let _ = cmd.stdin(Stdio::piped());
43 Self {
44 commands: vec![cmd],
45 }
46 }
47
48 pub fn arg<S: AsRef<std::ffi::OsStr>>(mut self, arg: S) -> Self {
50 if let Some(cmd) = self.commands.last_mut() {
51 let _ = cmd.arg(arg);
52 }
53 self
54 }
55
56 pub fn args<I, S>(mut self, args: I) -> Self
58 where
59 I: IntoIterator<Item = S>,
60 S: AsRef<std::ffi::OsStr>,
61 {
62 if let Some(cmd) = self.commands.last_mut() {
63 let _ = cmd.args(args);
64 }
65 self
66 }
67
68 pub fn pipe<S: AsRef<std::ffi::OsStr>>(mut self, program: S) -> Self {
71 if let Some(prev_cmd) = self.commands.last_mut() {
73 let _ = prev_cmd.stdout(Stdio::piped());
74 }
75
76 let mut cmd = Command::new(program);
78 let _ = cmd.stdin(Stdio::piped());
79 self.commands.push(cmd);
80
81 self
82 }
83
84 pub fn env<K, V>(mut self, key: K, value: V) -> Self
86 where
87 K: AsRef<std::ffi::OsStr>,
88 V: AsRef<std::ffi::OsStr>,
89 {
90 for cmd in &mut self.commands {
91 let _ = cmd.env(key.as_ref(), value.as_ref());
92 }
93 self
94 }
95
96 pub fn current_dir<D>(mut self, dir: D) -> Self
98 where
99 D: AsRef<std::path::Path>,
100 {
101 for cmd in &mut self.commands {
102 let _ = cmd.current_dir(dir.as_ref());
103 }
104 self
105 }
106
107 pub fn capture_output(mut self) -> Self {
109 if let Some(cmd) = self.commands.last_mut() {
110 let _ = cmd.stdout(Stdio::piped());
111 let _ = cmd.stderr(Stdio::piped());
112 }
113 self
114 }
115
116 pub async fn execute(&mut self) -> ScriptResult<Output> {
125 if self.commands.is_empty() {
126 return Err(ScriptError::PipelineEmpty);
127 }
128
129 let num_commands = self.commands.len();
130 let mut previous_output: Option<Vec<u8>> = None;
131
132 for i in 0..num_commands {
133 let cmd = &mut self.commands[i];
134
135 if previous_output.is_some() {
137 let _ = cmd.stdin(Stdio::piped());
138 }
139
140 if i == num_commands - 1 {
141 if let Some(ref prev_stdout) = previous_output {
144 let mut child = cmd.spawn().map_err(ScriptError::IoError)?;
145
146 if let Some(ref mut stdin) = child.stdin {
148 use tokio::io::AsyncWriteExt;
149 let _unused = stdin.write_all(prev_stdout).await;
150 drop(_unused);
151 }
152
153 let out = child
154 .wait_with_output()
155 .await
156 .map_err(ScriptError::IoError)?;
157 return Ok(Output::new(out.stdout, out.stderr, out.status));
158 } else {
159 let output = cmd.output().await.map_err(ScriptError::IoError)?;
160 return Ok(Output::new(output.stdout, output.stderr, output.status));
161 }
162 } else {
163 let output = cmd.output().await.map_err(ScriptError::IoError)?;
165 previous_output = Some(output.stdout);
166 }
167 }
168
169 Err(ScriptError::PipelineError(
170 "Pipeline execution failed".into(),
171 ))
172 }
173}
174
175impl Default for Pipeline {
176 fn default() -> Self {
177 Self::new("")
178 }
179}