1use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21 parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24async fn apply_redirects(
30 mut result: ExecResult,
31 redirects: &[Redirect],
32 ctx: &ExecContext,
33) -> ExecResult {
34 for redir in redirects {
39 match redir.kind {
40 RedirectKind::MergeStderr => {
41 result.materialize();
44 if !result.err.is_empty() {
45 let err = std::mem::take(&mut result.err);
46 result.push_out(&err);
47 }
48 }
49 RedirectKind::MergeStdout => {
50 result.materialize();
52 if !result.text_out().is_empty() {
53 let out = result.text_out().into_owned();
54 result.err.push_str(&out);
55 result.clear_out();
56 }
57 }
58 RedirectKind::StdoutOverwrite => {
59 let path = match eval_redirect_target(&redir.target, ctx).await {
60 Ok(p) => p,
61 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
62 };
63 if let Some(output) = result.take_output_for_stream() {
65 let mut buf = Vec::new();
66 if let Err(e) = output.write_canonical(&mut buf, None) {
67 return ExecResult::failure(1, format!("redirect: {e}"));
68 }
69 if let Err(e) = redirect_write(ctx, &path, &buf).await {
70 return ExecResult::failure(1, format!("redirect: {e}"));
71 }
72 } else {
73 if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
74 return ExecResult::failure(1, format!("redirect: {e}"));
75 }
76 }
77 result.clear_out();
78 result.set_output(None);
79 }
80 RedirectKind::StdoutAppend => {
81 let path = match eval_redirect_target(&redir.target, ctx).await {
82 Ok(p) => p,
83 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
84 };
85 if let Some(output) = result.take_output_for_stream() {
87 let mut buf = Vec::new();
88 if let Err(e) = output.write_canonical(&mut buf, None) {
89 return ExecResult::failure(1, format!("redirect: {e}"));
90 }
91 if let Err(e) = redirect_append(ctx, &path, &buf).await {
92 return ExecResult::failure(1, format!("redirect: {e}"));
93 }
94 } else {
95 if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
96 return ExecResult::failure(1, format!("redirect: {e}"));
97 }
98 }
99 result.clear_out();
100 result.set_output(None);
101 }
102 RedirectKind::Stderr => {
103 let path = match eval_redirect_target(&redir.target, ctx).await {
104 Ok(p) => p,
105 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
106 };
107 if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
108 return ExecResult::failure(1, format!("redirect: {e}"));
109 }
110 result.err.clear();
111 }
112 RedirectKind::Both => {
113 let path = match eval_redirect_target(&redir.target, ctx).await {
114 Ok(p) => p,
115 Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
116 };
117 let combined = format!("{}{}", result.text_out(), result.err);
118 if let Err(e) = redirect_write(ctx, &path, combined.as_bytes()).await {
119 return ExecResult::failure(1, format!("redirect: {e}"));
120 }
121 result.clear_out();
122 result.set_output(None);
123 result.err.clear();
124 }
125 RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
127 }
128 }
129 result.materialize();
134 result
135}
136
137async fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Result<String, String> {
144 if let Some(dispatcher) = &ctx.dispatcher {
145 dispatcher
146 .eval_expr(expr, ctx)
147 .await
148 .map(|v| value_to_string(&v))
149 .map_err(|e| e.to_string())
150 } else {
151 eval_simple_expr(expr, ctx)
152 .map(|v| value_to_string(&v))
153 .ok_or_else(|| "could not evaluate redirect target".to_string())
154 }
155}
156
157async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
164 use crate::backend::WriteMode;
165 let resolved = ctx.resolve_path(path);
166 ctx.backend.write(&resolved, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
167}
168
169async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
173 let resolved = ctx.resolve_path(path);
174 ctx.backend.append(&resolved, data).await.map_err(|e| e.to_string())
175}
176
177async fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) -> Result<(), String> {
185 use std::path::Path;
186 for redir in &cmd.redirects {
187 match &redir.kind {
188 RedirectKind::Stdin => {
189 let path = eval_redirect_target(&redir.target, ctx).await?;
190 let resolved = ctx.resolve_path(&path);
191 let data = ctx
192 .backend
193 .read(Path::new(&resolved), None)
194 .await
195 .map_err(|e| format!("redirect: {path}: {e}"))?;
196 let content = String::from_utf8(data)
197 .map_err(|_| format!("redirect: {path}: invalid UTF-8"))?;
198 ctx.set_stdin(content);
199 }
200 RedirectKind::HereDoc => {
201 match &redir.target {
202 Expr::Literal(Value::String(content)) => {
203 ctx.set_stdin(content.clone());
204 }
205 expr => {
208 let body = eval_redirect_target(expr, ctx).await?;
209 ctx.set_stdin(body);
210 }
211 }
212 }
213 RedirectKind::HereString => {
214 let mut s = eval_redirect_target(&redir.target, ctx).await?;
217 s.push('\n');
218 ctx.set_stdin(s);
219 }
220 _ => {}
221 }
222 }
223 Ok(())
224}
225
/// Executes single commands, multi-stage pipelines and scatter/gather
/// pipelines, dispatching each command through a `CommandDispatcher`.
#[derive(Clone)]
pub struct PipelineRunner {
    /// Shared tool registry; used to look up the `scatter`/`gather` schemas and
    /// handed on to the scatter/gather runner.
    tools: Arc<ToolRegistry>,
}
231
232impl PipelineRunner {
233 pub fn new(tools: Arc<ToolRegistry>) -> Self {
235 Self { tools }
236 }
237
238 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
248 pub async fn run(
249 &self,
250 commands: &[Command],
251 ctx: &mut ExecContext,
252 dispatcher: &dyn CommandDispatcher,
253 ) -> ExecResult {
254 if commands.is_empty() {
255 return ExecResult::success("");
256 }
257
258 if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
260 return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
261 }
262
263 self.run_sequential(commands, ctx, dispatcher).await
264 }
265
266 #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
271 pub async fn run_sequential(
272 &self,
273 commands: &[Command],
274 ctx: &mut ExecContext,
275 dispatcher: &dyn CommandDispatcher,
276 ) -> ExecResult {
277 if commands.is_empty() {
278 return ExecResult::success("");
279 }
280
281 if commands.len() == 1 {
282 return self.run_single(&commands[0], ctx, None, dispatcher).await;
284 }
285
286 self.run_pipeline(commands, ctx, dispatcher).await
288 }
289
290 async fn run_scatter_gather(
292 &self,
293 commands: &[Command],
294 scatter_idx: usize,
295 gather_idx: usize,
296 ctx: &mut ExecContext,
297 dispatcher: &dyn CommandDispatcher,
298 ) -> ExecResult {
299 let pre_scatter = &commands[..scatter_idx];
301 let scatter_cmd = &commands[scatter_idx];
302 let parallel = &commands[scatter_idx + 1..gather_idx];
303 let gather_cmd = &commands[gather_idx];
304 let post_gather = &commands[gather_idx + 1..];
305
306 let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
309 let gather_schema = self.tools.get("gather").map(|t| t.schema());
310 let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
311 let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
312
313 let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
318
319 let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
320 runner
321 .run(
322 pre_scatter,
323 scatter_opts,
324 parallel,
325 gather_opts,
326 post_gather,
327 ctx,
328 )
329 .await
330 }
331
332 #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
337 async fn run_single(
338 &self,
339 cmd: &Command,
340 ctx: &mut ExecContext,
341 stdin: Option<String>,
342 dispatcher: &dyn CommandDispatcher,
343 ) -> ExecResult {
344 if let Err(e) = setup_stdin_redirects(cmd, ctx).await {
346 return ExecResult::failure(1, e);
347 }
348
349 if let Some(input) = stdin {
351 ctx.set_stdin(input);
352 }
353
354 ctx.pipeline_position = PipelinePosition::Only;
356
357 let result = match dispatcher.dispatch(cmd, ctx).await {
359 Ok(result) => result,
360 Err(e) => ExecResult::failure(1, e.to_string()),
361 };
362
363 apply_redirects(result, &cmd.redirects, ctx).await
365 }
366
    /// Runs two or more commands as a pipeline, one spawned task per stage.
    ///
    /// Adjacent stages are connected by a byte pipe (text output) and by a
    /// oneshot channel carrying the upstream stage's optional structured data.
    /// Every stage runs on its own child context and forked dispatcher.
    ///
    /// The returned result is the last stage's result; the last stage's scope,
    /// cwd, previous cwd and aliases are copied back into `ctx`. If any stage
    /// task fails to join, the result is replaced by a failure (exit code 1)
    /// listing the affected stages.
    ///
    /// `commands` must be non-empty: `stage_count - 1` underflows otherwise.
    /// The only caller visible in this file passes at least two commands.
    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
    async fn run_pipeline(
        &self,
        commands: &[Command],
        ctx: &mut ExecContext,
        dispatcher: &dyn CommandDispatcher,
    ) -> ExecResult {
        let stage_count = commands.len();
        let last_idx = stage_count - 1;

        // One pipe per adjacent pair of stages: pipe `k` links stage `k` to
        // stage `k + 1`. Slots are `Option` so each end can be moved out once.
        let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
        let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();

        for _ in 0..last_idx {
            let (writer, reader) = pipe_stream_default();
            pipe_writers.push(Some(writer));
            pipe_readers.push(Some(reader));
        }

        // Parallel set of oneshot channels, indexed the same way, carrying the
        // upstream stage's structured `data` value.
        let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
        let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();

        for _ in 0..last_idx {
            let (tx, rx) = tokio::sync::oneshot::channel();
            data_senders.push(Some(tx));
            data_receivers.push(Some(rx));
        }

        let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);

        for (i, cmd) in commands.iter().enumerate() {
            let mut stage_ctx = ctx.child_for_pipeline();
            // Owned copy so the command can move into the spawned task.
            let cmd = cmd.clone();

            let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;

            // Stdin redirects are resolved here, before spawning; a failure is
            // carried into the task and reported as that stage's result.
            let stdin_setup = setup_stdin_redirects(&cmd, &mut stage_ctx).await;

            if i == 0 {
                // The first stage takes over the parent's stdin / structured
                // stdin, unless its own redirects already provided one.
                if stage_ctx.stdin.is_none() {
                    stage_ctx.stdin = ctx.stdin.take();
                }
                if stage_ctx.stdin_data.is_none() {
                    stage_ctx.stdin_data = ctx.stdin_data.take();
                }
            } else {
                // Later stages read from the pipe written by the previous stage.
                stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
            }

            // Every stage except the last writes into the next stage's pipe.
            if i < last_idx {
                stage_ctx.pipe_stdout = pipe_writers[i].take();
            }

            stage_ctx.pipeline_position = match i {
                0 => PipelinePosition::First,
                n if n == last_idx => PipelinePosition::Last,
                _ => PipelinePosition::Middle,
            };

            let data_sender = if i < last_idx { data_senders[i].take() } else { None };
            let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };

            let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> =
                tokio::spawn(crate::telemetry::bind_current_context(async move {
                    if let Err(e) = stdin_setup {
                        return (ExecResult::failure(1, e), stage_ctx);
                    }

                    // Non-blocking check: structured data is only picked up if
                    // the upstream stage has already sent it by this point.
                    // NOTE(review): presumably deliberate so stages can stream
                    // concurrently — confirm.
                    if let Some(mut rx) = data_receiver {
                        if let Ok(data) = rx.try_recv() {
                            stage_ctx.stdin_data = data;
                        }
                    }

                    // A dispatch error becomes an ordinary failure result.
                    let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
                        Ok(result) => result,
                        Err(e) => ExecResult::failure(1, e.to_string()),
                    };

                    result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;

                    // When the stage context has a stderr sink, forward the
                    // stage's stderr text to it and clear it from the result.
                    if !result.err.is_empty() {
                        if let Some(ref stderr) = stage_ctx.stderr {
                            stderr.write_str(&result.err);
                            result.err.clear();
                        }
                    }

                    // Hand structured data downstream; a send error (receiver
                    // already gone) is ignored.
                    if let Some(tx) = data_sender {
                        let _ = tx.send(result.data.clone());
                    }

                    // If the pipe writer is still held by the context, push the
                    // buffered text output through it. Write and shutdown errors
                    // are ignored.
                    // NOTE(review): presumably a tool that streams takes the
                    // writer itself, leaving `None` here — confirm.
                    if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
                        let text = result.text_out();
                        if !text.is_empty() {
                            let _ = pipe_out.write_all(text.as_bytes()).await;
                            let _ = pipe_out.shutdown().await;
                        }
                    }

                    (result, stage_ctx)
                }));

            handles.push(handle);
        }

        let mut last_result = ExecResult::success("");
        // Join errors collected per stage (reported below as panics).
        let mut panics: Vec<String> = Vec::new();
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Ok((result, stage_ctx)) => {
                    // Only the final stage's result and context state are
                    // propagated back to the caller.
                    if i == last_idx {
                        last_result = result;
                        ctx.scope = stage_ctx.scope;
                        ctx.cwd = stage_ctx.cwd;
                        ctx.prev_cwd = stage_ctx.prev_cwd;
                        ctx.aliases = stage_ctx.aliases;
                    }
                }
                Err(e) => {
                    panics.push(format!("stage {}: {}", i, e));
                }
            }
        }

        // Any join failure overrides the pipeline result.
        if !panics.is_empty() {
            last_result = ExecResult::failure(
                1,
                format!("pipeline stage(s) panicked: {}", panics.join("; ")),
            );
        }

        last_result
    }
