1use std::collections::HashMap;
28use std::path::PathBuf;
29use std::process::Stdio;
30use tokio::io::{AsyncReadExt, AsyncWriteExt};
31use tokio::process::Command;
32
33use crate::trace::trace_lazy;
34use crate::{CommandResult, Result, RunOptions, StdinOption};
35
36#[derive(Debug, Clone)]
40pub struct Pipeline {
41 commands: Vec<String>,
43 stdin: Option<String>,
45 cwd: Option<PathBuf>,
47 env: Option<HashMap<String, String>>,
49 mirror: bool,
51 capture: bool,
53}
54
55impl Default for Pipeline {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl Pipeline {
62 pub fn new() -> Self {
64 Pipeline {
65 commands: Vec::new(),
66 stdin: None,
67 cwd: None,
68 env: None,
69 mirror: true,
70 capture: true,
71 }
72 }
73
74 pub fn add(mut self, command: impl Into<String>) -> Self {
76 self.commands.push(command.into());
77 self
78 }
79
80 pub fn stdin(mut self, content: impl Into<String>) -> Self {
82 self.stdin = Some(content.into());
83 self
84 }
85
86 pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
88 self.cwd = Some(path.into());
89 self
90 }
91
92 pub fn env(mut self, env: HashMap<String, String>) -> Self {
94 self.env = Some(env);
95 self
96 }
97
98 pub fn mirror_output(mut self, mirror: bool) -> Self {
100 self.mirror = mirror;
101 self
102 }
103
104 pub fn capture_output(mut self, capture: bool) -> Self {
106 self.capture = capture;
107 self
108 }
109
110 pub async fn run(self) -> Result<CommandResult> {
112 if self.commands.is_empty() {
113 return Ok(CommandResult {
114 stdout: String::new(),
115 stderr: "No commands in pipeline".to_string(),
116 code: 1,
117 });
118 }
119
120 trace_lazy("Pipeline", || {
121 format!("Running pipeline with {} commands", self.commands.len())
122 });
123
124 let mut current_stdin = self.stdin.clone();
125 let mut last_result = CommandResult {
126 stdout: String::new(),
127 stderr: String::new(),
128 code: 0,
129 };
130 let mut accumulated_stderr = String::new();
131
132 for (i, cmd_str) in self.commands.iter().enumerate() {
133 let is_last = i == self.commands.len() - 1;
134
135 trace_lazy("Pipeline", || {
136 format!(
137 "Executing command {}/{}: {}",
138 i + 1,
139 self.commands.len(),
140 cmd_str
141 )
142 });
143
144 let first_word = cmd_str.split_whitespace().next().unwrap_or("");
146 if crate::commands::are_virtual_commands_enabled() {
147 if let Some(result) = self
148 .try_virtual_command(first_word, cmd_str, ¤t_stdin)
149 .await
150 {
151 if result.code != 0 {
152 return Ok(CommandResult {
153 stdout: result.stdout,
154 stderr: accumulated_stderr + &result.stderr,
155 code: result.code,
156 });
157 }
158 current_stdin = Some(result.stdout.clone());
159 accumulated_stderr.push_str(&result.stderr);
160 last_result = result;
161 continue;
162 }
163 }
164
165 let shell = find_available_shell();
167 let mut cmd = Command::new(&shell.cmd);
168 for arg in &shell.args {
169 cmd.arg(arg);
170 }
171 cmd.arg(cmd_str);
172
173 cmd.stdin(Stdio::piped());
175 cmd.stdout(Stdio::piped());
176 cmd.stderr(Stdio::piped());
177
178 if let Some(ref cwd) = self.cwd {
180 cmd.current_dir(cwd);
181 }
182
183 if let Some(ref env_vars) = self.env {
185 for (key, value) in env_vars {
186 cmd.env(key, value);
187 }
188 }
189
190 let mut child = cmd.spawn()?;
192
193 if let Some(ref stdin_content) = current_stdin {
195 if let Some(mut stdin) = child.stdin.take() {
196 let content = stdin_content.clone();
197 tokio::spawn(async move {
198 let _ = stdin.write_all(content.as_bytes()).await;
199 let _ = stdin.shutdown().await;
200 });
201 }
202 }
203
204 let mut stdout_content = String::new();
206 if let Some(mut stdout) = child.stdout.take() {
207 stdout.read_to_string(&mut stdout_content).await?;
208 }
209
210 let mut stderr_content = String::new();
212 if let Some(mut stderr) = child.stderr.take() {
213 stderr.read_to_string(&mut stderr_content).await?;
214 }
215
216 if is_last && self.mirror {
218 if !stdout_content.is_empty() {
219 print!("{}", stdout_content);
220 }
221 if !stderr_content.is_empty() {
222 eprint!("{}", stderr_content);
223 }
224 }
225
226 let status = child.wait().await?;
228 let code = status.code().unwrap_or(-1);
229
230 accumulated_stderr.push_str(&stderr_content);
231
232 if code != 0 {
233 return Ok(CommandResult {
234 stdout: stdout_content,
235 stderr: accumulated_stderr,
236 code,
237 });
238 }
239
240 current_stdin = Some(stdout_content.clone());
242 last_result = CommandResult {
243 stdout: stdout_content,
244 stderr: String::new(),
245 code,
246 };
247 }
248
249 Ok(CommandResult {
250 stdout: last_result.stdout,
251 stderr: accumulated_stderr,
252 code: last_result.code,
253 })
254 }
255
256 async fn try_virtual_command(
258 &self,
259 cmd_name: &str,
260 full_cmd: &str,
261 stdin: &Option<String>,
262 ) -> Option<CommandResult> {
263 let parts: Vec<&str> = full_cmd.split_whitespace().collect();
264 let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
265
266 let ctx = crate::commands::CommandContext {
267 args,
268 stdin: stdin.clone(),
269 cwd: self.cwd.clone(),
270 env: self.env.clone(),
271 output_tx: None,
272 is_cancelled: None,
273 };
274
275 match cmd_name {
276 "echo" => Some(crate::commands::echo(ctx).await),
277 "pwd" => Some(crate::commands::pwd(ctx).await),
278 "cd" => Some(crate::commands::cd(ctx).await),
279 "true" => Some(crate::commands::r#true(ctx).await),
280 "false" => Some(crate::commands::r#false(ctx).await),
281 "sleep" => Some(crate::commands::sleep(ctx).await),
282 "cat" => Some(crate::commands::cat(ctx).await),
283 "ls" => Some(crate::commands::ls(ctx).await),
284 "mkdir" => Some(crate::commands::mkdir(ctx).await),
285 "rm" => Some(crate::commands::rm(ctx).await),
286 "touch" => Some(crate::commands::touch(ctx).await),
287 "cp" => Some(crate::commands::cp(ctx).await),
288 "mv" => Some(crate::commands::mv(ctx).await),
289 "basename" => Some(crate::commands::basename(ctx).await),
290 "dirname" => Some(crate::commands::dirname(ctx).await),
291 "env" => Some(crate::commands::env(ctx).await),
292 "exit" => Some(crate::commands::exit(ctx).await),
293 "which" => Some(crate::commands::which(ctx).await),
294 "yes" => Some(crate::commands::yes(ctx).await),
295 "seq" => Some(crate::commands::seq(ctx).await),
296 "test" => Some(crate::commands::test(ctx).await),
297 _ => None,
298 }
299 }
300}
301
302#[derive(Debug, Clone)]
304struct ShellConfig {
305 cmd: String,
306 args: Vec<String>,
307}
308
309fn find_available_shell() -> ShellConfig {
311 let is_windows = cfg!(windows);
312
313 if is_windows {
314 ShellConfig {
315 cmd: "cmd.exe".to_string(),
316 args: vec!["/c".to_string()],
317 }
318 } else {
319 let shells = [
320 ("/bin/sh", "-c"),
321 ("/usr/bin/sh", "-c"),
322 ("/bin/bash", "-c"),
323 ];
324
325 for (cmd, arg) in shells {
326 if std::path::Path::new(cmd).exists() {
327 return ShellConfig {
328 cmd: cmd.to_string(),
329 args: vec![arg.to_string()],
330 };
331 }
332 }
333
334 ShellConfig {
335 cmd: "/bin/sh".to_string(),
336 args: vec!["-c".to_string()],
337 }
338 }
339}
340
341pub trait PipelineExt {
343 fn pipe(self, command: impl Into<String>) -> PipelineBuilder;
345}
346
347impl PipelineExt for crate::ProcessRunner {
348 fn pipe(self, command: impl Into<String>) -> PipelineBuilder {
349 PipelineBuilder {
350 first: self,
351 additional: vec![command.into()],
352 }
353 }
354}
355
356pub struct PipelineBuilder {
358 first: crate::ProcessRunner,
359 additional: Vec<String>,
360}
361
362impl PipelineBuilder {
363 pub fn pipe(mut self, command: impl Into<String>) -> Self {
365 self.additional.push(command.into());
366 self
367 }
368
369 pub async fn run(mut self) -> Result<CommandResult> {
371 let first_result = self.first.run().await?;
373
374 if first_result.code != 0 {
375 return Ok(first_result);
376 }
377
378 let mut current_stdin = Some(first_result.stdout);
380 let mut accumulated_stderr = first_result.stderr;
381 let mut last_result = CommandResult {
382 stdout: String::new(),
383 stderr: String::new(),
384 code: 0,
385 };
386
387 for cmd_str in &self.additional {
388 let mut runner = crate::ProcessRunner::new(
389 cmd_str.clone(),
390 RunOptions {
391 stdin: StdinOption::Content(current_stdin.take().unwrap_or_default()),
392 mirror: false,
393 capture: true,
394 ..Default::default()
395 },
396 );
397
398 let result = runner.run().await?;
399 accumulated_stderr.push_str(&result.stderr);
400
401 if result.code != 0 {
402 return Ok(CommandResult {
403 stdout: result.stdout,
404 stderr: accumulated_stderr,
405 code: result.code,
406 });
407 }
408
409 current_stdin = Some(result.stdout.clone());
410 last_result = result;
411 }
412
413 Ok(CommandResult {
414 stdout: last_result.stdout,
415 stderr: accumulated_stderr,
416 code: last_result.code,
417 })
418 }
419}