Skip to main content

sparrow/tools/
subagent.rs

1use async_trait::async_trait;
2use serde_json::json;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6use super::{Tool, ToolCtx, ToolResult};
7use crate::config::Config;
8use crate::engine::{Engine, Identity, Task};
9use crate::event::{Block, Event, RiskLevel};
10use crate::memory::Memory;
11use crate::permissions::PermissionMode;
12use crate::router::Router;
13
14// ─── Subagent spawn ─────────────────────────────────────────────────────────────
15
16/// Delegates a subtask to a child AgentRun with its own conversation and sandbox.
17/// §15: "Each subagent gets its own conversation, terminal, and a Python RPC channel."
18///
19/// Holds the router + config (not a parent Engine) so it can build a fresh child
20/// engine per call — this avoids a self-referential Arc<Engine> at registration.
21pub struct SubagentSpawn {
22    router: Arc<dyn Router>,
23    config: Config,
24    memory: Option<Arc<dyn Memory>>,
25}
26
27impl SubagentSpawn {
28    pub fn new(router: Arc<dyn Router>, config: Config) -> Self {
29        Self {
30            router,
31            config,
32            memory: None,
33        }
34    }
35
36    pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
37        self.memory = Some(memory);
38        self
39    }
40}
41
42#[async_trait]
43impl Tool for SubagentSpawn {
44    fn name(&self) -> &str {
45        "subagent_spawn"
46    }
47    fn description(&self) -> &str {
48        "Spawn a child agent to handle a subtask independently"
49    }
50    fn schema(&self) -> serde_json::Value {
51        json!({
52            "type": "object",
53            "properties": {
54                "task": { "type": "string", "description": "Subtask description" },
55                "role": { "type": "string", "description": "Role for the subagent (e.g. tester, researcher, reviewer)" },
56                "model": { "type": "string", "description": "Optional: provider:model or provider/model for the subagent" },
57                "permission_mode": { "type": "string", "description": "Optional: read-only, plan, supervised, trusted, autonomous, emergency-stop" },
58                "tools": { "type": "array", "items": { "type": "string" }, "description": "Optional explicit allowed tool patterns" },
59                "disallowed_tools": { "type": "array", "items": { "type": "string" }, "description": "Optional denied tool patterns for this subagent" }
60            },
61            "required": ["task"]
62        })
63    }
64    fn risk(&self) -> RiskLevel {
65        RiskLevel::Exec
66    }
67    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
68        let task_desc = args["task"].as_str().unwrap_or("");
69        let role = args["role"].as_str().unwrap_or("helper");
70        let mut child_config = self.config.clone();
71        if let Some(model_ref) = args["model"].as_str() {
72            if let Some((provider, model)) = parse_model_ref(model_ref) {
73                child_config.forced_model = Some((provider.clone(), model.clone()));
74                for tier in ["trivial", "small", "medium", "hard", "vision"] {
75                    child_config
76                        .routing
77                        .policy
78                        .insert(tier.to_string(), provider.clone());
79                }
80                child_config
81                    .providers
82                    .entry(provider)
83                    .or_insert_with(|| crate::config::ProviderConfig {
84                        adapter: "openai-compatible".into(),
85                        base_url: None,
86                        models: vec![],
87                        api_key_env: None,
88                    })
89                    .models = vec![model];
90            }
91        }
92        if let Some(mode) = args["permission_mode"]
93            .as_str()
94            .and_then(PermissionMode::parse)
95        {
96            child_config.defaults.autonomy = mode.autonomy_level();
97            child_config.permissions.mode = mode;
98        }
99        for tool in string_array(&args["tools"]) {
100            if !child_config.permissions.tools.allow.contains(&tool) {
101                child_config.permissions.tools.allow.push(tool);
102            }
103        }
104        for tool in string_array(&args["disallowed_tools"]) {
105            if !child_config.permissions.tools.deny.contains(&tool) {
106                child_config.permissions.tools.deny.push(tool);
107            }
108        }
109
110        let (tx, mut rx) = mpsc::unbounded_channel();
111
112        let task = Task {
113            description: task_desc.to_string(),
114            context: vec![],
115        };
116
117        // Build a fresh child engine for this subagent.
118        let mut child = Engine::new(self.router.clone(), child_config).with_identity(Identity {
119            name: role.to_string(),
120            role: role.to_string(),
121            personality: format!("Focused {} subagent. Be concise and return evidence.", role),
122        });
123        if let Some(mem) = &self.memory {
124            child = child.with_memory(mem.clone());
125        }
126        let engine = Arc::new(child);
127
128        let handle = tokio::spawn(async move {
129            match engine.drive(task, tx).await {
130                Ok(outcome) => outcome,
131                Err(e) => crate::event::OutcomeSummary {
132                    status: format!("error: {}", e),
133                    diffs: vec![],
134                    cost_usd: 0.0,
135                    tokens: crate::event::TokenUsage {
136                        input: 0,
137                        output: 0,
138                    },
139                    cost_comparison: String::new(),
140                },
141            }
142        });
143
144        // Collect subagent output
145        let mut output = String::new();
146        while let Some(event) = rx.recv().await {
147            match &event {
148                Event::ThinkingDelta { text, .. } => {
149                    output.push_str(text);
150                }
151                Event::AgentStatus { note, .. } => {
152                    output.push_str(&format!("\n[{}]", note));
153                }
154                Event::RunFinished { outcome, .. } => {
155                    output.push_str(&format!(
156                        "\n[Subagent done: {} | ${:.4}]",
157                        outcome.status, outcome.cost_usd
158                    ));
159                }
160                Event::Error { message, .. } => {
161                    output.push_str(&format!("\n[Error: {}]", message));
162                }
163                _ => {}
164            }
165        }
166
167        let outcome = handle
168            .await
169            .unwrap_or_else(|e| crate::event::OutcomeSummary {
170                status: format!("subagent panicked: {}", e),
171                diffs: vec![],
172                cost_usd: 0.0,
173                tokens: crate::event::TokenUsage {
174                    input: 0,
175                    output: 0,
176                },
177                cost_comparison: String::new(),
178            });
179
180        Ok(ToolResult::ok(vec![Block::Text(format!(
181            "Subagent '{}' completed.\nStatus: {}\nOutput:\n{}",
182            role, outcome.status, output
183        ))]))
184    }
185}
186
187fn string_array(value: &serde_json::Value) -> Vec<String> {
188    value
189        .as_array()
190        .map(|items| {
191            items
192                .iter()
193                .filter_map(|item| item.as_str())
194                .map(str::trim)
195                .filter(|item| !item.is_empty())
196                .map(str::to_string)
197                .collect()
198        })
199        .unwrap_or_default()
200}
201
/// Splits a `provider:model` or `provider/model` reference into `(provider, model)`.
///
/// - `provider:model` (the colon comes before any slash) gives `(provider, model)`,
///   both trimmed.
/// - `provider/...` gives `(provider, whole_ref)`: the first path segment names
///   the provider and the full reference is kept as the model id.
/// - Anything else gives `("custom", whole_ref)`.
///
/// Returns `None` only for an empty or blank reference.
fn parse_model_ref(model_ref: &str) -> Option<(String, String)> {
    let model_ref = model_ref.trim();
    if model_ref.is_empty() {
        return None;
    }

    // `:` separates provider and model only when it precedes any `/`. Otherwise a
    // tag inside a slash-style id ("vendor/model:free", "ollama/llama3:8b") would
    // be mistaken for the provider boundary.
    let colon_first = match (model_ref.find(':'), model_ref.find('/')) {
        (Some(colon), Some(slash)) => colon < slash,
        (Some(_), None) => true,
        (None, _) => false,
    };
    if colon_first {
        if let Some((provider, model)) = model_ref.split_once(':') {
            let (provider, model) = (provider.trim(), model.trim());
            if !provider.is_empty() && !model.is_empty() {
                return Some((provider.to_string(), model.to_string()));
            }
        }
    }

    // Slash form (or no usable separator): keep the whole reference as the model.
    let provider = model_ref
        .split_once('/')
        .map(|(provider, _)| provider.trim())
        .filter(|provider| !provider.is_empty())
        .unwrap_or("custom");
    Some((provider.to_string(), model_ref.to_string()))
}
225
226// ─── Persistent Python kernel ─────────────────────────────────────────────────
227// A long-lived `python3` process that keeps a single globals dict across calls,
228// so variables/imports/state persist between tool invocations (§15). A small
229// driver loop reads one JSON request per line, execs it capturing stdout, and
230// emits a JSON result terminated by a unique sentinel.
231
232use std::io::{BufRead, BufReader, Write};
233use std::process::{Child, ChildStdin, ChildStdout};
234use std::sync::Mutex;
235
/// Line the driver prints after every reply; the Rust reader stops at it.
/// Must stay identical to `SENT` inside `KERNEL_DRIVER`.
const KERNEL_SENTINEL: &str = "__SPARROW_KERNEL_END__";

