1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14pub struct SubagentSpawn {
22 router: Arc<dyn Router>,
23 config: Config,
24 memory: Option<Arc<dyn Memory>>,
25}
26
27impl SubagentSpawn {
28 pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29 Self {
30 router,
31 config,
32 memory: None,
33 }
34 }
35
36 pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37 self.memory = Some(memory);
38 self
39 }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44 fn name(&self) -> &str {
45 "subagent_spawn"
46 }
47 fn description(&self) -> &str {
48 "Spawn a child agent to handle a subtask independently"
49 }
50 fn schema(&self) -> serde_json::Value {
51 json!({
52 "type": "object",
53 "properties": {
54 "task": { "type": "string", "description": "Subtask description" },
55 "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56 "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57 "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58 "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59 "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60 },
61 "required": ["task"]
62 })
63 }
64 fn risk(&self) -> RiskLevel {
65 RiskLevel::Exec
66 }
67 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68 let task_desc = args["task"].as_str().unwrap_or("");
69 let role = args["role"].as_str().unwrap_or("helper");
70 let mut child_config = self.config.clone();
71 if let Some(model_ref) = args["model"].as_str() {
72 if let Some((provider, model)) = parse_model_ref(model_ref) {
73 child_config.forced_model = Some((provider.clone(), model.clone()));
74 for tier in ["trivial", "small", "medium", "hard", "vision"] {
75 child_config
76 .routing
77 .policy
78 .insert(tier.to_string(), provider.clone());
79 }
80 child_config
81 .providers
82 .entry(provider)
83 .or_insert_with(|| crate::config::ProviderConfig {
84 adapter: "openai-compatible".into(),
85 base_url: None,
86 models: vec![],
87 api_key_env: None,
88 })
89 .models = vec![model];
90 }
91 }
92 if let Some(mode) = args["permission_mode"]
93 .as_str()
94 .and_then(PermissionMode::parse)
95 {
96 child_config.defaults.autonomy = mode.autonomy_level();
97 child_config.permissions.mode = mode;
98 }
99 for tool in string_array(&args["tools"]) {
100 if !child_config.permissions.tools.allow.contains(&tool) {
101 child_config.permissions.tools.allow.push(tool);
102 }
103 }
104 for tool in string_array(&args["disallowed_tools"]) {
105 if !child_config.permissions.tools.deny.contains(&tool) {
106 child_config.permissions.tools.deny.push(tool);
107 }
108 }
109
110 let (tx, mut rx) = mpsc::unbounded_channel();
111
112 let task = Task {
113 description: task_desc.to_string(),
114 context: vec![],
115 };
116
117 let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119 name: role.to_string(),
120 role: role.to_string(),
121 personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122 });
123 if let Some(mem) = &self.memory {
124 child = child.with_memory(mem.clone());
125 }
126 let engine = Arc::new(child);
127
128 let handle = tokio::spawn(async move {
129 match engine.drive(task, tx).await {
130 Ok(outcome) => outcome,
131 Err(e) => crate::event::OutcomeSummary {
132 status: format!("error: {}", e),
133 diffs: vec![],
134 cost_usd: 0.0,
135 tokens: crate::event::TokenUsage {
136 input: 0,
137 output: 0,
138 },
139 cost_comparison: String::new(),
140 },
141 }
142 });
143
144 let mut output = String::new();
146 while let Some(event) = rx.recv().await {
147 match &event {
148 Event::ThinkingDelta { text, .. } => {
149 output.push_str(text);
150 }
151 Event::AgentStatus { note, .. } => {
152 output.push_str(&format!("\n[{}]", note));
153 }
154 Event::RunFinished { outcome, .. } => {
155 output.push_str(&format!(
156 "\n[Subagent done: {} | ${:.4}]",
157 outcome.status, outcome.cost_usd
158 ));
159 }
160 Event::Error { message, .. } => {
161 output.push_str(&format!("\n[Error: {}]", message));
162 }
163 _ => {}
164 }
165 }
166
167 let outcome = handle
168 .await
169 .unwrap_or_else(|e| crate::event::OutcomeSummary {
170 status: format!("subagent panicked: {}", e),
171 diffs: vec![],
172 cost_usd: 0.0,
173 tokens: crate::event::TokenUsage {
174 input: 0,
175 output: 0,
176 },
177 cost_comparison: String::new(),
178 });
179
180 Ok(ToolResult::ok(vec![Block::Text(format!(
181 "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
182 role, outcome.status, output
183 ))]))
184 }
185}
186
187fn string_array(value: &serde_json::Value) -> Vec<String> {
188 value
189 .as_array()
190 .map(|items| {
191 items
192 .iter()
193 .filter_map(|item| item.as_str())
194 .map(str::trim)
195 .filter(|item| !item.is_empty())
196 .map(str::to_string)
197 .collect()
198 })
199 .unwrap_or_default()
200}
201
202fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
203 let model_ref = model_ref.trim();
204 if model_ref.is_empty() {
205 return None;
206 }
207 if let Some((provider, model)) = model_ref.split_once(':') {
208 let provider = provider.trim();
209 let model = model.trim();
210 if !provider.is_empty() && !model.is_empty() {
211 return Some((provider.to_string(), model.to_string()));
212 }
213 }
214 if let Some((provider, rest)) = model_ref.split_once('/') {
215 let provider = provider.trim();
216 if !provider.is_empty() {
217 return Some((provider.to_string(), model_ref.to_string()));
218 }
219 if !rest.trim().is_empty() {
220 return Some(("custom".into(), model_ref.to_string()));
221 }
222 }
223 Some(("custom".into(), model_ref.to_string()))
224}
225
226use std::io::{BufRead, BufReader, Write};
233use std::process::{Child, ChildStdin, ChildStdout};
234use std::sync::Mutex;
235
236const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";
237
238const KERNEL_DRIVER: &str = r#"
239import sys, io, json, contextlib, traceback
240_g = {"__name__": "__sparrow__"}
241SENT = "__SPARROW_KERNEL_END__"
242for line in sys.stdin:
243 line = line.strip()
244 if not line:
245 continue
246 try:
247 req = json.loads(line)
248 except Exception:
249 print(json.dumps({"out": "", "err": "bad request"}), flush=True)
250 print(SENT, flush=True)
251 continue
252 code = req.get("code", "")
253 buf = io.StringIO()
254 err = ""
255 try:
256 with contextlib.redirect_stdout(buf):
257 exec(compile(code, "<sparrow>", "exec"), _g)
258 except Exception:
259 err = traceback.format_exc()
260 print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
261 print(SENT, flush=True)
262"#;
263
264struct Kernel {
265 child: Child,
266 stdin: ChildStdin,
267 stdout: BufReader<ChildStdout>,
268}
269
270pub struct PythonRpc {
271 kernel: Mutex<Option<Kernel>>,
272 python_bin: String,
273}
274
275impl PythonRpc {
276 pub fn new() -> Self {
277 let python_bin = if which_python("python3") {
279 "python3".to_string()
280 } else {
281 "python".to_string()
282 };
283 Self {
284 kernel: Mutex::new(None),
285 python_bin,
286 }
287 }
288
289 fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
290 if kernel.is_some() {
291 return Ok(());
292 }
293 use std::process::{Command, Stdio};
294 let mut child = Command::new(&self.python_bin)
295 .arg("-u")
296 .arg("-c")
297 .arg(KERNEL_DRIVER)
298 .stdin(Stdio::piped())
299 .stdout(Stdio::piped())
300 .stderr(Stdio::null())
301 .spawn()?;
302 let stdin = child
303 .stdin
304 .take()
305 .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
306 let stdout = BufReader::new(
307 child
308 .stdout
309 .take()
310 .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
311 );
312 *kernel = Some(Kernel {
313 child,
314 stdin,
315 stdout,
316 });
317 Ok(())
318 }
319}
320
321fn which_python(bin: &str) -> bool {
322 std::process::Command::new(bin)
323 .arg("--version")
324 .stdout(std::process::Stdio::null())
325 .stderr(std::process::Stdio::null())
326 .status()
327 .map(|s| s.success())
328 .unwrap_or(false)
329}
330
331#[async_trait]
332impl Tool for PythonRpc {
333 fn name(&self) -> &str {
334 "python_rpc"
335 }
336 fn description(&self) -> &str {
337 "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
338 }
339 fn schema(&self) -> serde_json::Value {
340 json!({
341 "type": "object",
342 "properties": {
343 "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
344 },
345 "required": ["code"]
346 })
347 }
348 fn risk(&self) -> RiskLevel {
349 RiskLevel::Exec
350 }
351 async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
352 let code = args["code"].as_str().unwrap_or("").to_string();
353
354 let mut guard = self.kernel.lock().unwrap();
357 if let Err(e) = self.ensure_kernel(&mut guard) {
358 return Ok(ToolResult::error(format!(
359 "Python kernel unavailable ({}). Is '{}' installed?",
360 e, self.python_bin
361 )));
362 }
363 let kernel = guard.as_mut().unwrap();
364
365 let req = serde_json::json!({ "code": code }).to_string();
367 if writeln!(kernel.stdin, "{}", req)
368 .and_then(|_| kernel.stdin.flush())
369 .is_err()
370 {
371 *guard = None; return Ok(ToolResult::error(
373 "Python kernel write failed (kernel reset)",
374 ));
375 }
376
377 let mut last_json = String::new();
379 loop {
380 let mut line = String::new();
381 match kernel.stdout.read_line(&mut line) {
382 Ok(0) => {
383 *guard = None;
384 return Ok(ToolResult::error("Python kernel closed unexpectedly"));
385 }
386 Ok(_) => {
387 let trimmed = line.trim_end();
388 if trimmed == KERNEL_SENTINEL {
389 break;
390 }
391 last_json = trimmed.to_string();
392 }
393 Err(e) => {
394 *guard = None;
395 return Ok(ToolResult::error(format!(
396 "Python kernel read error: {}",
397 e
398 )));
399 }
400 }
401 }
402
403 let parsed: serde_json::Value = serde_json::from_str(&last_json)
404 .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
405 let out = parsed["out"].as_str().unwrap_or("");
406 let err = parsed["err"].as_str().unwrap_or("");
407 if !err.is_empty() {
408 Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
409 } else {
410 Ok(ToolResult::text(out.to_string()))
411 }
412 }
413}
414
415impl Drop for PythonRpc {
416 fn drop(&mut self) {
417 if let Ok(mut g) = self.kernel.lock() {
418 if let Some(mut k) = g.take() {
419 let _ = k.child.kill();
420 }
421 }
422 }
423}