1use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12 create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13 ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::runtime::config::{BashResumeHook, GuardianSpawner, BASH_COMPLETION_RESUME_KIND};
16use crate::runtime::guardian_state::{
17 parse_guardian_verdict, read_guardian_config, read_guardian_state, write_guardian_state,
18 GuardianVerdict,
19};
20use crate::Agent;
21use async_trait::async_trait;
22use bamboo_agent_core::storage::Storage;
23use bamboo_agent_core::tools::ToolExecutor;
24use bamboo_agent_core::{AgentEvent, Message, Role, Session};
25use bamboo_domain::session::runtime_state::{
26 AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
27};
28use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
29use bamboo_storage::LockedSessionStore;
30use chrono::Utc;
31use tokio::sync::{broadcast, RwLock};
32
33use crate::model_areas::resolve_global_area_models;
34use crate::model_config_helper::{
35 resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
36};
37use crate::session_app::provider_model::session_effective_model_ref;
38use crate::session_app::resume::{
39 resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
40};
41use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
42
/// Session-metadata key under which the JSON-serialized `AgentRuntimeState` is mirrored.
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message-metadata key that is set to `true` on the synthetic resume messages built in this file.
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message-metadata key whose value names the kind of synthetic resume message.
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
46
47fn read_runtime_state(session: &Session) -> AgentRuntimeState {
48 session
49 .agent_runtime_state
50 .clone()
51 .or_else(|| {
52 session
53 .metadata
54 .get(AGENT_RUNTIME_STATE_METADATA_KEY)
55 .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
56 })
57 .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
58}
59
60fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
61 session.agent_runtime_state = Some(runtime_state.clone());
62 if let Ok(serialized) = serde_json::to_string(runtime_state) {
63 session
64 .metadata
65 .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
66 }
67}
68
/// Returns `true` when `status` is one of the failure-like child statuses:
/// `"error"`, `"timeout"` or `"cancelled"`. The comparison is exact
/// (case-sensitive, no trimming).
fn is_error_like(status: &str) -> bool {
    ["error", "timeout", "cancelled"].contains(&status)
}
72
/// Returns `true` when `status` denotes a child run that has finished, whether
/// successfully or not. The comparison is exact (case-sensitive, no trimming).
fn is_terminal_child_status(status: &str) -> bool {
    const TERMINAL_STATUSES: [&str; 5] = ["completed", "error", "timeout", "cancelled", "skipped"];
    TERMINAL_STATUSES.contains(&status)
}
80
81async fn derive_completed_child_ids(
86 storage: &Arc<dyn Storage>,
87 parent_session_id: &str,
88 just_completed_child_id: &str,
89) -> Vec<String> {
90 let mut completed: Vec<String> = storage
91 .list_child_run_statuses(parent_session_id)
92 .await
93 .unwrap_or_default()
94 .into_iter()
95 .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
96 .map(|(id, _)| id)
97 .collect();
98 if !completed.iter().any(|id| id == just_completed_child_id) {
99 completed.push(just_completed_child_id.to_string());
100 }
101 completed.sort();
102 completed.dedup();
103 completed
104}
105
106fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
107 if let Ok(config_guard) = config.try_read() {
108 let snapshot = config_guard.clone();
109
110 if let Ok(mut cached_guard) = cached_config.try_write() {
111 *cached_guard = snapshot.clone();
112 }
113
114 snapshot
115 } else {
116 cached_config
117 .try_read()
118 .map(|guard| guard.clone())
119 .unwrap_or_default()
120 }
121}
122
123fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
140 static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
141 OnceLock::new();
142 LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
143}
144
145fn wait_policy_satisfied(
146 policy: ChildWaitPolicy,
147 wait_child_ids: &[String],
148 completed_child_ids: &[String],
149 latest_status: &str,
150) -> bool {
151 if wait_child_ids.is_empty() {
152 return false;
153 }
154
155 match policy {
156 ChildWaitPolicy::All => wait_child_ids
157 .iter()
158 .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
159 ChildWaitPolicy::Any => completed_child_ids
160 .iter()
161 .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
162 ChildWaitPolicy::FirstError => {
163 is_error_like(latest_status)
164 || wait_child_ids
165 .iter()
166 .all(|id| completed_child_ids.iter().any(|completed| completed == id))
167 }
168 }
169}
170
171fn child_final_assistant_text(child: &Session) -> Option<String> {
175 child
176 .messages
177 .iter()
178 .rev()
179 .find(|message| matches!(message.role, Role::Assistant))
180 .map(|message| message.content.clone())
181 .filter(|content| !content.trim().is_empty())
182}
183
184fn runtime_resume_message(
185 completion: &ChildCompletion,
186 remaining_children: usize,
187 child_final_response: Option<&str>,
188) -> Message {
189 let mut body = format!(
190 "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
191 completion.child_session_id, completion.status, remaining_children
192 );
193
194 let final_response = child_final_response.map(str::to_string);
200 if let Some(response) = final_response.as_deref() {
201 body.push_str("\n\nChild final response:\n");
202 body.push_str(response);
203 } else if let Some(error) = completion.error.as_deref() {
204 if !error.is_empty() {
205 body.push_str("\n\nChild error:\n");
206 body.push_str(error);
207 }
208 }
209
210 body.push_str(
211 "\n\nResume the parent task using this child result and continue from the previous plan. \
212 If you need the full child transcript, call SubAgent.get(child_session_id).",
213 );
214
215 let mut message = Message::user(body);
216 message.metadata = Some(serde_json::json!({
217 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
218 RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
219 "child_session_id": completion.child_session_id,
220 "child_status": completion.status,
221 "child_error": completion.error,
222 "completed_at": completion.completed_at,
223 "child_final_response_included": final_response.is_some(),
224 }));
225 message.never_compress = false;
229 message
230}
231
232fn guardian_resume_message(completion: &ChildCompletion, verdict: &GuardianVerdict) -> Message {
237 let mut body = if verdict.approve {
238 String::from(
239 "Guardian review APPROVED: an independent reviewer verified the work and found no blocking issues. You may finalize the task.",
240 )
241 } else {
242 String::from(
243 "Guardian review REJECTED: an independent reviewer found issues. Address every finding below before completing — do NOT declare the task complete until they are resolved.",
244 )
245 };
246 if let Some(summary) = verdict.summary.as_deref().filter(|s| !s.trim().is_empty()) {
247 body.push_str("\n\nReviewer summary: ");
248 body.push_str(summary);
249 }
250 if !verdict.findings.is_empty() {
251 body.push_str("\n\nFindings:");
252 for (idx, finding) in verdict.findings.iter().enumerate() {
253 body.push_str(&format!("\n{}. {}", idx + 1, finding));
254 }
255 }
256 body.push_str(
257 "\n\nIf you need the full guardian transcript, call SubAgent.get(child_session_id).",
258 );
259
260 let mut message = Message::user(body);
261 message.metadata = Some(serde_json::json!({
262 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
263 RUNTIME_RESUME_MESSAGE_KIND_KEY: "guardian_review_resume",
264 "child_session_id": completion.child_session_id,
265 "child_status": completion.status,
266 "guardian_approved": verdict.approve,
267 "completed_at": completion.completed_at,
268 }));
269 message.never_compress = false;
270 message
271}
272
/// Reacts to child-session completions and background-Bash completions by
/// updating the owning session's runtime state and resuming its execution.
///
/// Cloning is cheap for the shared handles: most fields are `Arc`s, so clones
/// share the same runners, event senders, config and late-bound tool/guardian
/// slots.
#[derive(Clone)]
pub struct ChildCompletionCoordinator {
    /// Session storage; used to load sessions and list child run statuses.
    storage: Arc<dyn Storage>,
    /// Store used by `save_and_cache` to persist sessions (`merge_save_runtime`).
    persistence: Arc<bamboo_storage::LockedSessionStore>,
    /// In-memory session cache; refreshed on every save and used as a fallback
    /// when storage has no session or fails to load it.
    sessions: crate::SessionCache,
    /// Runners keyed by session id; consulted to reserve a runner and to look
    /// up the run id of one that already exists.
    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Broadcast event senders keyed by session id.
    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Agent handed to every resumed session execution.
    agent: Arc<Agent>,
    /// Live configuration; snapshotted whenever a resume is prepared.
    config: Arc<RwLock<Config>>,
    /// Provider registry used when resolving resume config and auxiliary models.
    provider_registry: Arc<ProviderRegistry>,
    /// Router used to resolve a provider override from the session's model ref.
    provider_router: Arc<ProviderModelRouter>,
    /// Application data directory forwarded to resumed executions.
    app_data_dir: std::path::PathBuf,
    /// Optional inbox forwarded to the event forwarder of resumed executions.
    account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
    /// Root tool executor; `None` until `set_root_tools` is called. Resumes are
    /// refused while it is unset.
    root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
    /// Guardian spawner; `None` until `set_guardian_spawner` is called.
    guardian_spawner: Arc<RwLock<Option<Arc<dyn GuardianSpawner>>>>,
}
292
293impl ChildCompletionCoordinator {
294 #[allow(clippy::too_many_arguments)]
295 pub fn new(
296 storage: Arc<dyn Storage>,
297 persistence: Arc<LockedSessionStore>,
298 sessions: crate::SessionCache,
299 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
300 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
301 agent: Arc<Agent>,
302 config: Arc<RwLock<Config>>,
303 provider_registry: Arc<ProviderRegistry>,
304 provider_router: Arc<ProviderModelRouter>,
305 app_data_dir: std::path::PathBuf,
306 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
307 ) -> Self {
308 Self {
309 storage,
310 persistence,
311 sessions,
312 agent_runners,
313 session_event_senders,
314 agent,
315 config,
316 provider_registry,
317 provider_router,
318 app_data_dir,
319 account_feed_inbox,
320 root_tools: Arc::new(RwLock::new(None)),
321 guardian_spawner: Arc::new(RwLock::new(None)),
322 }
323 }
324
325 pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
326 *self.root_tools.write().await = Some(tools);
327 }
328
329 pub async fn set_guardian_spawner(&self, spawner: Arc<dyn GuardianSpawner>) {
332 *self.guardian_spawner.write().await = Some(spawner);
333 }
334
335 fn build_resume_config(
336 &self,
337 session: &Session,
338 config_snapshot: &Config,
339 ) -> ResumeConfigSnapshot {
340 crate::session_app::resolution::resolve_resume_config_snapshot(
341 config_snapshot,
342 &self.provider_registry,
343 session,
344 None,
345 )
346 }
347
348 async fn resume_parent(&self, parent_session_id: String) -> ResumeOutcome {
356 for attempt in 0..=5u8 {
357 if attempt > 0 {
358 tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
359 }
360
361 let Some(session) = self.load_session(&parent_session_id).await else {
362 tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
363 return ResumeOutcome::NotFound;
364 };
365 let config_snapshot = self.config.read().await.clone();
366 let resume_config = self.build_resume_config(&session, &config_snapshot);
367 let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
368 tracing::info!(
369 %parent_session_id,
370 attempt,
371 outcome = outcome.as_str(),
372 "child completion requested parent resume"
373 );
374
375 if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
376 return outcome;
377 }
378 }
379 ResumeOutcome::AlreadyRunning {
381 run_id: String::new(),
382 }
383 }
384
385 async fn save_and_cache(&self, session: &mut Session) {
386 if let Err(error) = self.persistence.merge_save_runtime(session).await {
387 tracing::warn!(session_id = %session.id, %error, "failed to persist session");
388 }
389 self.sessions.insert(
390 session.id.clone(),
391 Arc::new(parking_lot::RwLock::new(session.clone())),
392 );
393 }
394}
395
396#[async_trait]
397impl ChildCompletionHandler for ChildCompletionCoordinator {
398 async fn on_child_completed(&self, completion: ChildCompletion) {
399 let per_parent = {
404 let mut map = parent_locks().lock().expect("parent lock map poisoned");
405 map.entry(completion.parent_session_id.clone())
406 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
407 .clone()
408 };
409 let _per_parent_guard = per_parent.lock().await;
410
411 let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
412 tracing::warn!(
413 parent_session_id = %completion.parent_session_id,
414 child_session_id = %completion.child_session_id,
415 "child completion received for missing parent"
416 );
417 return;
418 };
419
420 let mut runtime_state = read_runtime_state(&parent);
426
427 let completed_child_ids = derive_completed_child_ids(
430 &self.storage,
431 &completion.parent_session_id,
432 &completion.child_session_id,
433 )
434 .await;
435
436 let mut should_resume = false;
437 let mut remaining_children = 0usize;
438 if let Some(wait) = runtime_state.waiting_for_children.clone() {
439 remaining_children = wait
440 .child_session_ids
441 .iter()
442 .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
443 .count();
444 should_resume = wait_policy_satisfied(
445 wait.wait_for,
446 &wait.child_session_ids,
447 &completed_child_ids,
448 &completion.status,
449 );
450 if should_resume {
451 runtime_state.waiting_for_children = None;
452 runtime_state.status = AgentStatusState::Idle;
453 runtime_state.suspension = None;
454 }
455 }
456
457 if should_resume {
458 parent.metadata.remove("runtime.suspend_reason");
459 let loaded_child = match self
464 .storage
465 .load_session(&completion.child_session_id)
466 .await
467 {
468 Ok(child) => child,
469 Err(error) => {
470 tracing::warn!(
471 child_session_id = %completion.child_session_id,
472 %error,
473 "failed to load child session for runtime resume message"
474 );
475 None
476 }
477 };
478
479 let reviewed_round = runtime_state.round.current_round;
485 let guardian_resume = loaded_child.as_ref().and_then(|child| {
486 if child.subagent_type().as_deref() != Some("guardian") {
487 return None;
488 }
489 let mut guardian_state = read_guardian_state(&parent)?;
490 if guardian_state.guardian_child_id.as_deref()
491 != Some(completion.child_session_id.as_str())
492 {
493 tracing::warn!(
496 parent_session_id = %completion.parent_session_id,
497 child_session_id = %completion.child_session_id,
498 expected = ?guardian_state.guardian_child_id,
499 "guardian completion does not match recorded guardian_child_id; using generic resume"
500 );
501 return None;
502 }
503 let verdict = child_final_assistant_text(child)
511 .and_then(|text| match parse_guardian_verdict(&text) {
512 Ok(verdict) => Some(verdict),
513 Err(error) => {
514 tracing::warn!(
515 child_session_id = %completion.child_session_id,
516 %error,
517 "guardian verdict unparseable; recording a synthetic reject"
518 );
519 None
520 }
521 })
522 .unwrap_or_else(|| {
523 GuardianVerdict::rejected(vec![
524 "The guardian reviewer did not return a usable verdict (it errored or \
525 emitted unparseable output); the work has NOT been independently \
526 verified."
527 .to_string(),
528 ])
529 });
530 let approved = verdict.approve;
531 let message = guardian_resume_message(&completion, &verdict);
532 guardian_state.record_verdict(verdict, reviewed_round);
533 write_guardian_state(&mut parent, guardian_state);
534 tracing::info!(
535 parent_session_id = %completion.parent_session_id,
536 child_session_id = %completion.child_session_id,
537 approved,
538 "guardian verdict recorded; resuming parent"
539 );
540 Some(message)
541 });
542
543 let resume_message = guardian_resume.unwrap_or_else(|| {
544 runtime_resume_message(
545 &completion,
546 remaining_children,
547 loaded_child
548 .as_ref()
549 .and_then(child_final_assistant_text)
550 .as_deref(),
551 )
552 });
553 parent.add_message(resume_message);
554 } else if runtime_state.waiting_for_children.is_some() {
555 runtime_state.status = AgentStatusState::Suspended;
556 runtime_state.suspension = Some(SuspensionState {
557 reason: "waiting_for_children".to_string(),
558 suspended_at: Utc::now(),
559 resumable: true,
560 hook_point: Some("ChildCompletion".to_string()),
561 });
562 }
563
564 parent.updated_at = Utc::now();
565 write_runtime_state(&mut parent, &runtime_state);
566 self.save_and_cache(&mut parent).await;
567
568 let resume_parent_id = parent.id.clone();
573 drop(_per_parent_guard);
574
575 if should_resume {
576 self.resume_parent(resume_parent_id).await;
577 }
578 }
579}
580
581#[async_trait]
582impl ResumeExecutionPort for ChildCompletionCoordinator {
583 async fn load_session(&self, session_id: &str) -> Option<Session> {
584 match self.storage.load_session(session_id).await {
585 Ok(Some(session)) => Some(session),
586 Ok(None) => self
587 .sessions
588 .get(session_id)
589 .map(|e| e.value().clone())
590 .map(|arc| arc.read().clone()),
591 Err(error) => {
592 tracing::warn!(%session_id, %error, "failed to load session from storage");
593 self.sessions
594 .get(session_id)
595 .map(|e| e.value().clone())
596 .map(|arc| arc.read().clone())
597 }
598 }
599 }
600
601 async fn save_and_cache_session(&self, session: &mut Session) {
602 self.save_and_cache(session).await;
603 }
604
605 async fn try_reserve_runner(
606 &self,
607 session_id: &str,
608 event_sender: &broadcast::Sender<AgentEvent>,
609 ) -> Option<RunnerReservation> {
610 try_reserve_runner(&self.agent_runners, session_id, event_sender).await
611 }
612
613 async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
614 let runners = self.agent_runners.read().await;
615 runners.get(session_id).map(|r| r.run_id.clone())
616 }
617
618 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
619 crate::execution::session_events::get_or_create_event_sender(
620 &self.session_event_senders,
621 session_id,
622 )
623 .await
624 }
625
626 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
627 let ResumeSpawnRequest {
628 session_id,
629 session,
630 cancel_token,
631 run_id: _,
632 event_sender,
633 config,
634 } = request;
635
636 let Some(root_tools) = self.root_tools.read().await.clone() else {
637 tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
638 return;
639 };
640
641 let model = session.model.clone();
642 let resolved_provider_name = session_effective_model_ref(&session)
643 .map(|model_ref| model_ref.provider)
644 .unwrap_or(config.provider_name);
645 let provider_override = session_effective_model_ref(&session)
646 .and_then(|model_ref| match self.provider_router.route(&model_ref) {
647 Ok(provider) => Some(provider),
648 Err(error) => {
649 tracing::warn!(
650 session_id = %session_id,
651 provider = %model_ref.provider,
652 model = %model_ref.model,
653 error = %error,
654 "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
655 );
656 None
657 }
658 });
659 let config_snapshot = self.config.read().await.clone();
660 let resolved_fast_provider = resolve_fast_model(
661 &config_snapshot,
662 &resolved_provider_name,
663 &self.provider_registry,
664 )
665 .map(|model| model.provider);
666 let reasoning_effort = session.reasoning_effort;
667 let reasoning_effort_source = session
668 .metadata
669 .get("reasoning_effort_source")
670 .cloned()
671 .unwrap_or_default();
672 let gold_config = resolve_gold_config(
673 &config_snapshot,
674 session
675 .metadata
676 .get(GOLD_CONFIG_METADATA_KEY)
677 .map(String::as_str),
678 )
679 .or(config.gold_config.clone());
680
681 let (mpsc_tx, _forwarder) = create_event_forwarder(
682 session_id.clone(),
683 event_sender,
684 self.agent_runners.clone(),
685 self.account_feed_inbox.clone(),
686 );
687
688 let config_handle = self.config.clone();
689 let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
690 let provider_registry = self.provider_registry.clone();
691 let provider_name_for_aux = resolved_provider_name.clone();
692 let auxiliary_model_resolver = std::sync::Arc::new(move || {
693 let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
694 let areas = resolve_global_area_models(
696 &config_snapshot,
697 &provider_name_for_aux,
698 &provider_registry,
699 );
700 crate::AuxiliaryModelConfig {
701 fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
702 fast_model_provider: areas.fast.map(|m| m.provider),
703 background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
704 planning_model_name: None,
705 search_model_name: None,
706 summarization_model_name: areas
707 .summarization
708 .as_ref()
709 .map(|m| m.model_name.clone()),
710 background_model_provider: areas.background.map(|m| m.provider),
711 summarization_model_provider: areas.summarization.map(|m| m.provider),
712 }
713 });
714 let model_roster = crate::ModelRoster {
715 model: Some(model),
716 provider_name: Some(resolved_provider_name),
717 provider_type: config.provider_type.clone(),
718 fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
719 background: crate::RoleModel::from_parts(
720 config.background_model,
721 config.background_model_provider,
722 ),
723 summarization: crate::RoleModel::from_parts(
724 config.summarization_model,
725 config.summarization_model_provider,
726 ),
727 };
728
729 let guardian_config = read_guardian_config(&session);
734 let guardian_spawner = self.guardian_spawner.read().await.clone();
735
736 spawn_session_execution(SessionExecutionArgs {
737 agent: self.agent.clone(),
738 session_id,
739 session,
740 tools_override: Some(root_tools),
741 provider_override,
742 model_roster,
743 reasoning_effort,
744 reasoning_effort_source,
745 auxiliary_model_resolver: Some(auxiliary_model_resolver),
746 disabled_filter_resolver: None,
749 disabled_tools: Some(config.disabled_tools),
750 disabled_skill_ids: Some(config.disabled_skill_ids),
751 selected_skill_ids: None,
752 selected_skill_mode: None,
753 cancel_token,
754 mpsc_tx,
755 image_fallback: config.image_fallback,
756 gold_config,
757 guardian_config,
758 guardian_spawner,
759 bash_resume_hook: {
760 let hook: Arc<dyn BashResumeHook> = Arc::new(self.clone());
761 Some(hook)
762 },
763 app_data_dir: Some(self.app_data_dir.clone()),
764 runners: self.agent_runners.clone(),
765 sessions_cache: self.sessions.clone(),
766 on_complete: None,
767 });
768 }
769}
770
771fn bash_completion_resume_message(bash_ids: &[String], timed_out: bool) -> Message {
781 let body = if timed_out {
782 format!(
783 "Runtime notification: the background-Bash wait ceiling was reached while one or more \
784 shell(s) ({}) may still be running. The session is being resumed so it is not \
785 stranded; verify their actual status with BashOutput before assuming completion.",
786 bash_ids.join(", ")
787 )
788 } else {
789 format!(
790 "Runtime notification: all background Bash shell(s) ({}) have completed. \
791 Review their output with BashOutput and resume the task from where you left off.",
792 bash_ids.join(", ")
793 )
794 };
795 let mut message = Message::user(body);
796 message.metadata = Some(serde_json::json!({
797 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
798 RUNTIME_RESUME_MESSAGE_KIND_KEY: BASH_COMPLETION_RESUME_KIND,
799 }));
800 message.never_compress = false;
801 message
802}
803
804fn bash_resume_should_retry(outcome: &ResumeOutcome, persisted_waiting_for_bash: bool) -> bool {
817 match outcome {
818 ResumeOutcome::Started { .. } | ResumeOutcome::NotFound => false,
819 ResumeOutcome::Completed | ResumeOutcome::AlreadyRunning { .. } => {
820 persisted_waiting_for_bash
821 }
822 }
823}
824
impl ChildCompletionCoordinator {
    /// Waits for the session's background Bash shells to finish, then clears
    /// the persisted bash wait and resumes the session.
    ///
    /// Phase 1 polls every 200 ms until the session has no running shells, or
    /// until a ceiling of 6 h 10 min is reached (`timed_out` is then set and
    /// the resume is forced anyway).
    ///
    /// Phase 2 makes up to `MAX_RESUME_ATTEMPTS` attempts to: reload the
    /// session, clear `waiting_for_bash`, append the hidden resume message,
    /// persist, and call `resume_parent`. It stops as soon as the resume
    /// started, the session is gone, or the persisted wait is confirmed
    /// cleared; it retries only when the wait flag is found set again after
    /// the resume call.
    ///
    /// `bash_ids` is only used for the resume message text and log fields.
    async fn bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
        let poll_interval = Duration::from_millis(200);
        // 6 hours plus 10 minutes, in seconds.
        let max_poll = Duration::from_secs(6 * 3600 + 600);
        let deadline = tokio::time::Instant::now() + max_poll;

