1use std::{
14 collections::HashMap,
15 sync::{Arc, Mutex},
16};
17
18use bob_core::{
19 error::AgentError,
20 ports::{EventSink, LlmPort, SessionStore, SubagentPort, ToolPort},
21 types::{
22 AgentEvent, AgentRequest, AgentRunResult, CancelToken, RequestContext, SessionId,
23 SubagentResult, TokenUsage, ToolCall, ToolDescriptor, ToolResult, TurnPolicy,
24 },
25};
26use serde_json::json;
27use tokio::sync::oneshot;
28use tracing::{debug, info};
29
30struct SubagentHandle {
32 cancel_token: CancelToken,
33}
34
35#[derive(Default)]
37struct ManagerState {
38 active: HashMap<SessionId, SubagentHandle>,
39 results: HashMap<SessionId, oneshot::Receiver<SubagentResult>>,
40 parent_index: HashMap<SessionId, Vec<SessionId>>,
41}
42
43pub struct DefaultSubagentManager {
45 llm: Arc<dyn LlmPort>,
46 store: Arc<dyn SessionStore>,
47 events: Arc<dyn EventSink>,
48 tools: Arc<dyn ToolPort>,
49 default_model: String,
50 policy: TurnPolicy,
51 state: Mutex<ManagerState>,
52}
53
54impl std::fmt::Debug for DefaultSubagentManager {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("DefaultSubagentManager")
57 .field("default_model", &self.default_model)
58 .field("policy", &self.policy)
59 .finish_non_exhaustive()
60 }
61}
62
63impl DefaultSubagentManager {
64 #[must_use]
66 pub fn new(
67 llm: Arc<dyn LlmPort>,
68 store: Arc<dyn SessionStore>,
69 events: Arc<dyn EventSink>,
70 tools: Arc<dyn ToolPort>,
71 default_model: String,
72 policy: TurnPolicy,
73 ) -> Self {
74 Self {
75 llm,
76 store,
77 events,
78 tools,
79 default_model,
80 policy,
81 state: Mutex::new(ManagerState::default()),
82 }
83 }
84}
85
86#[async_trait::async_trait]
87impl SubagentPort for DefaultSubagentManager {
88 async fn spawn(
89 &self,
90 task: String,
91 parent_session_id: SessionId,
92 model: Option<String>,
93 max_steps: Option<u32>,
94 extra_deny_tools: Vec<String>,
95 ) -> Result<SessionId, AgentError> {
96 let subagent_id = format!("subagent:{}", uuid_v7_simple());
97 let cancel_token = CancelToken::new();
98 let (tx, rx) = oneshot::channel();
99
100 {
101 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
102 state
103 .active
104 .insert(subagent_id.clone(), SubagentHandle { cancel_token: cancel_token.clone() });
105 state.results.insert(subagent_id.clone(), rx);
106 state
107 .parent_index
108 .entry(parent_session_id.clone())
109 .or_default()
110 .push(subagent_id.clone());
111 }
112
113 self.events.emit(AgentEvent::SubagentSpawned {
114 parent_session_id: parent_session_id.clone(),
115 subagent_id: subagent_id.clone(),
116 task: task.clone(),
117 });
118
119 let llm = self.llm.clone();
120 let store = self.store.clone();
121 let events = self.events.clone();
122 let tools = self.tools.clone();
123 let model_str = model.unwrap_or_else(|| self.default_model.clone());
124 let mut policy = self.policy.clone();
125 if let Some(steps) = max_steps {
126 policy.max_steps = steps;
127 }
128 let sid = subagent_id.clone();
129
130 let mut deny_tools = vec!["subagent/spawn".to_string()];
132 deny_tools.extend(extra_deny_tools);
133 let filtered_tools: Arc<dyn ToolPort> =
134 Arc::new(DenyListToolPort { inner: tools, deny: deny_tools });
135
136 tokio::spawn(async move {
137 debug!(subagent_id = %sid, "subagent task started");
138
139 let req = AgentRequest {
140 input: task,
141 session_id: sid.clone(),
142 model: Some(model_str),
143 context: RequestContext::default(),
144 cancel_token: Some(cancel_token),
145 output_schema: None,
146 max_output_retries: 0,
147 };
148
149 let result = crate::scheduler::run_turn(
150 llm.as_ref(),
151 filtered_tools.as_ref(),
152 store.as_ref(),
153 events.as_ref(),
154 req,
155 &policy,
156 "subagent-default",
157 )
158 .await;
159
160 let subagent_result = match result {
161 Ok(AgentRunResult::Finished(resp)) => SubagentResult {
162 subagent_id: sid.clone(),
163 parent_session_id: parent_session_id.clone(),
164 content: resp.content,
165 usage: resp.usage,
166 is_error: false,
167 },
168 Err(e) => SubagentResult {
169 subagent_id: sid.clone(),
170 parent_session_id: parent_session_id.clone(),
171 content: format!("subagent error: {e}"),
172 usage: TokenUsage::default(),
173 is_error: true,
174 },
175 };
176
177 let is_error = subagent_result.is_error;
178 let _ = tx.send(subagent_result);
179
180 events.emit(AgentEvent::SubagentCompleted { subagent_id: sid.clone(), is_error });
181
182 {
184 }
187
188 info!(subagent_id = %sid, is_error, "subagent task completed");
189 });
190
191 Ok(subagent_id)
192 }
193
194 async fn await_result(&self, subagent_id: &SessionId) -> Result<SubagentResult, AgentError> {
195 let rx = {
196 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
197 state.results.remove(subagent_id)
198 };
199
200 match rx {
201 Some(receiver) => match receiver.await {
202 Ok(result) => Ok(result),
203 Err(_) => {
204 Err(AgentError::Internal("subagent task dropped without sending result".into()))
205 }
206 },
207 None => Err(AgentError::Config(format!(
208 "subagent '{subagent_id}' not found (already awaited or never spawned)"
209 ))),
210 }
211 }
212
213 async fn list_active(
214 &self,
215 parent_session_id: &SessionId,
216 ) -> Result<Vec<SessionId>, AgentError> {
217 let state = self.state.lock().unwrap_or_else(|p| p.into_inner());
218 let mut result = Vec::new();
219 if let Some(ids) = state.parent_index.get(parent_session_id) {
220 for id in ids {
221 if state.active.contains_key(id) {
222 result.push(id.clone());
223 }
224 }
225 }
226 Ok(result)
227 }
228
229 async fn cancel(&self, subagent_id: &SessionId) -> Result<(), AgentError> {
230 let handle = {
231 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
232 state.active.remove(subagent_id)
233 };
234
235 match handle {
236 Some(h) => {
237 h.cancel_token.cancel();
238 debug!(subagent_id = %subagent_id, "subagent cancelled");
239 Ok(())
240 }
241 None => Err(AgentError::Config(format!("subagent '{subagent_id}' not found"))),
242 }
243 }
244}
245
246struct DenyListToolPort {
248 inner: Arc<dyn ToolPort>,
249 deny: Vec<String>,
250}
251
252#[async_trait::async_trait]
253impl ToolPort for DenyListToolPort {
254 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, bob_core::error::ToolError> {
255 let tools = self.inner.list_tools().await?;
256 Ok(tools.into_iter().filter(|t| !self.deny.contains(&t.id)).collect())
257 }
258
259 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, bob_core::error::ToolError> {
260 if self.deny.contains(&call.name) {
261 return Ok(ToolResult {
262 name: call.name,
263 output: json!({ "error": "tool denied in subagent context" }),
264 is_error: true,
265 });
266 }
267 self.inner.call_tool(call).await
268 }
269}
270
271pub struct SubagentToolPort {
273 manager: Arc<dyn SubagentPort>,
274 default_parent_session: SessionId,
275}
276
277impl std::fmt::Debug for SubagentToolPort {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 f.debug_struct("SubagentToolPort")
280 .field("default_parent_session", &self.default_parent_session)
281 .finish_non_exhaustive()
282 }
283}
284
285impl SubagentToolPort {
286 #[must_use]
288 pub fn new(manager: Arc<dyn SubagentPort>, default_parent_session: SessionId) -> Self {
289 Self { manager, default_parent_session }
290 }
291}
292
293#[async_trait::async_trait]
294impl ToolPort for SubagentToolPort {
295 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, bob_core::error::ToolError> {
296 Ok(vec![
297 ToolDescriptor::new(
298 "subagent/spawn",
299 "Spawn a background subagent to handle a task independently. \
300 The subagent runs with its own tool registry and reasoning loop. \
301 Results are returned when the subagent completes.",
302 )
303 .with_input_schema(json!({
304 "type": "object",
305 "properties": {
306 "task": {
307 "type": "string",
308 "description": "Task description for the subagent"
309 },
310 "model": {
311 "type": "string",
312 "description": "Optional model override"
313 },
314 "max_steps": {
315 "type": "integer",
316 "description": "Optional max steps override"
317 }
318 },
319 "required": ["task"]
320 })),
321 ])
322 }
323
324 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, bob_core::error::ToolError> {
325 if call.name != "subagent/spawn" {
326 return Err(bob_core::error::ToolError::NotFound { name: call.name });
327 }
328
329 let task = call
330 .arguments
331 .get("task")
332 .and_then(serde_json::Value::as_str)
333 .ok_or_else(|| {
334 bob_core::error::ToolError::Execution("missing 'task' argument".to_string())
335 })?
336 .to_string();
337
338 let model =
339 call.arguments.get("model").and_then(serde_json::Value::as_str).map(String::from);
340
341 let max_steps =
342 call.arguments.get("max_steps").and_then(serde_json::Value::as_u64).map(|v| v as u32);
343
344 let subagent_id = self
345 .manager
346 .spawn(task.clone(), self.default_parent_session.clone(), model, max_steps, vec![])
347 .await
348 .map_err(|e| bob_core::error::ToolError::Execution(format!("spawn failed: {e}")))?;
349
350 let result = self
351 .manager
352 .await_result(&subagent_id)
353 .await
354 .map_err(|e| bob_core::error::ToolError::Execution(format!("await failed: {e}")))?;
355
356 Ok(ToolResult {
357 name: call.name,
358 output: json!({
359 "subagent_id": result.subagent_id,
360 "content": result.content,
361 "is_error": result.is_error,
362 "usage": {
363 "prompt_tokens": result.usage.prompt_tokens,
364 "completion_tokens": result.usage.completion_tokens,
365 }
366 }),
367 is_error: result.is_error,
368 })
369 }
370}
371
372fn uuid_v7_simple() -> String {
374 use std::time::{SystemTime, UNIX_EPOCH};
375 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
376 let rand_part: u16 = rand_u16();
377 format!("{:016x}{:04x}", now.as_nanos() & 0xFFFF_FFFF_FFFF_FFFF, rand_part)
378}
379
/// Produces a 16-bit value for id suffixes by hashing the current thread id,
/// the monotonic clock and a process-wide call counter.
///
/// This is cheap uniqueness salt, not a random number generator: the output
/// is predictable and must not be used for anything security-sensitive.
fn rand_u16() -> u16 {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
        sync::atomic::{AtomicUsize, Ordering},
    };

    // Two calls on the same thread within one clock tick would otherwise hash
    // identical input and return the same value; the counter makes the input
    // of every call distinct.
    static CALLS: AtomicUsize = AtomicUsize::new(0);

    let mut hasher = DefaultHasher::new();
    std::thread::current().id().hash(&mut hasher);
    std::time::Instant::now().hash(&mut hasher);
    CALLS.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
    // Deliberate truncation: only the low 16 bits of the hash are kept.
    hasher.finish() as u16
}
390
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::{
        error::{LlmError, StoreError, ToolError},
        types::{FinishReason, LlmRequest, LlmResponse, LlmStream, SessionState, TokenUsage},
    };

    use super::*;

    /// LLM stub that always answers with the same final message.
    struct StubLlm;

    #[async_trait::async_trait]
    impl LlmPort for StubLlm {
        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
            Ok(LlmResponse {
                content: r#"{"type": "final", "content": "subagent done"}"#.into(),
                usage: TokenUsage { prompt_tokens: 5, completion_tokens: 3 },
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            })
        }

        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
            Err(LlmError::Provider("not implemented".into()))
        }
    }

    /// Tool stub that lists nothing and accepts every call.
    struct StubTools;

    #[async_trait::async_trait]
    impl ToolPort for StubTools {
        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
            Ok(Vec::new())
        }

        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
            Ok(ToolResult { name: call.name, output: json!(null), is_error: false })
        }
    }

    /// Tool stub that lists one tool the tests deny and one they allow, so
    /// that deny-list filtering has something to filter.
    struct ListingTools;

    #[async_trait::async_trait]
    impl ToolPort for ListingTools {
        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
            Ok(vec![
                ToolDescriptor::new("subagent/spawn", "spawn a subagent"),
                ToolDescriptor::new("local/file_read", "read a file"),
            ])
        }

        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
            Ok(ToolResult { name: call.name, output: json!(null), is_error: false })
        }
    }

    /// Session store stub: nothing is ever stored or loaded.
    struct StubStore;

    #[async_trait::async_trait]
    impl SessionStore for StubStore {
        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
            Ok(None)
        }

        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
            Ok(())
        }
    }

    /// Event sink stub that discards every event.
    struct StubSink;

    impl EventSink for StubSink {
        fn emit(&self, _event: AgentEvent) {}
    }

    /// Manager wired entirely to stubs.
    fn create_manager() -> DefaultSubagentManager {
        DefaultSubagentManager::new(
            Arc::new(StubLlm),
            Arc::new(StubStore),
            Arc::new(StubSink),
            Arc::new(StubTools),
            "test-model".to_string(),
            TurnPolicy::default(),
        )
    }

    #[tokio::test]
    async fn deny_list_blocks_denied_tools() {
        // The inner port really lists the denied tool; with an empty inner
        // list (or a swallowed error) this test would pass without ever
        // exercising the filter.
        let inner: Arc<dyn ToolPort> = Arc::new(ListingTools);
        let deny = DenyListToolPort { inner, deny: vec!["subagent/spawn".into()] };

        let tools = deny.list_tools().await.unwrap_or_else(|_| unreachable!());
        let ids: Vec<&str> = tools.iter().map(|t| t.id.as_str()).collect();
        assert_eq!(ids, vec!["local/file_read"], "denied tool should not appear in list");
    }

    #[tokio::test]
    async fn deny_list_returns_error_for_denied_call() {
        let inner: Arc<dyn ToolPort> = Arc::new(StubTools);
        let deny = DenyListToolPort { inner, deny: vec!["dangerous_tool".into()] };

        let result = deny
            .call_tool(ToolCall::new("dangerous_tool", json!({})))
            .await
            .unwrap_or_else(|_| unreachable!());
        assert!(result.is_error, "denied tool call should return error result");
    }

    #[tokio::test]
    async fn deny_list_passes_allowed_tools() {
        let inner: Arc<dyn ToolPort> = Arc::new(StubTools);
        let deny = DenyListToolPort { inner, deny: vec!["subagent/spawn".into()] };

        let result = deny
            .call_tool(ToolCall::new("local/file_read", json!({"path": "x"})))
            .await
            .unwrap_or_else(|_| unreachable!());
        assert!(!result.is_error, "allowed tool should pass through");
    }

    #[tokio::test]
    async fn subagent_tool_port_lists_spawn_tool() {
        let manager = Arc::new(create_manager());
        let port = SubagentToolPort::new(manager, "parent-session".into());

        let tools = port.list_tools().await.unwrap_or_else(|_| unreachable!());
        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0].id, "subagent/spawn");
    }

    #[tokio::test]
    async fn spawn_creates_subagent_and_delivers_result() {
        let manager = create_manager();

        let id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        assert!(id.starts_with("subagent:"));

        let result = manager.await_result(&id).await.unwrap_or_else(|_| unreachable!());
        assert_eq!(result.subagent_id, id);
        assert!(!result.is_error);
        assert!(!result.content.is_empty());
    }

    #[tokio::test]
    async fn cancel_removes_subagent() {
        let manager = create_manager();

        let id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        let cancel_result = manager.cancel(&id).await;
        assert!(cancel_result.is_ok(), "cancel should succeed");

        let cancel_again = manager.cancel(&id).await;
        assert!(cancel_again.is_err(), "double cancel should fail");
    }

    #[tokio::test]
    async fn await_result_unknown_id_returns_error() {
        let manager = create_manager();
        let result = manager.await_result(&"nonexistent".into()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn list_active_returns_running_subagents() {
        let manager = create_manager();

        let active =
            manager.list_active(&"parent-1".into()).await.unwrap_or_else(|_| unreachable!());
        assert!(active.is_empty());

        let _id = manager
            .spawn("test task".into(), "parent-1".into(), None, None, vec![])
            .await
            .unwrap_or_else(|_| unreachable!());

        let active =
            manager.list_active(&"parent-1".into()).await.unwrap_or_else(|_| unreachable!());
        assert_eq!(active.len(), 1, "should have one active subagent");
    }
}