/// Python driver loop run via `python -u -c`.
///
/// It reads one JSON request (`{"code": ...}`) per stdin line and execs the code
/// in a single long-lived globals dict. It then prints one JSON line
/// `{"out": ..., "err": ...}` followed by the sentinel line. While user code runs:
/// - both stdout and stderr are captured into `out` (the process's own stderr
///   is discarded by the caller, so e.g. logging/warnings would otherwise vanish);
/// - `sys.stdin` is swapped for an empty stream, so `input()` raises `EOFError`
///   instead of consuming protocol lines or blocking forever;
/// - any `BaseException`, including `SystemExit` from `exit()`, is reported in
///   `err` instead of terminating the persistent kernel.
///
/// A request that is not valid JSON, or not a JSON object, gets
/// `{"out": "", "err": "bad request"}`.
const KERNEL_DRIVER: &str = r#"
import sys, io, json, contextlib, traceback
_g = {"__name__": "__sparrow__"}
SENT = "__SPARROW_KERNEL_END__"
_proto_in = sys.stdin
for line in _proto_in:
    line = line.strip()
    if not line:
        continue
    try:
        req = json.loads(line)
        code = req.get("code", "")
    except Exception:
        print(json.dumps({"out": "", "err": "bad request"}), flush=True)
        print(SENT, flush=True)
        continue
    buf = io.StringIO()
    err = ""
    sys.stdin = io.StringIO()
    try:
        with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
            exec(compile(code, "<sparrow>", "exec"), _g)
    except BaseException:
        err = traceback.format_exc()
    finally:
        sys.stdin = _proto_in
    print(json.dumps({"out": buf.getvalue(), "err": err}), flush=True)
    print(SENT, flush=True)
