1use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12 create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13 ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::runtime::config::{BashResumeHook, GuardianSpawner, BASH_COMPLETION_RESUME_KIND};
16use crate::runtime::guardian_state::{
17 parse_guardian_verdict, read_guardian_config, read_guardian_state, write_guardian_state,
18 GuardianVerdict,
19};
20use crate::Agent;
21use async_trait::async_trait;
22use bamboo_agent_core::storage::Storage;
23use bamboo_agent_core::tools::ToolExecutor;
24use bamboo_agent_core::{AgentEvent, Message, Role, Session};
25use bamboo_domain::session::runtime_state::{
26 AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
27};
28use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
29use bamboo_storage::LockedSessionStore;
30use chrono::Utc;
31use tokio::sync::{broadcast, RwLock};
32
33use crate::model_areas::resolve_global_area_models;
34use crate::model_config_helper::{
35 resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
36};
37use crate::session_app::provider_model::session_effective_model_ref;
38use crate::session_app::resume::{
39 resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
40};
41use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
42
/// Session-metadata key under which a JSON copy of the `AgentRuntimeState` is mirrored
/// (written by `write_runtime_state`, read back as a fallback by `read_runtime_state`).
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message-metadata key set to `true` on runtime-injected resume messages.
/// NOTE(review): presumably consumed by the UI to hide these messages — confirm.
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message-metadata key naming which kind of runtime resume message was injected
/// (e.g. `"child_completion_resume"`, `"guardian_review_resume"`).
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
46
47fn read_runtime_state(session: &Session) -> AgentRuntimeState {
48 session
49 .agent_runtime_state
50 .clone()
51 .or_else(|| {
52 session
53 .metadata
54 .get(AGENT_RUNTIME_STATE_METADATA_KEY)
55 .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
56 })
57 .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
58}
59
60fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
61 session.agent_runtime_state = Some(runtime_state.clone());
62 if let Ok(serialized) = serde_json::to_string(runtime_state) {
63 session
64 .metadata
65 .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
66 }
67}
68
/// Whether a child run status counts as a failure for the `FirstError` wait policy.
fn is_error_like(status: &str) -> bool {
    const ERROR_STATUSES: [&str; 3] = ["error", "timeout", "cancelled"];
    ERROR_STATUSES.contains(&status)
}
72
/// Whether a persisted child run status means the child has stopped running, whether it
/// finished normally (`completed`, `skipped`) or not (`error`, `timeout`, `cancelled`).
fn is_terminal_child_status(status: &str) -> bool {
    let finished_normally = matches!(status, "completed" | "skipped");
    let finished_abnormally = matches!(status, "error" | "timeout" | "cancelled");
    finished_normally || finished_abnormally
}
80
81async fn derive_completed_child_ids(
86 storage: &Arc<dyn Storage>,
87 parent_session_id: &str,
88 just_completed_child_id: &str,
89) -> Vec<String> {
90 let mut completed: Vec<String> = storage
91 .list_child_run_statuses(parent_session_id)
92 .await
93 .unwrap_or_default()
94 .into_iter()
95 .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
96 .map(|(id, _)| id)
97 .collect();
98 if !completed.iter().any(|id| id == just_completed_child_id) {
99 completed.push(just_completed_child_id.to_string());
100 }
101 completed.sort();
102 completed.dedup();
103 completed
104}
105
106fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
107 if let Ok(config_guard) = config.try_read() {
108 let snapshot = config_guard.clone();
109
110 if let Ok(mut cached_guard) = cached_config.try_write() {
111 *cached_guard = snapshot.clone();
112 }
113
114 snapshot
115 } else {
116 cached_config
117 .try_read()
118 .map(|guard| guard.clone())
119 .unwrap_or_default()
120 }
121}
122
123fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
140 static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
141 OnceLock::new();
142 LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
143}
144
145fn wait_policy_satisfied(
146 policy: ChildWaitPolicy,
147 wait_child_ids: &[String],
148 completed_child_ids: &[String],
149 latest_status: &str,
150) -> bool {
151 if wait_child_ids.is_empty() {
152 return false;
153 }
154
155 match policy {
156 ChildWaitPolicy::All => wait_child_ids
157 .iter()
158 .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
159 ChildWaitPolicy::Any => completed_child_ids
160 .iter()
161 .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
162 ChildWaitPolicy::FirstError => {
163 is_error_like(latest_status)
164 || wait_child_ids
165 .iter()
166 .all(|id| completed_child_ids.iter().any(|completed| completed == id))
167 }
168 }
169}
170
171fn child_final_assistant_text(child: &Session) -> Option<String> {
175 child
176 .messages
177 .iter()
178 .rev()
179 .find(|message| matches!(message.role, Role::Assistant))
180 .map(|message| message.content.clone())
181 .filter(|content| !content.trim().is_empty())
182}
183
184fn runtime_resume_message(
185 completion: &ChildCompletion,
186 remaining_children: usize,
187 child_final_response: Option<&str>,
188) -> Message {
189 let mut body = format!(
190 "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
191 completion.child_session_id, completion.status, remaining_children
192 );
193
194 let final_response = child_final_response.map(str::to_string);
200 if let Some(response) = final_response.as_deref() {
201 body.push_str("\n\nChild final response:\n");
202 body.push_str(response);
203 } else if let Some(error) = completion.error.as_deref() {
204 if !error.is_empty() {
205 body.push_str("\n\nChild error:\n");
206 body.push_str(error);
207 }
208 }
209
210 body.push_str(
211 "\n\nResume the parent task using this child result and continue from the previous plan. \
212 If you need the full child transcript, call SubAgent.get(child_session_id).",
213 );
214
215 let mut message = Message::user(body);
216 message.metadata = Some(serde_json::json!({
217 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
218 RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
219 "child_session_id": completion.child_session_id,
220 "child_status": completion.status,
221 "child_error": completion.error,
222 "completed_at": completion.completed_at,
223 "child_final_response_included": final_response.is_some(),
224 }));
225 message.never_compress = false;
229 message
230}
231
232fn guardian_resume_message(completion: &ChildCompletion, verdict: &GuardianVerdict) -> Message {
237 let mut body = if verdict.approve {
238 String::from(
239 "Guardian review APPROVED: an independent reviewer verified the work and found no blocking issues. You may finalize the task.",
240 )
241 } else {
242 String::from(
243 "Guardian review REJECTED: an independent reviewer found issues. Address every finding below before completing — do NOT declare the task complete until they are resolved.",
244 )
245 };
246 if let Some(summary) = verdict.summary.as_deref().filter(|s| !s.trim().is_empty()) {
247 body.push_str("\n\nReviewer summary: ");
248 body.push_str(summary);
249 }
250 if !verdict.findings.is_empty() {
251 body.push_str("\n\nFindings:");
252 for (idx, finding) in verdict.findings.iter().enumerate() {
253 body.push_str(&format!("\n{}. {}", idx + 1, finding));
254 }
255 }
256 body.push_str(
257 "\n\nIf you need the full guardian transcript, call SubAgent.get(child_session_id).",
258 );
259
260 let mut message = Message::user(body);
261 message.metadata = Some(serde_json::json!({
262 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
263 RUNTIME_RESUME_MESSAGE_KIND_KEY: "guardian_review_resume",
264 "child_session_id": completion.child_session_id,
265 "child_status": completion.status,
266 "guardian_approved": verdict.approve,
267 "completed_at": completion.completed_at,
268 }));
269 message.never_compress = false;
270 message
271}
272
273#[derive(Clone)]
274pub struct ChildCompletionCoordinator {
275 storage: Arc<dyn Storage>,
276 persistence: Arc<bamboo_storage::LockedSessionStore>,
277 sessions: crate::SessionCache,
278 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
279 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
280 agent: Arc<Agent>,
281 config: Arc<RwLock<Config>>,
282 provider_registry: Arc<ProviderRegistry>,
283 provider_router: Arc<ProviderModelRouter>,
284 app_data_dir: std::path::PathBuf,
285 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
286 root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
287 guardian_spawner: Arc<RwLock<Option<Arc<dyn GuardianSpawner>>>>,
291}
292
293impl ChildCompletionCoordinator {
294 #[allow(clippy::too_many_arguments)]
295 pub fn new(
296 storage: Arc<dyn Storage>,
297 persistence: Arc<LockedSessionStore>,
298 sessions: crate::SessionCache,
299 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
300 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
301 agent: Arc<Agent>,
302 config: Arc<RwLock<Config>>,
303 provider_registry: Arc<ProviderRegistry>,
304 provider_router: Arc<ProviderModelRouter>,
305 app_data_dir: std::path::PathBuf,
306 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
307 ) -> Self {
308 Self {
309 storage,
310 persistence,
311 sessions,
312 agent_runners,
313 session_event_senders,
314 agent,
315 config,
316 provider_registry,
317 provider_router,
318 app_data_dir,
319 account_feed_inbox,
320 root_tools: Arc::new(RwLock::new(None)),
321 guardian_spawner: Arc::new(RwLock::new(None)),
322 }
323 }
324
325 pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
326 *self.root_tools.write().await = Some(tools);
327 }
328
329 pub async fn set_guardian_spawner(&self, spawner: Arc<dyn GuardianSpawner>) {
332 *self.guardian_spawner.write().await = Some(spawner);
333 }
334
335 fn build_resume_config(
336 &self,
337 session: &Session,
338 config_snapshot: &Config,
339 ) -> ResumeConfigSnapshot {
340 crate::session_app::resolution::resolve_resume_config_snapshot(
341 config_snapshot,
342 &self.provider_registry,
343 session,
344 None,
345 )
346 }
347
348 async fn resume_parent(&self, parent_session_id: String) -> ResumeOutcome {
356 for attempt in 0..=5u8 {
357 if attempt > 0 {
358 tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
359 }
360
361 let Some(session) = self.load_session(&parent_session_id).await else {
362 tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
363 return ResumeOutcome::NotFound;
364 };
365 let config_snapshot = self.config.read().await.clone();
366 let resume_config = self.build_resume_config(&session, &config_snapshot);
367 let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
368 tracing::info!(
369 %parent_session_id,
370 attempt,
371 outcome = outcome.as_str(),
372 "child completion requested parent resume"
373 );
374
375 if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
376 return outcome;
377 }
378 }
379 ResumeOutcome::AlreadyRunning {
381 run_id: String::new(),
382 }
383 }
384
385 async fn save_and_cache(&self, session: &mut Session) {
386 if let Err(error) = self.persistence.merge_save_runtime(session).await {
387 tracing::warn!(session_id = %session.id, %error, "failed to persist session");
388 }
389 self.sessions.insert(
390 session.id.clone(),
391 Arc::new(parking_lot::RwLock::new(session.clone())),
392 );
393 }
394}
395
396#[async_trait]
397impl ChildCompletionHandler for ChildCompletionCoordinator {
398 async fn on_child_completed(&self, completion: ChildCompletion) {
399 let per_parent = {
404 let mut map = parent_locks().lock().expect("parent lock map poisoned");
405 map.entry(completion.parent_session_id.clone())
406 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
407 .clone()
408 };
409 let _per_parent_guard = per_parent.lock().await;
410
411 let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
412 tracing::warn!(
413 parent_session_id = %completion.parent_session_id,
414 child_session_id = %completion.child_session_id,
415 "child completion received for missing parent"
416 );
417 return;
418 };
419
420 let mut runtime_state = read_runtime_state(&parent);
426
427 let completed_child_ids = derive_completed_child_ids(
430 &self.storage,
431 &completion.parent_session_id,
432 &completion.child_session_id,
433 )
434 .await;
435
436 let mut should_resume = false;
437 let mut remaining_children = 0usize;
438 if let Some(wait) = runtime_state.waiting_for_children.clone() {
439 remaining_children = wait
440 .child_session_ids
441 .iter()
442 .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
443 .count();
444 should_resume = wait_policy_satisfied(
445 wait.wait_for,
446 &wait.child_session_ids,
447 &completed_child_ids,
448 &completion.status,
449 );
450 if should_resume {
451 runtime_state.waiting_for_children = None;
452 runtime_state.status = AgentStatusState::Idle;
453 runtime_state.suspension = None;
454 }
455 }
456
457 if should_resume {
458 parent.metadata.remove("runtime.suspend_reason");
459 let loaded_child = match self
464 .storage
465 .load_session(&completion.child_session_id)
466 .await
467 {
468 Ok(child) => child,
469 Err(error) => {
470 tracing::warn!(
471 child_session_id = %completion.child_session_id,
472 %error,
473 "failed to load child session for runtime resume message"
474 );
475 None
476 }
477 };
478
479 let reviewed_round = runtime_state.round.current_round;
485 let guardian_resume = loaded_child.as_ref().and_then(|child| {
486 if child.subagent_type().as_deref() != Some("guardian") {
487 return None;
488 }
489 let mut guardian_state = read_guardian_state(&parent)?;
490 if guardian_state.guardian_child_id.as_deref()
491 != Some(completion.child_session_id.as_str())
492 {
493 tracing::warn!(
496 parent_session_id = %completion.parent_session_id,
497 child_session_id = %completion.child_session_id,
498 expected = ?guardian_state.guardian_child_id,
499 "guardian completion does not match recorded guardian_child_id; using generic resume"
500 );
501 return None;
502 }
503 let verdict = child_final_assistant_text(child)
511 .and_then(|text| match parse_guardian_verdict(&text) {
512 Ok(verdict) => Some(verdict),
513 Err(error) => {
514 tracing::warn!(
515 child_session_id = %completion.child_session_id,
516 %error,
517 "guardian verdict unparseable; recording a synthetic reject"
518 );
519 None
520 }
521 })
522 .unwrap_or_else(|| {
523 GuardianVerdict::rejected(vec![
524 "The guardian reviewer did not return a usable verdict (it errored or \
525 emitted unparseable output); the work has NOT been independently \
526 verified."
527 .to_string(),
528 ])
529 });
530 let approved = verdict.approve;
531 let message = guardian_resume_message(&completion, &verdict);
532 guardian_state.record_verdict(verdict, reviewed_round);
533 write_guardian_state(&mut parent, guardian_state);
534 tracing::info!(
535 parent_session_id = %completion.parent_session_id,
536 child_session_id = %completion.child_session_id,
537 approved,
538 "guardian verdict recorded; resuming parent"
539 );
540 Some(message)
541 });
542
543 let resume_message = guardian_resume.unwrap_or_else(|| {
544 runtime_resume_message(
545 &completion,
546 remaining_children,
547 loaded_child
548 .as_ref()
549 .and_then(child_final_assistant_text)
550 .as_deref(),
551 )
552 });
553 parent.add_message(resume_message);
554 } else if runtime_state.waiting_for_children.is_some() {
555 runtime_state.status = AgentStatusState::Suspended;
556 runtime_state.suspension = Some(SuspensionState {
557 reason: "waiting_for_children".to_string(),
558 suspended_at: Utc::now(),
559 resumable: true,
560 hook_point: Some("ChildCompletion".to_string()),
561 });
562 }
563
564 parent.updated_at = Utc::now();
565 write_runtime_state(&mut parent, &runtime_state);
566 self.save_and_cache(&mut parent).await;
567
568 let resume_parent_id = parent.id.clone();
573 drop(_per_parent_guard);
574
575 if should_resume {
576 self.resume_parent(resume_parent_id).await;
577 }
578 }
579}
580
581#[async_trait]
582impl ResumeExecutionPort for ChildCompletionCoordinator {
583 async fn load_session(&self, session_id: &str) -> Option<Session> {
584 match self.storage.load_session(session_id).await {
585 Ok(Some(session)) => Some(session),
586 Ok(None) => self
587 .sessions
588 .get(session_id)
589 .map(|e| e.value().clone())
590 .map(|arc| arc.read().clone()),
591 Err(error) => {
592 tracing::warn!(%session_id, %error, "failed to load session from storage");
593 self.sessions
594 .get(session_id)
595 .map(|e| e.value().clone())
596 .map(|arc| arc.read().clone())
597 }
598 }
599 }
600
601 async fn save_and_cache_session(&self, session: &mut Session) {
602 self.save_and_cache(session).await;
603 }
604
605 async fn try_reserve_runner(
606 &self,
607 session_id: &str,
608 event_sender: &broadcast::Sender<AgentEvent>,
609 ) -> Option<RunnerReservation> {
610 try_reserve_runner(&self.agent_runners, session_id, event_sender).await
611 }
612
613 async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
614 let runners = self.agent_runners.read().await;
615 runners.get(session_id).map(|r| r.run_id.clone())
616 }
617
618 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
619 crate::execution::session_events::get_or_create_event_sender(
620 &self.session_event_senders,
621 session_id,
622 )
623 .await
624 }
625
626 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
627 let ResumeSpawnRequest {
628 session_id,
629 session,
630 cancel_token,
631 run_id: _,
632 event_sender,
633 config,
634 } = request;
635
636 let Some(root_tools) = self.root_tools.read().await.clone() else {
637 tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
638 return;
639 };
640
641 let model = session.model.clone();
642 let resolved_provider_name = session_effective_model_ref(&session)
643 .map(|model_ref| model_ref.provider)
644 .unwrap_or(config.provider_name);
645 let provider_override = session_effective_model_ref(&session)
646 .and_then(|model_ref| match self.provider_router.route(&model_ref) {
647 Ok(provider) => Some(provider),
648 Err(error) => {
649 tracing::warn!(
650 session_id = %session_id,
651 provider = %model_ref.provider,
652 model = %model_ref.model,
653 error = %error,
654 "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
655 );
656 None
657 }
658 });
659 let config_snapshot = self.config.read().await.clone();
660 let resolved_fast_provider = resolve_fast_model(
661 &config_snapshot,
662 &resolved_provider_name,
663 &self.provider_registry,
664 )
665 .map(|model| model.provider);
666 let reasoning_effort = session.reasoning_effort;
667 let reasoning_effort_source = session
668 .metadata
669 .get("reasoning_effort_source")
670 .cloned()
671 .unwrap_or_default();
672 let gold_config = resolve_gold_config(
673 &config_snapshot,
674 session
675 .metadata
676 .get(GOLD_CONFIG_METADATA_KEY)
677 .map(String::as_str),
678 )
679 .or(config.gold_config.clone());
680
681 let (mpsc_tx, _forwarder) = create_event_forwarder(
682 session_id.clone(),
683 event_sender,
684 self.agent_runners.clone(),
685 self.account_feed_inbox.clone(),
686 );
687
688 let config_handle = self.config.clone();
689 let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
690 let provider_registry = self.provider_registry.clone();
691 let provider_name_for_aux = resolved_provider_name.clone();
692 let auxiliary_model_resolver = std::sync::Arc::new(move || {
693 let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
694 let areas = resolve_global_area_models(
696 &config_snapshot,
697 &provider_name_for_aux,
698 &provider_registry,
699 );
700 crate::AuxiliaryModelConfig {
701 fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
702 fast_model_provider: areas.fast.map(|m| m.provider),
703 background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
704 planning_model_name: None,
705 search_model_name: None,
706 summarization_model_name: areas
707 .summarization
708 .as_ref()
709 .map(|m| m.model_name.clone()),
710 background_model_provider: areas.background.map(|m| m.provider),
711 summarization_model_provider: areas.summarization.map(|m| m.provider),
712 }
713 });
714 let model_roster = crate::ModelRoster {
715 model: Some(model),
716 provider_name: Some(resolved_provider_name),
717 provider_type: config.provider_type.clone(),
718 fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
719 background: crate::RoleModel::from_parts(
720 config.background_model,
721 config.background_model_provider,
722 ),
723 summarization: crate::RoleModel::from_parts(
724 config.summarization_model,
725 config.summarization_model_provider,
726 ),
727 };
728
729 let guardian_config = read_guardian_config(&session);
734 let guardian_spawner = self.guardian_spawner.read().await.clone();
735
736 spawn_session_execution(SessionExecutionArgs {
737 agent: self.agent.clone(),
738 session_id,
739 session,
740 tools_override: Some(root_tools),
741 provider_override,
742 model_roster,
743 reasoning_effort,
744 reasoning_effort_source,
745 auxiliary_model_resolver: Some(auxiliary_model_resolver),
746 disabled_tools: Some(config.disabled_tools),
747 disabled_skill_ids: Some(config.disabled_skill_ids),
748 selected_skill_ids: None,
749 selected_skill_mode: None,
750 cancel_token,
751 mpsc_tx,
752 image_fallback: config.image_fallback,
753 gold_config,
754 guardian_config,
755 guardian_spawner,
756 bash_resume_hook: {
757 let hook: Arc<dyn BashResumeHook> = Arc::new(self.clone());
758 Some(hook)
759 },
760 app_data_dir: Some(self.app_data_dir.clone()),
761 runners: self.agent_runners.clone(),
762 sessions_cache: self.sessions.clone(),
763 on_complete: None,
764 });
765 }
766}
767
768fn bash_completion_resume_message(bash_ids: &[String], timed_out: bool) -> Message {
778 let body = if timed_out {
779 format!(
780 "Runtime notification: the background-Bash wait ceiling was reached while one or more \
781 shell(s) ({}) may still be running. The session is being resumed so it is not \
782 stranded; verify their actual status with BashOutput before assuming completion.",
783 bash_ids.join(", ")
784 )
785 } else {
786 format!(
787 "Runtime notification: all background Bash shell(s) ({}) have completed. \
788 Review their output with BashOutput and resume the task from where you left off.",
789 bash_ids.join(", ")
790 )
791 };
792 let mut message = Message::user(body);
793 message.metadata = Some(serde_json::json!({
794 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
795 RUNTIME_RESUME_MESSAGE_KIND_KEY: BASH_COMPLETION_RESUME_KIND,
796 }));
797 message.never_compress = false;
798 message
799}
800
801fn bash_resume_should_retry(outcome: &ResumeOutcome, persisted_waiting_for_bash: bool) -> bool {
814 match outcome {
815 ResumeOutcome::Started { .. } | ResumeOutcome::NotFound => false,
816 ResumeOutcome::Completed | ResumeOutcome::AlreadyRunning { .. } => {
817 persisted_waiting_for_bash
818 }
819 }
820}
821
impl ChildCompletionCoordinator {
    /// Waits for a session's background Bash shells to finish, then clears the persisted
    /// bash wait and resumes the session.
    ///
    /// Phase 1 polls `running_shells_for_session` every 200 ms. It stops when no shells
    /// are reported, or when a 6 h 10 min ceiling elapses; in that case the resume is
    /// forced and the injected message says the shells may still be running.
    ///
    /// Phase 2 makes up to `MAX_RESUME_ATTEMPTS` attempts. Each attempt:
    /// 1. reloads the session, and stops if `waiting_for_bash` is already cleared;
    /// 2. otherwise clears the wait (status `Idle`, suspension removed), appends the
    ///    resume message, and persists;
    /// 3. calls `resume_parent`.
    ///
    /// `Started` or `NotFound` ends the process. For any other outcome the session is
    /// reloaded, and the attempt repeats only if the bash wait has reappeared (see
    /// `bash_resume_should_retry`).
    ///
    /// NOTE(review): each retry appends another resume message; presumably the previous
    /// one was lost along with the overwritten wait — confirm.
    async fn bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
        // Phase 1: wait for the session's shells to exit (or for the ceiling).
        let poll_interval = Duration::from_millis(200);
        // Wait ceiling: 6 h + 600 s = 6 h 10 min.
        let max_poll = Duration::from_secs(6 * 3600 + 600);
        let deadline = tokio::time::Instant::now() + max_poll;

