car-engine 0.13.0

Core runtime engine for Common Agent Runtime
Documentation
//! Subprocess tool executor — runs tools as external processes via stdin/stdout JSON-RPC.
//!
//! Each tool maps to a command (binary or script). The runtime sends a JSON-RPC request
//! on stdin and reads the JSON-RPC response from stdout. This enables language-agnostic
//! tool authoring: any program that reads JSON from stdin and writes JSON to stdout works.

use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;

/// JSON-RPC request sent to subprocess stdin.
///
/// One instance is built per tool invocation, serialized with `serde_json`,
/// and written to the child's stdin followed by a newline.
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
    /// Protocol version tag; the executor always sends `"2.0"`.
    jsonrpc: &'static str,
    /// Name of the tool being invoked (the name it was registered under).
    method: String,
    /// Tool parameters, forwarded unchanged from the caller.
    params: Value,
    /// Request identifier drawn from the executor's incrementing counter.
    id: u64,
}

/// JSON-RPC response read from subprocess stdout.
///
/// Every field is optional, so a reply only has to be a JSON object to
/// deserialize. The executor checks `error` first and only then looks at
/// `result`.
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
    // Accepted so well-formed replies deserialize, but never inspected.
    #[allow(dead_code)]
    jsonrpc: Option<String>,
    /// Successful payload returned to the caller.
    result: Option<Value>,
    /// Failure reported by the tool; takes precedence over `result`.
    error: Option<JsonRpcError>,
    // Accepted but not compared against the id of the request that was sent.
    #[allow(dead_code)]
    id: Option<u64>,
}

/// Error object carried in the `error` member of a JSON-RPC response.
///
/// Only `message` is required; it is the text surfaced to the caller.
#[derive(Debug, Deserialize)]
struct JsonRpcError {
    // Parsed for completeness; the executor reports only `message`.
    #[allow(dead_code)]
    code: Option<i64>,
    /// Human-readable description of the failure.
    message: String,
}

/// Launch recipe for one subprocess-backed tool.
///
/// The executor spawns `command` with `args`, applies `cwd` and `env` when
/// they are set, and waits at most `timeout` for the process to finish.
#[derive(Debug, Clone)]
pub struct SubprocessTool {
    /// Program to launch, e.g. `"python3"` or `"/usr/local/bin/my-tool"`.
    pub command: String,
    /// Command-line arguments handed to `command`, e.g. `["tool.py"]`.
    /// The JSON-RPC request itself travels over stdin, not argv.
    pub args: Vec<String>,
    /// Working directory for the child; `None` leaves it unset.
    pub cwd: Option<String>,
    /// Extra environment variables set on the child process.
    pub env: HashMap<String, String>,
    /// Upper bound on the subprocess run time; [`SubprocessTool::new`] uses 30 seconds.
    pub timeout: Duration,
}

impl SubprocessTool {
    /// Timeout applied by [`SubprocessTool::new`].
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Starts a registration for `command` with no arguments, no working
    /// directory, no extra environment, and the default 30 second timeout.
    pub fn new(command: &str) -> Self {
        Self {
            command: String::from(command),
            args: Vec::new(),
            cwd: None,
            env: HashMap::new(),
            timeout: Self::DEFAULT_TIMEOUT,
        }
    }

    /// Replaces the argument list wholesale.
    pub fn with_args(self, args: Vec<String>) -> Self {
        Self { args, ..self }
    }

    /// Sets the working directory the child is started in.
    pub fn with_cwd(self, cwd: &str) -> Self {
        Self {
            cwd: Some(String::from(cwd)),
            ..self
        }
    }

    /// Overrides the timeout.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Adds one environment variable; a repeated key keeps the latest value.
    pub fn with_env(mut self, key: &str, value: &str) -> Self {
        let (name, val) = (String::from(key), String::from(value));
        self.env.insert(name, val);
        self
    }
}

/// Tool executor that runs tools as subprocesses via stdin/stdout JSON-RPC.
///
/// A fresh process is spawned for every invocation of a registered tool.
pub struct SubprocessToolExecutor {
    /// Registered tools, keyed by the tool name callers invoke.
    tools: HashMap<String, SubprocessTool>,
    /// Optional fallback executor for tools not registered as subprocesses.
    fallback: Option<std::sync::Arc<dyn super::ToolExecutor>>,
    /// Source of JSON-RPC request ids; incremented once per invocation.
    next_id: std::sync::atomic::AtomicU64,
}

impl SubprocessToolExecutor {
    /// Creates an executor with no registered tools and no fallback.
    ///
    /// JSON-RPC request ids are handed out starting from 1.
    pub fn new() -> Self {
        Self {
            tools: HashMap::new(),
            fallback: None,
            next_id: std::sync::atomic::AtomicU64::new(1),
        }
    }

    /// Register a subprocess tool under `name`.
    ///
    /// Registering another tool under the same name replaces the earlier one.
    pub fn register(&mut self, name: &str, tool: SubprocessTool) {
        self.tools.insert(name.to_string(), tool);
    }

