Skip to main content

car_engine/
subprocess.rs

1//! Subprocess tool executor — runs tools as external processes via stdin/stdout JSON-RPC.
2//!
3//! Each tool maps to a command (binary or script). The runtime sends a JSON-RPC request
4//! on stdin and reads the JSON-RPC response from stdout. This enables language-agnostic
5//! tool authoring: any program that reads JSON from stdin and writes JSON to stdout works.
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::io::AsyncWriteExt;
12use tokio::process::Command;
13
14/// JSON-RPC request sent to subprocess stdin.
15#[derive(Debug, Serialize)]
16struct JsonRpcRequest {
17    jsonrpc: &'static str,
18    method: String,
19    params: Value,
20    id: u64,
21}
22
23/// JSON-RPC response read from subprocess stdout.
24#[derive(Debug, Deserialize)]
25struct JsonRpcResponse {
26    #[allow(dead_code)]
27    jsonrpc: Option<String>,
28    result: Option<Value>,
29    error: Option<JsonRpcError>,
30    #[allow(dead_code)]
31    id: Option<u64>,
32}
33
34#[derive(Debug, Deserialize)]
35struct JsonRpcError {
36    #[allow(dead_code)]
37    code: Option<i64>,
38    message: String,
39}
40
41/// Registration for a subprocess tool — maps a tool name to a command.
42#[derive(Debug, Clone)]
43pub struct SubprocessTool {
44    /// The command to execute (e.g., "python3", "/usr/local/bin/my-tool").
45    pub command: String,
46    /// Arguments passed before the JSON-RPC input (e.g., ["tool.py"]).
47    pub args: Vec<String>,
48    /// Optional working directory.
49    pub cwd: Option<String>,
50    /// Environment variables to set.
51    pub env: HashMap<String, String>,
52    /// Timeout for the subprocess (default 30s).
53    pub timeout: Duration,
54}
55
56impl SubprocessTool {
57    pub fn new(command: &str) -> Self {
58        Self {
59            command: command.to_string(),
60            args: Vec::new(),
61            cwd: None,
62            env: HashMap::new(),
63            timeout: Duration::from_secs(30),
64        }
65    }
66
67    pub fn with_args(mut self, args: Vec<String>) -> Self {
68        self.args = args;
69        self
70    }
71
72    pub fn with_cwd(mut self, cwd: &str) -> Self {
73        self.cwd = Some(cwd.to_string());
74        self
75    }
76
77    pub fn with_timeout(mut self, timeout: Duration) -> Self {
78        self.timeout = timeout;
79        self
80    }
81
82    pub fn with_env(mut self, key: &str, value: &str) -> Self {
83        self.env.insert(key.to_string(), value.to_string());
84        self
85    }
86}
87
88/// Tool executor that runs tools as subprocesses via stdin/stdout JSON-RPC.
89pub struct SubprocessToolExecutor {
90    tools: HashMap<String, SubprocessTool>,
91    /// Optional fallback executor for tools not registered as subprocesses.
92    fallback: Option<std::sync::Arc<dyn super::ToolExecutor>>,
93    next_id: std::sync::atomic::AtomicU64,
94}
95
96impl SubprocessToolExecutor {
97    pub fn new() -> Self {
98        Self {
99            tools: HashMap::new(),
100            fallback: None,
101            next_id: std::sync::atomic::AtomicU64::new(1),
102        }
103    }
104
105    /// Register a subprocess tool.
106    pub fn register(&mut self, name: &str, tool: SubprocessTool) {
107        self.tools.insert(name.to_string(), tool);
108    }
109
110    /// Set a fallback executor for tools not registered as subprocesses.
111    pub fn with_fallback(mut self, fallback: std::sync::Arc<dyn super::ToolExecutor>) -> Self {
112        self.fallback = Some(fallback);
113        self
114    }
115
116    async fn execute_subprocess(
117        &self,
118        tool_name: &str,
119        tool: &SubprocessTool,
120        params: &Value,
121    ) -> Result<Value, String> {
122        let request = JsonRpcRequest {
123            jsonrpc: "2.0",
124            method: tool_name.to_string(),
125            params: params.clone(),
126            id: self
127                .next_id
128                .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
129        };
130
131        let request_json = serde_json::to_string(&request)
132            .map_err(|e| format!("failed to serialize request: {}", e))?;
133
134        let mut cmd = Command::new(&tool.command);
135        cmd.args(&tool.args)
136            .stdin(std::process::Stdio::piped())
137            .stdout(std::process::Stdio::piped())
138            .stderr(std::process::Stdio::piped());
139
140        if let Some(ref cwd) = tool.cwd {
141            cmd.current_dir(cwd);
142        }
143        for (k, v) in &tool.env {
144            cmd.env(k, v);
145        }
146
147        let mut child = cmd
148            .spawn()
149            .map_err(|e| format!("failed to spawn subprocess '{}': {}", tool.command, e))?;
150
151        // Write request to stdin
152        if let Some(mut stdin) = child.stdin.take() {
153            stdin
154                .write_all(request_json.as_bytes())
155                .await
156                .map_err(|e| format!("failed to write to subprocess stdin: {}", e))?;
157            stdin
158                .write_all(b"\n")
159                .await
160                .map_err(|e| format!("failed to write newline to stdin: {}", e))?;
161            // Drop stdin to signal EOF
162        }
163
164        // Read response with timeout; kill child on timeout to prevent zombies
165        let output = match tokio::time::timeout(tool.timeout, child.wait_with_output()).await {
166            Ok(Ok(output)) => {
167                let stdout = String::from_utf8_lossy(&output.stdout).to_string();
168                if !output.status.success() && stdout.trim().is_empty() {
169                    let stderr = String::from_utf8_lossy(&output.stderr);
170                    return Err(format!(
171                        "subprocess exited with status {}: {}",
172                        output.status,
173                        stderr.trim()
174                    ));
175                }
176                stdout
177            }
178            Ok(Err(e)) => {
179                return Err(format!("failed to read subprocess output: {}", e));
180            }
181            Err(_) => {
182                // Timeout — process is already dropped which sends SIGKILL on Unix
183                return Err(format!(
184                    "subprocess '{}' timed out after {:?}",
185                    tool.command, tool.timeout
186                ));
187            }
188        };
189
190        // Parse JSON-RPC response
191        let response: JsonRpcResponse = serde_json::from_str(&output).map_err(|e| {
192            format!(
193                "invalid JSON-RPC response from '{}': {} (raw: {})",
194                tool.command,
195                e,
196                output.trim()
197            )
198        })?;
199
200        if let Some(error) = response.error {
201            return Err(format!("subprocess tool error: {}", error.message));
202        }
203
204        response
205            .result
206            .ok_or_else(|| "subprocess returned no result".to_string())
207    }
208}
209
210impl Default for SubprocessToolExecutor {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216#[async_trait::async_trait]
217impl super::ToolExecutor for SubprocessToolExecutor {
218    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
219        if let Some(subprocess_tool) = self.tools.get(tool) {
220            self.execute_subprocess(tool, subprocess_tool, params).await
221        } else if let Some(ref fallback) = self.fallback {
222            fallback.execute(tool, params).await
223        } else {
224            Err(format!(
225                "unknown subprocess tool: '{}' (no fallback configured)",
226                tool
227            ))
228        }
229    }
230}