use std::sync::Arc;
use std::collections::HashMap;
use crate::arithmetic;
use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
use crate::dispatch::{CommandDispatcher, PipelinePosition};
use crate::interpreter::ExecResult;
use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
use tokio::io::AsyncWriteExt;
use super::pipe_stream::pipe_stream_default;
use super::scatter::{
parse_gather_options, parse_scatter_options, ScatterGatherRunner,
};
/// Applies a command's output-side redirects, in declaration order, to its finished result.
///
/// Stream merges (`MergeStderr`, `MergeStdout`) move text between the result's stdout and
/// stderr buffers. File redirects write the selected stream through the context's backend
/// and then clear that stream from the result. Input-side kinds (`Stdin`, `HereDoc`,
/// `HereString`) are ignored here.
///
/// The first redirect that fails aborts processing: the whole result is replaced by an
/// `ExecResult::failure` with code 1 and a `redirect: …` message. Otherwise the result is
/// materialized one final time and returned.
async fn apply_redirects(
    mut result: ExecResult,
    redirects: &[Redirect],
    ctx: &ExecContext,
) -> ExecResult {
    /// Collects the bytes a stdout file redirect writes: the canonical rendering of the
    /// streamable output when there is one, otherwise the plain text output.
    fn stdout_bytes(result: &mut ExecResult) -> Result<Vec<u8>, String> {
        if let Some(output) = result.take_output_for_stream() {
            let mut buf = Vec::new();
            output
                .write_canonical(&mut buf, None)
                .map_err(|e| e.to_string())?;
            Ok(buf)
        } else {
            Ok(result.text_out().as_bytes().to_vec())
        }
    }

    /// Applies a single redirect, reporting any failure as a bare reason string.
    async fn apply_one(
        result: &mut ExecResult,
        redir: &Redirect,
        ctx: &ExecContext,
    ) -> Result<(), String> {
        match redir.kind {
            RedirectKind::MergeStderr => {
                result.materialize();
                if !result.err.is_empty() {
                    let stderr_text = std::mem::take(&mut result.err);
                    result.push_out(&stderr_text);
                }
            }
            RedirectKind::MergeStdout => {
                result.materialize();
                let stdout_text = result.text_out().into_owned();
                if !stdout_text.is_empty() {
                    result.err.push_str(&stdout_text);
                    result.clear_out();
                }
            }
            RedirectKind::StdoutOverwrite => {
                let path = eval_redirect_target(&redir.target, ctx).await?;
                let bytes = stdout_bytes(result)?;
                redirect_write(ctx, &path, &bytes).await?;
                result.clear_out();
                result.set_output(None);
            }
            RedirectKind::StdoutAppend => {
                let path = eval_redirect_target(&redir.target, ctx).await?;
                let bytes = stdout_bytes(result)?;
                redirect_append(ctx, &path, &bytes).await?;
                result.clear_out();
                result.set_output(None);
            }
            RedirectKind::Stderr => {
                let path = eval_redirect_target(&redir.target, ctx).await?;
                redirect_write(ctx, &path, result.err.as_bytes()).await?;
                result.err.clear();
            }
            RedirectKind::Both => {
                let path = eval_redirect_target(&redir.target, ctx).await?;
                // Stdout text first, then stderr, written as one file.
                let combined = format!("{}{}", result.text_out(), result.err);
                redirect_write(ctx, &path, combined.as_bytes()).await?;
                result.clear_out();
                result.set_output(None);
                result.err.clear();
            }
            // Input redirects are handled before dispatch by `setup_stdin_redirects`.
            RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
        }
        Ok(())
    }

    for redir in redirects {
        if let Err(reason) = apply_one(&mut result, redir, ctx).await {
            return ExecResult::failure(1, format!("redirect: {reason}"));
        }
    }
    result.materialize();
    result
}
/// Evaluates a redirect target expression to its string form.
///
/// When the context carries a dispatcher, evaluation is delegated to it and any error is
/// returned as its display string. Without a dispatcher only the synchronous subset handled
/// by `eval_simple_expr` is supported; anything else yields a fixed error message.
async fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Result<String, String> {
    match &ctx.dispatcher {
        Some(dispatcher) => match dispatcher.eval_expr(expr, ctx).await {
            Ok(value) => Ok(value_to_string(&value)),
            Err(e) => Err(e.to_string()),
        },
        None => match eval_simple_expr(expr, ctx) {
            Some(value) => Ok(value_to_string(&value)),
            None => Err("could not evaluate redirect target".to_string()),
        },
    }
}
/// Overwrites the file at `path` (resolved through the context) with `data`.
///
/// Backend errors are returned as their display string.
async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
    use crate::backend::WriteMode;

    let target = ctx.resolve_path(path);
    match ctx.backend.write(&target, data, WriteMode::Overwrite).await {
        Ok(()) => Ok(()),
        Err(e) => Err(e.to_string()),
    }
}
/// Appends `data` to the file at `path` (resolved through the context).
///
/// Backend errors are returned as their display string.
async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
    let target = ctx.resolve_path(path);
    match ctx.backend.append(&target, data).await {
        Ok(()) => Ok(()),
        Err(e) => Err(e.to_string()),
    }
}
/// Installs stdin on `ctx` from the command's input-side redirects.
///
/// Redirects are processed in declaration order and each input redirect calls
/// `ctx.set_stdin`:
/// * `Stdin` reads the target file through the backend and requires valid UTF-8;
/// * `HereDoc` uses a literal string body as-is, otherwise evaluates the body expression;
/// * `HereString` evaluates the target and appends a trailing newline.
///
/// Output-side redirects are skipped. Errors are returned as plain strings.
async fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) -> Result<(), String> {
    use std::path::Path;

    for redir in cmd.redirects.iter() {
        let stdin_text = match (&redir.kind, &redir.target) {
            (RedirectKind::Stdin, target) => {
                let path = eval_redirect_target(target, ctx).await?;
                let resolved = ctx.resolve_path(&path);
                let bytes = match ctx.backend.read(Path::new(&resolved), None).await {
                    Ok(bytes) => bytes,
                    Err(e) => return Err(format!("redirect: {path}: {e}")),
                };
                match String::from_utf8(bytes) {
                    Ok(text) => text,
                    Err(_) => return Err(format!("redirect: {path}: invalid UTF-8")),
                }
            }
            // A literal heredoc body needs no evaluation.
            (RedirectKind::HereDoc, Expr::Literal(Value::String(content))) => content.clone(),
            (RedirectKind::HereDoc, body) => eval_redirect_target(body, ctx).await?,
            (RedirectKind::HereString, target) => {
                let mut line = eval_redirect_target(target, ctx).await?;
                line.push('\n');
                line
            }
            _ => continue,
        };
        ctx.set_stdin(stdin_text);
    }
    Ok(())
}
/// Runs command pipelines: single commands, multi-stage pipes, and
/// scatter/gather pipelines.
#[derive(Clone)]
pub struct PipelineRunner {
/// Tool registry; used to look up the `scatter` and `gather` schemas and
/// handed (cloned) to the scatter/gather runner.
tools: Arc<ToolRegistry>,
}
impl PipelineRunner {
/// Creates a runner backed by the given tool registry.
pub fn new(tools: Arc<ToolRegistry>) -> Self {
Self { tools }
}
/// Runs a pipeline of commands and returns the final result.
///
/// An empty pipeline succeeds with empty output. If the pipeline contains a
/// `scatter` command followed later by a `gather` command it is executed by
/// `run_scatter_gather`; otherwise it goes through `run_sequential`.
#[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
pub async fn run(
&self,
commands: &[Command],
ctx: &mut ExecContext,
dispatcher: &dyn CommandDispatcher,
) -> ExecResult {
if commands.is_empty() {
return ExecResult::success("");
}
if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
}
self.run_sequential(commands, ctx, dispatcher).await
}
/// Runs a pipeline without any scatter/gather detection.
///
/// An empty pipeline succeeds with empty output, a single command is run
/// directly via `run_single` (with no piped stdin), and anything longer is
/// run as a multi-stage pipe via `run_pipeline`.
#[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
pub async fn run_sequential(
&self,
commands: &[Command],
ctx: &mut ExecContext,
dispatcher: &dyn CommandDispatcher,
) -> ExecResult {
if commands.is_empty() {
return ExecResult::success("");
}
if commands.len() == 1 {
return self.run_single(&commands[0], ctx, None, dispatcher).await;
}
self.run_pipeline(commands, ctx, dispatcher).await
}
/// Runs a pipeline of the shape `pre… | scatter | parallel… | gather | post…`.
///
/// `scatter_idx` and `gather_idx` are the positions of the `scatter` and
/// `gather` commands in `commands`; callers must ensure
/// `scatter_idx < gather_idx < commands.len()`, otherwise the slicing below
/// panics. The actual execution is delegated to `ScatterGatherRunner`.
async fn run_scatter_gather(
&self,
commands: &[Command],
scatter_idx: usize,
gather_idx: usize,
ctx: &mut ExecContext,
dispatcher: &dyn CommandDispatcher,
) -> ExecResult {
// Split the command list into the five segments around scatter/gather.
let pre_scatter = &commands[..scatter_idx];
let scatter_cmd = &commands[scatter_idx];
let parallel = &commands[scatter_idx + 1..gather_idx];
let gather_cmd = &commands[gather_idx];
let post_gather = &commands[gather_idx + 1..];
// Schemas are optional: parsing falls back to schema-less argument
// handling when the registry has no `scatter`/`gather` tool.
let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
let gather_schema = self.tools.get("gather").map(|t| t.schema());
let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
// The scatter/gather runner needs an owned dispatcher, so fork one.
let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
runner
.run(
pre_scatter,
scatter_opts,
parallel,
gather_opts,
post_gather,
ctx,
)
.await
}
/// Runs one command outside of a pipe.
///
/// Stdin redirects are installed first; an explicit `stdin` argument is set
/// afterwards via `ctx.set_stdin` (presumably replacing whatever the
/// redirects installed — confirm against `ExecContext::set_stdin`). A
/// dispatcher error becomes a failure result with code 1, and output
/// redirects are applied to whatever result is produced.
#[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
async fn run_single(
&self,
cmd: &Command,
ctx: &mut ExecContext,
stdin: Option<String>,
dispatcher: &dyn CommandDispatcher,
) -> ExecResult {
if let Err(e) = setup_stdin_redirects(cmd, ctx).await {
return ExecResult::failure(1, e);
}
if let Some(input) = stdin {
ctx.set_stdin(input);
}
ctx.pipeline_position = PipelinePosition::Only;
let result = match dispatcher.dispatch(cmd, ctx).await {
Ok(result) => result,
Err(e) => ExecResult::failure(1, e.to_string()),
};
apply_redirects(result, &cmd.redirects, ctx).await
}
/// Runs a multi-stage pipe with one spawned task per stage.
///
/// Adjacent stages are connected by a byte pipe (text) and a oneshot channel
/// (structured `Value` data). The returned result is the last stage's
/// result, unless any task failed to join, in which case a failure with
/// code 1 listing those stages is returned instead. Scope, cwd, previous
/// cwd and aliases of the last stage are copied back into `ctx`.
///
/// Expects `commands` to be non-empty (`last_idx` is computed as
/// `len - 1`); `run_sequential` only calls this with two or more commands.
#[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
async fn run_pipeline(
&self,
commands: &[Command],
ctx: &mut ExecContext,
dispatcher: &dyn CommandDispatcher,
) -> ExecResult {
let stage_count = commands.len();
let last_idx = stage_count - 1;
// One byte pipe per gap between stages; each end is wrapped in `Option`
// so it can be moved out (`take`) into the stage that owns it.
let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
for _ in 0..last_idx {
let (writer, reader) = pipe_stream_default();
pipe_writers.push(Some(writer));
pipe_readers.push(Some(reader));
}
// One oneshot channel per gap for passing structured data downstream.
let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
for _ in 0..last_idx {
let (tx, rx) = tokio::sync::oneshot::channel();
data_senders.push(Some(tx));
data_receivers.push(Some(rx));
}
let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
for (i, cmd) in commands.iter().enumerate() {
// Each stage gets its own child context, an owned copy of its command
// and a forked dispatcher so the task can be `'static`.
let mut stage_ctx = ctx.child_for_pipeline();
let cmd = cmd.clone();
let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
// Stdin redirects are evaluated here, before spawning; a failure is
// carried into the task and reported as that stage's result.
let stdin_setup = setup_stdin_redirects(&cmd, &mut stage_ctx).await;
if i == 0 {
// The first stage inherits the caller's stdin unless a redirect
// already provided one. `take` moves it out of the parent context.
if stage_ctx.stdin.is_none() {
stage_ctx.stdin = ctx.stdin.take();
}
if stage_ctx.stdin_data.is_none() {
stage_ctx.stdin_data = ctx.stdin_data.take();
}
} else {
// Later stages read from the pipe written by the previous stage.
stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
}
if i < last_idx {
stage_ctx.pipe_stdout = pipe_writers[i].take();
}
stage_ctx.pipeline_position = match i {
0 => PipelinePosition::First,
n if n == last_idx => PipelinePosition::Last,
_ => PipelinePosition::Middle,
};
let data_sender = if i < last_idx { data_senders[i].take() } else { None };
let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> =
tokio::spawn(crate::telemetry::bind_current_context(async move {
if let Err(e) = stdin_setup {
return (ExecResult::failure(1, e), stage_ctx);
}
// Non-blocking check: structured data is only picked up if the
// upstream stage has already sent it when this task starts.
if let Some(mut rx) = data_receiver {
if let Ok(data) = rx.try_recv() {
stage_ctx.stdin_data = data;
}
}
let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
Ok(result) => result,
Err(e) => ExecResult::failure(1, e.to_string()),
};
result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
// Forward stderr text to the stage's stderr sink, if it has one,
// and remove it from the result.
if !result.err.is_empty() {
if let Some(ref stderr) = stage_ctx.stderr {
stderr.write_str(&result.err);
result.err.clear();
}
}
// Best effort: the send fails only if the receiver is gone.
if let Some(tx) = data_sender {
let _ = tx.send(result.data.clone());
}
// Write any text the result still holds to the outgoing pipe.
// Write/shutdown errors are deliberately ignored. When there is no
// text the writer is simply dropped at the end of this block.
if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
let text = result.text_out();
if !text.is_empty() {
let _ = pipe_out.write_all(text.as_bytes()).await;
let _ = pipe_out.shutdown().await;
}
}
(result, stage_ctx)
}));
handles.push(handle);
}
let mut last_result = ExecResult::success("");
let mut panics: Vec<String> = Vec::new();
// Await every stage in order; only the last stage's result and context
// state are kept.
for (i, handle) in handles.into_iter().enumerate() {
match handle.await {
Ok((result, stage_ctx)) => {
if i == last_idx {
last_result = result;
ctx.scope = stage_ctx.scope;
ctx.cwd = stage_ctx.cwd;
ctx.prev_cwd = stage_ctx.prev_cwd;
ctx.aliases = stage_ctx.aliases;
}
}
// Any join error (for example a panicked task) is collected here.
Err(e) => {
panics.push(format!("stage {}: {}", i, e));
}
}
}
// A join failure in any stage overrides the last stage's result.
if !panics.is_empty() {
last_result = ExecResult::failure(
1,
format!("pipeline stage(s) panicked: {}", panics.join("; ")),
);
}
last_result
}
}
/// Walks the leading literal positionals of `args` down the subcommand tree of `schema`
/// and returns the deepest schema node reached.
///
/// Routing rules:
/// * `--` stops routing;
/// * a positional that directly follows a value-taking flag of the *root* schema is treated
///   as that flag's value and skipped;
/// * routing stops at a node without subcommands, at a word that matches no subcommand,
///   and at a non-string literal.
///
/// # Errors
///
/// Fails when a subcommand name is expected but the positional is a computed expression
/// (variable, command substitution, interpolation, glob, arithmetic, …).
pub fn select_leaf<'a>(schema: &'a ToolSchema, args: &[Arg]) -> anyhow::Result<&'a ToolSchema> {
    let root_flags = schema_param_lookup(schema);
    let root_flag_takes_value = |flag: &str| -> bool {
        match root_flags.get(flag) {
            Some((_, param_type, _)) => !is_bool_type(param_type),
            None => false,
        }
    };

    let mut current = schema;
    let mut value_pending = false;
    for arg in args {
        let expr = match arg {
            Arg::DoubleDash => break,
            Arg::LongFlag(flag) | Arg::ShortFlag(flag) => {
                if root_flag_takes_value(flag) {
                    value_pending = true;
                }
                continue;
            }
            Arg::Positional(expr) => expr,
            _ => continue,
        };

        // This positional is the value of the preceding root flag, not a subcommand.
        if value_pending {
            value_pending = false;
            continue;
        }
        if current.subcommands.is_empty() {
            break;
        }

        match classify_subcommand_positional(expr) {
            SubcommandWord::Word(word) => {
                let Some(child) = current
                    .subcommands
                    .iter()
                    .find(|candidate| candidate.matches_command(word))
                else {
                    break;
                };
                current = child;
            }
            SubcommandWord::OtherLiteral => break,
            SubcommandWord::Computed(kind) => anyhow::bail!(
                "{}: a subcommand name is required here, but got {kind}. \
                 Subcommands must be literal words — spell it out \
                 (e.g. `{} <subcommand> …`) or use the `--flag=value` form.",
                current.name,
                schema.name
            ),
        }
    }
    Ok(current)
}
/// How a positional argument reads when `select_leaf` expects a subcommand name.
enum SubcommandWord<'a> {
/// A literal string, usable as a candidate subcommand name.
Word(&'a str),
/// A literal that is not a string.
OtherLiteral,
/// A non-literal expression; the payload is a human-readable description of
/// its kind, embedded in the error message produced by `select_leaf`.
Computed(&'static str),
}
/// Classifies a positional expression found where a subcommand name is expected.
///
/// String literals become `Word`, other literals `OtherLiteral`, and every non-literal
/// expression becomes `Computed` with a short description of what kind of expression it is.
fn classify_subcommand_positional(expr: &Expr) -> SubcommandWord<'_> {
    let cause: &'static str = match expr {
        Expr::Literal(Value::String(word)) => return SubcommandWord::Word(word),
        Expr::Literal(_) => return SubcommandWord::OtherLiteral,
        Expr::CommandSubst(_) | Expr::Command(_) => "a command substitution `$(…)`",
        Expr::VarRef(_)
        | Expr::VarWithDefault { .. }
        | Expr::VarLength(_)
        | Expr::Positional(_)
        | Expr::AllArgs
        | Expr::ArgCount
        | Expr::CurrentPid
        | Expr::LastExitCode => "a variable reference",
        Expr::Interpolated(_) | Expr::HereDocBody { .. } => "an interpolated string",
        Expr::GlobPattern(_) => "a glob pattern",
        Expr::Arithmetic(_) => "an arithmetic expansion",
        _ => "a value computed at runtime",
    };
    SubcommandWord::Computed(cause)
}
/// Builds a flag lookup table for the non-positional parameters of `schema`.
///
/// Every parameter is reachable under its own name and under each alias with leading
/// dashes stripped. Each entry is `(canonical name, parameter type, consumes)`. When two
/// keys collide, the one inserted later wins.
pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
    let mut lookup = HashMap::new();
    for param in &schema.params {
        if param.positional {
            continue;
        }
        let entry = (param.name.as_str(), param.param_type.as_str(), param.consumes);
        lookup.insert(param.name.clone(), entry);
        for alias in &param.aliases {
            lookup.insert(alias.trim_start_matches('-').to_string(), entry);
        }
    }
    lookup
}
/// Returns `true` when a schema parameter type names a boolean, i.e. equals `bool` or
/// `boolean` ignoring letter case.
pub fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|name| param_type.eq_ignore_ascii_case(name))
}
/// Converts parsed command arguments into the `ToolArgs` handed to a tool.
///
/// Only expressions that `eval_simple_expr` can evaluate synchronously contribute values;
/// arguments whose expression cannot be evaluated are dropped.
///
/// * `Named` arguments always land in `named`.
/// * `WordAssign` (`key=value`) lands in `named` when the tool accepts word assignments,
///   otherwise it is passed through as the positional string `key=value`.
/// * Flags are interpreted through `schema` when one is given: boolean (or unknown) flags
///   land in `flags`; value-taking flags consume the next unconsumed positional and land in
///   `named` under their canonical name. A multi-character short flag that the schema does
///   not know is split into one flag per character.
/// * After `--`, flags are passed through verbatim as positional strings.
/// * When the schema sets `map_positionals`, positionals that appeared before `--` are
///   assigned, in order, to the schema's non-boolean parameters that have no value yet.
pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
    /// Gives a value-taking flag the next unconsumed positional after `flag_index`.
    ///
    /// On success the value is stored in `named` under `canonical` and the positional is
    /// marked consumed. If there is no such positional, or it cannot be evaluated, the flag
    /// is recorded as a plain flag under the name it was written with (`spelled`).
    fn bind_value_flag(
        tool_args: &mut ToolArgs,
        consumed: &mut std::collections::HashSet<usize>,
        positionals: &[(usize, &Expr)],
        ctx: &ExecContext,
        flag_index: usize,
        canonical: &str,
        spelled: &str,
    ) {
        let bound = positionals
            .iter()
            .find(|(idx, _)| *idx > flag_index && !consumed.contains(idx))
            .and_then(|(idx, expr)| eval_simple_expr(expr, ctx).map(|value| (*idx, value)));
        match bound {
            Some((idx, value)) => {
                tool_args.named.insert(canonical.to_string(), value);
                consumed.insert(idx);
            }
            None => {
                tool_args.flags.insert(spelled.to_string());
            }
        }
    }

    let mut tool_args = ToolArgs::new();
    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
    let accepts_word_assign = schema
        .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
        .unwrap_or(false);
    let mut consumed_positionals: std::collections::HashSet<usize> =
        std::collections::HashSet::new();
    let mut past_double_dash = false;

    // Index every positional up front so flags can look ahead for their value.
    let positional_indices: Vec<(usize, &Expr)> = args
        .iter()
        .enumerate()
        .filter_map(|(idx, arg)| match arg {
            Arg::Positional(expr) => Some((idx, expr)),
            _ => None,
        })
        .collect();

    for (i, arg) in args.iter().enumerate() {
        match arg {
            Arg::DoubleDash => {
                past_double_dash = true;
            }
            Arg::Positional(expr) => {
                // Skip positionals already claimed as a flag's value.
                if !consumed_positionals.contains(&i)
                    && let Some(value) = eval_simple_expr(expr, ctx)
                {
                    tool_args.positional.push(value);
                }
            }
            Arg::Named { key, value } => {
                if let Some(val) = eval_simple_expr(value, ctx) {
                    tool_args.named.insert(key.clone(), val);
                }
            }
            Arg::WordAssign { key, value } => {
                if let Some(val) = eval_simple_expr(value, ctx) {
                    if accepts_word_assign {
                        tool_args.named.insert(key.clone(), val);
                    } else {
                        let val_str = crate::interpreter::value_to_string(&val);
                        tool_args
                            .positional
                            .push(Value::String(format!("{key}={val_str}")));
                    }
                }
            }
            Arg::ShortFlag(name) => {
                if past_double_dash {
                    tool_args.positional.push(Value::String(format!("-{name}")));
                } else if name.len() == 1 {
                    // A single-character flag is boolean unless the schema says otherwise.
                    match param_lookup.get(name.as_str()) {
                        Some((canonical, typ, _)) if !is_bool_type(typ) => bind_value_flag(
                            &mut tool_args,
                            &mut consumed_positionals,
                            &positional_indices,
                            ctx,
                            i,
                            canonical,
                            name,
                        ),
                        _ => {
                            tool_args.flags.insert(name.clone());
                        }
                    }
                } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
                    // A multi-character short flag that the schema knows as a whole.
                    if is_bool_type(typ) {
                        tool_args.flags.insert(canonical.to_string());
                    } else {
                        bind_value_flag(
                            &mut tool_args,
                            &mut consumed_positionals,
                            &positional_indices,
                            ctx,
                            i,
                            canonical,
                            name,
                        );
                    }
                } else {
                    // Unknown cluster such as `-la`: one boolean flag per character.
                    for c in name.chars() {
                        tool_args.flags.insert(c.to_string());
                    }
                }
            }
            Arg::LongFlag(name) => {
                if past_double_dash {
                    tool_args.positional.push(Value::String(format!("--{name}")));
                } else {
                    // A long flag is boolean unless the schema says otherwise.
                    match param_lookup.get(name.as_str()) {
                        Some((canonical, typ, _)) if !is_bool_type(typ) => bind_value_flag(
                            &mut tool_args,
                            &mut consumed_positionals,
                            &positional_indices,
                            ctx,
                            i,
                            canonical,
                            name,
                        ),
                        _ => {
                            tool_args.flags.insert(name.clone());
                        }
                    }
                }
            }
        }
    }

    if let Some(schema) = schema.filter(|s| s.map_positionals) {
        // Only positionals written before `--` may be mapped onto named parameters.
        let pre_dash_count = if past_double_dash {
            let dash_pos = args
                .iter()
                .position(|a| matches!(a, Arg::DoubleDash))
                .unwrap_or(args.len());
            positional_indices
                .iter()
                .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
                .count()
        } else {
            tool_args.positional.len()
        };
        let mut remaining = Vec::new();
        let mut positional_iter = tool_args.positional.drain(..).enumerate();
        for param in &schema.params {
            // Parameters that already received a value or flag keep it.
            if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
                continue;
            }
            if is_bool_type(&param.param_type) {
                continue;
            }
            loop {
                match positional_iter.next() {
                    Some((idx, val)) if idx < pre_dash_count => {
                        tool_args.named.insert(param.name.clone(), val);
                        break;
                    }
                    // Positionals past the `--` boundary stay positional.
                    Some((_, val)) => {
                        remaining.push(val);
                    }
                    None => break,
                }
            }
        }
        remaining.extend(positional_iter.map(|(_, v)| v));
        tool_args.positional = remaining;
    }
    tool_args
}
/// Evaluates the subset of expressions that needs no command execution.
///
/// Supported: literals, variable references, interpolated strings, glob patterns (returned
/// verbatim as a string) and here-document bodies (with optional leading-tab stripping).
/// Inside interpolated text, command substitutions expand to nothing and unresolved
/// variables contribute the empty string.
///
/// Returns `None` for every other expression kind, and for a variable reference that does
/// not resolve.
pub(crate) fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
    match expr {
        Expr::Literal(value) => Some(eval_literal(value, ctx)),
        Expr::VarRef(path) => ctx.scope.resolve_path(path),
        // Interpolation shares one implementation with here-document bodies and
        // `${VAR:-default}` fallbacks, so the expansion rules cannot drift apart.
        Expr::Interpolated(parts) => Some(Value::String(eval_string_parts_sync(parts, ctx))),
        Expr::GlobPattern(pattern) => Some(Value::String(pattern.clone())),
        Expr::HereDocBody { parts, strip_tabs } => {
            // Here-doc parts wrap a plain `StringPart`; unwrap them for the shared evaluator.
            let unwrapped: Vec<crate::ast::StringPart> =
                parts.iter().map(|sp| sp.part.clone()).collect();
            let raw = eval_string_parts_sync(&unwrapped, ctx);
            let body = if *strip_tabs {
                crate::interpreter::strip_leading_tabs(&raw)
            } else {
                raw
            };
            Some(Value::String(body))
        }
        _ => None,
    }
}
fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
value.clone()
}
/// Renders a value as the text used in string contexts (interpolation, redirect targets).
///
/// `Null` renders as the empty string, scalars use their display form, JSON is serialized
/// via its `to_string`, and blobs render as a `[blob: <size> <content type>]` placeholder.
fn value_to_string(value: &Value) -> String {
    match value {
        Value::Null => String::new(),
        Value::Bool(flag) => flag.to_string(),
        Value::Int(number) => number.to_string(),
        Value::Float(number) => number.to_string(),
        Value::String(text) => text.to_owned(),
        Value::Json(document) => document.to_string(),
        Value::Blob(blob) => {
            let size = blob.formatted_size();
            format!("[blob: {size} {}]", blob.content_type)
        }
    }
}
fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
let mut result = String::new();
for part in parts {
match part {
crate::ast::StringPart::Literal(s) => result.push_str(s),
crate::ast::StringPart::Var(path) => {
if let Some(value) = ctx.scope.resolve_path(path) {
result.push_str(&value_to_string(&value));
}
}
crate::ast::StringPart::VarWithDefault { name, default } => {
match ctx.scope.get(name) {
Some(value) => {
let s = value_to_string(value);
if s.is_empty() {
result.push_str(&eval_string_parts_sync(default, ctx));
} else {
result.push_str(&s);
}
}
None => result.push_str(&eval_string_parts_sync(default, ctx)),
}
}
crate::ast::StringPart::VarLength(name) => {
let len = match ctx.scope.get(name) {
Some(value) => value_to_string(value).len(),
None => 0,
};
result.push_str(&len.to_string());
}
crate::ast::StringPart::Positional(n) => {
if let Some(s) = ctx.scope.get_positional(*n) {
result.push_str(s);
}
}
crate::ast::StringPart::AllArgs => {
result.push_str(&ctx.scope.all_args().join(" "));
}
crate::ast::StringPart::ArgCount => {
result.push_str(&ctx.scope.arg_count().to_string());
}
crate::ast::StringPart::Arithmetic(expr) => {
if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
result.push_str(&value.to_string());
}
}
crate::ast::StringPart::CommandSubst(_) => {
}
crate::ast::StringPart::LastExitCode => {
result.push_str(&ctx.scope.last_result().code.to_string());
}
crate::ast::StringPart::CurrentPid => {
result.push_str(&ctx.scope.pid().to_string());
}
}
}
result
}
/// Locates a scatter/gather pair in a pipeline.
///
/// Returns the indices of the first `scatter` and the first `gather` command, but only when
/// both exist and that `gather` comes after that `scatter`.
fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
    let first_named = |wanted: &str| commands.iter().position(|cmd| cmd.name == wanted);
    let scatter_idx = first_named("scatter")?;
    let gather_idx = first_named("gather")?;
    (gather_idx > scatter_idx).then_some((scatter_idx, gather_idx))
}
// Unit tests for `select_leaf`: routing of leading literal positionals through
// a nested subcommand schema.
#[cfg(test)]
mod select_leaf_tests {
use super::*;
use crate::tools::ParamSchema;
// Fixture schema:
//   kj [--confirm <string>] [--verbose]
//     context (alias: ctx)
//       list   (alias: ls)
//       create [--type <string>] (alias: t)
fn kj_schema() -> ToolSchema {
ToolSchema::new("kj", "kaijutsu")
.param(ParamSchema::new("confirm", "string"))
.param(ParamSchema::new("verbose", "bool"))
.subcommand(
ToolSchema::new("context", "context ops")
.with_command_aliases(["ctx"])
.subcommand(ToolSchema::new("list", "list").with_command_aliases(["ls"]))
.subcommand(
ToolSchema::new("create", "create").param(
ParamSchema::new("type", "string").with_aliases(["t"]),
),
),
)
}
// Builds a positional argument holding a literal string.
fn word(s: &str) -> Arg {
Arg::Positional(Expr::Literal(Value::String(s.to_string())))
}
// A schema without subcommands always resolves to itself.
#[test]
fn flat_tool_returns_root() {
let schema = ToolSchema::new("cat", "concat")
.param(ParamSchema::required("path", "string", "f").positional());
let leaf = select_leaf(&schema, &[word("foo.txt")]).expect("flat ok");
assert_eq!(leaf.name, "cat");
}
// One literal word descends one level.
#[test]
fn single_hop() {
let schema = kj_schema();
let leaf = select_leaf(&schema, &[word("context")]).expect("ok");
assert_eq!(leaf.name, "context");
}
// Two literal words descend two levels; the leaf exposes its own params.
#[test]
fn two_hops() {
let schema = kj_schema();
let leaf = select_leaf(&schema, &[word("context"), word("create")]).expect("ok");
assert_eq!(leaf.name, "create");
assert!(leaf.params.iter().any(|p| p.name == "type"), "leaf has --type");
}
// Command aliases route exactly like the canonical names.
#[test]
fn alias_hops_route() {
let schema = kj_schema();
let leaf = select_leaf(&schema, &[word("ctx"), word("ls")]).expect("ok");
assert_eq!(leaf.name, "list");
}
// A word that matches no subcommand ends routing at the node reached so far.
#[test]
fn unknown_subcommand_stops_at_current_node() {
let schema = kj_schema();
let leaf = select_leaf(&schema, &[word("context"), word("nonesuch")]).expect("ok");
assert_eq!(leaf.name, "context");
}
// A boolean root flag takes no value, so the following word is still a subcommand.
#[test]
fn root_bool_flag_before_path_does_not_disrupt_routing() {
let schema = kj_schema();
let args = vec![Arg::LongFlag("verbose".into()), word("context"), word("create")];
let leaf = select_leaf(&schema, &args).expect("ok");
assert_eq!(leaf.name, "create");
}
// A value-taking root flag swallows the next positional ("nonce") as its value.
#[test]
fn root_value_flag_space_form_before_path_skips_its_value() {
let schema = kj_schema();
let args = vec![
Arg::LongFlag("confirm".into()),
word("nonce"),
word("context"),
word("create"),
];
let leaf = select_leaf(&schema, &args).expect("ok");
assert_eq!(leaf.name, "create");
}
// Flags belonging to the leaf, placed after the path, do not affect routing.
#[test]
fn leaf_value_flag_after_path_routes_to_leaf() {
let schema = kj_schema();
let args = vec![
word("context"),
word("create"),
Arg::LongFlag("type".into()),
word("x"),
];
let leaf = select_leaf(&schema, &args).expect("ok");
assert_eq!(leaf.name, "create");
assert!(leaf.params.iter().any(|p| p.name == "type"));
}
// `--` ends routing immediately; words after it are not subcommands.
#[test]
fn double_dash_stops_routing() {
let schema = kj_schema();
let leaf = select_leaf(&schema, &[Arg::DoubleDash, word("context")]).expect("ok");
assert_eq!(leaf.name, "kj");
}
// A command substitution where a subcommand name is required is an error
// whose message names the cause.
#[test]
fn computed_subcommand_selector_errors() {
let schema = kj_schema();
let args = vec![Arg::Positional(Expr::CommandSubst(vec![
crate::ast::Stmt::Command(crate::ast::Command {
name: "echo".into(),
args: vec![],
redirects: vec![],
}),
]))];
let err = select_leaf(&schema, &args).expect_err("must error");
let msg = err.to_string();
assert!(msg.contains("subcommand name is required"), "got: {msg}");
assert!(msg.contains("command substitution"), "names the cause: {msg}");
}
// A variable reference in subcommand position is rejected the same way.
#[test]
fn variable_subcommand_selector_errors() {
let schema = kj_schema();
let args = vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("sub")))];
let err = select_leaf(&schema, &args).expect_err("must error");
assert!(err.to_string().contains("variable reference"), "got: {err}");
}
// Once a leaf without subcommands is reached, computed positionals are
// ordinary arguments and no longer an error.
#[test]
fn computed_positional_after_leaf_is_fine() {
let schema = kj_schema();
let args = vec![
word("context"),
word("list"),
Arg::Positional(Expr::CommandSubst(vec![crate::ast::Stmt::Command(
crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
)])),
];
let leaf = select_leaf(&schema, &args).expect("ok");
assert_eq!(leaf.name, "list");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dispatch::BackendDispatcher;
use crate::tools::register_builtins;
use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
use std::path::Path;
async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
let mut tools = ToolRegistry::new();
register_builtins(&mut tools);
let tools = Arc::new(tools);
let runner = PipelineRunner::new(tools.clone());
let dispatcher = BackendDispatcher::new(tools.clone());
let mut vfs = VfsRouter::new();
let mem = MemoryFs::new();
mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
vfs.mount("/", mem);
let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
(runner, ctx, dispatcher)
}
fn make_cmd(name: &str, args: Vec<&str>) -> Command {
Command {
name: name.to_string(),
args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
redirects: vec![],
}
}
#[tokio::test]
async fn test_single_command() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let cmd = make_cmd("echo", vec!["hello"]);
let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
assert!(result.ok());
assert_eq!(result.text_out().trim(), "hello");
}
#[tokio::test]
async fn test_pipeline_echo_grep() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let echo_cmd = Command {
name: "echo".to_string(),
args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
redirects: vec![],
};
let grep_cmd = Command {
name: "grep".to_string(),
args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
redirects: vec![],
};
let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
assert!(result.ok());
assert_eq!(result.text_out().trim(), "world");
}
#[tokio::test]
async fn test_pipeline_cat_grep() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
let grep_cmd = Command {
name: "grep".to_string(),
args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
redirects: vec![],
};
let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
assert!(result.ok());
assert!(result.text_out().contains("hello"));
}
#[tokio::test]
async fn test_command_not_found() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let cmd = make_cmd("nonexistent", vec![]);
let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
assert!(!result.ok());
assert_eq!(result.code, 127);
assert!(result.err.contains("not found"));
}
#[tokio::test]
async fn test_pipeline_continues_on_failure() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
let grep_cmd = Command {
name: "grep".to_string(),
args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
redirects: vec![],
};
let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
assert!(!result.ok());
}
#[tokio::test]
async fn test_pipeline_last_command_exit_code() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let echo_cmd = make_cmd("echo", vec!["hello"]);
let cat_cmd = make_cmd("cat", vec![]);
let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
assert!(result.ok());
assert!(result.text_out().contains("hello"));
}
#[tokio::test]
async fn test_empty_pipeline() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let result = runner.run(&[], &mut ctx, &dispatcher).await;
assert!(result.ok());
}
#[test]
fn test_find_scatter_gather_both_present() {
let commands = vec![
make_cmd("echo", vec!["a"]),
make_cmd("scatter", vec![]),
make_cmd("process", vec![]),
make_cmd("gather", vec![]),
];
let result = find_scatter_gather(&commands);
assert_eq!(result, Some((1, 3)));
}
#[test]
fn test_find_scatter_gather_no_scatter() {
let commands = vec![
make_cmd("echo", vec!["a"]),
make_cmd("gather", vec![]),
];
let result = find_scatter_gather(&commands);
assert!(result.is_none());
}
#[test]
fn test_find_scatter_gather_no_gather() {
let commands = vec![
make_cmd("echo", vec!["a"]),
make_cmd("scatter", vec![]),
];
let result = find_scatter_gather(&commands);
assert!(result.is_none());
}
#[test]
fn test_find_scatter_gather_wrong_order() {
let commands = vec![
make_cmd("gather", vec![]),
make_cmd("scatter", vec![]),
];
let result = find_scatter_gather(&commands);
assert!(result.is_none());
}
#[tokio::test]
async fn test_scatter_gather_simple() {
let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
let split_cmd = Command {
name: "split".to_string(),
args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
redirects: vec![],
};
let scatter_cmd = make_cmd("scatter", vec![]);
let process_cmd = Command {
name: "echo".to_string(),
args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
redirects: vec![],
};
let gather_cmd = make_cmd("gather", vec![]);
let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
assert!(result.text_out().contains("a"));
assert!(result.text_out().contains("b"));
assert!(result.text_out().contains("c"));
}
/// Feeds an empty string into `scatter` (via `echo ""`) and expects the
/// pipeline to succeed with output that is empty after trimming.
#[tokio::test]
async fn test_scatter_gather_empty_input() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    let empty_text = Expr::Literal(Value::String("".to_string()));
    let item_ref = Expr::VarRef(crate::ast::VarPath::simple("ITEM"));
    let pipeline = [
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(empty_text)],
            redirects: vec![],
        },
        make_cmd("scatter", vec![]),
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(item_ref)],
            redirects: vec![],
        },
        make_cmd("gather", vec![]),
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok());
    assert!(result.text_out().trim().is_empty());
}
/// Starts the pipeline directly at `scatter`, with stdin pre-seeded with both
/// a text form and a JSON array, and checks that every array element reaches
/// the gathered output.
#[tokio::test]
async fn test_scatter_gather_with_structured_stdin() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    // Provide stdin as newline-separated text plus the matching JSON value.
    let structured = Value::Json(serde_json::json!(["x", "y", "z"]));
    ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(structured));

    let item_ref = Expr::VarRef(crate::ast::VarPath::simple("ITEM"));
    let pipeline = [
        make_cmd("scatter", vec![]),
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(item_ref)],
            redirects: vec![],
        },
        make_cmd("gather", vec![]),
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
    let output = result.text_out();
    assert!(output.contains("x"));
    assert!(output.contains("y"));
    assert!(output.contains("z"));
}
/// Same shape as the structured-stdin test, but the text side of stdin is the
/// JSON source itself; each array element must appear in the output.
#[tokio::test]
async fn test_scatter_gather_json_input() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    let structured = Value::Json(serde_json::json!(["one", "two", "three"]));
    ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(structured));

    let item_ref = Expr::VarRef(crate::ast::VarPath::simple("ITEM"));
    let pipeline = [
        make_cmd("scatter", vec![]),
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(item_ref)],
            redirects: vec![],
        },
        make_cmd("gather", vec![]),
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
    let output = result.text_out();
    assert!(output.contains("one"));
    assert!(output.contains("two"));
    assert!(output.contains("three"));
}
/// Appends a `grep a` stage after `gather` and checks that the gathered
/// output is filtered by it: `a` survives, `b` does not.
#[tokio::test]
async fn test_scatter_gather_with_post_gather() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    // Small local helper for a positional string-literal argument.
    let text_arg = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let item_ref = Expr::VarRef(crate::ast::VarPath::simple("ITEM"));

    let pipeline = [
        Command {
            name: "split".to_string(),
            args: vec![text_arg("a b")],
            redirects: vec![],
        },
        make_cmd("scatter", vec![]),
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(item_ref)],
            redirects: vec![],
        },
        make_cmd("gather", vec![]),
        // Stage that runs on the gathered result.
        Command {
            name: "grep".to_string(),
            args: vec![text_arg("a")],
            redirects: vec![],
        },
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
    let output = result.text_out();
    assert!(output.contains("a"));
    assert!(!output.contains("b"));
}
/// Passes `as=URL` to `scatter` and has the worker stage echo the `URL`
/// variable; both input items must show up in the output.
#[tokio::test]
async fn test_scatter_custom_var_name() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    let structured = Value::Json(serde_json::json!(["test1", "test2"]));
    ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(structured));

    // Named argument selecting the per-item variable name.
    let rename_arg = Arg::Named {
        key: "as".to_string(),
        value: Expr::Literal(Value::String("URL".to_string())),
    };
    let url_ref = Expr::VarRef(crate::ast::VarPath::simple("URL"));

    let pipeline = [
        Command {
            name: "scatter".to_string(),
            args: vec![rename_arg],
            redirects: vec![],
        },
        Command {
            name: "echo".to_string(),
            args: vec![Arg::Positional(url_ref)],
            redirects: vec![],
        },
        make_cmd("gather", vec![]),
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
    let output = result.text_out();
    assert!(output.contains("test1"));
    assert!(output.contains("test2"));
}
#[tokio::test]
async fn test_pipeline_routes_through_backend() {
use crate::backend::testing::MockBackend;
use std::sync::atomic::Ordering;
let (backend, call_count) = MockBackend::new();
let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
let mut ctx = crate::tools::ExecContext::with_backend(backend);
let tools = std::sync::Arc::new(ToolRegistry::new());
let runner = PipelineRunner::new(tools.clone());
let dispatcher = BackendDispatcher::new(tools);
let cmd = make_cmd("test-tool", vec!["arg1"]);
let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
assert!(result.ok(), "Mock backend should return success");
assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
}
#[tokio::test]
async fn test_multi_command_pipeline_routes_through_backend() {
use crate::backend::testing::MockBackend;
use std::sync::atomic::Ordering;
let (backend, call_count) = MockBackend::new();
let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
let mut ctx = crate::tools::ExecContext::with_backend(backend);
let tools = std::sync::Arc::new(ToolRegistry::new());
let runner = PipelineRunner::new(tools.clone());
let dispatcher = BackendDispatcher::new(tools);
let cmd1 = make_cmd("tool1", vec![]);
let cmd2 = make_cmd("tool2", vec![]);
let cmd3 = make_cmd("tool3", vec![]);
let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
assert!(result.ok());
assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
}
use crate::tools::{ParamSchema, ToolSchema};
/// Builds the schema shared by the schema-aware argument-parsing tests.
///
/// Parameters, in declaration order: required string `query`, optional int
/// `limit`, optional bool `verbose`, optional string `output`. Positional
/// mapping is enabled on the resulting schema.
fn make_test_schema() -> ToolSchema {
    // The `Value` passed to `optional` is presumably the parameter's default.
    let query = ParamSchema::required("query", "string", "Search query");
    let limit = ParamSchema::optional("limit", "int", Value::Int(10), "Max results");
    let verbose = ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output");
    let output = ParamSchema::optional(
        "output",
        "string",
        Value::String("stdout".into()),
        "Output destination",
    );

    ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
        .param(query)
        .param(limit)
        .param(verbose)
        .param(output)
        .with_positional_mapping()
}
/// Creates an `ExecContext` backed by a `VfsRouter` with a single `MemoryFs`
/// mounted at `/`.
fn make_minimal_ctx() -> ExecContext {
    let mut router = VfsRouter::new();
    router.mount("/", MemoryFs::new());

    let shared_router = Arc::new(router);
    ExecContext::new(shared_router)
}
/// With a schema, `--query` (a string parameter) should take the following
/// positional as its value rather than being recorded as a boolean flag.
#[test]
fn test_schema_aware_string_arg() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("query".into()),
        Arg::Positional(Expr::Literal(Value::String("test".into()))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "No flags should be set");
    assert!(parsed.positional.is_empty(), "No positionals - consumed by --query");
    let expected_query = Value::String("test".into());
    assert_eq!(
        parsed.named.get("query"),
        Some(&expected_query),
        "--query should consume 'test' as its value"
    );
}
/// `--verbose` is declared as a bool in the schema, so it should be recorded
/// as a flag and produce no named or positional arguments.
#[test]
fn test_schema_aware_bool_flag() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![Arg::LongFlag("verbose".into())];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("verbose"), "--verbose should be a flag");
    assert!(parsed.named.is_empty(), "No named args");
    assert!(parsed.positional.is_empty(), "No positionals");
}
/// Mixes a bare positional, a value-taking flag and a boolean flag:
/// the bare positional maps to `query`, `--output` takes `out.txt`, and
/// `--verbose` stays a flag.
#[test]
fn test_schema_aware_mixed() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::Positional(Expr::Literal(text("file.txt"))),
        Arg::LongFlag("output".into()),
        Arg::Positional(Expr::Literal(text("out.txt"))),
        Arg::LongFlag("verbose".into()),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    assert_eq!(parsed.named.get("query"), Some(&text("file.txt")));
    assert_eq!(parsed.named.get("output"), Some(&text("out.txt")));
    assert!(parsed.flags.contains("verbose"));
}
/// Several value-taking flags in one invocation: each of `--query`,
/// `--output` and `--limit` consumes the positional right after it, while
/// `--verbose` remains a plain flag.
#[test]
fn test_schema_aware_multiple_string_args() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("query".into()),
        Arg::Positional(Expr::Literal(text("test"))),
        Arg::LongFlag("output".into()),
        Arg::Positional(Expr::Literal(text("result.json"))),
        Arg::LongFlag("verbose".into()),
        Arg::LongFlag("limit".into()),
        Arg::Positional(Expr::Literal(Value::Int(5))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.positional.is_empty(), "All positionals consumed");
    assert_eq!(parsed.named.get("query"), Some(&text("test")));
    assert_eq!(parsed.named.get("output"), Some(&text("result.json")));
    assert_eq!(parsed.named.get("limit"), Some(&Value::Int(5)));
    assert!(parsed.flags.contains("verbose"));
}
/// Anything after `--` must be kept as a raw positional, even when it looks
/// like a flag; arguments before `--` are still parsed against the schema.
#[test]
fn test_schema_aware_double_dash() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("output".into()),
        Arg::Positional(Expr::Literal(text("out.txt"))),
        Arg::DoubleDash,
        Arg::Positional(Expr::Literal(text("--this-is-data"))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("output"), Some(&text("out.txt")));
    assert_eq!(parsed.positional, vec![text("--this-is-data")]);
}
/// Without a schema there is no type information, so `--query` is recorded
/// as a flag and the following value stays positional.
#[test]
fn test_no_schema_fallback() {
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("query".into()),
        Arg::Positional(Expr::Literal(Value::String("test".into()))),
    ];

    let parsed = build_tool_args(&args, &ctx, None);

    assert!(parsed.flags.contains("query"), "--query should be a flag");
    let expected_positional = vec![Value::String("test".into())];
    assert_eq!(
        parsed.positional,
        expected_positional,
        "'test' should be a positional"
    );
}
/// A long flag that the schema does not declare is kept as a boolean flag;
/// the value after it is then free to be mapped onto `query`.
#[test]
fn test_unknown_flag_in_schema() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("unknown".into()),
        Arg::Positional(Expr::Literal(Value::String("value".into()))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("unknown"));
    assert!(parsed.positional.is_empty(), "value consumed as query param");
    let expected_query = Value::String("value".into());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
