use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use crate::ast::{Command, Expr, Value};
use crate::interpreter::ExecResult;
use crate::tools::ExecContext;
#[cfg(test)]
use crate::ast::Arg;
#[cfg(test)]
use crate::backend::BackendError;
#[cfg(test)]
use crate::interpreter::apply_output_format;
#[cfg(test)]
use crate::scheduler::build_tool_args;
#[cfg(test)]
use crate::tools::{GlobalFlags, ToolRegistry};
#[cfg(all(test, feature = "subprocess"))]
use crate::tools::resolve_in_path;
/// Position of a command relative to a pipeline, as read from
/// `ExecContext::pipeline_position`.
///
/// In this file the value is consulted when spawning an external process:
/// only `First` and `Only` allow an interactive context to hand the
/// terminal's stdin to the child.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PipelinePosition {
/// Default variant. Presumably a command that is not part of a multi-stage
/// pipeline — confirm against the scheduler.
#[default]
Only,
/// Presumably the first stage of a multi-stage pipeline (no upstream stage).
First,
/// Presumably a stage with both an upstream and a downstream neighbour.
Middle,
/// Presumably the final stage of a multi-stage pipeline.
Last,
}
/// Async interface for running commands and evaluating expressions.
///
/// Implementors must be `Send + Sync`, and forks are handed out as
/// `Arc<dyn CommandDispatcher>`, so a dispatcher can be shared across tasks.
#[async_trait]
pub trait CommandDispatcher: Send + Sync {
/// Runs `cmd` with mutable access to `ctx` and returns the resulting
/// `ExecResult`, or an error if dispatch itself fails.
async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
/// Evaluates `expr` to a `Value` using read-only access to `ctx`.
async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value>;
/// Produces another dispatcher handle derived from this one.
///
/// NOTE(review): how much state the fork shares with its parent is up to
/// the implementor — confirm per implementation.
async fn fork(&self) -> Arc<dyn CommandDispatcher>;
/// Variant of [`fork`](Self::fork) that implementors may override.
///
/// The provided implementation simply delegates to `fork()`.
/// NOTE(review): the intended meaning of "attached" is not visible here.
async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
self.fork().await
}
}
/// Test-only `CommandDispatcher` (compiled under `cfg(test)`) that routes
/// commands through the context's backend.
#[cfg(test)]
pub(crate) struct BackendDispatcher {
// Registry consulted for a tool's schema before its arguments are built.
tools: Arc<ToolRegistry>,
}
#[cfg(test)]
impl BackendDispatcher {
    /// Builds a dispatcher backed by the given tool registry.
    pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
        Self { tools }
    }

    /// External-command fallback used when the `subprocess` feature is disabled.
    ///
    /// Always returns `None`; nothing is ever spawned.
    #[cfg(not(feature = "subprocess"))]
    async fn try_external(
        &self,
        _name: &str,
        _args: &[Arg],
        _ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        None
    }

    /// Renders an argument expression as the text placed on the child's command line.
    ///
    /// Supports string, integer and float literals plus variable references resolved
    /// through `ctx.scope`. Every other expression form, and a variable reference that
    /// does not resolve, yields `None`.
    #[cfg(feature = "subprocess")]
    fn expr_to_argv_text(expr: &Expr, ctx: &ExecContext) -> Option<String> {
        match expr {
            Expr::Literal(Value::String(s)) => Some(s.clone()),
            Expr::Literal(Value::Int(i)) => Some(i.to_string()),
            Expr::Literal(Value::Float(f)) => Some(f.to_string()),
            Expr::VarRef(path) => ctx
                .scope
                .resolve_path(path)
                .map(|v| crate::interpreter::value_to_string(&v)),
            _ => None,
        }
    }

    /// Tries to run `name` as an external process.
    ///
    /// Returns `None` when the command should be treated as "not handled":
    /// external commands are disabled on `ctx`, the working directory has no real
    /// filesystem path, or `name` (without a `/`) cannot be resolved via `PATH`.
    ///
    /// Returns `Some(ExecResult)` otherwise:
    /// * exit code 127 if a path-like `name` does not exist or the spawn fails;
    /// * when `ctx.pipe_stdout` is set, stdout is copied into that pipe and the
    ///   result's stdout is empty;
    /// * otherwise stdout/stderr are collected through
    ///   `output_limit::spill_aware_collect`.
    ///
    /// `ctx.pipe_stdin`, `ctx.stdin` and `ctx.pipe_stdout` are consumed (`take`n).
    #[cfg(feature = "subprocess")]
    async fn try_external(
        &self,
        name: &str,
        args: &[Arg],
        ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::process::Command;

        if !ctx.allow_external_commands {
            return None;
        }
        let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)?;

        // A name containing '/' is used as a path (relative to the real cwd unless
        // absolute); a bare name is looked up in PATH.
        let executable = if name.contains('/') {
            let resolved = if std::path::Path::new(name).is_absolute() {
                std::path::PathBuf::from(name)
            } else {
                real_cwd.join(name)
            };
            if resolved.exists() {
                resolved.to_string_lossy().into_owned()
            } else {
                return Some(ExecResult::failure(
                    127,
                    format!("{}: No such file or directory", name),
                ));
            }
        } else {
            // Prefer PATH from the shell scope; fall back to the process environment.
            let path_var = ctx
                .scope
                .get("PATH")
                .map(crate::interpreter::value_to_string)
                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
            resolve_in_path(name, &path_var)?
        };

        let argv: Vec<String> = {
            let ctx_ref: &ExecContext = &*ctx;
            args.iter()
                .filter_map(|arg| match arg {
                    // Positional expressions that cannot be rendered are dropped.
                    Arg::Positional(expr) => Self::expr_to_argv_text(expr, ctx_ref),
                    Arg::ShortFlag(f) => Some(format!("-{f}")),
                    Arg::LongFlag(f) => Some(format!("--{f}")),
                    // Keyed arguments go through the same rendering as positionals, so
                    // numeric literals and variable references keep their value instead
                    // of collapsing to an empty string. Unrenderable values still
                    // produce `key=` with nothing after the `=`.
                    Arg::Named { key, value } => {
                        let text = Self::expr_to_argv_text(value, ctx_ref).unwrap_or_default();
                        Some(format!("--{key}={text}"))
                    }
                    Arg::WordAssign { key, value } => {
                        let text = Self::expr_to_argv_text(value, ctx_ref).unwrap_or_default();
                        Some(format!("{key}={text}"))
                    }
                    Arg::DoubleDash => Some("--".to_string()),
                })
                .collect()
        };

        let has_pipe_stdin = ctx.pipe_stdin.is_some();
        let has_buffered_stdin = ctx.stdin.is_some();

        let mut cmd = Command::new(&executable);
        cmd.args(&argv);
        cmd.current_dir(&real_cwd);
        cmd.kill_on_drop(true);
        // The child sees only variables exported from the shell scope.
        cmd.env_clear();
        for (var_name, value) in ctx.scope.exported_vars() {
            cmd.env(var_name, crate::interpreter::value_to_string(&value));
        }
        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
            std::process::Stdio::piped()
        } else if ctx.interactive
            && matches!(
                ctx.pipeline_position,
                PipelinePosition::First | PipelinePosition::Only
            )
        {
            std::process::Stdio::inherit()
        } else {
            std::process::Stdio::null()
        });
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());

        let mut child = match cmd.spawn() {
            Ok(c) => c,
            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
        };
        let kill_target = crate::pidfd::KillTarget::from_child(&child);

        // Feed stdin from a background task in both cases so that writing input can
        // never block us from draining the child's stdout/stderr. Dropping
        // `child_stdin` at the end of the task closes the pipe (EOF for the child).
        let stdin_task: Option<tokio::task::JoinHandle<()>> =
            if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
                child.stdin.take().map(|mut child_stdin| {
                    tokio::spawn(async move {
                        let mut buf = [0u8; 8192];
                        loop {
                            match pipe_in.read(&mut buf).await {
                                Ok(0) | Err(_) => break,
                                Ok(n) => {
                                    if child_stdin.write_all(&buf[..n]).await.is_err() {
                                        break;
                                    }
                                }
                            }
                        }
                    })
                })
            } else if let Some(data) = ctx.stdin.take() {
                // Writing the whole buffer inline would deadlock once the child fills
                // its stdout pipe before it has consumed all of its input.
                child.stdin.take().map(|mut child_stdin| {
                    tokio::spawn(async move {
                        let _ = child_stdin.write_all(data.as_bytes()).await;
                    })
                })
            } else {
                None
            };

        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
            // Streaming mode: forward stdout into the downstream pipe as it arrives.
            let Some(mut child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(mut child_stderr_reader) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };

            // Stderr is either forwarded chunk-by-chunk to `ctx.stderr` or buffered
            // and returned as a string when no stream is attached.
            let stderr_stream_handle = ctx.stderr.clone();
            let stderr_task = tokio::spawn(async move {
                let mut buf = Vec::new();
                let mut chunk = [0u8; 8192];
                loop {
                    match child_stderr_reader.read(&mut chunk).await {
                        Ok(0) | Err(_) => break,
                        Ok(n) => {
                            if let Some(ref stream) = stderr_stream_handle {
                                stream.write(&chunk[..n]);
                            } else {
                                buf.extend_from_slice(&chunk[..n]);
                            }
                        }
                    }
                }
                if stderr_stream_handle.is_some() {
                    String::new()
                } else {
                    String::from_utf8_lossy(&buf).into_owned()
                }
            });

            let mut buf = [0u8; 8192];
            loop {
                match child_stdout.read(&mut buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(n) => {
                        if pipe_out.write_all(&buf[..n]).await.is_err() {
                            break;
                        }
                    }
                }
            }
            let _ = pipe_out.shutdown().await;
            drop(pipe_out);
            // Nothing reads the child's stdout past this point. Close our end so a
            // child that is still writing (the loop above can stop early when the
            // downstream reader goes away) observes a closed pipe instead of
            // blocking forever on a full one while we wait for it to exit.
            drop(child_stdout);

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            )
            .await;
            if let Some(task) = stdin_task {
                task.abort();
            }
            // NOTE(review): aborting before awaiting means stderr that the task has
            // not drained yet is discarded (the join fails and the default empty
            // string is used). Kept as-is because it also guarantees we never hang
            // on a stderr pipe that outlives the child.
            stderr_task.abort();
            let stderr = stderr_task.await.unwrap_or_default();
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            Some(ExecResult::from_output(code, String::new(), stderr))
        } else {
            // Buffered mode: collect stdout and stderr, honouring the output limit.
            let Some(child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(child_stderr) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };
            let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
                child_stdout,
                child_stderr,
                ctx.stderr.clone(),
                &ctx.output_limit,
            )
            .await;

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            )
            .await;
            if let Some(task) = stdin_task {
                task.abort();
            }
            // A missing exit code (or a failed wait) is reported as 1.
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            let mut result = ExecResult::from_output(code, stdout, stderr);
            result.did_spill = did_spill;
            Some(result)
        }
    }
}
#[cfg(test)]
#[async_trait]
impl CommandDispatcher for BackendDispatcher {
    /// Runs a single command.
    ///
    /// `true` and `false` are answered directly. Everything else is sent to the
    /// context's backend; if the backend does not know the tool, an external
    /// command is attempted, and failing that the result is exit code 127.
    /// The context's output format, when set, is applied to the final result.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
        let name = cmd.name.as_str();
        if name == "true" {
            return Ok(ExecResult::success(""));
        }
        if name == "false" {
            return Ok(ExecResult::failure(1, ""));
        }