        let mut timed_out = false;
        loop {
            let still_running =
                bamboo_tools::tools::bash_runtime::running_shells_for_session(&session_id);
            if still_running.is_empty() {
                break;
            }
            if tokio::time::Instant::now() >= deadline {
                timed_out = true;
                tracing::warn!(
                    session_id = %session_id,
                    "bash self-resume poll exceeded the wait ceiling; forcing resume"
                );
                break;
            }
            tokio::time::sleep(poll_interval).await;
        }

        // Phase 2: clear the persisted wait and resume, retrying if the clear is
        // overwritten before it sticks.
        const MAX_RESUME_ATTEMPTS: u8 = 5;
        for attempt in 0..MAX_RESUME_ATTEMPTS {
            if attempt > 0 {
                tokio::time::sleep(poll_interval).await;
            }

            let Some(mut session) = self.load_session(&session_id).await else {
                tracing::warn!(%session_id, "bash self-resume: session not found; nothing to resume");
                return;
            };

            let mut runtime_state = read_runtime_state(&session);
            // The wait is already gone (cleared by an earlier attempt or by some other
            // path not visible here), so there is nothing left to resume.
            if runtime_state.waiting_for_bash.is_none() {
                tracing::info!(
                    %session_id, attempt,
                    "bash self-resume: persisted bash wait already cleared; nothing to resume"
                );
                return;
            }

            // Persist the cleared wait and the resume message before resuming:
            // `resume_parent` reloads the session from storage/cache.
            runtime_state.waiting_for_bash = None;
            runtime_state.status = AgentStatusState::Idle;
            runtime_state.suspension = None;
            write_runtime_state(&mut session, &runtime_state);
            session.metadata.remove("runtime.suspend_reason");
            session.add_message(bash_completion_resume_message(&bash_ids, timed_out));
            session.updated_at = Utc::now();
            self.save_and_cache(&mut session).await;
            tracing::info!(
                session_id = %session_id,
                shell_count = bash_ids.len(),
                timed_out, attempt,
                "bash self-resume: cleared bash wait and appended resume message"
            );

            let outcome = self.resume_parent(session_id.clone()).await;
            match outcome {
                ResumeOutcome::Started { .. } => {
                    tracing::info!(%session_id, attempt, "bash self-resume: resume fired");
                    return;
                }
                ResumeOutcome::NotFound => {
                    tracing::warn!(%session_id, "bash self-resume: session vanished during resume");
                    return;
                }
                _ => {
                    // Completed / AlreadyRunning: reload to check whether the cleared
                    // wait survived or was written back.
                    let clobbered = match self.load_session(&session_id).await {
                        Some(reloaded) => read_runtime_state(&reloaded).waiting_for_bash.is_some(),
                        None => {
                            tracing::warn!(
                                %session_id,
                                "bash self-resume: session vanished after resume"
                            );
                            return;
                        }
                    };
                    if bash_resume_should_retry(&outcome, clobbered) {
                        tracing::warn!(
                            %session_id, attempt,
                            outcome = outcome.as_str(),
                            "bash self-resume: persisted wait still set after resume (finalize-clobber); retrying"
                        );
                        continue;
                    }
                    tracing::info!(
                        %session_id, attempt,
                        outcome = outcome.as_str(),
                        "bash self-resume: wait cleared and resume handled; stopping"
                    );
                    return;
                }
            }
        }

