1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::scatter::{
20 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
21};
22
23async fn apply_redirects(
29 mut result: ExecResult,
30 redirects: &[Redirect],
31 ctx: &ExecContext,
32) -> ExecResult {
33 for redir in redirects {
34 match redir.kind {
35 RedirectKind::MergeStderr => {
36 if !result.err.is_empty() {
38 result.out.push_str(&result.err);
39 result.err.clear();
40 }
41 }
42 RedirectKind::MergeStdout => {
43 if !result.out.is_empty() {
45 result.err.push_str(&result.out);
46 result.out.clear();
47 }
48 }
49 RedirectKind::StdoutOverwrite => {
50 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
51 if let Err(e) = tokio::fs::write(&path, &result.out).await {
52 return ExecResult::failure(1, format!("redirect: {e}"));
53 }
54 result.out.clear();
55 }
56 }
57 RedirectKind::StdoutAppend => {
58 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
59 let file = tokio::fs::OpenOptions::new()
60 .append(true)
61 .create(true)
62 .open(&path)
63 .await;
64 match file {
65 Ok(mut f) => {
66 if let Err(e) = f.write_all(result.out.as_bytes()).await {
67 return ExecResult::failure(1, format!("redirect: {e}"));
68 }
69 }
70 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
71 }
72 result.out.clear();
73 }
74 }
75 RedirectKind::Stderr => {
76 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
77 if let Err(e) = tokio::fs::write(&path, &result.err).await {
78 return ExecResult::failure(1, format!("redirect: {e}"));
79 }
80 result.err.clear();
81 }
82 }
83 RedirectKind::Both => {
84 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
85 let combined = format!("{}{}", result.out, result.err);
86 if let Err(e) = tokio::fs::write(&path, combined).await {
87 return ExecResult::failure(1, format!("redirect: {e}"));
88 }
89 result.out.clear();
90 result.err.clear();
91 }
92 }
93 RedirectKind::Stdin | RedirectKind::HereDoc => {}
95 }
96 }
97 result
98}
99
100fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
102 eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
103}
104
105fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
108 for redir in &cmd.redirects {
109 match &redir.kind {
110 RedirectKind::Stdin => {
111 if let Some(path) = eval_redirect_target(&redir.target, ctx)
112 && let Ok(content) = std::fs::read_to_string(&path) {
113 ctx.set_stdin(content);
114 }
115 }
116 RedirectKind::HereDoc => {
117 if let Expr::Literal(Value::String(content)) = &redir.target {
118 ctx.set_stdin(content.clone());
119 }
120 }
121 _ => {}
122 }
123 }
124}
125
126#[derive(Clone)]
128pub struct PipelineRunner {
129 tools: Arc<ToolRegistry>,
130}
131
132impl PipelineRunner {
133 pub fn new(tools: Arc<ToolRegistry>) -> Self {
135 Self { tools }
136 }
137
138 pub async fn run(
148 &self,
149 commands: &[Command],
150 ctx: &mut ExecContext,
151 dispatcher: &dyn CommandDispatcher,
152 ) -> ExecResult {
153 if commands.is_empty() {
154 return ExecResult::success("");
155 }
156
157 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
159 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
160 }
161
162 self.run_sequential(commands, ctx, dispatcher).await
163 }
164
165 pub async fn run_sequential(
170 &self,
171 commands: &[Command],
172 ctx: &mut ExecContext,
173 dispatcher: &dyn CommandDispatcher,
174 ) -> ExecResult {
175 if commands.is_empty() {
176 return ExecResult::success("");
177 }
178
179 if commands.len() == 1 {
180 return self.run_single(&commands[0], ctx, None, dispatcher).await;
182 }
183
184 self.run_pipeline(commands, ctx, dispatcher).await
186 }
187
188 async fn run_scatter_gather(
190 &self,
191 commands: &[Command],
192 scatter_idx: usize,
193 gather_idx: usize,
194 ctx: &mut ExecContext,
195 _dispatcher: &dyn CommandDispatcher,
196 ) -> ExecResult {
197 let pre_scatter = &commands[..scatter_idx];
199 let scatter_cmd = &commands[scatter_idx];
200 let parallel = &commands[scatter_idx + 1..gather_idx];
201 let gather_cmd = &commands[gather_idx];
202 let post_gather = &commands[gather_idx + 1..];
203
204 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
207 let gather_schema = self.tools.get("gather").map(|t| t.schema());
208 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
209 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
210
211 let scatter_dispatcher: Arc<dyn CommandDispatcher> =
215 Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
216
217 let runner = ScatterGatherRunner::new(self.tools.clone(), scatter_dispatcher);
218 runner
219 .run(
220 pre_scatter,
221 scatter_opts,
222 parallel,
223 gather_opts,
224 post_gather,
225 ctx,
226 )
227 .await
228 }
229
230 async fn run_single(
235 &self,
236 cmd: &Command,
237 ctx: &mut ExecContext,
238 stdin: Option<String>,
239 dispatcher: &dyn CommandDispatcher,
240 ) -> ExecResult {
241 setup_stdin_redirects(cmd, ctx);
243
244 if let Some(input) = stdin {
246 ctx.set_stdin(input);
247 }
248
249 ctx.pipeline_position = PipelinePosition::Only;
251
252 let result = match dispatcher.dispatch(cmd, ctx).await {
254 Ok(result) => result,
255 Err(e) => ExecResult::failure(1, e.to_string()),
256 };
257
258 apply_redirects(result, &cmd.redirects, ctx).await
260 }
261
262 async fn run_pipeline(
267 &self,
268 commands: &[Command],
269 ctx: &mut ExecContext,
270 dispatcher: &dyn CommandDispatcher,
271 ) -> ExecResult {
272 let mut current_stdin: Option<String> = None;
273 let mut current_data: Option<Value> = None;
274 let mut last_result = ExecResult::success("");
275 let last_idx = commands.len() - 1;
276
277 for (i, cmd) in commands.iter().enumerate() {
278 setup_stdin_redirects(cmd, ctx);
280
281 if let Some(input) = current_stdin.take() {
284 ctx.set_stdin_with_data(input, current_data.take());
285 }
286
287 ctx.pipeline_position = match i {
289 0 if last_idx == 0 => PipelinePosition::Only,
290 0 => PipelinePosition::First,
291 n if n == last_idx => PipelinePosition::Last,
292 _ => PipelinePosition::Middle,
293 };
294
295 last_result = match dispatcher.dispatch(cmd, ctx).await {
297 Ok(result) => result,
298 Err(e) => ExecResult::failure(1, e.to_string()),
299 };
300
301 last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
303
304 if !last_result.ok() {
306 return last_result;
307 }
308
309 if i < last_idx {
311 current_stdin = Some(last_result.out.clone());
312 current_data = last_result.data.clone();
313 }
314 }
315
316 last_result
317 }
318}
319
320pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str)> {
328 let mut map = HashMap::new();
329 for p in &schema.params {
330 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str()));
331 for alias in &p.aliases {
332 let stripped = alias.trim_start_matches('-');
333 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str()));
334 }
335 }
336 map
337}
338
/// Returns `true` when a schema type name denotes a boolean, i.e. it equals
/// `"bool"` or `"boolean"` after lowercasing.
pub fn is_bool_type(param_type: &str) -> bool {
    let normalized = param_type.to_lowercase();
    normalized == "bool" || normalized == "boolean"
}
343
344pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
352 let mut tool_args = ToolArgs::new();
353 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
354
355 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
357 let mut past_double_dash = false;
358
359 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
361 for (i, arg) in args.iter().enumerate() {
362 if let Arg::Positional(expr) = arg {
363 positional_indices.push((i, expr));
364 }
365 }
366
367 let mut i = 0;
369 while i < args.len() {
370 let arg = &args[i];
371
372 match arg {
373 Arg::DoubleDash => {
374 past_double_dash = true;
375 }
376 Arg::Positional(expr) => {
377 if !consumed_positionals.contains(&i)
379 && let Some(value) = eval_simple_expr(expr, ctx)
380 {
381 tool_args.positional.push(value);
382 }
383 }
384 Arg::Named { key, value } => {
385 if let Some(val) = eval_simple_expr(value, ctx) {
386 tool_args.named.insert(key.clone(), val);
387 }
388 }
389 Arg::ShortFlag(name) => {
390 if past_double_dash {
391 tool_args.positional.push(Value::String(format!("-{name}")));
392 } else if name.len() == 1 {
393 let flag_name = name.as_str();
396 let lookup = param_lookup.get(flag_name);
397 let is_bool = lookup
398 .map(|(_, typ)| is_bool_type(typ))
399 .unwrap_or(true);
400
401 if is_bool {
402 tool_args.flags.insert(flag_name.to_string());
403 } else {
404 let canonical = lookup.map(|(name, _)| *name).unwrap_or(flag_name);
406 let next_positional = positional_indices
407 .iter()
408 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
409
410 if let Some((pos_idx, expr)) = next_positional {
411 if let Some(value) = eval_simple_expr(expr, ctx) {
412 tool_args.named.insert(canonical.to_string(), value);
413 consumed_positionals.insert(*pos_idx);
414 } else {
415 tool_args.flags.insert(flag_name.to_string());
416 }
417 } else {
418 tool_args.flags.insert(flag_name.to_string());
419 }
420 }
421 } else {
422 for c in name.chars() {
424 tool_args.flags.insert(c.to_string());
425 }
426 }
427 }
428 Arg::LongFlag(name) => {
429 if past_double_dash {
430 tool_args.positional.push(Value::String(format!("--{name}")));
431 } else {
432 let lookup = param_lookup.get(name.as_str());
434 let is_bool = lookup
435 .map(|(_, typ)| is_bool_type(typ))
436 .unwrap_or(true); if is_bool {
439 tool_args.flags.insert(name.clone());
440 } else {
441 let canonical = lookup.map(|(name, _)| *name).unwrap_or(name.as_str());
443 let next_positional = positional_indices
444 .iter()
445 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
446
447 if let Some((pos_idx, expr)) = next_positional {
448 if let Some(value) = eval_simple_expr(expr, ctx) {
449 tool_args.named.insert(canonical.to_string(), value);
450 consumed_positionals.insert(*pos_idx);
451 } else {
452 tool_args.flags.insert(name.clone());
453 }
454 } else {
455 tool_args.flags.insert(name.clone());
456 }
457 }
458 }
459 }
460 }
461 i += 1;
462 }
463
464 tool_args
465}
466
467fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
469 match expr {
470 Expr::Literal(value) => Some(eval_literal(value, ctx)),
471 Expr::VarRef(path) => ctx.scope.resolve_path(path),
472 Expr::Interpolated(parts) => {
473 let mut result = String::new();
474 for part in parts {
475 match part {
476 crate::ast::StringPart::Literal(s) => result.push_str(s),
477 crate::ast::StringPart::Var(path) => {
478 if let Some(value) = ctx.scope.resolve_path(path) {
479 result.push_str(&value_to_string(&value));
480 }
481 }
482 crate::ast::StringPart::VarWithDefault { name, default } => {
483 match ctx.scope.get(name) {
484 Some(value) => {
485 let s = value_to_string(value);
486 if s.is_empty() {
487 result.push_str(&eval_string_parts_sync(default, ctx));
488 } else {
489 result.push_str(&s);
490 }
491 }
492 None => result.push_str(&eval_string_parts_sync(default, ctx)),
493 }
494 }
495 crate::ast::StringPart::VarLength(name) => {
496 let len = match ctx.scope.get(name) {
497 Some(value) => value_to_string(value).len(),
498 None => 0,
499 };
500 result.push_str(&len.to_string());
501 }
502 crate::ast::StringPart::Positional(n) => {
503 if let Some(s) = ctx.scope.get_positional(*n) {
504 result.push_str(s);
505 }
506 }
507 crate::ast::StringPart::AllArgs => {
508 result.push_str(&ctx.scope.all_args().join(" "));
509 }
510 crate::ast::StringPart::ArgCount => {
511 result.push_str(&ctx.scope.arg_count().to_string());
512 }
513 crate::ast::StringPart::Arithmetic(expr) => {
514 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
516 result.push_str(&value.to_string());
517 }
518 }
519 crate::ast::StringPart::CommandSubst(_) => {
520 }
522 crate::ast::StringPart::LastExitCode => {
523 result.push_str(&ctx.scope.last_result().code.to_string());
524 }
525 crate::ast::StringPart::CurrentPid => {
526 result.push_str(&ctx.scope.pid().to_string());
527 }
528 }
529 }
530 Some(Value::String(result))
531 }
532 _ => None, }
534}
535
536fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
538 value.clone()
539}
540
541fn value_to_string(value: &Value) -> String {
543 match value {
544 Value::Null => "".to_string(),
545 Value::Bool(b) => b.to_string(),
546 Value::Int(i) => i.to_string(),
547 Value::Float(f) => f.to_string(),
548 Value::String(s) => s.clone(),
549 Value::Json(json) => json.to_string(),
550 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
551 }
552}
553
554fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
557 let mut result = String::new();
558 for part in parts {
559 match part {
560 crate::ast::StringPart::Literal(s) => result.push_str(s),
561 crate::ast::StringPart::Var(path) => {
562 if let Some(value) = ctx.scope.resolve_path(path) {
563 result.push_str(&value_to_string(&value));
564 }
565 }
566 crate::ast::StringPart::VarWithDefault { name, default } => {
567 match ctx.scope.get(name) {
568 Some(value) => {
569 let s = value_to_string(value);
570 if s.is_empty() {
571 result.push_str(&eval_string_parts_sync(default, ctx));
572 } else {
573 result.push_str(&s);
574 }
575 }
576 None => result.push_str(&eval_string_parts_sync(default, ctx)),
577 }
578 }
579 crate::ast::StringPart::VarLength(name) => {
580 let len = match ctx.scope.get(name) {
581 Some(value) => value_to_string(value).len(),
582 None => 0,
583 };
584 result.push_str(&len.to_string());
585 }
586 crate::ast::StringPart::Positional(n) => {
587 if let Some(s) = ctx.scope.get_positional(*n) {
588 result.push_str(s);
589 }
590 }
591 crate::ast::StringPart::AllArgs => {
592 result.push_str(&ctx.scope.all_args().join(" "));
593 }
594 crate::ast::StringPart::ArgCount => {
595 result.push_str(&ctx.scope.arg_count().to_string());
596 }
597 crate::ast::StringPart::Arithmetic(expr) => {
598 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
599 result.push_str(&value.to_string());
600 }
601 }
602 crate::ast::StringPart::CommandSubst(_) => {
603 }
605 crate::ast::StringPart::LastExitCode => {
606 result.push_str(&ctx.scope.last_result().code.to_string());
607 }
608 crate::ast::StringPart::CurrentPid => {
609 result.push_str(&ctx.scope.pid().to_string());
610 }
611 }
612 }
613 result
614}
615
616fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
621 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
622 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
623
624 if gather_idx > scatter_idx {
626 Some((scatter_idx, gather_idx))
627 } else {
628 None
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635 use crate::dispatch::BackendDispatcher;
636 use crate::tools::register_builtins;
637 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
638 use std::path::Path;
639
640 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
641 let mut tools = ToolRegistry::new();
642 register_builtins(&mut tools);
643 let tools = Arc::new(tools);
644 let runner = PipelineRunner::new(tools.clone());
645 let dispatcher = BackendDispatcher::new(tools.clone());
646
647 let mut vfs = VfsRouter::new();
648 let mem = MemoryFs::new();
649 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
650 vfs.mount("/", mem);
651 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
652
653 (runner, ctx, dispatcher)
654 }
655
656 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
657 Command {
658 name: name.to_string(),
659 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
660 redirects: vec![],
661 }
662 }
663
664 #[tokio::test]
665 async fn test_single_command() {
666 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
667 let cmd = make_cmd("echo", vec!["hello"]);
668
669 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
670 assert!(result.ok());
671 assert_eq!(result.out.trim(), "hello");
672 }
673
674 #[tokio::test]
675 async fn test_pipeline_echo_grep() {
676 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
677
678 let echo_cmd = Command {
680 name: "echo".to_string(),
681 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
682 redirects: vec![],
683 };
684 let grep_cmd = Command {
685 name: "grep".to_string(),
686 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
687 redirects: vec![],
688 };
689
690 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
691 assert!(result.ok());
692 assert_eq!(result.out.trim(), "world");
693 }
694
695 #[tokio::test]
696 async fn test_pipeline_cat_grep() {
697 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
698
699 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
701 let grep_cmd = Command {
702 name: "grep".to_string(),
703 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
704 redirects: vec![],
705 };
706
707 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
708 assert!(result.ok());
709 assert!(result.out.contains("hello"));
710 }
711
712 #[tokio::test]
713 async fn test_command_not_found() {
714 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
715 let cmd = make_cmd("nonexistent", vec![]);
716
717 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
718 assert!(!result.ok());
719 assert_eq!(result.code, 127);
720 assert!(result.err.contains("not found"));
721 }
722
723 #[tokio::test]
724 async fn test_pipeline_stops_on_failure() {
725 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
726
727 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
729 let grep_cmd = Command {
730 name: "grep".to_string(),
731 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
732 redirects: vec![],
733 };
734
735 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
736 assert!(!result.ok());
737 }
738
739 #[tokio::test]
740 async fn test_empty_pipeline() {
741 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
742 let result = runner.run(&[], &mut ctx, &dispatcher).await;
743 assert!(result.ok());
744 }
745
// scatter at index 1 and gather at index 3 are reported as a pair.
#[test]
fn test_find_scatter_gather_both_present() {
    let commands = [
        make_cmd("echo", vec!["a"]),
        make_cmd("scatter", vec![]),
        make_cmd("process", vec![]),
        make_cmd("gather", vec![]),
    ];

    assert_eq!(find_scatter_gather(&commands), Some((1, 3)));
}
759
// A gather without any scatter yields no pair.
#[test]
fn test_find_scatter_gather_no_scatter() {
    let commands = [make_cmd("echo", vec!["a"]), make_cmd("gather", vec![])];

    assert_eq!(find_scatter_gather(&commands), None);
}
769
// A scatter without any gather yields no pair.
#[test]
fn test_find_scatter_gather_no_gather() {
    let commands = [make_cmd("echo", vec!["a"]), make_cmd("scatter", vec![])];

    assert_eq!(find_scatter_gather(&commands), None);
}
779
// gather appearing before scatter does not count as a pair.
#[test]
fn test_find_scatter_gather_wrong_order() {
    let commands = [make_cmd("gather", vec![]), make_cmd("scatter", vec![])];

    assert_eq!(find_scatter_gather(&commands), None);
}
789
790 #[tokio::test]
791 async fn test_scatter_gather_simple() {
792 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
793
794 let echo_cmd = Command {
796 name: "echo".to_string(),
797 args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb\nc".to_string())))],
798 redirects: vec![],
799 };
800 let scatter_cmd = make_cmd("scatter", vec![]);
801 let process_cmd = Command {
802 name: "echo".to_string(),
803 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
804 redirects: vec![],
805 };
806 let gather_cmd = make_cmd("gather", vec![]);
807
808 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
809 assert!(result.ok());
810 assert!(result.out.contains("a"));
812 assert!(result.out.contains("b"));
813 assert!(result.out.contains("c"));
814 }
815
816 #[tokio::test]
817 async fn test_scatter_gather_empty_input() {
818 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
819
820 let echo_cmd = Command {
822 name: "echo".to_string(),
823 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
824 redirects: vec![],
825 };
826 let scatter_cmd = make_cmd("scatter", vec![]);
827 let process_cmd = Command {
828 name: "echo".to_string(),
829 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
830 redirects: vec![],
831 };
832 let gather_cmd = make_cmd("gather", vec![]);
833
834 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
835 assert!(result.ok());
836 assert!(result.out.trim().is_empty());
837 }
838
839 #[tokio::test]
840 async fn test_scatter_gather_with_stdin() {
841 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
842
843 ctx.set_stdin("x\ny\nz".to_string());
845
846 let scatter_cmd = make_cmd("scatter", vec![]);
847 let process_cmd = Command {
848 name: "echo".to_string(),
849 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
850 redirects: vec![],
851 };
852 let gather_cmd = make_cmd("gather", vec![]);
853
854 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
855 assert!(result.ok());
856 assert!(result.out.contains("x"));
857 assert!(result.out.contains("y"));
858 assert!(result.out.contains("z"));
859 }
860
861 #[tokio::test]
862 async fn test_scatter_gather_json_input() {
863 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
864
865 ctx.set_stdin(r#"["one", "two", "three"]"#.to_string());
867
868 let scatter_cmd = make_cmd("scatter", vec![]);
869 let process_cmd = Command {
870 name: "echo".to_string(),
871 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
872 redirects: vec![],
873 };
874 let gather_cmd = make_cmd("gather", vec![]);
875
876 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
877 assert!(result.ok());
878 assert!(result.out.contains("one"));
879 assert!(result.out.contains("two"));
880 assert!(result.out.contains("three"));
881 }
882
883 #[tokio::test]
884 async fn test_scatter_gather_with_post_gather() {
885 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
886
887 let echo_cmd = Command {
889 name: "echo".to_string(),
890 args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb".to_string())))],
891 redirects: vec![],
892 };
893 let scatter_cmd = make_cmd("scatter", vec![]);
894 let process_cmd = Command {
895 name: "echo".to_string(),
896 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
897 redirects: vec![],
898 };
899 let gather_cmd = make_cmd("gather", vec![]);
900 let grep_cmd = Command {
901 name: "grep".to_string(),
902 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
903 redirects: vec![],
904 };
905
906 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
907 assert!(result.ok());
908 assert!(result.out.contains("a"));
909 assert!(!result.out.contains("b"));
910 }
911
912 #[tokio::test]
913 async fn test_scatter_custom_var_name() {
914 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
915
916 ctx.set_stdin("test1\ntest2".to_string());
917
918 let scatter_cmd = Command {
920 name: "scatter".to_string(),
921 args: vec![Arg::Named {
922 key: "as".to_string(),
923 value: Expr::Literal(Value::String("URL".to_string())),
924 }],
925 redirects: vec![],
926 };
927 let process_cmd = Command {
928 name: "echo".to_string(),
929 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
930 redirects: vec![],
931 };
932 let gather_cmd = make_cmd("gather", vec![]);
933
934 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
935 assert!(result.ok());
936 assert!(result.out.contains("test1"));
937 assert!(result.out.contains("test2"));
938 }
939
940 #[tokio::test]
943 async fn test_pipeline_routes_through_backend() {
944 use crate::backend::testing::MockBackend;
945 use std::sync::atomic::Ordering;
946
947 let (backend, call_count) = MockBackend::new();
949 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
950
951 let mut ctx = crate::tools::ExecContext::with_backend(backend);
953
954 let tools = std::sync::Arc::new(ToolRegistry::new());
956 let runner = PipelineRunner::new(tools.clone());
957 let dispatcher = BackendDispatcher::new(tools);
958
959 let cmd = make_cmd("test-tool", vec!["arg1"]);
961 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
962
963 assert!(result.ok(), "Mock backend should return success");
964 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
965 assert!(result.out.contains("mock executed"), "Output should be from mock backend");
966 }
967
968 #[tokio::test]
969 async fn test_multi_command_pipeline_routes_through_backend() {
970 use crate::backend::testing::MockBackend;
971 use std::sync::atomic::Ordering;
972
973 let (backend, call_count) = MockBackend::new();
974 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
975 let mut ctx = crate::tools::ExecContext::with_backend(backend);
976
977 let tools = std::sync::Arc::new(ToolRegistry::new());
978 let runner = PipelineRunner::new(tools.clone());
979 let dispatcher = BackendDispatcher::new(tools);
980
981 let cmd1 = make_cmd("tool1", vec![]);
983 let cmd2 = make_cmd("tool2", vec![]);
984 let cmd3 = make_cmd("tool3", vec![]);
985
986 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
987
988 assert!(result.ok());
989 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
990 }
991
992 use crate::tools::{ParamSchema, ToolSchema};
995
996 fn make_test_schema() -> ToolSchema {
997 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
998 .param(ParamSchema::required("query", "string", "Search query"))
999 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1000 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1001 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1002 }
1003
1004 fn make_minimal_ctx() -> ExecContext {
1005 let mut vfs = VfsRouter::new();
1006 vfs.mount("/", MemoryFs::new());
1007 ExecContext::new(Arc::new(vfs))
1008 }
1009
// `--query test`: a string-typed flag takes the following positional as its value.
#[test]
fn test_schema_aware_string_arg() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![Arg::LongFlag("query".to_string()), text("test")];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "No flags should be set");
    assert!(parsed.positional.is_empty(), "No positionals - consumed by --query");
    assert_eq!(
        parsed.named.get("query"),
        Some(&Value::String("test".to_string())),
        "--query should consume 'test' as its value"
    );
}
1030
// `--verbose`: a bool-typed flag is recorded as a flag and takes no value.
#[test]
fn test_schema_aware_bool_flag() {
    let args = [Arg::LongFlag(String::from("verbose"))];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("verbose"), "--verbose should be a flag");
    assert!(parsed.named.is_empty(), "No named args");
    assert!(parsed.positional.is_empty(), "No positionals");
}
1046
// `file.txt --output out.txt --verbose`: positional, valued flag and bool flag together.
#[test]
fn test_schema_aware_mixed() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![
        text("file.txt"),
        Arg::LongFlag("output".to_string()),
        text("out.txt"),
        Arg::LongFlag("verbose".to_string()),
    ];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.positional, vec![Value::String("file.txt".to_string())]);
    assert_eq!(
        parsed.named.get("output"),
        Some(&Value::String("out.txt".to_string()))
    );
    assert!(parsed.flags.contains("verbose"));
}
1069
// `--query test --output result.json --verbose --limit 5`: each valued flag
// takes its own positional; the bool flag takes none.
#[test]
fn test_schema_aware_multiple_string_args() {
    let long = |name: &str| Arg::LongFlag(name.to_string());
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![
        long("query"),
        text("test"),
        long("output"),
        text("result.json"),
        long("verbose"),
        long("limit"),
        Arg::Positional(Expr::Literal(Value::Int(5))),
    ];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.positional.is_empty(), "All positionals consumed");
    assert_eq!(
        parsed.named.get("query"),
        Some(&Value::String("test".to_string()))
    );
    assert_eq!(
        parsed.named.get("output"),
        Some(&Value::String("result.json".to_string()))
    );
    assert_eq!(parsed.named.get("limit"), Some(&Value::Int(5)));
    assert!(parsed.flags.contains("verbose"));
}
1102
// `--output out.txt -- --this-is-data`: the argument after `--` stays positional.
#[test]
fn test_schema_aware_double_dash() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![
        Arg::LongFlag("output".to_string()),
        text("out.txt"),
        Arg::DoubleDash,
        text("--this-is-data"),
    ];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(
        parsed.named.get("output"),
        Some(&Value::String("out.txt".to_string()))
    );
    assert_eq!(
        parsed.positional,
        vec![Value::String("--this-is-data".to_string())]
    );
}
1128
// Without a schema every long flag is a plain flag and positionals stay put.
#[test]
fn test_no_schema_fallback() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![Arg::LongFlag("query".to_string()), text("test")];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, None);

    assert!(parsed.flags.contains("query"), "--query should be a flag");
    assert_eq!(
        parsed.positional,
        vec![Value::String("test".to_string())],
        "'test' should be a positional"
    );
}
1148
// A flag the schema does not know is treated as a plain flag.
#[test]
fn test_unknown_flag_in_schema() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![Arg::LongFlag("unknown".to_string()), text("value")];
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("unknown"));
    assert_eq!(parsed.positional, vec![Value::String("value".to_string())]);
}
1168
/// An explicit `Arg::Named` pair is asserted to be stored under its key when a
/// schema is present, and a following `--verbose` is asserted to be a flag.
#[test]
fn test_named_args_unchanged() {
    let query_pair = Arg::Named {
        key: String::from("query"),
        value: Expr::Literal(Value::String(String::from("test"))),
    };
    let verbose_flag = Arg::LongFlag(String::from("verbose"));
    let args = vec![query_pair, verbose_flag];

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String(String::from("test"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.flags.contains("verbose"));
}
1190
/// A combined short flag `la` is asserted to produce the individual flags `l`
/// and `a`, and the token after it is asserted to stay positional.
#[test]
fn test_short_flags_unchanged() {
    let combined = Arg::ShortFlag("la".to_owned());
    let file = Arg::Positional(Expr::Literal(Value::String("file.txt".to_owned())));
    let args = vec![combined, file];

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // Each character of the short-flag group is checked separately.
    assert!(parsed.flags.contains("l"));
    assert!(parsed.flags.contains("a"));

    let expected_positional = vec![Value::String("file.txt".to_owned())];
    assert_eq!(parsed.positional, expected_positional);
}
1210
/// `--output` placed last, with nothing after it to consume, is asserted to be
/// recorded as a plain flag; the earlier positional is asserted to be kept.
#[test]
fn test_flag_at_end_no_value() {
    let leading_file = Arg::Positional(Expr::Literal(Value::String(String::from("file.txt"))));
    let dangling_flag = Arg::LongFlag(String::from("output"));
    let args = vec![leading_file, dangling_flag];

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("output"));

    let expected_positional = vec![Value::String(String::from("file.txt"))];
    assert_eq!(parsed.positional, expected_positional);
}
1230
/// For a `head`-like schema whose `lines` parameter has the alias `-n`,
/// `-n 5 /tmp/file.txt` is asserted to store `5` under `lines`, set no
/// boolean flags, and leave only the path as a positional value.
#[test]
fn test_short_flag_with_alias_consumes_value() {
    // Schema: one optional int parameter `lines` (default 10), aliased as `-n`.
    let lines_param = ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
        .with_aliases(["-n"]);
    let schema = ToolSchema::new("head", "Output first part of files").param(lines_param);

    let path = "/tmp/file.txt";
    let args = vec![
        Arg::ShortFlag(String::from("n")),
        Arg::Positional(Expr::Literal(Value::Int(5))),
        Arg::Positional(Expr::Literal(Value::String(String::from(path)))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "no boolean flags: {:?}", parsed.flags);

    let expected_lines = Value::Int(5);
    assert_eq!(
        parsed.named.get("lines"),
        Some(&expected_lines),
        "should resolve alias to canonical name"
    );

    let expected_positional = vec![Value::String(String::from(path))];
    assert_eq!(parsed.positional, expected_positional);
}
1251
1252 #[tokio::test]
1255 async fn test_merge_stderr_redirect() {
1256 let result = ExecResult::from_output(0, "stdout content", "stderr content");
1258
1259 let redirects = vec![Redirect {
1260 kind: RedirectKind::MergeStderr,
1261 target: Expr::Literal(Value::Null),
1262 }];
1263
1264 let ctx = make_minimal_ctx();
1265 let result = apply_redirects(result, &redirects, &ctx).await;
1266
1267 assert_eq!(result.out, "stdout contentstderr content");
1268 assert!(result.err.is_empty());
1269 }
1270
1271 #[tokio::test]
1272 async fn test_merge_stderr_with_empty_stderr() {
1273 let result = ExecResult::from_output(0, "stdout only", "");
1275
1276 let redirects = vec![Redirect {
1277 kind: RedirectKind::MergeStderr,
1278 target: Expr::Literal(Value::Null),
1279 }];
1280
1281 let ctx = make_minimal_ctx();
1282 let result = apply_redirects(result, &redirects, &ctx).await;
1283
1284 assert_eq!(result.out, "stdout only");
1285 assert!(result.err.is_empty());
1286 }
1287
1288 #[tokio::test]
1289 async fn test_merge_stderr_order_matters() {
1290 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1295
1296 let redirects = vec![Redirect {
1298 kind: RedirectKind::MergeStderr,
1299 target: Expr::Literal(Value::Null),
1300 }];
1301
1302 let ctx = make_minimal_ctx();
1303 let result = apply_redirects(result, &redirects, &ctx).await;
1304
1305 assert_eq!(result.out, "stdout\nstderr\n");
1306 assert!(result.err.is_empty());
1307 }
1308
1309 #[tokio::test]
1310 async fn test_redirect_with_command_execution() {
1311 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1312
1313 let cmd = Command {
1315 name: "echo".to_string(),
1316 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1317 redirects: vec![Redirect {
1318 kind: RedirectKind::MergeStderr,
1319 target: Expr::Literal(Value::Null),
1320 }],
1321 };
1322
1323 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1324 assert!(result.ok());
1325 assert!(result.out.contains("hello"));
1327 }
1328
1329 #[tokio::test]
1330 async fn test_merge_stderr_in_pipeline() {
1331 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1332
1333 let echo_cmd = Command {
1336 name: "echo".to_string(),
1337 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1338 redirects: vec![Redirect {
1339 kind: RedirectKind::MergeStderr,
1340 target: Expr::Literal(Value::Null),
1341 }],
1342 };
1343 let grep_cmd = Command {
1344 name: "grep".to_string(),
1345 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1346 redirects: vec![],
1347 };
1348
1349 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1350 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1351 assert!(result.out.contains("output"));
1352 }
1353}