1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
//! Subprocess tool executor — runs tools as external processes via stdin/stdout JSON-RPC.
//!
//! Each tool maps to a command (binary or script). The runtime sends a JSON-RPC request
//! on stdin and reads the JSON-RPC response from stdout. This enables language-agnostic
//! tool authoring: any program that reads JSON from stdin and writes JSON to stdout works.
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
/// JSON-RPC request sent to subprocess stdin.
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
jsonrpc: &'static str,
method: String,
params: Value,
id: u64,
}
/// JSON-RPC response read from subprocess stdout.
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
#[allow(dead_code)]
jsonrpc: Option<String>,
result: Option<Value>,
error: Option<JsonRpcError>,
#[allow(dead_code)]
id: Option<u64>,
}
#[derive(Debug, Deserialize)]
struct JsonRpcError {
#[allow(dead_code)]
code: Option<i64>,
message: String,
}
/// Registration for a subprocess tool — maps a tool name to a command.
#[derive(Debug, Clone)]
pub struct SubprocessTool {
/// The command to execute (e.g., "python3", "/usr/local/bin/my-tool").
pub command: String,
/// Arguments passed before the JSON-RPC input (e.g., ["tool.py"]).
pub args: Vec<String>,
/// Optional working directory.
pub cwd: Option<String>,
/// Environment variables to set.
pub env: HashMap<String, String>,
/// Timeout for the subprocess (default 30s).
pub timeout: Duration,
}
impl SubprocessTool {
pub fn new(command: &str) -> Self {
Self {
command: command.to_string(),
args: Vec::new(),
cwd: None,
env: HashMap::new(),
timeout: Duration::from_secs(30),
}
}
pub fn with_args(mut self, args: Vec<String>) -> Self {
self.args = args;
self
}
pub fn with_cwd(mut self, cwd: &str) -> Self {
self.cwd = Some(cwd.to_string());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_env(mut self, key: &str, value: &str) -> Self {
self.env.insert(key.to_string(), value.to_string());
self
}
}
/// Tool executor that runs tools as subprocesses via stdin/stdout JSON-RPC.
pub struct SubprocessToolExecutor {
tools: HashMap<String, SubprocessTool>,
/// Optional fallback executor for tools not registered as subprocesses.
fallback: Option<std::sync::Arc<dyn super::ToolExecutor>>,
next_id: std::sync::atomic::AtomicU64,
}
impl SubprocessToolExecutor {
pub fn new() -> Self {
Self {
tools: HashMap::new(),
fallback: None,
next_id: std::sync::atomic::AtomicU64::new(1),
}
}
/// Register a subprocess tool.
pub fn register(&mut self, name: &str, tool: SubprocessTool) {
self.tools.insert(name.to_string(), tool);
}
/// Set a fallback executor for tools not registered as subprocesses.
pub fn with_fallback(mut self, fallback: std::sync::Arc<dyn super::ToolExecutor>) -> Self {
self.fallback = Some(fallback);
self
}
async fn execute_subprocess(
&self,
tool_name: &str,
tool: &SubprocessTool,
params: &Value,
timeout_ms: Option<u64>,
) -> Result<Value, String> {
// Per-action budget wins over the tool's internal `timeout` when it is
// larger (Parslee-ai/car#266 item 3): a subprocess tool declaring
// `timeoutMs = 180000` but keeping the 30s default `SubprocessTool.timeout`
// must NOT be killed at 30s — the action budget is the authority. We
// take `max(tool.timeout, budget)` so neither bound shadows the other:
// a tool author can still set a *longer* internal timeout than the
// action declares, and an action can lift a short tool default.
let effective_timeout = match timeout_ms {
Some(ms) => tool.timeout.max(Duration::from_millis(ms)),
None => tool.timeout,
};
let request = JsonRpcRequest {
jsonrpc: "2.0",
method: tool_name.to_string(),
params: params.clone(),
id: self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
};
let request_json = serde_json::to_string(&request)
.map_err(|e| format!("failed to serialize request: {}", e))?;
let mut cmd = Command::new(&tool.command);
cmd.args(&tool.args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if let Some(ref cwd) = tool.cwd {
cmd.current_dir(cwd);
}
for (k, v) in &tool.env {
cmd.env(k, v);
}
let mut child = cmd
.spawn()
.map_err(|e| format!("failed to spawn subprocess '{}': {}", tool.command, e))?;
// Write request to stdin
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(request_json.as_bytes())
.await
.map_err(|e| format!("failed to write to subprocess stdin: {}", e))?;
stdin
.write_all(b"\n")
.await
.map_err(|e| format!("failed to write newline to stdin: {}", e))?;
// Drop stdin to signal EOF
}
// Read response with timeout; kill child on timeout to prevent zombies
let output = match tokio::time::timeout(effective_timeout, child.wait_with_output()).await {
Ok(Ok(output)) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
if !output.status.success() && stdout.trim().is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!(
"subprocess exited with status {}: {}",
output.status,
stderr.trim()
));
}
stdout
}
Ok(Err(e)) => {
return Err(format!("failed to read subprocess output: {}", e));
}
Err(_) => {
// Timeout — process is already dropped which sends SIGKILL on Unix
return Err(format!(
"subprocess '{}' timed out after {:?}",
tool.command, effective_timeout
));
}
};
// Parse JSON-RPC response
let response: JsonRpcResponse = serde_json::from_str(&output).map_err(|e| {
format!(
"invalid JSON-RPC response from '{}': {} (raw: {})",
tool.command,
e,
output.trim()
)
})?;
if let Some(error) = response.error {
return Err(format!("subprocess tool error: {}", error.message));
}
response
.result
.ok_or_else(|| "subprocess returned no result".to_string())
}
}
impl Default for SubprocessToolExecutor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl super::ToolExecutor for SubprocessToolExecutor {
async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
self.execute_with_action(tool, params, "", None).await
}
async fn execute_with_action(
&self,
tool: &str,
params: &Value,
action_id: &str,
timeout_ms: Option<u64>,
) -> Result<Value, String> {
if let Some(subprocess_tool) = self.tools.get(tool) {
// Thread the per-action budget into the local path so it is not
// shadowed by the tool's internal `timeout` (#266 item 3). The
// fallback path below already received it.
self.execute_subprocess(tool, subprocess_tool, params, timeout_ms)
.await
} else if let Some(ref fallback) = self.fallback {
fallback
.execute_with_action(tool, params, action_id, timeout_ms)
.await
} else {
Err(format!(
"unknown subprocess tool: '{}' (no fallback configured)",
tool
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ToolExecutor;
/// #266 item 3: a subprocess tool with a short internal `timeout` (30s
/// default) but a longer per-action budget must run to the *budget*, not be
/// shadowed at the tool default. We use a tool whose internal timeout is
/// 100ms and an action budget of 5s against a 1s sleep: with the bug the
/// call dies at 100ms; with the fix `max(100ms, 5s)` lets the 1s sleep
/// complete.
#[cfg(unix)]
#[tokio::test]
async fn action_budget_lifts_short_subprocess_timeout() {
let mut exec = SubprocessToolExecutor::new();
// A tool that ignores stdin, sleeps 1s, then emits a valid JSON-RPC
// response. `id` is echoed loosely; the executor only reads `result`.
let tool = SubprocessTool::new("sh")
.with_args(vec![
"-c".to_string(),
"sleep 1; printf '{\"jsonrpc\":\"2.0\",\"result\":{\"ok\":true},\"id\":1}'"
.to_string(),
])
.with_timeout(Duration::from_millis(100));
exec.register("slow", tool);
// Without a budget the 100ms internal timeout reaps the 1s sleep.
let reaped = exec.execute("slow", &Value::Null).await;
assert!(
reaped.is_err() && reaped.as_ref().unwrap_err().contains("timed out"),
"expected the short internal timeout to reap: {reaped:?}"
);
// With a 5s action budget the call completes — the budget is the
// authority, not the 100ms tool default.
let ok = exec
.execute_with_action("slow", &Value::Null, "a0", Some(5_000))
.await;
assert!(ok.is_ok(), "action budget must lift the short timeout: {ok:?}");
assert_eq!(ok.unwrap(), serde_json::json!({ "ok": true }));
}
}