557}
558
559pub fn select_leaf<'a>(schema: &'a ToolSchema, args: &[Arg]) -> anyhow::Result<&'a ToolSchema> {
599 let root_lookup = schema_param_lookup(schema);
602 let is_root_value_flag = |name: &str| -> bool {
603 root_lookup.get(name).is_some_and(|(_, typ, _)| !is_bool_type(typ))
604 };
605
606 let mut node = schema;
607 let mut skip_next_positional = false;
608 for arg in args {
609 match arg {
610 Arg::DoubleDash => break,
612 Arg::LongFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
615 Arg::ShortFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
616 Arg::Positional(expr) => {
617 if skip_next_positional {
618 skip_next_positional = false;
619 continue; }
621 if node.subcommands.is_empty() {
622 break; }
624 match classify_subcommand_positional(expr) {
625 SubcommandWord::Word(word) => {
626 match node.subcommands.iter().find(|c| c.matches_command(word)) {
627 Some(child) => node = child, None => break, }
630 }
631 SubcommandWord::OtherLiteral => break,
635 SubcommandWord::Computed(kind) => anyhow::bail!(
636 "{}: a subcommand name is required here, but got {kind}. \
637 Subcommands must be literal words — spell it out \
638 (e.g. `{} <subcommand> …`) or use the `--flag=value` form.",
639 node.name,
640 schema.name
641 ),
642 }
643 }
644 _ => {}
646 }
647 }
648 Ok(node)
649}
650
/// How a positional argument looks to subcommand routing.
enum SubcommandWord<'a> {
    /// A literal string; may or may not name a subcommand.
    Word(&'a str),
    /// A literal that is not a string.
    OtherLiteral,
    /// An expression whose value is only known at runtime; the payload is a
    /// human-readable description of the expression kind for error messages.
    Computed(&'static str),
}
660
661fn classify_subcommand_positional(expr: &Expr) -> SubcommandWord<'_> {
662 match expr {
663 Expr::Literal(Value::String(s)) => SubcommandWord::Word(s),
664 Expr::Literal(_) => SubcommandWord::OtherLiteral,
665 Expr::CommandSubst(_) | Expr::Command(_) => SubcommandWord::Computed("a command substitution `$(…)`"),
666 Expr::VarRef(_)
667 | Expr::VarWithDefault { .. }
668 | Expr::VarLength(_)
669 | Expr::Positional(_)
670 | Expr::AllArgs
671 | Expr::ArgCount
672 | Expr::CurrentPid
673 | Expr::LastExitCode => SubcommandWord::Computed("a variable reference"),
674 Expr::Interpolated(_) | Expr::HereDocBody { .. } => SubcommandWord::Computed("an interpolated string"),
675 Expr::GlobPattern(_) => SubcommandWord::Computed("a glob pattern"),
676 Expr::Arithmetic(_) => SubcommandWord::Computed("an arithmetic expansion"),
677 _ => SubcommandWord::Computed("a value computed at runtime"),
678 }
679}
680
681pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
682 let mut map = HashMap::new();
683 for p in schema.params.iter().filter(|p| !p.positional) {
684 map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
685 for alias in &p.aliases {
686 let stripped = alias.trim_start_matches('-');
687 map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
688 }
689 }
690 map
691}
692
/// Returns `true` when a schema parameter type names a boolean
/// (`"bool"` or `"boolean"`, compared case-insensitively).
pub fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|candidate| param_type.eq_ignore_ascii_case(candidate))
}
697
698pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
706 let mut tool_args = ToolArgs::new();
707 let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
708 let accepts_word_assign = schema
709 .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
710 .unwrap_or(false);
711
712 let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
714 let mut past_double_dash = false;
715
716 let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
718 for (i, arg) in args.iter().enumerate() {
719 if let Arg::Positional(expr) = arg {
720 positional_indices.push((i, expr));
721 }
722 }
723
724 let mut i = 0;
726 while i < args.len() {
727 let arg = &args[i];
728
729 match arg {
730 Arg::DoubleDash => {
731 past_double_dash = true;
732 }
733 Arg::Positional(expr) => {
734 if !consumed_positionals.contains(&i)
736 && let Some(value) = eval_simple_expr(expr, ctx)
737 {
738 tool_args.positional.push(value);
739 }
740 }
741 Arg::Named { key, value } => {
742 if let Some(val) = eval_simple_expr(value, ctx) {
743 tool_args.named.insert(key.clone(), val);
744 }
745 }
746 Arg::WordAssign { key, value } => {
747 if let Some(val) = eval_simple_expr(value, ctx) {
748 if accepts_word_assign {
749 tool_args.named.insert(key.clone(), val);
750 } else {
751 let val_str = crate::interpreter::value_to_string(&val);
752 tool_args.positional.push(Value::String(format!("{key}={val_str}")));
753 }
754 }
755 }
756 Arg::ShortFlag(name) => {
757 if past_double_dash {
758 tool_args.positional.push(Value::String(format!("-{name}")));
759 } else if name.len() == 1 {
760 let flag_name = name.as_str();
763 let lookup = param_lookup.get(flag_name);
764 let is_bool = lookup
765 .map(|(_, typ, _)| is_bool_type(typ))
766 .unwrap_or(true);
767
768 if is_bool {
769 tool_args.flags.insert(flag_name.to_string());
770 } else {
771 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
773 let next_positional = positional_indices
774 .iter()
775 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
776
777 if let Some((pos_idx, expr)) = next_positional {
778 if let Some(value) = eval_simple_expr(expr, ctx) {
779 tool_args.named.insert(canonical.to_string(), value);
780 consumed_positionals.insert(*pos_idx);
781 } else {
782 tool_args.flags.insert(flag_name.to_string());
783 }
784 } else {
785 tool_args.flags.insert(flag_name.to_string());
786 }
787 }
788 } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
789 if is_bool_type(typ) {
791 tool_args.flags.insert(canonical.to_string());
792 } else {
793 let next_positional = positional_indices
794 .iter()
795 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
796 if let Some((pos_idx, expr)) = next_positional {
797 if let Some(value) = eval_simple_expr(expr, ctx) {
798 tool_args.named.insert(canonical.to_string(), value);
799 consumed_positionals.insert(*pos_idx);
800 } else {
801 tool_args.flags.insert(name.clone());
802 }
803 } else {
804 tool_args.flags.insert(name.clone());
805 }
806 }
807 } else {
808 for c in name.chars() {
810 tool_args.flags.insert(c.to_string());
811 }
812 }
813 }
814 Arg::LongFlag(name) => {
815 if past_double_dash {
816 tool_args.positional.push(Value::String(format!("--{name}")));
817 } else {
818 let lookup = param_lookup.get(name.as_str());
820 let is_bool = lookup
821 .map(|(_, typ, _)| is_bool_type(typ))
822 .unwrap_or(true); if is_bool {
825 tool_args.flags.insert(name.clone());
826 } else {
827 let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
833 let next_positional = positional_indices
834 .iter()
835 .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
836
837 if let Some((pos_idx, expr)) = next_positional {
838 if let Some(value) = eval_simple_expr(expr, ctx) {
839 tool_args.named.insert(canonical.to_string(), value);
840 consumed_positionals.insert(*pos_idx);
841 } else {
842 tool_args.flags.insert(name.clone());
843 }
844 } else {
845 tool_args.flags.insert(name.clone());
846 }
847 }
848 }
849 }
850 }
851 i += 1;
852 }
853
854 if let Some(schema) = schema.filter(|s| s.map_positionals) {
859 let pre_dash_count = if past_double_dash {
861 let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
863 positional_indices.iter()
865 .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
866 .count()
867 } else {
868 tool_args.positional.len()
869 };
870
871 let mut remaining = Vec::new();
872 let mut positional_iter = tool_args.positional.drain(..).enumerate();
873
874 for param in &schema.params {
875 if tool_args.named.contains_key(¶m.name) || tool_args.flags.contains(¶m.name) {
876 continue; }
878 if is_bool_type(¶m.param_type) {
879 continue; }
881 loop {
883 match positional_iter.next() {
884 Some((idx, val)) if idx < pre_dash_count => {
885 tool_args.named.insert(param.name.clone(), val);
886 break;
887 }
888 Some((_, val)) => {
889 remaining.push(val); }
891 None => break,
892 }
893 }
894 }
895
896 remaining.extend(positional_iter.map(|(_, v)| v));
898 tool_args.positional = remaining;
899 }
900
901 tool_args
902}
903
904pub(crate) fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
906 match expr {
907 Expr::Literal(value) => Some(eval_literal(value, ctx)),
908 Expr::VarRef(path) => ctx.scope.resolve_path(path),
909 Expr::Interpolated(parts) => {
910 let mut result = String::new();
911 for part in parts {
912 match part {
913 crate::ast::StringPart::Literal(s) => result.push_str(s),
914 crate::ast::StringPart::Var(path) => {
915 if let Some(value) = ctx.scope.resolve_path(path) {
916 result.push_str(&value_to_string(&value));
917 }
918 }
919 crate::ast::StringPart::VarWithDefault { name, default } => {
920 match ctx.scope.get(name) {
921 Some(value) => {
922 let s = value_to_string(value);
923 if s.is_empty() {
924 result.push_str(&eval_string_parts_sync(default, ctx));
925 } else {
926 result.push_str(&s);
927 }
928 }
929 None => result.push_str(&eval_string_parts_sync(default, ctx)),
930 }
931 }
932 crate::ast::StringPart::VarLength(name) => {
933 let len = match ctx.scope.get(name) {
934 Some(value) => value_to_string(value).len(),
935 None => 0,
936 };
937 result.push_str(&len.to_string());
938 }
939 crate::ast::StringPart::Positional(n) => {
940 if let Some(s) = ctx.scope.get_positional(*n) {
941 result.push_str(s);
942 }
943 }
944 crate::ast::StringPart::AllArgs => {
945 result.push_str(&ctx.scope.all_args().join(" "));
946 }
947 crate::ast::StringPart::ArgCount => {
948 result.push_str(&ctx.scope.arg_count().to_string());
949 }
950 crate::ast::StringPart::Arithmetic(expr) => {
951 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
953 result.push_str(&value.to_string());
954 }
955 }
956 crate::ast::StringPart::CommandSubst(_) => {
957 }
959 crate::ast::StringPart::LastExitCode => {
960 result.push_str(&ctx.scope.last_result().code.to_string());
961 }
962 crate::ast::StringPart::CurrentPid => {
963 result.push_str(&ctx.scope.pid().to_string());
964 }
965 }
966 }
967 Some(Value::String(result))
968 }
969 Expr::GlobPattern(s) => Some(Value::String(s.clone())),
970 Expr::HereDocBody { parts, strip_tabs } => {
971 let unwrapped: Vec<crate::ast::StringPart> =
975 parts.iter().map(|sp| sp.part.clone()).collect();
976 let raw = eval_string_parts_sync(&unwrapped, ctx);
977 let body = if *strip_tabs {
978 crate::interpreter::strip_leading_tabs(&raw)
979 } else {
980 raw
981 };
982 Some(Value::String(body))
983 }
984 _ => None, }
986}
987
988fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
990 value.clone()
991}
992
993fn value_to_string(value: &Value) -> String {
995 match value {
996 Value::Null => "".to_string(),
997 Value::Bool(b) => b.to_string(),
998 Value::Int(i) => i.to_string(),
999 Value::Float(f) => f.to_string(),
1000 Value::String(s) => s.clone(),
1001 Value::Json(json) => json.to_string(),
1002 Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
1003 }
1004}
1005
1006fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
1009 let mut result = String::new();
1010 for part in parts {
1011 match part {
1012 crate::ast::StringPart::Literal(s) => result.push_str(s),
1013 crate::ast::StringPart::Var(path) => {
1014 if let Some(value) = ctx.scope.resolve_path(path) {
1015 result.push_str(&value_to_string(&value));
1016 }
1017 }
1018 crate::ast::StringPart::VarWithDefault { name, default } => {
1019 match ctx.scope.get(name) {
1020 Some(value) => {
1021 let s = value_to_string(value);
1022 if s.is_empty() {
1023 result.push_str(&eval_string_parts_sync(default, ctx));
1024 } else {
1025 result.push_str(&s);
1026 }
1027 }
1028 None => result.push_str(&eval_string_parts_sync(default, ctx)),
1029 }
1030 }
1031 crate::ast::StringPart::VarLength(name) => {
1032 let len = match ctx.scope.get(name) {
1033 Some(value) => value_to_string(value).len(),
1034 None => 0,
1035 };
1036 result.push_str(&len.to_string());
1037 }
1038 crate::ast::StringPart::Positional(n) => {
1039 if let Some(s) = ctx.scope.get_positional(*n) {
1040 result.push_str(s);
1041 }
1042 }
1043 crate::ast::StringPart::AllArgs => {
1044 result.push_str(&ctx.scope.all_args().join(" "));
1045 }
1046 crate::ast::StringPart::ArgCount => {
1047 result.push_str(&ctx.scope.arg_count().to_string());
1048 }
1049 crate::ast::StringPart::Arithmetic(expr) => {
1050 if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
1051 result.push_str(&value.to_string());
1052 }
1053 }
1054 crate::ast::StringPart::CommandSubst(_) => {
1055 }
1057 crate::ast::StringPart::LastExitCode => {
1058 result.push_str(&ctx.scope.last_result().code.to_string());
1059 }
1060 crate::ast::StringPart::CurrentPid => {
1061 result.push_str(&ctx.scope.pid().to_string());
1062 }
1063 }
1064 }
1065 result
1066}
1067
1068fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
1073 let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
1074 let gather_idx = commands.iter().position(|c| c.name == "gather")?;
1075
1076 if gather_idx > scatter_idx {
1078 Some((scatter_idx, gather_idx))
1079 } else {
1080 None
1081 }
1082}
1083
// Unit tests for `select_leaf`: routing through a nested subcommand schema.
#[cfg(test)]
mod select_leaf_tests {
    use super::*;
    use crate::tools::ParamSchema;

