Skip to main content

sparrow/tools/
subagent.rs

1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14// ─── Subagent spawn ─────────────────────────────────────────────────────────────
15
16/// Delegates a subtask to a child AgentRun with its own conversation and sandbox.
17/// §15: "Each subagent gets its own conversation, terminal, and a Python RPC channel."
18///
19/// Holds the router + config (not a parent Engine) so it can build a fresh child
20/// engine per call — this avoids a self-referential Arc<Engine> at registration.
21pub struct SubagentSpawn {
22    router: Arc<dyn Router>,
23    config: Config,
24    memory: Option<Arc<dyn Memory>>,
25}
26
27impl SubagentSpawn {
28    pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29        Self {
30            router,
31            config,
32            memory: None,
33        }
34    }
35
36    pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37        self.memory = Some(memory);
38        self
39    }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44    fn name(&self) -> &str {
45        "subagent_spawn"
46    }
47    fn description(&self) -> &str {
48        "Spawn a child agent to handle a subtask independently"
49    }
50    fn schema(&self) -> serde_json::Value {
51        json!({
52            "type": "object",
53            "properties": {
54                "task": { "type": "string", "description": "Subtask description" },
55                "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56                "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57                "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58                "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59                "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60            },
61            "required": ["task"]
62        })
63    }
64    fn risk(&self) -> RiskLevel {
65        RiskLevel::Exec
66    }
67    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68        let task_desc = args["task"].as_str().unwrap_or("");
69        let role = args["role"].as_str().unwrap_or("helper");
70        let mut child_config = self.config.clone();
71        if let Some(model_ref) = args["model"].as_str() {
72            if let Some((provider, model)) = parse_model_ref(model_ref) {
73                child_config.forced_model = Some((provider.clone(), model.clone()));
74                for tier in ["trivial", "small", "medium", "hard", "vision"] {
75                    child_config
76                        .routing
77                        .policy
78                        .insert(tier.to_string(), provider.clone());
79                }
80                child_config
81                    .providers
82                    .entry(provider)
83                    .or_insert_with(|| crate::config::ProviderConfig {
84                        adapter: "openai-compatible".into(),
85                        base_url: None,
86                        models: vec![],
87                        api_key_env: None,
88                    })
89                    .models = vec![model];
90            }
91        }
92        if let Some(mode) = args["permission_mode"]
93            .as_str()
94            .and_then(PermissionMode::parse)
95        {
96            child_config.defaults.autonomy = mode.autonomy_level();
97            child_config.permissions.mode = mode;
98        }
99        for tool in string_array(&args["tools"]) {
100            if !child_config.permissions.tools.allow.contains(&tool) {
101                child_config.permissions.tools.allow.push(tool);
102            }
103        }
104        for tool in string_array(&args["disallowed_tools"]) {
105            if !child_config.permissions.tools.deny.contains(&tool) {
106                child_config.permissions.tools.deny.push(tool);
107            }
108        }
109
110        let (tx, mut rx) = mpsc::unbounded_channel();
111
112        let task = Task {
113            description: task_desc.to_string(),
114            context: vec![],
115        };
116
117        // Build a fresh child engine for this subagent.
118        let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119            name: role.to_string(),
120            role: role.to_string(),
121            personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122        });
123        if let Some(mem) = &self.memory {
124            child = child.with_memory(mem.clone());
125        }
126        let engine = Arc::new(child);
127
128        let handle = tokio::spawn(async move {
129            match engine.drive(task, tx).await {
130                Ok(outcome) => outcome,
131                Err(e) => crate::event::OutcomeSummary {
132                    status: format!("error: {}", e),
133                    diffs: vec![],
134                    cost_usd: 0.0,
135                    tokens: crate::event::TokenUsage {
136                        input: 0,
137                        output: 0,
138                    },
139                    cost_comparison: String::new(),
140                    duration_ms: None,
141                },
142            }
143        });
144
145        // Collect subagent output
146        let mut output = String::new();
147        while let Some(event) = rx.recv().await {
148            match &event {
149                Event::ThinkingDelta { text, .. } => {
150                    output.push_str(text);
151                }
152                Event::AgentStatus { note, .. } => {
153                    output.push_str(&format!("\n[{}]", note));
154                }
155                Event::RunFinished { outcome, .. } => {
156                    output.push_str(&format!(
157                        "\n[Subagent done: {} | ${:.4}]",
158                        outcome.status, outcome.cost_usd
159                    ));
160                }
161                Event::Error { message, .. } => {
162                    output.push_str(&format!("\n[Error: {}]", message));
163                }
164                _ => {}
165            }
166        }
167
168        let outcome = handle
169            .await
170            .unwrap_or_else(|e| crate::event::OutcomeSummary {
171                status: format!("subagent panicked: {}", e),
172                diffs: vec![],
173                cost_usd: 0.0,
174                tokens: crate::event::TokenUsage {
175                    input: 0,
176                    output: 0,
177                },
178                cost_comparison: String::new(),
179                duration_ms: None,
180            });
181
182        Ok(ToolResult::ok(vec![Block::Text(format!(
183            "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
184            role, outcome.status, output
185        ))]))
186    }
187}
188
189fn string_array(value: &serde_json::Value) -> Vec<String> {
190    value
191        .as_array()
192        .map(|items| {
193            items
194                .iter()
195                .filter_map(|item| item.as_str())
196                .map(str::trim)
197                .filter(|item| !item.is_empty())
198                .map(str::to_string)
199                .collect()
200        })
201        .unwrap_or_default()
202}
203
/// Splits a model reference into `(provider, model)`.
///
/// * Blank input yields `None`.
/// * `provider:model` (both sides non-blank after trimming) yields the two
///   trimmed halves, split at the first `:`.
/// * Otherwise the whole trimmed reference is kept as the model name. The
///   provider is the trimmed text before the first `/` when that is non-blank,
///   and `"custom"` in every other case.
fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
    let spec = model_ref.trim();
    if spec.is_empty() {
        return None;
    }

    // Colon form wins whenever both halves carry text.
    let colon_form = spec
        .split_once(':')
        .map(|(p, m)| (p.trim(), m.trim()))
        .filter(|(p, m)| !p.is_empty() && !m.is_empty());
    if let Some((p, m)) = colon_form {
        return Some((p.to_owned(), m.to_owned()));
    }

    // Slash form (or no separator at all): the model stays the full reference.
    let provider = match spec.split_once('/') {
        Some((head, _)) if !head.trim().is_empty() => head.trim(),
        _ => "custom",
    };
    Some((provider.to_owned(), spec.to_owned()))
}
227
228// ─── Persistent Python kernel ─────────────────────────────────────────────────
// A long-lived Python process (`python3`, falling back to `python`) that keeps a
// single globals dict across calls, so variables/imports/state persist between
// tool invocations (§15). A small driver loop reads one JSON request per line,
// execs it capturing stdout, and emits a JSON result line followed by a unique
// sentinel line.
233
234use std::io::{BufRead, BufReader, Write};
235use std::process::{Child, ChildStdin, ChildStdout};
236use std::sync::Mutex;
237
/// Line the driver prints after every reply to mark the end of one response.
/// Must stay identical to `SENT` inside [`KERNEL_DRIVER`].
const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";

