1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24async fn apply_redirects(
30 mut result: ExecResult,
31 redirects: &[Redirect],
32 ctx: &ExecContext,
33) -> ExecResult {
34 for redir in redirects {
39 match redir.kind {
40 RedirectKind::MergeStderr => {
41 result.materialize();
44 if !result.err.is_empty() {
45 let err = std::mem::take(&mut result.err);
46 result.push_out(&err);
47 }
48 }
49 RedirectKind::MergeStdout => {
50 result.materialize();
52 if !result.text_out().is_empty() {
53 let out = result.text_out().into_owned();
54 result.err.push_str(&out);
55 result.clear_out();
56 }
57 }
58 RedirectKind::StdoutOverwrite => {
59 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
60 if let Some(output) = result.take_output_for_stream() {
62 let mut buf = Vec::new();
63 if let Err(e) = output.write_canonical(&mut buf, None) {
64 return ExecResult::failure(1, format!("redirect: {e}"));
65 }
66 if let Err(e) = redirect_write(ctx, &path, &buf).await {
67 return ExecResult::failure(1, format!("redirect: {e}"));
68 }
69 } else {
70 if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
71 return ExecResult::failure(1, format!("redirect: {e}"));
72 }
73 }
74 result.clear_out();
75 result.set_output(None);
76 }
77 }
78 RedirectKind::StdoutAppend => {
79 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
80 if let Some(output) = result.take_output_for_stream() {
82 let mut buf = Vec::new();
83 if let Err(e) = output.write_canonical(&mut buf, None) {
84 return ExecResult::failure(1, format!("redirect: {e}"));
85 }
86 if let Err(e) = redirect_append(ctx, &path, &buf).await {
87 return ExecResult::failure(1, format!("redirect: {e}"));
88 }
89 } else {
90 if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
91 return ExecResult::failure(1, format!("redirect: {e}"));
92 }
93 }
94 result.clear_out();
95 result.set_output(None);
96 }
97 }
98 RedirectKind::Stderr => {
99 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
100 if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
101 return ExecResult::failure(1, format!("redirect: {e}"));
102 }
103 result.err.clear();
104 }
105 }
106 RedirectKind::Both => {
107 if let Some(path) = eval_redirect_target(&redir.target, ctx) {
108 let combined = format!("{}{}", result.text_out(), result.err);
109 if let Err(e) = redirect_write(ctx, &path, combined.as_bytes()).await {
110 return ExecResult::failure(1, format!("redirect: {e}"));
111 }
112 result.clear_out();
113 result.set_output(None);
114 result.err.clear();
115 }
116 }
117 RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
119 }
120 }
121 result.materialize();
126 result
127}
128
129fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
131 eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
132}
133
134async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
136 use crate::backend::WriteMode;
137 let p = std::path::Path::new(path);
138 ctx.backend.write(p, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
139}
140
141async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
143 let p = std::path::Path::new(path);
144 ctx.backend.append(p, data).await.map_err(|e| e.to_string())
145}
146
147fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
150 for redir in &cmd.redirects {
151 match &redir.kind {
152 RedirectKind::Stdin => {
153 if let Some(path) = eval_redirect_target(&redir.target, ctx)
154 && let Ok(content) = std::fs::read_to_string(&path) {
155 ctx.set_stdin(content);
156 }
157 }
158 RedirectKind::HereDoc => {
159 match &redir.target {
160 Expr::Literal(Value::String(content)) => {
161 ctx.set_stdin(content.clone());
162 }
163 expr => {
164 if let Some(value) = eval_simple_expr(expr, ctx) {
165 ctx.set_stdin(value_to_string(&value));
166 }
167 }
168 }
169 }
170 RedirectKind::HereString => {
171 if let Some(value) = eval_simple_expr(&redir.target, ctx) {
174 let mut s = value_to_string(&value);
175 s.push('\n');
176 ctx.set_stdin(s);
177 }
178 }
179 _ => {}
180 }
181 }
182}
183
184#[derive(Clone)]
186pub struct PipelineRunner {
187 tools: Arc<ToolRegistry>,
188}
189
190impl PipelineRunner {
191 pub fn new(tools: Arc<ToolRegistry>) -> Self {
193 Self { tools }
194 }
195
196 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
206 pub async fn run(
207 &self,
208 commands: &[Command],
209 ctx: &mut ExecContext,
210 dispatcher: &dyn CommandDispatcher,
211 ) -> ExecResult {
212 if commands.is_empty() {
213 return ExecResult::success("");
214 }
215
216 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
218 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
219 }
220
221 self.run_sequential(commands, ctx, dispatcher).await
222 }
223
224 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
229 pub async fn run_sequential(
230 &self,
231 commands: &[Command],
232 ctx: &mut ExecContext,
233 dispatcher: &dyn CommandDispatcher,
234 ) -> ExecResult {
235 if commands.is_empty() {
236 return ExecResult::success("");
237 }
238
239 if commands.len() == 1 {
240 return self.run_single(&commands[0], ctx, None, dispatcher).await;
242 }
243
244 self.run_pipeline(commands, ctx, dispatcher).await
246 }
247
248 async fn run_scatter_gather(
250 &self,
251 commands: &[Command],
252 scatter_idx: usize,
253 gather_idx: usize,
254 ctx: &mut ExecContext,
255 dispatcher: &dyn CommandDispatcher,
256 ) -> ExecResult {
257 let pre_scatter = &commands[..scatter_idx];
259 let scatter_cmd = &commands[scatter_idx];
260 let parallel = &commands[scatter_idx + 1..gather_idx];
261 let gather_cmd = &commands[gather_idx];
262 let post_gather = &commands[gather_idx + 1..];
263
264 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
267 let gather_schema = self.tools.get("gather").map(|t| t.schema());
268 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
269 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
270
271 let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
276
277 let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
278 runner
279 .run(
280 pre_scatter,
281 scatter_opts,
282 parallel,
283 gather_opts,
284 post_gather,
285 ctx,
286 )
287 .await
288 }
289
290 #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
295 async fn run_single(
296 &self,
297 cmd: &Command,
298 ctx: &mut ExecContext,
299 stdin: Option<String>,
300 dispatcher: &dyn CommandDispatcher,
301 ) -> ExecResult {
302 setup_stdin_redirects(cmd, ctx);
304
305 if let Some(input) = stdin {
307 ctx.set_stdin(input);
308 }
309
310 ctx.pipeline_position = PipelinePosition::Only;
312
313 let result = match dispatcher.dispatch(cmd, ctx).await {
315 Ok(result) => result,
316 Err(e) => ExecResult::failure(1, e.to_string()),
317 };
318
319 apply_redirects(result, &cmd.redirects, ctx).await
321 }
322
323 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
333 async fn run_pipeline(
334 &self,
335 commands: &[Command],
336 ctx: &mut ExecContext,
337 dispatcher: &dyn CommandDispatcher,
338 ) -> ExecResult {
339 let stage_count = commands.len();
340 let last_idx = stage_count - 1;
341
342 let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
344 let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
345
346 for _ in 0..last_idx {
347 let (writer, reader) = pipe_stream_default();
348 pipe_writers.push(Some(writer));
349 pipe_readers.push(Some(reader));
350 }
351
352 let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
354 let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
355
356 for _ in 0..last_idx {
357 let (tx, rx) = tokio::sync::oneshot::channel();
358 data_senders.push(Some(tx));
359 data_receivers.push(Some(rx));
360 }
361
362 let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
363
364 for (i, cmd) in commands.iter().enumerate() {
365 let mut stage_ctx = ctx.child_for_pipeline();
366 let cmd = cmd.clone();
367
368 let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
373
374 setup_stdin_redirects(&cmd, &mut stage_ctx);
376
377 if i == 0 {
379 if stage_ctx.stdin.is_none() {
382 stage_ctx.stdin = ctx.stdin.take();
383 }
384 if stage_ctx.stdin_data.is_none() {
385 stage_ctx.stdin_data = ctx.stdin_data.take();
386 }
387 } else {
388 stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
390 }
392
393 if i < last_idx {
395 stage_ctx.pipe_stdout = pipe_writers[i].take();
396 }
397
398 stage_ctx.pipeline_position = match i {
400 0 => PipelinePosition::First,
401 n if n == last_idx => PipelinePosition::Last,
402 _ => PipelinePosition::Middle,
403 };
404
405 let data_sender = if i < last_idx { data_senders[i].take() } else { None };
406 let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
407
408 let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> = tokio::spawn(async move {
409 if let Some(mut rx) = data_receiver {
416 if let Ok(data) = rx.try_recv() {
417 stage_ctx.stdin_data = data;
418 }
419 }
421
422 let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
424 Ok(result) => result,
425 Err(e) => ExecResult::failure(1, e.to_string()),
426 };
427
428 result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
430
431 if !result.err.is_empty() {
437 if let Some(ref stderr) = stage_ctx.stderr {
438 stderr.write_str(&result.err);
439 result.err.clear();
440 }
441 }
442
443 if let Some(tx) = data_sender {
449 let _ = tx.send(result.data.clone());
450 }
451
452 if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
455 let text = result.text_out();
456 if !text.is_empty() {
457 let _ = pipe_out.write_all(text.as_bytes()).await;
459 let _ = pipe_out.shutdown().await;
460 }
461 }
463
464 (result, stage_ctx)
465 });
466
467 handles.push(handle);
468 }
469
470 let mut last_result = ExecResult::success("");
475 let mut panics: Vec<String> = Vec::new();
476 for (i, handle) in handles.into_iter().enumerate() {
477 match handle.await {
478 Ok((result, stage_ctx)) => {
479 if i == last_idx {
480 last_result = result;
481 ctx.scope = stage_ctx.scope;
483 ctx.cwd = stage_ctx.cwd;
484 ctx.prev_cwd = stage_ctx.prev_cwd;
485 ctx.aliases = stage_ctx.aliases;
486 }
487 }
488 Err(e) => {
489 panics.push(format!("stage {}: {}", i, e));
490 }
491 }
492 }
493
494 if !panics.is_empty() {
495 last_result = ExecResult::failure(
496 1,
497 format!("pipeline stage(s) panicked: {}", panics.join("; ")),
498 );
499 }
500
501 last_result
502 }
503}
504
505pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
515 let mut map = HashMap::new();
516 for p in &schema.params {
517 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
518 for alias in &p.aliases {
519 let stripped = alias.trim_start_matches('-');
520 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
521 }
522 }
523 map
524}
525
526pub fn is_bool_type(param_type: &str) -> bool {
528 matches!(param_type.to_lowercase().as_str(), "bool" | "boolean")
529}
530
531pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
539 let mut tool_args = ToolArgs::new();
540 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
541
542 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
544 let mut past_double_dash = false;
545
546 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
548 for (i, arg) in args.iter().enumerate() {
549 if let Arg::Positional(expr) = arg {
550 positional_indices.push((i, expr));
551 }
552 }
553
554 let mut i = 0;
556 while i < args.len() {
557 let arg = &args[i];
558
559 match arg {
560 Arg::DoubleDash => {
561 past_double_dash = true;
562 }
563 Arg::Positional(expr) => {
564 if !consumed_positionals.contains(&i)
566 && let Some(value) = eval_simple_expr(expr, ctx)
567 {
568 tool_args.positional.push(value);
569 }
570 }
571 Arg::Named { key, value } => {
572 if let Some(val) = eval_simple_expr(value, ctx) {
573 tool_args.named.insert(key.clone(), val);
574 }
575 }
576 Arg::ShortFlag(name) => {
577 if past_double_dash {
578 tool_args.positional.push(Value::String(format!("-{name}")));
579 } else if name.len() == 1 {
580 let flag_name = name.as_str();
583 let lookup = param_lookup.get(flag_name);
584 let is_bool = lookup
585 .map(|(_, typ, _)| is_bool_type(typ))
586 .unwrap_or(true);
587
588 if is_bool {
589 tool_args.flags.insert(flag_name.to_string());
590 } else {
591 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
593 let next_positional = positional_indices
594 .iter()
595 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
596
597 if let Some((pos_idx, expr)) = next_positional {
598 if let Some(value) = eval_simple_expr(expr, ctx) {
599 tool_args.named.insert(canonical.to_string(), value);
600 consumed_positionals.insert(*pos_idx);
601 } else {
602 tool_args.flags.insert(flag_name.to_string());
603 }
604 } else {
605 tool_args.flags.insert(flag_name.to_string());
606 }
607 }
608 } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
609 if is_bool_type(typ) {
611 tool_args.flags.insert(canonical.to_string());
612 } else {
613 let next_positional = positional_indices
614 .iter()
615 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
616 if let Some((pos_idx, expr)) = next_positional {
617 if let Some(value) = eval_simple_expr(expr, ctx) {
618 tool_args.named.insert(canonical.to_string(), value);
619 consumed_positionals.insert(*pos_idx);
620 } else {
621 tool_args.flags.insert(name.clone());
622 }
623 } else {
624 tool_args.flags.insert(name.clone());
625 }
626 }
627 } else {
628 for c in name.chars() {
630 tool_args.flags.insert(c.to_string());
631 }
632 }
633 }
634 Arg::LongFlag(name) => {
635 if past_double_dash {
636 tool_args.positional.push(Value::String(format!("--{name}")));
637 } else {
638 let lookup = param_lookup.get(name.as_str());
640 let is_bool = lookup
641 .map(|(_, typ, _)| is_bool_type(typ))
642 .unwrap_or(true); if is_bool {
645 tool_args.flags.insert(name.clone());
646 } else {
647 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
653 let next_positional = positional_indices
654 .iter()
655 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
656
657 if let Some((pos_idx, expr)) = next_positional {
658 if let Some(value) = eval_simple_expr(expr, ctx) {
659 tool_args.named.insert(canonical.to_string(), value);
660 consumed_positionals.insert(*pos_idx);
661 } else {
662 tool_args.flags.insert(name.clone());
663 }
664 } else {
665 tool_args.flags.insert(name.clone());
666 }
667 }
668 }
669 }
670 }
671 i += 1;
672 }
673
674 if let Some(schema) = schema.filter(|s| s.map_positionals) {
679 let pre_dash_count = if past_double_dash {
681 let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
683 positional_indices.iter()
685 .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
686 .count()
687 } else {
688 tool_args.positional.len()
689 };
690
691 let mut remaining = Vec::new();
692 let mut positional_iter = tool_args.positional.drain(..).enumerate();
693
694 for param in &schema.params {
695 if tool_args.named.contains_key(¶m.name) || tool_args.flags.contains(¶m.name) {
696 continue; }
698 if is_bool_type(¶m.param_type) {
699 continue; }
701 loop {
703 match positional_iter.next() {
704 Some((idx, val)) if idx < pre_dash_count => {
705 tool_args.named.insert(param.name.clone(), val);
706 break;
707 }
708 Some((_, val)) => {
709 remaining.push(val); }
711 None => break,
712 }
713 }
714 }
715
716 remaining.extend(positional_iter.map(|(_, v)| v));
718 tool_args.positional = remaining;
719 }
720
721 tool_args
722}
723
724fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
726 match expr {
727 Expr::Literal(value) => Some(eval_literal(value, ctx)),
728 Expr::VarRef(path) => ctx.scope.resolve_path(path),
729 Expr::Interpolated(parts) => {
730 let mut result = String::new();
731 for part in parts {
732 match part {
733 crate::ast::StringPart::Literal(s) => result.push_str(s),
734 crate::ast::StringPart::Var(path) => {
735 if let Some(value) = ctx.scope.resolve_path(path) {
736 result.push_str(&value_to_string(&value));
737 }
738 }
739 crate::ast::StringPart::VarWithDefault { name, default } => {
740 match ctx.scope.get(name) {
741 Some(value) => {
742 let s = value_to_string(value);
743 if s.is_empty() {
744 result.push_str(&eval_string_parts_sync(default, ctx));
745 } else {
746 result.push_str(&s);
747 }
748 }
749 None => result.push_str(&eval_string_parts_sync(default, ctx)),
750 }
751 }
752 crate::ast::StringPart::VarLength(name) => {
753 let len = match ctx.scope.get(name) {
754 Some(value) => value_to_string(value).len(),
755 None => 0,
756 };
757 result.push_str(&len.to_string());
758 }
759 crate::ast::StringPart::Positional(n) => {
760 if let Some(s) = ctx.scope.get_positional(*n) {
761 result.push_str(s);
762 }
763 }
764 crate::ast::StringPart::AllArgs => {
765 result.push_str(&ctx.scope.all_args().join(" "));
766 }
767 crate::ast::StringPart::ArgCount => {
768 result.push_str(&ctx.scope.arg_count().to_string());
769 }
770 crate::ast::StringPart::Arithmetic(expr) => {
771 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
773 result.push_str(&value.to_string());
774 }
775 }
776 crate::ast::StringPart::CommandSubst(_) => {
777 }
779 crate::ast::StringPart::LastExitCode => {
780 result.push_str(&ctx.scope.last_result().code.to_string());
781 }
782 crate::ast::StringPart::CurrentPid => {
783 result.push_str(&ctx.scope.pid().to_string());
784 }
785 }
786 }
787 Some(Value::String(result))
788 }
789 Expr::GlobPattern(s) => Some(Value::String(s.clone())),
790 Expr::HereDocBody { parts, strip_tabs } => {
791 let unwrapped: Vec<crate::ast::StringPart> =
795 parts.iter().map(|sp| sp.part.clone()).collect();
796 let raw = eval_string_parts_sync(&unwrapped, ctx);
797 let body = if *strip_tabs {
798 crate::interpreter::strip_leading_tabs(&raw)
799 } else {
800 raw
801 };
802 Some(Value::String(body))
803 }
804 _ => None, }
806}
807
808fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
810 value.clone()
811}
812
813fn value_to_string(value: &Value) -> String {
815 match value {
816 Value::Null => "".to_string(),
817 Value::Bool(b) => b.to_string(),
818 Value::Int(i) => i.to_string(),
819 Value::Float(f) => f.to_string(),
820 Value::String(s) => s.clone(),
821 Value::Json(json) => json.to_string(),
822 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
823 }
824}
825
826fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
829 let mut result = String::new();
830 for part in parts {
831 match part {
832 crate::ast::StringPart::Literal(s) => result.push_str(s),
833 crate::ast::StringPart::Var(path) => {
834 if let Some(value) = ctx.scope.resolve_path(path) {
835 result.push_str(&value_to_string(&value));
836 }
837 }
838 crate::ast::StringPart::VarWithDefault { name, default } => {
839 match ctx.scope.get(name) {
840 Some(value) => {
841 let s = value_to_string(value);
842 if s.is_empty() {
843 result.push_str(&eval_string_parts_sync(default, ctx));
844 } else {
845 result.push_str(&s);
846 }
847 }
848 None => result.push_str(&eval_string_parts_sync(default, ctx)),
849 }
850 }
851 crate::ast::StringPart::VarLength(name) => {
852 let len = match ctx.scope.get(name) {
853 Some(value) => value_to_string(value).len(),
854 None => 0,
855 };
856 result.push_str(&len.to_string());
857 }
858 crate::ast::StringPart::Positional(n) => {
859 if let Some(s) = ctx.scope.get_positional(*n) {
860 result.push_str(s);
861 }
862 }
863 crate::ast::StringPart::AllArgs => {
864 result.push_str(&ctx.scope.all_args().join(" "));
865 }
866 crate::ast::StringPart::ArgCount => {
867 result.push_str(&ctx.scope.arg_count().to_string());
868 }
869 crate::ast::StringPart::Arithmetic(expr) => {
870 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
871 result.push_str(&value.to_string());
872 }
873 }
874 crate::ast::StringPart::CommandSubst(_) => {
875 }
877 crate::ast::StringPart::LastExitCode => {
878 result.push_str(&ctx.scope.last_result().code.to_string());
879 }
880 crate::ast::StringPart::CurrentPid => {
881 result.push_str(&ctx.scope.pid().to_string());
882 }
883 }
884 }
885 result
886}
887
888fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
893 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
894 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
895
896 if gather_idx > scatter_idx {
898 Some((scatter_idx, gather_idx))
899 } else {
900 None
901 }
902}
903
904#[cfg(test)]
905mod tests {
906 use super::*;
907 use crate::dispatch::BackendDispatcher;
908 use crate::tools::register_builtins;
909 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
910 use std::path::Path;
911
912 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
913 let mut tools = ToolRegistry::new();
914 register_builtins(&mut tools);
915 let tools = Arc::new(tools);
916 let runner = PipelineRunner::new(tools.clone());
917 let dispatcher = BackendDispatcher::new(tools.clone());
918
919 let mut vfs = VfsRouter::new();
920 let mem = MemoryFs::new();
921 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
922 vfs.mount("/", mem);
923 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
924
925 (runner, ctx, dispatcher)
926 }
927
928 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
929 Command {
930 name: name.to_string(),
931 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
932 redirects: vec![],
933 }
934 }
935
936 #[tokio::test]
937 async fn test_single_command() {
938 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
939 let cmd = make_cmd("echo", vec!["hello"]);
940
941 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
942 assert!(result.ok());
943 assert_eq!(result.text_out().trim(), "hello");
944 }
945
946 #[tokio::test]
947 async fn test_pipeline_echo_grep() {
948 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
949
950 let echo_cmd = Command {
952 name: "echo".to_string(),
953 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
954 redirects: vec![],
955 };
956 let grep_cmd = Command {
957 name: "grep".to_string(),
958 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
959 redirects: vec![],
960 };
961
962 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
963 assert!(result.ok());
964 assert_eq!(result.text_out().trim(), "world");
965 }
966
967 #[tokio::test]
968 async fn test_pipeline_cat_grep() {
969 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
970
971 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
973 let grep_cmd = Command {
974 name: "grep".to_string(),
975 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
976 redirects: vec![],
977 };
978
979 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
980 assert!(result.ok());
981 assert!(result.text_out().contains("hello"));
982 }
983
984 #[tokio::test]
985 async fn test_command_not_found() {
986 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
987 let cmd = make_cmd("nonexistent", vec![]);
988
989 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
990 assert!(!result.ok());
991 assert_eq!(result.code, 127);
992 assert!(result.err.contains("not found"));
993 }
994
995 #[tokio::test]
996 async fn test_pipeline_continues_on_failure() {
997 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1000
1001 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1004 let grep_cmd = Command {
1005 name: "grep".to_string(),
1006 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1007 redirects: vec![],
1008 };
1009
1010 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1011 assert!(!result.ok());
1013 }
1014
1015 #[tokio::test]
1016 async fn test_pipeline_last_command_exit_code() {
1017 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1019
1020 let echo_cmd = make_cmd("echo", vec!["hello"]);
1021 let cat_cmd = make_cmd("cat", vec![]);
1022
1023 let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1024 assert!(result.ok());
1025 assert!(result.text_out().contains("hello"));
1026 }
1027
1028 #[tokio::test]
1029 async fn test_empty_pipeline() {
1030 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1031 let result = runner.run(&[], &mut ctx, &dispatcher).await;
1032 assert!(result.ok());
1033 }
1034
1035 #[test]
1038 fn test_find_scatter_gather_both_present() {
1039 let commands = vec![
1040 make_cmd("echo", vec!["a"]),
1041 make_cmd("scatter", vec![]),
1042 make_cmd("process", vec![]),
1043 make_cmd("gather", vec![]),
1044 ];
1045 let result = find_scatter_gather(&commands);
1046 assert_eq!(result, Some((1, 3)));
1047 }
1048
1049 #[test]
1050 fn test_find_scatter_gather_no_scatter() {
1051 let commands = vec![
1052 make_cmd("echo", vec!["a"]),
1053 make_cmd("gather", vec![]),
1054 ];
1055 let result = find_scatter_gather(&commands);
1056 assert!(result.is_none());
1057 }
1058
1059 #[test]
1060 fn test_find_scatter_gather_no_gather() {
1061 let commands = vec![
1062 make_cmd("echo", vec!["a"]),
1063 make_cmd("scatter", vec![]),
1064 ];
1065 let result = find_scatter_gather(&commands);
1066 assert!(result.is_none());
1067 }
1068
1069 #[test]
1070 fn test_find_scatter_gather_wrong_order() {
1071 let commands = vec![
1072 make_cmd("gather", vec![]),
1073 make_cmd("scatter", vec![]),
1074 ];
1075 let result = find_scatter_gather(&commands);
1076 assert!(result.is_none());
1077 }
1078
1079 #[tokio::test]
1080 async fn test_scatter_gather_simple() {
1081 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1082
1083 let split_cmd = Command {
1085 name: "split".to_string(),
1086 args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1087 redirects: vec![],
1088 };
1089 let scatter_cmd = make_cmd("scatter", vec![]);
1090 let process_cmd = Command {
1091 name: "echo".to_string(),
1092 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1093 redirects: vec![],
1094 };
1095 let gather_cmd = make_cmd("gather", vec![]);
1096
1097 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1098 assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1099 assert!(result.text_out().contains("a"));
1101 assert!(result.text_out().contains("b"));
1102 assert!(result.text_out().contains("c"));
1103 }
1104
1105 #[tokio::test]
1106 async fn test_scatter_gather_empty_input() {
1107 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1108
1109 let echo_cmd = Command {
1111 name: "echo".to_string(),
1112 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1113 redirects: vec![],
1114 };
1115 let scatter_cmd = make_cmd("scatter", vec![]);
1116 let process_cmd = Command {
1117 name: "echo".to_string(),
1118 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1119 redirects: vec![],
1120 };
1121 let gather_cmd = make_cmd("gather", vec![]);
1122
1123 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1124 assert!(result.ok());
1125 assert!(result.text_out().trim().is_empty());
1126 }
1127
1128 #[tokio::test]
1129 async fn test_scatter_gather_with_structured_stdin() {
1130 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1131
1132 let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1134 ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1135
1136 let scatter_cmd = make_cmd("scatter", vec![]);
1137 let process_cmd = Command {
1138 name: "echo".to_string(),
1139 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1140 redirects: vec![],
1141 };
1142 let gather_cmd = make_cmd("gather", vec![]);
1143
1144 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1145 assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1146 assert!(result.text_out().contains("x"));
1147 assert!(result.text_out().contains("y"));
1148 assert!(result.text_out().contains("z"));
1149 }
1150
1151 #[tokio::test]
1152 async fn test_scatter_gather_json_input() {
1153 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1154
1155 let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1157 ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1158
1159 let scatter_cmd = make_cmd("scatter", vec![]);
1160 let process_cmd = Command {
1161 name: "echo".to_string(),
1162 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1163 redirects: vec![],
1164 };
1165 let gather_cmd = make_cmd("gather", vec![]);
1166
1167 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1168 assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1169 assert!(result.text_out().contains("one"));
1170 assert!(result.text_out().contains("two"));
1171 assert!(result.text_out().contains("three"));
1172 }
1173
1174 #[tokio::test]
1175 async fn test_scatter_gather_with_post_gather() {
1176 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1177
1178 let split_cmd = Command {
1180 name: "split".to_string(),
1181 args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1182 redirects: vec![],
1183 };
1184 let scatter_cmd = make_cmd("scatter", vec![]);
1185 let process_cmd = Command {
1186 name: "echo".to_string(),
1187 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1188 redirects: vec![],
1189 };
1190 let gather_cmd = make_cmd("gather", vec![]);
1191 let grep_cmd = Command {
1192 name: "grep".to_string(),
1193 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1194 redirects: vec![],
1195 };
1196
1197 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1198 assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1199 assert!(result.text_out().contains("a"));
1200 assert!(!result.text_out().contains("b"));
1201 }
1202
1203 #[tokio::test]
1204 async fn test_scatter_custom_var_name() {
1205 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1206
1207 let data = Value::Json(serde_json::json!(["test1", "test2"]));
1209 ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1210
1211 let scatter_cmd = Command {
1213 name: "scatter".to_string(),
1214 args: vec![Arg::Named {
1215 key: "as".to_string(),
1216 value: Expr::Literal(Value::String("URL".to_string())),
1217 }],
1218 redirects: vec![],
1219 };
1220 let process_cmd = Command {
1221 name: "echo".to_string(),
1222 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1223 redirects: vec![],
1224 };
1225 let gather_cmd = make_cmd("gather", vec![]);
1226
1227 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1228 assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1229 assert!(result.text_out().contains("test1"));
1230 assert!(result.text_out().contains("test2"));
1231 }
1232
1233 #[tokio::test]
1236 async fn test_pipeline_routes_through_backend() {
1237 use crate::backend::testing::MockBackend;
1238 use std::sync::atomic::Ordering;
1239
1240 let (backend, call_count) = MockBackend::new();
1242 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1243
1244 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1246
1247 let tools = std::sync::Arc::new(ToolRegistry::new());
1249 let runner = PipelineRunner::new(tools.clone());
1250 let dispatcher = BackendDispatcher::new(tools);
1251
1252 let cmd = make_cmd("test-tool", vec!["arg1"]);
1254 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1255
1256 assert!(result.ok(), "Mock backend should return success");
1257 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1258 assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1259 }
1260
1261 #[tokio::test]
1262 async fn test_multi_command_pipeline_routes_through_backend() {
1263 use crate::backend::testing::MockBackend;
1264 use std::sync::atomic::Ordering;
1265
1266 let (backend, call_count) = MockBackend::new();
1267 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1268 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1269
1270 let tools = std::sync::Arc::new(ToolRegistry::new());
1271 let runner = PipelineRunner::new(tools.clone());
1272 let dispatcher = BackendDispatcher::new(tools);
1273
1274 let cmd1 = make_cmd("tool1", vec![]);
1276 let cmd2 = make_cmd("tool2", vec![]);
1277 let cmd3 = make_cmd("tool3", vec![]);
1278
1279 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1280
1281 assert!(result.ok());
1282 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1283 }
1284
1285 use crate::tools::{ParamSchema, ToolSchema};
1288
1289 fn make_test_schema() -> ToolSchema {
1290 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1291 .param(ParamSchema::required("query", "string", "Search query"))
1292 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1293 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1294 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1295 .with_positional_mapping()
1296 }
1297
1298 fn make_minimal_ctx() -> ExecContext {
1299 let mut vfs = VfsRouter::new();
1300 vfs.mount("/", MemoryFs::new());
1301 ExecContext::new(Arc::new(vfs))
1302 }
1303
1304 #[test]
1305 fn test_schema_aware_string_arg() {
1306 let args = vec![
1308 Arg::LongFlag("query".to_string()),
1309 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1310 ];
1311 let schema = make_test_schema();
1312 let ctx = make_minimal_ctx();
1313
1314 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1315
1316 assert!(tool_args.flags.is_empty(), "No flags should be set");
1317 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1318 assert_eq!(
1319 tool_args.named.get("query"),
1320 Some(&Value::String("test".to_string())),
1321 "--query should consume 'test' as its value"
1322 );
1323 }
1324
1325 #[test]
1326 fn test_schema_aware_bool_flag() {
1327 let args = vec![
1329 Arg::LongFlag("verbose".to_string()),
1330 ];
1331 let schema = make_test_schema();
1332 let ctx = make_minimal_ctx();
1333
1334 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1335
1336 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1337 assert!(tool_args.named.is_empty(), "No named args");
1338 assert!(tool_args.positional.is_empty(), "No positionals");
1339 }
1340
1341 #[test]
1342 fn test_schema_aware_mixed() {
1343 let args = vec![
1346 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1347 Arg::LongFlag("output".to_string()),
1348 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1349 Arg::LongFlag("verbose".to_string()),
1350 ];
1351 let schema = make_test_schema();
1352 let ctx = make_minimal_ctx();
1353
1354 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1355
1356 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1357 assert_eq!(
1358 tool_args.named.get("query"),
1359 Some(&Value::String("file.txt".to_string()))
1360 );
1361 assert_eq!(
1362 tool_args.named.get("output"),
1363 Some(&Value::String("out.txt".to_string()))
1364 );
1365 assert!(tool_args.flags.contains("verbose"));
1366 }
1367
1368 #[test]
1369 fn test_schema_aware_multiple_string_args() {
1370 let args = vec![
1372 Arg::LongFlag("query".to_string()),
1373 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1374 Arg::LongFlag("output".to_string()),
1375 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1376 Arg::LongFlag("verbose".to_string()),
1377 Arg::LongFlag("limit".to_string()),
1378 Arg::Positional(Expr::Literal(Value::Int(5))),
1379 ];
1380 let schema = make_test_schema();
1381 let ctx = make_minimal_ctx();
1382
1383 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1384
1385 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1386 assert_eq!(
1387 tool_args.named.get("query"),
1388 Some(&Value::String("test".to_string()))
1389 );
1390 assert_eq!(
1391 tool_args.named.get("output"),
1392 Some(&Value::String("result.json".to_string()))
1393 );
1394 assert_eq!(
1395 tool_args.named.get("limit"),
1396 Some(&Value::Int(5))
1397 );
1398 assert!(tool_args.flags.contains("verbose"));
1399 }
1400
1401 #[test]
1402 fn test_schema_aware_double_dash() {
1403 let args = vec![
1406 Arg::LongFlag("output".to_string()),
1407 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1408 Arg::DoubleDash,
1409 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1410 ];
1411 let schema = make_test_schema();
1412 let ctx = make_minimal_ctx();
1413
1414 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1415
1416 assert_eq!(
1417 tool_args.named.get("output"),
1418 Some(&Value::String("out.txt".to_string()))
1419 );
1420 assert_eq!(
1422 tool_args.positional,
1423 vec![Value::String("--this-is-data".to_string())]
1424 );
1425 }
1426
1427 #[test]
1428 fn test_no_schema_fallback() {
1429 let args = vec![
1431 Arg::LongFlag("query".to_string()),
1432 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1433 ];
1434 let ctx = make_minimal_ctx();
1435
1436 let tool_args = build_tool_args(&args, &ctx, None);
1437
1438 assert!(tool_args.flags.contains("query"), "--query should be a flag");
1440 assert_eq!(
1441 tool_args.positional,
1442 vec![Value::String("test".to_string())],
1443 "'test' should be a positional"
1444 );
1445 }
1446
1447 #[test]
1448 fn test_unknown_flag_in_schema() {
1449 let args = vec![
1451 Arg::LongFlag("unknown".to_string()),
1452 Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1453 ];
1454 let schema = make_test_schema();
1455 let ctx = make_minimal_ctx();
1456
1457 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1458
1459 assert!(tool_args.flags.contains("unknown"));
1460 assert!(tool_args.positional.is_empty(), "value consumed as query param");
1461 assert_eq!(
1462 tool_args.named.get("query"),
1463 Some(&Value::String("value".to_string()))
1464 );
1465 }
1466
1467 #[test]
1468 fn test_named_args_unchanged() {
1469 let args = vec![
1471 Arg::Named {
1472 key: "query".to_string(),
1473 value: Expr::Literal(Value::String("test".to_string())),
1474 },
1475 Arg::LongFlag("verbose".to_string()),
1476 ];
1477 let schema = make_test_schema();
1478 let ctx = make_minimal_ctx();
1479
1480 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1481
1482 assert_eq!(
1483 tool_args.named.get("query"),
1484 Some(&Value::String("test".to_string()))
1485 );
1486 assert!(tool_args.flags.contains("verbose"));
1487 }
1488
1489 #[test]
1490 fn test_short_flags_unchanged() {
1491 let args = vec![
1493 Arg::ShortFlag("la".to_string()),
1494 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1495 ];
1496 let schema = make_test_schema();
1497 let ctx = make_minimal_ctx();
1498
1499 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1500
1501 assert!(tool_args.flags.contains("l"));
1502 assert!(tool_args.flags.contains("a"));
1503 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1504 assert_eq!(
1505 tool_args.named.get("query"),
1506 Some(&Value::String("file.txt".to_string()))
1507 );
1508 }
1509
1510 #[test]
1511 fn test_flag_at_end_no_value() {
1512 let args = vec![
1515 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1516 Arg::LongFlag("output".to_string()),
1517 ];
1518 let schema = make_test_schema();
1519 let ctx = make_minimal_ctx();
1520
1521 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1522
1523 assert!(tool_args.flags.contains("output"));
1525 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1526 assert_eq!(
1527 tool_args.named.get("query"),
1528 Some(&Value::String("file.txt".to_string()))
1529 );
1530 }
1531
1532 #[test]
1533 fn test_positional_skips_bool_params() {
1534 let schema = ToolSchema::new("test", "")
1538 .param(ParamSchema::required("query", "string", ""))
1539 .param(ParamSchema::optional(
1540 "verbose",
1541 "bool",
1542 Value::Bool(false),
1543 "",
1544 ))
1545 .param(ParamSchema::optional(
1546 "output",
1547 "string",
1548 Value::Null,
1549 "",
1550 ))
1551 .with_positional_mapping();
1552 let args = vec![
1553 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1554 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1555 ];
1556 let ctx = make_minimal_ctx();
1557
1558 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1559
1560 assert_eq!(
1561 tool_args.named.get("query"),
1562 Some(&Value::String("val1".to_string()))
1563 );
1564 assert_eq!(
1565 tool_args.named.get("output"),
1566 Some(&Value::String("val2".to_string()))
1567 );
1568 assert!(!tool_args.flags.contains("verbose"));
1569 assert!(tool_args.positional.is_empty());
1570 }
1571
1572 #[test]
1573 fn test_positionals_fill_available_slots() {
1574 let args = vec![
1577 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1578 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1579 Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1580 ];
1581 let schema = make_test_schema(); let ctx = make_minimal_ctx();
1583
1584 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1585
1586 assert_eq!(
1589 tool_args.named.get("query"),
1590 Some(&Value::String("val1".to_string()))
1591 );
1592 assert_eq!(
1593 tool_args.named.get("limit"),
1594 Some(&Value::String("val2".to_string()))
1595 );
1596 assert_eq!(
1597 tool_args.named.get("output"),
1598 Some(&Value::String("val3".to_string()))
1599 );
1600 assert!(tool_args.positional.is_empty());
1601 }
1602
1603 #[test]
1604 fn test_truly_excess_positionals() {
1605 let schema = ToolSchema::new("test", "")
1607 .param(ParamSchema::required("name", "string", ""))
1608 .with_positional_mapping();
1609 let args = vec![
1610 Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1611 Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1612 Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1613 ];
1614 let ctx = make_minimal_ctx();
1615
1616 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1617
1618 assert_eq!(
1619 tool_args.named.get("name"),
1620 Some(&Value::String("first".to_string()))
1621 );
1622 assert_eq!(
1623 tool_args.positional,
1624 vec![
1625 Value::String("second".to_string()),
1626 Value::String("third".to_string()),
1627 ]
1628 );
1629 }
1630
1631 #[test]
1632 fn test_double_dash_positional_not_mapped() {
1633 let args = vec![
1635 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1636 Arg::DoubleDash,
1637 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1638 ];
1639 let schema = make_test_schema();
1640 let ctx = make_minimal_ctx();
1641
1642 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1643
1644 assert_eq!(
1645 tool_args.named.get("query"),
1646 Some(&Value::String("val1".to_string()))
1647 );
1648 assert_eq!(
1650 tool_args.positional,
1651 vec![Value::String("val2".to_string())]
1652 );
1653 }
1654
1655 #[test]
1656 fn test_all_params_filled_by_flags() {
1657 let args = vec![
1659 Arg::LongFlag("query".to_string()),
1660 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1661 Arg::LongFlag("output".to_string()),
1662 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1663 Arg::LongFlag("verbose".to_string()),
1664 ];
1665 let schema = make_test_schema();
1666 let ctx = make_minimal_ctx();
1667
1668 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1669
1670 assert_eq!(
1671 tool_args.named.get("query"),
1672 Some(&Value::String("search".to_string()))
1673 );
1674 assert_eq!(
1675 tool_args.named.get("output"),
1676 Some(&Value::String("out.txt".to_string()))
1677 );
1678 assert!(tool_args.flags.contains("verbose"));
1679 assert!(tool_args.positional.is_empty());
1680 }
1681
1682 #[test]
1683 fn test_mixed_flags_and_positional_fill() {
1684 let args = vec![
1686 Arg::LongFlag("output".to_string()),
1687 Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
1688 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1689 ];
1690 let schema = make_test_schema();
1691 let ctx = make_minimal_ctx();
1692
1693 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1694
1695 assert_eq!(
1696 tool_args.named.get("output"),
1697 Some(&Value::String("foo".to_string()))
1698 );
1699 assert_eq!(
1700 tool_args.named.get("query"),
1701 Some(&Value::String("val1".to_string()))
1702 );
1703 assert!(tool_args.positional.is_empty());
1704 }
1705
1706 #[test]
1707 fn test_alias_flag_prevents_mapping_overwrite() {
1708 let schema = ToolSchema::new("test", "")
1710 .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
1711 .param(ParamSchema::required("output", "string", ""))
1712 .with_positional_mapping();
1713 let args = vec![
1714 Arg::ShortFlag("q".to_string()),
1715 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1716 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1717 ];
1718 let ctx = make_minimal_ctx();
1719
1720 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1721
1722 assert_eq!(
1723 tool_args.named.get("query"),
1724 Some(&Value::String("search".to_string()))
1725 );
1726 assert_eq!(
1727 tool_args.named.get("output"),
1728 Some(&Value::String("out.txt".to_string()))
1729 );
1730 assert!(tool_args.positional.is_empty());
1731 }
1732
1733 #[test]
1734 fn test_builtin_schema_no_positional_mapping() {
1735 let schema = ToolSchema::new("echo", "")
1737 .param(ParamSchema::optional("args", "any", Value::Null, ""))
1738 .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
1739 let args = vec![
1741 Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
1742 Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
1743 ];
1744 let ctx = make_minimal_ctx();
1745
1746 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1747
1748 assert_eq!(
1750 tool_args.positional,
1751 vec![
1752 Value::String("hello".to_string()),
1753 Value::String("world".to_string()),
1754 ]
1755 );
1756 assert!(tool_args.named.get("args").is_none());
1757 }
1758
1759 #[test]
1760 fn test_short_flag_with_alias_consumes_value() {
1761 let schema = ToolSchema::new("head", "Output first part of files")
1764 .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1765 .with_aliases(["-n"]));
1766 let args = vec![
1767 Arg::ShortFlag("n".to_string()),
1768 Arg::Positional(Expr::Literal(Value::Int(5))),
1769 Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1770 ];
1771 let ctx = make_minimal_ctx();
1772
1773 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1774
1775 assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1776 assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1777 assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1778 }
1779
1780 #[tokio::test]
1783 async fn test_merge_stderr_redirect() {
1784 let result = ExecResult::from_output(0, "stdout content", "stderr content");
1786
1787 let redirects = vec![Redirect {
1788 kind: RedirectKind::MergeStderr,
1789 target: Expr::Literal(Value::Null),
1790 }];
1791
1792 let ctx = make_minimal_ctx();
1793 let result = apply_redirects(result, &redirects, &ctx).await;
1794
1795 assert_eq!(&*result.text_out(), "stdout contentstderr content");
1796 assert!(result.err.is_empty());
1797 }
1798
1799 #[tokio::test]
1800 async fn test_merge_stderr_with_empty_stderr() {
1801 let result = ExecResult::from_output(0, "stdout only", "");
1803
1804 let redirects = vec![Redirect {
1805 kind: RedirectKind::MergeStderr,
1806 target: Expr::Literal(Value::Null),
1807 }];
1808
1809 let ctx = make_minimal_ctx();
1810 let result = apply_redirects(result, &redirects, &ctx).await;
1811
1812 assert_eq!(&*result.text_out(), "stdout only");
1813 assert!(result.err.is_empty());
1814 }
1815
1816 #[tokio::test]
1817 async fn test_merge_stderr_order_matters() {
1818 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1823
1824 let redirects = vec![Redirect {
1826 kind: RedirectKind::MergeStderr,
1827 target: Expr::Literal(Value::Null),
1828 }];
1829
1830 let ctx = make_minimal_ctx();
1831 let result = apply_redirects(result, &redirects, &ctx).await;
1832
1833 assert_eq!(&*result.text_out(), "stdout\nstderr\n");
1834 assert!(result.err.is_empty());
1835 }
1836
1837 #[tokio::test]
1838 async fn test_redirect_with_command_execution() {
1839 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1840
1841 let cmd = Command {
1843 name: "echo".to_string(),
1844 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1845 redirects: vec![Redirect {
1846 kind: RedirectKind::MergeStderr,
1847 target: Expr::Literal(Value::Null),
1848 }],
1849 };
1850
1851 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1852 assert!(result.ok());
1853 assert!(result.text_out().contains("hello"));
1855 }
1856
1857 #[tokio::test]
1858 async fn test_merge_stderr_in_pipeline() {
1859 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1860
1861 let echo_cmd = Command {
1864 name: "echo".to_string(),
1865 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1866 redirects: vec![Redirect {
1867 kind: RedirectKind::MergeStderr,
1868 target: Expr::Literal(Value::Null),
1869 }],
1870 };
1871 let grep_cmd = Command {
1872 name: "grep".to_string(),
1873 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1874 redirects: vec![],
1875 };
1876
1877 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1878 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1879 assert!(result.text_out().contains("output"));
1880 }
1881}