Skip to main content

sparrow/tools/
subagent.rs

1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14// ─── Subagent spawn ─────────────────────────────────────────────────────────────
15
/// Delegates a subtask to a child AgentRun with its own conversation and sandbox.
/// §15: "Each subagent gets its own conversation, terminal, and a Python RPC channel."
///
/// Holds the router + config (not a parent Engine) so it can build a fresh child
/// engine per call — this avoids a self-referential Arc<Engine> at registration.
pub struct SubagentSpawn {
    /// Shared router; a clone of this handle is passed to every child engine.
    router: Arc<dyn Router>,
    /// Base configuration; cloned per call and then adjusted from the call arguments.
    config: Config,
    /// Optional memory backend; attached to each child engine when present.
    memory: Option<Arc<dyn Memory>>,
}
26
27impl SubagentSpawn {
28    pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29        Self {
30            router,
31            config,
32            memory: None,
33        }
34    }
35
36    pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37        self.memory = Some(memory);
38        self
39    }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44    fn name(&self) -> &str {
45        "subagent_spawn"
46    }
47    fn description(&self) -> &str {
48        "Spawn a child agent to handle a subtask independently"
49    }
50    fn schema(&self) -> serde_json::Value {
51        json!({
52            "type": "object",
53            "properties": {
54                "task": { "type": "string", "description": "Subtask description" },
55                "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56                "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57                "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58                "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59                "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60            },
61            "required": ["task"]
62        })
63    }
64    fn risk(&self) -> RiskLevel {
65        RiskLevel::Exec
66    }
67    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68        let task_desc = args["task"].as_str().unwrap_or("");
69        let role = args["role"].as_str().unwrap_or("helper");
70        let mut child_config = self.config.clone();
71        if let Some(model_ref) = args["model"].as_str() {
72            if let Some((provider, model)) = parse_model_ref(model_ref) {
73                child_config.forced_model = Some((provider.clone(), model.clone()));
74                for tier in ["trivial", "small", "medium", "hard", "vision"] {
75                    child_config
76                        .routing
77                        .policy
78                        .insert(tier.to_string(), provider.clone());
79                }
80                child_config
81                    .providers
82                    .entry(provider)
83                    .or_insert_with(|| crate::config::ProviderConfig {
84                        adapter: "openai-compatible".into(),
85                        base_url: None,
86                        models: vec![],
87                        api_key_env: None,
88                    })
89                    .models = vec![model];
90            }
91        }
92        if let Some(mode) = args["permission_mode"]
93            .as_str()
94            .and_then(PermissionMode::parse)
95        {
96            child_config.defaults.autonomy = mode.autonomy_level();
97            child_config.permissions.mode = mode;
98        }
99        for tool in string_array(&args["tools"]) {
100            if !child_config.permissions.tools.allow.contains(&tool) {
101                child_config.permissions.tools.allow.push(tool);
102            }
103        }
104        for tool in string_array(&args["disallowed_tools"]) {
105            if !child_config.permissions.tools.deny.contains(&tool) {
106                child_config.permissions.tools.deny.push(tool);
107            }
108        }
109
110        let (tx, mut rx) = mpsc::unbounded_channel();
111
112        let task = Task {
113            description: task_desc.to_string(),
114            context: vec![],
115        };
116
117        // Build a fresh child engine for this subagent.
118        let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119            name: role.to_string(),
120            role: role.to_string(),
121            personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122        });
123        if let Some(mem) = &self.memory {
124            child = child.with_memory(mem.clone());
125        }
126        let engine = Arc::new(child);
127
128        let handle = tokio::spawn(async move {
129            match engine.drive(task, tx).await {
130                Ok(outcome) => outcome,
131                Err(e) => crate::event::OutcomeSummary {
132                    status: format!("error: {}", e),
133                    diffs: vec![],
134                    cost_usd: 0.0,
135                    tokens: crate::event::TokenUsage {
136                        input: 0,
137                        output: 0,
138                    },
139                },
140            }
141        });
142
143        // Collect subagent output
144        let mut output = String::new();
145        while let Some(event) = rx.recv().await {
146            match &event {
147                Event::ThinkingDelta { text, .. } => {
148                    output.push_str(text);
149                }
150                Event::AgentStatus { note, .. } => {
151                    output.push_str(&format!("\n[{}]", note));
152                }
153                Event::RunFinished { outcome, .. } => {
154                    output.push_str(&format!(
155                        "\n[Subagent done: {} | ${:.4}]",
156                        outcome.status, outcome.cost_usd
157                    ));
158                }
159                Event::Error { message, .. } => {
160                    output.push_str(&format!("\n[Error: {}]", message));
161                }
162                _ => {}
163            }
164        }
165
166        let outcome = handle
167            .await
168            .unwrap_or_else(|e| crate::event::OutcomeSummary {
169                status: format!("subagent panicked: {}", e),
170                diffs: vec![],
171                cost_usd: 0.0,
172                tokens: crate::event::TokenUsage {
173                    input: 0,
174                    output: 0,
175                },
176            });
177
178        Ok(ToolResult::ok(vec![Block::Text(format!(
179            "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
180            role, outcome.status, output
181        ))]))
182    }
183}
184
185fn string_array(value: &serde_json::Value) -> Vec<String> {
186    value
187        .as_array()
188        .map(|items| {
189            items
190                .iter()
191                .filter_map(|item| item.as_str())
192                .map(str::trim)
193                .filter(|item| !item.is_empty())
194                .map(str::to_string)
195                .collect()
196        })
197        .unwrap_or_default()
198}
199
/// Splits a model reference into a `(provider, model)` pair.
///
/// * Blank input yields `None`.
/// * `provider:model` (both sides non-blank) yields the two trimmed halves,
///   split at the first `:`.
/// * Otherwise, if the text before the first `/` is non-blank it becomes the
///   provider, and the whole trimmed reference is kept as the model id.
/// * Anything else is attributed to the `"custom"` provider, again with the
///   whole trimmed reference as the model id.
fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
    let spec = model_ref.trim();
    if spec.is_empty() {
        return None;
    }

    // Colon form wins whenever both halves carry content.
    let colon_form = spec
        .split_once(':')
        .map(|(lhs, rhs)| (lhs.trim(), rhs.trim()))
        .filter(|(lhs, rhs)| !lhs.is_empty() && !rhs.is_empty());
    if let Some((provider, model)) = colon_form {
        return Some((provider.to_owned(), model.to_owned()));
    }

    // Slash form only contributes a provider name; the model id stays whole.
    let provider = spec
        .split_once('/')
        .map(|(lhs, _)| lhs.trim())
        .filter(|lhs| !lhs.is_empty())
        .unwrap_or("custom");
    Some((provider.to_owned(), spec.to_owned()))
}
223
224// ─── Persistent Python kernel ─────────────────────────────────────────────────
225// A long-lived `python3` process that keeps a single globals dict across calls,
226// so variables/imports/state persist between tool invocations (§15). A small
227// driver loop reads one JSON request per line, execs it capturing stdout, and
228// emits a JSON result terminated by a unique sentinel.
229
230use std::io::{BufRead, BufReader, Write};
231use std::process::{Child, ChildStdin, ChildStdout};
232use std::sync::Mutex;
233
/// Marker line that ends every kernel reply. The Rust side reads stdout lines
/// until it sees this exact line; it must stay equal to `SENT` in the Python
/// driver source.
const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";
235
/// Python source for the kernel driver loop, passed to the interpreter via `-c`.
///
/// For each non-empty stdin line the driver parses a JSON object, executes its
/// `code` field against one shared globals dict (`_g`) with stdout redirected
/// into a buffer, then prints a JSON object `{"out": ..., "err": ...}` followed
/// by the sentinel line. `err` carries the formatted traceback when the code
/// raises an `Exception`. A line that is not valid JSON is answered with
/// `"err": "bad request"`.
///
/// Only stdout is redirected by the driver; it does not capture stderr.
const KERNEL_DRIVER: &str = r#"
import sys, io, json, contextlib, traceback
_g = {"__name__": "__sparrow__"}
SENT = "__SPARROW_KERNEL_END__"
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        req = json.loads(line)
    except Exception:
        print(json.dumps({"out": "", "err": "bad request"}), flush=True)
        print(SENT, flush=True)
        continue
    code = req.get("code", "")
    buf = io.StringIO()
    err = ""
    try:
        with contextlib.redirect_stdout(buf):
            exec(compile(code, "<sparrow>", "exec"), _g)
    except Exception:
        err = traceback.format_exc()
    print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
    print(SENT, flush=True)