        tracing::warn!(
            %session_id,
            attempts = MAX_RESUME_ATTEMPTS,
            "bash self-resume: exhausted clobber-retry budget without confirming resume; giving up"
        );
    }
}
960
961impl BashResumeHook for ChildCompletionCoordinator {
962 fn arrange_bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
963 let coordinator = Arc::new(self.clone());
964 tokio::spawn(async move {
965 coordinator.bash_self_resume(session_id, bash_ids).await;
966 });
967 }
968}
969
970#[cfg(test)]
971mod tests {
972 use super::*;
973 use bamboo_agent_core::Message;
974
975 fn make_completion(status: &str) -> ChildCompletion {
976 ChildCompletion {
977 parent_session_id: "parent-1".to_string(),
978 child_session_id: "child-1".to_string(),
979 status: status.to_string(),
980 error: None,
981 completed_at: Utc::now(),
982 }
983 }
984
985 struct StubChildIndex {
988 children: Vec<(String, Option<String>)>,
989 }
990
991 #[async_trait]
992 impl Storage for StubChildIndex {
993 async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
994 Ok(())
995 }
996 async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
997 Ok(None)
998 }
999 async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
1000 Ok(false)
1001 }
1002 async fn list_child_run_statuses(
1003 &self,
1004 _parent_session_id: &str,
1005 ) -> std::io::Result<Vec<(String, Option<String>)>> {
1006 Ok(self.children.clone())
1007 }
1008 }
1009
1010 #[tokio::test]
1011 async fn derive_completed_only_includes_terminal_children() {
1012 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
1013 children: vec![
1014 ("a".into(), Some("completed".into())),
1015 ("b".into(), Some("running".into())),
1016 ("c".into(), Some("error".into())),
1017 ("d".into(), None),
1018 ],
1019 });
1020 let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
1021 assert_eq!(
1023 completed,
1024 vec!["a".to_string(), "b".to_string(), "c".to_string()]
1025 );
1026 }
1027
1028 #[tokio::test]
1029 async fn derive_completed_folds_in_just_completed_when_index_lags() {
1030 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
1032 children: vec![("only".into(), Some("running".into()))],
1033 });
1034 let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
1035 assert_eq!(completed, vec!["only".to_string()]);
1036 }
1037
1038 #[test]
1039 fn wait_policy_all_uses_derived_completed_set() {
1040 let waited = vec!["a".to_string(), "b".to_string()];
1041 assert!(!wait_policy_satisfied(
1042 ChildWaitPolicy::All,
1043 &waited,
1044 &["a".to_string()],
1045 "completed"
1046 ));
1047 assert!(wait_policy_satisfied(
1048 ChildWaitPolicy::All,
1049 &waited,
1050 &["a".to_string(), "b".to_string()],
1051 "completed"
1052 ));
1053 }
1054
1055 #[test]
1056 fn child_final_assistant_text_returns_last_assistant() {
1057 let mut session = Session::new("child-1", "gpt-4");
1058 session.messages.push(Message::user("hi"));
1059 session
1060 .messages
1061 .push(Message::assistant("first answer", None));
1062 session.messages.push(Message::user("again"));
1063 session
1064 .messages
1065 .push(Message::assistant("final answer", None));
1066
1067 assert_eq!(
1068 child_final_assistant_text(&session).as_deref(),
1069 Some("final answer")
1070 );
1071 }
1072
1073 #[test]
1074 fn child_final_assistant_text_returns_none_when_blank() {
1075 let mut session = Session::new("child-1", "gpt-4");
1076 session.messages.push(Message::assistant(" ", None));
1077 assert!(child_final_assistant_text(&session).is_none());
1078 }
1079
1080 #[test]
1081 fn child_final_assistant_text_returns_none_when_no_assistant() {
1082 let mut session = Session::new("child-1", "gpt-4");
1083 session.messages.push(Message::user("hi"));
1084 assert!(child_final_assistant_text(&session).is_none());
1085 }
1086
1087 #[test]
1088 fn runtime_resume_message_folds_full_response_without_truncation() {
1089 let completion = make_completion("completed");
1092 let long: String = "a".repeat(10_000);
1093 let message = runtime_resume_message(&completion, 0, Some(&long));
1094 assert!(message.content.contains(&long));
1095 assert!(!message.content.contains("truncated"));
1096 }
1097
1098 #[test]
1099 fn runtime_resume_message_includes_child_response_when_provided() {
1100 let completion = make_completion("completed");
1101 let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));
1102
1103 assert!(matches!(message.role, Role::User));
1104 assert!(!message.never_compress);
1107 assert!(message.content.contains("Child final response:"));
1108 assert!(message.content.contains("the answer is 42"));
1109
1110 let metadata = message.metadata.expect("metadata present");
1111 assert_eq!(
1112 metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
1113 Some(true)
1114 );
1115 assert_eq!(
1116 metadata.get("runtime_kind").and_then(|v| v.as_str()),
1117 Some("child_completion_resume")
1118 );
1119 assert_eq!(
1120 metadata
1121 .get("child_final_response_included")
1122 .and_then(|v| v.as_bool()),
1123 Some(true)
1124 );
1125 }
1126
1127 #[test]
1128 fn runtime_resume_message_falls_back_to_error_when_no_response() {
1129 let mut completion = make_completion("error");
1130 completion.error = Some("boom".to_string());
1131
1132 let message = runtime_resume_message(&completion, 1, None);
1133 assert!(message.content.contains("Child error:"));
1134 assert!(message.content.contains("boom"));
1135 let metadata = message.metadata.expect("metadata present");
1136 assert_eq!(
1137 metadata
1138 .get("child_final_response_included")
1139 .and_then(|v| v.as_bool()),
1140 Some(false)
1141 );
1142 }
1143
#[test]
fn runtime_resume_message_minimal_when_no_response_and_no_error() {
    let completion = make_completion("completed");
    let resume = runtime_resume_message(&completion, 2, None);
    let body = &resume.content;

    // With neither a response nor an error, only the bare resume instruction remains.
    assert!(body.contains("Resume the parent task"));
    assert!(!body.contains("Child final response:"));
    assert!(!body.contains("Child error:"));
}
1152
#[test]
fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
    let rt = tokio::runtime::Runtime::new().expect("runtime");

    rt.block_on(async {
        // The cache still holds defaults; the live config has since switched provider.
        let stale_cache = StdRwLock::new(Config::default());
        let live = Arc::new(RwLock::new(Config::default()));
        {
            let mut writer = live.write().await;
            writer.provider = "copilot".to_string();
        }

        let snapshot = read_config_snapshot(&live, &stale_cache);
        assert_eq!(snapshot.provider, "copilot");

        // The uncontended read must also write the fresh value back into the cache.
        let cache_view = stale_cache.read().expect("cached snapshot lock");
        assert_eq!(cache_view.provider, "copilot");
    });
}
1171
#[test]
fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
    let rt = tokio::runtime::Runtime::new().expect("runtime");

    rt.block_on(async {
        let live = Arc::new(RwLock::new(Config::default()));
        let cache = StdRwLock::new(Config {
            provider: "cached-provider".to_string(),
            ..Default::default()
        });

        // Keep the live write lock held for the rest of the block, so the snapshot
        // read cannot take it while this guard is alive.
        let _held_writer = live.write().await;

        assert_eq!(
            read_config_snapshot(&live, &cache).provider,
            "cached-provider"
        );
    });
}
1191
#[test]
fn bash_completion_resume_message_normal_announces_completion() {
    let shell_ids: Vec<String> = ["bg-1", "bg-2"].iter().map(|id| id.to_string()).collect();
    let resume = bash_completion_resume_message(&shell_ids, false);

    // On the normal (non-deadline) path the prompt states the shells finished.
    assert!(
        resume.content.contains("have completed"),
        "normal resume message must announce completion: {}",
        resume.content
    );

    let meta = resume.metadata.expect("metadata present");
    let hidden = meta
        .get(RUNTIME_RESUME_MESSAGE_HIDDEN_KEY)
        .and_then(|value| value.as_bool());
    assert_eq!(hidden, Some(true), "resume message must be hidden from the UI");

    let kind = meta
        .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
        .and_then(|value| value.as_str());
    assert_eq!(
        kind,
        Some(BASH_COMPLETION_RESUME_KIND),
        "resume message must carry the bash-completion kind discriminant"
    );
}
1221
#[test]
fn bash_completion_resume_message_deadline_does_not_claim_completion() {
    let shell_ids = vec![String::from("bg-long")];
    let resume = bash_completion_resume_message(&shell_ids, true);
    let body = &resume.content;

    // On the deadline path the prompt must not claim completion. It must say the
    // shells may still be running and point at BashOutput for verification.
    assert!(
        !body.contains("have completed"),
        "deadline resume message must NOT claim the shells completed: {}",
        body
    );
    assert!(
        body.contains("may still be running"),
        "deadline resume message must warn shells may still be running: {}",
        body
    );
    assert!(
        body.contains("BashOutput"),
        "deadline resume message must direct verification via BashOutput: {}",
        body
    );

    let meta = resume.metadata.expect("metadata present");
    let kind = meta
        .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
        .and_then(|value| value.as_str());
    assert_eq!(kind, Some(BASH_COMPLETION_RESUME_KIND));
}
1253
#[test]
fn bash_resume_should_retry_matrix() {
    // Each row is (outcome, boolean argument, expected retry decision).
    // Only `Completed` and `AlreadyRunning` with the flag set request a retry.
    // `Started` and `NotFound` never do, and a `false` flag always suppresses it.
    let cases = [
        (ResumeOutcome::Started { run_id: "r".into() }, true, false),
        (ResumeOutcome::Started { run_id: "r".into() }, false, false),
        (ResumeOutcome::NotFound, true, false),
        (ResumeOutcome::NotFound, false, false),
        (ResumeOutcome::Completed, true, true),
        (ResumeOutcome::Completed, false, false),
        (ResumeOutcome::AlreadyRunning { run_id: "r".into() }, true, true),
        (ResumeOutcome::AlreadyRunning { run_id: "r".into() }, false, false),
    ];

    for (index, (outcome, flag, should_retry)) in cases.iter().enumerate() {
        assert_eq!(
            bash_resume_should_retry(outcome, *flag),
            *should_retry,
            "unexpected retry decision for case #{}",
            index
        );
    }
}
1292}