Skip to main content

bob_runtime/
subagent.rs

1//! # Subagent Manager
2//!
3//! Spawns independent agent tasks in the background, each with its own
4//! session state and tool registry but sharing the parent's LLM port.
5//!
6//! ## Design
7//!
8//! - Each subagent runs in its own `tokio::spawn` task
9//! - Results are delivered via `tokio::sync::oneshot`
10//! - Recursive spawning is prevented by always denying `subagent/spawn`
11//! - Cancellation is supported via `CancellationToken`
12
13use std::{
14    collections::HashMap,
15    sync::{Arc, Mutex},
16};
17
18use bob_core::{
19    error::AgentError,
20    ports::{EventSink, LlmPort, SessionStore, SubagentPort, ToolPort},
21    types::{
22        AgentEvent, AgentRequest, AgentRunResult, CancelToken, RequestContext, SessionId,
23        SubagentResult, TokenUsage, ToolCall, ToolDescriptor, ToolResult, TurnPolicy,
24    },
25};
26use serde_json::json;
27use tokio::sync::oneshot;
28use tracing::{debug, info};
29
30/// Handle to a running subagent.
31struct SubagentHandle {
32    cancel_token: CancelToken,
33}
34
35/// Internal state for the subagent manager.
36#[derive(Default)]
37struct ManagerState {
38    active: HashMap<SessionId, SubagentHandle>,
39    results: HashMap<SessionId, oneshot::Receiver<SubagentResult>>,
40    parent_index: HashMap<SessionId, Vec<SessionId>>,
41}
42
43/// Default subagent manager implementation.
44pub struct DefaultSubagentManager {
45    llm: Arc<dyn LlmPort>,
46    store: Arc<dyn SessionStore>,
47    events: Arc<dyn EventSink>,
48    tools: Arc<dyn ToolPort>,
49    default_model: String,
50    policy: TurnPolicy,
51    state: Mutex<ManagerState>,
52}
53
54impl std::fmt::Debug for DefaultSubagentManager {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("DefaultSubagentManager")
57            .field("default_model", &self.default_model)
58            .field("policy", &self.policy)
59            .finish_non_exhaustive()
60    }
61}
62
63impl DefaultSubagentManager {
64    /// Create a new subagent manager.
65    #[must_use]
66    pub fn new(
67        llm: Arc<dyn LlmPort>,
68        store: Arc<dyn SessionStore>,
69        events: Arc<dyn EventSink>,
70        tools: Arc<dyn ToolPort>,
71        default_model: String,
72        policy: TurnPolicy,
73    ) -> Self {
74        Self {
75            llm,
76            store,
77            events,
78            tools,
79            default_model,
80            policy,
81            state: Mutex::new(ManagerState::default()),
82        }
83    }
84}
85
86#[async_trait::async_trait]
87impl SubagentPort for DefaultSubagentManager {
88    async fn spawn(
89        &self,
90        task: String,
91        parent_session_id: SessionId,
92        model: Option<String>,
93        max_steps: Option<u32>,
94        extra_deny_tools: Vec<String>,
95    ) -> Result<SessionId, AgentError> {
96        let subagent_id = format!("subagent:{}", uuid_v7_simple());
97        let cancel_token = CancelToken::new();
98        let (tx, rx) = oneshot::channel();
99
100        {
101            let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
102            state
103                .active
104                .insert(subagent_id.clone(), SubagentHandle { cancel_token: cancel_token.clone() });
105            state.results.insert(subagent_id.clone(), rx);
106            state
107                .parent_index
108                .entry(parent_session_id.clone())
109                .or_default()
110                .push(subagent_id.clone());
111        }
112
113        self.events.emit(AgentEvent::SubagentSpawned {
114            parent_session_id: parent_session_id.clone(),
115            subagent_id: subagent_id.clone(),
116            task: task.clone(),
117        });
118
119        let llm = self.llm.clone();
120        let store = self.store.clone();
121        let events = self.events.clone();
122        let tools = self.tools.clone();
123        let model_str = model.unwrap_or_else(|| self.default_model.clone());
124        let mut policy = self.policy.clone();
125        if let Some(steps) = max_steps {
126            policy.max_steps = steps;
127        }
128        let sid = subagent_id.clone();
129
130        // Build deny list: always deny subagent spawning + any extras.
131        let mut deny_tools = vec!["subagent/spawn".to_string()];
132        deny_tools.extend(extra_deny_tools);
133        let filtered_tools: Arc<dyn ToolPort> =
134            Arc::new(DenyListToolPort { inner: tools, deny: deny_tools });
135
136        tokio::spawn(async move {
137            debug!(subagent_id = %sid, "subagent task started");
138
139            let req = AgentRequest {
140                input: task,
141                session_id: sid.clone(),
142                model: Some(model_str),
143                context: RequestContext::default(),
144                cancel_token: Some(cancel_token),
145                output_schema: None,
146                max_output_retries: 0,
147            };
148
149            let result = crate::scheduler::run_turn(
150                llm.as_ref(),
151                filtered_tools.as_ref(),
152                store.as_ref(),
153                events.as_ref(),
154                req,
155                &policy,
156                "subagent-default",
157            )
158            .await;
159
160            let subagent_result = match result {
161                Ok(AgentRunResult::Finished(resp)) => SubagentResult {
162                    subagent_id: sid.clone(),
163                    parent_session_id: parent_session_id.clone(),
164                    content: resp.content,
165                    usage: resp.usage,
166                    is_error: false,
167                },
168                Err(e) => SubagentResult {
169                    subagent_id: sid.clone(),
170                    parent_session_id: parent_session_id.clone(),
171                    content: format!("subagent error: {e}"),
172                    usage: TokenUsage::default(),
173                    is_error: true,
174                },
175            };
176
177            let is_error = subagent_result.is_error;
178            let _ = tx.send(subagent_result);
179
180            events.emit(AgentEvent::SubagentCompleted { subagent_id: sid.clone(), is_error });
181
182            // Remove from active list.
183            {
184                // We don't have a reference to state here since we're in a spawned task.
185                // The active entry will be cleaned up when list_active checks it.
186            }
187
188            info!(subagent_id = %sid, is_error, "subagent task completed");
189        });
190
191        Ok(subagent_id)
192    }
193
194    async fn await_result(&self, subagent_id: &SessionId) -> Result<SubagentResult, AgentError> {
195        let rx = {
196            let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
197            state.results.remove(subagent_id)
198        };
199
200        match rx {
201            Some(receiver) => match receiver.await {
202                Ok(result) => Ok(result),
203                Err(_) => {
204                    Err(AgentError::Internal("subagent task dropped without sending result".into()))
205                }
206            },
207            None => Err(AgentError::Config(format!(
208                "subagent '{subagent_id}' not found (already awaited or never spawned)"
209            ))),
210        }
211    }
212
213    async fn list_active(
214        &self,
215        parent_session_id: &SessionId,
216    ) -> Result<Vec<SessionId>, AgentError> {
217        let state = self.state.lock().unwrap_or_else(|p| p.into_inner());
218        let mut result = Vec::new();
219        if let Some(ids) = state.parent_index.get(parent_session_id) {
220            for id in ids {
221                if state.active.contains_key(id) {
222                    result.push(id.clone());
223                }
224            }
225        }
226        Ok(result)
227    }
228
229    async fn cancel(&self, subagent_id: &SessionId) -> Result<(), AgentError> {
230        let handle = {
231            let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
232            state.active.remove(subagent_id)
233        };
234
235        match handle {
236            Some(h) => {
237                h.cancel_token.cancel();
238                debug!(subagent_id = %subagent_id, "subagent cancelled");
239                Ok(())
240            }
241            None => Err(AgentError::Config(format!("subagent '{subagent_id}' not found"))),
242        }
243    }
244}
245
246/// Tool port wrapper that denies specific tools.
247struct DenyListToolPort {
248    inner: Arc<dyn ToolPort>,
249    deny: Vec<String>,
250}
251
252#[async_trait::async_trait]
253impl ToolPort for DenyListToolPort {
254    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, bob_core::error::ToolError> {
255        let tools = self.inner.list_tools().await?;
256        Ok(tools.into_iter().filter(|t| !self.deny.contains(&t.id)).collect())
257    }
258
259    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, bob_core::error::ToolError> {
260        if self.deny.contains(&call.name) {
261            return Ok(ToolResult {
262                name: call.name,
263                output: json!({ "error": "tool denied in subagent context" }),
264                is_error: true,
265            });
266        }
267        self.inner.call_tool(call).await
268    }
269}
270
271/// Subagent tool port — exposes `subagent/spawn` as a callable tool.
272pub struct SubagentToolPort {
273    manager: Arc<dyn SubagentPort>,
274    default_parent_session: SessionId,
275}
276
277impl std::fmt::Debug for SubagentToolPort {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        f.debug_struct("SubagentToolPort")
280            .field("default_parent_session", &self.default_parent_session)
281            .finish_non_exhaustive()
282    }
283}
284
285impl SubagentToolPort {
286    /// Create a new subagent tool port.
287    #[must_use]
288    pub fn new(manager: Arc<dyn SubagentPort>, default_parent_session: SessionId) -> Self {
289        Self { manager, default_parent_session }
290    }
291}
292
293#[async_trait::async_trait]
294impl ToolPort for SubagentToolPort {
295    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, bob_core::error::ToolError> {
296        Ok(vec![
297            ToolDescriptor::new(
298                "subagent/spawn",
299                "Spawn a background subagent to handle a task independently. \
300             The subagent runs with its own tool registry and reasoning loop. \
301             Results are returned when the subagent completes.",
302            )
303            .with_input_schema(json!({
304                "type": "object",
305                "properties": {
306                    "task": {
307                        "type": "string",
308                        "description": "Task description for the subagent"
309                    },
310                    "model": {
311                        "type": "string",
312                        "description": "Optional model override"
313                    },
314                    "max_steps": {
315                        "type": "integer",
316                        "description": "Optional max steps override"
317                    }
318                },
319                "required": ["task"]
320            })),
321        ])
322    }
323
324    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, bob_core::error::ToolError> {
325        if call.name != "subagent/spawn" {
326            return Err(bob_core::error::ToolError::NotFound { name: call.name });
327        }
328
329        let task = call
330            .arguments
331            .get("task")
332            .and_then(serde_json::Value::as_str)
333            .ok_or_else(|| {
334                bob_core::error::ToolError::Execution("missing 'task' argument".to_string())
335            })?
336            .to_string();
337
338        let model =
339            call.arguments.get("model").and_then(serde_json::Value::as_str).map(String::from);
340
341        let max_steps =
342            call.arguments.get("max_steps").and_then(serde_json::Value::as_u64).map(|v| v as u32);
343
344        let subagent_id = self
345            .manager
346            .spawn(task.clone(), self.default_parent_session.clone(), model, max_steps, vec![])
347            .await
348            .map_err(|e| bob_core::error::ToolError::Execution(format!("spawn failed: {e}")))?;
349
350        let result = self
351            .manager
352            .await_result(&subagent_id)
353            .await
354            .map_err(|e| bob_core::error::ToolError::Execution(format!("await failed: {e}")))?;
355
356        Ok(ToolResult {
357            name: call.name,
358            output: json!({
359                "subagent_id": result.subagent_id,
360                "content": result.content,
361                "is_error": result.is_error,
362                "usage": {
363                    "prompt_tokens": result.usage.prompt_tokens,
364                    "completion_tokens": result.usage.completion_tokens,
365                }
366            }),
367            is_error: result.is_error,
368        })
369    }
370}
371
372/// Generate a simple unique ID.
373fn uuid_v7_simple() -> String {
374    use std::time::{SystemTime, UNIX_EPOCH};
375    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
376    let rand_part: u16 = rand_u16();
377    format!("{:016x}{:04x}", now.as_nanos() & 0xFFFF_FFFF_FFFF_FFFF, rand_part)
378}
379
/// Produce a cheap pseudo-random 16-bit value without an external RNG crate.
///
/// The value is the low 16 bits of a hash over the current thread id and the
/// current `Instant`, computed with a freshly created `RandomState`. Because
/// every `RandomState::new()` is seeded with random keys, two calls yield
/// unrelated values even when they run on the same thread and observe the
/// same clock reading — a fixed-key hasher would repeat in that case.
///
/// Not suitable for anything security-sensitive.
fn rand_u16() -> u16 {
    use std::{
        collections::hash_map::RandomState,
        hash::{BuildHasher, Hash, Hasher},
    };

    let mut hasher = RandomState::new().build_hasher();
    std::thread::current().id().hash(&mut hasher);
    std::time::Instant::now().hash(&mut hasher);
    // Truncation is intentional: only 16 bits of the hash are needed.
    hasher.finish() as u16
}
390
391// ── Tests ────────────────────────────────────────────────────────────
392
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::{
        error::{LlmError, StoreError, ToolError},
        types::{FinishReason, LlmRequest, LlmResponse, LlmStream, SessionState, TokenUsage},
    };

    use super::*;

    /// LLM stub that always answers with a fixed "final" message.
    struct StubLlm;

    #[async_trait::async_trait]
    impl LlmPort for StubLlm {
        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
            Ok(LlmResponse {
                content: r#"{"type": "final", "content": "subagent done"}"#.into(),
                usage: TokenUsage { prompt_tokens: 5, completion_tokens: 3 },
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            })
        }

        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
            Err(LlmError::Provider("not implemented".into()))
        }
    }

    /// Tool port stub that lists no tools and accepts every call.
    struct StubTools;

    #[async_trait::async_trait]
    impl ToolPort for StubTools {
        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
            Ok(Vec::new())
        }

        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
            Ok(ToolResult { name: call.name, output: json!(null), is_error: false })
        }
    }

    /// Tool port stub that advertises a fixed set of tool ids, so that
    /// deny-list filtering has something to filter.
    struct ListingTools(Vec<&'static str>);

    #[async_trait::async_trait]
    impl ToolPort for ListingTools {
        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
            Ok(self.0.iter().map(|id| ToolDescriptor::new(*id, "stub tool")).collect())
        }

        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
            Ok(ToolResult { name: call.name, output: json!(null), is_error: false })
        }
    }

    /// Session store stub that never has stored state and discards saves.
    struct StubStore;

    #[async_trait::async_trait]
    impl SessionStore for StubStore {
        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
            Ok(None)
        }

        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
            Ok(())
        }
    }

    /// Event sink stub that drops every event.
    struct StubSink;

    impl EventSink for StubSink {
        fn emit(&self, _event: AgentEvent) {}
    }

    /// Manager wired entirely to stubs.
    fn create_manager() -> DefaultSubagentManager {
        DefaultSubagentManager::new(
            Arc::new(StubLlm),
            Arc::new(StubStore),
            Arc::new(StubSink),
            Arc::new(StubTools),
            "test-model".to_string(),
            TurnPolicy::default(),
        )
    }

    #[tokio::test]
    async fn deny_list_blocks_denied_tools() {
        // The inner port must actually list the denied tool, otherwise the
        // assertion below would pass even with filtering removed.
        let inner: Arc<dyn ToolPort> =
            Arc::new(ListingTools(vec!["subagent/spawn", "local/file_read"]));
        let deny = DenyListToolPort { inner, deny: vec!["subagent/spawn".into()] };

        let tools = deny.list_tools().await.unwrap_or_else(|_| unreachable!());
        let ids: Vec<&str> = tools.iter().map(|t| t.id.as_str()).collect();
        assert_eq!(ids, ["local/file_read"], "denied tool should not appear in list");
    }

    #[tokio::test]
    async fn deny_list_returns_error_for_denied_call() {
        let inner: Arc<dyn ToolPort> = Arc::new(StubTools);
        let deny = DenyListToolPort { inner, deny: vec!["dangerous_tool".into()] };

        let result = deny
            .call_tool(ToolCall::new("dangerous_tool", json!({})))
            .await
            .unwrap_or_else(|_| unreachable!());
        assert!(result.is_error, "denied tool call should return error result");
    }

    #[tokio::test]
    async fn deny_list_passes_allowed_tools() {
        let inner: Arc<dyn ToolPort> = Arc::new(StubTools);
        let deny = DenyListToolPort { inner, deny: vec!["subagent/spawn".into()] };

        let result = deny
            .call_tool(ToolCall::new("local/file_read", json!({"path": "x"})))
            .await
            .unwrap_or_else(|_| unreachable!());
        assert!(!result.is_error, "allowed tool should pass through");
    }

    #[tokio::test]
    async fn subagent_tool_port_lists_spawn_tool() {
        let manager = Arc::new(create_manager());
        let port = SubagentToolPort::new(manager, "parent-session".into());

        let tools = port.list_tools().await.unwrap_or_default();
        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0].id, "subagent/spawn");
    }

    #[tokio::test]
    async fn spawn_creates_subagent_and_delivers_result() {
        let manager = create_manager();

        let id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        assert!(id.starts_with("subagent:"));

        let result = manager.await_result(&id).await.unwrap_or_else(|_| unreachable!());
        assert_eq!(result.subagent_id, id);
        assert!(!result.is_error);
        assert!(!result.content.is_empty());
    }

    #[tokio::test]
    async fn cancel_removes_subagent() {
        let manager = create_manager();

        let id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        let cancel_result = manager.cancel(&id).await;
        assert!(cancel_result.is_ok(), "cancel should succeed");

        let cancel_again = manager.cancel(&id).await;
        assert!(cancel_again.is_err(), "double cancel should fail");
    }

    #[tokio::test]
    async fn await_result_unknown_id_returns_error() {
        let manager = create_manager();
        let result = manager.await_result(&"nonexistent".into()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn list_active_returns_running_subagents() {
        let manager = create_manager();

        let active =
            manager.list_active(&"parent-1".into()).await.unwrap_or_else(|_| unreachable!());
        assert!(active.is_empty());

        let _id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        let active =
            manager.list_active(&"parent-1".into()).await.unwrap_or_else(|_| unreachable!());
        assert_eq!(active.len(), 1, "should have one active subagent");
    }
}