"#;
261
/// Handles for one spawned interpreter process running the kernel driver.
struct Kernel {
    /// The interpreter process itself; kept so it can be killed later.
    child: Child,
    /// Write end of the child's stdin; requests are sent here, one JSON line each.
    stdin: ChildStdin,
    /// Buffered read end of the child's stdout; replies are read line by line.
    stdout: BufReader<ChildStdout>,
}
267
/// Tool that runs Python code in a single long-lived interpreter process.
pub struct PythonRpc {
    /// The running kernel, if any. Starts as `None`; the process is spawned on
    /// first use and the slot is cleared again when the kernel fails.
    kernel: Mutex<Option<Kernel>>,
    /// Name of the interpreter executable to launch (`python3` or `python`).
    python_bin: String,
}
272
273impl PythonRpc {
274    pub fn new() -> Self {
275        // Prefer python3, fall back to python (Windows often only has `python`).
276        let python_bin = if which_python("python3") {
277            "python3".to_string()
278        } else {
279            "python".to_string()
280        };
281        Self {
282            kernel: Mutex::new(None),
283            python_bin,
284        }
285    }
286
287    fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
288        if kernel.is_some() {
289            return Ok(());
290        }
291        use std::process::{Command, Stdio};
292        let mut child = Command::new(&self.python_bin)
293            .arg("-u")
294            .arg("-c")
295            .arg(KERNEL_DRIVER)
296            .stdin(Stdio::piped())
297            .stdout(Stdio::piped())
298            .stderr(Stdio::null())
299            .spawn()?;
300        let stdin = child
301            .stdin
302            .take()
303            .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
304        let stdout = BufReader::new(
305            child
306                .stdout
307                .take()
308                .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
309        );
310        *kernel = Some(Kernel {
311            child,
312            stdin,
313            stdout,
314        });
315        Ok(())
316    }
317}
318
/// Reports whether `bin --version` can be launched and exits successfully.
///
/// The probe's stdout and stderr are discarded. A binary that cannot be
/// started at all counts as unavailable.
fn which_python(bin: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut probe = Command::new(bin);
    probe
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    match probe.status() {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
328
329#[async_trait]
330impl Tool for PythonRpc {
331    fn name(&self) -> &str {
332        "python_rpc"
333    }
334    fn description(&self) -> &str {
335        "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
336    }
337    fn schema(&self) -> serde_json::Value {
338        json!({
339            "type": "object",
340            "properties": {
341                "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
342            },
343            "required": ["code"]
344        })
345    }
346    fn risk(&self) -> RiskLevel {
347        RiskLevel::Exec
348    }
349    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
350        let code = args["code"].as_str().unwrap_or("").to_string();
351
352        // Kernel IO is blocking; run on a blocking thread without holding the
353        // lock across an await. We take the kernel out, use it, put it back.
354        let mut guard = self.kernel.lock().unwrap();
355        if let Err(e) = self.ensure_kernel(&mut guard) {
356            return Ok(ToolResult::error(format!(
357                "Python kernel unavailable ({}). Is '{}' installed?",
358                e, self.python_bin
359            )));
360        }
361        let kernel = guard.as_mut().unwrap();
362
363        // Send request as a single JSON line.
364        let req = serde_json::json!({ "code": code }).to_string();
365        if writeln!(kernel.stdin, "{}", req)
366            .and_then(|_| kernel.stdin.flush())
367            .is_err()
368        {
369            *guard = None; // kernel died; drop it so it respawns next time
370            return Ok(ToolResult::error(
371                "Python kernel write failed (kernel reset)",
372            ));
373        }
374
375        // Read lines until the sentinel; the line before it is the JSON result.
376        let mut last_json = String::new();
377        loop {
378            let mut line = String::new();
379            match kernel.stdout.read_line(&mut line) {
380                Ok(0) => {
381                    *guard = None;
382                    return Ok(ToolResult::error("Python kernel closed unexpectedly"));
383                }
384                Ok(_) => {
385                    let trimmed = line.trim_end();
386                    if trimmed == KERNEL_SENTINEL {
387                        break;
388                    }
389                    last_json = trimmed.to_string();
390                }
391                Err(e) => {
392                    *guard = None;
393                    return Ok(ToolResult::error(format!(
394                        "Python kernel read error: {}",
395                        e
396                    )));
397                }
398            }
399        }
400
401        let parsed: serde_json::Value = serde_json::from_str(&last_json)
402            .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
403        let out = parsed["out"].as_str().unwrap_or("");
404        let err = parsed["err"].as_str().unwrap_or("");
405        if !err.is_empty() {
406            Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
407        } else {
408            Ok(ToolResult::text(out.to_string()))
409        }
410    }
411}
412
413impl Drop for PythonRpc {
414    fn drop(&mut self) {
415        if let Ok(mut g) = self.kernel.lock() {
416            if let Some(mut k) = g.take() {
417                let _ = k.child.kill();
418            }
419        }
420    }
421}