use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
/// JSON-RPC request envelope that is serialized to JSON and written to a
/// subprocess's stdin.
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
/// Protocol version tag; the request built in this file sets it to "2.0".
jsonrpc: &'static str,
/// Method name; the request built in this file uses the tool name here.
method: String,
/// Parameters forwarded to the tool as-is.
params: Value,
/// Request identifier; the request built in this file takes it from the
/// executor's atomic counter.
id: u64,
}
/// JSON-RPC response envelope deserialized from a subprocess's stdout.
///
/// Every field is optional, so a response omitting any of them still parses.
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
// Deserialized for completeness but never read in this file.
#[allow(dead_code)]
jsonrpc: Option<String>,
/// Successful payload, if the tool produced one.
result: Option<Value>,
/// Failure description, if the tool reported one.
error: Option<JsonRpcError>,
// Deserialized for completeness but never read in this file; the response
// id is not compared against the request id.
#[allow(dead_code)]
id: Option<u64>,
}
/// Error object carried in the `error` field of a [`JsonRpcResponse`].
#[derive(Debug, Deserialize)]
struct JsonRpcError {
// Deserialized for completeness but never read in this file.
#[allow(dead_code)]
code: Option<i64>,
/// Human-readable failure text. Not optional, so an error object without a
/// `message` makes deserialization of the whole response fail.
message: String,
}
/// Launch configuration for a tool that runs as an external process.
///
/// Create one with [`SubprocessTool::new`] and refine it with the chainable
/// `with_*` methods, each of which consumes and returns the configuration.
#[derive(Debug, Clone)]
pub struct SubprocessTool {
    /// Program to execute.
    pub command: String,
    /// Arguments passed to the program.
    pub args: Vec<String>,
    /// Working directory for the process; `None` means none is configured.
    pub cwd: Option<String>,
    /// Environment variables set on the process in addition to what it inherits.
    pub env: HashMap<String, String>,
    /// Upper bound on how long a single invocation may take.
    pub timeout: Duration,
}

impl SubprocessTool {
    /// Creates a configuration for `command` with no arguments, no working
    /// directory, no extra environment variables and a 30 second timeout.
    pub fn new(command: &str) -> Self {
        SubprocessTool {
            command: String::from(command),
            args: vec![],
            cwd: None,
            env: HashMap::default(),
            timeout: Duration::from_secs(30),
        }
    }

    /// Replaces the argument list with `args`.
    pub fn with_args(self, args: Vec<String>) -> Self {
        Self { args, ..self }
    }

    /// Sets the working directory to `cwd`.
    pub fn with_cwd(self, cwd: &str) -> Self {
        Self {
            cwd: Some(cwd.to_owned()),
            ..self
        }
    }

    /// Sets the invocation timeout.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Adds the environment variable `key=value`, overwriting any previous
    /// value stored under `key`.
    pub fn with_env(self, key: &str, value: &str) -> Self {
        let mut configured = self;
        configured.env.insert(key.to_owned(), value.to_owned());
        configured
    }
}
/// Tool executor that runs each registered tool as a one-shot subprocess and
/// talks to it with a single JSON-RPC request/response over stdin/stdout.
pub struct SubprocessToolExecutor {
    /// Registered tools, keyed by the name they are invoked under.
    tools: HashMap<String, SubprocessTool>,
    /// Executor consulted for tool names that are not registered here.
    fallback: Option<std::sync::Arc<dyn super::ToolExecutor>>,
    /// Source of JSON-RPC request ids; starts at 1 and increases per request.
    next_id: std::sync::atomic::AtomicU64,
}

impl SubprocessToolExecutor {
    /// Creates an executor with no registered tools and no fallback.
    pub fn new() -> Self {
        Self {
            tools: HashMap::new(),
            fallback: None,
            next_id: std::sync::atomic::AtomicU64::new(1),
        }
    }

    /// Registers `tool` under `name`, replacing any tool previously registered
    /// under the same name.
    pub fn register(&mut self, name: &str, tool: SubprocessTool) {
        self.tools.insert(name.to_string(), tool);
    }

    /// Sets the executor that handles tool names not registered here.
    pub fn with_fallback(mut self, fallback: std::sync::Arc<dyn super::ToolExecutor>) -> Self {
        self.fallback = Some(fallback);
        self
    }