"#;
263
264struct Kernel {
265    child: Child,
266    stdin: ChildStdin,
267    stdout: BufReader<ChildStdout>,
268}
269
270pub struct PythonRpc {
271    kernel: Mutex<Option<Kernel>>,
272    python_bin: String,
273}
274
275impl PythonRpc {
276    pub fn new() -> Self {
277        // Prefer python3, fall back to python (Windows often only has `python`).
278        let python_bin = if which_python("python3") {
279            "python3".to_string()
280        } else {
281            "python".to_string()
282        };
283        Self {
284            kernel: Mutex::new(None),
285            python_bin,
286        }
287    }
288
289    fn ensure_kernel(&self, kernel: &mut Option<Kernel>) -> anyhow::Result<()> {
290        if kernel.is_some() {
291            return Ok(());
292        }
293        use std::process::{Command, Stdio};
294        let mut child = Command::new(&self.python_bin)
295            .arg("-u")
296            .arg("-c")
297            .arg(KERNEL_DRIVER)
298            .stdin(Stdio::piped())
299            .stdout(Stdio::piped())
300            .stderr(Stdio::null())
301            .spawn()?;
302        let stdin = child
303            .stdin
304            .take()
305            .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
306        let stdout = BufReader::new(
307            child
308                .stdout
309                .take()
310                .ok_or_else(|| anyhow::anyhow!("no stdout"))?,
311        );
312        *kernel = Some(Kernel {
313            child,
314            stdin,
315            stdout,
316        });
317        Ok(())
318    }
319}
320
321fn which_python(bin: &str) -> bool {
322    std::process::Command::new(bin)
323        .arg("--version")
324        .stdout(std::process::Stdio::null())
325        .stderr(std::process::Stdio::null())
326        .status()
327        .map(|s| s.success())
328        .unwrap_or(false)
329}
330
331#[async_trait]
332impl Tool for PythonRpc {
333    fn name(&self) -> &str {
334        "python_rpc"
335    }
336    fn description(&self) -> &str {
337        "Execute Python in a PERSISTENT kernel — variables, imports and state persist across calls."
338    }
339    fn schema(&self) -> serde_json::Value {
340        json!({
341            "type": "object",
342            "properties": {
343                "code": { "type": "string", "description": "Python code to execute in the persistent kernel" }
344            },
345            "required": ["code"]
346        })
347    }
348    fn risk(&self) -> RiskLevel {
349        RiskLevel::Exec
350    }
351    async fn call(&self, args: serde_json::Value, _ctx: &ToolCtx) -> anyhow::Result<ToolResult> {
352        let code = args["code"].as_str().unwrap_or("").to_string();
353
354        // Kernel IO is blocking; run on a blocking thread without holding the
355        // lock across an await. We take the kernel out, use it, put it back.
356        let mut guard = self.kernel.lock().unwrap();
357        if let Err(e) = self.ensure_kernel(&mut guard) {
358            return Ok(ToolResult::error(format!(
359                "Python kernel unavailable ({}). Is '{}' installed?",
360                e, self.python_bin
361            )));
362        }
363        let kernel = guard.as_mut().unwrap();
364
365        // Send request as a single JSON line.
366        let req = serde_json::json!({ "code": code }).to_string();
367        if writeln!(kernel.stdin, "{}", req)
368            .and_then(|_| kernel.stdin.flush())
369            .is_err()
370        {
371            *guard = None; // kernel died; drop it so it respawns next time
372            return Ok(ToolResult::error(
373                "Python kernel write failed (kernel reset)",
374            ));
375        }
376
377        // Read lines until the sentinel; the line before it is the JSON result.
378        let mut last_json = String::new();
379        loop {
380            let mut line = String::new();
381            match kernel.stdout.read_line(&mut line) {
382                Ok(0) => {
383                    *guard = None;
384                    return Ok(ToolResult::error("Python kernel closed unexpectedly"));
385                }
386                Ok(_) => {
387                    let trimmed = line.trim_end();
388                    if trimmed == KERNEL_SENTINEL {
389                        break;
390                    }
391                    last_json = trimmed.to_string();
392                }
393                Err(e) => {
394                    *guard = None;
395                    return Ok(ToolResult::error(format!(
396                        "Python kernel read error: {}",
397                        e
398                    )));
399                }
400            }
401        }
402
403        let parsed: serde_json::Value = serde_json::from_str(&last_json)
404            .unwrap_or_else(|_| serde_json::json!({"out": last_json, "err": ""}));
405        let out = parsed["out"].as_str().unwrap_or("");
406        let err = parsed["err"].as_str().unwrap_or("");
407        if !err.is_empty() {
408            Ok(ToolResult::ok(vec![Block::Text(format!("{}{}", out, err))]))
409        } else {
410            Ok(ToolResult::text(out.to_string()))
411        }
412    }
413}
414
415impl Drop for PythonRpc {
416    fn drop(&mut self) {
417        if let Ok(mut g) = self.kernel.lock() {
418            if let Some(mut k) = g.take() {
419                let _ = k.child.kill();
420            }
421        }
422    }
423}