1use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12 create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13 ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::runtime::config::GuardianSpawner;
16use crate::runtime::guardian_state::{
17 parse_guardian_verdict, read_guardian_config, read_guardian_state, write_guardian_state,
18 GuardianVerdict,
19};
20use crate::Agent;
21use async_trait::async_trait;
22use bamboo_agent_core::storage::Storage;
23use bamboo_agent_core::tools::ToolExecutor;
24use bamboo_agent_core::{AgentEvent, Message, Role, Session};
25use bamboo_domain::session::runtime_state::{
26 AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
27};
28use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
29use bamboo_storage::LockedSessionStore;
30use chrono::Utc;
31use tokio::sync::{broadcast, RwLock};
32
33use crate::model_areas::resolve_global_area_models;
34use crate::model_config_helper::{
35 resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
36};
37use crate::session_app::provider_model::session_effective_model_ref;
38use crate::session_app::resume::{
39 resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
40};
41use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
42
43const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
44const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
45const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
46
47fn read_runtime_state(session: &Session) -> AgentRuntimeState {
48 session
49 .agent_runtime_state
50 .clone()
51 .or_else(|| {
52 session
53 .metadata
54 .get(AGENT_RUNTIME_STATE_METADATA_KEY)
55 .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
56 })
57 .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
58}
59
60fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
61 session.agent_runtime_state = Some(runtime_state.clone());
62 if let Ok(serialized) = serde_json::to_string(runtime_state) {
63 session
64 .metadata
65 .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
66 }
67}
68
69fn is_error_like(status: &str) -> bool {
70 matches!(status, "error" | "timeout" | "cancelled")
71}
72
73fn is_terminal_child_status(status: &str) -> bool {
75 matches!(
76 status,
77 "completed" | "error" | "timeout" | "cancelled" | "skipped"
78 )
79}
80
81async fn derive_completed_child_ids(
86 storage: &Arc<dyn Storage>,
87 parent_session_id: &str,
88 just_completed_child_id: &str,
89) -> Vec<String> {
90 let mut completed: Vec<String> = storage
91 .list_child_run_statuses(parent_session_id)
92 .await
93 .unwrap_or_default()
94 .into_iter()
95 .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
96 .map(|(id, _)| id)
97 .collect();
98 if !completed.iter().any(|id| id == just_completed_child_id) {
99 completed.push(just_completed_child_id.to_string());
100 }
101 completed.sort();
102 completed.dedup();
103 completed
104}
105
106fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
107 if let Ok(config_guard) = config.try_read() {
108 let snapshot = config_guard.clone();
109
110 if let Ok(mut cached_guard) = cached_config.try_write() {
111 *cached_guard = snapshot.clone();
112 }
113
114 snapshot
115 } else {
116 cached_config
117 .try_read()
118 .map(|guard| guard.clone())
119 .unwrap_or_default()
120 }
121}
122
123fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
140 static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
141 OnceLock::new();
142 LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
143}
144
145fn wait_policy_satisfied(
146 policy: ChildWaitPolicy,
147 wait_child_ids: &[String],
148 completed_child_ids: &[String],
149 latest_status: &str,
150) -> bool {
151 if wait_child_ids.is_empty() {
152 return false;
153 }
154
155 match policy {
156 ChildWaitPolicy::All => wait_child_ids
157 .iter()
158 .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
159 ChildWaitPolicy::Any => completed_child_ids
160 .iter()
161 .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
162 ChildWaitPolicy::FirstError => {
163 is_error_like(latest_status)
164 || wait_child_ids
165 .iter()
166 .all(|id| completed_child_ids.iter().any(|completed| completed == id))
167 }
168 }
169}
170
171fn child_final_assistant_text(child: &Session) -> Option<String> {
175 child
176 .messages
177 .iter()
178 .rev()
179 .find(|message| matches!(message.role, Role::Assistant))
180 .map(|message| message.content.clone())
181 .filter(|content| !content.trim().is_empty())
182}
183
184fn runtime_resume_message(
185 completion: &ChildCompletion,
186 remaining_children: usize,
187 child_final_response: Option<&str>,
188) -> Message {
189 let mut body = format!(
190 "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
191 completion.child_session_id, completion.status, remaining_children
192 );
193
194 let final_response = child_final_response.map(str::to_string);
200 if let Some(response) = final_response.as_deref() {
201 body.push_str("\n\nChild final response:\n");
202 body.push_str(response);
203 } else if let Some(error) = completion.error.as_deref() {
204 if !error.is_empty() {
205 body.push_str("\n\nChild error:\n");
206 body.push_str(error);
207 }
208 }
209
210 body.push_str(
211 "\n\nResume the parent task using this child result and continue from the previous plan. \
212 If you need the full child transcript, call SubAgent.get(child_session_id).",
213 );
214
215 let mut message = Message::user(body);
216 message.metadata = Some(serde_json::json!({
217 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
218 RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
219 "child_session_id": completion.child_session_id,
220 "child_status": completion.status,
221 "child_error": completion.error,
222 "completed_at": completion.completed_at,
223 "child_final_response_included": final_response.is_some(),
224 }));
225 message.never_compress = false;
229 message
230}
231
232fn guardian_resume_message(completion: &ChildCompletion, verdict: &GuardianVerdict) -> Message {
237 let mut body = if verdict.approve {
238 String::from(
239 "Guardian review APPROVED: an independent reviewer verified the work and found no blocking issues. You may finalize the task.",
240 )
241 } else {
242 String::from(
243 "Guardian review REJECTED: an independent reviewer found issues. Address every finding below before completing — do NOT declare the task complete until they are resolved.",
244 )
245 };
246 if let Some(summary) = verdict.summary.as_deref().filter(|s| !s.trim().is_empty()) {
247 body.push_str("\n\nReviewer summary: ");
248 body.push_str(summary);
249 }
250 if !verdict.findings.is_empty() {
251 body.push_str("\n\nFindings:");
252 for (idx, finding) in verdict.findings.iter().enumerate() {
253 body.push_str(&format!("\n{}. {}", idx + 1, finding));
254 }
255 }
256 body.push_str(
257 "\n\nIf you need the full guardian transcript, call SubAgent.get(child_session_id).",
258 );
259
260 let mut message = Message::user(body);
261 message.metadata = Some(serde_json::json!({
262 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
263 RUNTIME_RESUME_MESSAGE_KIND_KEY: "guardian_review_resume",
264 "child_session_id": completion.child_session_id,
265 "child_status": completion.status,
266 "guardian_approved": verdict.approve,
267 "completed_at": completion.completed_at,
268 }));
269 message.never_compress = false;
270 message
271}
272
273#[derive(Clone)]
274pub struct ChildCompletionCoordinator {
275 storage: Arc<dyn Storage>,
276 persistence: Arc<bamboo_storage::LockedSessionStore>,
277 sessions: crate::SessionCache,
278 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
279 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
280 agent: Arc<Agent>,
281 config: Arc<RwLock<Config>>,
282 provider_registry: Arc<ProviderRegistry>,
283 provider_router: Arc<ProviderModelRouter>,
284 app_data_dir: std::path::PathBuf,
285 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
286 root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
287 guardian_spawner: Arc<RwLock<Option<Arc<dyn GuardianSpawner>>>>,
291}
292
293impl ChildCompletionCoordinator {
294 #[allow(clippy::too_many_arguments)]
295 pub fn new(
296 storage: Arc<dyn Storage>,
297 persistence: Arc<LockedSessionStore>,
298 sessions: crate::SessionCache,
299 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
300 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
301 agent: Arc<Agent>,
302 config: Arc<RwLock<Config>>,
303 provider_registry: Arc<ProviderRegistry>,
304 provider_router: Arc<ProviderModelRouter>,
305 app_data_dir: std::path::PathBuf,
306 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
307 ) -> Self {
308 Self {
309 storage,
310 persistence,
311 sessions,
312 agent_runners,
313 session_event_senders,
314 agent,
315 config,
316 provider_registry,
317 provider_router,
318 app_data_dir,
319 account_feed_inbox,
320 root_tools: Arc::new(RwLock::new(None)),
321 guardian_spawner: Arc::new(RwLock::new(None)),
322 }
323 }
324
325 pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
326 *self.root_tools.write().await = Some(tools);
327 }
328
329 pub async fn set_guardian_spawner(&self, spawner: Arc<dyn GuardianSpawner>) {
332 *self.guardian_spawner.write().await = Some(spawner);
333 }
334
335 fn build_resume_config(
336 &self,
337 session: &Session,
338 config_snapshot: &Config,
339 ) -> ResumeConfigSnapshot {
340 crate::session_app::resolution::resolve_resume_config_snapshot(
341 config_snapshot,
342 &self.provider_registry,
343 session,
344 None,
345 )
346 }
347
348 async fn resume_parent(&self, parent_session_id: String) {
349 for attempt in 0..=5u8 {
350 if attempt > 0 {
351 tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
352 }
353
354 let Some(session) = self.load_session(&parent_session_id).await else {
355 tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
356 return;
357 };
358 let config_snapshot = self.config.read().await.clone();
359 let resume_config = self.build_resume_config(&session, &config_snapshot);
360 let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
361 tracing::info!(
362 %parent_session_id,
363 attempt,
364 outcome = outcome.as_str(),
365 "child completion requested parent resume"
366 );
367
368 if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
369 return;
370 }
371 }
372 }
373
374 async fn save_and_cache(&self, session: &mut Session) {
375 if let Err(error) = self.persistence.merge_save_runtime(session).await {
376 tracing::warn!(session_id = %session.id, %error, "failed to persist session");
377 }
378 self.sessions.insert(
379 session.id.clone(),
380 Arc::new(parking_lot::RwLock::new(session.clone())),
381 );
382 }
383}
384
385#[async_trait]
386impl ChildCompletionHandler for ChildCompletionCoordinator {
387 async fn on_child_completed(&self, completion: ChildCompletion) {
388 let per_parent = {
393 let mut map = parent_locks().lock().expect("parent lock map poisoned");
394 map.entry(completion.parent_session_id.clone())
395 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
396 .clone()
397 };
398 let _per_parent_guard = per_parent.lock().await;
399
400 let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
401 tracing::warn!(
402 parent_session_id = %completion.parent_session_id,
403 child_session_id = %completion.child_session_id,
404 "child completion received for missing parent"
405 );
406 return;
407 };
408
409 let mut runtime_state = read_runtime_state(&parent);
415
416 let completed_child_ids = derive_completed_child_ids(
419 &self.storage,
420 &completion.parent_session_id,
421 &completion.child_session_id,
422 )
423 .await;
424
425 let mut should_resume = false;
426 let mut remaining_children = 0usize;
427 if let Some(wait) = runtime_state.waiting_for_children.clone() {
428 remaining_children = wait
429 .child_session_ids
430 .iter()
431 .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
432 .count();
433 should_resume = wait_policy_satisfied(
434 wait.wait_for,
435 &wait.child_session_ids,
436 &completed_child_ids,
437 &completion.status,
438 );
439 if should_resume {
440 runtime_state.waiting_for_children = None;
441 runtime_state.status = AgentStatusState::Idle;
442 runtime_state.suspension = None;
443 }
444 }
445
446 if should_resume {
447 parent.metadata.remove("runtime.suspend_reason");
448 let loaded_child = match self
453 .storage
454 .load_session(&completion.child_session_id)
455 .await
456 {
457 Ok(child) => child,
458 Err(error) => {
459 tracing::warn!(
460 child_session_id = %completion.child_session_id,
461 %error,
462 "failed to load child session for runtime resume message"
463 );
464 None
465 }
466 };
467
468 let reviewed_round = runtime_state.round.current_round;
474 let guardian_resume = loaded_child.as_ref().and_then(|child| {
475 if child.subagent_type().as_deref() != Some("guardian") {
476 return None;
477 }
478 let mut guardian_state = read_guardian_state(&parent)?;
479 if guardian_state.guardian_child_id.as_deref()
480 != Some(completion.child_session_id.as_str())
481 {
482 tracing::warn!(
485 parent_session_id = %completion.parent_session_id,
486 child_session_id = %completion.child_session_id,
487 expected = ?guardian_state.guardian_child_id,
488 "guardian completion does not match recorded guardian_child_id; using generic resume"
489 );
490 return None;
491 }
492 let verdict = child_final_assistant_text(child)
500 .and_then(|text| match parse_guardian_verdict(&text) {
501 Ok(verdict) => Some(verdict),
502 Err(error) => {
503 tracing::warn!(
504 child_session_id = %completion.child_session_id,
505 %error,
506 "guardian verdict unparseable; recording a synthetic reject"
507 );
508 None
509 }
510 })
511 .unwrap_or_else(|| {
512 GuardianVerdict::rejected(vec![
513 "The guardian reviewer did not return a usable verdict (it errored or \
514 emitted unparseable output); the work has NOT been independently \
515 verified."
516 .to_string(),
517 ])
518 });
519 let approved = verdict.approve;
520 let message = guardian_resume_message(&completion, &verdict);
521 guardian_state.record_verdict(verdict, reviewed_round);
522 write_guardian_state(&mut parent, guardian_state);
523 tracing::info!(
524 parent_session_id = %completion.parent_session_id,
525 child_session_id = %completion.child_session_id,
526 approved,
527 "guardian verdict recorded; resuming parent"
528 );
529 Some(message)
530 });
531
532 let resume_message = guardian_resume.unwrap_or_else(|| {
533 runtime_resume_message(
534 &completion,
535 remaining_children,
536 loaded_child
537 .as_ref()
538 .and_then(child_final_assistant_text)
539 .as_deref(),
540 )
541 });
542 parent.add_message(resume_message);
543 } else if runtime_state.waiting_for_children.is_some() {
544 runtime_state.status = AgentStatusState::Suspended;
545 runtime_state.suspension = Some(SuspensionState {
546 reason: "waiting_for_children".to_string(),
547 suspended_at: Utc::now(),
548 resumable: true,
549 hook_point: Some("ChildCompletion".to_string()),
550 });
551 }
552
553 parent.updated_at = Utc::now();
554 write_runtime_state(&mut parent, &runtime_state);
555 self.save_and_cache(&mut parent).await;
556
557 let resume_parent_id = parent.id.clone();
562 drop(_per_parent_guard);
563
564 if should_resume {
565 self.resume_parent(resume_parent_id).await;
566 }
567 }
568}
569
570#[async_trait]
571impl ResumeExecutionPort for ChildCompletionCoordinator {
572 async fn load_session(&self, session_id: &str) -> Option<Session> {
573 match self.storage.load_session(session_id).await {
574 Ok(Some(session)) => Some(session),
575 Ok(None) => self
576 .sessions
577 .get(session_id)
578 .map(|e| e.value().clone())
579 .map(|arc| arc.read().clone()),
580 Err(error) => {
581 tracing::warn!(%session_id, %error, "failed to load session from storage");
582 self.sessions
583 .get(session_id)
584 .map(|e| e.value().clone())
585 .map(|arc| arc.read().clone())
586 }
587 }
588 }
589
590 async fn save_and_cache_session(&self, session: &mut Session) {
591 self.save_and_cache(session).await;
592 }
593
594 async fn try_reserve_runner(
595 &self,
596 session_id: &str,
597 event_sender: &broadcast::Sender<AgentEvent>,
598 ) -> Option<RunnerReservation> {
599 try_reserve_runner(&self.agent_runners, session_id, event_sender).await
600 }
601
602 async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
603 let runners = self.agent_runners.read().await;
604 runners.get(session_id).map(|r| r.run_id.clone())
605 }
606
607 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
608 crate::execution::session_events::get_or_create_event_sender(
609 &self.session_event_senders,
610 session_id,
611 )
612 .await
613 }
614
615 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
616 let ResumeSpawnRequest {
617 session_id,
618 session,
619 cancel_token,
620 run_id: _,
621 event_sender,
622 config,
623 } = request;
624
625 let Some(root_tools) = self.root_tools.read().await.clone() else {
626 tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
627 return;
628 };
629
630 let model = session.model.clone();
631 let resolved_provider_name = session_effective_model_ref(&session)
632 .map(|model_ref| model_ref.provider)
633 .unwrap_or(config.provider_name);
634 let provider_override = session_effective_model_ref(&session)
635 .and_then(|model_ref| match self.provider_router.route(&model_ref) {
636 Ok(provider) => Some(provider),
637 Err(error) => {
638 tracing::warn!(
639 session_id = %session_id,
640 provider = %model_ref.provider,
641 model = %model_ref.model,
642 error = %error,
643 "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
644 );
645 None
646 }
647 });
648 let config_snapshot = self.config.read().await.clone();
649 let resolved_fast_provider = resolve_fast_model(
650 &config_snapshot,
651 &resolved_provider_name,
652 &self.provider_registry,
653 )
654 .map(|model| model.provider);
655 let reasoning_effort = session.reasoning_effort;
656 let reasoning_effort_source = session
657 .metadata
658 .get("reasoning_effort_source")
659 .cloned()
660 .unwrap_or_default();
661 let gold_config = resolve_gold_config(
662 &config_snapshot,
663 session
664 .metadata
665 .get(GOLD_CONFIG_METADATA_KEY)
666 .map(String::as_str),
667 )
668 .or(config.gold_config.clone());
669
670 let (mpsc_tx, _forwarder) = create_event_forwarder(
671 session_id.clone(),
672 event_sender,
673 self.agent_runners.clone(),
674 self.account_feed_inbox.clone(),
675 );
676
677 let config_handle = self.config.clone();
678 let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
679 let provider_registry = self.provider_registry.clone();
680 let provider_name_for_aux = resolved_provider_name.clone();
681 let auxiliary_model_resolver = std::sync::Arc::new(move || {
682 let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
683 let areas = resolve_global_area_models(
685 &config_snapshot,
686 &provider_name_for_aux,
687 &provider_registry,
688 );
689 crate::AuxiliaryModelConfig {
690 fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
691 fast_model_provider: areas.fast.map(|m| m.provider),
692 background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
693 planning_model_name: None,
694 search_model_name: None,
695 summarization_model_name: areas
696 .summarization
697 .as_ref()
698 .map(|m| m.model_name.clone()),
699 background_model_provider: areas.background.map(|m| m.provider),
700 summarization_model_provider: areas.summarization.map(|m| m.provider),
701 }
702 });
703 let model_roster = crate::ModelRoster {
704 model: Some(model),
705 provider_name: Some(resolved_provider_name),
706 provider_type: config.provider_type.clone(),
707 fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
708 background: crate::RoleModel::from_parts(
709 config.background_model,
710 config.background_model_provider,
711 ),
712 summarization: crate::RoleModel::from_parts(
713 config.summarization_model,
714 config.summarization_model_provider,
715 ),
716 };
717
718 let guardian_config = read_guardian_config(&session);
723 let guardian_spawner = self.guardian_spawner.read().await.clone();
724
725 spawn_session_execution(SessionExecutionArgs {
726 agent: self.agent.clone(),
727 session_id,
728 session,
729 tools_override: Some(root_tools),
730 provider_override,
731 model_roster,
732 reasoning_effort,
733 reasoning_effort_source,
734 auxiliary_model_resolver: Some(auxiliary_model_resolver),
735 disabled_tools: Some(config.disabled_tools),
736 disabled_skill_ids: Some(config.disabled_skill_ids),
737 selected_skill_ids: None,
738 selected_skill_mode: None,
739 cancel_token,
740 mpsc_tx,
741 image_fallback: config.image_fallback,
742 gold_config,
743 guardian_config,
744 guardian_spawner,
745 app_data_dir: Some(self.app_data_dir.clone()),
746 runners: self.agent_runners.clone(),
747 sessions_cache: self.sessions.clone(),
748 on_complete: None,
749 });
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use bamboo_agent_core::Message;
757
758 fn make_completion(status: &str) -> ChildCompletion {
759 ChildCompletion {
760 parent_session_id: "parent-1".to_string(),
761 child_session_id: "child-1".to_string(),
762 status: status.to_string(),
763 error: None,
764 completed_at: Utc::now(),
765 }
766 }
767
768 struct StubChildIndex {
771 children: Vec<(String, Option<String>)>,
772 }
773
774 #[async_trait]
775 impl Storage for StubChildIndex {
776 async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
777 Ok(())
778 }
779 async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
780 Ok(None)
781 }
782 async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
783 Ok(false)
784 }
785 async fn list_child_run_statuses(
786 &self,
787 _parent_session_id: &str,
788 ) -> std::io::Result<Vec<(String, Option<String>)>> {
789 Ok(self.children.clone())
790 }
791 }
792
793 #[tokio::test]
794 async fn derive_completed_only_includes_terminal_children() {
795 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
796 children: vec![
797 ("a".into(), Some("completed".into())),
798 ("b".into(), Some("running".into())),
799 ("c".into(), Some("error".into())),
800 ("d".into(), None),
801 ],
802 });
803 let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
804 assert_eq!(
806 completed,
807 vec!["a".to_string(), "b".to_string(), "c".to_string()]
808 );
809 }
810
811 #[tokio::test]
812 async fn derive_completed_folds_in_just_completed_when_index_lags() {
813 let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
815 children: vec![("only".into(), Some("running".into()))],
816 });
817 let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
818 assert_eq!(completed, vec!["only".to_string()]);
819 }
820
821 #[test]
822 fn wait_policy_all_uses_derived_completed_set() {
823 let waited = vec!["a".to_string(), "b".to_string()];
824 assert!(!wait_policy_satisfied(
825 ChildWaitPolicy::All,
826 &waited,
827 &["a".to_string()],
828 "completed"
829 ));
830 assert!(wait_policy_satisfied(
831 ChildWaitPolicy::All,
832 &waited,
833 &["a".to_string(), "b".to_string()],
834 "completed"
835 ));
836 }
837
838 #[test]
839 fn child_final_assistant_text_returns_last_assistant() {
840 let mut session = Session::new("child-1", "gpt-4");
841 session.messages.push(Message::user("hi"));
842 session
843 .messages
844 .push(Message::assistant("first answer", None));
845 session.messages.push(Message::user("again"));
846 session
847 .messages
848 .push(Message::assistant("final answer", None));
849
850 assert_eq!(
851 child_final_assistant_text(&session).as_deref(),
852 Some("final answer")
853 );
854 }
855
856 #[test]
857 fn child_final_assistant_text_returns_none_when_blank() {
858 let mut session = Session::new("child-1", "gpt-4");
859 session.messages.push(Message::assistant(" ", None));
860 assert!(child_final_assistant_text(&session).is_none());
861 }
862
863 #[test]
864 fn child_final_assistant_text_returns_none_when_no_assistant() {
865 let mut session = Session::new("child-1", "gpt-4");
866 session.messages.push(Message::user("hi"));
867 assert!(child_final_assistant_text(&session).is_none());
868 }
869
870 #[test]
871 fn runtime_resume_message_folds_full_response_without_truncation() {
872 let completion = make_completion("completed");
875 let long: String = "a".repeat(10_000);
876 let message = runtime_resume_message(&completion, 0, Some(&long));
877 assert!(message.content.contains(&long));
878 assert!(!message.content.contains("truncated"));
879 }
880
881 #[test]
882 fn runtime_resume_message_includes_child_response_when_provided() {
883 let completion = make_completion("completed");
884 let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));
885
886 assert!(matches!(message.role, Role::User));
887 assert!(!message.never_compress);
890 assert!(message.content.contains("Child final response:"));
891 assert!(message.content.contains("the answer is 42"));
892
893 let metadata = message.metadata.expect("metadata present");
894 assert_eq!(
895 metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
896 Some(true)
897 );
898 assert_eq!(
899 metadata.get("runtime_kind").and_then(|v| v.as_str()),
900 Some("child_completion_resume")
901 );
902 assert_eq!(
903 metadata
904 .get("child_final_response_included")
905 .and_then(|v| v.as_bool()),
906 Some(true)
907 );
908 }
909
910 #[test]
911 fn runtime_resume_message_falls_back_to_error_when_no_response() {
912 let mut completion = make_completion("error");
913 completion.error = Some("boom".to_string());
914
915 let message = runtime_resume_message(&completion, 1, None);
916 assert!(message.content.contains("Child error:"));
917 assert!(message.content.contains("boom"));
918 let metadata = message.metadata.expect("metadata present");
919 assert_eq!(
920 metadata
921 .get("child_final_response_included")
922 .and_then(|v| v.as_bool()),
923 Some(false)
924 );
925 }
926
927 #[test]
928 fn runtime_resume_message_minimal_when_no_response_and_no_error() {
929 let completion = make_completion("completed");
930 let message = runtime_resume_message(&completion, 2, None);
931 assert!(!message.content.contains("Child final response:"));
932 assert!(!message.content.contains("Child error:"));
933 assert!(message.content.contains("Resume the parent task"));
934 }
935
936 #[test]
937 fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
938 let runtime = tokio::runtime::Runtime::new().expect("runtime");
939
940 runtime.block_on(async {
941 let config = Arc::new(RwLock::new(Config::default()));
942 config.write().await.provider = "copilot".to_string();
943 let cached_config = StdRwLock::new(Config::default());
944
945 let snapshot = read_config_snapshot(&config, &cached_config);
946
947 assert_eq!(snapshot.provider, "copilot");
948 assert_eq!(
949 cached_config.read().expect("cached snapshot lock").provider,
950 "copilot"
951 );
952 });
953 }
954
955 #[test]
956 fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
957 let runtime = tokio::runtime::Runtime::new().expect("runtime");
958
959 runtime.block_on(async {
960 let cached_snapshot = Config {
961 provider: "cached-provider".to_string(),
962 ..Default::default()
963 };
964
965 let config = Arc::new(RwLock::new(Config::default()));
966 let cached_config = StdRwLock::new(cached_snapshot);
967 let _write_guard = config.write().await;
968
969 let snapshot = read_config_snapshot(&config, &cached_config);
970
971 assert_eq!(snapshot.provider, "cached-provider");
972 });
973 }
974}