/// Python source of the kernel driver, passed to the interpreter via `-c`.
///
/// Protocol: one JSON object per stdin line (`{"code": "..."}`); for each
/// request the driver prints one JSON line `{"out": ..., "err": ...}` followed
/// by the sentinel line. All snippets share the `_g` globals dict, which is
/// what makes state persist between requests.
///
/// Two details keep the kernel usable:
/// * `BaseException` is caught, so `sys.exit()` / `KeyboardInterrupt` raised by
///   a snippet is reported as an error instead of terminating the driver and
///   discarding every variable accumulated so far.
/// * stderr is captured into the same buffer as stdout, so text written to
///   stderr by a snippet is returned to the caller instead of being lost.
const KERNEL_DRIVER: &str = r#"
import sys, io, json, contextlib, traceback
_g = {"__name__": "__sparrow__"}
SENT = "__SPARROW_KERNEL_END__"
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        req = json.loads(line)
    except Exception:
        print(json.dumps({"out": "", "err": "bad request"}), flush=True)
        print(SENT, flush=True)
        continue
    code = req.get("code", "")
    buf = io.StringIO()
    err = ""
    try:
        with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
            exec(compile(code, "<sparrow>", "exec"), _g)
    except BaseException:
        err = traceback.format_exc()
    print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
    print(SENT, flush=True)