    /// Set a fallback executor for tools not registered as subprocesses.
    pub fn with_fallback(mut self, fallback: std::sync::Arc<dyn super::ToolExecutor>) -> Self {
        self.fallback = Some(fallback);
        self
    }

    /// Runs a single tool invocation in a freshly spawned subprocess.
    ///
    /// The request is written to the child's stdin as one line of JSON, stdin
    /// is closed to signal EOF, and the child's entire stdout is parsed as a
    /// JSON-RPC response. The whole exchange (write + wait) is bounded by
    /// `tool.timeout`.
    ///
    /// # Errors
    ///
    /// Returns a descriptive `String` when:
    /// - the request cannot be serialized or the process cannot be spawned,
    /// - writing to stdin or collecting the output fails,
    /// - the process exits unsuccessfully without printing anything to stdout
    ///   (the message then carries the exit status and stderr),
    /// - the timeout elapses,
    /// - stdout is not a valid JSON-RPC response, carries an `error` member,
    ///   or has no `result`.
    async fn execute_subprocess(
        &self,
        tool_name: &str,
        tool: &SubprocessTool,
        params: &Value,
    ) -> Result<Value, String> {
        let request = JsonRpcRequest {
            jsonrpc: "2.0",
            method: tool_name.to_string(),
            params: params.clone(),
            // Ids only need to be unique, so relaxed ordering is sufficient.
            id: self
                .next_id
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
        };

        let mut request_json = serde_json::to_string(&request)
            .map_err(|e| format!("failed to serialize request: {}", e))?;
        // Newline-terminate so line-oriented tools see a complete record.
        request_json.push('\n');

        let mut cmd = Command::new(&tool.command);
        cmd.args(&tool.args)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            // tokio leaves children running when their handle is dropped
            // unless this is set. Without it, a timed-out or abandoned tool
            // would keep running in the background.
            .kill_on_drop(true);

        if let Some(cwd) = &tool.cwd {
            cmd.current_dir(cwd);
        }
        for (k, v) in &tool.env {
            cmd.env(k, v);
        }

        let mut child = cmd
            .spawn()
            .map_err(|e| format!("failed to spawn subprocess '{}': {}", tool.command, e))?;
        let stdin = child.stdin.take();

        // Writing the request and waiting for the result form one unit of
        // work, so a child that never drains stdin is subject to the timeout
        // as well instead of blocking this call indefinitely.
        let exchange = async move {
            if let Some(mut stdin) = stdin {
                match stdin.write_all(request_json.as_bytes()).await {
                    Ok(()) => {}
                    // The child closed stdin or already exited. Keep going so
                    // its exit status and stderr are reported rather than an
                    // opaque "broken pipe".
                    Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {}
                    Err(e) => {
                        return Err(format!("failed to write to subprocess stdin: {}", e));
                    }
                }
                // Close stdin to signal EOF to the child.
                drop(stdin);
            }
            child
                .wait_with_output()
                .await
                .map_err(|e| format!("failed to read subprocess output: {}", e))
        };

        let output = match tokio::time::timeout(tool.timeout, exchange).await {
            Ok(finished) => finished?,
            Err(_) => {
                // The timed-out future owned the child; dropping it kills the
                // process because of `kill_on_drop(true)` above.
                return Err(format!(
                    "subprocess '{}' timed out after {:?}",
                    tool.command, tool.timeout
                ));
            }
        };

        let stdout = String::from_utf8_lossy(&output.stdout);
        // A failing exit status is only fatal when the tool printed nothing:
        // a tool may exit non-zero and still report a structured JSON-RPC error.
        if !output.status.success() && stdout.trim().is_empty() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(format!(
                "subprocess exited with status {}: {}",
                output.status,
                stderr.trim()
            ));
        }

        // Parse JSON-RPC response
        let response: JsonRpcResponse = serde_json::from_str(&stdout).map_err(|e| {
            format!(
                "invalid JSON-RPC response from '{}': {} (raw: {})",
                tool.command,
                e,
                stdout.trim()
            )
        })?;

        if let Some(error) = response.error {
            return Err(format!("subprocess tool error: {}", error.message));
        }

        response
            .result
            .ok_or_else(|| "subprocess returned no result".to_string())
    }
}

impl Default for SubprocessToolExecutor {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait::async_trait]
impl super::ToolExecutor for SubprocessToolExecutor {
    /// Runs `tool` with an empty action id.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        self.execute_with_action(tool, params, "").await
    }

    /// Dispatches `tool` to its registered subprocess if there is one,
    /// otherwise to the fallback executor, otherwise fails.
    ///
    /// `action_id` is only forwarded to the fallback; the subprocess path
    /// does not use it.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        action_id: &str,
    ) -> Result<Value, String> {
        // Registered subprocess tools always win over the fallback.
        if let Some(registered) = self.tools.get(tool) {
            return self.execute_subprocess(tool, registered, params).await;
        }

        match self.fallback.as_ref() {
            Some(delegate) => delegate.execute_with_action(tool, params, action_id).await,
            None => Err(format!(
                "unknown subprocess tool: '{}' (no fallback configured)",
                tool
            )),
        }
    }
}