1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24async fn apply_redirects(
30 mut result: ExecResult,
31 redirects: &[Redirect],
32 ctx: &ExecContext,
33) -> ExecResult {
34 for redir in redirects {
39 match redir.kind {
40 RedirectKind::MergeStderr => {
41 if result.out.is_empty() {
44 if let Some(ref output) = result.output {
45 result.out = output.to_canonical_string();
46 }
47 }
48 if !result.err.is_empty() {
49 result.out.push_str(&result.err);
50 result.err.clear();
51 }
52 }
53 RedirectKind::MergeStdout => {
54 if result.out.is_empty() {
56 if let Some(ref output) = result.output {
57 result.out = output.to_canonical_string();
58 }
59 }
60 if !result.out.is_empty() {
61 result.err.push_str(&result.out);
62 result.out.clear();
63 }
64 }
65 RedirectKind::StdoutOverwrite => {
66 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
67 if result.out.is_empty() && result.output.is_some() {
69 match std::fs::File::create(&path) {
70 Ok(mut file) => {
71 if let Some(ref output) = result.output {
72 if let Err(e) = output.write_canonical(&mut file, None) {
73 return ExecResult::failure(1, format!("redirect: {e}"));
74 }
75 }
76 }
77 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
78 }
79 } else {
80 if let Err(e) = tokio::fs::write(&path, &result.out).await {
81 return ExecResult::failure(1, format!("redirect: {e}"));
82 }
83 }
84 result.out.clear();
85 result.output = None;
86 }
87 }
88 RedirectKind::StdoutAppend => {
89 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
90 if result.out.is_empty() && result.output.is_some() {
92 match std::fs::OpenOptions::new().append(true).create(true).open(&path) {
93 Ok(mut file) => {
94 if let Some(ref output) = result.output {
95 if let Err(e) = output.write_canonical(&mut file, None) {
96 return ExecResult::failure(1, format!("redirect: {e}"));
97 }
98 }
99 }
100 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
101 }
102 } else {
103 let file = tokio::fs::OpenOptions::new()
104 .append(true)
105 .create(true)
106 .open(&path)
107 .await;
108 match file {
109 Ok(mut f) => {
110 if let Err(e) = f.write_all(result.out.as_bytes()).await {
111 return ExecResult::failure(1, format!("redirect: {e}"));
112 }
113 }
114 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
115 }
116 }
117 result.out.clear();
118 result.output = None;
119 }
120 }
121 RedirectKind::Stderr => {
122 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
123 if let Err(e) = tokio::fs::write(&path, &result.err).await {
124 return ExecResult::failure(1, format!("redirect: {e}"));
125 }
126 result.err.clear();
127 }
128 }
129 RedirectKind::Both => {
130 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
131 let combined = format!("{}{}", result.out, result.err);
132 if let Err(e) = tokio::fs::write(&path, combined).await {
133 return ExecResult::failure(1, format!("redirect: {e}"));
134 }
135 result.out.clear();
136 result.err.clear();
137 }
138 }
139 RedirectKind::Stdin | RedirectKind::HereDoc => {}
141 }
142 }
143 if result.out.is_empty() {
148 if let Some(ref output) = result.output {
149 result.out = output.to_canonical_string();
150 }
151 }
152 result
153}
154
155fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
157 eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
158}
159
160fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
163 for redir in &cmd.redirects {
164 match &redir.kind {
165 RedirectKind::Stdin => {
166 if let Some(path) = eval_redirect_target(&redir.target, ctx)
167 && let Ok(content) = std::fs::read_to_string(&path) {
168 ctx.set_stdin(content);
169 }
170 }
171 RedirectKind::HereDoc => {
172 match &redir.target {
173 Expr::Literal(Value::String(content)) => {
174 ctx.set_stdin(content.clone());
175 }
176 expr => {
177 if let Some(value) = eval_simple_expr(expr, ctx) {
178 ctx.set_stdin(value_to_string(&value));
179 }
180 }
181 }
182 }
183 _ => {}
184 }
185 }
186}
187
188#[derive(Clone)]
190pub struct PipelineRunner {
191 tools: Arc<ToolRegistry>,
192}
193
194impl PipelineRunner {
195 pub fn new(tools: Arc<ToolRegistry>) -> Self {
197 Self { tools }
198 }
199
200 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
210 pub async fn run(
211 &self,
212 commands: &[Command],
213 ctx: &mut ExecContext,
214 dispatcher: &dyn CommandDispatcher,
215 ) -> ExecResult {
216 if commands.is_empty() {
217 return ExecResult::success("");
218 }
219
220 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
222 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
223 }
224
225 self.run_sequential(commands, ctx, dispatcher).await
226 }
227
228 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
233 pub async fn run_sequential(
234 &self,
235 commands: &[Command],
236 ctx: &mut ExecContext,
237 dispatcher: &dyn CommandDispatcher,
238 ) -> ExecResult {
239 if commands.is_empty() {
240 return ExecResult::success("");
241 }
242
243 if commands.len() == 1 {
244 return self.run_single(&commands[0], ctx, None, dispatcher).await;
246 }
247
248 self.run_pipeline(commands, ctx, dispatcher).await
250 }
251
252 async fn run_scatter_gather(
254 &self,
255 commands: &[Command],
256 scatter_idx: usize,
257 gather_idx: usize,
258 ctx: &mut ExecContext,
259 _dispatcher: &dyn CommandDispatcher,
260 ) -> ExecResult {
261 let pre_scatter = &commands[..scatter_idx];
263 let scatter_cmd = &commands[scatter_idx];
264 let parallel = &commands[scatter_idx + 1..gather_idx];
265 let gather_cmd = &commands[gather_idx];
266 let post_gather = &commands[gather_idx + 1..];
267
268 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
271 let gather_schema = self.tools.get("gather").map(|t| t.schema());
272 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
273 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
274
275 let scatter_dispatcher: Arc<dyn CommandDispatcher> =
279 Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
280
281 let runner = ScatterGatherRunner::new(self.tools.clone(), scatter_dispatcher);
282 runner
283 .run(
284 pre_scatter,
285 scatter_opts,
286 parallel,
287 gather_opts,
288 post_gather,
289 ctx,
290 )
291 .await
292 }
293
294 #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
299 async fn run_single(
300 &self,
301 cmd: &Command,
302 ctx: &mut ExecContext,
303 stdin: Option<String>,
304 dispatcher: &dyn CommandDispatcher,
305 ) -> ExecResult {
306 setup_stdin_redirects(cmd, ctx);
308
309 if let Some(input) = stdin {
311 ctx.set_stdin(input);
312 }
313
314 ctx.pipeline_position = PipelinePosition::Only;
316
317 let result = match dispatcher.dispatch(cmd, ctx).await {
319 Ok(result) => result,
320 Err(e) => ExecResult::failure(1, e.to_string()),
321 };
322
323 apply_redirects(result, &cmd.redirects, ctx).await
325 }
326
327 #[tracing::instrument(level = "debug", skip(self, commands, ctx, _dispatcher), fields(stage_count = commands.len()))]
337 async fn run_pipeline(
338 &self,
339 commands: &[Command],
340 ctx: &mut ExecContext,
341 _dispatcher: &dyn CommandDispatcher,
342 ) -> ExecResult {
343 let stage_count = commands.len();
344 let last_idx = stage_count - 1;
345
346 let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
348 let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
349
350 for _ in 0..last_idx {
351 let (writer, reader) = pipe_stream_default();
352 pipe_writers.push(Some(writer));
353 pipe_readers.push(Some(reader));
354 }
355
356 let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
358 let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
359
360 for _ in 0..last_idx {
361 let (tx, rx) = tokio::sync::oneshot::channel();
362 data_senders.push(Some(tx));
363 data_receivers.push(Some(rx));
364 }
365
366 let stage_dispatcher: Arc<dyn CommandDispatcher> =
371 Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
372
373 let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
374
375 for (i, cmd) in commands.iter().enumerate() {
376 let mut stage_ctx = ctx.child_for_pipeline();
377 let cmd = cmd.clone();
378
379 setup_stdin_redirects(&cmd, &mut stage_ctx);
381
382 if i == 0 {
384 if stage_ctx.stdin.is_none() {
387 stage_ctx.stdin = ctx.stdin.take();
388 }
389 if stage_ctx.stdin_data.is_none() {
390 stage_ctx.stdin_data = ctx.stdin_data.take();
391 }
392 } else {
393 stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
395 }
397
398 if i < last_idx {
400 stage_ctx.pipe_stdout = pipe_writers[i].take();
401 }
402
403 stage_ctx.pipeline_position = match i {
405 0 => PipelinePosition::First,
406 n if n == last_idx => PipelinePosition::Last,
407 _ => PipelinePosition::Middle,
408 };
409
410 let data_sender = if i < last_idx { data_senders[i].take() } else { None };
411 let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
412 let task_dispatcher = stage_dispatcher.clone();
413
414 let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> = tokio::spawn(async move {
415 if let Some(mut rx) = data_receiver {
422 if let Ok(data) = rx.try_recv() {
423 stage_ctx.stdin_data = data;
424 }
425 }
427
428 let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
430 Ok(result) => result,
431 Err(e) => ExecResult::failure(1, e.to_string()),
432 };
433
434 result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
436
437 if !result.err.is_empty() {
443 if let Some(ref stderr) = stage_ctx.stderr {
444 stderr.write_str(&result.err);
445 result.err.clear();
446 }
447 }
448
449 if let Some(tx) = data_sender {
455 let _ = tx.send(result.data.clone());
456 }
457
458 if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
461 let text = result.text_out();
462 if !text.is_empty() {
463 let _ = pipe_out.write_all(text.as_bytes()).await;
465 let _ = pipe_out.shutdown().await;
466 }
467 }
469
470 (result, stage_ctx)
471 });
472
473 handles.push(handle);
474 }
475
476 let mut last_result = ExecResult::success("");
481 let mut panics: Vec<String> = Vec::new();
482 for (i, handle) in handles.into_iter().enumerate() {
483 match handle.await {
484 Ok((result, stage_ctx)) => {
485 if i == last_idx {
486 last_result = result;
487 ctx.scope = stage_ctx.scope;
489 ctx.cwd = stage_ctx.cwd;
490 ctx.prev_cwd = stage_ctx.prev_cwd;
491 ctx.aliases = stage_ctx.aliases;
492 }
493 }
494 Err(e) => {
495 panics.push(format!("stage {}: {}", i, e));
496 }
497 }
498 }
499
500 if !panics.is_empty() {
501 last_result = ExecResult::failure(
502 1,
503 format!("pipeline stage(s) panicked: {}", panics.join("; ")),
504 );
505 }
506
507 last_result
508 }
509}
510
511pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str)> {
519 let mut map = HashMap::new();
520 for p in &schema.params {
521 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str()));
522 for alias in &p.aliases {
523 let stripped = alias.trim_start_matches('-');
524 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str()));
525 }
526 }
527 map
528}
529
530pub fn is_bool_type(param_type: &str) -> bool {
532 matches!(param_type.to_lowercase().as_str(), "bool" | "boolean")
533}
534
535pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
543 let mut tool_args = ToolArgs::new();
544 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
545
546 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
548 let mut past_double_dash = false;
549
550 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
552 for (i, arg) in args.iter().enumerate() {
553 if let Arg::Positional(expr) = arg {
554 positional_indices.push((i, expr));
555 }
556 }
557
558 let mut i = 0;
560 while i < args.len() {
561 let arg = &args[i];
562
563 match arg {
564 Arg::DoubleDash => {
565 past_double_dash = true;
566 }
567 Arg::Positional(expr) => {
568 if !consumed_positionals.contains(&i)
570 && let Some(value) = eval_simple_expr(expr, ctx)
571 {
572 tool_args.positional.push(value);
573 }
574 }
575 Arg::Named { key, value } => {
576 if let Some(val) = eval_simple_expr(value, ctx) {
577 tool_args.named.insert(key.clone(), val);
578 }
579 }
580 Arg::ShortFlag(name) => {
581 if past_double_dash {
582 tool_args.positional.push(Value::String(format!("-{name}")));
583 } else if name.len() == 1 {
584 let flag_name = name.as_str();
587 let lookup = param_lookup.get(flag_name);
588 let is_bool = lookup
589 .map(|(_, typ)| is_bool_type(typ))
590 .unwrap_or(true);
591
592 if is_bool {
593 tool_args.flags.insert(flag_name.to_string());
594 } else {
595 let canonical = lookup.map(|(name, _)| *name).unwrap_or(flag_name);
597 let next_positional = positional_indices
598 .iter()
599 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
600
601 if let Some((pos_idx, expr)) = next_positional {
602 if let Some(value) = eval_simple_expr(expr, ctx) {
603 tool_args.named.insert(canonical.to_string(), value);
604 consumed_positionals.insert(*pos_idx);
605 } else {
606 tool_args.flags.insert(flag_name.to_string());
607 }
608 } else {
609 tool_args.flags.insert(flag_name.to_string());
610 }
611 }
612 } else if let Some(&(canonical, typ)) = param_lookup.get(name.as_str()) {
613 if is_bool_type(typ) {
615 tool_args.flags.insert(canonical.to_string());
616 } else {
617 let next_positional = positional_indices
618 .iter()
619 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
620 if let Some((pos_idx, expr)) = next_positional {
621 if let Some(value) = eval_simple_expr(expr, ctx) {
622 tool_args.named.insert(canonical.to_string(), value);
623 consumed_positionals.insert(*pos_idx);
624 } else {
625 tool_args.flags.insert(name.clone());
626 }
627 } else {
628 tool_args.flags.insert(name.clone());
629 }
630 }
631 } else {
632 for c in name.chars() {
634 tool_args.flags.insert(c.to_string());
635 }
636 }
637 }
638 Arg::LongFlag(name) => {
639 if past_double_dash {
640 tool_args.positional.push(Value::String(format!("--{name}")));
641 } else {
642 let lookup = param_lookup.get(name.as_str());
644 let is_bool = lookup
645 .map(|(_, typ)| is_bool_type(typ))
646 .unwrap_or(true); if is_bool {
649 tool_args.flags.insert(name.clone());
650 } else {
651 let canonical = lookup.map(|(name, _)| *name).unwrap_or(name.as_str());
653 let next_positional = positional_indices
654 .iter()
655 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
656
657 if let Some((pos_idx, expr)) = next_positional {
658 if let Some(value) = eval_simple_expr(expr, ctx) {
659 tool_args.named.insert(canonical.to_string(), value);
660 consumed_positionals.insert(*pos_idx);
661 } else {
662 tool_args.flags.insert(name.clone());
663 }
664 } else {
665 tool_args.flags.insert(name.clone());
666 }
667 }
668 }
669 }
670 }
671 i += 1;
672 }
673
674 if let Some(schema) = schema.filter(|s| s.map_positionals) {
679 let pre_dash_count = if past_double_dash {
681 let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
683 positional_indices.iter()
685 .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
686 .count()
687 } else {
688 tool_args.positional.len()
689 };
690
691 let mut remaining = Vec::new();
692 let mut positional_iter = tool_args.positional.drain(..).enumerate();
693
694 for param in &schema.params {
695 if tool_args.named.contains_key(¶m.name) || tool_args.flags.contains(¶m.name) {
696 continue; }
698 if is_bool_type(¶m.param_type) {
699 continue; }
701 loop {
703 match positional_iter.next() {
704 Some((idx, val)) if idx < pre_dash_count => {
705 tool_args.named.insert(param.name.clone(), val);
706 break;
707 }
708 Some((_, val)) => {
709 remaining.push(val); }
711 None => break,
712 }
713 }
714 }
715
716 remaining.extend(positional_iter.map(|(_, v)| v));
718 tool_args.positional = remaining;
719 }
720
721 tool_args
722}
723
724fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
726 match expr {
727 Expr::Literal(value) => Some(eval_literal(value, ctx)),
728 Expr::VarRef(path) => ctx.scope.resolve_path(path),
729 Expr::Interpolated(parts) => {
730 let mut result = String::new();
731 for part in parts {
732 match part {
733 crate::ast::StringPart::Literal(s) => result.push_str(s),
734 crate::ast::StringPart::Var(path) => {
735 if let Some(value) = ctx.scope.resolve_path(path) {
736 result.push_str(&value_to_string(&value));
737 }
738 }
739 crate::ast::StringPart::VarWithDefault { name, default } => {
740 match ctx.scope.get(name) {
741 Some(value) => {
742 let s = value_to_string(value);
743 if s.is_empty() {
744 result.push_str(&eval_string_parts_sync(default, ctx));
745 } else {
746 result.push_str(&s);
747 }
748 }
749 None => result.push_str(&eval_string_parts_sync(default, ctx)),
750 }
751 }
752 crate::ast::StringPart::VarLength(name) => {
753 let len = match ctx.scope.get(name) {
754 Some(value) => value_to_string(value).len(),
755 None => 0,
756 };
757 result.push_str(&len.to_string());
758 }
759 crate::ast::StringPart::Positional(n) => {
760 if let Some(s) = ctx.scope.get_positional(*n) {
761 result.push_str(s);
762 }
763 }
764 crate::ast::StringPart::AllArgs => {
765 result.push_str(&ctx.scope.all_args().join(" "));
766 }
767 crate::ast::StringPart::ArgCount => {
768 result.push_str(&ctx.scope.arg_count().to_string());
769 }
770 crate::ast::StringPart::Arithmetic(expr) => {
771 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
773 result.push_str(&value.to_string());
774 }
775 }
776 crate::ast::StringPart::CommandSubst(_) => {
777 }
779 crate::ast::StringPart::LastExitCode => {
780 result.push_str(&ctx.scope.last_result().code.to_string());
781 }
782 crate::ast::StringPart::CurrentPid => {
783 result.push_str(&ctx.scope.pid().to_string());
784 }
785 }
786 }
787 Some(Value::String(result))
788 }
789 _ => None, }
791}
792
793fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
795 value.clone()
796}
797
798fn value_to_string(value: &Value) -> String {
800 match value {
801 Value::Null => "".to_string(),
802 Value::Bool(b) => b.to_string(),
803 Value::Int(i) => i.to_string(),
804 Value::Float(f) => f.to_string(),
805 Value::String(s) => s.clone(),
806 Value::Json(json) => json.to_string(),
807 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
808 }
809}
810
811fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
814 let mut result = String::new();
815 for part in parts {
816 match part {
817 crate::ast::StringPart::Literal(s) => result.push_str(s),
818 crate::ast::StringPart::Var(path) => {
819 if let Some(value) = ctx.scope.resolve_path(path) {
820 result.push_str(&value_to_string(&value));
821 }
822 }
823 crate::ast::StringPart::VarWithDefault { name, default } => {
824 match ctx.scope.get(name) {
825 Some(value) => {
826 let s = value_to_string(value);
827 if s.is_empty() {
828 result.push_str(&eval_string_parts_sync(default, ctx));
829 } else {
830 result.push_str(&s);
831 }
832 }
833 None => result.push_str(&eval_string_parts_sync(default, ctx)),
834 }
835 }
836 crate::ast::StringPart::VarLength(name) => {
837 let len = match ctx.scope.get(name) {
838 Some(value) => value_to_string(value).len(),
839 None => 0,
840 };
841 result.push_str(&len.to_string());
842 }
843 crate::ast::StringPart::Positional(n) => {
844 if let Some(s) = ctx.scope.get_positional(*n) {
845 result.push_str(s);
846 }
847 }
848 crate::ast::StringPart::AllArgs => {
849 result.push_str(&ctx.scope.all_args().join(" "));
850 }
851 crate::ast::StringPart::ArgCount => {
852 result.push_str(&ctx.scope.arg_count().to_string());
853 }
854 crate::ast::StringPart::Arithmetic(expr) => {
855 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
856 result.push_str(&value.to_string());
857 }
858 }
859 crate::ast::StringPart::CommandSubst(_) => {
860 }
862 crate::ast::StringPart::LastExitCode => {
863 result.push_str(&ctx.scope.last_result().code.to_string());
864 }
865 crate::ast::StringPart::CurrentPid => {
866 result.push_str(&ctx.scope.pid().to_string());
867 }
868 }
869 }
870 result
871}
872
873fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
878 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
879 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
880
881 if gather_idx > scatter_idx {
883 Some((scatter_idx, gather_idx))
884 } else {
885 None
886 }
887}
888
889#[cfg(test)]
890mod tests {
891 use super::*;
892 use crate::dispatch::BackendDispatcher;
893 use crate::tools::register_builtins;
894 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
895 use std::path::Path;
896
897 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
898 let mut tools = ToolRegistry::new();
899 register_builtins(&mut tools);
900 let tools = Arc::new(tools);
901 let runner = PipelineRunner::new(tools.clone());
902 let dispatcher = BackendDispatcher::new(tools.clone());
903
904 let mut vfs = VfsRouter::new();
905 let mem = MemoryFs::new();
906 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
907 vfs.mount("/", mem);
908 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
909
910 (runner, ctx, dispatcher)
911 }
912
913 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
914 Command {
915 name: name.to_string(),
916 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
917 redirects: vec![],
918 }
919 }
920
921 #[tokio::test]
922 async fn test_single_command() {
923 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
924 let cmd = make_cmd("echo", vec!["hello"]);
925
926 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
927 assert!(result.ok());
928 assert_eq!(result.out.trim(), "hello");
929 }
930
931 #[tokio::test]
932 async fn test_pipeline_echo_grep() {
933 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
934
935 let echo_cmd = Command {
937 name: "echo".to_string(),
938 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
939 redirects: vec![],
940 };
941 let grep_cmd = Command {
942 name: "grep".to_string(),
943 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
944 redirects: vec![],
945 };
946
947 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
948 assert!(result.ok());
949 assert_eq!(result.out.trim(), "world");
950 }
951
952 #[tokio::test]
953 async fn test_pipeline_cat_grep() {
954 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
955
956 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
958 let grep_cmd = Command {
959 name: "grep".to_string(),
960 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
961 redirects: vec![],
962 };
963
964 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
965 assert!(result.ok());
966 assert!(result.out.contains("hello"));
967 }
968
969 #[tokio::test]
970 async fn test_command_not_found() {
971 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
972 let cmd = make_cmd("nonexistent", vec![]);
973
974 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
975 assert!(!result.ok());
976 assert_eq!(result.code, 127);
977 assert!(result.err.contains("not found"));
978 }
979
980 #[tokio::test]
981 async fn test_pipeline_continues_on_failure() {
982 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
985
986 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
989 let grep_cmd = Command {
990 name: "grep".to_string(),
991 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
992 redirects: vec![],
993 };
994
995 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
996 assert!(!result.ok());
998 }
999
1000 #[tokio::test]
1001 async fn test_pipeline_last_command_exit_code() {
1002 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1004
1005 let echo_cmd = make_cmd("echo", vec!["hello"]);
1006 let cat_cmd = make_cmd("cat", vec![]);
1007
1008 let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1009 assert!(result.ok());
1010 assert!(result.out.contains("hello"));
1011 }
1012
1013 #[tokio::test]
1014 async fn test_empty_pipeline() {
1015 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1016 let result = runner.run(&[], &mut ctx, &dispatcher).await;
1017 assert!(result.ok());
1018 }
1019
1020 #[test]
1023 fn test_find_scatter_gather_both_present() {
1024 let commands = vec![
1025 make_cmd("echo", vec!["a"]),
1026 make_cmd("scatter", vec![]),
1027 make_cmd("process", vec![]),
1028 make_cmd("gather", vec![]),
1029 ];
1030 let result = find_scatter_gather(&commands);
1031 assert_eq!(result, Some((1, 3)));
1032 }
1033
1034 #[test]
1035 fn test_find_scatter_gather_no_scatter() {
1036 let commands = vec![
1037 make_cmd("echo", vec!["a"]),
1038 make_cmd("gather", vec![]),
1039 ];
1040 let result = find_scatter_gather(&commands);
1041 assert!(result.is_none());
1042 }
1043
1044 #[test]
1045 fn test_find_scatter_gather_no_gather() {
1046 let commands = vec![
1047 make_cmd("echo", vec!["a"]),
1048 make_cmd("scatter", vec![]),
1049 ];
1050 let result = find_scatter_gather(&commands);
1051 assert!(result.is_none());
1052 }
1053
1054 #[test]
1055 fn test_find_scatter_gather_wrong_order() {
1056 let commands = vec![
1057 make_cmd("gather", vec![]),
1058 make_cmd("scatter", vec![]),
1059 ];
1060 let result = find_scatter_gather(&commands);
1061 assert!(result.is_none());
1062 }
1063
1064 #[tokio::test]
1065 async fn test_scatter_gather_simple() {
1066 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1067
1068 let split_cmd = Command {
1070 name: "split".to_string(),
1071 args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1072 redirects: vec![],
1073 };
1074 let scatter_cmd = make_cmd("scatter", vec![]);
1075 let process_cmd = Command {
1076 name: "echo".to_string(),
1077 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1078 redirects: vec![],
1079 };
1080 let gather_cmd = make_cmd("gather", vec![]);
1081
1082 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1083 assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1084 assert!(result.out.contains("a"));
1086 assert!(result.out.contains("b"));
1087 assert!(result.out.contains("c"));
1088 }
1089
1090 #[tokio::test]
1091 async fn test_scatter_gather_empty_input() {
1092 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1093
1094 let echo_cmd = Command {
1096 name: "echo".to_string(),
1097 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1098 redirects: vec![],
1099 };
1100 let scatter_cmd = make_cmd("scatter", vec![]);
1101 let process_cmd = Command {
1102 name: "echo".to_string(),
1103 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1104 redirects: vec![],
1105 };
1106 let gather_cmd = make_cmd("gather", vec![]);
1107
1108 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1109 assert!(result.ok());
1110 assert!(result.out.trim().is_empty());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_scatter_gather_with_structured_stdin() {
1115 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1116
1117 let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1119 ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1120
1121 let scatter_cmd = make_cmd("scatter", vec![]);
1122 let process_cmd = Command {
1123 name: "echo".to_string(),
1124 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1125 redirects: vec![],
1126 };
1127 let gather_cmd = make_cmd("gather", vec![]);
1128
1129 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1130 assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1131 assert!(result.out.contains("x"));
1132 assert!(result.out.contains("y"));
1133 assert!(result.out.contains("z"));
1134 }
1135
1136 #[tokio::test]
1137 async fn test_scatter_gather_json_input() {
1138 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1139
1140 let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1142 ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1143
1144 let scatter_cmd = make_cmd("scatter", vec![]);
1145 let process_cmd = Command {
1146 name: "echo".to_string(),
1147 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1148 redirects: vec![],
1149 };
1150 let gather_cmd = make_cmd("gather", vec![]);
1151
1152 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1153 assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1154 assert!(result.out.contains("one"));
1155 assert!(result.out.contains("two"));
1156 assert!(result.out.contains("three"));
1157 }
1158
1159 #[tokio::test]
1160 async fn test_scatter_gather_with_post_gather() {
1161 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1162
1163 let split_cmd = Command {
1165 name: "split".to_string(),
1166 args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1167 redirects: vec![],
1168 };
1169 let scatter_cmd = make_cmd("scatter", vec![]);
1170 let process_cmd = Command {
1171 name: "echo".to_string(),
1172 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1173 redirects: vec![],
1174 };
1175 let gather_cmd = make_cmd("gather", vec![]);
1176 let grep_cmd = Command {
1177 name: "grep".to_string(),
1178 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1179 redirects: vec![],
1180 };
1181
1182 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1183 assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1184 assert!(result.out.contains("a"));
1185 assert!(!result.out.contains("b"));
1186 }
1187
1188 #[tokio::test]
1189 async fn test_scatter_custom_var_name() {
1190 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1191
1192 let data = Value::Json(serde_json::json!(["test1", "test2"]));
1194 ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1195
1196 let scatter_cmd = Command {
1198 name: "scatter".to_string(),
1199 args: vec![Arg::Named {
1200 key: "as".to_string(),
1201 value: Expr::Literal(Value::String("URL".to_string())),
1202 }],
1203 redirects: vec![],
1204 };
1205 let process_cmd = Command {
1206 name: "echo".to_string(),
1207 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1208 redirects: vec![],
1209 };
1210 let gather_cmd = make_cmd("gather", vec![]);
1211
1212 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1213 assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1214 assert!(result.out.contains("test1"));
1215 assert!(result.out.contains("test2"));
1216 }
1217
1218 #[tokio::test]
1221 async fn test_pipeline_routes_through_backend() {
1222 use crate::backend::testing::MockBackend;
1223 use std::sync::atomic::Ordering;
1224
1225 let (backend, call_count) = MockBackend::new();
1227 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1228
1229 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1231
1232 let tools = std::sync::Arc::new(ToolRegistry::new());
1234 let runner = PipelineRunner::new(tools.clone());
1235 let dispatcher = BackendDispatcher::new(tools);
1236
1237 let cmd = make_cmd("test-tool", vec!["arg1"]);
1239 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1240
1241 assert!(result.ok(), "Mock backend should return success");
1242 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1243 assert!(result.out.contains("mock executed"), "Output should be from mock backend");
1244 }
1245
1246 #[tokio::test]
1247 async fn test_multi_command_pipeline_routes_through_backend() {
1248 use crate::backend::testing::MockBackend;
1249 use std::sync::atomic::Ordering;
1250
1251 let (backend, call_count) = MockBackend::new();
1252 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1253 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1254
1255 let tools = std::sync::Arc::new(ToolRegistry::new());
1256 let runner = PipelineRunner::new(tools.clone());
1257 let dispatcher = BackendDispatcher::new(tools);
1258
1259 let cmd1 = make_cmd("tool1", vec![]);
1261 let cmd2 = make_cmd("tool2", vec![]);
1262 let cmd3 = make_cmd("tool3", vec![]);
1263
1264 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1265
1266 assert!(result.ok());
1267 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1268 }
1269
1270 use crate::tools::{ParamSchema, ToolSchema};
1273
1274 fn make_test_schema() -> ToolSchema {
1275 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1276 .param(ParamSchema::required("query", "string", "Search query"))
1277 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1278 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1279 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1280 .with_positional_mapping()
1281 }
1282
1283 fn make_minimal_ctx() -> ExecContext {
1284 let mut vfs = VfsRouter::new();
1285 vfs.mount("/", MemoryFs::new());
1286 ExecContext::new(Arc::new(vfs))
1287 }
1288
1289 #[test]
1290 fn test_schema_aware_string_arg() {
1291 let args = vec![
1293 Arg::LongFlag("query".to_string()),
1294 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1295 ];
1296 let schema = make_test_schema();
1297 let ctx = make_minimal_ctx();
1298
1299 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1300
1301 assert!(tool_args.flags.is_empty(), "No flags should be set");
1302 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1303 assert_eq!(
1304 tool_args.named.get("query"),
1305 Some(&Value::String("test".to_string())),
1306 "--query should consume 'test' as its value"
1307 );
1308 }
1309
1310 #[test]
1311 fn test_schema_aware_bool_flag() {
1312 let args = vec![
1314 Arg::LongFlag("verbose".to_string()),
1315 ];
1316 let schema = make_test_schema();
1317 let ctx = make_minimal_ctx();
1318
1319 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1320
1321 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1322 assert!(tool_args.named.is_empty(), "No named args");
1323 assert!(tool_args.positional.is_empty(), "No positionals");
1324 }
1325
1326 #[test]
1327 fn test_schema_aware_mixed() {
1328 let args = vec![
1331 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1332 Arg::LongFlag("output".to_string()),
1333 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1334 Arg::LongFlag("verbose".to_string()),
1335 ];
1336 let schema = make_test_schema();
1337 let ctx = make_minimal_ctx();
1338
1339 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1340
1341 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1342 assert_eq!(
1343 tool_args.named.get("query"),
1344 Some(&Value::String("file.txt".to_string()))
1345 );
1346 assert_eq!(
1347 tool_args.named.get("output"),
1348 Some(&Value::String("out.txt".to_string()))
1349 );
1350 assert!(tool_args.flags.contains("verbose"));
1351 }
1352
1353 #[test]
1354 fn test_schema_aware_multiple_string_args() {
1355 let args = vec![
1357 Arg::LongFlag("query".to_string()),
1358 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1359 Arg::LongFlag("output".to_string()),
1360 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1361 Arg::LongFlag("verbose".to_string()),
1362 Arg::LongFlag("limit".to_string()),
1363 Arg::Positional(Expr::Literal(Value::Int(5))),
1364 ];
1365 let schema = make_test_schema();
1366 let ctx = make_minimal_ctx();
1367
1368 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1369
1370 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1371 assert_eq!(
1372 tool_args.named.get("query"),
1373 Some(&Value::String("test".to_string()))
1374 );
1375 assert_eq!(
1376 tool_args.named.get("output"),
1377 Some(&Value::String("result.json".to_string()))
1378 );
1379 assert_eq!(
1380 tool_args.named.get("limit"),
1381 Some(&Value::Int(5))
1382 );
1383 assert!(tool_args.flags.contains("verbose"));
1384 }
1385
1386 #[test]
1387 fn test_schema_aware_double_dash() {
1388 let args = vec![
1391 Arg::LongFlag("output".to_string()),
1392 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1393 Arg::DoubleDash,
1394 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1395 ];
1396 let schema = make_test_schema();
1397 let ctx = make_minimal_ctx();
1398
1399 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1400
1401 assert_eq!(
1402 tool_args.named.get("output"),
1403 Some(&Value::String("out.txt".to_string()))
1404 );
1405 assert_eq!(
1407 tool_args.positional,
1408 vec![Value::String("--this-is-data".to_string())]
1409 );
1410 }
1411
1412 #[test]
1413 fn test_no_schema_fallback() {
1414 let args = vec![
1416 Arg::LongFlag("query".to_string()),
1417 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1418 ];
1419 let ctx = make_minimal_ctx();
1420
1421 let tool_args = build_tool_args(&args, &ctx, None);
1422
1423 assert!(tool_args.flags.contains("query"), "--query should be a flag");
1425 assert_eq!(
1426 tool_args.positional,
1427 vec![Value::String("test".to_string())],
1428 "'test' should be a positional"
1429 );
1430 }
1431
1432 #[test]
1433 fn test_unknown_flag_in_schema() {
1434 let args = vec![
1436 Arg::LongFlag("unknown".to_string()),
1437 Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1438 ];
1439 let schema = make_test_schema();
1440 let ctx = make_minimal_ctx();
1441
1442 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1443
1444 assert!(tool_args.flags.contains("unknown"));
1445 assert!(tool_args.positional.is_empty(), "value consumed as query param");
1446 assert_eq!(
1447 tool_args.named.get("query"),
1448 Some(&Value::String("value".to_string()))
1449 );
1450 }
1451
1452 #[test]
1453 fn test_named_args_unchanged() {
1454 let args = vec![
1456 Arg::Named {
1457 key: "query".to_string(),
1458 value: Expr::Literal(Value::String("test".to_string())),
1459 },
1460 Arg::LongFlag("verbose".to_string()),
1461 ];
1462 let schema = make_test_schema();
1463 let ctx = make_minimal_ctx();
1464
1465 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1466
1467 assert_eq!(
1468 tool_args.named.get("query"),
1469 Some(&Value::String("test".to_string()))
1470 );
1471 assert!(tool_args.flags.contains("verbose"));
1472 }
1473
1474 #[test]
1475 fn test_short_flags_unchanged() {
1476 let args = vec![
1478 Arg::ShortFlag("la".to_string()),
1479 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1480 ];
1481 let schema = make_test_schema();
1482 let ctx = make_minimal_ctx();
1483
1484 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1485
1486 assert!(tool_args.flags.contains("l"));
1487 assert!(tool_args.flags.contains("a"));
1488 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1489 assert_eq!(
1490 tool_args.named.get("query"),
1491 Some(&Value::String("file.txt".to_string()))
1492 );
1493 }
1494
1495 #[test]
1496 fn test_flag_at_end_no_value() {
1497 let args = vec![
1500 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1501 Arg::LongFlag("output".to_string()),
1502 ];
1503 let schema = make_test_schema();
1504 let ctx = make_minimal_ctx();
1505
1506 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1507
1508 assert!(tool_args.flags.contains("output"));
1510 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1511 assert_eq!(
1512 tool_args.named.get("query"),
1513 Some(&Value::String("file.txt".to_string()))
1514 );
1515 }
1516
1517 #[test]
1518 fn test_positional_skips_bool_params() {
1519 let schema = ToolSchema::new("test", "")
1523 .param(ParamSchema::required("query", "string", ""))
1524 .param(ParamSchema::optional(
1525 "verbose",
1526 "bool",
1527 Value::Bool(false),
1528 "",
1529 ))
1530 .param(ParamSchema::optional(
1531 "output",
1532 "string",
1533 Value::Null,
1534 "",
1535 ))
1536 .with_positional_mapping();
1537 let args = vec![
1538 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1539 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1540 ];
1541 let ctx = make_minimal_ctx();
1542
1543 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1544
1545 assert_eq!(
1546 tool_args.named.get("query"),
1547 Some(&Value::String("val1".to_string()))
1548 );
1549 assert_eq!(
1550 tool_args.named.get("output"),
1551 Some(&Value::String("val2".to_string()))
1552 );
1553 assert!(!tool_args.flags.contains("verbose"));
1554 assert!(tool_args.positional.is_empty());
1555 }
1556
1557 #[test]
1558 fn test_positionals_fill_available_slots() {
1559 let args = vec![
1562 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1563 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1564 Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1565 ];
1566 let schema = make_test_schema(); let ctx = make_minimal_ctx();
1568
1569 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1570
1571 assert_eq!(
1574 tool_args.named.get("query"),
1575 Some(&Value::String("val1".to_string()))
1576 );
1577 assert_eq!(
1578 tool_args.named.get("limit"),
1579 Some(&Value::String("val2".to_string()))
1580 );
1581 assert_eq!(
1582 tool_args.named.get("output"),
1583 Some(&Value::String("val3".to_string()))
1584 );
1585 assert!(tool_args.positional.is_empty());
1586 }
1587
1588 #[test]
1589 fn test_truly_excess_positionals() {
1590 let schema = ToolSchema::new("test", "")
1592 .param(ParamSchema::required("name", "string", ""))
1593 .with_positional_mapping();
1594 let args = vec![
1595 Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1596 Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1597 Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1598 ];
1599 let ctx = make_minimal_ctx();
1600
1601 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1602
1603 assert_eq!(
1604 tool_args.named.get("name"),
1605 Some(&Value::String("first".to_string()))
1606 );
1607 assert_eq!(
1608 tool_args.positional,
1609 vec![
1610 Value::String("second".to_string()),
1611 Value::String("third".to_string()),
1612 ]
1613 );
1614 }
1615
1616 #[test]
1617 fn test_double_dash_positional_not_mapped() {
1618 let args = vec![
1620 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1621 Arg::DoubleDash,
1622 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1623 ];
1624 let schema = make_test_schema();
1625 let ctx = make_minimal_ctx();
1626
1627 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1628
1629 assert_eq!(
1630 tool_args.named.get("query"),
1631 Some(&Value::String("val1".to_string()))
1632 );
1633 assert_eq!(
1635 tool_args.positional,
1636 vec![Value::String("val2".to_string())]
1637 );
1638 }
1639
1640 #[test]
1641 fn test_all_params_filled_by_flags() {
1642 let args = vec![
1644 Arg::LongFlag("query".to_string()),
1645 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1646 Arg::LongFlag("output".to_string()),
1647 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1648 Arg::LongFlag("verbose".to_string()),
1649 ];
1650 let schema = make_test_schema();
1651 let ctx = make_minimal_ctx();
1652
1653 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1654
1655 assert_eq!(
1656 tool_args.named.get("query"),
1657 Some(&Value::String("search".to_string()))
1658 );
1659 assert_eq!(
1660 tool_args.named.get("output"),
1661 Some(&Value::String("out.txt".to_string()))
1662 );
1663 assert!(tool_args.flags.contains("verbose"));
1664 assert!(tool_args.positional.is_empty());
1665 }
1666
1667 #[test]
1668 fn test_mixed_flags_and_positional_fill() {
1669 let args = vec![
1671 Arg::LongFlag("output".to_string()),
1672 Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
1673 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1674 ];
1675 let schema = make_test_schema();
1676 let ctx = make_minimal_ctx();
1677
1678 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1679
1680 assert_eq!(
1681 tool_args.named.get("output"),
1682 Some(&Value::String("foo".to_string()))
1683 );
1684 assert_eq!(
1685 tool_args.named.get("query"),
1686 Some(&Value::String("val1".to_string()))
1687 );
1688 assert!(tool_args.positional.is_empty());
1689 }
1690
1691 #[test]
1692 fn test_alias_flag_prevents_mapping_overwrite() {
1693 let schema = ToolSchema::new("test", "")
1695 .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
1696 .param(ParamSchema::required("output", "string", ""))
1697 .with_positional_mapping();
1698 let args = vec![
1699 Arg::ShortFlag("q".to_string()),
1700 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1701 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1702 ];
1703 let ctx = make_minimal_ctx();
1704
1705 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1706
1707 assert_eq!(
1708 tool_args.named.get("query"),
1709 Some(&Value::String("search".to_string()))
1710 );
1711 assert_eq!(
1712 tool_args.named.get("output"),
1713 Some(&Value::String("out.txt".to_string()))
1714 );
1715 assert!(tool_args.positional.is_empty());
1716 }
1717
1718 #[test]
1719 fn test_builtin_schema_no_positional_mapping() {
1720 let schema = ToolSchema::new("echo", "")
1722 .param(ParamSchema::optional("args", "any", Value::Null, ""))
1723 .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
1724 let args = vec![
1726 Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
1727 Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
1728 ];
1729 let ctx = make_minimal_ctx();
1730
1731 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1732
1733 assert_eq!(
1735 tool_args.positional,
1736 vec![
1737 Value::String("hello".to_string()),
1738 Value::String("world".to_string()),
1739 ]
1740 );
1741 assert!(tool_args.named.get("args").is_none());
1742 }
1743
1744 #[test]
1745 fn test_short_flag_with_alias_consumes_value() {
1746 let schema = ToolSchema::new("head", "Output first part of files")
1749 .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1750 .with_aliases(["-n"]));
1751 let args = vec![
1752 Arg::ShortFlag("n".to_string()),
1753 Arg::Positional(Expr::Literal(Value::Int(5))),
1754 Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1755 ];
1756 let ctx = make_minimal_ctx();
1757
1758 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1759
1760 assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1761 assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1762 assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1763 }
1764
1765 #[tokio::test]
1768 async fn test_merge_stderr_redirect() {
1769 let result = ExecResult::from_output(0, "stdout content", "stderr content");
1771
1772 let redirects = vec![Redirect {
1773 kind: RedirectKind::MergeStderr,
1774 target: Expr::Literal(Value::Null),
1775 }];
1776
1777 let ctx = make_minimal_ctx();
1778 let result = apply_redirects(result, &redirects, &ctx).await;
1779
1780 assert_eq!(result.out, "stdout contentstderr content");
1781 assert!(result.err.is_empty());
1782 }
1783
1784 #[tokio::test]
1785 async fn test_merge_stderr_with_empty_stderr() {
1786 let result = ExecResult::from_output(0, "stdout only", "");
1788
1789 let redirects = vec![Redirect {
1790 kind: RedirectKind::MergeStderr,
1791 target: Expr::Literal(Value::Null),
1792 }];
1793
1794 let ctx = make_minimal_ctx();
1795 let result = apply_redirects(result, &redirects, &ctx).await;
1796
1797 assert_eq!(result.out, "stdout only");
1798 assert!(result.err.is_empty());
1799 }
1800
1801 #[tokio::test]
1802 async fn test_merge_stderr_order_matters() {
1803 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1808
1809 let redirects = vec![Redirect {
1811 kind: RedirectKind::MergeStderr,
1812 target: Expr::Literal(Value::Null),
1813 }];
1814
1815 let ctx = make_minimal_ctx();
1816 let result = apply_redirects(result, &redirects, &ctx).await;
1817
1818 assert_eq!(result.out, "stdout\nstderr\n");
1819 assert!(result.err.is_empty());
1820 }
1821
1822 #[tokio::test]
1823 async fn test_redirect_with_command_execution() {
1824 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1825
1826 let cmd = Command {
1828 name: "echo".to_string(),
1829 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1830 redirects: vec![Redirect {
1831 kind: RedirectKind::MergeStderr,
1832 target: Expr::Literal(Value::Null),
1833 }],
1834 };
1835
1836 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1837 assert!(result.ok());
1838 assert!(result.out.contains("hello"));
1840 }
1841
1842 #[tokio::test]
1843 async fn test_merge_stderr_in_pipeline() {
1844 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1845
1846 let echo_cmd = Command {
1849 name: "echo".to_string(),
1850 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1851 redirects: vec![Redirect {
1852 kind: RedirectKind::MergeStderr,
1853 target: Expr::Literal(Value::Null),
1854 }],
1855 };
1856 let grep_cmd = Command {
1857 name: "grep".to_string(),
1858 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1859 redirects: vec![],
1860 };
1861
1862 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1863 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1864 assert!(result.out.contains("output"));
1865 }
1866}