        let mut timed_out = false;
        loop {
            let still_running =
                bamboo_tools::tools::bash_runtime::running_shells_for_session(&session_id);
            if still_running.is_empty() {
                break;
            }
            if tokio::time::Instant::now() >= deadline {
                timed_out = true;
                tracing::warn!(
                    session_id = %session_id,
                    "bash self-resume poll exceeded the wait ceiling; forcing resume"
                );
                break;
            }
            tokio::time::sleep(poll_interval).await;
        }

        const MAX_RESUME_ATTEMPTS: u8 = 5;
        for attempt in 0..MAX_RESUME_ATTEMPTS {
            // Space retries out by one poll interval.
            if attempt > 0 {
                tokio::time::sleep(poll_interval).await;
            }

            let Some(mut session) = self.load_session(&session_id).await else {
                tracing::warn!(%session_id, "bash self-resume: session not found; nothing to resume");
                return;
            };

            let mut runtime_state = read_runtime_state(&session);
            // No persisted bash wait means there is nothing left for this task
            // to clear or resume.
            if runtime_state.waiting_for_bash.is_none() {
                tracing::info!(
                    %session_id, attempt,
                    "bash self-resume: persisted bash wait already cleared; nothing to resume"
                );
                return;
            }

            // Clear the wait and persist BEFORE asking for the resume, so the
            // resumed run loads a session that is no longer marked as waiting.
            runtime_state.waiting_for_bash = None;
            runtime_state.status = AgentStatusState::Idle;
            runtime_state.suspension = None;
            write_runtime_state(&mut session, &runtime_state);
            session.metadata.remove("runtime.suspend_reason");
            session.add_message(bash_completion_resume_message(&bash_ids, timed_out));
            session.updated_at = Utc::now();
            self.save_and_cache(&mut session).await;
            tracing::info!(
                session_id = %session_id,
                shell_count = bash_ids.len(),
                timed_out, attempt,
                "bash self-resume: cleared bash wait and appended resume message"
            );

            let outcome = self.resume_parent(session_id.clone()).await;
            match outcome {
                ResumeOutcome::Started { .. } => {
                    tracing::info!(%session_id, attempt, "bash self-resume: resume fired");
                    return;
                }
                ResumeOutcome::NotFound => {
                    tracing::warn!(%session_id, "bash self-resume: session vanished during resume");
                    return;
                }
                _ => {
                    // The resume did not start a new run. Re-read the persisted
                    // state: if the bash wait is set again, the clear above was
                    // overwritten by another writer and must be redone.
                    let clobbered = match self.load_session(&session_id).await {
                        Some(reloaded) => read_runtime_state(&reloaded).waiting_for_bash.is_some(),
                        None => {
                            tracing::warn!(
                                %session_id,
                                "bash self-resume: session vanished after resume"
                            );
                            return;
                        }
                    };
                    if bash_resume_should_retry(&outcome, clobbered) {
                        tracing::warn!(
                            %session_id, attempt,
                            outcome = outcome.as_str(),
                            "bash self-resume: persisted wait still set after resume (finalize-clobber); retrying"
                        );
                        continue;
                    }
                    tracing::info!(
                        %session_id, attempt,
                        outcome = outcome.as_str(),
                        "bash self-resume: wait cleared and resume handled; stopping"
                    );
                    return;
                }
            }
        }