/// Explicit `key=value` arguments keep working when a schema is supplied,
/// alongside a boolean flag.
#[test]
fn test_named_args_unchanged() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let query_arg = Arg::Named {
        key: "query".into(),
        value: Expr::Literal(Value::String("test".into())),
    };
    let args = vec![query_arg, Arg::LongFlag("verbose".into())];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("test".into());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.flags.contains("verbose"));
}
/// A combined short flag (`-la`) is split into one flag per character, and
/// the trailing positional is still mapped onto `query`.
#[test]
fn test_short_flags_unchanged() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::ShortFlag("la".into()),
        Arg::Positional(Expr::Literal(Value::String("file.txt".into()))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("l"));
    assert!(parsed.flags.contains("a"));
    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    let expected_query = Value::String("file.txt".into());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
/// A value-taking flag that is the last argument has nothing to consume, so
/// it is expected to fall back to being a plain flag.
#[test]
fn test_flag_at_end_no_value() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::Positional(Expr::Literal(Value::String("file.txt".into()))),
        Arg::LongFlag("output".into()),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("output"));
    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    let expected_query = Value::String("file.txt".into());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
/// Positional mapping must skip bool parameters: with params
/// `query` (string), `verbose` (bool), `output` (string), two positionals
/// land in `query` and `output`, and `verbose` is left unset.
#[test]
fn test_positional_skips_bool_params() {
    let text = |s: &str| Value::String(s.to_string());

    let query = ParamSchema::required("query", "string", "");
    let verbose = ParamSchema::optional("verbose", "bool", Value::Bool(false), "");
    let output = ParamSchema::optional("output", "string", Value::Null, "");
    let schema = ToolSchema::new("test", "")
        .param(query)
        .param(verbose)
        .param(output)
        .with_positional_mapping();

    let args = vec![
        Arg::Positional(Expr::Literal(text("val1"))),
        Arg::Positional(Expr::Literal(text("val2"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("query"), Some(&text("val1")));
    assert_eq!(parsed.named.get("output"), Some(&text("val2")));
    assert!(!parsed.flags.contains("verbose"));
    assert!(parsed.positional.is_empty());
}
/// Three positionals fill the three non-bool schema slots in declaration
/// order (`query`, `limit`, `output`). The expected `limit` value is the
/// string `val2` exactly as given.
#[test]
fn test_positionals_fill_available_slots() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::Positional(Expr::Literal(text("val1"))),
        Arg::Positional(Expr::Literal(text("val2"))),
        Arg::Positional(Expr::Literal(text("val3"))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("query"), Some(&text("val1")));
    assert_eq!(parsed.named.get("limit"), Some(&text("val2")));
    assert_eq!(parsed.named.get("output"), Some(&text("val3")));
    assert!(parsed.positional.is_empty());
}
/// With only one mappable parameter, the first positional is mapped to it
/// and the remaining ones stay in `positional`, in their original order.
#[test]
fn test_truly_excess_positionals() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = ToolSchema::new("test", "")
        .param(ParamSchema::required("name", "string", ""))
        .with_positional_mapping();
    let args = vec![
        Arg::Positional(Expr::Literal(text("first"))),
        Arg::Positional(Expr::Literal(text("second"))),
        Arg::Positional(Expr::Literal(text("third"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("name"), Some(&text("first")));
    assert_eq!(parsed.positional, vec![text("second"), text("third")]);
}
/// Positionals after `--` are never mapped onto schema parameters, even when
/// free slots remain; only the one before `--` is mapped (to `query`).
#[test]
fn test_double_dash_positional_not_mapped() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::Positional(Expr::Literal(text("val1"))),
        Arg::DoubleDash,
        Arg::Positional(Expr::Literal(text("val2"))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("query"), Some(&text("val1")));
    assert_eq!(parsed.positional, vec![text("val2")]);
}
/// When every value arrives through an explicit flag, nothing should be left
/// over as a positional.
#[test]
fn test_all_params_filled_by_flags() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("query".into()),
        Arg::Positional(Expr::Literal(text("search"))),
        Arg::LongFlag("output".into()),
        Arg::Positional(Expr::Literal(text("out.txt"))),
        Arg::LongFlag("verbose".into()),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("query"), Some(&text("search")));
    assert_eq!(parsed.named.get("output"), Some(&text("out.txt")));
    assert!(parsed.flags.contains("verbose"));
    assert!(parsed.positional.is_empty());
}
/// `--output foo` fills `output` via the flag; the remaining bare positional
/// then fills the first still-free slot, `query`.
#[test]
fn test_mixed_flags_and_positional_fill() {
    let text = |s: &str| Value::String(s.to_string());
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("output".into()),
        Arg::Positional(Expr::Literal(text("foo"))),
        Arg::Positional(Expr::Literal(text("val1"))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("output"), Some(&text("foo")));
    assert_eq!(parsed.named.get("query"), Some(&text("val1")));
    assert!(parsed.positional.is_empty());
}
/// `-q` is an alias for `query`. Once it has supplied `query`, a later bare
/// positional must go to the next parameter (`output`) instead of
/// overwriting `query`.
#[test]
fn test_alias_flag_prevents_mapping_overwrite() {
    let text = |s: &str| Value::String(s.to_string());

    let query = ParamSchema::required("query", "string", "").with_aliases(["-q"]);
    let output = ParamSchema::required("output", "string", "");
    let schema = ToolSchema::new("test", "")
        .param(query)
        .param(output)
        .with_positional_mapping();

    let args = vec![
        Arg::ShortFlag("q".into()),
        Arg::Positional(Expr::Literal(text("search"))),
        Arg::Positional(Expr::Literal(text("out.txt"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("query"), Some(&text("search")));
    assert_eq!(parsed.named.get("output"), Some(&text("out.txt")));
    assert!(parsed.positional.is_empty());
}
/// A schema built without `with_positional_mapping()` must leave positionals
/// untouched: both words stay in `positional` and `args` is not populated.
#[test]
fn test_builtin_schema_no_positional_mapping() {
    let text = |s: &str| Value::String(s.to_string());

    let args_param = ParamSchema::optional("args", "any", Value::Null, "");
    let no_newline = ParamSchema::optional("no_newline", "bool", Value::Bool(false), "");
    let schema = ToolSchema::new("echo", "").param(args_param).param(no_newline);

    let args = vec![
        Arg::Positional(Expr::Literal(text("hello"))),
        Arg::Positional(Expr::Literal(text("world"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.positional, vec![text("hello"), text("world")]);
    assert!(parsed.named.get("args").is_none());
}
/// `-n` is an alias of the int parameter `lines`: it should consume the next
/// value, store it under the canonical name, and leave the file path as a
/// positional.
#[test]
fn test_short_flag_with_alias_consumes_value() {
    let lines_param = ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
        .with_aliases(["-n"]);
    let schema = ToolSchema::new("head", "Output first part of files").param(lines_param);

    let args = vec![
        Arg::ShortFlag("n".into()),
        Arg::Positional(Expr::Literal(Value::Int(5))),
        Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".into()))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "no boolean flags: {:?}", parsed.flags);
    assert_eq!(parsed.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
    assert_eq!(parsed.positional, vec![Value::String("/tmp/file.txt".into())]);
}
/// `MergeStderr` should append the stderr text to stdout and leave stderr
/// empty.
#[tokio::test]
async fn test_merge_stderr_redirect() {
    let ctx = make_minimal_ctx();
    let merge = Redirect {
        kind: RedirectKind::MergeStderr,
        target: Expr::Literal(Value::Null),
    };
    let initial = ExecResult::from_output(0, "stdout content", "stderr content");

    let merged = apply_redirects(initial, &[merge], &ctx).await;

    assert_eq!(&*merged.text_out(), "stdout contentstderr content");
    assert!(merged.err.is_empty());
}
/// With nothing on stderr, `MergeStderr` should leave stdout unchanged.
#[tokio::test]
async fn test_merge_stderr_with_empty_stderr() {
    let ctx = make_minimal_ctx();
    let merge = Redirect {
        kind: RedirectKind::MergeStderr,
        target: Expr::Literal(Value::Null),
    };
    let initial = ExecResult::from_output(0, "stdout only", "");

    let merged = apply_redirects(initial, &[merge], &ctx).await;

    assert_eq!(&*merged.text_out(), "stdout only");
    assert!(merged.err.is_empty());
}
/// The merged stream keeps stdout first and stderr second, with each
/// stream's trailing newline preserved.
#[tokio::test]
async fn test_merge_stderr_order_matters() {
    let ctx = make_minimal_ctx();
    let merge = Redirect {
        kind: RedirectKind::MergeStderr,
        target: Expr::Literal(Value::Null),
    };
    let initial = ExecResult::from_output(0, "stdout\n", "stderr\n");

    let merged = apply_redirects(initial, &[merge], &ctx).await;

    assert_eq!(&*merged.text_out(), "stdout\nstderr\n");
    assert!(merged.err.is_empty());
}
/// Runs a real `echo hello` command carrying a `MergeStderr` redirect through
/// the pipeline runner and checks that its output still arrives.
#[tokio::test]
async fn test_redirect_with_command_execution() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    let merge = Redirect {
        kind: RedirectKind::MergeStderr,
        target: Expr::Literal(Value::Null),
    };
    let greeting = Expr::Literal(Value::String("hello".to_string()));
    let pipeline = [Command {
        name: "echo".to_string(),
        args: vec![Arg::Positional(greeting)],
        redirects: vec![merge],
    }];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok());
    assert!(result.text_out().contains("hello"));
}
/// A `MergeStderr` redirect on the first stage must not stop its output from
/// flowing into the next stage: `echo output` followed by `grep output`
/// should succeed and yield the matching text.
#[tokio::test]
async fn test_merge_stderr_in_pipeline() {
    let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;

    // Both stages take the same literal argument.
    let text_arg = || Arg::Positional(Expr::Literal(Value::String("output".to_string())));
    let merge = Redirect {
        kind: RedirectKind::MergeStderr,
        target: Expr::Literal(Value::Null),
    };

    let pipeline = [
        Command {
            name: "echo".to_string(),
            args: vec![text_arg()],
            redirects: vec![merge],
        },
        Command {
            name: "grep".to_string(),
            args: vec![text_arg()],
            redirects: vec![],
        },
    ];

    let result = runner.run(&pipeline, &mut ctx, &dispatcher).await;

    assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
    assert!(result.text_out().contains("output"));
}
}