    // Fixture schema:
    //   kj [--confirm <string>] [--verbose]
    //     context (alias: ctx)
    //       list   (alias: ls)
    //       create [--type|-t <string>]
    fn kj_schema() -> ToolSchema {
        ToolSchema::new("kj", "kaijutsu")
            .param(ParamSchema::new("confirm", "string"))
            .param(ParamSchema::new("verbose", "bool"))
            .subcommand(
                ToolSchema::new("context", "context ops")
                    .with_command_aliases(["ctx"])
                    .subcommand(ToolSchema::new("list", "list").with_command_aliases(["ls"]))
                    .subcommand(
                        ToolSchema::new("create", "create").param(
                            ParamSchema::new("type", "string").with_aliases(["t"]),
                        ),
                    ),
            )
    }

    // Builds a literal-string positional argument.
    fn word(s: &str) -> Arg {
        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
    }

    // A schema without subcommands always resolves to itself.
    #[test]
    fn flat_tool_returns_root() {
        let schema = ToolSchema::new("cat", "concat")
            .param(ParamSchema::required("path", "string", "f").positional());
        let leaf = select_leaf(&schema, &[word("foo.txt")]).expect("flat ok");
        assert_eq!(leaf.name, "cat");
    }

    // One subcommand word descends one level.
    #[test]
    fn single_hop() {
        let schema = kj_schema();
        let leaf = select_leaf(&schema, &[word("context")]).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    // Two subcommand words reach the nested leaf, whose own params are visible.
    #[test]
    fn two_hops() {
        let schema = kj_schema();
        let leaf = select_leaf(&schema, &[word("context"), word("create")]).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"), "leaf has --type");
    }

    // Command aliases route exactly like canonical names.
    #[test]
    fn alias_hops_route() {
        let schema = kj_schema();
        let leaf = select_leaf(&schema, &[word("ctx"), word("ls")]).expect("ok");
        assert_eq!(leaf.name, "list");
    }

    // A word that names no subcommand ends routing at the node reached so far.
    #[test]
    fn unknown_subcommand_stops_at_current_node() {
        let schema = kj_schema();
        let leaf = select_leaf(&schema, &[word("context"), word("nonesuch")]).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    // A boolean root flag takes no value, so the following words still route.
    #[test]
    fn root_bool_flag_before_path_does_not_disrupt_routing() {
        let schema = kj_schema();
        let args = vec![Arg::LongFlag("verbose".into()), word("context"), word("create")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    // A value-taking root flag in `--flag value` form: its value is skipped
    // rather than being mistaken for a subcommand.
    #[test]
    fn root_value_flag_space_form_before_path_skips_its_value() {
        let schema = kj_schema();
        let args = vec![
            Arg::LongFlag("confirm".into()),
            word("nonce"),
            word("context"),
            word("create"),
        ];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    // Flags belonging to the leaf, given after the path, do not affect routing.
    #[test]
    fn leaf_value_flag_after_path_routes_to_leaf() {
        let schema = kj_schema();
        let args = vec![
            word("context"),
            word("create"),
            Arg::LongFlag("type".into()),
            word("x"),
        ];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"));
    }

    // Nothing after `--` is considered for routing.
    #[test]
    fn double_dash_stops_routing() {
        let schema = kj_schema();
        let leaf = select_leaf(&schema, &[Arg::DoubleDash, word("context")]).expect("ok");
        assert_eq!(leaf.name, "kj");
    }

    // A command substitution where a subcommand is expected is an error that
    // names the cause.
    #[test]
    fn computed_subcommand_selector_errors() {
        let schema = kj_schema();
        let args = vec![Arg::Positional(Expr::CommandSubst(vec![
            crate::ast::Stmt::Command(crate::ast::Command {
                name: "echo".into(),
                args: vec![],
                redirects: vec![],
            }),
        ]))];
        let err = select_leaf(&schema, &args).expect_err("must error");
        let msg = err.to_string();
        assert!(msg.contains("subcommand name is required"), "got: {msg}");
        assert!(msg.contains("command substitution"), "names the cause: {msg}");
    }

    // A variable reference where a subcommand is expected is an error too.
    #[test]
    fn variable_subcommand_selector_errors() {
        let schema = kj_schema();
        let args = vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("sub")))];
        let err = select_leaf(&schema, &args).expect_err("must error");
        assert!(err.to_string().contains("variable reference"), "got: {err}");
    }

    // Once a leaf (no subcommands) is reached, computed positionals are
    // ordinary arguments and must not trigger the error.
    #[test]
    fn computed_positional_after_leaf_is_fine() {
        let schema = kj_schema();
        let args = vec![
            word("context"),
            word("list"),
            Arg::Positional(Expr::CommandSubst(vec![crate::ast::Stmt::Command(
                crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
            )])),
        ];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "list");
    }
}
1244
1245#[cfg(test)]
1246mod tests {
1247 use super::*;
1248 use crate::dispatch::BackendDispatcher;
1249 use crate::tools::register_builtins;
1250 use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
1251 use std::path::Path;
1252
1253 async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
1254 let mut tools = ToolRegistry::new();
1255 register_builtins(&mut tools);
1256 let tools = Arc::new(tools);
1257 let runner = PipelineRunner::new(tools.clone());
1258 let dispatcher = BackendDispatcher::new(tools.clone());
1259
1260 let mut vfs = VfsRouter::new();
1261 let mem = MemoryFs::new();
1262 mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
1263 vfs.mount("/", mem);
1264 let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
1265
1266 (runner, ctx, dispatcher)
1267 }
1268
1269 fn make_cmd(name: &str, args: Vec<&str>) -> Command {
1270 Command {
1271 name: name.to_string(),
1272 args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
1273 redirects: vec![],
1274 }
1275 }
1276
1277 #[tokio::test]
1278 async fn test_single_command() {
1279 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1280 let cmd = make_cmd("echo", vec!["hello"]);
1281
1282 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1283 assert!(result.ok());
1284 assert_eq!(result.text_out().trim(), "hello");
1285 }
1286
1287 #[tokio::test]
1288 async fn test_pipeline_echo_grep() {
1289 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1290
1291 let echo_cmd = Command {
1293 name: "echo".to_string(),
1294 args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
1295 redirects: vec![],
1296 };
1297 let grep_cmd = Command {
1298 name: "grep".to_string(),
1299 args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
1300 redirects: vec![],
1301 };
1302
1303 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1304 assert!(result.ok());
1305 assert_eq!(result.text_out().trim(), "world");
1306 }
1307
1308 #[tokio::test]
1309 async fn test_pipeline_cat_grep() {
1310 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1311
1312 let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
1314 let grep_cmd = Command {
1315 name: "grep".to_string(),
1316 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1317 redirects: vec![],
1318 };
1319
1320 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1321 assert!(result.ok());
1322 assert!(result.text_out().contains("hello"));
1323 }
1324
1325 #[tokio::test]
1326 async fn test_command_not_found() {
1327 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1328 let cmd = make_cmd("nonexistent", vec![]);
1329
1330 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1331 assert!(!result.ok());
1332 assert_eq!(result.code, 127);
1333 assert!(result.err.contains("not found"));
1334 }
1335
1336 #[tokio::test]
1337 async fn test_pipeline_continues_on_failure() {
1338 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1341
1342 let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1345 let grep_cmd = Command {
1346 name: "grep".to_string(),
1347 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1348 redirects: vec![],
1349 };
1350
1351 let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1352 assert!(!result.ok());
1354 }
1355
1356 #[tokio::test]
1357 async fn test_pipeline_last_command_exit_code() {
1358 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1360
1361 let echo_cmd = make_cmd("echo", vec!["hello"]);
1362 let cat_cmd = make_cmd("cat", vec![]);
1363
1364 let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1365 assert!(result.ok());
1366 assert!(result.text_out().contains("hello"));
1367 }
1368
1369 #[tokio::test]
1370 async fn test_empty_pipeline() {
1371 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1372 let result = runner.run(&[], &mut ctx, &dispatcher).await;
1373 assert!(result.ok());
1374 }
1375
1376 #[test]
1379 fn test_find_scatter_gather_both_present() {
1380 let commands = vec![
1381 make_cmd("echo", vec!["a"]),
1382 make_cmd("scatter", vec![]),
1383 make_cmd("process", vec![]),
1384 make_cmd("gather", vec![]),
1385 ];
1386 let result = find_scatter_gather(&commands);
1387 assert_eq!(result, Some((1, 3)));
1388 }
1389
1390 #[test]
1391 fn test_find_scatter_gather_no_scatter() {
1392 let commands = vec![
1393 make_cmd("echo", vec!["a"]),
1394 make_cmd("gather", vec![]),
1395 ];
1396 let result = find_scatter_gather(&commands);
1397 assert!(result.is_none());
1398 }
1399
1400 #[test]
1401 fn test_find_scatter_gather_no_gather() {
1402 let commands = vec![
1403 make_cmd("echo", vec!["a"]),
1404 make_cmd("scatter", vec![]),
1405 ];
1406 let result = find_scatter_gather(&commands);
1407 assert!(result.is_none());
1408 }
1409
1410 #[test]
1411 fn test_find_scatter_gather_wrong_order() {
1412 let commands = vec![
1413 make_cmd("gather", vec![]),
1414 make_cmd("scatter", vec![]),
1415 ];
1416 let result = find_scatter_gather(&commands);
1417 assert!(result.is_none());
1418 }
1419
1420 #[tokio::test]
1421 async fn test_scatter_gather_simple() {
1422 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1423
1424 let split_cmd = Command {
1426 name: "split".to_string(),
1427 args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1428 redirects: vec![],
1429 };
1430 let scatter_cmd = make_cmd("scatter", vec![]);
1431 let process_cmd = Command {
1432 name: "echo".to_string(),
1433 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1434 redirects: vec![],
1435 };
1436 let gather_cmd = make_cmd("gather", vec![]);
1437
1438 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1439 assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1440 assert!(result.text_out().contains("a"));
1442 assert!(result.text_out().contains("b"));
1443 assert!(result.text_out().contains("c"));
1444 }
1445
1446 #[tokio::test]
1447 async fn test_scatter_gather_empty_input() {
1448 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1449
1450 let echo_cmd = Command {
1452 name: "echo".to_string(),
1453 args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1454 redirects: vec![],
1455 };
1456 let scatter_cmd = make_cmd("scatter", vec![]);
1457 let process_cmd = Command {
1458 name: "echo".to_string(),
1459 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1460 redirects: vec![],
1461 };
1462 let gather_cmd = make_cmd("gather", vec![]);
1463
1464 let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1465 assert!(result.ok());
1466 assert!(result.text_out().trim().is_empty());
1467 }
1468
1469 #[tokio::test]
1470 async fn test_scatter_gather_with_structured_stdin() {
1471 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1472
1473 let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1475 ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1476
1477 let scatter_cmd = make_cmd("scatter", vec![]);
1478 let process_cmd = Command {
1479 name: "echo".to_string(),
1480 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1481 redirects: vec![],
1482 };
1483 let gather_cmd = make_cmd("gather", vec![]);
1484
1485 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1486 assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1487 assert!(result.text_out().contains("x"));
1488 assert!(result.text_out().contains("y"));
1489 assert!(result.text_out().contains("z"));
1490 }
1491
1492 #[tokio::test]
1493 async fn test_scatter_gather_json_input() {
1494 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1495
1496 let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1498 ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1499
1500 let scatter_cmd = make_cmd("scatter", vec![]);
1501 let process_cmd = Command {
1502 name: "echo".to_string(),
1503 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1504 redirects: vec![],
1505 };
1506 let gather_cmd = make_cmd("gather", vec![]);
1507
1508 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1509 assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1510 assert!(result.text_out().contains("one"));
1511 assert!(result.text_out().contains("two"));
1512 assert!(result.text_out().contains("three"));
1513 }
1514
1515 #[tokio::test]
1516 async fn test_scatter_gather_with_post_gather() {
1517 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1518
1519 let split_cmd = Command {
1521 name: "split".to_string(),
1522 args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1523 redirects: vec![],
1524 };
1525 let scatter_cmd = make_cmd("scatter", vec![]);
1526 let process_cmd = Command {
1527 name: "echo".to_string(),
1528 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1529 redirects: vec![],
1530 };
1531 let gather_cmd = make_cmd("gather", vec![]);
1532 let grep_cmd = Command {
1533 name: "grep".to_string(),
1534 args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1535 redirects: vec![],
1536 };
1537
1538 let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1539 assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1540 assert!(result.text_out().contains("a"));
1541 assert!(!result.text_out().contains("b"));
1542 }
1543
1544 #[tokio::test]
1545 async fn test_scatter_custom_var_name() {
1546 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1547
1548 let data = Value::Json(serde_json::json!(["test1", "test2"]));
1550 ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1551
1552 let scatter_cmd = Command {
1554 name: "scatter".to_string(),
1555 args: vec![Arg::Named {
1556 key: "as".to_string(),
1557 value: Expr::Literal(Value::String("URL".to_string())),
1558 }],
1559 redirects: vec![],
1560 };
1561 let process_cmd = Command {
1562 name: "echo".to_string(),
1563 args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1564 redirects: vec![],
1565 };
1566 let gather_cmd = make_cmd("gather", vec![]);
1567
1568 let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1569 assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1570 assert!(result.text_out().contains("test1"));
1571 assert!(result.text_out().contains("test2"));
1572 }
1573
1574 #[tokio::test]
1577 async fn test_pipeline_routes_through_backend() {
1578 use crate::backend::testing::MockBackend;
1579 use std::sync::atomic::Ordering;
1580
1581 let (backend, call_count) = MockBackend::new();
1583 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1584
1585 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1587
1588 let tools = std::sync::Arc::new(ToolRegistry::new());
1590 let runner = PipelineRunner::new(tools.clone());
1591 let dispatcher = BackendDispatcher::new(tools);
1592
1593 let cmd = make_cmd("test-tool", vec!["arg1"]);
1595 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1596
1597 assert!(result.ok(), "Mock backend should return success");
1598 assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1599 assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1600 }
1601
1602 #[tokio::test]
1603 async fn test_multi_command_pipeline_routes_through_backend() {
1604 use crate::backend::testing::MockBackend;
1605 use std::sync::atomic::Ordering;
1606
1607 let (backend, call_count) = MockBackend::new();
1608 let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1609 let mut ctx = crate::tools::ExecContext::with_backend(backend);
1610
1611 let tools = std::sync::Arc::new(ToolRegistry::new());
1612 let runner = PipelineRunner::new(tools.clone());
1613 let dispatcher = BackendDispatcher::new(tools);
1614
1615 let cmd1 = make_cmd("tool1", vec![]);
1617 let cmd2 = make_cmd("tool2", vec![]);
1618 let cmd3 = make_cmd("tool3", vec![]);
1619
1620 let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1621
1622 assert!(result.ok());
1623 assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1624 }
1625
1626 use crate::tools::{ParamSchema, ToolSchema};
1629
1630 fn make_test_schema() -> ToolSchema {
1631 ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1632 .param(ParamSchema::required("query", "string", "Search query"))
1633 .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1634 .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1635 .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1636 .with_positional_mapping()
1637 }
1638
1639 fn make_minimal_ctx() -> ExecContext {
1640 let mut vfs = VfsRouter::new();
1641 vfs.mount("/", MemoryFs::new());
1642 ExecContext::new(Arc::new(vfs))
1643 }
1644
1645 #[test]
1646 fn test_schema_aware_string_arg() {
1647 let args = vec![
1649 Arg::LongFlag("query".to_string()),
1650 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1651 ];
1652 let schema = make_test_schema();
1653 let ctx = make_minimal_ctx();
1654
1655 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1656
1657 assert!(tool_args.flags.is_empty(), "No flags should be set");
1658 assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1659 assert_eq!(
1660 tool_args.named.get("query"),
1661 Some(&Value::String("test".to_string())),
1662 "--query should consume 'test' as its value"
1663 );
1664 }
1665
1666 #[test]
1667 fn test_schema_aware_bool_flag() {
1668 let args = vec![
1670 Arg::LongFlag("verbose".to_string()),
1671 ];
1672 let schema = make_test_schema();
1673 let ctx = make_minimal_ctx();
1674
1675 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1676
1677 assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1678 assert!(tool_args.named.is_empty(), "No named args");
1679 assert!(tool_args.positional.is_empty(), "No positionals");
1680 }
1681
1682 #[test]
1683 fn test_schema_aware_mixed() {
1684 let args = vec![
1687 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1688 Arg::LongFlag("output".to_string()),
1689 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1690 Arg::LongFlag("verbose".to_string()),
1691 ];
1692 let schema = make_test_schema();
1693 let ctx = make_minimal_ctx();
1694
1695 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1696
1697 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1698 assert_eq!(
1699 tool_args.named.get("query"),
1700 Some(&Value::String("file.txt".to_string()))
1701 );
1702 assert_eq!(
1703 tool_args.named.get("output"),
1704 Some(&Value::String("out.txt".to_string()))
1705 );
1706 assert!(tool_args.flags.contains("verbose"));
1707 }
1708
1709 #[test]
1710 fn test_schema_aware_multiple_string_args() {
1711 let args = vec![
1713 Arg::LongFlag("query".to_string()),
1714 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1715 Arg::LongFlag("output".to_string()),
1716 Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1717 Arg::LongFlag("verbose".to_string()),
1718 Arg::LongFlag("limit".to_string()),
1719 Arg::Positional(Expr::Literal(Value::Int(5))),
1720 ];
1721 let schema = make_test_schema();
1722 let ctx = make_minimal_ctx();
1723
1724 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1725
1726 assert!(tool_args.positional.is_empty(), "All positionals consumed");
1727 assert_eq!(
1728 tool_args.named.get("query"),
1729 Some(&Value::String("test".to_string()))
1730 );
1731 assert_eq!(
1732 tool_args.named.get("output"),
1733 Some(&Value::String("result.json".to_string()))
1734 );
1735 assert_eq!(
1736 tool_args.named.get("limit"),
1737 Some(&Value::Int(5))
1738 );
1739 assert!(tool_args.flags.contains("verbose"));
1740 }
1741
1742 #[test]
1743 fn test_schema_aware_double_dash() {
1744 let args = vec![
1747 Arg::LongFlag("output".to_string()),
1748 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1749 Arg::DoubleDash,
1750 Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1751 ];
1752 let schema = make_test_schema();
1753 let ctx = make_minimal_ctx();
1754
1755 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1756
1757 assert_eq!(
1758 tool_args.named.get("output"),
1759 Some(&Value::String("out.txt".to_string()))
1760 );
1761 assert_eq!(
1763 tool_args.positional,
1764 vec![Value::String("--this-is-data".to_string())]
1765 );
1766 }
1767
1768 #[test]
1769 fn test_no_schema_fallback() {
1770 let args = vec![
1772 Arg::LongFlag("query".to_string()),
1773 Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1774 ];
1775 let ctx = make_minimal_ctx();
1776
1777 let tool_args = build_tool_args(&args, &ctx, None);
1778
1779 assert!(tool_args.flags.contains("query"), "--query should be a flag");
1781 assert_eq!(
1782 tool_args.positional,
1783 vec![Value::String("test".to_string())],
1784 "'test' should be a positional"
1785 );
1786 }
1787
1788 #[test]
1789 fn test_unknown_flag_in_schema() {
1790 let args = vec![
1792 Arg::LongFlag("unknown".to_string()),
1793 Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1794 ];
1795 let schema = make_test_schema();
1796 let ctx = make_minimal_ctx();
1797
1798 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1799
1800 assert!(tool_args.flags.contains("unknown"));
1801 assert!(tool_args.positional.is_empty(), "value consumed as query param");
1802 assert_eq!(
1803 tool_args.named.get("query"),
1804 Some(&Value::String("value".to_string()))
1805 );
1806 }
1807
1808 #[test]
1809 fn test_named_args_unchanged() {
1810 let args = vec![
1812 Arg::Named {
1813 key: "query".to_string(),
1814 value: Expr::Literal(Value::String("test".to_string())),
1815 },
1816 Arg::LongFlag("verbose".to_string()),
1817 ];
1818 let schema = make_test_schema();
1819 let ctx = make_minimal_ctx();
1820
1821 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1822
1823 assert_eq!(
1824 tool_args.named.get("query"),
1825 Some(&Value::String("test".to_string()))
1826 );
1827 assert!(tool_args.flags.contains("verbose"));
1828 }
1829
1830 #[test]
1831 fn test_short_flags_unchanged() {
1832 let args = vec![
1834 Arg::ShortFlag("la".to_string()),
1835 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1836 ];
1837 let schema = make_test_schema();
1838 let ctx = make_minimal_ctx();
1839
1840 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1841
1842 assert!(tool_args.flags.contains("l"));
1843 assert!(tool_args.flags.contains("a"));
1844 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1845 assert_eq!(
1846 tool_args.named.get("query"),
1847 Some(&Value::String("file.txt".to_string()))
1848 );
1849 }
1850
1851 #[test]
1852 fn test_flag_at_end_no_value() {
1853 let args = vec![
1856 Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1857 Arg::LongFlag("output".to_string()),
1858 ];
1859 let schema = make_test_schema();
1860 let ctx = make_minimal_ctx();
1861
1862 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1863
1864 assert!(tool_args.flags.contains("output"));
1866 assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1867 assert_eq!(
1868 tool_args.named.get("query"),
1869 Some(&Value::String("file.txt".to_string()))
1870 );
1871 }
1872
1873 #[test]
1874 fn test_positional_skips_bool_params() {
1875 let schema = ToolSchema::new("test", "")
1879 .param(ParamSchema::required("query", "string", ""))
1880 .param(ParamSchema::optional(
1881 "verbose",
1882 "bool",
1883 Value::Bool(false),
1884 "",
1885 ))
1886 .param(ParamSchema::optional(
1887 "output",
1888 "string",
1889 Value::Null,
1890 "",
1891 ))
1892 .with_positional_mapping();
1893 let args = vec![
1894 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1895 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1896 ];
1897 let ctx = make_minimal_ctx();
1898
1899 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1900
1901 assert_eq!(
1902 tool_args.named.get("query"),
1903 Some(&Value::String("val1".to_string()))
1904 );
1905 assert_eq!(
1906 tool_args.named.get("output"),
1907 Some(&Value::String("val2".to_string()))
1908 );
1909 assert!(!tool_args.flags.contains("verbose"));
1910 assert!(tool_args.positional.is_empty());
1911 }
1912
1913 #[test]
1914 fn test_positionals_fill_available_slots() {
1915 let args = vec![
1918 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1919 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1920 Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1921 ];
1922 let schema = make_test_schema(); let ctx = make_minimal_ctx();
1924
1925 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1926
1927 assert_eq!(
1930 tool_args.named.get("query"),
1931 Some(&Value::String("val1".to_string()))
1932 );
1933 assert_eq!(
1934 tool_args.named.get("limit"),
1935 Some(&Value::String("val2".to_string()))
1936 );
1937 assert_eq!(
1938 tool_args.named.get("output"),
1939 Some(&Value::String("val3".to_string()))
1940 );
1941 assert!(tool_args.positional.is_empty());
1942 }
1943
1944 #[test]
1945 fn test_truly_excess_positionals() {
1946 let schema = ToolSchema::new("test", "")
1948 .param(ParamSchema::required("name", "string", ""))
1949 .with_positional_mapping();
1950 let args = vec![
1951 Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1952 Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1953 Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1954 ];
1955 let ctx = make_minimal_ctx();
1956
1957 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1958
1959 assert_eq!(
1960 tool_args.named.get("name"),
1961 Some(&Value::String("first".to_string()))
1962 );
1963 assert_eq!(
1964 tool_args.positional,
1965 vec![
1966 Value::String("second".to_string()),
1967 Value::String("third".to_string()),
1968 ]
1969 );
1970 }
1971
1972 #[test]
1973 fn test_double_dash_positional_not_mapped() {
1974 let args = vec![
1976 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1977 Arg::DoubleDash,
1978 Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1979 ];
1980 let schema = make_test_schema();
1981 let ctx = make_minimal_ctx();
1982
1983 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1984
1985 assert_eq!(
1986 tool_args.named.get("query"),
1987 Some(&Value::String("val1".to_string()))
1988 );
1989 assert_eq!(
1991 tool_args.positional,
1992 vec![Value::String("val2".to_string())]
1993 );
1994 }
1995
1996 #[test]
1997 fn test_all_params_filled_by_flags() {
1998 let args = vec![
2000 Arg::LongFlag("query".to_string()),
2001 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2002 Arg::LongFlag("output".to_string()),
2003 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2004 Arg::LongFlag("verbose".to_string()),
2005 ];
2006 let schema = make_test_schema();
2007 let ctx = make_minimal_ctx();
2008
2009 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2010
2011 assert_eq!(
2012 tool_args.named.get("query"),
2013 Some(&Value::String("search".to_string()))
2014 );
2015 assert_eq!(
2016 tool_args.named.get("output"),
2017 Some(&Value::String("out.txt".to_string()))
2018 );
2019 assert!(tool_args.flags.contains("verbose"));
2020 assert!(tool_args.positional.is_empty());
2021 }
2022
2023 #[test]
2024 fn test_mixed_flags_and_positional_fill() {
2025 let args = vec![
2027 Arg::LongFlag("output".to_string()),
2028 Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
2029 Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
2030 ];
2031 let schema = make_test_schema();
2032 let ctx = make_minimal_ctx();
2033
2034 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2035
2036 assert_eq!(
2037 tool_args.named.get("output"),
2038 Some(&Value::String("foo".to_string()))
2039 );
2040 assert_eq!(
2041 tool_args.named.get("query"),
2042 Some(&Value::String("val1".to_string()))
2043 );
2044 assert!(tool_args.positional.is_empty());
2045 }
2046
2047 #[test]
2048 fn test_alias_flag_prevents_mapping_overwrite() {
2049 let schema = ToolSchema::new("test", "")
2051 .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
2052 .param(ParamSchema::required("output", "string", ""))
2053 .with_positional_mapping();
2054 let args = vec![
2055 Arg::ShortFlag("q".to_string()),
2056 Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2057 Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2058 ];
2059 let ctx = make_minimal_ctx();
2060
2061 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2062
2063 assert_eq!(
2064 tool_args.named.get("query"),
2065 Some(&Value::String("search".to_string()))
2066 );
2067 assert_eq!(
2068 tool_args.named.get("output"),
2069 Some(&Value::String("out.txt".to_string()))
2070 );
2071 assert!(tool_args.positional.is_empty());
2072 }
2073
2074 #[test]
2075 fn test_builtin_schema_no_positional_mapping() {
2076 let schema = ToolSchema::new("echo", "")
2078 .param(ParamSchema::optional("args", "any", Value::Null, ""))
2079 .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
2080 let args = vec![
2082 Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
2083 Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
2084 ];
2085 let ctx = make_minimal_ctx();
2086
2087 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2088
2089 assert_eq!(
2091 tool_args.positional,
2092 vec![
2093 Value::String("hello".to_string()),
2094 Value::String("world".to_string()),
2095 ]
2096 );
2097 assert!(tool_args.named.get("args").is_none());
2098 }
2099
2100 #[test]
2101 fn test_short_flag_with_alias_consumes_value() {
2102 let schema = ToolSchema::new("head", "Output first part of files")
2105 .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
2106 .with_aliases(["-n"]));
2107 let args = vec![
2108 Arg::ShortFlag("n".to_string()),
2109 Arg::Positional(Expr::Literal(Value::Int(5))),
2110 Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
2111 ];
2112 let ctx = make_minimal_ctx();
2113
2114 let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2115
2116 assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
2117 assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
2118 assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
2119 }
2120
2121 #[tokio::test]
2124 async fn test_merge_stderr_redirect() {
2125 let result = ExecResult::from_output(0, "stdout content", "stderr content");
2127
2128 let redirects = vec![Redirect {
2129 kind: RedirectKind::MergeStderr,
2130 target: Expr::Literal(Value::Null),
2131 }];
2132
2133 let ctx = make_minimal_ctx();
2134 let result = apply_redirects(result, &redirects, &ctx).await;
2135
2136 assert_eq!(&*result.text_out(), "stdout contentstderr content");
2137 assert!(result.err.is_empty());
2138 }
2139
2140 #[tokio::test]
2141 async fn test_merge_stderr_with_empty_stderr() {
2142 let result = ExecResult::from_output(0, "stdout only", "");
2144
2145 let redirects = vec![Redirect {
2146 kind: RedirectKind::MergeStderr,
2147 target: Expr::Literal(Value::Null),
2148 }];
2149
2150 let ctx = make_minimal_ctx();
2151 let result = apply_redirects(result, &redirects, &ctx).await;
2152
2153 assert_eq!(&*result.text_out(), "stdout only");
2154 assert!(result.err.is_empty());
2155 }
2156
2157 #[tokio::test]
2158 async fn test_merge_stderr_order_matters() {
2159 let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
2164
2165 let redirects = vec![Redirect {
2167 kind: RedirectKind::MergeStderr,
2168 target: Expr::Literal(Value::Null),
2169 }];
2170
2171 let ctx = make_minimal_ctx();
2172 let result = apply_redirects(result, &redirects, &ctx).await;
2173
2174 assert_eq!(&*result.text_out(), "stdout\nstderr\n");
2175 assert!(result.err.is_empty());
2176 }
2177
2178 #[tokio::test]
2179 async fn test_redirect_with_command_execution() {
2180 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2181
2182 let cmd = Command {
2184 name: "echo".to_string(),
2185 args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
2186 redirects: vec![Redirect {
2187 kind: RedirectKind::MergeStderr,
2188 target: Expr::Literal(Value::Null),
2189 }],
2190 };
2191
2192 let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
2193 assert!(result.ok());
2194 assert!(result.text_out().contains("hello"));
2196 }
2197
2198 #[tokio::test]
2199 async fn test_merge_stderr_in_pipeline() {
2200 let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2201
2202 let echo_cmd = Command {
2205 name: "echo".to_string(),
2206 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2207 redirects: vec![Redirect {
2208 kind: RedirectKind::MergeStderr,
2209 target: Expr::Literal(Value::Null),
2210 }],
2211 };
2212 let grep_cmd = Command {
2213 name: "grep".to_string(),
2214 args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2215 redirects: vec![],
2216 };
2217
2218 let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
2219 assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
2220 assert!(result.text_out().contains("output"));
2221 }
2222}