1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24async fn apply_redirects(
30 mut result: ExecResult,
31 redirects: &[Redirect],
32 ctx: &ExecContext,
33) -> ExecResult {
34 for redir in redirects {
39 match redir.kind {
40 RedirectKind::MergeStderr => {
41 if result.out.is_empty() {
44 if let Some(ref output) = result.output {
45 result.out = output.to_canonical_string();
46 }
47 }
48 if !result.err.is_empty() {
49 result.out.push_str(&result.err);
50 result.err.clear();
51 }
52 }
53 RedirectKind::MergeStdout => {
54 if result.out.is_empty() {
56 if let Some(ref output) = result.output {
57 result.out = output.to_canonical_string();
58 }
59 }
60 if !result.out.is_empty() {
61 result.err.push_str(&result.out);
62 result.out.clear();
63 }
64 }
65 RedirectKind::StdoutOverwrite => {
66 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
67 if result.out.is_empty() && result.output.is_some() {
69 match std::fs::File::create(&path) {
70 Ok(mut file) => {
71 if let Some(ref output) = result.output {
72 if let Err(e) = output.write_canonical(&mut file, None) {
73 return ExecResult::failure(1, format!("redirect: {e}"));
74 }
75 }
76 }
77 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
78 }
79 } else {
80 if let Err(e) = tokio::fs::write(&path, &result.out).await {
81 return ExecResult::failure(1, format!("redirect: {e}"));
82 }
83 }
84 result.out.clear();
85 result.output = None;
86 }
87 }
88 RedirectKind::StdoutAppend => {
89 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
90 if result.out.is_empty() && result.output.is_some() {
92 match std::fs::OpenOptions::new().append(true).create(true).open(&path) {
93 Ok(mut file) => {
94 if let Some(ref output) = result.output {
95 if let Err(e) = output.write_canonical(&mut file, None) {
96 return ExecResult::failure(1, format!("redirect: {e}"));
97 }
98 }
99 }
100 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
101 }
102 } else {
103 let file = tokio::fs::OpenOptions::new()
104 .append(true)
105 .create(true)
106 .open(&path)
107 .await;
108 match file {
109 Ok(mut f) => {
110 if let Err(e) = f.write_all(result.out.as_bytes()).await {
111 return ExecResult::failure(1, format!("redirect: {e}"));
112 }
113 }
114 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
115 }
116 }
117 result.out.clear();
118 result.output = None;
119 }
120 }
121 RedirectKind::Stderr => {
122 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
123 if let Err(e) = tokio::fs::write(&path, &result.err).await {
124 return ExecResult::failure(1, format!("redirect: {e}"));
125 }
126 result.err.clear();
127 }
128 }
129 RedirectKind::Both => {
130 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
131 let combined = format!("{}{}", result.out, result.err);
132 if let Err(e) = tokio::fs::write(&path, combined).await {
133 return ExecResult::failure(1, format!("redirect: {e}"));
134 }
135 result.out.clear();
136 result.err.clear();
137 }
138 }
139 RedirectKind::Stdin | RedirectKind::HereDoc => {}
141 }
142 }
143 if result.out.is_empty() {
148 if let Some(ref output) = result.output {
149 result.out = output.to_canonical_string();
150 }
151 }
152 result
153}
154
155fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
157 eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
158}
159
160fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
163 for redir in &cmd.redirects {
164 match &redir.kind {
165 RedirectKind::Stdin => {
166 if let Some(path) = eval_redirect_target(&redir.target, ctx)
167 && let Ok(content) = std::fs::read_to_string(&path) {
168 ctx.set_stdin(content);
169 }
170 }
171 RedirectKind::HereDoc => {
172 match &redir.target {
173 Expr::Literal(Value::String(content)) => {
174 ctx.set_stdin(content.clone());
175 }
176 expr => {
177 if let Some(value) = eval_simple_expr(expr, ctx) {
178 ctx.set_stdin(value_to_string(&value));
179 }
180 }
181 }
182 }
183 _ => {}
184 }
185 }
186}
187
/// Runs command pipelines using the tools of a shared registry.
///
/// Cloning is cheap: the only field is an `Arc`.
#[derive(Clone)]
pub struct PipelineRunner {
    /// Registry used to look up tool schemas and to build the dispatchers
    /// handed to pipeline stages and scatter/gather workers.
    tools: Arc<ToolRegistry>,
}
193
194impl PipelineRunner {
195 pub fn new(tools: Arc<ToolRegistry>) -> Self {
197 Self { tools }
198 }
199
200 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
210 pub async fn run(
211 &self,
212 commands: &[Command],
213 ctx: &mut ExecContext,
214 dispatcher: &dyn CommandDispatcher,
215 ) -> ExecResult {
216 if commands.is_empty() {
217 return ExecResult::success("");
218 }
219
220 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
222 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
223 }
224
225 self.run_sequential(commands, ctx, dispatcher).await
226 }
227
228 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
233 pub async fn run_sequential(
234 &self,
235 commands: &[Command],
236 ctx: &mut ExecContext,
237 dispatcher: &dyn CommandDispatcher,
238 ) -> ExecResult {
239 if commands.is_empty() {
240 return ExecResult::success("");
241 }
242
243 if commands.len() == 1 {
244 return self.run_single(&commands[0], ctx, None, dispatcher).await;
246 }
247
248 self.run_pipeline(commands, ctx, dispatcher).await
250 }
251
252 async fn run_scatter_gather(
254 &self,
255 commands: &[Command],
256 scatter_idx: usize,
257 gather_idx: usize,
258 ctx: &mut ExecContext,
259 _dispatcher: &dyn CommandDispatcher,
260 ) -> ExecResult {
261 let pre_scatter = &commands[..scatter_idx];
263 let scatter_cmd = &commands[scatter_idx];
264 let parallel = &commands[scatter_idx + 1..gather_idx];
265 let gather_cmd = &commands[gather_idx];
266 let post_gather = &commands[gather_idx + 1..];
267
268 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
271 let gather_schema = self.tools.get("gather").map(|t| t.schema());
272 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
273 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
274
275 let scatter_dispatcher: Arc<dyn CommandDispatcher> =
279 Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
280
281 let runner = ScatterGatherRunner::new(self.tools.clone(), scatter_dispatcher);
282 runner
283 .run(
284 pre_scatter,
285 scatter_opts,
286 parallel,
287 gather_opts,
288 post_gather,
289 ctx,
290 )
291 .await
292 }
293
294 #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
299 async fn run_single(
300 &self,
301 cmd: &Command,
302 ctx: &mut ExecContext,
303 stdin: Option<String>,
304 dispatcher: &dyn CommandDispatcher,
305 ) -> ExecResult {
306 setup_stdin_redirects(cmd, ctx);
308
309 if let Some(input) = stdin {
311 ctx.set_stdin(input);
312 }
313
314 ctx.pipeline_position = PipelinePosition::Only;
316
317 let result = match dispatcher.dispatch(cmd, ctx).await {
319 Ok(result) => result,
320 Err(e) => ExecResult::failure(1, e.to_string()),
321 };
322
323 apply_redirects(result, &cmd.redirects, ctx).await
325 }
326
327 #[tracing::instrument(level = "debug", skip(self, commands, ctx, _dispatcher), fields(stage_count = commands.len()))]
337 async fn run_pipeline(
338 &self,
339 commands: &[Command],
340 ctx: &mut ExecContext,
341 _dispatcher: &dyn CommandDispatcher,
342 ) -> ExecResult {
343 let stage_count = commands.len();
344 let last_idx = stage_count - 1;
345
346 let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
348 let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
349
350 for _ in 0..last_idx {
351 let (writer, reader) = pipe_stream_default();
352 pipe_writers.push(Some(writer));
353 pipe_readers.push(Some(reader));
354 }
355
356 let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
358 let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
359
360 for _ in 0..last_idx {
361 let (tx, rx) = tokio::sync::oneshot::channel();
362 data_senders.push(Some(tx));
363 data_receivers.push(Some(rx));
364 }
365
366 let stage_dispatcher: Arc<dyn CommandDispatcher> =
371 Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
372
373 let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
374
375 for (i, cmd) in commands.iter().enumerate() {
376 let mut stage_ctx = ctx.child_for_pipeline();
377 let cmd = cmd.clone();
378
379 setup_stdin_redirects(&cmd, &mut stage_ctx);
381
382 if i == 0 {
384 if stage_ctx.stdin.is_none() {
387 stage_ctx.stdin = ctx.stdin.take();
388 }
389 if stage_ctx.stdin_data.is_none() {
390 stage_ctx.stdin_data = ctx.stdin_data.take();
391 }
392 } else {
393 stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
395 }
397
398 if i < last_idx {
400 stage_ctx.pipe_stdout = pipe_writers[i].take();
401 }
402
403 stage_ctx.pipeline_position = match i {
405 0 => PipelinePosition::First,
406 n if n == last_idx => PipelinePosition::Last,
407 _ => PipelinePosition::Middle,
408 };
409
410 let data_sender = if i < last_idx { data_senders[i].take() } else { None };
411 let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
412 let task_dispatcher = stage_dispatcher.clone();
413
414 let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> = tokio::spawn(async move {
415 if let Some(mut rx) = data_receiver {
422 if let Ok(data) = rx.try_recv() {
423 stage_ctx.stdin_data = data;
424 }
425 }
427
428 let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
430 Ok(result) => result,
431 Err(e) => ExecResult::failure(1, e.to_string()),
432 };
433
434 result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
436
437 if !result.err.is_empty() {
443 if let Some(ref stderr) = stage_ctx.stderr {
444 stderr.write_str(&result.err);
445 result.err.clear();
446 }
447 }
448
449 if let Some(tx) = data_sender {
455 let _ = tx.send(result.data.clone());
456 }
457
458 if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
461 let text = result.text_out();
462 if !text.is_empty() {
463 let _ = pipe_out.write_all(text.as_bytes()).await;
465 let _ = pipe_out.shutdown().await;
466 }
467 }
469
470 (result, stage_ctx)
471 });
472
473 handles.push(handle);
474 }
475
476 let mut last_result = ExecResult::success("");
481 let mut panics: Vec<String> = Vec::new();
482 for (i, handle) in handles.into_iter().enumerate() {
483 match handle.await {
484 Ok((result, stage_ctx)) => {
485 if i == last_idx {
486 last_result = result;
487 ctx.scope = stage_ctx.scope;
489 ctx.cwd = stage_ctx.cwd;
490 ctx.prev_cwd = stage_ctx.prev_cwd;
491 ctx.aliases = stage_ctx.aliases;
492 }
493 }
494 Err(e) => {
495 panics.push(format!("stage {}: {}", i, e));
496 }
497 }
498 }
499
500 if !panics.is_empty() {
501 last_result = ExecResult::failure(
502 1,
503 format!("pipeline stage(s) panicked: {}", panics.join("; ")),
504 );
505 }
506
507 last_result
508 }
509}
510
511pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str)> {
519 let mut map = HashMap::new();
520 for p in &schema.params {
521 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str()));
522 for alias in &p.aliases {
523 let stripped = alias.trim_start_matches('-');
524 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str()));
525 }
526 }
527 map
528}
529
/// Returns `true` when `param_type` names a boolean parameter, i.e. it is
/// `"bool"` or `"boolean"` in any letter case.
///
/// The comparison is done in place rather than by allocating a lowercased
/// copy; this function is called once per flag and per schema parameter.
pub fn is_bool_type(param_type: &str) -> bool {
    param_type.eq_ignore_ascii_case("bool") || param_type.eq_ignore_ascii_case("boolean")
}
534
535pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
543 let mut tool_args = ToolArgs::new();
544 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
545
546 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
548 let mut past_double_dash = false;
549
550 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
552 for (i, arg) in args.iter().enumerate() {
553 if let Arg::Positional(expr) = arg {
554 positional_indices.push((i, expr));
555 }
556 }
557
558 let mut i = 0;
560 while i < args.len() {
561 let arg = &args[i];
562
563 match arg {
564 Arg::DoubleDash => {
565 past_double_dash = true;
566 }
567 Arg::Positional(expr) => {
568 if !consumed_positionals.contains(&i)
570 && let Some(value) = eval_simple_expr(expr, ctx)
571 {
572 tool_args.positional.push(value);
573 }
574 }
575 Arg::Named { key, value } => {
576 if let Some(val) = eval_simple_expr(value, ctx) {
577 tool_args.named.insert(key.clone(), val);
578 }
579 }
580 Arg::ShortFlag(name) => {
581 if past_double_dash {
582 tool_args.positional.push(Value::String(format!("-{name}")));
583 } else if name.len() == 1 {
584 let flag_name = name.as_str();
587 let lookup = param_lookup.get(flag_name);
588 let is_bool = lookup
589 .map(|(_, typ)| is_bool_type(typ))
590 .unwrap_or(true);
591
592 if is_bool {
593 tool_args.flags.insert(flag_name.to_string());
594 } else {
595 let canonical = lookup.map(|(name, _)| *name).unwrap_or(flag_name);
597 let next_positional = positional_indices
598 .iter()
599 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
600
601 if let Some((pos_idx, expr)) = next_positional {
602 if let Some(value) = eval_simple_expr(expr, ctx) {
603 tool_args.named.insert(canonical.to_string(), value);
604 consumed_positionals.insert(*pos_idx);
605 } else {
606 tool_args.flags.insert(flag_name.to_string());
607 }
608 } else {
609 tool_args.flags.insert(flag_name.to_string());
610 }
611 }
612 } else if let Some(&(canonical, typ)) = param_lookup.get(name.as_str()) {
613 if is_bool_type(typ) {
615 tool_args.flags.insert(canonical.to_string());
616 } else {
617 let next_positional = positional_indices
618 .iter()
619 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
620 if let Some((pos_idx, expr)) = next_positional {
621 if let Some(value) = eval_simple_expr(expr, ctx) {
622 tool_args.named.insert(canonical.to_string(), value);
623 consumed_positionals.insert(*pos_idx);
624 } else {
625 tool_args.flags.insert(name.clone());
626 }
627 } else {
628 tool_args.flags.insert(name.clone());
629 }
630 }
631 } else {
632 for c in name.chars() {
634 tool_args.flags.insert(c.to_string());
635 }
636 }
637 }
638 Arg::LongFlag(name) => {
639 if past_double_dash {
640 tool_args.positional.push(Value::String(format!("--{name}")));
641 } else {
642 let lookup = param_lookup.get(name.as_str());
644 let is_bool = lookup
645 .map(|(_, typ)| is_bool_type(typ))
646 .unwrap_or(true); if is_bool {
649 tool_args.flags.insert(name.clone());
650 } else {
651 let canonical = lookup.map(|(name, _)| *name).unwrap_or(name.as_str());
653 let next_positional = positional_indices
654 .iter()
655 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
656
657 if let Some((pos_idx, expr)) = next_positional {
658 if let Some(value) = eval_simple_expr(expr, ctx) {
659 tool_args.named.insert(canonical.to_string(), value);
660 consumed_positionals.insert(*pos_idx);
661 } else {
662 tool_args.flags.insert(name.clone());
663 }
664 } else {
665 tool_args.flags.insert(name.clone());
666 }
667 }
668 }
669 }
670 }
671 i += 1;
672 }
673
674 if let Some(schema) = schema.filter(|s| s.map_positionals) {
679 let pre_dash_count = if past_double_dash {
681 let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
683 positional_indices.iter()
685 .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
686 .count()
687 } else {
688 tool_args.positional.len()
689 };
690
691 let mut remaining = Vec::new();
692 let mut positional_iter = tool_args.positional.drain(..).enumerate();
693
694 for param in &schema.params {
695 if tool_args.named.contains_key(¶m.name) || tool_args.flags.contains(¶m.name) {
696 continue; }
698 if is_bool_type(¶m.param_type) {
699 continue; }
701 loop {
703 match positional_iter.next() {
704 Some((idx, val)) if idx < pre_dash_count => {
705 tool_args.named.insert(param.name.clone(), val);
706 break;
707 }
708 Some((_, val)) => {
709 remaining.push(val); }
711 None => break,
712 }
713 }
714 }
715
716 remaining.extend(positional_iter.map(|(_, v)| v));
718 tool_args.positional = remaining;
719 }
720
721 tool_args
722}
723
724fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
726 match expr {
727 Expr::Literal(value) => Some(eval_literal(value, ctx)),
728 Expr::VarRef(path) => ctx.scope.resolve_path(path),
729 Expr::Interpolated(parts) => {
730 let mut result = String::new();
731 for part in parts {
732 match part {
733 crate::ast::StringPart::Literal(s) => result.push_str(s),
734 crate::ast::StringPart::Var(path) => {
735 if let Some(value) = ctx.scope.resolve_path(path) {
736 result.push_str(&value_to_string(&value));
737 }
738 }
739 crate::ast::StringPart::VarWithDefault { name, default } => {
740 match ctx.scope.get(name) {
741 Some(value) => {
742 let s = value_to_string(value);
743 if s.is_empty() {
744 result.push_str(&eval_string_parts_sync(default, ctx));
745 } else {
746 result.push_str(&s);
747 }
748 }
749 None => result.push_str(&eval_string_parts_sync(default, ctx)),
750 }
751 }
752 crate::ast::StringPart::VarLength(name) => {
753 let len = match ctx.scope.get(name) {
754 Some(value) => value_to_string(value).len(),
755 None => 0,
756 };
757 result.push_str(&len.to_string());
758 }
759 crate::ast::StringPart::Positional(n) => {
760 if let Some(s) = ctx.scope.get_positional(*n) {
761 result.push_str(s);
762 }
763 }
764 crate::ast::StringPart::AllArgs => {
765 result.push_str(&ctx.scope.all_args().join(" "));
766 }
767 crate::ast::StringPart::ArgCount => {
768 result.push_str(&ctx.scope.arg_count().to_string());
769 }
770 crate::ast::StringPart::Arithmetic(expr) => {
771 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
773 result.push_str(&value.to_string());
774 }
775 }
776 crate::ast::StringPart::CommandSubst(_) => {
777 }
779 crate::ast::StringPart::LastExitCode => {
780 result.push_str(&ctx.scope.last_result().code.to_string());
781 }
782 crate::ast::StringPart::CurrentPid => {
783 result.push_str(&ctx.scope.pid().to_string());
784 }
785 }
786 }
787 Some(Value::String(result))
788 }
789 Expr::GlobPattern(s) => Some(Value::String(s.clone())),
790 _ => None, }
792}
793
794fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
796 value.clone()
797}
798
799fn value_to_string(value: &Value) -> String {
801 match value {
802 Value::Null => "".to_string(),
803 Value::Bool(b) => b.to_string(),
804 Value::Int(i) => i.to_string(),
805 Value::Float(f) => f.to_string(),
806 Value::String(s) => s.clone(),
807 Value::Json(json) => json.to_string(),
808 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
809 }
810}
811
812fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
815 let mut result = String::new();
816 for part in parts {
817 match part {
818 crate::ast::StringPart::Literal(s) => result.push_str(s),
819 crate::ast::StringPart::Var(path) => {
820 if let Some(value) = ctx.scope.resolve_path(path) {
821 result.push_str(&value_to_string(&value));
822 }
823 }
824 crate::ast::StringPart::VarWithDefault { name, default } => {
825 match ctx.scope.get(name) {
826 Some(value) => {
827 let s = value_to_string(value);
828 if s.is_empty() {
829 result.push_str(&eval_string_parts_sync(default, ctx));
830 } else {
831 result.push_str(&s);
832 }
833 }
834 None => result.push_str(&eval_string_parts_sync(default, ctx)),
835 }
836 }
837 crate::ast::StringPart::VarLength(name) => {
838 let len = match ctx.scope.get(name) {
839 Some(value) => value_to_string(value).len(),
840 None => 0,
841 };
842 result.push_str(&len.to_string());
843 }
844 crate::ast::StringPart::Positional(n) => {
845 if let Some(s) = ctx.scope.get_positional(*n) {
846 result.push_str(s);
847 }
848 }
849 crate::ast::StringPart::AllArgs => {
850 result.push_str(&ctx.scope.all_args().join(" "));
851 }
852 crate::ast::StringPart::ArgCount => {
853 result.push_str(&ctx.scope.arg_count().to_string());
854 }
855 crate::ast::StringPart::Arithmetic(expr) => {
856 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
857 result.push_str(&value.to_string());
858 }
859 }
860 crate::ast::StringPart::CommandSubst(_) => {
861 }
863 crate::ast::StringPart::LastExitCode => {
864 result.push_str(&ctx.scope.last_result().code.to_string());
865 }
866 crate::ast::StringPart::CurrentPid => {
867 result.push_str(&ctx.scope.pid().to_string());
868 }
869 }
870 }
871 result
872}
873
874fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
879 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
880 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
881
882 if gather_idx > scatter_idx {
884 Some((scatter_idx, gather_idx))
885 } else {
886 None
887 }
888}
889
890#[cfg(test)]
891mod tests {
892 use super::*;
893 use crate::dispatch::BackendDispatcher;
894 use crate::tools::register_builtins;
895 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
896 use std::path::Path;
897
898 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
899 let mut tools = ToolRegistry::new();
900 register_builtins(&mut tools);
901 let tools = Arc::new(tools);
902 let runner = PipelineRunner::new(tools.clone());
903 let dispatcher = BackendDispatcher::new(tools.clone());
904
905 let mut vfs = VfsRouter::new();
906 let mem = MemoryFs::new();
907 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
908 vfs.mount("/", mem);
909 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
910
911 (runner, ctx, dispatcher)
912 }
913
914 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
915 Command {
916 name: name.to_string(),
917 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
918 redirects: vec![],
919 }
920 }
921
922 #[tokio::test]
923 async fn test_single_command() {
924 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
925 let cmd = make_cmd("echo", vec!["hello"]);
926
927 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
928 assert!(result.ok());
929 assert_eq!(result.out.trim(), "hello");
930 }
931
932 #[tokio::test]
933 async fn test_pipeline_echo_grep() {
934 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
935
936 let echo_cmd = Command {
938 name: "echo".to_string(),
939 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
940 redirects: vec![],
941 };
942 let grep_cmd = Command {
943 name: "grep".to_string(),
944 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
945 redirects: vec![],
946 };
947
948 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
949 assert!(result.ok());
950 assert_eq!(result.out.trim(), "world");
951 }
952
953 #[tokio::test]
954 async fn test_pipeline_cat_grep() {
955 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
956
957 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
959 let grep_cmd = Command {
960 name: "grep".to_string(),
961 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
962 redirects: vec![],
963 };
964
965 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
966 assert!(result.ok());
967 assert!(result.out.contains("hello"));
968 }
969
970 #[tokio::test]
971 async fn test_command_not_found() {
972 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
973 let cmd = make_cmd("nonexistent", vec![]);
974
975 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
976 assert!(!result.ok());
977 assert_eq!(result.code, 127);
978 assert!(result.err.contains("not found"));
979 }
980
981 #[tokio::test]
982 async fn test_pipeline_continues_on_failure() {
983 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
986
987 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
990 let grep_cmd = Command {
991 name: "grep".to_string(),
992 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
993 redirects: vec![],
994 };
995
996 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
997 assert!(!result.ok());
999 }
1000
1001 #[tokio::test]
1002 async fn test_pipeline_last_command_exit_code() {
1003 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1005
1006 let echo_cmd = make_cmd("echo", vec!["hello"]);
1007 let cat_cmd = make_cmd("cat", vec![]);
1008
1009 let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1010 assert!(result.ok());
1011 assert!(result.out.contains("hello"));
1012 }
1013
1014 #[tokio::test]
1015 async fn test_empty_pipeline() {
1016 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1017 let result = runner.run(&[], &mut ctx, &dispatcher).await;
1018 assert!(result.ok());
1019 }
1020
1021 #[test]
1024 fn test_find_scatter_gather_both_present() {
1025 let commands = vec![
1026 make_cmd("echo", vec!["a"]),
1027 make_cmd("scatter", vec![]),
1028 make_cmd("process", vec![]),
1029 make_cmd("gather", vec![]),
1030 ];
1031 let result = find_scatter_gather(&commands);
1032 assert_eq!(result, Some((1, 3)));
1033 }
1034
1035 #[test]
1036 fn test_find_scatter_gather_no_scatter() {
1037 let commands = vec![
1038 make_cmd("echo", vec!["a"]),
1039 make_cmd("gather", vec![]),
1040 ];
1041 let result = find_scatter_gather(&commands);
1042 assert!(result.is_none());
1043 }
1044
1045 #[test]
1046 fn test_find_scatter_gather_no_gather() {
1047 let commands = vec![
1048 make_cmd("echo", vec!["a"]),
1049 make_cmd("scatter", vec![]),
1050 ];
1051 let result = find_scatter_gather(&commands);
1052 assert!(result.is_none());
1053 }
1054
1055 #[test]
1056 fn test_find_scatter_gather_wrong_order() {
1057 let commands = vec![
1058 make_cmd("gather", vec![]),
1059 make_cmd("scatter", vec![]),
1060 ];
1061 let result = find_scatter_gather(&commands);
1062 assert!(result.is_none());
1063 }
1064
1065 #[tokio::test]
1066 async fn test_scatter_gather_simple() {
1067 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1068
1069 let split_cmd = Command {
1071 name: "split".to_string(),
1072 args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1073 redirects: vec![],
1074 };
1075 let scatter_cmd = make_cmd("scatter", vec![]);
1076 let process_cmd = Command {
1077 name: "echo".to_string(),
1078 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1079 redirects: vec![],
1080 };
1081 let gather_cmd = make_cmd("gather", vec![]);
1082
1083 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1084 assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1085 assert!(result.out.contains("a"));
1087 assert!(result.out.contains("b"));
1088 assert!(result.out.contains("c"));
1089 }
1090
1091 #[tokio::test]
1092 async fn test_scatter_gather_empty_input() {
1093 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1094
1095 let echo_cmd = Command {
1097 name: "echo".to_string(),
1098 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1099 redirects: vec![],
1100 };
1101 let scatter_cmd = make_cmd("scatter", vec![]);
1102 let process_cmd = Command {
1103 name: "echo".to_string(),
1104 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1105 redirects: vec![],
1106 };
1107 let gather_cmd = make_cmd("gather", vec![]);
1108
1109 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1110 assert!(result.ok());
1111 assert!(result.out.trim().is_empty());
1112 }
1113
1114 #[tokio::test]
1115 async fn test_scatter_gather_with_structured_stdin() {
1116 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1117
1118 let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1120 ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1121
1122 let scatter_cmd = make_cmd("scatter", vec![]);
1123 let process_cmd = Command {
1124 name: "echo".to_string(),
1125 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1126 redirects: vec![],
1127 };
1128 let gather_cmd = make_cmd("gather", vec![]);
1129
1130 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1131 assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1132 assert!(result.out.contains("x"));
1133 assert!(result.out.contains("y"));
1134 assert!(result.out.contains("z"));
1135 }
1136
1137 #[tokio::test]
1138 async fn test_scatter_gather_json_input() {
1139 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1140
1141 let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1143 ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1144
1145 let scatter_cmd = make_cmd("scatter", vec![]);
1146 let process_cmd = Command {
1147 name: "echo".to_string(),
1148 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1149 redirects: vec![],
1150 };
1151 let gather_cmd = make_cmd("gather", vec![]);
1152
1153 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1154 assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1155 assert!(result.out.contains("one"));
1156 assert!(result.out.contains("two"));
1157 assert!(result.out.contains("three"));
1158 }
1159
1160 #[tokio::test]
1161 async fn test_scatter_gather_with_post_gather() {
1162 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1163
1164 let split_cmd = Command {
1166 name: "split".to_string(),
1167 args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1168 redirects: vec![],
1169 };
1170 let scatter_cmd = make_cmd("scatter", vec![]);
1171 let process_cmd = Command {
1172 name: "echo".to_string(),
1173 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1174 redirects: vec![],
1175 };
1176 let gather_cmd = make_cmd("gather", vec![]);
1177 let grep_cmd = Command {
1178 name: "grep".to_string(),
1179 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1180 redirects: vec![],
1181 };
1182
1183 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1184 assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1185 assert!(result.out.contains("a"));
1186 assert!(!result.out.contains("b"));
1187 }
1188
1189 #[tokio::test]
1190 async fn test_scatter_custom_var_name() {
1191 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1192
1193 let data = Value::Json(serde_json::json!(["test1", "test2"]));
1195 ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1196
1197 let scatter_cmd = Command {
1199 name: "scatter".to_string(),
1200 args: vec![Arg::Named {
1201 key: "as".to_string(),
1202 value: Expr::Literal(Value::String("URL".to_string())),
1203 }],
1204 redirects: vec![],
1205 };
1206 let process_cmd = Command {
1207 name: "echo".to_string(),
1208 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1209 redirects: vec![],
1210 };
1211 let gather_cmd = make_cmd("gather", vec![]);
1212
1213 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1214 assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1215 assert!(result.out.contains("test1"));
1216 assert!(result.out.contains("test2"));
1217 }
1218
1219 #[tokio::test]
1222 async fn test_pipeline_routes_through_backend() {
1223 use crate::backend::testing::MockBackend;
1224 use std::sync::atomic::Ordering;
1225
1226 let (backend, call_count) = MockBackend::new();
1228 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1229
1230 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1232
1233 let tools = std::sync::Arc::new(ToolRegistry::new());
1235 let runner = PipelineRunner::new(tools.clone());
1236 let dispatcher = BackendDispatcher::new(tools);
1237
1238 let cmd = make_cmd("test-tool", vec!["arg1"]);
1240 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1241
1242 assert!(result.ok(), "Mock backend should return success");
1243 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1244 assert!(result.out.contains("mock executed"), "Output should be from mock backend");
1245 }
1246
1247 #[tokio::test]
1248 async fn test_multi_command_pipeline_routes_through_backend() {
1249 use crate::backend::testing::MockBackend;
1250 use std::sync::atomic::Ordering;
1251
1252 let (backend, call_count) = MockBackend::new();
1253 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1254 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1255
1256 let tools = std::sync::Arc::new(ToolRegistry::new());
1257 let runner = PipelineRunner::new(tools.clone());
1258 let dispatcher = BackendDispatcher::new(tools);
1259
1260 let cmd1 = make_cmd("tool1", vec![]);
1262 let cmd2 = make_cmd("tool2", vec![]);
1263 let cmd3 = make_cmd("tool3", vec![]);
1264
1265 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1266
1267 assert!(result.ok());
1268 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1269 }
1270
1271 use crate::tools::{ParamSchema, ToolSchema};
1274
1275 fn make_test_schema() -> ToolSchema {
1276 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1277 .param(ParamSchema::required("query", "string", "Search query"))
1278 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1279 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1280 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1281 .with_positional_mapping()
1282 }
1283
1284 fn make_minimal_ctx() -> ExecContext {
1285 let mut vfs = VfsRouter::new();
1286 vfs.mount("/", MemoryFs::new());
1287 ExecContext::new(Arc::new(vfs))
1288 }
1289
1290 #[test]
1291 fn test_schema_aware_string_arg() {
1292 let args = vec![
1294 Arg::LongFlag("query".to_string()),
1295 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1296 ];
1297 let schema = make_test_schema();
1298 let ctx = make_minimal_ctx();
1299
1300 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1301
1302 assert!(tool_args.flags.is_empty(), "No flags should be set");
1303 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1304 assert_eq!(
1305 tool_args.named.get("query"),
1306 Some(&Value::String("test".to_string())),
1307 "--query should consume 'test' as its value"
1308 );
1309 }
1310
1311 #[test]
1312 fn test_schema_aware_bool_flag() {
1313 let args = vec![
1315 Arg::LongFlag("verbose".to_string()),
1316 ];
1317 let schema = make_test_schema();
1318 let ctx = make_minimal_ctx();
1319
1320 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1321
1322 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1323 assert!(tool_args.named.is_empty(), "No named args");
1324 assert!(tool_args.positional.is_empty(), "No positionals");
1325 }
1326
1327 #[test]
1328 fn test_schema_aware_mixed() {
1329 let args = vec![
1332 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1333 Arg::LongFlag("output".to_string()),
1334 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1335 Arg::LongFlag("verbose".to_string()),
1336 ];
1337 let schema = make_test_schema();
1338 let ctx = make_minimal_ctx();
1339
1340 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1341
1342 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1343 assert_eq!(
1344 tool_args.named.get("query"),
1345 Some(&Value::String("file.txt".to_string()))
1346 );
1347 assert_eq!(
1348 tool_args.named.get("output"),
1349 Some(&Value::String("out.txt".to_string()))
1350 );
1351 assert!(tool_args.flags.contains("verbose"));
1352 }
1353
1354 #[test]
1355 fn test_schema_aware_multiple_string_args() {
1356 let args = vec![
1358 Arg::LongFlag("query".to_string()),
1359 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1360 Arg::LongFlag("output".to_string()),
1361 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1362 Arg::LongFlag("verbose".to_string()),
1363 Arg::LongFlag("limit".to_string()),
1364 Arg::Positional(Expr::Literal(Value::Int(5))),
1365 ];
1366 let schema = make_test_schema();
1367 let ctx = make_minimal_ctx();
1368
1369 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1370
1371 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1372 assert_eq!(
1373 tool_args.named.get("query"),
1374 Some(&Value::String("test".to_string()))
1375 );
1376 assert_eq!(
1377 tool_args.named.get("output"),
1378 Some(&Value::String("result.json".to_string()))
1379 );
1380 assert_eq!(
1381 tool_args.named.get("limit"),
1382 Some(&Value::Int(5))
1383 );
1384 assert!(tool_args.flags.contains("verbose"));
1385 }
1386
1387 #[test]
1388 fn test_schema_aware_double_dash() {
1389 let args = vec![
1392 Arg::LongFlag("output".to_string()),
1393 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1394 Arg::DoubleDash,
1395 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1396 ];
1397 let schema = make_test_schema();
1398 let ctx = make_minimal_ctx();
1399
1400 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1401
1402 assert_eq!(
1403 tool_args.named.get("output"),
1404 Some(&Value::String("out.txt".to_string()))
1405 );
1406 assert_eq!(
1408 tool_args.positional,
1409 vec![Value::String("--this-is-data".to_string())]
1410 );
1411 }
1412
1413 #[test]
1414 fn test_no_schema_fallback() {
1415 let args = vec![
1417 Arg::LongFlag("query".to_string()),
1418 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1419 ];
1420 let ctx = make_minimal_ctx();
1421
1422 let tool_args = build_tool_args(&args, &ctx, None);
1423
1424 assert!(tool_args.flags.contains("query"), "--query should be a flag");
1426 assert_eq!(
1427 tool_args.positional,
1428 vec![Value::String("test".to_string())],
1429 "'test' should be a positional"
1430 );
1431 }
1432
1433 #[test]
1434 fn test_unknown_flag_in_schema() {
1435 let args = vec![
1437 Arg::LongFlag("unknown".to_string()),
1438 Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1439 ];
1440 let schema = make_test_schema();
1441 let ctx = make_minimal_ctx();
1442
1443 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1444
1445 assert!(tool_args.flags.contains("unknown"));
1446 assert!(tool_args.positional.is_empty(), "value consumed as query param");
1447 assert_eq!(
1448 tool_args.named.get("query"),
1449 Some(&Value::String("value".to_string()))
1450 );
1451 }
1452
1453 #[test]
1454 fn test_named_args_unchanged() {
1455 let args = vec![
1457 Arg::Named {
1458 key: "query".to_string(),
1459 value: Expr::Literal(Value::String("test".to_string())),
1460 },
1461 Arg::LongFlag("verbose".to_string()),
1462 ];
1463 let schema = make_test_schema();
1464 let ctx = make_minimal_ctx();
1465
1466 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1467
1468 assert_eq!(
1469 tool_args.named.get("query"),
1470 Some(&Value::String("test".to_string()))
1471 );
1472 assert!(tool_args.flags.contains("verbose"));
1473 }
1474
1475 #[test]
1476 fn test_short_flags_unchanged() {
1477 let args = vec![
1479 Arg::ShortFlag("la".to_string()),
1480 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1481 ];
1482 let schema = make_test_schema();
1483 let ctx = make_minimal_ctx();
1484
1485 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1486
1487 assert!(tool_args.flags.contains("l"));
1488 assert!(tool_args.flags.contains("a"));
1489 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1490 assert_eq!(
1491 tool_args.named.get("query"),
1492 Some(&Value::String("file.txt".to_string()))
1493 );
1494 }
1495
1496 #[test]
1497 fn test_flag_at_end_no_value() {
1498 let args = vec![
1501 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1502 Arg::LongFlag("output".to_string()),
1503 ];
1504 let schema = make_test_schema();
1505 let ctx = make_minimal_ctx();
1506
1507 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1508
1509 assert!(tool_args.flags.contains("output"));
1511 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1512 assert_eq!(
1513 tool_args.named.get("query"),
1514 Some(&Value::String("file.txt".to_string()))
1515 );
1516 }
1517
1518 #[test]
1519 fn test_positional_skips_bool_params() {
1520 let schema = ToolSchema::new("test", "")
1524 .param(ParamSchema::required("query", "string", ""))
1525 .param(ParamSchema::optional(
1526 "verbose",
1527 "bool",
1528 Value::Bool(false),
1529 "",
1530 ))
1531 .param(ParamSchema::optional(
1532 "output",
1533 "string",
1534 Value::Null,
1535 "",
1536 ))
1537 .with_positional_mapping();
1538 let args = vec![
1539 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1540 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1541 ];
1542 let ctx = make_minimal_ctx();
1543
1544 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1545
1546 assert_eq!(
1547 tool_args.named.get("query"),
1548 Some(&Value::String("val1".to_string()))
1549 );
1550 assert_eq!(
1551 tool_args.named.get("output"),
1552 Some(&Value::String("val2".to_string()))
1553 );
1554 assert!(!tool_args.flags.contains("verbose"));
1555 assert!(tool_args.positional.is_empty());
1556 }
1557
1558 #[test]
1559 fn test_positionals_fill_available_slots() {
1560 let args = vec![
1563 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1564 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1565 Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1566 ];
1567 let schema = make_test_schema(); let ctx = make_minimal_ctx();
1569
1570 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1571
1572 assert_eq!(
1575 tool_args.named.get("query"),
1576 Some(&Value::String("val1".to_string()))
1577 );
1578 assert_eq!(
1579 tool_args.named.get("limit"),
1580 Some(&Value::String("val2".to_string()))
1581 );
1582 assert_eq!(
1583 tool_args.named.get("output"),
1584 Some(&Value::String("val3".to_string()))
1585 );
1586 assert!(tool_args.positional.is_empty());
1587 }
1588
1589 #[test]
1590 fn test_truly_excess_positionals() {
1591 let schema = ToolSchema::new("test", "")
1593 .param(ParamSchema::required("name", "string", ""))
1594 .with_positional_mapping();
1595 let args = vec![
1596 Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1597 Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1598 Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1599 ];
1600 let ctx = make_minimal_ctx();
1601
1602 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1603
1604 assert_eq!(
1605 tool_args.named.get("name"),
1606 Some(&Value::String("first".to_string()))
1607 );
1608 assert_eq!(
1609 tool_args.positional,
1610 vec![
1611 Value::String("second".to_string()),
1612 Value::String("third".to_string()),
1613 ]
1614 );
1615 }
1616
1617 #[test]
1618 fn test_double_dash_positional_not_mapped() {
1619 let args = vec![
1621 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1622 Arg::DoubleDash,
1623 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1624 ];
1625 let schema = make_test_schema();
1626 let ctx = make_minimal_ctx();
1627
1628 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1629
1630 assert_eq!(
1631 tool_args.named.get("query"),
1632 Some(&Value::String("val1".to_string()))
1633 );
1634 assert_eq!(
1636 tool_args.positional,
1637 vec![Value::String("val2".to_string())]
1638 );
1639 }
1640
1641 #[test]
1642 fn test_all_params_filled_by_flags() {
1643 let args = vec![
1645 Arg::LongFlag("query".to_string()),
1646 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1647 Arg::LongFlag("output".to_string()),
1648 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1649 Arg::LongFlag("verbose".to_string()),
1650 ];
1651 let schema = make_test_schema();
1652 let ctx = make_minimal_ctx();
1653
1654 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1655
1656 assert_eq!(
1657 tool_args.named.get("query"),
1658 Some(&Value::String("search".to_string()))
1659 );
1660 assert_eq!(
1661 tool_args.named.get("output"),
1662 Some(&Value::String("out.txt".to_string()))
1663 );
1664 assert!(tool_args.flags.contains("verbose"));
1665 assert!(tool_args.positional.is_empty());
1666 }
1667
1668 #[test]
1669 fn test_mixed_flags_and_positional_fill() {
1670 let args = vec![
1672 Arg::LongFlag("output".to_string()),
1673 Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
1674 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1675 ];
1676 let schema = make_test_schema();
1677 let ctx = make_minimal_ctx();
1678
1679 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1680
1681 assert_eq!(
1682 tool_args.named.get("output"),
1683 Some(&Value::String("foo".to_string()))
1684 );
1685 assert_eq!(
1686 tool_args.named.get("query"),
1687 Some(&Value::String("val1".to_string()))
1688 );
1689 assert!(tool_args.positional.is_empty());
1690 }
1691
1692 #[test]
1693 fn test_alias_flag_prevents_mapping_overwrite() {
1694 let schema = ToolSchema::new("test", "")
1696 .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
1697 .param(ParamSchema::required("output", "string", ""))
1698 .with_positional_mapping();
1699 let args = vec![
1700 Arg::ShortFlag("q".to_string()),
1701 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1702 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1703 ];
1704 let ctx = make_minimal_ctx();
1705
1706 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1707
1708 assert_eq!(
1709 tool_args.named.get("query"),
1710 Some(&Value::String("search".to_string()))
1711 );
1712 assert_eq!(
1713 tool_args.named.get("output"),
1714 Some(&Value::String("out.txt".to_string()))
1715 );
1716 assert!(tool_args.positional.is_empty());
1717 }
1718
1719 #[test]
1720 fn test_builtin_schema_no_positional_mapping() {
1721 let schema = ToolSchema::new("echo", "")
1723 .param(ParamSchema::optional("args", "any", Value::Null, ""))
1724 .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
1725 let args = vec![
1727 Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
1728 Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
1729 ];
1730 let ctx = make_minimal_ctx();
1731
1732 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1733
1734 assert_eq!(
1736 tool_args.positional,
1737 vec![
1738 Value::String("hello".to_string()),
1739 Value::String("world".to_string()),
1740 ]
1741 );
1742 assert!(tool_args.named.get("args").is_none());
1743 }
1744
1745 #[test]
1746 fn test_short_flag_with_alias_consumes_value() {
1747 let schema = ToolSchema::new("head", "Output first part of files")
1750 .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1751 .with_aliases(["-n"]));
1752 let args = vec![
1753 Arg::ShortFlag("n".to_string()),
1754 Arg::Positional(Expr::Literal(Value::Int(5))),
1755 Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1756 ];
1757 let ctx = make_minimal_ctx();
1758
1759 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1760
1761 assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1762 assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1763 assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1764 }
1765
1766 #[tokio::test]
1769 async fn test_merge_stderr_redirect() {
1770 let result = ExecResult::from_output(0, "stdout content", "stderr content");
1772
1773 let redirects = vec![Redirect {
1774 kind: RedirectKind::MergeStderr,
1775 target: Expr::Literal(Value::Null),
1776 }];
1777
1778 let ctx = make_minimal_ctx();
1779 let result = apply_redirects(result, &redirects, &ctx).await;
1780
1781 assert_eq!(result.out, "stdout contentstderr content");
1782 assert!(result.err.is_empty());
1783 }
1784
1785 #[tokio::test]
1786 async fn test_merge_stderr_with_empty_stderr() {
1787 let result = ExecResult::from_output(0, "stdout only", "");
1789
1790 let redirects = vec![Redirect {
1791 kind: RedirectKind::MergeStderr,
1792 target: Expr::Literal(Value::Null),
1793 }];
1794
1795 let ctx = make_minimal_ctx();
1796 let result = apply_redirects(result, &redirects, &ctx).await;
1797
1798 assert_eq!(result.out, "stdout only");
1799 assert!(result.err.is_empty());
1800 }
1801
1802 #[tokio::test]
1803 async fn test_merge_stderr_order_matters() {
1804 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1809
1810 let redirects = vec![Redirect {
1812 kind: RedirectKind::MergeStderr,
1813 target: Expr::Literal(Value::Null),
1814 }];
1815
1816 let ctx = make_minimal_ctx();
1817 let result = apply_redirects(result, &redirects, &ctx).await;
1818
1819 assert_eq!(result.out, "stdout\nstderr\n");
1820 assert!(result.err.is_empty());
1821 }
1822
1823 #[tokio::test]
1824 async fn test_redirect_with_command_execution() {
1825 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1826
1827 let cmd = Command {
1829 name: "echo".to_string(),
1830 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1831 redirects: vec![Redirect {
1832 kind: RedirectKind::MergeStderr,
1833 target: Expr::Literal(Value::Null),
1834 }],
1835 };
1836
1837 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1838 assert!(result.ok());
1839 assert!(result.out.contains("hello"));
1841 }
1842
1843 #[tokio::test]
1844 async fn test_merge_stderr_in_pipeline() {
1845 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1846
1847 let echo_cmd = Command {
1850 name: "echo".to_string(),
1851 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1852 redirects: vec![Redirect {
1853 kind: RedirectKind::MergeStderr,
1854 target: Expr::Literal(Value::Null),
1855 }],
1856 };
1857 let grep_cmd = Command {
1858 name: "grep".to_string(),
1859 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1860 redirects: vec![],
1861 };
1862
1863 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1864 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1865 assert!(result.out.contains("output"));
1866 }
1867}