1use std::collections::HashMap;
28use std::path::PathBuf;
29use std::process::Stdio;
30use tokio::io::{AsyncReadExt, AsyncWriteExt};
31use tokio::process::Command;
32
33use crate::trace::trace_lazy;
34use crate::{CommandResult, Result, RunOptions, StdinOption};
35
/// A sequence of commands executed one after another, where the standard
/// output of each stage becomes the standard input of the next.
///
/// Values are assembled with the chaining methods on `Pipeline` and executed
/// with `Pipeline::run`.
#[derive(Debug, Clone)]
pub struct Pipeline {
    /// Command lines, in the order they are executed.
    commands: Vec<String>,
    /// Text supplied to the first stage on its standard input, if any.
    stdin: Option<String>,
    /// Working-directory override handed to spawned processes and to
    /// virtual-command contexts.
    cwd: Option<PathBuf>,
    /// Extra environment variables set on every spawned process and handed to
    /// virtual-command contexts.
    env: Option<HashMap<String, String>>,
    /// When `true`, the output of the final shell-executed stage is echoed to
    /// this process's stdout/stderr.
    mirror: bool,
    /// Capture flag stored by `capture_output`.
    // NOTE(review): nothing in this file reads the flag — confirm intended use.
    capture: bool,
}
54
55impl Default for Pipeline {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl Pipeline {
62 pub fn new() -> Self {
64 Pipeline {
65 commands: Vec::new(),
66 stdin: None,
67 cwd: None,
68 env: None,
69 mirror: true,
70 capture: true,
71 }
72 }
73
74 pub fn add(mut self, command: impl Into<String>) -> Self {
76 self.commands.push(command.into());
77 self
78 }
79
80 pub fn stdin(mut self, content: impl Into<String>) -> Self {
82 self.stdin = Some(content.into());
83 self
84 }
85
86 pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
88 self.cwd = Some(path.into());
89 self
90 }
91
92 pub fn env(mut self, env: HashMap<String, String>) -> Self {
94 self.env = Some(env);
95 self
96 }
97
98 pub fn mirror_output(mut self, mirror: bool) -> Self {
100 self.mirror = mirror;
101 self
102 }
103
104 pub fn capture_output(mut self, capture: bool) -> Self {
106 self.capture = capture;
107 self
108 }
109
110 pub async fn run(self) -> Result<CommandResult> {
112 if self.commands.is_empty() {
113 return Ok(CommandResult {
114 stdout: String::new(),
115 stderr: "No commands in pipeline".to_string(),
116 code: 1,
117 });
118 }
119
120 trace_lazy("Pipeline", || {
121 format!("Running pipeline with {} commands", self.commands.len())
122 });
123
124 let mut current_stdin = self.stdin.clone();
125 let mut last_result = CommandResult {
126 stdout: String::new(),
127 stderr: String::new(),
128 code: 0,
129 };
130 let mut accumulated_stderr = String::new();
131
132 for (i, cmd_str) in self.commands.iter().enumerate() {
133 let is_last = i == self.commands.len() - 1;
134
135 trace_lazy("Pipeline", || {
136 format!(
137 "Executing command {}/{}: {}",
138 i + 1,
139 self.commands.len(),
140 cmd_str
141 )
142 });
143
144 let first_word = cmd_str.split_whitespace().next().unwrap_or("");
146 if crate::commands::are_virtual_commands_enabled() {
147 if let Some(result) = self
148 .try_virtual_command(first_word, cmd_str, ¤t_stdin)
149 .await
150 {
151 if result.code != 0 {
152 return Ok(CommandResult {
153 stdout: result.stdout,
154 stderr: accumulated_stderr + &result.stderr,
155 code: result.code,
156 });
157 }
158 current_stdin = Some(result.stdout.clone());
159 accumulated_stderr.push_str(&result.stderr);
160 last_result = result;
161 continue;
162 }
163 }
164
165 let shell = find_available_shell();
167 let mut cmd = Command::new(&shell.cmd);
168 for arg in &shell.args {
169 cmd.arg(arg);
170 }
171 cmd.arg(cmd_str);
172
173 cmd.stdin(Stdio::piped());
175 cmd.stdout(Stdio::piped());
176 cmd.stderr(Stdio::piped());
177
178 if let Some(cwd) = crate::resolve_spawn_cwd(self.cwd.as_ref()) {
181 cmd.current_dir(cwd);
182 }
183
184 if let Some(ref env_vars) = self.env {
186 for (key, value) in env_vars {
187 cmd.env(key, value);
188 }
189 }
190
191 let mut child = cmd.spawn()?;
193
194 if let Some(ref stdin_content) = current_stdin {
196 if let Some(mut stdin) = child.stdin.take() {
197 let content = stdin_content.clone();
198 tokio::spawn(async move {
199 let _ = stdin.write_all(content.as_bytes()).await;
200 let _ = stdin.shutdown().await;
201 });
202 }
203 }
204
205 let mut stdout_content = String::new();
207 if let Some(mut stdout) = child.stdout.take() {
208 stdout.read_to_string(&mut stdout_content).await?;
209 }
210
211 let mut stderr_content = String::new();
213 if let Some(mut stderr) = child.stderr.take() {
214 stderr.read_to_string(&mut stderr_content).await?;
215 }
216
217 if is_last && self.mirror {
219 if !stdout_content.is_empty() {
220 print!("{}", stdout_content);
221 }
222 if !stderr_content.is_empty() {
223 eprint!("{}", stderr_content);
224 }
225 }
226
227 let status = child.wait().await?;
229 let code = status.code().unwrap_or(-1);
230
231 accumulated_stderr.push_str(&stderr_content);
232
233 if code != 0 {
234 return Ok(CommandResult {
235 stdout: stdout_content,
236 stderr: accumulated_stderr,
237 code,
238 });
239 }
240
241 current_stdin = Some(stdout_content.clone());
243 last_result = CommandResult {
244 stdout: stdout_content,
245 stderr: String::new(),
246 code,
247 };
248 }
249
250 Ok(CommandResult {
251 stdout: last_result.stdout,
252 stderr: accumulated_stderr,
253 code: last_result.code,
254 })
255 }
256
257 async fn try_virtual_command(
259 &self,
260 cmd_name: &str,
261 full_cmd: &str,
262 stdin: &Option<String>,
263 ) -> Option<CommandResult> {
264 let parts: Vec<&str> = full_cmd.split_whitespace().collect();
265 let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
266
267 let ctx = crate::commands::CommandContext {
268 args,
269 stdin: stdin.clone(),
270 cwd: self.cwd.clone(),
271 env: self.env.clone(),
272 output_tx: None,
273 is_cancelled: None,
274 };
275
276 match cmd_name {
277 "echo" => Some(crate::commands::echo(ctx).await),
278 "pwd" => Some(crate::commands::pwd(ctx).await),
279 "cd" => Some(crate::commands::cd(ctx).await),
280 "true" => Some(crate::commands::r#true(ctx).await),
281 "false" => Some(crate::commands::r#false(ctx).await),
282 "sleep" => Some(crate::commands::sleep(ctx).await),
283 "cat" => Some(crate::commands::cat(ctx).await),
284 "ls" => Some(crate::commands::ls(ctx).await),
285 "mkdir" => Some(crate::commands::mkdir(ctx).await),
286 "rm" => Some(crate::commands::rm(ctx).await),
287 "touch" => Some(crate::commands::touch(ctx).await),
288 "cp" => Some(crate::commands::cp(ctx).await),
289 "mv" => Some(crate::commands::mv(ctx).await),
290 "basename" => Some(crate::commands::basename(ctx).await),
291 "dirname" => Some(crate::commands::dirname(ctx).await),
292 "env" => Some(crate::commands::env(ctx).await),
293 "exit" => Some(crate::commands::exit(ctx).await),
294 "which" => Some(crate::commands::which(ctx).await),
295 "yes" => Some(crate::commands::yes(ctx).await),
296 "seq" => Some(crate::commands::seq(ctx).await),
297 "test" => Some(crate::commands::test(ctx).await),
298 _ => None,
299 }
300 }
301}
302
/// The program and leading arguments used to hand a command line to a shell.
#[derive(Debug, Clone)]
struct ShellConfig {
    /// Shell executable (path or program name).
    cmd: String,
    /// Arguments placed before the command string, e.g. `-c` or `/c`.
    args: Vec<String>,
}
309
310fn find_available_shell() -> ShellConfig {
312 let is_windows = cfg!(windows);
313
314 if is_windows {
315 ShellConfig {
316 cmd: "cmd.exe".to_string(),
317 args: vec!["/c".to_string()],
318 }
319 } else {
320 let shells = [
321 ("/bin/sh", "-c"),
322 ("/usr/bin/sh", "-c"),
323 ("/bin/bash", "-c"),
324 ];
325
326 for (cmd, arg) in shells {
327 if std::path::Path::new(cmd).exists() {
328 return ShellConfig {
329 cmd: cmd.to_string(),
330 args: vec![arg.to_string()],
331 };
332 }
333 }
334
335 ShellConfig {
336 cmd: "/bin/sh".to_string(),
337 args: vec!["-c".to_string()],
338 }
339 }
340}
341
342pub trait PipelineExt {
344 fn pipe(self, command: impl Into<String>) -> PipelineBuilder;
346}
347
348impl PipelineExt for crate::ProcessRunner {
349 fn pipe(self, command: impl Into<String>) -> PipelineBuilder {
350 PipelineBuilder {
351 first: self,
352 additional: vec![command.into()],
353 }
354 }
355}
356
357pub struct PipelineBuilder {
359 first: crate::ProcessRunner,
360 additional: Vec<String>,
361}
362
363impl PipelineBuilder {
364 pub fn pipe(mut self, command: impl Into<String>) -> Self {
366 self.additional.push(command.into());
367 self
368 }
369
370 pub async fn run(mut self) -> Result<CommandResult> {
372 let first_result = self.first.run().await?;
374
375 if first_result.code != 0 {
376 return Ok(first_result);
377 }
378
379 let mut current_stdin = Some(first_result.stdout);
381 let mut accumulated_stderr = first_result.stderr;
382 let mut last_result = CommandResult {
383 stdout: String::new(),
384 stderr: String::new(),
385 code: 0,
386 };
387
388 for cmd_str in &self.additional {
389 let mut runner = crate::ProcessRunner::new(
390 cmd_str.clone(),
391 RunOptions {
392 stdin: StdinOption::Content(current_stdin.take().unwrap_or_default()),
393 mirror: false,
394 capture: true,
395 ..Default::default()
396 },
397 );
398
399 let result = runner.run().await?;
400 accumulated_stderr.push_str(&result.stderr);
401
402 if result.code != 0 {
403 return Ok(CommandResult {
404 stdout: result.stdout,
405 stderr: accumulated_stderr,
406 code: result.code,
407 });
408 }
409
410 current_stdin = Some(result.stdout.clone());
411 last_result = result;
412 }
413
414 Ok(CommandResult {
415 stdout: last_result.stdout,
416 stderr: accumulated_stderr,
417 code: last_result.code,
418 })
419 }
420}