1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24async fn apply_redirects(
30 mut result: ExecResult,
31 redirects: &[Redirect],
32 ctx: &ExecContext,
33) -> ExecResult {
34 for redir in redirects {
39 match redir.kind {
40 RedirectKind::MergeStderr => {
41 result.materialize();
44 if !result.err.is_empty() {
45 let err = std::mem::take(&mut result.err);
46 result.push_out(&err);
47 }
48 }
49 RedirectKind::MergeStdout => {
50 if result.is_bytes() {
54 return ExecResult::failure(
55 1,
56 "redirect: cannot merge binary stdout into stderr (1>&2) — \
57 redirect it to a file or pipe through base64/xxd",
58 );
59 }
60 result.materialize();
61 if !result.text_out().is_empty() {
62 let out = result.text_out().into_owned();
63 result.err.push_str(&out);
64 result.clear_out();
65 }
66 }
67 RedirectKind::StdoutOverwrite => {
68 let path = match eval_redirect_target(&redir.target, ctx).await {
69 Ok(p) => p,
70 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
71 };
72 if let Some(bytes) = result.out_bytes() {
74 if let Err(e) = redirect_write(ctx, &path, bytes).await {
75 return ExecResult::failure(1, format!("redirect: {e}"));
76 }
77 } else if let Some(output) = result.take_output_for_stream() {
78 let mut buf = Vec::new();
80 if let Err(e) = output.write_canonical(&mut buf, None) {
81 return ExecResult::failure(1, format!("redirect: {e}"));
82 }
83 if let Err(e) = redirect_write(ctx, &path, &buf).await {
84 return ExecResult::failure(1, format!("redirect: {e}"));
85 }
86 } else if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
87 return ExecResult::failure(1, format!("redirect: {e}"));
88 }
89 result.clear_out();
90 result.set_output(None);
91 }
92 RedirectKind::StdoutAppend => {
93 let path = match eval_redirect_target(&redir.target, ctx).await {
94 Ok(p) => p,
95 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
96 };
97 if let Some(bytes) = result.out_bytes() {
99 if let Err(e) = redirect_append(ctx, &path, bytes).await {
100 return ExecResult::failure(1, format!("redirect: {e}"));
101 }
102 } else if let Some(output) = result.take_output_for_stream() {
103 let mut buf = Vec::new();
105 if let Err(e) = output.write_canonical(&mut buf, None) {
106 return ExecResult::failure(1, format!("redirect: {e}"));
107 }
108 if let Err(e) = redirect_append(ctx, &path, &buf).await {
109 return ExecResult::failure(1, format!("redirect: {e}"));
110 }
111 } else if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
112 return ExecResult::failure(1, format!("redirect: {e}"));
113 }
114 result.clear_out();
115 result.set_output(None);
116 }
117 RedirectKind::Stderr => {
118 let path = match eval_redirect_target(&redir.target, ctx).await {
119 Ok(p) => p,
120 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
121 };
122 if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
123 return ExecResult::failure(1, format!("redirect: {e}"));
124 }
125 result.err.clear();
126 }
127 RedirectKind::Both => {
128 let path = match eval_redirect_target(&redir.target, ctx).await {
129 Ok(p) => p,
130 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
131 };
132 let mut combined: Vec<u8> = match result.out_bytes() {
135 Some(b) => b.to_vec(),
136 None => result.text_out().into_owned().into_bytes(),
137 };
138 combined.extend_from_slice(result.err.as_bytes());
139 if let Err(e) = redirect_write(ctx, &path, &combined).await {
140 return ExecResult::failure(1, format!("redirect: {e}"));
141 }
142 result.clear_out();
143 result.set_output(None);
144 result.err.clear();
145 }
146 RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
148 }
149 }
150 result.materialize();
155 result
156}
157
158async fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Result<String, String> {
165 if let Some(dispatcher) = &ctx.dispatcher {
166 dispatcher
167 .eval_expr(expr, ctx)
168 .await
169 .map(|v| value_to_string(&v))
170 .map_err(|e| e.to_string())
171 } else {
172 eval_simple_expr(expr, ctx)
173 .map(|v| value_to_string(&v))
174 .ok_or_else(|| "could not evaluate redirect target".to_string())
175 }
176}
177
178async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
185 use crate::backend::WriteMode;
186 let resolved = ctx.resolve_path(path);
187 ctx.backend.write(&resolved, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
188}
189
190async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
194 let resolved = ctx.resolve_path(path);
195 ctx.backend.append(&resolved, data).await.map_err(|e| e.to_string())
196}
197
198async fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) -> Result<(), String> {
206 use std::path::Path;
207 for redir in &cmd.redirects {
208 match &redir.kind {
209 RedirectKind::Stdin => {
210 let path = eval_redirect_target(&redir.target, ctx).await?;
211 let resolved = ctx.resolve_path(&path);
212 let data = ctx
213 .backend
214 .read(Path::new(&resolved), None)
215 .await
216 .map_err(|e| format!("redirect: {path}: {e}"))?;
217 let content = String::from_utf8(data)
218 .map_err(|_| format!("redirect: {path}: invalid UTF-8"))?;
219 ctx.set_stdin(content);
220 }
221 RedirectKind::HereDoc => {
222 match &redir.target {
223 Expr::Literal(Value::String(content)) => {
224 ctx.set_stdin(content.clone());
225 }
226 expr => {
229 let body = eval_redirect_target(expr, ctx).await?;
230 ctx.set_stdin(body);
231 }
232 }
233 }
234 RedirectKind::HereString => {
235 let mut s = eval_redirect_target(&redir.target, ctx).await?;
238 s.push('\n');
239 ctx.set_stdin(s);
240 }
241 _ => {}
242 }
243 }
244 Ok(())
245}
246
247#[derive(Clone)]
249pub struct PipelineRunner {
250 tools: Arc<ToolRegistry>,
251}
252
253impl PipelineRunner {
254 pub fn new(tools: Arc<ToolRegistry>) -> Self {
256 Self { tools }
257 }
258
259 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
269 pub async fn run(
270 &self,
271 commands: &[Command],
272 ctx: &mut ExecContext,
273 dispatcher: &dyn CommandDispatcher,
274 ) -> ExecResult {
275 if commands.is_empty() {
276 return ExecResult::success("");
277 }
278
279 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
281 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
282 }
283
284 self.run_sequential(commands, ctx, dispatcher).await
285 }
286
287 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
292 pub async fn run_sequential(
293 &self,
294 commands: &[Command],
295 ctx: &mut ExecContext,
296 dispatcher: &dyn CommandDispatcher,
297 ) -> ExecResult {
298 if commands.is_empty() {
299 return ExecResult::success("");
300 }
301
302 if commands.len() == 1 {
303 return self.run_single(&commands[0], ctx, None, dispatcher).await;
305 }
306
307 self.run_pipeline(commands, ctx, dispatcher).await
309 }
310
311 async fn run_scatter_gather(
313 &self,
314 commands: &[Command],
315 scatter_idx: usize,
316 gather_idx: usize,
317 ctx: &mut ExecContext,
318 dispatcher: &dyn CommandDispatcher,
319 ) -> ExecResult {
320 let pre_scatter = &commands[..scatter_idx];
322 let scatter_cmd = &commands[scatter_idx];
323 let parallel = &commands[scatter_idx + 1..gather_idx];
324 let gather_cmd = &commands[gather_idx];
325 let post_gather = &commands[gather_idx + 1..];
326
327 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
330 let gather_schema = self.tools.get("gather").map(|t| t.schema());
331 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
332 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
333
334 let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
339
340 let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
341 runner
342 .run(
343 pre_scatter,
344 scatter_opts,
345 parallel,
346 gather_opts,
347 post_gather,
348 ctx,
349 )
350 .await
351 }
352
353 #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
358 async fn run_single(
359 &self,
360 cmd: &Command,
361 ctx: &mut ExecContext,
362 stdin: Option<String>,
363 dispatcher: &dyn CommandDispatcher,
364 ) -> ExecResult {
365 if let Err(e) = setup_stdin_redirects(cmd, ctx).await {
367 return ExecResult::failure(1, e);
368 }
369
370 if let Some(input) = stdin {
372 ctx.set_stdin(input);
373 }
374
375 ctx.pipeline_position = PipelinePosition::Only;
377
378 let result = match dispatcher.dispatch(cmd, ctx).await {
380 Ok(result) => result,
381 Err(e) => ExecResult::failure(1, e.to_string()),
382 };
383
384 apply_redirects(result, &cmd.redirects, ctx).await
386 }
387
388 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
398 async fn run_pipeline(
399 &self,
400 commands: &[Command],
401 ctx: &mut ExecContext,
402 dispatcher: &dyn CommandDispatcher,
403 ) -> ExecResult {
404 let stage_count = commands.len();
405 let last_idx = stage_count - 1;
406
407 let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
409 let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
410
411 for _ in 0..last_idx {
412 let (writer, reader) = pipe_stream_default();
413 pipe_writers.push(Some(writer));
414 pipe_readers.push(Some(reader));
415 }
416
417 let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
419 let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
420
421 for _ in 0..last_idx {
422 let (tx, rx) = tokio::sync::oneshot::channel();
423 data_senders.push(Some(tx));
424 data_receivers.push(Some(rx));
425 }
426
427 let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
428
429 for (i, cmd) in commands.iter().enumerate() {
430 let mut stage_ctx = ctx.child_for_pipeline();
431 let cmd = cmd.clone();
432
433 let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
438
439 let stdin_setup = setup_stdin_redirects(&cmd, &mut stage_ctx).await;
443
444 if i == 0 {
446 if stage_ctx.stdin.is_none() {
449 stage_ctx.stdin = ctx.stdin.take();
450 }
451 if stage_ctx.stdin_data.is_none() {
452 stage_ctx.stdin_data = ctx.stdin_data.take();
453 }
454 } else {
455 stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
457 }
459
460 if i < last_idx {
462 stage_ctx.pipe_stdout = pipe_writers[i].take();
463 }
464
465 stage_ctx.pipeline_position = match i {
467 0 => PipelinePosition::First,
468 n if n == last_idx => PipelinePosition::Last,
469 _ => PipelinePosition::Middle,
470 };
471
472 let data_sender = if i < last_idx { data_senders[i].take() } else { None };
473 let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
474
475 let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> =
478 tokio::spawn(crate::telemetry::bind_current_context(async move {
479 if let Err(e) = stdin_setup {
481 return (ExecResult::failure(1, e), stage_ctx);
482 }
483
484 if let Some(mut rx) = data_receiver {
491 if let Ok(data) = rx.try_recv() {
492 stage_ctx.stdin_data = data;
493 }
494 }
496
497 let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
499 Ok(result) => result,
500 Err(e) => ExecResult::failure(1, e.to_string()),
501 };
502
503 result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
505
506 if !result.err.is_empty() {
512 if let Some(ref stderr) = stage_ctx.stderr {
513 stderr.write_str(&result.err);
514 result.err.clear();
515 }
516 }
517
518 if let Some(tx) = data_sender {
524 let _ = tx.send(result.data.clone());
525 }
526
527 if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
530 let bytes: Vec<u8> = match result.out_bytes() {
534 Some(b) => b.to_vec(),
535 None => result.text_out().into_owned().into_bytes(),
536 };
537 if !bytes.is_empty() {
538 let _ = pipe_out.write_all(&bytes).await;
540 let _ = pipe_out.shutdown().await;
541 }
542 }
544
545 (result, stage_ctx)
546 }));
547
548 handles.push(handle);
549 }
550
551 let mut last_result = ExecResult::success("");
556 let mut panics: Vec<String> = Vec::new();
557 for (i, handle) in handles.into_iter().enumerate() {
558 match handle.await {
559 Ok((result, stage_ctx)) => {
560 if i == last_idx {
561 last_result = result;
562 ctx.scope = stage_ctx.scope;
564 ctx.cwd = stage_ctx.cwd;
565 ctx.prev_cwd = stage_ctx.prev_cwd;
566 ctx.aliases = stage_ctx.aliases;
567 }
568 }
569 Err(e) => {
570 panics.push(format!("stage {}: {}", i, e));
571 }
572 }
573 }
574
575 if !panics.is_empty() {
576 last_result = ExecResult::failure(
577 1,
578 format!("pipeline stage(s) panicked: {}", panics.join("; ")),
579 );
580 }
581
582 last_result
583 }
584}
585
586pub fn select_leaf<'a>(schema: &'a ToolSchema, args: &[Arg]) -> anyhow::Result<&'a ToolSchema> {
626 let root_lookup = schema_param_lookup(schema);
629 let is_root_value_flag = |name: &str| -> bool {
630 root_lookup.get(name).is_some_and(|(_, typ, _)| !is_bool_type(typ))
631 };
632
633 let mut node = schema;
634 let mut skip_next_positional = false;
635 for arg in args {
636 match arg {
637 Arg::DoubleDash => break,
639 Arg::LongFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
642 Arg::ShortFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
643 Arg::Positional(expr) => {
644 if skip_next_positional {
645 skip_next_positional = false;
646 continue; }
648 if node.subcommands.is_empty() {
649 break; }
651 match classify_subcommand_positional(expr) {
652 SubcommandWord::Word(word) => {
653 match node.subcommands.iter().find(|c| c.matches_command(word)) {
654 Some(child) => node = child, None => break, }
657 }
658 SubcommandWord::OtherLiteral => break,
662 SubcommandWord::Computed(kind) => anyhow::bail!(
663 "{}: a subcommand name is required here, but got {kind}. \
664 Subcommands must be literal words — spell it out \
665 (e.g. `{} <subcommand> …`) or use the `--flag=value` form.",
666 node.name,
667 schema.name
668 ),
669 }
670 }
671 _ => {}
673 }
674 }
675 Ok(node)
676}
677
678enum SubcommandWord<'a> {
680 Word(&'a str),
682 OtherLiteral,
684 Computed(&'static str),
686}
687
688fn classify_subcommand_positional(expr: &Expr) -> SubcommandWord<'_> {
689 match expr {
690 Expr::Literal(Value::String(s)) => SubcommandWord::Word(s),
691 Expr::Literal(_) => SubcommandWord::OtherLiteral,
692 Expr::CommandSubst(_) | Expr::Command(_) => SubcommandWord::Computed("a command substitution `$(…)`"),
693 Expr::VarRef(_)
694 | Expr::VarWithDefault { .. }
695 | Expr::VarLength(_)
696 | Expr::Positional(_)
697 | Expr::AllArgs
698 | Expr::ArgCount
699 | Expr::CurrentPid
700 | Expr::LastExitCode => SubcommandWord::Computed("a variable reference"),
701 Expr::Interpolated(_) | Expr::HereDocBody { .. } => SubcommandWord::Computed("an interpolated string"),
702 Expr::GlobPattern(_) => SubcommandWord::Computed("a glob pattern"),
703 Expr::Arithmetic(_) => SubcommandWord::Computed("an arithmetic expansion"),
704 _ => SubcommandWord::Computed("a value computed at runtime"),
705 }
706}
707
708pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
709 let mut map = HashMap::new();
710 for p in schema.params.iter().filter(|p| !p.positional) {
711 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
712 for alias in &p.aliases {
713 let stripped = alias.trim_start_matches('-');
714 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
715 }
716 }
717 map
718}
719
720pub fn is_bool_type(param_type: &str) -> bool {
722 matches!(param_type.to_lowercase().as_str(), "bool" | "boolean")
723}
724
725pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
733 let mut tool_args = ToolArgs::new();
734 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
735 let accepts_word_assign = schema
736 .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
737 .unwrap_or(false);
738
739 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
741 let mut past_double_dash = false;
742
743 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
745 for (i, arg) in args.iter().enumerate() {
746 if let Arg::Positional(expr) = arg {
747 positional_indices.push((i, expr));
748 }
749 }
750
751 let mut i = 0;
753 while i < args.len() {
754 let arg = &args[i];
755
756 match arg {
757 Arg::DoubleDash => {
758 past_double_dash = true;
759 }
760 Arg::Positional(expr) => {
761 if !consumed_positionals.contains(&i)
763 && let Some(value) = eval_simple_expr(expr, ctx)
764 {
765 tool_args.positional.push(value);
766 }
767 }
768 Arg::Named { key, value } => {
769 if let Some(val) = eval_simple_expr(value, ctx) {
770 tool_args.named.insert(key.clone(), val);
771 }
772 }
773 Arg::WordAssign { key, value } => {
774 if let Some(val) = eval_simple_expr(value, ctx) {
775 if accepts_word_assign {
776 tool_args.named.insert(key.clone(), val);
777 } else {
778 let val_str = crate::interpreter::value_to_string(&val);
779 tool_args.positional.push(Value::String(format!("{key}={val_str}")));
780 }
781 }
782 }
783 Arg::ShortFlag(name) => {
784 if past_double_dash {
785 tool_args.positional.push(Value::String(format!("-{name}")));
786 } else if name.len() == 1 {
787 let flag_name = name.as_str();
790 let lookup = param_lookup.get(flag_name);
791 let is_bool = lookup
792 .map(|(_, typ, _)| is_bool_type(typ))
793 .unwrap_or(true);
794
795 if is_bool {
796 tool_args.flags.insert(flag_name.to_string());
797 } else {
798 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
800 let next_positional = positional_indices
801 .iter()
802 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
803
804 if let Some((pos_idx, expr)) = next_positional {
805 if let Some(value) = eval_simple_expr(expr, ctx) {
806 tool_args.named.insert(canonical.to_string(), value);
807 consumed_positionals.insert(*pos_idx);
808 } else {
809 tool_args.flags.insert(flag_name.to_string());
810 }
811 } else {
812 tool_args.flags.insert(flag_name.to_string());
813 }
814 }
815 } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
816 if is_bool_type(typ) {
818 tool_args.flags.insert(canonical.to_string());
819 } else {
820 let next_positional = positional_indices
821 .iter()
822 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
823 if let Some((pos_idx, expr)) = next_positional {
824 if let Some(value) = eval_simple_expr(expr, ctx) {
825 tool_args.named.insert(canonical.to_string(), value);
826 consumed_positionals.insert(*pos_idx);
827 } else {
828 tool_args.flags.insert(name.clone());
829 }
830 } else {
831 tool_args.flags.insert(name.clone());
832 }
833 }
834 } else {
835 for c in name.chars() {
837 tool_args.flags.insert(c.to_string());
838 }
839 }
840 }
841 Arg::LongFlag(name) => {
842 if past_double_dash {
843 tool_args.positional.push(Value::String(format!("--{name}")));
844 } else {
845 let lookup = param_lookup.get(name.as_str());
847 let is_bool = lookup
848 .map(|(_, typ, _)| is_bool_type(typ))
849 .unwrap_or(true); if is_bool {
852 tool_args.flags.insert(name.clone());
853 } else {
854 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
860 let next_positional = positional_indices
861 .iter()
862 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
863
864 if let Some((pos_idx, expr)) = next_positional {
865 if let Some(value) = eval_simple_expr(expr, ctx) {
866 tool_args.named.insert(canonical.to_string(), value);
867 consumed_positionals.insert(*pos_idx);
868 } else {
869 tool_args.flags.insert(name.clone());
870 }
871 } else {
872 tool_args.flags.insert(name.clone());
873 }
874 }
875 }
876 }
877 }
878 i += 1;
879 }
880
881 if let Some(schema) = schema.filter(|s| s.map_positionals) {
886 let pre_dash_count = if past_double_dash {
888 let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
890 positional_indices.iter()
892 .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
893 .count()
894 } else {
895 tool_args.positional.len()
896 };
897
898 let mut remaining = Vec::new();
899 let mut positional_iter = tool_args.positional.drain(..).enumerate();
900
901 for param in &schema.params {
902 if tool_args.named.contains_key(¶m.name) || tool_args.flags.contains(¶m.name) {
903 continue; }
905 if is_bool_type(¶m.param_type) {
906 continue; }
908 loop {
910 match positional_iter.next() {
911 Some((idx, val)) if idx < pre_dash_count => {
912 tool_args.named.insert(param.name.clone(), val);
913 break;
914 }
915 Some((_, val)) => {
916 remaining.push(val); }
918 None => break,
919 }
920 }
921 }
922
923 remaining.extend(positional_iter.map(|(_, v)| v));
925 tool_args.positional = remaining;
926 }
927
928 tool_args
929}
930
931pub(crate) fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
933 match expr {
934 Expr::Literal(value) => Some(eval_literal(value, ctx)),
935 Expr::VarRef(path) => ctx.scope.resolve_path(path),
936 Expr::Interpolated(parts) => {
937 let mut result = String::new();
938 for part in parts {
939 match part {
940 crate::ast::StringPart::Literal(s) => result.push_str(s),
941 crate::ast::StringPart::Var(path) => {
942 if let Some(value) = ctx.scope.resolve_path(path) {
943 result.push_str(&value_to_string(&value));
944 }
945 }
946 crate::ast::StringPart::VarWithDefault { name, default } => {
947 match ctx.scope.get(name) {
948 Some(value) => {
949 let s = value_to_string(value);
950 if s.is_empty() {
951 result.push_str(&eval_string_parts_sync(default, ctx));
952 } else {
953 result.push_str(&s);
954 }
955 }
956 None => result.push_str(&eval_string_parts_sync(default, ctx)),
957 }
958 }
959 crate::ast::StringPart::VarLength(name) => {
960 let len = match ctx.scope.get(name) {
961 Some(value) => value_to_string(value).len(),
962 None => 0,
963 };
964 result.push_str(&len.to_string());
965 }
966 crate::ast::StringPart::Positional(n) => {
967 if let Some(s) = ctx.scope.get_positional(*n) {
968 result.push_str(s);
969 }
970 }
971 crate::ast::StringPart::AllArgs => {
972 result.push_str(&ctx.scope.all_args().join(" "));
973 }
974 crate::ast::StringPart::ArgCount => {
975 result.push_str(&ctx.scope.arg_count().to_string());
976 }
977 crate::ast::StringPart::Arithmetic(expr) => {
978 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
980 result.push_str(&value.to_string());
981 }
982 }
983 crate::ast::StringPart::CommandSubst(_) => {
984 }
986 crate::ast::StringPart::LastExitCode => {
987 result.push_str(&ctx.scope.last_result().code.to_string());
988 }
989 crate::ast::StringPart::CurrentPid => {
990 result.push_str(&ctx.scope.pid().to_string());
991 }
992 }
993 }
994 Some(Value::String(result))
995 }
996 Expr::GlobPattern(s) => Some(Value::String(s.clone())),
997 Expr::HereDocBody { parts, strip_tabs } => {
998 let unwrapped: Vec<crate::ast::StringPart> =
1002 parts.iter().map(|sp| sp.part.clone()).collect();
1003 let raw = eval_string_parts_sync(&unwrapped, ctx);
1004 let body = if *strip_tabs {
1005 crate::interpreter::strip_leading_tabs(&raw)
1006 } else {
1007 raw
1008 };
1009 Some(Value::String(body))
1010 }
1011 _ => None, }
1013}
1014
1015fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
1017 value.clone()
1018}
1019
1020fn value_to_string(value: &Value) -> String {
1022 match value {
1023 Value::Null => "".to_string(),
1024 Value::Bool(b) => b.to_string(),
1025 Value::Int(i) => i.to_string(),
1026 Value::Float(f) => f.to_string(),
1027 Value::String(s) => s.clone(),
1028 Value::Json(json) => json.to_string(),
1029 Value::Bytes(b) => format!("[binary: {} bytes]", b.len()),
1030 }
1031}
1032
1033fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
1036 let mut result = String::new();
1037 for part in parts {
1038 match part {
1039 crate::ast::StringPart::Literal(s) => result.push_str(s),
1040 crate::ast::StringPart::Var(path) => {
1041 if let Some(value) = ctx.scope.resolve_path(path) {
1042 result.push_str(&value_to_string(&value));
1043 }
1044 }
1045 crate::ast::StringPart::VarWithDefault { name, default } => {
1046 match ctx.scope.get(name) {
1047 Some(value) => {
1048 let s = value_to_string(value);
1049 if s.is_empty() {
1050 result.push_str(&eval_string_parts_sync(default, ctx));
1051 } else {
1052 result.push_str(&s);
1053 }
1054 }
1055 None => result.push_str(&eval_string_parts_sync(default, ctx)),
1056 }
1057 }
1058 crate::ast::StringPart::VarLength(name) => {
1059 let len = match ctx.scope.get(name) {
1060 Some(value) => value_to_string(value).len(),
1061 None => 0,
1062 };
1063 result.push_str(&len.to_string());
1064 }
1065 crate::ast::StringPart::Positional(n) => {
1066 if let Some(s) = ctx.scope.get_positional(*n) {
1067 result.push_str(s);
1068 }
1069 }
1070 crate::ast::StringPart::AllArgs => {
1071 result.push_str(&ctx.scope.all_args().join(" "));
1072 }
1073 crate::ast::StringPart::ArgCount => {
1074 result.push_str(&ctx.scope.arg_count().to_string());
1075 }
1076 crate::ast::StringPart::Arithmetic(expr) => {
1077 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
1078 result.push_str(&value.to_string());
1079 }
1080 }
1081 crate::ast::StringPart::CommandSubst(_) => {
1082 }
1084 crate::ast::StringPart::LastExitCode => {
1085 result.push_str(&ctx.scope.last_result().code.to_string());
1086 }
1087 crate::ast::StringPart::CurrentPid => {
1088 result.push_str(&ctx.scope.pid().to_string());
1089 }
1090 }
1091 }
1092 result
1093}
1094
1095fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
1100 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
1101 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
1102
1103 if gather_idx > scatter_idx {
1105 Some((scatter_idx, gather_idx))
1106 } else {
1107 None
1108 }
1109}
1110
1111#[cfg(test)]
1112mod select_leaf_tests {
1113 use super::*;
1114 use crate::tools::ParamSchema;
1115
1116 fn kj_schema() -> ToolSchema {
1121 ToolSchema::new("kj", "kaijutsu")
1122 .param(ParamSchema::new("confirm", "string"))
1123 .param(ParamSchema::new("verbose", "bool"))
1124 .subcommand(
1125 ToolSchema::new("context", "context ops")
1126 .with_command_aliases(["ctx"])
1127 .subcommand(ToolSchema::new("list", "list").with_command_aliases(["ls"]))
1128 .subcommand(
1129 ToolSchema::new("create", "create").param(
1130 ParamSchema::new("type", "string").with_aliases(["t"]),
1131 ),
1132 ),
1133 )
1134 }
1135
1136 fn word(s: &str) -> Arg {
1137 Arg::Positional(Expr::Literal(Value::String(s.to_string())))
1138 }
1139
1140 #[test]
1141 fn flat_tool_returns_root() {
1142 let schema = ToolSchema::new("cat", "concat")
1143 .param(ParamSchema::required("path", "string", "f").positional());
1144 let leaf = select_leaf(&schema, &[word("foo.txt")]).expect("flat ok");
1145 assert_eq!(leaf.name, "cat");
1146 }
1147
1148 #[test]
1149 fn single_hop() {
1150 let schema = kj_schema();
1151 let leaf = select_leaf(&schema, &[word("context")]).expect("ok");
1152 assert_eq!(leaf.name, "context");
1153 }
1154
1155 #[test]
1156 fn two_hops() {
1157 let schema = kj_schema();
1158 let leaf = select_leaf(&schema, &[word("context"), word("create")]).expect("ok");
1159 assert_eq!(leaf.name, "create");
1160 assert!(leaf.params.iter().any(|p| p.name == "type"), "leaf has --type");
1161 }
1162
1163 #[test]
1164 fn alias_hops_route() {
1165 let schema = kj_schema();
1166 let leaf = select_leaf(&schema, &[word("ctx"), word("ls")]).expect("ok");
1168 assert_eq!(leaf.name, "list");
1169 }
1170
1171 #[test]
1172 fn unknown_subcommand_stops_at_current_node() {
1173 let schema = kj_schema();
1174 let leaf = select_leaf(&schema, &[word("context"), word("nonesuch")]).expect("ok");
1177 assert_eq!(leaf.name, "context");
1178 }
1179
1180 #[test]
1181 fn root_bool_flag_before_path_does_not_disrupt_routing() {
1182 let schema = kj_schema();
1183 let args = vec![Arg::LongFlag("verbose".into()), word("context"), word("create")];
1186 let leaf = select_leaf(&schema, &args).expect("ok");
1187 assert_eq!(leaf.name, "create");
1188 }
1189
1190 #[test]
1191 fn root_value_flag_space_form_before_path_skips_its_value() {
1192 let schema = kj_schema();
1193 let args = vec![
1196 Arg::LongFlag("confirm".into()),
1197 word("nonce"),
1198 word("context"),
1199 word("create"),
1200 ];
1201 let leaf = select_leaf(&schema, &args).expect("ok");
1202 assert_eq!(leaf.name, "create");
1203 }
1204
1205 #[test]
1206 fn leaf_value_flag_after_path_routes_to_leaf() {
1207 let schema = kj_schema();
1208 let args = vec![
1211 word("context"),
1212 word("create"),
1213 Arg::LongFlag("type".into()),
1214 word("x"),
1215 ];
1216 let leaf = select_leaf(&schema, &args).expect("ok");
1217 assert_eq!(leaf.name, "create");
1218 assert!(leaf.params.iter().any(|p| p.name == "type"));
1219 }
1220
1221 #[test]
1222 fn double_dash_stops_routing() {
1223 let schema = kj_schema();
1224 let leaf = select_leaf(&schema, &[Arg::DoubleDash, word("context")]).expect("ok");
1226 assert_eq!(leaf.name, "kj");
1227 }
1228
1229 #[test]
1230 fn computed_subcommand_selector_errors() {
1231 let schema = kj_schema();
1232 let args = vec![Arg::Positional(Expr::CommandSubst(vec![
1235 crate::ast::Stmt::Command(crate::ast::Command {
1236 name: "echo".into(),
1237 args: vec![],
1238 redirects: vec![],
1239 }),
1240 ]))];
1241 let err = select_leaf(&schema, &args).expect_err("must error");
1242 let msg = err.to_string();
1243 assert!(msg.contains("subcommand name is required"), "got: {msg}");
1244 assert!(msg.contains("command substitution"), "names the cause: {msg}");
1245 }
1246
1247 #[test]
1248 fn variable_subcommand_selector_errors() {
1249 let schema = kj_schema();
1250 let args = vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("sub")))];
1251 let err = select_leaf(&schema, &args).expect_err("must error");
1252 assert!(err.to_string().contains("variable reference"), "got: {err}");
1253 }
1254
1255 #[test]
1256 fn computed_positional_after_leaf_is_fine() {
1257 let schema = kj_schema();
1258 let args = vec![
1261 word("context"),
1262 word("list"),
1263 Arg::Positional(Expr::CommandSubst(vec![crate::ast::Stmt::Command(
1264 crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
1265 )])),
1266 ];
1267 let leaf = select_leaf(&schema, &args).expect("ok");
1268 assert_eq!(leaf.name, "list");
1269 }
1270}
1271
1272#[cfg(test)]
1273mod tests {
1274 use super::*;
1275 use crate::dispatch::BackendDispatcher;
1276 use crate::tools::register_builtins;
1277 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
1278 use std::path::Path;
1279
1280 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
1281 let mut tools = ToolRegistry::new();
1282 register_builtins(&mut tools);
1283 let tools = Arc::new(tools);
1284 let runner = PipelineRunner::new(tools.clone());
1285 let dispatcher = BackendDispatcher::new(tools.clone());
1286
1287 let mut vfs = VfsRouter::new();
1288 let mem = MemoryFs::new();
1289 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
1290 vfs.mount("/", mem);
1291 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
1292
1293 (runner, ctx, dispatcher)
1294 }
1295
1296 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
1297 Command {
1298 name: name.to_string(),
1299 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
1300 redirects: vec![],
1301 }
1302 }
1303
1304 #[tokio::test]
1305 async fn test_single_command() {
1306 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1307 let cmd = make_cmd("echo", vec!["hello"]);
1308
1309 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1310 assert!(result.ok());
1311 assert_eq!(result.text_out().trim(), "hello");
1312 }
1313
1314 #[tokio::test]
1315 async fn test_pipeline_echo_grep() {
1316 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1317
1318 let echo_cmd = Command {
1320 name: "echo".to_string(),
1321 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
1322 redirects: vec![],
1323 };
1324 let grep_cmd = Command {
1325 name: "grep".to_string(),
1326 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
1327 redirects: vec![],
1328 };
1329
1330 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1331 assert!(result.ok());
1332 assert_eq!(result.text_out().trim(), "world");
1333 }
1334
1335 #[tokio::test]
1336 async fn test_pipeline_cat_grep() {
1337 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1338
1339 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
1341 let grep_cmd = Command {
1342 name: "grep".to_string(),
1343 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1344 redirects: vec![],
1345 };
1346
1347 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1348 assert!(result.ok());
1349 assert!(result.text_out().contains("hello"));
1350 }
1351
1352 #[tokio::test]
1353 async fn test_command_not_found() {
1354 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1355 let cmd = make_cmd("nonexistent", vec![]);
1356
1357 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1358 assert!(!result.ok());
1359 assert_eq!(result.code, 127);
1360 assert!(result.err.contains("not found"));
1361 }
1362
1363 #[tokio::test]
1364 async fn test_pipeline_continues_on_failure() {
1365 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1368
1369 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1372 let grep_cmd = Command {
1373 name: "grep".to_string(),
1374 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1375 redirects: vec![],
1376 };
1377
1378 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1379 assert!(!result.ok());
1381 }
1382
1383 #[tokio::test]
1384 async fn test_pipeline_last_command_exit_code() {
1385 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1387
1388 let echo_cmd = make_cmd("echo", vec!["hello"]);
1389 let cat_cmd = make_cmd("cat", vec![]);
1390
1391 let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1392 assert!(result.ok());
1393 assert!(result.text_out().contains("hello"));
1394 }
1395
1396 #[tokio::test]
1397 async fn test_empty_pipeline() {
1398 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1399 let result = runner.run(&[], &mut ctx, &dispatcher).await;
1400 assert!(result.ok());
1401 }
1402
1403 #[test]
1406 fn test_find_scatter_gather_both_present() {
1407 let commands = vec![
1408 make_cmd("echo", vec!["a"]),
1409 make_cmd("scatter", vec![]),
1410 make_cmd("process", vec![]),
1411 make_cmd("gather", vec![]),
1412 ];
1413 let result = find_scatter_gather(&commands);
1414 assert_eq!(result, Some((1, 3)));
1415 }
1416
1417 #[test]
1418 fn test_find_scatter_gather_no_scatter() {
1419 let commands = vec![
1420 make_cmd("echo", vec!["a"]),
1421 make_cmd("gather", vec![]),
1422 ];
1423 let result = find_scatter_gather(&commands);
1424 assert!(result.is_none());
1425 }
1426
1427 #[test]
1428 fn test_find_scatter_gather_no_gather() {
1429 let commands = vec![
1430 make_cmd("echo", vec!["a"]),
1431 make_cmd("scatter", vec![]),
1432 ];
1433 let result = find_scatter_gather(&commands);
1434 assert!(result.is_none());
1435 }
1436
1437 #[test]
1438 fn test_find_scatter_gather_wrong_order() {
1439 let commands = vec![
1440 make_cmd("gather", vec![]),
1441 make_cmd("scatter", vec![]),
1442 ];
1443 let result = find_scatter_gather(&commands);
1444 assert!(result.is_none());
1445 }
1446
1447 #[tokio::test]
1448 async fn test_scatter_gather_simple() {
1449 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1450
1451 let split_cmd = Command {
1453 name: "split".to_string(),
1454 args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1455 redirects: vec![],
1456 };
1457 let scatter_cmd = make_cmd("scatter", vec![]);
1458 let process_cmd = Command {
1459 name: "echo".to_string(),
1460 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1461 redirects: vec![],
1462 };
1463 let gather_cmd = make_cmd("gather", vec![]);
1464
1465 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1466 assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1467 assert!(result.text_out().contains("a"));
1469 assert!(result.text_out().contains("b"));
1470 assert!(result.text_out().contains("c"));
1471 }
1472
1473 #[tokio::test]
1474 async fn test_scatter_gather_empty_input() {
1475 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1476
1477 let echo_cmd = Command {
1479 name: "echo".to_string(),
1480 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1481 redirects: vec![],
1482 };
1483 let scatter_cmd = make_cmd("scatter", vec![]);
1484 let process_cmd = Command {
1485 name: "echo".to_string(),
1486 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1487 redirects: vec![],
1488 };
1489 let gather_cmd = make_cmd("gather", vec![]);
1490
1491 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1492 assert!(result.ok());
1493 assert!(result.text_out().trim().is_empty());
1494 }
1495
1496 #[tokio::test]
1497 async fn test_scatter_gather_with_structured_stdin() {
1498 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1499
1500 let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1502 ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1503
1504 let scatter_cmd = make_cmd("scatter", vec![]);
1505 let process_cmd = Command {
1506 name: "echo".to_string(),
1507 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1508 redirects: vec![],
1509 };
1510 let gather_cmd = make_cmd("gather", vec![]);
1511
1512 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1513 assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1514 assert!(result.text_out().contains("x"));
1515 assert!(result.text_out().contains("y"));
1516 assert!(result.text_out().contains("z"));
1517 }
1518
1519 #[tokio::test]
1520 async fn test_scatter_gather_json_input() {
1521 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1522
1523 let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1525 ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1526
1527 let scatter_cmd = make_cmd("scatter", vec![]);
1528 let process_cmd = Command {
1529 name: "echo".to_string(),
1530 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1531 redirects: vec![],
1532 };
1533 let gather_cmd = make_cmd("gather", vec![]);
1534
1535 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1536 assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1537 assert!(result.text_out().contains("one"));
1538 assert!(result.text_out().contains("two"));
1539 assert!(result.text_out().contains("three"));
1540 }
1541
1542 #[tokio::test]
1543 async fn test_scatter_gather_with_post_gather() {
1544 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1545
1546 let split_cmd = Command {
1548 name: "split".to_string(),
1549 args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1550 redirects: vec![],
1551 };
1552 let scatter_cmd = make_cmd("scatter", vec![]);
1553 let process_cmd = Command {
1554 name: "echo".to_string(),
1555 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1556 redirects: vec![],
1557 };
1558 let gather_cmd = make_cmd("gather", vec![]);
1559 let grep_cmd = Command {
1560 name: "grep".to_string(),
1561 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1562 redirects: vec![],
1563 };
1564
1565 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1566 assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1567 assert!(result.text_out().contains("a"));
1568 assert!(!result.text_out().contains("b"));
1569 }
1570
1571 #[tokio::test]
1572 async fn test_scatter_custom_var_name() {
1573 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1574
1575 let data = Value::Json(serde_json::json!(["test1", "test2"]));
1577 ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1578
1579 let scatter_cmd = Command {
1581 name: "scatter".to_string(),
1582 args: vec![Arg::Named {
1583 key: "as".to_string(),
1584 value: Expr::Literal(Value::String("URL".to_string())),
1585 }],
1586 redirects: vec![],
1587 };
1588 let process_cmd = Command {
1589 name: "echo".to_string(),
1590 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1591 redirects: vec![],
1592 };
1593 let gather_cmd = make_cmd("gather", vec![]);
1594
1595 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1596 assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1597 assert!(result.text_out().contains("test1"));
1598 assert!(result.text_out().contains("test2"));
1599 }
1600
1601 #[tokio::test]
1604 async fn test_pipeline_routes_through_backend() {
1605 use crate::backend::testing::MockBackend;
1606 use std::sync::atomic::Ordering;
1607
1608 let (backend, call_count) = MockBackend::new();
1610 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1611
1612 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1614
1615 let tools = std::sync::Arc::new(ToolRegistry::new());
1617 let runner = PipelineRunner::new(tools.clone());
1618 let dispatcher = BackendDispatcher::new(tools);
1619
1620 let cmd = make_cmd("test-tool", vec!["arg1"]);
1622 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1623
1624 assert!(result.ok(), "Mock backend should return success");
1625 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1626 assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1627 }
1628
1629 #[tokio::test]
1630 async fn test_multi_command_pipeline_routes_through_backend() {
1631 use crate::backend::testing::MockBackend;
1632 use std::sync::atomic::Ordering;
1633
1634 let (backend, call_count) = MockBackend::new();
1635 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1636 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1637
1638 let tools = std::sync::Arc::new(ToolRegistry::new());
1639 let runner = PipelineRunner::new(tools.clone());
1640 let dispatcher = BackendDispatcher::new(tools);
1641
1642 let cmd1 = make_cmd("tool1", vec![]);
1644 let cmd2 = make_cmd("tool2", vec![]);
1645 let cmd3 = make_cmd("tool3", vec![]);
1646
1647 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1648
1649 assert!(result.ok());
1650 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1651 }
1652
1653 use crate::tools::{ParamSchema, ToolSchema};
1656
1657 fn make_test_schema() -> ToolSchema {
1658 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1659 .param(ParamSchema::required("query", "string", "Search query"))
1660 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1661 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1662 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1663 .with_positional_mapping()
1664 }
1665
1666 fn make_minimal_ctx() -> ExecContext {
1667 let mut vfs = VfsRouter::new();
1668 vfs.mount("/", MemoryFs::new());
1669 ExecContext::new(Arc::new(vfs))
1670 }
1671
1672 #[test]
1673 fn test_schema_aware_string_arg() {
1674 let args = vec![
1676 Arg::LongFlag("query".to_string()),
1677 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1678 ];
1679 let schema = make_test_schema();
1680 let ctx = make_minimal_ctx();
1681
1682 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1683
1684 assert!(tool_args.flags.is_empty(), "No flags should be set");
1685 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1686 assert_eq!(
1687 tool_args.named.get("query"),
1688 Some(&Value::String("test".to_string())),
1689 "--query should consume 'test' as its value"
1690 );
1691 }
1692
1693 #[test]
1694 fn test_schema_aware_bool_flag() {
1695 let args = vec![
1697 Arg::LongFlag("verbose".to_string()),
1698 ];
1699 let schema = make_test_schema();
1700 let ctx = make_minimal_ctx();
1701
1702 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1703
1704 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1705 assert!(tool_args.named.is_empty(), "No named args");
1706 assert!(tool_args.positional.is_empty(), "No positionals");
1707 }
1708
1709 #[test]
1710 fn test_schema_aware_mixed() {
1711 let args = vec![
1714 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1715 Arg::LongFlag("output".to_string()),
1716 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1717 Arg::LongFlag("verbose".to_string()),
1718 ];
1719 let schema = make_test_schema();
1720 let ctx = make_minimal_ctx();
1721
1722 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1723
1724 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1725 assert_eq!(
1726 tool_args.named.get("query"),
1727 Some(&Value::String("file.txt".to_string()))
1728 );
1729 assert_eq!(
1730 tool_args.named.get("output"),
1731 Some(&Value::String("out.txt".to_string()))
1732 );
1733 assert!(tool_args.flags.contains("verbose"));
1734 }
1735
1736 #[test]
1737 fn test_schema_aware_multiple_string_args() {
1738 let args = vec![
1740 Arg::LongFlag("query".to_string()),
1741 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1742 Arg::LongFlag("output".to_string()),
1743 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1744 Arg::LongFlag("verbose".to_string()),
1745 Arg::LongFlag("limit".to_string()),
1746 Arg::Positional(Expr::Literal(Value::Int(5))),
1747 ];
1748 let schema = make_test_schema();
1749 let ctx = make_minimal_ctx();
1750
1751 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1752
1753 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1754 assert_eq!(
1755 tool_args.named.get("query"),
1756 Some(&Value::String("test".to_string()))
1757 );
1758 assert_eq!(
1759 tool_args.named.get("output"),
1760 Some(&Value::String("result.json".to_string()))
1761 );
1762 assert_eq!(
1763 tool_args.named.get("limit"),
1764 Some(&Value::Int(5))
1765 );
1766 assert!(tool_args.flags.contains("verbose"));
1767 }
1768
1769 #[test]
1770 fn test_schema_aware_double_dash() {
1771 let args = vec![
1774 Arg::LongFlag("output".to_string()),
1775 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1776 Arg::DoubleDash,
1777 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1778 ];
1779 let schema = make_test_schema();
1780 let ctx = make_minimal_ctx();
1781
1782 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1783
1784 assert_eq!(
1785 tool_args.named.get("output"),
1786 Some(&Value::String("out.txt".to_string()))
1787 );
1788 assert_eq!(
1790 tool_args.positional,
1791 vec![Value::String("--this-is-data".to_string())]
1792 );
1793 }
1794
1795 #[test]
1796 fn test_no_schema_fallback() {
1797 let args = vec![
1799 Arg::LongFlag("query".to_string()),
1800 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1801 ];
1802 let ctx = make_minimal_ctx();
1803
1804 let tool_args = build_tool_args(&args, &ctx, None);
1805
1806 assert!(tool_args.flags.contains("query"), "--query should be a flag");
1808 assert_eq!(
1809 tool_args.positional,
1810 vec![Value::String("test".to_string())],
1811 "'test' should be a positional"
1812 );
1813 }
1814
1815 #[test]
1816 fn test_unknown_flag_in_schema() {
1817 let args = vec![
1819 Arg::LongFlag("unknown".to_string()),
1820 Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1821 ];
1822 let schema = make_test_schema();
1823 let ctx = make_minimal_ctx();
1824
1825 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1826
1827 assert!(tool_args.flags.contains("unknown"));
1828 assert!(tool_args.positional.is_empty(), "value consumed as query param");
1829 assert_eq!(
1830 tool_args.named.get("query"),
1831 Some(&Value::String("value".to_string()))
1832 );
1833 }
1834
1835 #[test]
1836 fn test_named_args_unchanged() {
1837 let args = vec![
1839 Arg::Named {
1840 key: "query".to_string(),
1841 value: Expr::Literal(Value::String("test".to_string())),
1842 },
1843 Arg::LongFlag("verbose".to_string()),
1844 ];
1845 let schema = make_test_schema();
1846 let ctx = make_minimal_ctx();
1847
1848 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1849
1850 assert_eq!(
1851 tool_args.named.get("query"),
1852 Some(&Value::String("test".to_string()))
1853 );
1854 assert!(tool_args.flags.contains("verbose"));
1855 }
1856
1857 #[test]
1858 fn test_short_flags_unchanged() {
1859 let args = vec![
1861 Arg::ShortFlag("la".to_string()),
1862 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1863 ];
1864 let schema = make_test_schema();
1865 let ctx = make_minimal_ctx();
1866
1867 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1868
1869 assert!(tool_args.flags.contains("l"));
1870 assert!(tool_args.flags.contains("a"));
1871 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1872 assert_eq!(
1873 tool_args.named.get("query"),
1874 Some(&Value::String("file.txt".to_string()))
1875 );
1876 }
1877
1878 #[test]
1879 fn test_flag_at_end_no_value() {
1880 let args = vec![
1883 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1884 Arg::LongFlag("output".to_string()),
1885 ];
1886 let schema = make_test_schema();
1887 let ctx = make_minimal_ctx();
1888
1889 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1890
1891 assert!(tool_args.flags.contains("output"));
1893 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1894 assert_eq!(
1895 tool_args.named.get("query"),
1896 Some(&Value::String("file.txt".to_string()))
1897 );
1898 }
1899
1900 #[test]
1901 fn test_positional_skips_bool_params() {
1902 let schema = ToolSchema::new("test", "")
1906 .param(ParamSchema::required("query", "string", ""))
1907 .param(ParamSchema::optional(
1908 "verbose",
1909 "bool",
1910 Value::Bool(false),
1911 "",
1912 ))
1913 .param(ParamSchema::optional(
1914 "output",
1915 "string",
1916 Value::Null,
1917 "",
1918 ))
1919 .with_positional_mapping();
1920 let args = vec![
1921 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1922 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1923 ];
1924 let ctx = make_minimal_ctx();
1925
1926 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1927
1928 assert_eq!(
1929 tool_args.named.get("query"),
1930 Some(&Value::String("val1".to_string()))
1931 );
1932 assert_eq!(
1933 tool_args.named.get("output"),
1934 Some(&Value::String("val2".to_string()))
1935 );
1936 assert!(!tool_args.flags.contains("verbose"));
1937 assert!(tool_args.positional.is_empty());
1938 }
1939
1940 #[test]
1941 fn test_positionals_fill_available_slots() {
1942 let args = vec![
1945 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1946 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1947 Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1948 ];
1949 let schema = make_test_schema(); let ctx = make_minimal_ctx();
1951
1952 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1953
1954 assert_eq!(
1957 tool_args.named.get("query"),
1958 Some(&Value::String("val1".to_string()))
1959 );
1960 assert_eq!(
1961 tool_args.named.get("limit"),
1962 Some(&Value::String("val2".to_string()))
1963 );
1964 assert_eq!(
1965 tool_args.named.get("output"),
1966 Some(&Value::String("val3".to_string()))
1967 );
1968 assert!(tool_args.positional.is_empty());
1969 }
1970
1971 #[test]
1972 fn test_truly_excess_positionals() {
1973 let schema = ToolSchema::new("test", "")
1975 .param(ParamSchema::required("name", "string", ""))
1976 .with_positional_mapping();
1977 let args = vec![
1978 Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1979 Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1980 Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1981 ];
1982 let ctx = make_minimal_ctx();
1983
1984 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1985
1986 assert_eq!(
1987 tool_args.named.get("name"),
1988 Some(&Value::String("first".to_string()))
1989 );
1990 assert_eq!(
1991 tool_args.positional,
1992 vec![
1993 Value::String("second".to_string()),
1994 Value::String("third".to_string()),
1995 ]
1996 );
1997 }
1998
1999 #[test]
2000 fn test_double_dash_positional_not_mapped() {
2001 let args = vec![
2003 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
2004 Arg::DoubleDash,
2005 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
2006 ];
2007 let schema = make_test_schema();
2008 let ctx = make_minimal_ctx();
2009
2010 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2011
2012 assert_eq!(
2013 tool_args.named.get("query"),
2014 Some(&Value::String("val1".to_string()))
2015 );
2016 assert_eq!(
2018 tool_args.positional,
2019 vec![Value::String("val2".to_string())]
2020 );
2021 }
2022
2023 #[test]
2024 fn test_all_params_filled_by_flags() {
2025 let args = vec![
2027 Arg::LongFlag("query".to_string()),
2028 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2029 Arg::LongFlag("output".to_string()),
2030 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2031 Arg::LongFlag("verbose".to_string()),
2032 ];
2033 let schema = make_test_schema();
2034 let ctx = make_minimal_ctx();
2035
2036 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2037
2038 assert_eq!(
2039 tool_args.named.get("query"),
2040 Some(&Value::String("search".to_string()))
2041 );
2042 assert_eq!(
2043 tool_args.named.get("output"),
2044 Some(&Value::String("out.txt".to_string()))
2045 );
2046 assert!(tool_args.flags.contains("verbose"));
2047 assert!(tool_args.positional.is_empty());
2048 }
2049
2050 #[test]
2051 fn test_mixed_flags_and_positional_fill() {
2052 let args = vec![
2054 Arg::LongFlag("output".to_string()),
2055 Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
2056 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
2057 ];
2058 let schema = make_test_schema();
2059 let ctx = make_minimal_ctx();
2060
2061 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2062
2063 assert_eq!(
2064 tool_args.named.get("output"),
2065 Some(&Value::String("foo".to_string()))
2066 );
2067 assert_eq!(
2068 tool_args.named.get("query"),
2069 Some(&Value::String("val1".to_string()))
2070 );
2071 assert!(tool_args.positional.is_empty());
2072 }
2073
2074 #[test]
2075 fn test_alias_flag_prevents_mapping_overwrite() {
2076 let schema = ToolSchema::new("test", "")
2078 .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
2079 .param(ParamSchema::required("output", "string", ""))
2080 .with_positional_mapping();
2081 let args = vec![
2082 Arg::ShortFlag("q".to_string()),
2083 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2084 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2085 ];
2086 let ctx = make_minimal_ctx();
2087
2088 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2089
2090 assert_eq!(
2091 tool_args.named.get("query"),
2092 Some(&Value::String("search".to_string()))
2093 );
2094 assert_eq!(
2095 tool_args.named.get("output"),
2096 Some(&Value::String("out.txt".to_string()))
2097 );
2098 assert!(tool_args.positional.is_empty());
2099 }
2100
2101 #[test]
2102 fn test_builtin_schema_no_positional_mapping() {
2103 let schema = ToolSchema::new("echo", "")
2105 .param(ParamSchema::optional("args", "any", Value::Null, ""))
2106 .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
2107 let args = vec![
2109 Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
2110 Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
2111 ];
2112 let ctx = make_minimal_ctx();
2113
2114 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2115
2116 assert_eq!(
2118 tool_args.positional,
2119 vec![
2120 Value::String("hello".to_string()),
2121 Value::String("world".to_string()),
2122 ]
2123 );
2124 assert!(!tool_args.named.contains_key("args"));
2125 }
2126
2127 #[test]
2128 fn test_short_flag_with_alias_consumes_value() {
2129 let schema = ToolSchema::new("head", "Output first part of files")
2132 .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
2133 .with_aliases(["-n"]));
2134 let args = vec![
2135 Arg::ShortFlag("n".to_string()),
2136 Arg::Positional(Expr::Literal(Value::Int(5))),
2137 Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
2138 ];
2139 let ctx = make_minimal_ctx();
2140
2141 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2142
2143 assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
2144 assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
2145 assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
2146 }
2147
2148 #[tokio::test]
2151 async fn test_merge_stderr_redirect() {
2152 let result = ExecResult::from_output(0, "stdout content", "stderr content");
2154
2155 let redirects = vec![Redirect {
2156 kind: RedirectKind::MergeStderr,
2157 target: Expr::Literal(Value::Null),
2158 }];
2159
2160 let ctx = make_minimal_ctx();
2161 let result = apply_redirects(result, &redirects, &ctx).await;
2162
2163 assert_eq!(&*result.text_out(), "stdout contentstderr content");
2164 assert!(result.err.is_empty());
2165 }
2166
2167 #[tokio::test]
2168 async fn test_merge_stderr_with_empty_stderr() {
2169 let result = ExecResult::from_output(0, "stdout only", "");
2171
2172 let redirects = vec![Redirect {
2173 kind: RedirectKind::MergeStderr,
2174 target: Expr::Literal(Value::Null),
2175 }];
2176
2177 let ctx = make_minimal_ctx();
2178 let result = apply_redirects(result, &redirects, &ctx).await;
2179
2180 assert_eq!(&*result.text_out(), "stdout only");
2181 assert!(result.err.is_empty());
2182 }
2183
2184 #[tokio::test]
2185 async fn test_merge_stderr_order_matters() {
2186 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
2191
2192 let redirects = vec![Redirect {
2194 kind: RedirectKind::MergeStderr,
2195 target: Expr::Literal(Value::Null),
2196 }];
2197
2198 let ctx = make_minimal_ctx();
2199 let result = apply_redirects(result, &redirects, &ctx).await;
2200
2201 assert_eq!(&*result.text_out(), "stdout\nstderr\n");
2202 assert!(result.err.is_empty());
2203 }
2204
2205 #[tokio::test]
2206 async fn test_redirect_with_command_execution() {
2207 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2208
2209 let cmd = Command {
2211 name: "echo".to_string(),
2212 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
2213 redirects: vec![Redirect {
2214 kind: RedirectKind::MergeStderr,
2215 target: Expr::Literal(Value::Null),
2216 }],
2217 };
2218
2219 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
2220 assert!(result.ok());
2221 assert!(result.text_out().contains("hello"));
2223 }
2224
2225 #[tokio::test]
2226 async fn test_merge_stderr_in_pipeline() {
2227 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2228
2229 let echo_cmd = Command {
2232 name: "echo".to_string(),
2233 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2234 redirects: vec![Redirect {
2235 kind: RedirectKind::MergeStderr,
2236 target: Expr::Literal(Value::Null),
2237 }],
2238 };
2239 let grep_cmd = Command {
2240 name: "grep".to_string(),
2241 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2242 redirects: vec![],
2243 };
2244
2245 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
2246 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
2247 assert!(result.text_out().contains("output"));
2248 }
2249}