1use bamboo_domain::Session;
4use chrono::Utc;
5use serde_json::json;
6
7use super::helpers::{
8 compute_status_guidance, format_child_assignment, map_child_entry, metadata_text,
9 normalize_non_empty_optional, normalize_required_text, render_forked_parent_context,
10 replace_or_append_last_user_message, truncate_after_index, truncate_after_last_user,
11};
12use super::DELEGATION_NOTE;
13use super::{
14 ChildSessionEntry, ChildSessionError, ChildSessionPort, CreateChildInput, CreateChildResult,
15 QueuedInjectedMessage,
16};
17
18pub async fn create_child_action(
19 port: &dyn ChildSessionPort,
20 input: CreateChildInput,
21) -> Result<CreateChildResult, ChildSessionError> {
22 use crate::runner::refresh_prompt_snapshot;
23 use bamboo_agent_core::Message;
24
25 let mut child = Session::new_child_of(
30 input.child_id.clone(),
31 &input.parent_session,
32 input
33 .model_ref_override
34 .as_ref()
35 .map(|model_ref| model_ref.model.clone())
36 .or_else(|| input.model_override.clone())
37 .unwrap_or_else(|| input.parent_session.model.clone()),
38 input.title.clone(),
39 );
40
41 if let Some(model_ref) = input.model_ref_override.clone() {
42 child.model_ref = Some(model_ref.clone());
43 child
44 .metadata
45 .insert("provider_name".to_string(), model_ref.provider);
46 } else if let Some(parent_model_ref) = input.parent_session.model_ref.clone() {
47 child.model_ref = Some(parent_model_ref.clone());
48 child.set_provider_name(parent_model_ref.provider);
49 } else if let Some(parent_provider) = input.parent_session.provider_name() {
50 child.set_provider_name(parent_provider);
51 }
52
53 if let Some(effort) = input.reasoning_effort {
57 child.reasoning_effort = Some(effort);
58 }
59
60 if input
65 .parent_session
66 .agent_runtime_state
67 .as_ref()
68 .is_some_and(|state| state.bypass_permissions)
69 {
70 child
71 .agent_runtime_state
72 .get_or_insert_with(bamboo_domain::AgentRuntimeState::default)
73 .bypass_permissions = true;
74 }
75
76 if input
81 .parent_session
82 .agent_runtime_state
83 .as_ref()
84 .is_some_and(|state| state.no_human_approver)
85 {
86 child
87 .agent_runtime_state
88 .get_or_insert_with(bamboo_domain::AgentRuntimeState::default)
89 .no_human_approver = true;
90 }
91
92 child.workspace = Some(input.workspace.clone());
93 bamboo_agent_core::workspace_state::set_workspace(
94 &child.id,
95 std::path::PathBuf::from(&input.workspace),
96 );
97
98 child
99 .metadata
100 .insert("spawned_by".to_string(), "SubAgent".to_string());
101 child.set_subagent_type(input.subagent_type.clone());
102 child
103 .metadata
104 .insert("responsibility".to_string(), input.responsibility.clone());
105 child.metadata.insert(
106 "assignment_prompt".to_string(),
107 input.assignment_prompt.clone(),
108 );
109 if input.lifecycle.as_deref() == Some("resident") {
114 child
115 .metadata
116 .insert("lifecycle".to_string(), "resident".to_string());
117 if let Some(name) = input.resident_name.clone().filter(|n| !n.trim().is_empty()) {
118 child.metadata.insert("resident_name".to_string(), name);
119 }
120 child.metadata.insert(
121 "resident_context".to_string(),
122 input
123 .resident_context
124 .clone()
125 .filter(|c| matches!(c.as_str(), "reset" | "accumulate"))
126 .unwrap_or_else(|| "reset".to_string()),
127 );
128 }
129 child.set_last_run_status("pending");
130 child.clear_last_run_error();
131
132 for (key, value) in input.runtime_metadata {
134 child.metadata.insert(key, value);
135 }
136
137 let base_prompt = {
144 let global = crate::prompt_defaults::read_global_default_system_prompt_template();
145 if global.trim().is_empty() {
146 crate::context::DEFAULT_BASE_PROMPT.to_string()
147 } else {
148 global
149 }
150 };
151 let system_prompt = format!("{base_prompt}\n\n{DELEGATION_NOTE}");
152
153 child
154 .metadata
155 .insert("base_system_prompt".to_string(), system_prompt.clone());
156
157 child.add_message(Message::system(&system_prompt));
158
159 if let Some(ref parent_budget) = input.parent_session.token_budget {
163 let mut child_budget = parent_budget.clone();
164 child_budget.compression_trigger_percent = 70;
165 child_budget.compression_target_percent = 35;
166 child.token_budget = Some(child_budget);
167 }
168
169 refresh_prompt_snapshot(&mut child);
170 let assignment = format_child_assignment(
171 &input.title,
172 &input.responsibility,
173 &input.subagent_type,
174 &input.assignment_prompt,
175 );
176 let assignment = match input
180 .context_fork
181 .and_then(|n| render_forked_parent_context(&input.parent_session, n))
182 {
183 Some(forked) => format!("{forked}\n\n{assignment}"),
184 None => assignment,
185 };
186 child.add_message(Message::user(assignment));
187
188 if let Some(parent_task_list) = input.parent_session.task_list.clone() {
189 child.set_task_list(parent_task_list);
190 }
191
192 if let Some(ref disabled) = input.disabled_tools {
196 if !disabled.is_empty() {
197 child.metadata.insert(
198 "disabled_tools".to_string(),
199 serde_json::to_string(disabled).unwrap_or_default(),
200 );
201 }
202 }
203
204 let model = child.model.clone();
205 port.save_child_session(&mut child).await?;
206 if input.auto_run {
207 port.enqueue_child_run(&input.parent_session, &child)
208 .await?;
209 }
210
211 Ok(CreateChildResult {
212 child_session_id: child.id,
213 model,
214 })
215}
216
217pub async fn list_children_action(
218 port: &dyn ChildSessionPort,
219 parent_id: &str,
220) -> serde_json::Value {
221 let children = port.list_children(parent_id).await;
222 json!({
223 "parent_session_id": parent_id,
224 "children": children.iter().map(map_child_entry).collect::<Vec<_>>(),
225 "count": children.len(),
226 })
227}
228
229#[derive(Debug, Clone, PartialEq, serde::Serialize)]
232pub struct SessionTreeNode {
233 pub session_id: String,
234 pub title: String,
235 #[serde(skip_serializing_if = "Option::is_none")]
236 pub last_run_status: Option<String>,
237 pub depth: u32,
238 pub children: Vec<SessionTreeNode>,
239}
240
241pub fn assemble_session_tree(
246 root_id: &str,
247 root_title: &str,
248 adjacency: &std::collections::HashMap<String, Vec<ChildSessionEntry>>,
249 max_depth: u32,
250) -> SessionTreeNode {
251 fn build(
252 id: &str,
253 title: &str,
254 status: Option<String>,
255 depth: u32,
256 max_depth: u32,
257 adjacency: &std::collections::HashMap<String, Vec<ChildSessionEntry>>,
258 visited: &mut std::collections::HashSet<String>,
259 ) -> SessionTreeNode {
260 let first_visit = visited.insert(id.to_string());
261 let mut children = Vec::new();
262 if first_visit && depth < max_depth {
263 if let Some(kids) = adjacency.get(id) {
264 for kid in kids {
265 children.push(build(
266 &kid.child_session_id,
267 &kid.title,
268 kid.last_run_status.clone(),
269 depth + 1,
270 max_depth,
271 adjacency,
272 visited,
273 ));
274 }
275 }
276 }
277 SessionTreeNode {
278 session_id: id.to_string(),
279 title: title.to_string(),
280 last_run_status: status,
281 depth,
282 children,
283 }
284 }
285 let mut visited = std::collections::HashSet::new();
286 build(
287 root_id,
288 root_title,
289 None,
290 0,
291 max_depth,
292 adjacency,
293 &mut visited,
294 )
295}
296
297pub async fn build_session_tree_action(
303 port: &dyn ChildSessionPort,
304 root_id: &str,
305 max_depth: u32,
306) -> SessionTreeNode {
307 use std::collections::{HashMap, HashSet, VecDeque};
308 const NODE_CAP: usize = 5000;
309
310 let root_title = port
311 .load_root_session(root_id)
312 .await
313 .map(|s| s.title)
314 .unwrap_or_default();
315
316 let mut adjacency: HashMap<String, Vec<ChildSessionEntry>> = HashMap::new();
317 let mut visited: HashSet<String> = HashSet::new();
318 let mut queue: VecDeque<(String, u32)> = VecDeque::new();
319 queue.push_back((root_id.to_string(), 0));
320
321 while let Some((id, depth)) = queue.pop_front() {
322 if depth >= max_depth || adjacency.len() >= NODE_CAP || !visited.insert(id.clone()) {
323 continue;
324 }
325 let kids = port.list_children(&id).await;
326 for kid in &kids {
327 queue.push_back((kid.child_session_id.clone(), depth + 1));
328 }
329 adjacency.insert(id, kids);
330 }
331
332 assemble_session_tree(root_id, &root_title, &adjacency, max_depth)
333}
334
335pub async fn get_child_action(
336 port: &dyn ChildSessionPort,
337 parent_id: &str,
338 child_session_id: String,
339) -> Result<serde_json::Value, ChildSessionError> {
340 let child = port
341 .load_child_for_parent(parent_id, &child_session_id)
342 .await?;
343
344 let status = metadata_text(&child, "last_run_status");
345 let runner_info = port.get_child_runner_info(&child.id).await;
346
347 Ok(json!({
348 "child_session_id": child.id,
349 "title": child.title,
350 "model": child.model,
351 "pinned": child.pinned,
352 "message_count": child.messages.len(),
353 "is_running": port.is_child_running(&child.id).await,
354 "last_run_status": status,
355 "last_run_error": metadata_text(&child, "last_run_error"),
356 "responsibility": metadata_text(&child, "responsibility"),
357 "subagent_type": metadata_text(&child, "subagent_type"),
358 "prompt": metadata_text(&child, "assignment_prompt"),
359 "latest_user_message": child
360 .messages
361 .iter()
362 .rposition(|message| matches!(message.role, bamboo_agent_core::Role::User))
363 .and_then(|idx| child.messages.get(idx))
364 .map(|message| message.content.clone()),
365 "runtime_kind": metadata_text(&child, "runtime.kind"),
366 "external_protocol": metadata_text(&child, "external.protocol"),
367 "external_agent_id": metadata_text(&child, "external.agent_id"),
368 "a2a_context_id": metadata_text(&child, "a2a.context_id"),
369 "a2a_latest_task_id": metadata_text(&child, "a2a.latest_task_id"),
370 "a2a_last_state": metadata_text(&child, "a2a.last_state"),
371 "runner_started_at": runner_info.as_ref().and_then(|r| r.started_at.map(|t| t.to_rfc3339())),
372 "runner_completed_at": runner_info.as_ref().and_then(|r| r.completed_at.map(|t| t.to_rfc3339())),
373 "last_tool_name": runner_info.as_ref().and_then(|r| r.last_tool_name.clone()),
374 "last_tool_phase": runner_info.as_ref().and_then(|r| r.last_tool_phase.clone()),
375 "last_event_at": runner_info.as_ref().and_then(|r| r.last_event_at.map(|t| t.to_rfc3339())),
376 "round_count": runner_info.as_ref().map(|r| r.round_count).unwrap_or(0),
377 "has_pending_injected_messages": child.has_pending_injected_messages(),
378 "guidance": compute_status_guidance(status.as_deref(), runner_info.as_ref(), child.has_pending_injected_messages()),
379 }))
380}
381
382#[allow(clippy::too_many_arguments)]
383pub async fn update_child_action(
384 port: &dyn ChildSessionPort,
385 parent_id: &str,
386 child_session_id: String,
387 title: Option<String>,
388 responsibility: Option<String>,
389 prompt: Option<String>,
390 subagent_type: Option<String>,
391 reset_after_update: Option<bool>,
392 reasoning_effort: Option<bamboo_domain::ReasoningEffort>,
393) -> Result<serde_json::Value, ChildSessionError> {
394 let mut child = port
395 .load_child_for_parent(parent_id, &child_session_id)
396 .await?;
397
398 let title = normalize_non_empty_optional(title, "title")?;
399 let responsibility = normalize_non_empty_optional(responsibility, "responsibility")?;
400 let prompt = normalize_non_empty_optional(prompt, "prompt")?;
401 let subagent_type = normalize_non_empty_optional(subagent_type, "subagent_type")?;
402
403 let should_refresh_assignment =
404 responsibility.is_some() || prompt.is_some() || subagent_type.is_some();
405
406 if title.is_none() && !should_refresh_assignment && reasoning_effort.is_none() {
407 return Err(ChildSessionError::InvalidArguments(
408 "update requires at least one field: title/responsibility/prompt/subagent_type/reasoning_effort"
409 .to_string(),
410 ));
411 }
412
413 if let Some(effort) = reasoning_effort {
414 child.reasoning_effort = Some(effort);
415 }
416
417 if let Some(title) = title {
418 child.title = title;
419 }
420
421 let mut messages_removed = 0usize;
422
423 if should_refresh_assignment {
424 let effective_responsibility = normalize_required_text(
425 responsibility.or_else(|| metadata_text(&child, "responsibility")),
426 "responsibility",
427 )?;
428 let effective_subagent_type = normalize_required_text(
429 subagent_type.or_else(|| metadata_text(&child, "subagent_type")),
430 "subagent_type",
431 )?;
432 let effective_prompt = normalize_required_text(
433 prompt.or_else(|| metadata_text(&child, "assignment_prompt")),
434 "prompt",
435 )?;
436
437 child.metadata.insert(
438 "responsibility".to_string(),
439 effective_responsibility.clone(),
440 );
441 child
442 .metadata
443 .insert("subagent_type".to_string(), effective_subagent_type.clone());
444 child
445 .metadata
446 .insert("assignment_prompt".to_string(), effective_prompt.clone());
447 child.set_last_run_status("pending");
448 child.clear_last_run_error();
449
450 let assignment = format_child_assignment(
451 &child.title,
452 &effective_responsibility,
453 &effective_subagent_type,
454 &effective_prompt,
455 );
456 let user_index = replace_or_append_last_user_message(&mut child, assignment);
457
458 if reset_after_update.unwrap_or(true) {
459 messages_removed = truncate_after_index(&mut child, user_index);
460 }
461 }
462
463 child.updated_at = Utc::now();
464 port.save_child_session(&mut child).await?;
465
466 Ok(json!({
467 "child_session_id": child.id,
468 "title": child.title,
469 "messages_removed": messages_removed,
470 "last_run_status": metadata_text(&child, "last_run_status"),
471 "note": "Child session updated in place. Use action=run to execute the same child session.",
472 }))
473}
474
475pub async fn run_child_action(
476 port: &dyn ChildSessionPort,
477 parent: &Session,
478 child_session_id: String,
479 reset_to_last_user: Option<bool>,
480) -> Result<serde_json::Value, ChildSessionError> {
481 let mut child = port
482 .load_child_for_parent(&parent.id, &child_session_id)
483 .await?;
484
485 if port.is_child_running(&child.id).await {
486 return Ok(json!({
487 "child_session_id": child.id,
488 "status": "already_running",
489 "note": "Child session is already running.",
490 }));
491 }
492
493 let mut messages_removed = 0usize;
494 if reset_to_last_user.unwrap_or(true) {
495 messages_removed = truncate_after_last_user(&mut child)?;
496 }
497
498 child.set_last_run_status("pending");
499 child.clear_last_run_error();
500 child.updated_at = Utc::now();
501 port.save_child_session(&mut child).await?;
502
503 port.enqueue_child_run(parent, &child).await?;
504
505 Ok(json!({
506 "child_session_id": child.id,
507 "status": "queued",
508 "messages_removed": messages_removed,
509 "note": "Queued existing child session for retry in place.",
510 }))
511}
512
513pub async fn send_message_to_child_action(
514 port: &dyn ChildSessionPort,
515 parent: &Session,
516 child_session_id: String,
517 message: String,
518 auto_run: Option<bool>,
519 interrupt_running: Option<bool>,
520) -> Result<serde_json::Value, ChildSessionError> {
521 let mut child = port
522 .load_child_for_parent(&parent.id, &child_session_id)
523 .await?;
524
525 let is_running = port.is_child_running(&child.id).await;
526 let should_interrupt = interrupt_running.unwrap_or(false);
527
528 if is_running && should_interrupt {
529 port.cancel_child_run_and_wait(&child.id).await?;
530 child = port
531 .load_child_for_parent(&parent.id, &child_session_id)
532 .await?;
533 }
534
535 let message = normalize_required_text(Some(message), "message")?;
536
537 if is_running && !should_interrupt {
538 if crate::external_agents::live::deliver_message(&child.id, &message) {
544 child.add_message(bamboo_agent_core::Message::user(message.clone()));
545 port.save_child_session(&mut child).await?;
546 return Ok(json!({
547 "child_session_id": child.id,
548 "status": "message_delivered_live",
549 "auto_run": false,
550 "message": message,
551 "message_count": child.messages.len(),
552 "note": "Message delivered to the running actor in-band; it will be admitted at the next round boundary without canceling progress.",
553 }));
554 }
555
556 let mut pending = child.pending_injected_messages().unwrap_or_default();
561 let queued = QueuedInjectedMessage {
562 content: message.clone(),
563 created_at: Some(chrono::Utc::now()),
564 };
565 pending.push(serde_json::to_value(&queued).unwrap_or(serde_json::Value::Null));
566 child.set_pending_injected_messages(pending);
567 port.save_child_session(&mut child).await?;
568
569 if !port.is_child_running(&child.id).await {
574 port.enqueue_child_run(parent, &child).await?;
575 return Ok(json!({
576 "child_session_id": child.id,
577 "status": "queued",
578 "auto_run": true,
579 "message": message,
580 "message_count": child.messages.len(),
581 "note": "Child finished while the message was being queued; a new run was scheduled to process it.",
582 }));
583 }
584
585 return Ok(json!({
586 "child_session_id": child.id,
587 "status": "message_queued",
588 "auto_run": false,
589 "message": message,
590 "message_count": child.messages.len(),
591 "note": "Message queued for the child session. It will be picked up at the next turn boundary without canceling current progress.",
592 }));
593 }
594
595 child.add_message(bamboo_agent_core::Message::user(message.clone()));
596 child.set_last_run_status("pending");
597 child.clear_last_run_error();
598 port.save_child_session(&mut child).await?;
599
600 let should_auto_run = auto_run.unwrap_or(true);
601 if should_auto_run {
602 port.enqueue_child_run(parent, &child).await?;
603 }
604
605 Ok(json!({
606 "child_session_id": child.id,
607 "status": if should_auto_run { "queued" } else { "pending" },
608 "auto_run": should_auto_run,
609 "message": message,
610 "message_count": child.messages.len(),
611 "note": if should_auto_run {
612 "Follow-up message appended and child session queued."
613 } else {
614 "Follow-up message appended. Use action=run to execute the child session."
615 },
616 }))
617}
618
619pub async fn cancel_child_action(
620 port: &dyn ChildSessionPort,
621 parent_id: &str,
622 child_session_id: String,
623) -> Result<serde_json::Value, ChildSessionError> {
624 let _ = port
626 .load_child_for_parent(parent_id, &child_session_id)
627 .await?;
628 port.cancel_child_run_and_wait(&child_session_id).await?;
629
630 let mut child = port
635 .load_child_for_parent(parent_id, &child_session_id)
636 .await?;
637 let latest_status = child.last_run_status().unwrap_or_default();
638 if matches!(latest_status.as_str(), "completed" | "error") {
639 return Ok(json!({
640 "child_session_id": child_session_id,
641 "status": latest_status,
642 "note": "Child reached a natural terminal state while the cancel was in flight; its real outcome was kept.",
643 }));
644 }
645 child.set_last_run_status("cancelled");
646 child.set_last_run_error("Cancelled by parent");
647 port.save_child_session(&mut child).await?;
648 Ok(json!({
649 "child_session_id": child_session_id,
650 "status": "cancelled",
651 }))
652}
653
654pub async fn delete_child_action(
655 port: &dyn ChildSessionPort,
656 parent_id: &str,
657 child_session_id: String,
658) -> Result<serde_json::Value, ChildSessionError> {
659 let child = port
661 .load_child_for_parent(parent_id, &child_session_id)
662 .await?;
663 let result = port.delete_child_session(parent_id, &child.id).await?;
664
665 if !result.deleted {
666 return Err(ChildSessionError::Execution(format!(
667 "child session was not deleted: {}",
668 child.id
669 )));
670 }
671
672 Ok(json!({
673 "child_session_id": child.id,
674 "deleted": true,
675 "cancelled_running_child": result.cancelled_running_child,
676 }))
677}
678
#[cfg(test)]
mod tree_tests {
    use super::super::ChildSessionEntry;
    use super::assemble_session_tree;
    use std::collections::HashMap;