"#;
265
266struct Kernel {
267    child: Child,
268    stdin: ChildStdin,
269    stdout: BufReader<ChildStdout>,
270}
271
272pub struct PythonRpc {
273    kernel: Mutex<Option<Kernel>>,
274    python_bin: String,
275}
276
277impl PythonRpc {
278    pub fn new() -> Self {
279        // Prefer python3, fall back to python (Windows often only has `python`).
280        let python_bin = if which_python("python3") {
281            "python3".to_string()
282        } else {
283            "python".to_string()
284        };
285        Self {
286            kernel: Mutex::new(None),
287            python_bin,
288        }
289    }
290
291    fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
292        if kernel.is_some() {
293            return Ok(());
294        }
295        use std::process::{Command, Stdio};
296        let mut child = Command::new(&self.python_bin)
297            .arg("-u")
298            .arg("-c")
299            .arg(KERNEL_DRIVER)
300            .stdin(Stdio::piped())
301            .stdout(Stdio::piped())
302            .stderr(Stdio::null())
303            .spawn()?;
304        let stdin = child
305            .stdin
306            .take()
307            .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
308        let stdout = BufReader::new(
309            child
310                .stdout
311                .take()
312                .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
313        );
314        *kernel = Some(Kernel {
315            child,
316            stdin,
317            stdout,
318        });
319        Ok(())
320    }
321}
322
323fn which_python(bin: &str) -> bool {
324    std::process::Command::new(bin)
325        .arg("--version")
326        .stdout(std::process::Stdio::null())
327        .stderr(std::process::Stdio::null())
328        .status()
329        .map(|s| s.success())
330        .unwrap_or(false)
331}
332
333#[async_trait]
334impl Tool for PythonRpc {
335    fn name(&self) -> &str {
336        "python_rpc"
337    }
338    fn description(&self) -> &str {
339        "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
340    }
341    fn schema(&self) -> serde_json::Value {
342        json!({
343            "type": "object",
344            "properties": {
345                "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
346            },
347            "required": ["code"]
348        })
349    }
350    fn risk(&self) -> RiskLevel {
351        RiskLevel::Exec
352    }
353    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
354        let code = args["code"].as_str().unwrap_or("").to_string();
355
356        // Kernel IO is blocking; run on a blocking thread without holding the
357        // lock across an await. We take the kernel out, use it, put it back.
358        let mut guard = self.kernel.lock().unwrap();
359        if let Err(e) = self.ensure_kernel(&mut guard) {
360            return Ok(ToolResult::error(format!(
361                "Python kernel unavailable ({}). Is '{}' installed?",
362                e, self.python_bin
363            )));
364        }
365        let kernel = guard.as_mut().unwrap();
366
367        // Send request as a single JSON line.
368        let req = serde_json::json!({ "code": code }).to_string();
369        if writeln!(kernel.stdin, "{}", req)
370            .and_then(|_| kernel.stdin.flush())
371            .is_err()
372        {
373            *guard = None; // kernel died; drop it so it respawns next time
374            return Ok(ToolResult::error(
375                "Python kernel write failed (kernel reset)",
376            ));
377        }
378
379        // Read lines until the sentinel; the line before it is the JSON result.
380        let mut last_json = String::new();
381        loop {
382            let mut line = String::new();
383            match kernel.stdout.read_line(&mut line) {
384                Ok(0) => {
385                    *guard = None;
386                    return Ok(ToolResult::error("Python kernel closed unexpectedly"));
387                }
388                Ok(_) => {
389                    let trimmed = line.trim_end();
390                    if trimmed == KERNEL_SENTINEL {
391                        break;
392                    }
393                    last_json = trimmed.to_string();
394                }
395                Err(e) => {
396                    *guard = None;
397                    return Ok(ToolResult::error(format!(
398                        "Python kernel read error: {}",
399                        e
400                    )));
401                }
402            }
403        }
404
405        let parsed: serde_json::Value = serde_json::from_str(&last_json)
406            .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
407        let out = parsed["out"].as_str().unwrap_or("");
408        let err = parsed["err"].as_str().unwrap_or("");
409        if !err.is_empty() {
410            Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
411        } else {
412            Ok(ToolResult::text(out.to_string()))
413        }
414    }
415}
416
417impl Drop for PythonRpc {
418    fn drop(&mut self) {
419        if let Ok(mut g) = self.kernel.lock() {
420            if let Some(mut k) = g.take() {
421                let _ = k.child.kill();
422            }
423        }
424    }
425}