1use bamboo_domain::Session;
4use chrono::Utc;
5use serde_json::json;
6
7use super::helpers::{
8 compute_status_guidance, format_child_assignment, map_child_entry, metadata_text,
9 normalize_non_empty_optional, normalize_required_text, render_forked_parent_context,
10 replace_or_append_last_user_message, truncate_after_index, truncate_after_last_user,
11};
12use super::DELEGATION_NOTE;
13use super::{
14 ChildSessionEntry, ChildSessionError, ChildSessionPort, CreateChildInput, CreateChildResult,
15 QueuedInjectedMessage,
16};
17
18pub async fn create_child_action(
19 port: &dyn ChildSessionPort,
20 input: CreateChildInput,
21) -> Result<CreateChildResult, ChildSessionError> {
22 use crate::runner::refresh_prompt_snapshot;
23 use bamboo_agent_core::Message;
24
25 let mut child = Session::new_child_of(
30 input.child_id.clone(),
31 &input.parent_session,
32 input
33 .model_ref_override
34 .as_ref()
35 .map(|model_ref| model_ref.model.clone())
36 .or_else(|| input.model_override.clone())
37 .unwrap_or_else(|| input.parent_session.model.clone()),
38 input.title.clone(),
39 );
40
41 if let Some(model_ref) = input.model_ref_override.clone() {
42 child.model_ref = Some(model_ref.clone());
43 child
44 .metadata
45 .insert("provider_name".to_string(), model_ref.provider);
46 } else if let Some(parent_model_ref) = input.parent_session.model_ref.clone() {
47 child.model_ref = Some(parent_model_ref.clone());
48 child.set_provider_name(parent_model_ref.provider);
49 } else if let Some(parent_provider) = input.parent_session.provider_name() {
50 child.set_provider_name(parent_provider);
51 }
52
53 if let Some(effort) = input.reasoning_effort {
57 child.reasoning_effort = Some(effort);
58 }
59
60 if input
65 .parent_session
66 .agent_runtime_state
67 .as_ref()
68 .is_some_and(|state| state.bypass_permissions)
69 {
70 child
71 .agent_runtime_state
72 .get_or_insert_with(bamboo_domain::AgentRuntimeState::default)
73 .bypass_permissions = true;
74 }
75
76 child.workspace = Some(input.workspace.clone());
77 bamboo_agent_core::workspace_state::set_workspace(
78 &child.id,
79 std::path::PathBuf::from(&input.workspace),
80 );
81
82 child
83 .metadata
84 .insert("spawned_by".to_string(), "SubAgent".to_string());
85 child.set_subagent_type(input.subagent_type.clone());
86 child
87 .metadata
88 .insert("responsibility".to_string(), input.responsibility.clone());
89 child.metadata.insert(
90 "assignment_prompt".to_string(),
91 input.assignment_prompt.clone(),
92 );
93 if input.lifecycle.as_deref() == Some("resident") {
98 child
99 .metadata
100 .insert("lifecycle".to_string(), "resident".to_string());
101 if let Some(name) = input.resident_name.clone().filter(|n| !n.trim().is_empty()) {
102 child.metadata.insert("resident_name".to_string(), name);
103 }
104 child.metadata.insert(
105 "resident_context".to_string(),
106 input
107 .resident_context
108 .clone()
109 .filter(|c| matches!(c.as_str(), "reset" | "accumulate"))
110 .unwrap_or_else(|| "reset".to_string()),
111 );
112 }
113 child.set_last_run_status("pending");
114 child.clear_last_run_error();
115
116 for (key, value) in input.runtime_metadata {
118 child.metadata.insert(key, value);
119 }
120
121 let base_prompt = {
128 let global = crate::prompt_defaults::read_global_default_system_prompt_template();
129 if global.trim().is_empty() {
130 crate::context::DEFAULT_BASE_PROMPT.to_string()
131 } else {
132 global
133 }
134 };
135 let system_prompt = format!("{base_prompt}\n\n{DELEGATION_NOTE}");
136
137 child
138 .metadata
139 .insert("base_system_prompt".to_string(), system_prompt.clone());
140
141 child.add_message(Message::system(&system_prompt));
142
143 if let Some(ref parent_budget) = input.parent_session.token_budget {
147 let mut child_budget = parent_budget.clone();
148 child_budget.compression_trigger_percent = 70;
149 child_budget.compression_target_percent = 35;
150 child.token_budget = Some(child_budget);
151 }
152
153 refresh_prompt_snapshot(&mut child);
154 let assignment = format_child_assignment(
155 &input.title,
156 &input.responsibility,
157 &input.subagent_type,
158 &input.assignment_prompt,
159 );
160 let assignment = match input
164 .context_fork
165 .and_then(|n| render_forked_parent_context(&input.parent_session, n))
166 {
167 Some(forked) => format!("{forked}\n\n{assignment}"),
168 None => assignment,
169 };
170 child.add_message(Message::user(assignment));
171
172 if let Some(parent_task_list) = input.parent_session.task_list.clone() {
173 child.set_task_list(parent_task_list);
174 }
175
176 if let Some(ref disabled) = input.disabled_tools {
180 if !disabled.is_empty() {
181 child.metadata.insert(
182 "disabled_tools".to_string(),
183 serde_json::to_string(disabled).unwrap_or_default(),
184 );
185 }
186 }
187
188 let model = child.model.clone();
189 port.save_child_session(&mut child).await?;
190 if input.auto_run {
191 port.enqueue_child_run(&input.parent_session, &child)
192 .await?;
193 }
194
195 Ok(CreateChildResult {
196 child_session_id: child.id,
197 model,
198 })
199}
200
201pub async fn list_children_action(
202 port: &dyn ChildSessionPort,
203 parent_id: &str,
204) -> serde_json::Value {
205 let children = port.list_children(parent_id).await;
206 json!({
207 "parent_session_id": parent_id,
208 "children": children.iter().map(map_child_entry).collect::<Vec<_>>(),
209 "count": children.len(),
210 })
211}
212
213#[derive(Debug, Clone, PartialEq, serde::Serialize)]
216pub struct SessionTreeNode {
217 pub session_id: String,
218 pub title: String,
219 #[serde(skip_serializing_if = "Option::is_none")]
220 pub last_run_status: Option<String>,
221 pub depth: u32,
222 pub children: Vec<SessionTreeNode>,
223}
224
225pub fn assemble_session_tree(
230 root_id: &str,
231 root_title: &str,
232 adjacency: &std::collections::HashMap<String, Vec<ChildSessionEntry>>,
233 max_depth: u32,
234) -> SessionTreeNode {
235 fn build(
236 id: &str,
237 title: &str,
238 status: Option<String>,
239 depth: u32,
240 max_depth: u32,
241 adjacency: &std::collections::HashMap<String, Vec<ChildSessionEntry>>,
242 visited: &mut std::collections::HashSet<String>,
243 ) -> SessionTreeNode {
244 let first_visit = visited.insert(id.to_string());
245 let mut children = Vec::new();
246 if first_visit && depth < max_depth {
247 if let Some(kids) = adjacency.get(id) {
248 for kid in kids {
249 children.push(build(
250 &kid.child_session_id,
251 &kid.title,
252 kid.last_run_status.clone(),
253 depth + 1,
254 max_depth,
255 adjacency,
256 visited,
257 ));
258 }
259 }
260 }
261 SessionTreeNode {
262 session_id: id.to_string(),
263 title: title.to_string(),
264 last_run_status: status,
265 depth,
266 children,
267 }
268 }
269 let mut visited = std::collections::HashSet::new();
270 build(
271 root_id,
272 root_title,
273 None,
274 0,
275 max_depth,
276 adjacency,
277 &mut visited,
278 )
279}
280
281pub async fn build_session_tree_action(
287 port: &dyn ChildSessionPort,
288 root_id: &str,
289 max_depth: u32,
290) -> SessionTreeNode {
291 use std::collections::{HashMap, HashSet, VecDeque};
292 const NODE_CAP: usize = 5000;
293
294 let root_title = port
295 .load_root_session(root_id)
296 .await
297 .map(|s| s.title)
298 .unwrap_or_default();
299
300 let mut adjacency: HashMap<String, Vec<ChildSessionEntry>> = HashMap::new();
301 let mut visited: HashSet<String> = HashSet::new();
302 let mut queue: VecDeque<(String, u32)> = VecDeque::new();
303 queue.push_back((root_id.to_string(), 0));
304
305 while let Some((id, depth)) = queue.pop_front() {
306 if depth >= max_depth || adjacency.len() >= NODE_CAP || !visited.insert(id.clone()) {
307 continue;
308 }
309 let kids = port.list_children(&id).await;
310 for kid in &kids {
311 queue.push_back((kid.child_session_id.clone(), depth + 1));
312 }
313 adjacency.insert(id, kids);
314 }
315
316 assemble_session_tree(root_id, &root_title, &adjacency, max_depth)
317}
318
319pub async fn get_child_action(
320 port: &dyn ChildSessionPort,
321 parent_id: &str,
322 child_session_id: String,
323) -> Result<serde_json::Value, ChildSessionError> {
324 let child = port
325 .load_child_for_parent(parent_id, &child_session_id)
326 .await?;
327
328 let status = metadata_text(&child, "last_run_status");
329 let runner_info = port.get_child_runner_info(&child.id).await;
330
331 Ok(json!({
332 "child_session_id": child.id,
333 "title": child.title,
334 "model": child.model,
335 "pinned": child.pinned,
336 "message_count": child.messages.len(),
337 "is_running": port.is_child_running(&child.id).await,
338 "last_run_status": status,
339 "last_run_error": metadata_text(&child, "last_run_error"),
340 "responsibility": metadata_text(&child, "responsibility"),
341 "subagent_type": metadata_text(&child, "subagent_type"),
342 "prompt": metadata_text(&child, "assignment_prompt"),
343 "latest_user_message": child
344 .messages
345 .iter()
346 .rposition(|message| matches!(message.role, bamboo_agent_core::Role::User))
347 .and_then(|idx| child.messages.get(idx))
348 .map(|message| message.content.clone()),
349 "runtime_kind": metadata_text(&child, "runtime.kind"),
350 "external_protocol": metadata_text(&child, "external.protocol"),
351 "external_agent_id": metadata_text(&child, "external.agent_id"),
352 "a2a_context_id": metadata_text(&child, "a2a.context_id"),
353 "a2a_latest_task_id": metadata_text(&child, "a2a.latest_task_id"),
354 "a2a_last_state": metadata_text(&child, "a2a.last_state"),
355 "runner_started_at": runner_info.as_ref().and_then(|r| r.started_at.map(|t| t.to_rfc3339())),
356 "runner_completed_at": runner_info.as_ref().and_then(|r| r.completed_at.map(|t| t.to_rfc3339())),
357 "last_tool_name": runner_info.as_ref().and_then(|r| r.last_tool_name.clone()),
358 "last_tool_phase": runner_info.as_ref().and_then(|r| r.last_tool_phase.clone()),
359 "last_event_at": runner_info.as_ref().and_then(|r| r.last_event_at.map(|t| t.to_rfc3339())),
360 "round_count": runner_info.as_ref().map(|r| r.round_count).unwrap_or(0),
361 "has_pending_injected_messages": child.has_pending_injected_messages(),
362 "guidance": compute_status_guidance(status.as_deref(), runner_info.as_ref(), child.has_pending_injected_messages()),
363 }))
364}
365
366#[allow(clippy::too_many_arguments)]
367pub async fn update_child_action(
368 port: &dyn ChildSessionPort,
369 parent_id: &str,
370 child_session_id: String,
371 title: Option<String>,
372 responsibility: Option<String>,
373 prompt: Option<String>,
374 subagent_type: Option<String>,
375 reset_after_update: Option<bool>,
376 reasoning_effort: Option<bamboo_domain::ReasoningEffort>,
377) -> Result<serde_json::Value, ChildSessionError> {
378 let mut child = port
379 .load_child_for_parent(parent_id, &child_session_id)
380 .await?;
381
382 let title = normalize_non_empty_optional(title, "title")?;
383 let responsibility = normalize_non_empty_optional(responsibility, "responsibility")?;
384 let prompt = normalize_non_empty_optional(prompt, "prompt")?;
385 let subagent_type = normalize_non_empty_optional(subagent_type, "subagent_type")?;
386
387 let should_refresh_assignment =
388 responsibility.is_some() || prompt.is_some() || subagent_type.is_some();
389
390 if title.is_none() && !should_refresh_assignment && reasoning_effort.is_none() {
391 return Err(ChildSessionError::InvalidArguments(
392 "update requires at least one field: title/responsibility/prompt/subagent_type/reasoning_effort"
393 .to_string(),
394 ));
395 }
396
397 if let Some(effort) = reasoning_effort {
398 child.reasoning_effort = Some(effort);
399 }
400
401 if let Some(title) = title {
402 child.title = title;
403 }
404
405 let mut messages_removed = 0usize;
406
407 if should_refresh_assignment {
408 let effective_responsibility = normalize_required_text(
409 responsibility.or_else(|| metadata_text(&child, "responsibility")),
410 "responsibility",
411 )?;
412 let effective_subagent_type = normalize_required_text(
413 subagent_type.or_else(|| metadata_text(&child, "subagent_type")),
414 "subagent_type",
415 )?;
416 let effective_prompt = normalize_required_text(
417 prompt.or_else(|| metadata_text(&child, "assignment_prompt")),
418 "prompt",
419 )?;
420
421 child.metadata.insert(
422 "responsibility".to_string(),
423 effective_responsibility.clone(),
424 );
425 child
426 .metadata
427 .insert("subagent_type".to_string(), effective_subagent_type.clone());
428 child
429 .metadata
430 .insert("assignment_prompt".to_string(), effective_prompt.clone());
431 child.set_last_run_status("pending");
432 child.clear_last_run_error();
433
434 let assignment = format_child_assignment(
435 &child.title,
436 &effective_responsibility,
437 &effective_subagent_type,
438 &effective_prompt,
439 );
440 let user_index = replace_or_append_last_user_message(&mut child, assignment);
441
442 if reset_after_update.unwrap_or(true) {
443 messages_removed = truncate_after_index(&mut child, user_index);
444 }
445 }
446
447 child.updated_at = Utc::now();
448 port.save_child_session(&mut child).await?;
449
450 Ok(json!({
451 "child_session_id": child.id,
452 "title": child.title,
453 "messages_removed": messages_removed,
454 "last_run_status": metadata_text(&child, "last_run_status"),
455 "note": "Child session updated in place. Use action=run to execute the same child session.",
456 }))
457}
458
459pub async fn run_child_action(
460 port: &dyn ChildSessionPort,
461 parent: &Session,
462 child_session_id: String,
463 reset_to_last_user: Option<bool>,
464) -> Result<serde_json::Value, ChildSessionError> {
465 let mut child = port
466 .load_child_for_parent(&parent.id, &child_session_id)
467 .await?;
468
469 if port.is_child_running(&child.id).await {
470 return Ok(json!({
471 "child_session_id": child.id,
472 "status": "already_running",
473 "note": "Child session is already running.",
474 }));
475 }
476
477 let mut messages_removed = 0usize;
478 if reset_to_last_user.unwrap_or(true) {
479 messages_removed = truncate_after_last_user(&mut child)?;
480 }
481
482 child.set_last_run_status("pending");
483 child.clear_last_run_error();
484 child.updated_at = Utc::now();
485 port.save_child_session(&mut child).await?;
486
487 port.enqueue_child_run(parent, &child).await?;
488
489 Ok(json!({
490 "child_session_id": child.id,
491 "status": "queued",
492 "messages_removed": messages_removed,
493 "note": "Queued existing child session for retry in place.",
494 }))
495}
496
497pub async fn send_message_to_child_action(
498 port: &dyn ChildSessionPort,
499 parent: &Session,
500 child_session_id: String,
501 message: String,
502 auto_run: Option<bool>,
503 interrupt_running: Option<bool>,
504) -> Result<serde_json::Value, ChildSessionError> {
505 let mut child = port
506 .load_child_for_parent(&parent.id, &child_session_id)
507 .await?;
508
509 let is_running = port.is_child_running(&child.id).await;
510 let should_interrupt = interrupt_running.unwrap_or(false);
511
512 if is_running && should_interrupt {
513 port.cancel_child_run_and_wait(&child.id).await?;
514 child = port
515 .load_child_for_parent(&parent.id, &child_session_id)
516 .await?;
517 }
518
519 let message = normalize_required_text(Some(message), "message")?;
520
521 if is_running && !should_interrupt {
522 if crate::external_agents::live::deliver_message(&child.id, &message) {
528 child.add_message(bamboo_agent_core::Message::user(message.clone()));
529 port.save_child_session(&mut child).await?;
530 return Ok(json!({
531 "child_session_id": child.id,
532 "status": "message_delivered_live",
533 "auto_run": false,
534 "message": message,
535 "message_count": child.messages.len(),
536 "note": "Message delivered to the running actor in-band; it will be admitted at the next round boundary without canceling progress.",
537 }));
538 }
539
540 let mut pending = child.pending_injected_messages().unwrap_or_default();
545 let queued = QueuedInjectedMessage {
546 content: message.clone(),
547 created_at: Some(chrono::Utc::now()),
548 };
549 pending.push(serde_json::to_value(&queued).unwrap_or(serde_json::Value::Null));
550 child.set_pending_injected_messages(pending);
551 port.save_child_session(&mut child).await?;
552
553 if !port.is_child_running(&child.id).await {
558 port.enqueue_child_run(parent, &child).await?;
559 return Ok(json!({
560 "child_session_id": child.id,
561 "status": "queued",
562 "auto_run": true,
563 "message": message,
564 "message_count": child.messages.len(),
565 "note": "Child finished while the message was being queued; a new run was scheduled to process it.",
566 }));
567 }
568
569 return Ok(json!({
570 "child_session_id": child.id,
571 "status": "message_queued",
572 "auto_run": false,
573 "message": message,
574 "message_count": child.messages.len(),
575 "note": "Message queued for the child session. It will be picked up at the next turn boundary without canceling current progress.",
576 }));
577 }
578
579 child.add_message(bamboo_agent_core::Message::user(message.clone()));
580 child.set_last_run_status("pending");
581 child.clear_last_run_error();
582 port.save_child_session(&mut child).await?;
583
584 let should_auto_run = auto_run.unwrap_or(true);
585 if should_auto_run {
586 port.enqueue_child_run(parent, &child).await?;
587 }
588
589 Ok(json!({
590 "child_session_id": child.id,
591 "status": if should_auto_run { "queued" } else { "pending" },
592 "auto_run": should_auto_run,
593 "message": message,
594 "message_count": child.messages.len(),
595 "note": if should_auto_run {
596 "Follow-up message appended and child session queued."
597 } else {
598 "Follow-up message appended. Use action=run to execute the child session."
599 },
600 }))
601}
602
603pub async fn cancel_child_action(
604 port: &dyn ChildSessionPort,
605 parent_id: &str,
606 child_session_id: String,
607) -> Result<serde_json::Value, ChildSessionError> {
608 let _ = port
610 .load_child_for_parent(parent_id, &child_session_id)
611 .await?;
612 port.cancel_child_run_and_wait(&child_session_id).await?;
613
614 let mut child = port
619 .load_child_for_parent(parent_id, &child_session_id)
620 .await?;
621 let latest_status = child.last_run_status().unwrap_or_default();
622 if matches!(latest_status.as_str(), "completed" | "error") {
623 return Ok(json!({
624 "child_session_id": child_session_id,
625 "status": latest_status,
626 "note": "Child reached a natural terminal state while the cancel was in flight; its real outcome was kept.",
627 }));
628 }
629 child.set_last_run_status("cancelled");
630 child.set_last_run_error("Cancelled by parent");
631 port.save_child_session(&mut child).await?;
632 Ok(json!({
633 "child_session_id": child_session_id,
634 "status": "cancelled",
635 }))
636}
637
638pub async fn delete_child_action(
639 port: &dyn ChildSessionPort,
640 parent_id: &str,
641 child_session_id: String,
642) -> Result<serde_json::Value, ChildSessionError> {
643 let child = port
645 .load_child_for_parent(parent_id, &child_session_id)
646 .await?;
647 let result = port.delete_child_session(parent_id, &child.id).await?;
648
649 if !result.deleted {
650 return Err(ChildSessionError::Execution(format!(
651 "child session was not deleted: {}",
652 child.id
653 )));
654 }
655
656 Ok(json!({
657 "child_session_id": child.id,
658 "deleted": true,
659 "cancelled_running_child": result.cancelled_running_child,
660 }))
661}
662
663#[cfg(test)]
664mod tree_tests {
665 use super::super::ChildSessionEntry;
666 use super::assemble_session_tree;
667 use std::collections::HashMap;
668
669 fn entry(id: &str, title: &str) -> ChildSessionEntry {
670 ChildSessionEntry {
671 child_session_id: id.to_string(),
672 title: title.to_string(),
673 pinned: false,
674 message_count: 0,
675 updated_at: String::new(),
676 last_run_status: Some("completed".to_string()),
677 last_run_error: None,
678 }
679 }
680
681 #[test]
682 fn assembles_multi_level_tree() {
683 let mut adj: HashMap<String, Vec<ChildSessionEntry>> = HashMap::new();
684 adj.insert(
685 "root".into(),
686 vec![entry("c1", "child 1"), entry("c2", "child 2")],
687 );
688 adj.insert("c1".into(), vec![entry("g1", "grandchild")]);
689
690 let tree = assemble_session_tree("root", "Root", &adj, 8);
691 assert_eq!(tree.session_id, "root");
692 assert_eq!(tree.depth, 0);
693 assert_eq!(tree.children.len(), 2);
694 let c1 = tree.children.iter().find(|n| n.session_id == "c1").unwrap();
695 assert_eq!(c1.depth, 1);
696 assert_eq!(c1.children.len(), 1);
697 assert_eq!(c1.children[0].session_id, "g1");
698 assert_eq!(c1.children[0].depth, 2);
699 let c2 = tree.children.iter().find(|n| n.session_id == "c2").unwrap();
700 assert!(c2.children.is_empty());
701 }
702
703 #[test]
704 fn depth_cap_stops_descent() {
705 let mut adj: HashMap<String, Vec<ChildSessionEntry>> = HashMap::new();
706 adj.insert("root".into(), vec![entry("c1", "c1")]);
707 adj.insert("c1".into(), vec![entry("g1", "g1")]);
708 let tree = assemble_session_tree("root", "Root", &adj, 1);
709 assert_eq!(tree.children.len(), 1);
710 assert!(
711 tree.children[0].children.is_empty(),
712 "depth cap stops expansion at depth 1"
713 );
714 }
715
716 #[test]
717 fn cycle_is_broken_by_first_visit_guard() {
718 let mut adj: HashMap<String, Vec<ChildSessionEntry>> = HashMap::new();
719 adj.insert("a".into(), vec![entry("b", "b")]);
720 adj.insert("b".into(), vec![entry("a", "a")]); let tree = assemble_session_tree("a", "A", &adj, 100);
722 assert_eq!(tree.children.len(), 1);
723 let b = &tree.children[0];
724 assert_eq!(b.session_id, "b");
725 assert_eq!(b.children.len(), 1);
726 let a2 = &b.children[0];
727 assert_eq!(a2.session_id, "a");
728 assert!(a2.children.is_empty(), "cycle must terminate as a leaf");
729 }
730}