        // Arguments are shaped by the tool's schema when the registry knows the tool.
        let schema = self.tools.get(&cmd.name).map(|tool| tool.schema());
        let tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
        GlobalFlags::apply_from_args(&tool_args, ctx);

        let backend = ctx.backend.clone();
        let outcome = backend.call_tool(&cmd.name, tool_args, ctx).await;

        let raw = match outcome {
            Err(BackendError::ToolNotFound(_)) => self
                .try_external(&cmd.name, &cmd.args, ctx)
                .await
                .unwrap_or_else(|| {
                    ExecResult::failure(127, format!("command not found: {}", cmd.name))
                }),
            Err(other) => ExecResult::failure(127, other.to_string()),
            Ok(tool_result) => {
                let mut exec = ExecResult::from_output(
                    tool_result.code as i64,
                    tool_result.stdout,
                    tool_result.stderr,
                );
                exec.set_output(tool_result.output);
                exec.content_type = tool_result.content_type;
                exec.baggage = tool_result.baggage;
                // Structured data only overrides `exec.data` when the tool supplied some.
                if let Some(json_data) = tool_result.data {
                    exec.data = Some(Value::Json(json_data));
                }
                exec
            }
        };

        let formatted = if let Some(format) = ctx.output_format {
            apply_output_format(raw, format)
        } else {
            raw
        };
        Ok(formatted)
    }

    /// Evaluates `expr` with the scheduler's simple-expression evaluator and
    /// turns a `None` from it into an error.
    async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value> {
        match crate::scheduler::pipeline::eval_simple_expr(expr, ctx) {
            Some(value) => Ok(value),
            None => Err(anyhow::anyhow!("cannot evaluate expression in test dispatcher")),
        }
    }

    /// Returns a new dispatcher that shares this one's tool registry.
    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
        let tools = Arc::clone(&self.tools);
        Arc::new(Self { tools })
    }
}