    type Adjacency = HashMap<String, Vec<ChildSessionEntry>>;

    /// Builds a child entry with the given id/title and a "completed" status.
    fn entry(id: &str, title: &str) -> ChildSessionEntry {
        ChildSessionEntry {
            child_session_id: id.to_string(),
            title: title.to_string(),
            pinned: false,
            message_count: 0,
            updated_at: String::new(),
            last_run_status: Some("completed".to_string()),
            last_run_error: None,
        }
    }

    /// Builds an adjacency map from `(parent id, children)` pairs.
    fn adjacency_of(edges: Vec<(&str, Vec<ChildSessionEntry>)>) -> Adjacency {
        edges
            .into_iter()
            .map(|(parent, kids)| (parent.to_string(), kids))
            .collect()
    }

    #[test]
    fn assembles_multi_level_tree() {
        let adj = adjacency_of(vec![
            ("root", vec![entry("c1", "child 1"), entry("c2", "child 2")]),
            ("c1", vec![entry("g1", "grandchild")]),
        ]);

        let tree = assemble_session_tree("root", "Root", &adj, 8);
        assert_eq!(tree.session_id, "root");
        assert_eq!(tree.depth, 0);
        assert_eq!(tree.children.len(), 2);

        let c1 = tree
            .children
            .iter()
            .find(|node| node.session_id == "c1")
            .unwrap();
        assert_eq!(c1.depth, 1);
        assert_eq!(c1.children.len(), 1);
        let grandchild = &c1.children[0];
        assert_eq!(grandchild.session_id, "g1");
        assert_eq!(grandchild.depth, 2);

        let c2 = tree
            .children
            .iter()
            .find(|node| node.session_id == "c2")
            .unwrap();
        assert!(c2.children.is_empty());
    }

    #[test]
    fn depth_cap_stops_descent() {
        let adj = adjacency_of(vec![
            ("root", vec![entry("c1", "c1")]),
            ("c1", vec![entry("g1", "g1")]),
        ]);

        let tree = assemble_session_tree("root", "Root", &adj, 1);
        assert_eq!(tree.children.len(), 1);
        assert!(
            tree.children[0].children.is_empty(),
            "depth cap stops expansion at depth 1"
        );
    }

    #[test]
    fn cycle_is_broken_by_first_visit_guard() {
        // "a" lists "b" as a child and "b" lists "a" back: a two-node cycle.
        let adj = adjacency_of(vec![
            ("a", vec![entry("b", "b")]),
            ("b", vec![entry("a", "a")]),
        ]);

        let tree = assemble_session_tree("a", "A", &adj, 100);
        assert_eq!(tree.children.len(), 1);

        let b = &tree.children[0];
        assert_eq!(b.session_id, "b");
        assert_eq!(b.children.len(), 1);

        // The second occurrence of "a" is a repeat visit, so it is not expanded.
        let a2 = &b.children[0];
        assert_eq!(a2.session_id, "a");
        assert!(a2.children.is_empty(), "cycle must terminate as a leaf");
    }
}