        // Every attempt ended with the wait flag set again.
        tracing::warn!(
            %session_id,
            attempts = MAX_RESUME_ATTEMPTS,
            "bash self-resume: exhausted clobber-retry budget without confirming resume; giving up"
        );
    }
}
963
964impl BashResumeHook for ChildCompletionCoordinator {
965 fn arrange_bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
966 let coordinator = Arc::new(self.clone());
967 tokio::spawn(async move {
968 coordinator.bash_self_resume(session_id, bash_ids).await;
969 });
970 }
971}
972
973#[cfg(test)]
974mod tests {
975 use super::*;
976 use bamboo_agent_core::Message;
977
978 fn make_completion(status: &str) -> ChildCompletion {
979 ChildCompletion {
980 parent_session_id: "parent-1".to_string(),
981 child_session_id: "child-1".to_string(),
982 status: status.to_string(),
983 error: None,
984 completed_at: Utc::now(),
985 }
986 }
987
988 struct StubChildIndex {
991 children: Vec<(String, Option<String>)>,
992 }
993
994 #[async_trait]
995 impl Storage for StubChildIndex {
996 async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
997 Ok(())
998 }
999 async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
1000 Ok(None)
1001 }
1002 async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
1003 Ok(false)
1004 }
1005 async fn list_child_run_statuses(
1006 &self,
1007 _parent_session_id: &str,
1008 ) -> std::io::Result<Vec<(String, Option<String>)>> {
1009 Ok(self.children.clone())
1010 }
1011 }
1012
1013 #[tokio::test]
1014 async fn derive_completed_only_includes_terminal_children() {
1015 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
1016 children: vec![
1017 ("a".into(), Some("completed".into())),
1018 ("b".into(), Some("running".into())),
1019 ("c".into(), Some("error".into())),
1020 ("d".into(), None),
1021 ],
1022 });
1023 let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
1024 assert_eq!(
1026 completed,
1027 vec!["a".to_string(), "b".to_string(), "c".to_string()]
1028 );
1029 }
1030
1031 #[tokio::test]
1032 async fn derive_completed_folds_in_just_completed_when_index_lags() {
1033 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
1035 children: vec![("only".into(), Some("running".into()))],
1036 });
1037 let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
1038 assert_eq!(completed, vec!["only".to_string()]);
1039 }
1040
1041 #[test]
1042 fn wait_policy_all_uses_derived_completed_set() {
1043 let waited = vec!["a".to_string(), "b".to_string()];
1044 assert!(!wait_policy_satisfied(
1045 ChildWaitPolicy::All,
1046 &waited,
1047 &["a".to_string()],
1048 "completed"
1049 ));
1050 assert!(wait_policy_satisfied(
1051 ChildWaitPolicy::All,
1052 &waited,
1053 &["a".to_string(), "b".to_string()],
1054 "completed"
1055 ));
1056 }
1057
1058 #[test]
1059 fn child_final_assistant_text_returns_last_assistant() {
1060 let mut session = Session::new("child-1", "gpt-4");
1061 session.messages.push(Message::user("hi"));
1062 session
1063 .messages
1064 .push(Message::assistant("first answer", None));
1065 session.messages.push(Message::user("again"));
1066 session
1067 .messages
1068 .push(Message::assistant("final answer", None));
1069
1070 assert_eq!(
1071 child_final_assistant_text(&session).as_deref(),
1072 Some("final answer")
1073 );
1074 }
1075
1076 #[test]
1077 fn child_final_assistant_text_returns_none_when_blank() {
1078 let mut session = Session::new("child-1", "gpt-4");
1079 session.messages.push(Message::assistant(" ", None));
1080 assert!(child_final_assistant_text(&session).is_none());
1081 }
1082
1083 #[test]
1084 fn child_final_assistant_text_returns_none_when_no_assistant() {
1085 let mut session = Session::new("child-1", "gpt-4");
1086 session.messages.push(Message::user("hi"));
1087 assert!(child_final_assistant_text(&session).is_none());
1088 }
1089
1090 #[test]
1091 fn runtime_resume_message_folds_full_response_without_truncation() {
1092 let completion = make_completion("completed");
1095 let long: String = "a".repeat(10_000);
1096 let message = runtime_resume_message(&completion, 0, Some(&long));
1097 assert!(message.content.contains(&long));
1098 assert!(!message.content.contains("truncated"));
1099 }
1100
1101 #[test]
1102 fn runtime_resume_message_includes_child_response_when_provided() {
1103 let completion = make_completion("completed");
1104 let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));
1105
1106 assert!(matches!(message.role, Role::User));
1107 assert!(!message.never_compress);
1110 assert!(message.content.contains("Child final response:"));
1111 assert!(message.content.contains("the answer is 42"));
1112
1113 let metadata = message.metadata.expect("metadata present");
1114 assert_eq!(
1115 metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
1116 Some(true)
1117 );
1118 assert_eq!(
1119 metadata.get("runtime_kind").and_then(|v| v.as_str()),
1120 Some("child_completion_resume")
1121 );
1122 assert_eq!(
1123 metadata
1124 .get("child_final_response_included")
1125 .and_then(|v| v.as_bool()),
1126 Some(true)
1127 );
1128 }
1129
1130 #[test]
1131 fn runtime_resume_message_falls_back_to_error_when_no_response() {
1132 let mut completion = make_completion("error");
1133 completion.error = Some("boom".to_string());
1134
1135 let message = runtime_resume_message(&completion, 1, None);
1136 assert!(message.content.contains("Child error:"));
1137 assert!(message.content.contains("boom"));
1138 let metadata = message.metadata.expect("metadata present");
1139 assert_eq!(
1140 metadata
1141 .get("child_final_response_included")
1142 .and_then(|v| v.as_bool()),
1143 Some(false)
1144 );
1145 }
1146
/// With neither a child response nor an error, the resume message must omit
/// both the "Child final response:" and "Child error:" sections while still
/// containing the "Resume the parent task" instruction.
#[test]
fn runtime_resume_message_minimal_when_no_response_and_no_error() {
    let child = make_completion("completed");
    let resume = runtime_resume_message(&child, 2, None);
    let body = &resume.content;

    assert!(!body.contains("Child final response:"));
    assert!(!body.contains("Child error:"));
    assert!(body.contains("Resume the parent task"));
}
1155
/// When the live config lock is free, `read_config_snapshot` must return the
/// live provider and leave the cached snapshot holding that same provider.
#[test]
fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
    let rt = tokio::runtime::Runtime::new().expect("runtime");

    rt.block_on(async {
        let live = Arc::new(RwLock::new(Config::default()));
        {
            // Mutate the live config, then release the write guard before
            // the snapshot is taken.
            let mut writer = live.write().await;
            writer.provider = "copilot".to_string();
        }
        let cache = StdRwLock::new(Config::default());

        let snapshot = read_config_snapshot(&live, &cache);
        assert_eq!(snapshot.provider, "copilot");

        let cached = cache.read().expect("cached snapshot lock");
        assert_eq!(cached.provider, "copilot");
    });
}
1174
/// While a write guard on the live config is held, `read_config_snapshot`
/// must report the provider stored in the cached snapshot.
#[test]
fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
    let rt = tokio::runtime::Runtime::new().expect("runtime");

    rt.block_on(async {
        let live = Arc::new(RwLock::new(Config::default()));
        let cache = StdRwLock::new(Config {
            provider: "cached-provider".to_string(),
            ..Default::default()
        });

        // Keep the live config write-locked for the duration of the call.
        let _held_writer = live.write().await;

        let snapshot = read_config_snapshot(&live, &cache);
        assert_eq!(snapshot.provider, "cached-provider");
    });
}
1194
/// With the second argument `false`, the bash resume message must state that
/// the shells "have completed", be marked hidden from the UI, and carry the
/// bash-completion kind in its metadata.
#[test]
fn bash_completion_resume_message_normal_announces_completion() {
    let shell_ids: Vec<String> = ["bg-1", "bg-2"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    let resume = bash_completion_resume_message(&shell_ids, false);

    assert!(
        resume.content.contains("have completed"),
        "normal resume message must announce completion: {}",
        resume.content
    );

    let meta = resume.metadata.expect("metadata present");
    let hidden = meta
        .get(RUNTIME_RESUME_MESSAGE_HIDDEN_KEY)
        .and_then(|value| value.as_bool());
    let kind = meta
        .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
        .and_then(|value| value.as_str());

    assert_eq!(
        hidden,
        Some(true),
        "resume message must be hidden from the UI"
    );
    assert_eq!(
        kind,
        Some(BASH_COMPLETION_RESUME_KIND),
        "resume message must carry the bash-completion kind discriminant"
    );
}
1224
/// With the second argument `true`, the bash resume message must not claim
/// the shells "have completed"; it must warn they "may still be running",
/// mention `BashOutput`, and still carry the bash-completion kind.
#[test]
fn bash_completion_resume_message_deadline_does_not_claim_completion() {
    let shell_ids = vec![String::from("bg-long")];
    let resume = bash_completion_resume_message(&shell_ids, true);

    let claims_completion = resume.content.contains("have completed");
    assert!(
        !claims_completion,
        "deadline resume message must NOT claim the shells completed: {}",
        resume.content
    );

    let warns_still_running = resume.content.contains("may still be running");
    assert!(
        warns_still_running,
        "deadline resume message must warn shells may still be running: {}",
        resume.content
    );

    let points_at_bash_output = resume.content.contains("BashOutput");
    assert!(
        points_at_bash_output,
        "deadline resume message must direct verification via BashOutput: {}",
        resume.content
    );

    let kind = resume
        .metadata
        .as_ref()
        .expect("metadata present")
        .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
        .and_then(|value| value.as_str());
    assert_eq!(kind, Some(BASH_COMPLETION_RESUME_KIND));
}
1256
/// Pins the full outcome × flag matrix of `bash_resume_should_retry`:
/// `Started` and `NotFound` never retry, while `Completed` and
/// `AlreadyRunning` retry only when the boolean argument is `true`.
#[test]
fn bash_resume_should_retry_matrix() {
    let started = ResumeOutcome::Started { run_id: "r".into() };
    let not_found = ResumeOutcome::NotFound;
    let completed = ResumeOutcome::Completed;
    let already_running = ResumeOutcome::AlreadyRunning { run_id: "r".into() };

    // Outcomes that must never trigger a retry, whatever the flag.
    assert!(!bash_resume_should_retry(&started, true));
    assert!(!bash_resume_should_retry(&started, false));
    assert!(!bash_resume_should_retry(&not_found, true));
    assert!(!bash_resume_should_retry(&not_found, false));

    // Outcomes that retry only when the flag is set.
    assert!(bash_resume_should_retry(&completed, true));
    assert!(!bash_resume_should_retry(&completed, false));
    assert!(bash_resume_should_retry(&already_running, true));
    assert!(!bash_resume_should_retry(&already_running, false));
}
1295}