    /// Writes `request_json` plus a trailing newline to the child's stdin,
    /// closes stdin, and collects the child's exit status and output.
    ///
    /// Takes ownership of `child` so that dropping this future (for example
    /// when a surrounding timeout fires) also drops the child handle.
    async fn exchange(
        mut child: tokio::process::Child,
        request_json: &str,
    ) -> Result<std::process::Output, String> {
        if let Some(mut stdin) = child.stdin.take() {
            stdin
                .write_all(request_json.as_bytes())
                .await
                .map_err(|e| format!("failed to write to subprocess stdin: {}", e))?;
            stdin
                .write_all(b"\n")
                .await
                .map_err(|e| format!("failed to write newline to stdin: {}", e))?;
            // `stdin` is dropped at the end of this scope, which closes the
            // pipe so the child sees end-of-input.
        }
        child
            .wait_with_output()
            .await
            .map_err(|e| format!("failed to read subprocess output: {}", e))
    }

    /// Runs `tool` once: spawns the process, sends a JSON-RPC request whose
    /// method is `tool_name` and whose params are `params`, and returns the
    /// `result` of the response parsed from the process's stdout.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the request cannot be serialized,
    /// the process cannot be spawned or written to, the exchange exceeds
    /// `tool.timeout`, the process exits unsuccessfully without printing
    /// anything to stdout, stdout is not a valid JSON-RPC response, the
    /// response carries an `error`, or the response has no `result`.
    async fn execute_subprocess(
        &self,
        tool_name: &str,
        tool: &SubprocessTool,
        params: &Value,
    ) -> Result<Value, String> {
        let request = JsonRpcRequest {
            jsonrpc: "2.0",
            method: tool_name.to_string(),
            params: params.clone(),
            id: self
                .next_id
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
        };
        let request_json = serde_json::to_string(&request)
            .map_err(|e| format!("failed to serialize request: {}", e))?;

        let mut cmd = Command::new(&tool.command);
        cmd.args(&tool.args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            // Without this, a child whose handle is dropped (timeout, early
            // error return) would keep running in the background.
            .kill_on_drop(true);
        if let Some(ref cwd) = tool.cwd {
            cmd.current_dir(cwd);
        }
        for (k, v) in &tool.env {
            cmd.env(k, v);
        }

        let child = cmd
            .spawn()
            .map_err(|e| format!("failed to spawn subprocess '{}': {}", tool.command, e))?;

        // The timeout covers the stdin write as well as the wait: a child that
        // never drains its stdin must not be able to block this call forever.
        let exchange = Self::exchange(child, &request_json);
        let output = match tokio::time::timeout(tool.timeout, exchange).await {
            Ok(result) => result?,
            Err(_) => {
                return Err(format!(
                    "subprocess '{}' timed out after {:?}",
                    tool.command, tool.timeout
                ));
            }
        };

        let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
        // A failing exit status is only fatal when nothing was printed to
        // stdout; otherwise stdout may still hold a JSON-RPC error response.
        if !output.status.success() && stdout.trim().is_empty() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(format!(
                "subprocess exited with status {}: {}",
                output.status,
                stderr.trim()
            ));
        }

        let response: JsonRpcResponse = serde_json::from_str(&stdout).map_err(|e| {
            format!(
                "invalid JSON-RPC response from '{}': {} (raw: {})",
                tool.command,
                e,
                stdout.trim()
            )
        })?;
        // An `error` takes precedence over any `result` in the same response.
        if let Some(error) = response.error {
            return Err(format!("subprocess tool error: {}", error.message));
        }
        response
            .result
            .ok_or_else(|| "subprocess returned no result".to_string())
    }
}
impl Default for SubprocessToolExecutor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl super::ToolExecutor for SubprocessToolExecutor {
    /// Executes `tool` with `params`.
    ///
    /// A tool registered on this executor is run as a subprocess. Any other
    /// name is handed to the fallback executor when one is configured, and
    /// rejected with an error otherwise.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        // Registered tools always win over the fallback.
        if let Some(registered) = self.tools.get(tool) {
            return self.execute_subprocess(tool, registered, params).await;
        }

        match self.fallback.as_ref() {
            Some(delegate) => delegate.execute(tool, params).await,
            None => Err(format!(
                "unknown subprocess tool: '{}' (no fallback configured)",
                tool
            )),
        }
    }
}