1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14pub struct SubagentSpawn {
22 router: Arc<dyn Router>,
23 config: Config,
24 memory: Option<Arc<dyn Memory>>,
25}
26
27impl SubagentSpawn {
28 pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29 Self {
30 router,
31 config,
32 memory: None,
33 }
34 }
35
36 pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37 self.memory = Some(memory);
38 self
39 }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44 fn name(&self) -> &str {
45 "subagent_spawn"
46 }
47 fn description(&self) -> &str {
48 "Spawn a child agent to handle a subtask independently"
49 }
50 fn schema(&self) -> serde_json::Value {
51 json!({
52 "type": "object",
53 "properties": {
54 "task": { "type": "string", "description": "Subtask description" },
55 "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56 "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57 "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58 "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59 "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60 },
61 "required": ["task"]
62 })
63 }
64 fn risk(&self) -> RiskLevel {
65 RiskLevel::Exec
66 }
67 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68 let task_desc = args["task"].as_str().unwrap_or("");
69 let role = args["role"].as_str().unwrap_or("helper");
70 let mut child_config = self.config.clone();
71 if let Some(model_ref) = args["model"].as_str() {
72 if let Some((provider, model)) = parse_model_ref(model_ref) {
73 child_config.forced_model = Some((provider.clone(), model.clone()));
74 for tier in ["trivial", "small", "medium", "hard", "vision"] {
75 child_config
76 .routing
77 .policy
78 .insert(tier.to_string(), provider.clone());
79 }
80 child_config
81 .providers
82 .entry(provider)
83 .or_insert_with(|| crate::config::ProviderConfig {
84 adapter: "openai-compatible".into(),
85 base_url: None,
86 models: vec![],
87 api_key_env: None,
88 })
89 .models = vec![model];
90 }
91 }
92 if let Some(mode) = args["permission_mode"]
93 .as_str()
94 .and_then(PermissionMode::parse)
95 {
96 child_config.defaults.autonomy = mode.autonomy_level();
97 child_config.permissions.mode = mode;
98 }
99 for tool in string_array(&args["tools"]) {
100 if !child_config.permissions.tools.allow.contains(&tool) {
101 child_config.permissions.tools.allow.push(tool);
102 }
103 }
104 for tool in string_array(&args["disallowed_tools"]) {
105 if !child_config.permissions.tools.deny.contains(&tool) {
106 child_config.permissions.tools.deny.push(tool);
107 }
108 }
109
110 let (tx, mut rx) = mpsc::unbounded_channel();
111
112 let task = Task {
113 description: task_desc.to_string(),
114 context: vec![],
115 };
116
117 let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119 name: role.to_string(),
120 role: role.to_string(),
121 personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122 });
123 if let Some(mem) = &self.memory {
124 child = child.with_memory(mem.clone());
125 }
126 let engine = Arc::new(child);
127
128 let handle = tokio::spawn(async move {
129 match engine.drive(task, tx).await {
130 Ok(outcome) => outcome,
131 Err(e) => crate::event::OutcomeSummary {
132 status: format!("error: {}", e),
133 diffs: vec![],
134 cost_usd: 0.0,
135 tokens: crate::event::TokenUsage {
136 input: 0,
137 output: 0,
138 },
139 cost_comparison: String::new(),
140 duration_ms: None,
141 },
142 }
143 });
144
145 let mut output = String::new();
147 while let Some(event) = rx.recv().await {
148 match &event {
149 Event::ThinkingDelta { text, .. } => {
150 output.push_str(text);
151 }
152 Event::AgentStatus { note, .. } => {
153 output.push_str(&format!("\n[{}]", note));
154 }
155 Event::RunFinished { outcome, .. } => {
156 output.push_str(&format!(
157 "\n[Subagent done: {} | ${:.4}]",
158 outcome.status, outcome.cost_usd
159 ));
160 }
161 Event::Error { message, .. } => {
162 output.push_str(&format!("\n[Error: {}]", message));
163 }
164 _ => {}
165 }
166 }
167
168 let outcome = handle
169 .await
170 .unwrap_or_else(|e| crate::event::OutcomeSummary {
171 status: format!("subagent panicked: {}", e),
172 diffs: vec![],
173 cost_usd: 0.0,
174 tokens: crate::event::TokenUsage {
175 input: 0,
176 output: 0,
177 },
178 cost_comparison: String::new(),
179 duration_ms: None,
180 });
181
182 Ok(ToolResult::ok(vec![Block::Text(format!(
183 "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
184 role, outcome.status, output
185 ))]))
186 }
187}
188
189fn string_array(value: &serde_json::Value) -> Vec<String> {
190 value
191 .as_array()
192 .map(|items| {
193 items
194 .iter()
195 .filter_map(|item| item.as_str())
196 .map(str::trim)
197 .filter(|item| !item.is_empty())
198 .map(str::to_string)
199 .collect()
200 })
201 .unwrap_or_default()
202}
203
204fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
205 let model_ref = model_ref.trim();
206 if model_ref.is_empty() {
207 return None;
208 }
209 if let Some((provider, model)) = model_ref.split_once(':') {
210 let provider = provider.trim();
211 let model = model.trim();
212 if !provider.is_empty() && !model.is_empty() {
213 return Some((provider.to_string(), model.to_string()));
214 }
215 }
216 if let Some((provider, rest)) = model_ref.split_once('/') {
217 let provider = provider.trim();
218 if !provider.is_empty() {
219 return Some((provider.to_string(), model_ref.to_string()));
220 }
221 if !rest.trim().is_empty() {
222 return Some(("custom".into(), model_ref.to_string()));
223 }
224 }
225 Some(("custom".into(), model_ref.to_string()))
226}
227
228use std::io::{BufRead, BufReader, Write};
235use std::process::{Child, ChildStdin, ChildStdout};
236use std::sync::Mutex;
237
238const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";
239
240const KERNEL_DRIVER: &str = r#"
241import sys, io, json, contextlib, traceback
242_g = {"__name__": "__sparrow__"}
243SENT = "__SPARROW_KERNEL_END__"
244for line in sys.stdin:
245 line = line.strip()
246 if not line:
247 continue
248 try:
249 req = json.loads(line)
250 except Exception:
251 print(json.dumps({"out": "", "err": "bad request"}), flush=True)
252 print(SENT, flush=True)
253 continue
254 code = req.get("code", "")
255 buf = io.StringIO()
256 err = ""
257 try:
258 with contextlib.redirect_stdout(buf):
259 exec(compile(code, "<sparrow>", "exec"), _g)
260 except Exception:
261 err = traceback.format_exc()
262 print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
263 print(SENT, flush=True)
264"#;
265
266struct Kernel {
267 child: Child,
268 stdin: ChildStdin,
269 stdout: BufReader<ChildStdout>,
270}
271
272pub struct PythonRpc {
273 kernel: Mutex<Option<Kernel>>,
274 python_bin: String,
275}
276
277impl PythonRpc {
278 pub fn new() -> Self {
279 let python_bin = if which_python("python3") {
281 "python3".to_string()
282 } else {
283 "python".to_string()
284 };
285 Self {
286 kernel: Mutex::new(None),
287 python_bin,
288 }
289 }
290
291 fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
292 if kernel.is_some() {
293 return Ok(());
294 }
295 use std::process::{Command, Stdio};
296 let mut child = Command::new(&self.python_bin)
297 .arg("-u")
298 .arg("-c")
299 .arg(KERNEL_DRIVER)
300 .stdin(Stdio::piped())
301 .stdout(Stdio::piped())
302 .stderr(Stdio::null())
303 .spawn()?;
304 let stdin = child
305 .stdin
306 .take()
307 .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
308 let stdout = BufReader::new(
309 child
310 .stdout
311 .take()
312 .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
313 );
314 *kernel = Some(Kernel {
315 child,
316 stdin,
317 stdout,
318 });
319 Ok(())
320 }
321}
322
323fn which_python(bin: &str) -> bool {
324 std::process::Command::new(bin)
325 .arg("--version")
326 .stdout(std::process::Stdio::null())
327 .stderr(std::process::Stdio::null())
328 .status()
329 .map(|s| s.success())
330 .unwrap_or(false)
331}
332
333#[async_trait]
334impl Tool for PythonRpc {
335 fn name(&self) -> &str {
336 "python_rpc"
337 }
338 fn description(&self) -> &str {
339 "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
340 }
341 fn schema(&self) -> serde_json::Value {
342 json!({
343 "type": "object",
344 "properties": {
345 "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
346 },
347 "required": ["code"]
348 })
349 }
350 fn risk(&self) -> RiskLevel {
351 RiskLevel::Exec
352 }
353 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
354 let code = args["code"].as_str().unwrap_or("").to_string();
355
356 let mut guard = self.kernel.lock().unwrap();
359 if let Err(e) = self.ensure_kernel(&mut guard) {
360 return Ok(ToolResult::error(format!(
361 "Python kernel unavailable ({}). Is '{}' installed?",
362 e, self.python_bin
363 )));
364 }
365 let kernel = guard.as_mut().unwrap();
366
367 let req = serde_json::json!({ "code": code }).to_string();
369 if writeln!(kernel.stdin, "{}", req)
370 .and_then(|_| kernel.stdin.flush())
371 .is_err()
372 {
373 *guard = None; return Ok(ToolResult::error(
375 "Python kernel write failed (kernel reset)",
376 ));
377 }
378
379 let mut last_json = String::new();
381 loop {
382 let mut line = String::new();
383 match kernel.stdout.read_line(&mut line) {
384 Ok(0) => {
385 *guard = None;
386 return Ok(ToolResult::error("Python kernel closed unexpectedly"));
387 }
388 Ok(_) => {
389 let trimmed = line.trim_end();
390 if trimmed == KERNEL_SENTINEL {
391 break;
392 }
393 last_json = trimmed.to_string();
394 }
395 Err(e) => {
396 *guard = None;
397 return Ok(ToolResult::error(format!(
398 "Python kernel read error: {}",
399 e
400 )));
401 }
402 }
403 }
404
405 let parsed: serde_json::Value = serde_json::from_str(&last_json)
406 .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
407 let out = parsed["out"].as_str().unwrap_or("");
408 let err = parsed["err"].as_str().unwrap_or("");
409 if !err.is_empty() {
410 Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
411 } else {
412 Ok(ToolResult::text(out.to_string()))
413 }
414 }
415}
416
417impl Drop for PythonRpc {
418 fn drop(&mut self) {
419 if let Ok(mut g) = self.kernel.lock() {
420 if let Some(mut k) = g.take() {
421 let _ = k.child.kill();
422 }
423 }
424 }
425}