1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14pub struct SubagentSpawn {
22 router: Arc<dyn Router>,
23 config: Config,
24 memory: Option<Arc<dyn Memory>>,
25}
26
27impl SubagentSpawn {
28 pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29 Self {
30 router,
31 config,
32 memory: None,
33 }
34 }
35
36 pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37 self.memory = Some(memory);
38 self
39 }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44 fn name(&self) -> &str {
45 "subagent_spawn"
46 }
47 fn description(&self) -> &str {
48 "Spawn a child agent to handle a subtask independently"
49 }
50 fn schema(&self) -> serde_json::Value {
51 json!({
52 "type": "object",
53 "properties": {
54 "task": { "type": "string", "description": "Subtask description" },
55 "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56 "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57 "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58 "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59 "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60 },
61 "required": ["task"]
62 })
63 }
64 fn risk(&self) -> RiskLevel {
65 RiskLevel::Exec
66 }
67 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68 let task_desc = args["task"].as_str().unwrap_or("");
69 let role = args["role"].as_str().unwrap_or("helper");
70 let mut child_config = self.config.clone();
71 if let Some(model_ref) = args["model"].as_str() {
72 if let Some((provider, model)) = parse_model_ref(model_ref) {
73 child_config.forced_model = Some((provider.clone(), model.clone()));
74 for tier in ["trivial", "small", "medium", "hard", "vision"] {
75 child_config
76 .routing
77 .policy
78 .insert(tier.to_string(), provider.clone());
79 }
80 child_config
81 .providers
82 .entry(provider)
83 .or_insert_with(|| crate::config::ProviderConfig {
84 adapter: "openai-compatible".into(),
85 base_url: None,
86 models: vec![],
87 api_key_env: None,
88 })
89 .models = vec![model];
90 }
91 }
92 if let Some(mode) = args["permission_mode"]
93 .as_str()
94 .and_then(PermissionMode::parse)
95 {
96 child_config.defaults.autonomy = mode.autonomy_level();
97 child_config.permissions.mode = mode;
98 }
99 for tool in string_array(&args["tools"]) {
100 if !child_config.permissions.tools.allow.contains(&tool) {
101 child_config.permissions.tools.allow.push(tool);
102 }
103 }
104 for tool in string_array(&args["disallowed_tools"]) {
105 if !child_config.permissions.tools.deny.contains(&tool) {
106 child_config.permissions.tools.deny.push(tool);
107 }
108 }
109
110 let (tx, mut rx) = mpsc::unbounded_channel();
111
112 let task = Task {
113 description: task_desc.to_string(),
114 context: vec![],
115 };
116
117 let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119 name: role.to_string(),
120 role: role.to_string(),
121 personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122 });
123 if let Some(mem) = &self.memory {
124 child = child.with_memory(mem.clone());
125 }
126 let engine = Arc::new(child);
127
128 let handle = tokio::spawn(async move {
129 match engine.drive(task, tx).await {
130 Ok(outcome) => outcome,
131 Err(e) => crate::event::OutcomeSummary {
132 status: format!("error: {}", e),
133 diffs: vec![],
134 cost_usd: 0.0,
135 tokens: crate::event::TokenUsage {
136 input: 0,
137 output: 0,
138 },
139 },
140 }
141 });
142
143 let mut output = String::new();
145 while let Some(event) = rx.recv().await {
146 match &event {
147 Event::ThinkingDelta { text, .. } => {
148 output.push_str(text);
149 }
150 Event::AgentStatus { note, .. } => {
151 output.push_str(&format!("\n[{}]", note));
152 }
153 Event::RunFinished { outcome, .. } => {
154 output.push_str(&format!(
155 "\n[Subagent done: {} | ${:.4}]",
156 outcome.status, outcome.cost_usd
157 ));
158 }
159 Event::Error { message, .. } => {
160 output.push_str(&format!("\n[Error: {}]", message));
161 }
162 _ => {}
163 }
164 }
165
166 let outcome = handle
167 .await
168 .unwrap_or_else(|e| crate::event::OutcomeSummary {
169 status: format!("subagent panicked: {}", e),
170 diffs: vec![],
171 cost_usd: 0.0,
172 tokens: crate::event::TokenUsage {
173 input: 0,
174 output: 0,
175 },
176 });
177
178 Ok(ToolResult::ok(vec![Block::Text(format!(
179 "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
180 role, outcome.status, output
181 ))]))
182 }
183}
184
185fn string_array(value: &serde_json::Value) -> Vec<String> {
186 value
187 .as_array()
188 .map(|items| {
189 items
190 .iter()
191 .filter_map(|item| item.as_str())
192 .map(str::trim)
193 .filter(|item| !item.is_empty())
194 .map(str::to_string)
195 .collect()
196 })
197 .unwrap_or_default()
198}
199
200fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
201 let model_ref = model_ref.trim();
202 if model_ref.is_empty() {
203 return None;
204 }
205 if let Some((provider, model)) = model_ref.split_once(':') {
206 let provider = provider.trim();
207 let model = model.trim();
208 if !provider.is_empty() && !model.is_empty() {
209 return Some((provider.to_string(), model.to_string()));
210 }
211 }
212 if let Some((provider, rest)) = model_ref.split_once('/') {
213 let provider = provider.trim();
214 if !provider.is_empty() {
215 return Some((provider.to_string(), model_ref.to_string()));
216 }
217 if !rest.trim().is_empty() {
218 return Some(("custom".into(), model_ref.to_string()));
219 }
220 }
221 Some(("custom".into(), model_ref.to_string()))
222}
223
224use std::io::{BufRead, BufReader, Write};
231use std::process::{Child, ChildStdin, ChildStdout};
232use std::sync::Mutex;
233
234const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";
235
236const KERNEL_DRIVER: &str = r#"
237import sys, io, json, contextlib, traceback
238_g = {"__name__": "__sparrow__"}
239SENT = "__SPARROW_KERNEL_END__"
240for line in sys.stdin:
241 line = line.strip()
242 if not line:
243 continue
244 try:
245 req = json.loads(line)
246 except Exception:
247 print(json.dumps({"out": "", "err": "bad request"}), flush=True)
248 print(SENT, flush=True)
249 continue
250 code = req.get("code", "")
251 buf = io.StringIO()
252 err = ""
253 try:
254 with contextlib.redirect_stdout(buf):
255 exec(compile(code, "<sparrow>", "exec"), _g)
256 except Exception:
257 err = traceback.format_exc()
258 print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
259 print(SENT, flush=True)
260"#;
261
262struct Kernel {
263 child: Child,
264 stdin: ChildStdin,
265 stdout: BufReader<ChildStdout>,
266}
267
268pub struct PythonRpc {
269 kernel: Mutex<Option<Kernel>>,
270 python_bin: String,
271}
272
273impl PythonRpc {
274 pub fn new() -> Self {
275 let python_bin = if which_python("python3") {
277 "python3".to_string()
278 } else {
279 "python".to_string()
280 };
281 Self {
282 kernel: Mutex::new(None),
283 python_bin,
284 }
285 }
286
287 fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
288 if kernel.is_some() {
289 return Ok(());
290 }
291 use std::process::{Command, Stdio};
292 let mut child = Command::new(&self.python_bin)
293 .arg("-u")
294 .arg("-c")
295 .arg(KERNEL_DRIVER)
296 .stdin(Stdio::piped())
297 .stdout(Stdio::piped())
298 .stderr(Stdio::null())
299 .spawn()?;
300 let stdin = child
301 .stdin
302 .take()
303 .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
304 let stdout = BufReader::new(
305 child
306 .stdout
307 .take()
308 .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
309 );
310 *kernel = Some(Kernel {
311 child,
312 stdin,
313 stdout,
314 });
315 Ok(())
316 }
317}
318
319fn which_python(bin: &str) -> bool {
320 std::process::Command::new(bin)
321 .arg("--version")
322 .stdout(std::process::Stdio::null())
323 .stderr(std::process::Stdio::null())
324 .status()
325 .map(|s| s.success())
326 .unwrap_or(false)
327}
328
329#[async_trait]
330impl Tool for PythonRpc {
331 fn name(&self) -> &str {
332 "python_rpc"
333 }
334 fn description(&self) -> &str {
335 "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
336 }
337 fn schema(&self) -> serde_json::Value {
338 json!({
339 "type": "object",
340 "properties": {
341 "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
342 },
343 "required": ["code"]
344 })
345 }
346 fn risk(&self) -> RiskLevel {
347 RiskLevel::Exec
348 }
349 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
350 let code = args["code"].as_str().unwrap_or("").to_string();
351
352 let mut guard = self.kernel.lock().unwrap();
355 if let Err(e) = self.ensure_kernel(&mut guard) {
356 return Ok(ToolResult::error(format!(
357 "Python kernel unavailable ({}). Is '{}' installed?",
358 e, self.python_bin
359 )));
360 }
361 let kernel = guard.as_mut().unwrap();
362
363 let req = serde_json::json!({ "code": code }).to_string();
365 if writeln!(kernel.stdin, "{}", req)
366 .and_then(|_| kernel.stdin.flush())
367 .is_err()
368 {
369 *guard = None; return Ok(ToolResult::error(
371 "Python kernel write failed (kernel reset)",
372 ));
373 }
374
375 let mut last_json = String::new();
377 loop {
378 let mut line = String::new();
379 match kernel.stdout.read_line(&mut line) {
380 Ok(0) => {
381 *guard = None;
382 return Ok(ToolResult::error("Python kernel closed unexpectedly"));
383 }
384 Ok(_) => {
385 let trimmed = line.trim_end();
386 if trimmed == KERNEL_SENTINEL {
387 break;
388 }
389 last_json = trimmed.to_string();
390 }
391 Err(e) => {
392 *guard = None;
393 return Ok(ToolResult::error(format!(
394 "Python kernel read error: {}",
395 e
396 )));
397 }
398 }
399 }
400
401 let parsed: serde_json::Value = serde_json::from_str(&last_json)
402 .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
403 let out = parsed["out"].as_str().unwrap_or("");
404 let err = parsed["err"].as_str().unwrap_or("");
405 if !err.is_empty() {
406 Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
407 } else {
408 Ok(ToolResult::text(out.to_string()))
409 }
410 }
411}
412
413impl Drop for PythonRpc {
414 fn drop(&mut self) {
415 if let Ok(mut g) = self.kernel.lock() {
416 if let Some(mut k) = g.take() {
417 let _ = k.child.kill();
418 }
419 }
420 }
421}