1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::interpreter::{apply_output_format, ExecResult};
15use crate::tools::{extract_output_format, ExecContext, ToolArgs, ToolRegistry, ToolSchema};
16use tokio::io::AsyncWriteExt;
17
18use super::scatter::{
19 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
20};
21
22async fn apply_redirects(
28 mut result: ExecResult,
29 redirects: &[Redirect],
30 ctx: &ExecContext,
31) -> ExecResult {
32 for redir in redirects {
33 match redir.kind {
34 RedirectKind::MergeStderr => {
35 if !result.err.is_empty() {
37 result.out.push_str(&result.err);
38 result.err.clear();
39 }
40 }
41 RedirectKind::MergeStdout => {
42 if !result.out.is_empty() {
44 result.err.push_str(&result.out);
45 result.out.clear();
46 }
47 }
48 RedirectKind::StdoutOverwrite => {
49 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
50 if let Err(e) = tokio::fs::write(&path, &result.out).await {
51 return ExecResult::failure(1, format!("redirect: {e}"));
52 }
53 result.out.clear();
54 }
55 }
56 RedirectKind::StdoutAppend => {
57 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
58 let file = tokio::fs::OpenOptions::new()
59 .append(true)
60 .create(true)
61 .open(&path)
62 .await;
63 match file {
64 Ok(mut f) => {
65 if let Err(e) = f.write_all(result.out.as_bytes()).await {
66 return ExecResult::failure(1, format!("redirect: {e}"));
67 }
68 }
69 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
70 }
71 result.out.clear();
72 }
73 }
74 RedirectKind::Stderr => {
75 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
76 if let Err(e) = tokio::fs::write(&path, &result.err).await {
77 return ExecResult::failure(1, format!("redirect: {e}"));
78 }
79 result.err.clear();
80 }
81 }
82 RedirectKind::Both => {
83 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
84 let combined = format!("{}{}", result.out, result.err);
85 if let Err(e) = tokio::fs::write(&path, combined).await {
86 return ExecResult::failure(1, format!("redirect: {e}"));
87 }
88 result.out.clear();
89 result.err.clear();
90 }
91 }
92 RedirectKind::Stdin | RedirectKind::HereDoc => {}
94 }
95 }
96 result
97}
98
99fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
101 eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
102}
103
104fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
107 for redir in &cmd.redirects {
108 match &redir.kind {
109 RedirectKind::Stdin => {
110 if let Some(path) = eval_redirect_target(&redir.target, ctx)
111 && let Ok(content) = std::fs::read_to_string(&path) {
112 ctx.set_stdin(content);
113 }
114 }
115 RedirectKind::HereDoc => {
116 if let Expr::Literal(Value::String(content)) = &redir.target {
117 ctx.set_stdin(content.clone());
118 }
119 }
120 _ => {}
121 }
122 }
123}
124
125#[derive(Clone)]
127pub struct PipelineRunner {
128 tools: Arc<ToolRegistry>,
129}
130
131impl PipelineRunner {
132 pub fn new(tools: Arc<ToolRegistry>) -> Self {
134 Self { tools }
135 }
136
137 pub async fn run(
143 &self,
144 commands: &[Command],
145 ctx: &mut ExecContext,
146 ) -> ExecResult {
147 if commands.is_empty() {
148 return ExecResult::success("");
149 }
150
151 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
153 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx).await;
154 }
155
156 if commands.len() == 1 {
157 return self.run_single(&commands[0], ctx, None).await;
159 }
160
161 self.run_pipeline(commands, ctx).await
163 }
164
165 async fn run_scatter_gather(
167 &self,
168 commands: &[Command],
169 scatter_idx: usize,
170 gather_idx: usize,
171 ctx: &mut ExecContext,
172 ) -> ExecResult {
173 let pre_scatter = &commands[..scatter_idx];
175 let scatter_cmd = &commands[scatter_idx];
176 let parallel = &commands[scatter_idx + 1..gather_idx];
177 let gather_cmd = &commands[gather_idx];
178 let post_gather = &commands[gather_idx + 1..];
179
180 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
183 let gather_schema = self.tools.get("gather").map(|t| t.schema());
184 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
185 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
186
187 let runner = ScatterGatherRunner::new(self.tools.clone());
189 runner
190 .run(
191 pre_scatter,
192 scatter_opts,
193 parallel,
194 gather_opts,
195 post_gather,
196 ctx,
197 )
198 .await
199 }
200
201 async fn run_single(
203 &self,
204 cmd: &Command,
205 ctx: &mut ExecContext,
206 stdin: Option<String>,
207 ) -> ExecResult {
208 match cmd.name.as_str() {
210 "true" => return ExecResult::success(""),
211 "false" => return ExecResult::failure(1, ""),
212 _ => {}
213 }
214
215 let schema = self.tools.get(&cmd.name).map(|t| t.schema());
217 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
218 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
219
220 setup_stdin_redirects(cmd, ctx);
222
223 if let Some(input) = stdin {
225 ctx.set_stdin(input);
226 }
227
228 let backend = ctx.backend.clone();
230 let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
231 Ok(tool_result) => {
232 let mut exec = ExecResult::from_output(
233 tool_result.code as i64, tool_result.stdout, tool_result.stderr,
234 );
235 exec.output = tool_result.output;
236 exec
237 }
238 Err(e) => ExecResult::failure(127, e.to_string()),
239 };
240
241 let result = match output_format {
243 Some(format) => apply_output_format(result, format),
244 None => result,
245 };
246
247 apply_redirects(result, &cmd.redirects, ctx).await
249 }
250
251 async fn run_pipeline(
253 &self,
254 commands: &[Command],
255 ctx: &mut ExecContext,
256 ) -> ExecResult {
257 let mut current_stdin: Option<String> = None;
261 let mut current_data: Option<Value> = None;
262 let mut last_result = ExecResult::success("");
263
264 for (i, cmd) in commands.iter().enumerate() {
265 let schema = self.tools.get(&cmd.name).map(|t| t.schema());
267 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
268 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
269
270 setup_stdin_redirects(cmd, ctx);
272
273 if let Some(input) = current_stdin.take() {
276 ctx.set_stdin_with_data(input, current_data.take());
277 }
278
279 let backend = ctx.backend.clone();
281 last_result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
282 Ok(tool_result) => {
283 let mut exec = ExecResult::from_output(
284 tool_result.code as i64, tool_result.stdout, tool_result.stderr,
285 );
286 exec.output = tool_result.output;
287 exec
288 }
289 Err(e) => ExecResult::failure(127, e.to_string()),
290 };
291
292 last_result = match output_format {
294 Some(format) => apply_output_format(last_result, format),
295 None => last_result,
296 };
297
298 last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
300
301 if !last_result.ok() {
303 return last_result;
304 }
305
306 if i < commands.len() - 1 {
308 current_stdin = Some(last_result.out.clone());
309 current_data = last_result.data.clone();
310 }
311 }
312
313 last_result
314 }
315}
316
317fn schema_param_types(schema: &ToolSchema) -> HashMap<&str, &str> {
321 schema
322 .params
323 .iter()
324 .map(|p| (p.name.as_str(), p.param_type.as_str()))
325 .collect()
326}
327
/// Returns `true` when `param_type` names a boolean type, i.e. it equals
/// `bool` or `boolean` ignoring ASCII case.
///
/// Compares in place rather than allocating a lowercased copy on every call.
fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|name| param_type.eq_ignore_ascii_case(name))
}
332
333pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
341 let mut tool_args = ToolArgs::new();
342 let param_types = schema.map(schema_param_types).unwrap_or_default();
343
344 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
346 let mut past_double_dash = false;
347
348 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
350 for (i, arg) in args.iter().enumerate() {
351 if let Arg::Positional(expr) = arg {
352 positional_indices.push((i, expr));
353 }
354 }
355
356 let mut i = 0;
358 while i < args.len() {
359 let arg = &args[i];
360
361 match arg {
362 Arg::DoubleDash => {
363 past_double_dash = true;
364 }
365 Arg::Positional(expr) => {
366 if !consumed_positionals.contains(&i)
368 && let Some(value) = eval_simple_expr(expr, ctx)
369 {
370 tool_args.positional.push(value);
371 }
372 }
373 Arg::Named { key, value } => {
374 if let Some(val) = eval_simple_expr(value, ctx) {
375 tool_args.named.insert(key.clone(), val);
376 }
377 }
378 Arg::ShortFlag(name) => {
379 if past_double_dash {
380 tool_args.positional.push(Value::String(format!("-{name}")));
382 } else {
383 for c in name.chars() {
385 tool_args.flags.insert(c.to_string());
386 }
387 }
388 }
389 Arg::LongFlag(name) => {
390 if past_double_dash {
391 tool_args.positional.push(Value::String(format!("--{name}")));
393 } else {
394 let is_bool = param_types
396 .get(name.as_str())
397 .map(|t| is_bool_type(t))
398 .unwrap_or(true); if is_bool {
401 tool_args.flags.insert(name.clone());
403 } else {
404 let next_positional = positional_indices
407 .iter()
408 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
409
410 if let Some((pos_idx, expr)) = next_positional {
411 if let Some(value) = eval_simple_expr(expr, ctx) {
412 tool_args.named.insert(name.clone(), value);
413 consumed_positionals.insert(*pos_idx);
414 } else {
415 tool_args.flags.insert(name.clone());
417 }
418 } else {
419 tool_args.flags.insert(name.clone());
422 }
423 }
424 }
425 }
426 }
427 i += 1;
428 }
429
430 tool_args
431}
432
433fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
435 match expr {
436 Expr::Literal(value) => Some(eval_literal(value, ctx)),
437 Expr::VarRef(path) => ctx.scope.resolve_path(path),
438 Expr::Interpolated(parts) => {
439 let mut result = String::new();
440 for part in parts {
441 match part {
442 crate::ast::StringPart::Literal(s) => result.push_str(s),
443 crate::ast::StringPart::Var(path) => {
444 if let Some(value) = ctx.scope.resolve_path(path) {
445 result.push_str(&value_to_string(&value));
446 }
447 }
448 crate::ast::StringPart::VarWithDefault { name, default } => {
449 match ctx.scope.get(name) {
450 Some(value) => {
451 let s = value_to_string(value);
452 if s.is_empty() {
453 result.push_str(&eval_string_parts_sync(default, ctx));
454 } else {
455 result.push_str(&s);
456 }
457 }
458 None => result.push_str(&eval_string_parts_sync(default, ctx)),
459 }
460 }
461 crate::ast::StringPart::VarLength(name) => {
462 let len = match ctx.scope.get(name) {
463 Some(value) => value_to_string(value).len(),
464 None => 0,
465 };
466 result.push_str(&len.to_string());
467 }
468 crate::ast::StringPart::Positional(n) => {
469 if let Some(s) = ctx.scope.get_positional(*n) {
470 result.push_str(s);
471 }
472 }
473 crate::ast::StringPart::AllArgs => {
474 result.push_str(&ctx.scope.all_args().join(" "));
475 }
476 crate::ast::StringPart::ArgCount => {
477 result.push_str(&ctx.scope.arg_count().to_string());
478 }
479 crate::ast::StringPart::Arithmetic(expr) => {
480 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
482 result.push_str(&value.to_string());
483 }
484 }
485 crate::ast::StringPart::CommandSubst(_) => {
486 }
488 crate::ast::StringPart::LastExitCode => {
489 result.push_str(&ctx.scope.last_result().code.to_string());
490 }
491 crate::ast::StringPart::CurrentPid => {
492 result.push_str(&ctx.scope.pid().to_string());
493 }
494 }
495 }
496 Some(Value::String(result))
497 }
498 _ => None, }
500}
501
502fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
504 value.clone()
505}
506
507fn value_to_string(value: &Value) -> String {
509 match value {
510 Value::Null => "".to_string(),
511 Value::Bool(b) => b.to_string(),
512 Value::Int(i) => i.to_string(),
513 Value::Float(f) => f.to_string(),
514 Value::String(s) => s.clone(),
515 Value::Json(json) => json.to_string(),
516 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
517 }
518}
519
520fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
523 let mut result = String::new();
524 for part in parts {
525 match part {
526 crate::ast::StringPart::Literal(s) => result.push_str(s),
527 crate::ast::StringPart::Var(path) => {
528 if let Some(value) = ctx.scope.resolve_path(path) {
529 result.push_str(&value_to_string(&value));
530 }
531 }
532 crate::ast::StringPart::VarWithDefault { name, default } => {
533 match ctx.scope.get(name) {
534 Some(value) => {
535 let s = value_to_string(value);
536 if s.is_empty() {
537 result.push_str(&eval_string_parts_sync(default, ctx));
538 } else {
539 result.push_str(&s);
540 }
541 }
542 None => result.push_str(&eval_string_parts_sync(default, ctx)),
543 }
544 }
545 crate::ast::StringPart::VarLength(name) => {
546 let len = match ctx.scope.get(name) {
547 Some(value) => value_to_string(value).len(),
548 None => 0,
549 };
550 result.push_str(&len.to_string());
551 }
552 crate::ast::StringPart::Positional(n) => {
553 if let Some(s) = ctx.scope.get_positional(*n) {
554 result.push_str(s);
555 }
556 }
557 crate::ast::StringPart::AllArgs => {
558 result.push_str(&ctx.scope.all_args().join(" "));
559 }
560 crate::ast::StringPart::ArgCount => {
561 result.push_str(&ctx.scope.arg_count().to_string());
562 }
563 crate::ast::StringPart::Arithmetic(expr) => {
564 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
565 result.push_str(&value.to_string());
566 }
567 }
568 crate::ast::StringPart::CommandSubst(_) => {
569 }
571 crate::ast::StringPart::LastExitCode => {
572 result.push_str(&ctx.scope.last_result().code.to_string());
573 }
574 crate::ast::StringPart::CurrentPid => {
575 result.push_str(&ctx.scope.pid().to_string());
576 }
577 }
578 }
579 result
580}
581
582fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
587 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
588 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
589
590 if gather_idx > scatter_idx {
592 Some((scatter_idx, gather_idx))
593 } else {
594 None
595 }
596}
597
598pub async fn run_sequential_pipeline(
603 tools: &Arc<ToolRegistry>,
604 commands: &[Command],
605 ctx: &mut ExecContext,
606) -> ExecResult {
607 if commands.is_empty() {
608 return ExecResult::success("");
609 }
610
611 let mut current_stdin: Option<String> = ctx.take_stdin();
612 let mut current_data: Option<Value> = ctx.take_stdin_data();
613 let mut last_result = ExecResult::success("");
614
615 for (i, cmd) in commands.iter().enumerate() {
616 match cmd.name.as_str() {
618 "true" => {
619 last_result = ExecResult::success("");
620 if i < commands.len() - 1 {
621 current_stdin = Some(last_result.out.clone());
622 current_data = last_result.data.clone();
623 }
624 continue;
625 }
626 "false" => {
627 return ExecResult::failure(1, "");
628 }
629 _ => {}
630 }
631
632 let schema = tools.get(&cmd.name).map(|t| t.schema());
634 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
635 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
636
637 setup_stdin_redirects(cmd, ctx);
639
640 if let Some(input) = current_stdin.take() {
643 ctx.set_stdin_with_data(input, current_data.take());
644 }
645
646 let backend = ctx.backend.clone();
648 last_result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
649 Ok(tool_result) => {
650 let mut exec = ExecResult::from_output(
651 tool_result.code as i64, tool_result.stdout, tool_result.stderr,
652 );
653 exec.output = tool_result.output;
654 exec
655 }
656 Err(e) => ExecResult::failure(127, e.to_string()),
657 };
658
659 last_result = match output_format {
661 Some(format) => apply_output_format(last_result, format),
662 None => last_result,
663 };
664
665 last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
667
668 if !last_result.ok() {
670 return last_result;
671 }
672
673 if i < commands.len() - 1 {
675 current_stdin = Some(last_result.out.clone());
676 current_data = last_result.data.clone();
677 }
678 }
679
680 last_result
681}
682
683pub async fn run_sequential_pipeline_owned(
687 tools: Arc<ToolRegistry>,
688 commands: Vec<Command>,
689 ctx: &mut ExecContext,
690) -> ExecResult {
691 run_sequential_pipeline(&tools, &commands, ctx).await
692}
693
694#[cfg(test)]
695mod tests {
696 use super::*;
697 use crate::tools::register_builtins;
698 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
699 use std::path::Path;
700
701 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext) {
702 let mut tools = ToolRegistry::new();
703 register_builtins(&mut tools);
704 let tools = Arc::new(tools);
705 let runner = PipelineRunner::new(tools.clone());
706
707 let mut vfs = VfsRouter::new();
708 let mem = MemoryFs::new();
709 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
710 vfs.mount("/", mem);
711 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
712
713 (runner, ctx)
714 }
715
716 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
717 Command {
718 name: name.to_string(),
719 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
720 redirects: vec![],
721 }
722 }
723
724 #[tokio::test]
725 async fn test_single_command() {
726 let (runner, mut ctx) = make_runner_and_ctx().await;
727 let cmd = make_cmd("echo", vec!["hello"]);
728
729 let result = runner.run(&[cmd], &mut ctx).await;
730 assert!(result.ok());
731 assert_eq!(result.out.trim(), "hello");
732 }
733
734 #[tokio::test]
735 async fn test_pipeline_echo_grep() {
736 let (runner, mut ctx) = make_runner_and_ctx().await;
737
738 let echo_cmd = Command {
740 name: "echo".to_string(),
741 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
742 redirects: vec![],
743 };
744 let grep_cmd = Command {
745 name: "grep".to_string(),
746 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
747 redirects: vec![],
748 };
749
750 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx).await;
751 assert!(result.ok());
752 assert_eq!(result.out.trim(), "world");
753 }
754
755 #[tokio::test]
756 async fn test_pipeline_cat_grep() {
757 let (runner, mut ctx) = make_runner_and_ctx().await;
758
759 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
761 let grep_cmd = Command {
762 name: "grep".to_string(),
763 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
764 redirects: vec![],
765 };
766
767 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx).await;
768 assert!(result.ok());
769 assert!(result.out.contains("hello"));
770 }
771
772 #[tokio::test]
773 async fn test_command_not_found() {
774 let (runner, mut ctx) = make_runner_and_ctx().await;
775 let cmd = make_cmd("nonexistent", vec![]);
776
777 let result = runner.run(&[cmd], &mut ctx).await;
778 assert!(!result.ok());
779 assert_eq!(result.code, 127);
780 assert!(result.err.contains("not found"));
781 }
782
783 #[tokio::test]
784 async fn test_pipeline_stops_on_failure() {
785 let (runner, mut ctx) = make_runner_and_ctx().await;
786
787 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
789 let grep_cmd = Command {
790 name: "grep".to_string(),
791 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
792 redirects: vec![],
793 };
794
795 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx).await;
796 assert!(!result.ok());
797 }
798
799 #[tokio::test]
800 async fn test_empty_pipeline() {
801 let (runner, mut ctx) = make_runner_and_ctx().await;
802 let result = runner.run(&[], &mut ctx).await;
803 assert!(result.ok());
804 }
805
806 #[test]
809 fn test_find_scatter_gather_both_present() {
810 let commands = vec![
811 make_cmd("echo", vec!["a"]),
812 make_cmd("scatter", vec![]),
813 make_cmd("process", vec![]),
814 make_cmd("gather", vec![]),
815 ];
816 let result = find_scatter_gather(&commands);
817 assert_eq!(result, Some((1, 3)));
818 }
819
820 #[test]
821 fn test_find_scatter_gather_no_scatter() {
822 let commands = vec![
823 make_cmd("echo", vec!["a"]),
824 make_cmd("gather", vec![]),
825 ];
826 let result = find_scatter_gather(&commands);
827 assert!(result.is_none());
828 }
829
830 #[test]
831 fn test_find_scatter_gather_no_gather() {
832 let commands = vec![
833 make_cmd("echo", vec!["a"]),
834 make_cmd("scatter", vec![]),
835 ];
836 let result = find_scatter_gather(&commands);
837 assert!(result.is_none());
838 }
839
840 #[test]
841 fn test_find_scatter_gather_wrong_order() {
842 let commands = vec![
843 make_cmd("gather", vec![]),
844 make_cmd("scatter", vec![]),
845 ];
846 let result = find_scatter_gather(&commands);
847 assert!(result.is_none());
848 }
849
850 #[tokio::test]
851 async fn test_scatter_gather_simple() {
852 let (runner, mut ctx) = make_runner_and_ctx().await;
853
854 let echo_cmd = Command {
856 name: "echo".to_string(),
857 args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb\nc".to_string())))],
858 redirects: vec![],
859 };
860 let scatter_cmd = make_cmd("scatter", vec![]);
861 let process_cmd = Command {
862 name: "echo".to_string(),
863 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
864 redirects: vec![],
865 };
866 let gather_cmd = make_cmd("gather", vec![]);
867
868 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
869 assert!(result.ok());
870 assert!(result.out.contains("a"));
872 assert!(result.out.contains("b"));
873 assert!(result.out.contains("c"));
874 }
875
876 #[tokio::test]
877 async fn test_scatter_gather_empty_input() {
878 let (runner, mut ctx) = make_runner_and_ctx().await;
879
880 let echo_cmd = Command {
882 name: "echo".to_string(),
883 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
884 redirects: vec![],
885 };
886 let scatter_cmd = make_cmd("scatter", vec![]);
887 let process_cmd = Command {
888 name: "echo".to_string(),
889 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
890 redirects: vec![],
891 };
892 let gather_cmd = make_cmd("gather", vec![]);
893
894 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
895 assert!(result.ok());
896 assert!(result.out.trim().is_empty());
897 }
898
899 #[tokio::test]
900 async fn test_scatter_gather_with_stdin() {
901 let (runner, mut ctx) = make_runner_and_ctx().await;
902
903 ctx.set_stdin("x\ny\nz".to_string());
905
906 let scatter_cmd = make_cmd("scatter", vec![]);
907 let process_cmd = Command {
908 name: "echo".to_string(),
909 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
910 redirects: vec![],
911 };
912 let gather_cmd = make_cmd("gather", vec![]);
913
914 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
915 assert!(result.ok());
916 assert!(result.out.contains("x"));
917 assert!(result.out.contains("y"));
918 assert!(result.out.contains("z"));
919 }
920
921 #[tokio::test]
922 async fn test_scatter_gather_json_input() {
923 let (runner, mut ctx) = make_runner_and_ctx().await;
924
925 ctx.set_stdin(r#"["one", "two", "three"]"#.to_string());
927
928 let scatter_cmd = make_cmd("scatter", vec![]);
929 let process_cmd = Command {
930 name: "echo".to_string(),
931 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
932 redirects: vec![],
933 };
934 let gather_cmd = make_cmd("gather", vec![]);
935
936 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
937 assert!(result.ok());
938 assert!(result.out.contains("one"));
939 assert!(result.out.contains("two"));
940 assert!(result.out.contains("three"));
941 }
942
943 #[tokio::test]
944 async fn test_scatter_gather_with_post_gather() {
945 let (runner, mut ctx) = make_runner_and_ctx().await;
946
947 let echo_cmd = Command {
949 name: "echo".to_string(),
950 args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb".to_string())))],
951 redirects: vec![],
952 };
953 let scatter_cmd = make_cmd("scatter", vec![]);
954 let process_cmd = Command {
955 name: "echo".to_string(),
956 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
957 redirects: vec![],
958 };
959 let gather_cmd = make_cmd("gather", vec![]);
960 let grep_cmd = Command {
961 name: "grep".to_string(),
962 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
963 redirects: vec![],
964 };
965
966 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx).await;
967 assert!(result.ok());
968 assert!(result.out.contains("a"));
969 assert!(!result.out.contains("b"));
970 }
971
972 #[tokio::test]
973 async fn test_scatter_custom_var_name() {
974 let (runner, mut ctx) = make_runner_and_ctx().await;
975
976 ctx.set_stdin("test1\ntest2".to_string());
977
978 let scatter_cmd = Command {
980 name: "scatter".to_string(),
981 args: vec![Arg::Named {
982 key: "as".to_string(),
983 value: Expr::Literal(Value::String("URL".to_string())),
984 }],
985 redirects: vec![],
986 };
987 let process_cmd = Command {
988 name: "echo".to_string(),
989 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
990 redirects: vec![],
991 };
992 let gather_cmd = make_cmd("gather", vec![]);
993
994 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
995 assert!(result.ok());
996 assert!(result.out.contains("test1"));
997 assert!(result.out.contains("test2"));
998 }
999
1000 #[tokio::test]
1003 async fn test_pipeline_routes_through_backend() {
1004 use crate::backend::testing::MockBackend;
1005 use std::sync::atomic::Ordering;
1006
1007 let (backend, call_count) = MockBackend::new();
1009 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1010
1011 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1013
1014 let tools = std::sync::Arc::new(ToolRegistry::new());
1016 let runner = PipelineRunner::new(tools);
1017
1018 let cmd = make_cmd("test-tool", vec!["arg1"]);
1020 let result = runner.run(&[cmd], &mut ctx).await;
1021
1022 assert!(result.ok(), "Mock backend should return success");
1023 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1024 assert!(result.out.contains("mock executed"), "Output should be from mock backend");
1025 }
1026
1027 #[tokio::test]
1028 async fn test_multi_command_pipeline_routes_through_backend() {
1029 use crate::backend::testing::MockBackend;
1030 use std::sync::atomic::Ordering;
1031
1032 let (backend, call_count) = MockBackend::new();
1033 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1034 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1035
1036 let tools = std::sync::Arc::new(ToolRegistry::new());
1037 let runner = PipelineRunner::new(tools);
1038
1039 let cmd1 = make_cmd("tool1", vec![]);
1041 let cmd2 = make_cmd("tool2", vec![]);
1042 let cmd3 = make_cmd("tool3", vec![]);
1043
1044 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx).await;
1045
1046 assert!(result.ok());
1047 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1048 }
1049
1050 use crate::tools::{ParamSchema, ToolSchema};
1053
1054 fn make_test_schema() -> ToolSchema {
1055 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1056 .param(ParamSchema::required("query", "string", "Search query"))
1057 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1058 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1059 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1060 }
1061
1062 fn make_minimal_ctx() -> ExecContext {
1063 let mut vfs = VfsRouter::new();
1064 vfs.mount("/", MemoryFs::new());
1065 ExecContext::new(Arc::new(vfs))
1066 }
1067
1068 #[test]
1069 fn test_schema_aware_string_arg() {
1070 let args = vec![
1072 Arg::LongFlag("query".to_string()),
1073 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1074 ];
1075 let schema = make_test_schema();
1076 let ctx = make_minimal_ctx();
1077
1078 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1079
1080 assert!(tool_args.flags.is_empty(), "No flags should be set");
1081 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1082 assert_eq!(
1083 tool_args.named.get("query"),
1084 Some(&Value::String("test".to_string())),
1085 "--query should consume 'test' as its value"
1086 );
1087 }
1088
1089 #[test]
1090 fn test_schema_aware_bool_flag() {
1091 let args = vec![
1093 Arg::LongFlag("verbose".to_string()),
1094 ];
1095 let schema = make_test_schema();
1096 let ctx = make_minimal_ctx();
1097
1098 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1099
1100 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1101 assert!(tool_args.named.is_empty(), "No named args");
1102 assert!(tool_args.positional.is_empty(), "No positionals");
1103 }
1104
1105 #[test]
1106 fn test_schema_aware_mixed() {
1107 let args = vec![
1110 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1111 Arg::LongFlag("output".to_string()),
1112 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1113 Arg::LongFlag("verbose".to_string()),
1114 ];
1115 let schema = make_test_schema();
1116 let ctx = make_minimal_ctx();
1117
1118 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1119
1120 assert_eq!(tool_args.positional, vec![Value::String("file.txt".to_string())]);
1121 assert_eq!(
1122 tool_args.named.get("output"),
1123 Some(&Value::String("out.txt".to_string()))
1124 );
1125 assert!(tool_args.flags.contains("verbose"));
1126 }
1127
1128 #[test]
1129 fn test_schema_aware_multiple_string_args() {
1130 let args = vec![
1132 Arg::LongFlag("query".to_string()),
1133 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1134 Arg::LongFlag("output".to_string()),
1135 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1136 Arg::LongFlag("verbose".to_string()),
1137 Arg::LongFlag("limit".to_string()),
1138 Arg::Positional(Expr::Literal(Value::Int(5))),
1139 ];
1140 let schema = make_test_schema();
1141 let ctx = make_minimal_ctx();
1142
1143 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1144
1145 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1146 assert_eq!(
1147 tool_args.named.get("query"),
1148 Some(&Value::String("test".to_string()))
1149 );
1150 assert_eq!(
1151 tool_args.named.get("output"),
1152 Some(&Value::String("result.json".to_string()))
1153 );
1154 assert_eq!(
1155 tool_args.named.get("limit"),
1156 Some(&Value::Int(5))
1157 );
1158 assert!(tool_args.flags.contains("verbose"));
1159 }
1160
1161 #[test]
1162 fn test_schema_aware_double_dash() {
1163 let args = vec![
1166 Arg::LongFlag("output".to_string()),
1167 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1168 Arg::DoubleDash,
1169 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1170 ];
1171 let schema = make_test_schema();
1172 let ctx = make_minimal_ctx();
1173
1174 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1175
1176 assert_eq!(
1177 tool_args.named.get("output"),
1178 Some(&Value::String("out.txt".to_string()))
1179 );
1180 assert_eq!(
1182 tool_args.positional,
1183 vec![Value::String("--this-is-data".to_string())]
1184 );
1185 }
1186
/// Without a schema, a long flag is expected to land in `flags` and leave the
/// argument after it as an ordinary positional.
#[test]
fn test_no_schema_fallback() {
    let flag = Arg::LongFlag(String::from("query"));
    let trailing = Arg::Positional(Expr::Literal(Value::String(String::from("test"))));
    let args = vec![flag, trailing];
    let ctx = make_minimal_ctx();

    // `None` here means no schema is supplied to the argument builder.
    let parsed = build_tool_args(&args, &ctx, None);

    assert!(parsed.flags.contains("query"), "--query should be a flag");

    let expected_positional = vec![Value::String(String::from("test"))];
    assert_eq!(
        parsed.positional, expected_positional,
        "'test' should be a positional"
    );
}
1206
/// A long flag named `unknown` is expected to be recorded as a plain flag
/// even when a schema is supplied, leaving the next argument positional.
#[test]
fn test_unknown_flag_in_schema() {
    let flag_name = "unknown";
    let following = "value";

    let args = vec![
        Arg::LongFlag(flag_name.to_string()),
        Arg::Positional(Expr::Literal(Value::String(following.to_string()))),
    ];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains(flag_name));

    // The argument after the flag must not have been consumed as its value.
    let expected_positional = vec![Value::String(following.to_string())];
    assert_eq!(parsed.positional, expected_positional);
}
1226
/// An explicit `Arg::Named` pair is expected to end up in `named` when a
/// schema is supplied, and a following `--verbose` is still a flag.
#[test]
fn test_named_args_unchanged() {
    let named_query = Arg::Named {
        key: String::from("query"),
        value: Expr::Literal(Value::String(String::from("test"))),
    };
    let verbose = Arg::LongFlag(String::from("verbose"));
    let args = vec![named_query, verbose];

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String(String::from("test"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.flags.contains("verbose"));
}
1248
/// A combined short flag such as `-la` is expected to expand into one flag
/// per letter, with the following argument staying positional.
#[test]
fn test_short_flags_unchanged() {
    let combined = Arg::ShortFlag(String::from("la"));
    let file = Arg::Positional(Expr::Literal(Value::String(String::from("file.txt"))));
    let args = vec![combined, file];

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // Both letters of the combined flag must be present individually.
    for letter in ["l", "a"] {
        assert!(parsed.flags.contains(letter));
    }

    let expected_positional = vec![Value::String(String::from("file.txt"))];
    assert_eq!(parsed.positional, expected_positional);
}
1268
/// When `--output` is the last argument there is nothing for it to consume,
/// so it is expected to be recorded as a flag; the earlier positional stays.
#[test]
fn test_flag_at_end_no_value() {
    let file_name = "file.txt";

    let args = vec![
        Arg::Positional(Expr::Literal(Value::String(file_name.to_string()))),
        Arg::LongFlag("output".to_string()),
    ];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("output"));

    // The positional that came before the flag must be untouched.
    let expected_positional = vec![Value::String(file_name.to_string())];
    assert_eq!(parsed.positional, expected_positional);
}
1288
1289 #[tokio::test]
1292 async fn test_merge_stderr_redirect() {
1293 let result = ExecResult::from_output(0, "stdout content", "stderr content");
1295
1296 let redirects = vec![Redirect {
1297 kind: RedirectKind::MergeStderr,
1298 target: Expr::Literal(Value::Null),
1299 }];
1300
1301 let ctx = make_minimal_ctx();
1302 let result = apply_redirects(result, &redirects, &ctx).await;
1303
1304 assert_eq!(result.out, "stdout contentstderr content");
1305 assert!(result.err.is_empty());
1306 }
1307
1308 #[tokio::test]
1309 async fn test_merge_stderr_with_empty_stderr() {
1310 let result = ExecResult::from_output(0, "stdout only", "");
1312
1313 let redirects = vec![Redirect {
1314 kind: RedirectKind::MergeStderr,
1315 target: Expr::Literal(Value::Null),
1316 }];
1317
1318 let ctx = make_minimal_ctx();
1319 let result = apply_redirects(result, &redirects, &ctx).await;
1320
1321 assert_eq!(result.out, "stdout only");
1322 assert!(result.err.is_empty());
1323 }
1324
1325 #[tokio::test]
1326 async fn test_merge_stderr_order_matters() {
1327 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1332
1333 let redirects = vec![Redirect {
1335 kind: RedirectKind::MergeStderr,
1336 target: Expr::Literal(Value::Null),
1337 }];
1338
1339 let ctx = make_minimal_ctx();
1340 let result = apply_redirects(result, &redirects, &ctx).await;
1341
1342 assert_eq!(result.out, "stdout\nstderr\n");
1343 assert!(result.err.is_empty());
1344 }
1345
1346 #[tokio::test]
1347 async fn test_redirect_with_command_execution() {
1348 let (runner, mut ctx) = make_runner_and_ctx().await;
1349
1350 let cmd = Command {
1352 name: "echo".to_string(),
1353 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1354 redirects: vec![Redirect {
1355 kind: RedirectKind::MergeStderr,
1356 target: Expr::Literal(Value::Null),
1357 }],
1358 };
1359
1360 let result = runner.run(&[cmd], &mut ctx).await;
1361 assert!(result.ok());
1362 assert!(result.out.contains("hello"));
1364 }
1365
1366 #[tokio::test]
1367 async fn test_merge_stderr_in_pipeline() {
1368 let (runner, mut ctx) = make_runner_and_ctx().await;
1369
1370 let echo_cmd = Command {
1373 name: "echo".to_string(),
1374 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1375 redirects: vec![Redirect {
1376 kind: RedirectKind::MergeStderr,
1377 target: Expr::Literal(Value::Null),
1378 }],
1379 };
1380 let grep_cmd = Command {
1381 name: "grep".to_string(),
1382 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1383 redirects: vec![],
1384 };
1385
1386 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx).await;
1387 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1388 assert!(result.out.contains("output"));
1389 }
1390}