1use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::io::AsyncWriteExt;
12use tokio::process::Command;
13
14#[derive(Debug, Serialize)]
16struct JsonRpcRequest {
17 jsonrpc: &'static str,
18 method: String,
19 params: Value,
20 id: u64,
21}
22
23#[derive(Debug, Deserialize)]
25struct JsonRpcResponse {
26 #[allow(dead_code)]
27 jsonrpc: Option<String>,
28 result: Option<Value>,
29 error: Option<JsonRpcError>,
30 #[allow(dead_code)]
31 id: Option<u64>,
32}
33
34#[derive(Debug, Deserialize)]
35struct JsonRpcError {
36 #[allow(dead_code)]
37 code: Option<i64>,
38 message: String,
39}
40
/// Launch configuration for a tool that runs as an external process.
///
/// Build one with [`SubprocessTool::new`] and refine it with the chainable
/// `with_*` methods.
#[derive(Debug, Clone)]
pub struct SubprocessTool {
    /// Program to execute.
    pub command: String,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Working directory for the process; `None` leaves it unset.
    pub cwd: Option<String>,
    /// Extra environment variables set for the process.
    pub env: HashMap<String, String>,
    /// Time limit applied to a single invocation.
    pub timeout: Duration,
}

impl SubprocessTool {
    /// Creates a configuration for `command` with no arguments, no working
    /// directory, no extra environment variables and a 30 second timeout.
    pub fn new(command: &str) -> Self {
        SubprocessTool {
            command: String::from(command),
            args: Vec::new(),
            cwd: None,
            env: HashMap::new(),
            timeout: Duration::from_secs(30),
        }
    }

    /// Replaces the whole argument list with `args`.
    pub fn with_args(self, args: Vec<String>) -> Self {
        Self { args, ..self }
    }

    /// Sets the working directory to `cwd`.
    pub fn with_cwd(self, cwd: &str) -> Self {
        Self {
            cwd: Some(cwd.to_owned()),
            ..self
        }
    }

    /// Sets the per-invocation time limit.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Adds one environment variable; setting the same `key` again
    /// overwrites the earlier value.
    pub fn with_env(mut self, key: &str, value: &str) -> Self {
        self.env.insert(key.to_owned(), value.to_owned());
        self
    }
}
87
88pub struct SubprocessToolExecutor {
90 tools: HashMap<String, SubprocessTool>,
91 fallback: Option<std::sync::Arc<dyn super::ToolExecutor>>,
93 next_id: std::sync::atomic::AtomicU64,
94}
95
96impl SubprocessToolExecutor {
97 pub fn new() -> Self {
98 Self {
99 tools: HashMap::new(),
100 fallback: None,
101 next_id: std::sync::atomic::AtomicU64::new(1),
102 }
103 }
104
105 pub fn register(&mut self, name: &str, tool: SubprocessTool) {
107 self.tools.insert(name.to_string(), tool);
108 }
109
110 pub fn with_fallback(mut self, fallback: std::sync::Arc<dyn super::ToolExecutor>) -> Self {
112 self.fallback = Some(fallback);
113 self
114 }
115
116 async fn execute_subprocess(
117 &self,
118 tool_name: &str,
119 tool: &SubprocessTool,
120 params: &Value,
121 ) -> Result<Value, String> {
122 let request = JsonRpcRequest {
123 jsonrpc: "2.0",
124 method: tool_name.to_string(),
125 params: params.clone(),
126 id: self
127 .next_id
128 .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
129 };
130
131 let request_json = serde_json::to_string(&request)
132 .map_err(|e| format!("failed to serialize request: {}", e))?;
133
134 let mut cmd = Command::new(&tool.command);
135 cmd.args(&tool.args)
136 .stdin(std::process::Stdio::piped())
137 .stdout(std::process::Stdio::piped())
138 .stderr(std::process::Stdio::piped());
139
140 if let Some(ref cwd) = tool.cwd {
141 cmd.current_dir(cwd);
142 }
143 for (k, v) in &tool.env {
144 cmd.env(k, v);
145 }
146
147 let mut child = cmd
148 .spawn()
149 .map_err(|e| format!("failed to spawn subprocess '{}': {}", tool.command, e))?;
150
151 if let Some(mut stdin) = child.stdin.take() {
153 stdin
154 .write_all(request_json.as_bytes())
155 .await
156 .map_err(|e| format!("failed to write to subprocess stdin: {}", e))?;
157 stdin
158 .write_all(b"\n")
159 .await
160 .map_err(|e| format!("failed to write newline to stdin: {}", e))?;
161 }
163
164 let output = match tokio::time::timeout(tool.timeout, child.wait_with_output()).await {
166 Ok(Ok(output)) => {
167 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
168 if !output.status.success() && stdout.trim().is_empty() {
169 let stderr = String::from_utf8_lossy(&output.stderr);
170 return Err(format!(
171 "subprocess exited with status {}: {}",
172 output.status,
173 stderr.trim()
174 ));
175 }
176 stdout
177 }
178 Ok(Err(e)) => {
179 return Err(format!("failed to read subprocess output: {}", e));
180 }
181 Err(_) => {
182 return Err(format!(
184 "subprocess '{}' timed out after {:?}",
185 tool.command, tool.timeout
186 ));
187 }
188 };
189
190 let response: JsonRpcResponse = serde_json::from_str(&output).map_err(|e| {
192 format!(
193 "invalid JSON-RPC response from '{}': {} (raw: {})",
194 tool.command,
195 e,
196 output.trim()
197 )
198 })?;
199
200 if let Some(error) = response.error {
201 return Err(format!("subprocess tool error: {}", error.message));
202 }
203
204 response
205 .result
206 .ok_or_else(|| "subprocess returned no result".to_string())
207 }
208}
209
210impl Default for SubprocessToolExecutor {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216#[async_trait::async_trait]
217impl super::ToolExecutor for SubprocessToolExecutor {
218 async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
219 if let Some(subprocess_tool) = self.tools.get(tool) {
220 self.execute_subprocess(tool, subprocess_tool, params).await
221 } else if let Some(ref fallback) = self.fallback {
222 fallback.execute(tool, params).await
223 } else {
224 Err(format!(
225 "unknown subprocess tool: '{}' (no fallback configured)",
226 tool
227 ))
228 }
229 }
230}