1use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12 create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13 ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::Agent;
16use async_trait::async_trait;
17use bamboo_agent_core::storage::Storage;
18use bamboo_agent_core::tools::ToolExecutor;
19use bamboo_agent_core::{AgentEvent, Message, Role, Session, SessionKind};
20use bamboo_domain::session::runtime_state::{
21 AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
22};
23use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
24use bamboo_storage::LockedSessionStore;
25use chrono::Utc;
26use tokio::sync::{broadcast, RwLock};
27
28use crate::model_areas::resolve_global_area_models;
29use crate::model_config_helper::{
30 resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
31};
32use crate::session_app::provider_model::session_effective_model_ref;
33use crate::session_app::resume::{
34 resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
35};
36use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
37
/// Session metadata key under which the JSON-serialized `AgentRuntimeState` is mirrored.
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message metadata key set to `true` on runtime resume messages
/// (presumably consumed by the UI to hide them — confirm against the front end).
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message metadata key carrying the kind tag of a runtime-generated message.
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
41
42fn read_runtime_state(session: &Session) -> AgentRuntimeState {
43 session
44 .agent_runtime_state
45 .clone()
46 .or_else(|| {
47 session
48 .metadata
49 .get(AGENT_RUNTIME_STATE_METADATA_KEY)
50 .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
51 })
52 .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
53}
54
55fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
56 session.agent_runtime_state = Some(runtime_state.clone());
57 if let Ok(serialized) = serde_json::to_string(runtime_state) {
58 session
59 .metadata
60 .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
61 }
62}
63
/// Returns `true` when `status` denotes a child run that ended unsuccessfully
/// (`"error"`, `"timeout"` or `"cancelled"`). The comparison is exact and
/// case-sensitive.
fn is_error_like(status: &str) -> bool {
    ["error", "timeout", "cancelled"].contains(&status)
}
67
/// Returns `true` when `status` is one of the terminal child-run statuses,
/// i.e. the child will make no further progress. The comparison is exact and
/// case-sensitive.
fn is_terminal_child_status(status: &str) -> bool {
    const TERMINAL_STATUSES: [&str; 5] = ["completed", "error", "timeout", "cancelled", "skipped"];
    TERMINAL_STATUSES.contains(&status)
}
75
76async fn derive_completed_child_ids(
81 storage: &Arc<dyn Storage>,
82 parent_session_id: &str,
83 just_completed_child_id: &str,
84) -> Vec<String> {
85 let mut completed: Vec<String> = storage
86 .list_child_run_statuses(parent_session_id)
87 .await
88 .unwrap_or_default()
89 .into_iter()
90 .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
91 .map(|(id, _)| id)
92 .collect();
93 if !completed.iter().any(|id| id == just_completed_child_id) {
94 completed.push(just_completed_child_id.to_string());
95 }
96 completed.sort();
97 completed.dedup();
98 completed
99}
100
101fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
102 if let Ok(config_guard) = config.try_read() {
103 let snapshot = config_guard.clone();
104
105 if let Ok(mut cached_guard) = cached_config.try_write() {
106 *cached_guard = snapshot.clone();
107 }
108
109 snapshot
110 } else {
111 cached_config
112 .try_read()
113 .map(|guard| guard.clone())
114 .unwrap_or_default()
115 }
116}
117
118fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
135 static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
136 OnceLock::new();
137 LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
138}
139
140fn wait_policy_satisfied(
141 policy: ChildWaitPolicy,
142 wait_child_ids: &[String],
143 completed_child_ids: &[String],
144 latest_status: &str,
145) -> bool {
146 if wait_child_ids.is_empty() {
147 return false;
148 }
149
150 match policy {
151 ChildWaitPolicy::All => wait_child_ids
152 .iter()
153 .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
154 ChildWaitPolicy::Any => completed_child_ids
155 .iter()
156 .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
157 ChildWaitPolicy::FirstError => {
158 is_error_like(latest_status)
159 || wait_child_ids
160 .iter()
161 .all(|id| completed_child_ids.iter().any(|completed| completed == id))
162 }
163 }
164}
165
/// Maximum number of characters (Unicode scalar values, not bytes) of a
/// child's final response that is inlined into the runtime resume message.
const RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS: usize = 4000;
/// Text appended to a child response that was cut at the limit above.
const RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER: &str =
    "\n\n[... child final response truncated; call SubAgent.get for full content ...]";
173
174fn child_final_assistant_text(child: &Session) -> Option<String> {
178 child
179 .messages
180 .iter()
181 .rev()
182 .find(|message| matches!(message.role, Role::Assistant))
183 .map(|message| message.content.clone())
184 .filter(|content| !content.trim().is_empty())
185}
186
187fn truncate_for_resume(content: &str) -> String {
188 if content.chars().count() <= RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS {
189 return content.to_string();
190 }
191 let head: String = content
192 .chars()
193 .take(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS)
194 .collect();
195 format!("{head}{RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER}")
196}
197
198fn runtime_resume_message(
199 completion: &ChildCompletion,
200 remaining_children: usize,
201 child_final_response: Option<&str>,
202) -> Message {
203 let mut body = format!(
204 "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
205 completion.child_session_id, completion.status, remaining_children
206 );
207
208 let truncated_response = child_final_response.map(truncate_for_resume);
209 if let Some(response) = truncated_response.as_deref() {
210 body.push_str("\n\nChild final response:\n");
211 body.push_str(response);
212 } else if let Some(error) = completion.error.as_deref() {
213 if !error.is_empty() {
214 body.push_str("\n\nChild error:\n");
215 body.push_str(error);
216 }
217 }
218
219 body.push_str(
220 "\n\nResume the parent task using this child result and continue from the previous plan. \
221 If you need the full child transcript, call SubAgent.get(child_session_id).",
222 );
223
224 let mut message = Message::user(body);
225 message.metadata = Some(serde_json::json!({
226 RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
227 RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
228 "child_session_id": completion.child_session_id,
229 "child_status": completion.status,
230 "child_error": completion.error,
231 "completed_at": completion.completed_at,
232 "child_final_response_included": truncated_response.is_some(),
233 }));
234 message.never_compress = true;
235 message
236}
237
238#[derive(Clone)]
239pub struct ChildCompletionCoordinator {
240 storage: Arc<dyn Storage>,
241 persistence: Arc<bamboo_storage::LockedSessionStore>,
242 sessions: crate::SessionCache,
243 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
244 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
245 agent: Arc<Agent>,
246 config: Arc<RwLock<Config>>,
247 provider_registry: Arc<ProviderRegistry>,
248 provider_router: Arc<ProviderModelRouter>,
249 app_data_dir: std::path::PathBuf,
250 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
251 root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
252}
253
254impl ChildCompletionCoordinator {
255 #[allow(clippy::too_many_arguments)]
256 pub fn new(
257 storage: Arc<dyn Storage>,
258 persistence: Arc<LockedSessionStore>,
259 sessions: crate::SessionCache,
260 agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
261 session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
262 agent: Arc<Agent>,
263 config: Arc<RwLock<Config>>,
264 provider_registry: Arc<ProviderRegistry>,
265 provider_router: Arc<ProviderModelRouter>,
266 app_data_dir: std::path::PathBuf,
267 account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
268 ) -> Self {
269 Self {
270 storage,
271 persistence,
272 sessions,
273 agent_runners,
274 session_event_senders,
275 agent,
276 config,
277 provider_registry,
278 provider_router,
279 app_data_dir,
280 account_feed_inbox,
281 root_tools: Arc::new(RwLock::new(None)),
282 }
283 }
284
285 pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
286 *self.root_tools.write().await = Some(tools);
287 }
288
289 fn build_resume_config(
290 &self,
291 session: &Session,
292 config_snapshot: &Config,
293 ) -> ResumeConfigSnapshot {
294 crate::session_app::resolution::resolve_resume_config_snapshot(
295 config_snapshot,
296 &self.provider_registry,
297 session,
298 None,
299 )
300 }
301
302 async fn resume_parent(&self, parent_session_id: String) {
303 for attempt in 0..=5u8 {
304 if attempt > 0 {
305 tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
306 }
307
308 let Some(session) = self.load_session(&parent_session_id).await else {
309 tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
310 return;
311 };
312 let config_snapshot = self.config.read().await.clone();
313 let resume_config = self.build_resume_config(&session, &config_snapshot);
314 let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
315 tracing::info!(
316 %parent_session_id,
317 attempt,
318 outcome = outcome.as_str(),
319 "child completion requested parent resume"
320 );
321
322 if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
323 return;
324 }
325 }
326 }
327
328 async fn save_and_cache(&self, session: &mut Session) {
329 if let Err(error) = self.persistence.merge_save_runtime(session).await {
330 tracing::warn!(session_id = %session.id, %error, "failed to persist session");
331 }
332 self.sessions.insert(
333 session.id.clone(),
334 Arc::new(parking_lot::RwLock::new(session.clone())),
335 );
336 }
337}
338
339#[async_trait]
340impl ChildCompletionHandler for ChildCompletionCoordinator {
341 async fn on_child_completed(&self, completion: ChildCompletion) {
342 let per_parent = {
347 let mut map = parent_locks().lock().expect("parent lock map poisoned");
348 map.entry(completion.parent_session_id.clone())
349 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
350 .clone()
351 };
352 let _per_parent_guard = per_parent.lock().await;
353
354 let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
355 tracing::warn!(
356 parent_session_id = %completion.parent_session_id,
357 child_session_id = %completion.child_session_id,
358 "child completion received for missing parent"
359 );
360 return;
361 };
362
363 if parent.kind != SessionKind::Root {
364 tracing::warn!(
365 parent_session_id = %completion.parent_session_id,
366 child_session_id = %completion.child_session_id,
367 "child completion parent is not a root session"
368 );
369 return;
370 }
371
372 let mut runtime_state = read_runtime_state(&parent);
373
374 let completed_child_ids = derive_completed_child_ids(
377 &self.storage,
378 &completion.parent_session_id,
379 &completion.child_session_id,
380 )
381 .await;
382
383 let mut should_resume = false;
384 let mut remaining_children = 0usize;
385 if let Some(wait) = runtime_state.waiting_for_children.clone() {
386 remaining_children = wait
387 .child_session_ids
388 .iter()
389 .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
390 .count();
391 should_resume = wait_policy_satisfied(
392 wait.wait_for,
393 &wait.child_session_ids,
394 &completed_child_ids,
395 &completion.status,
396 );
397 if should_resume {
398 runtime_state.waiting_for_children = None;
399 runtime_state.status = AgentStatusState::Idle;
400 runtime_state.suspension = None;
401 }
402 }
403
404 if should_resume {
405 parent.metadata.remove("runtime.suspend_reason");
406 let child_final_response = match self
412 .storage
413 .load_session(&completion.child_session_id)
414 .await
415 {
416 Ok(Some(child)) => child_final_assistant_text(&child),
417 Ok(None) => None,
418 Err(error) => {
419 tracing::warn!(
420 child_session_id = %completion.child_session_id,
421 %error,
422 "failed to load child session for runtime resume message"
423 );
424 None
425 }
426 };
427 parent.add_message(runtime_resume_message(
428 &completion,
429 remaining_children,
430 child_final_response.as_deref(),
431 ));
432 } else if runtime_state.waiting_for_children.is_some() {
433 runtime_state.status = AgentStatusState::Suspended;
434 runtime_state.suspension = Some(SuspensionState {
435 reason: "waiting_for_children".to_string(),
436 suspended_at: Utc::now(),
437 resumable: true,
438 hook_point: Some("ChildCompletion".to_string()),
439 });
440 }
441
442 parent.updated_at = Utc::now();
443 write_runtime_state(&mut parent, &runtime_state);
444 self.save_and_cache(&mut parent).await;
445
446 let resume_parent_id = parent.id.clone();
451 drop(_per_parent_guard);
452
453 if should_resume {
454 self.resume_parent(resume_parent_id).await;
455 }
456 }
457}
458
459#[async_trait]
460impl ResumeExecutionPort for ChildCompletionCoordinator {
461 async fn load_session(&self, session_id: &str) -> Option<Session> {
462 match self.storage.load_session(session_id).await {
463 Ok(Some(session)) => Some(session),
464 Ok(None) => self
465 .sessions
466 .get(session_id)
467 .map(|e| e.value().clone())
468 .map(|arc| arc.read().clone()),
469 Err(error) => {
470 tracing::warn!(%session_id, %error, "failed to load session from storage");
471 self.sessions
472 .get(session_id)
473 .map(|e| e.value().clone())
474 .map(|arc| arc.read().clone())
475 }
476 }
477 }
478
479 async fn save_and_cache_session(&self, session: &mut Session) {
480 self.save_and_cache(session).await;
481 }
482
483 async fn try_reserve_runner(
484 &self,
485 session_id: &str,
486 event_sender: &broadcast::Sender<AgentEvent>,
487 ) -> Option<RunnerReservation> {
488 try_reserve_runner(&self.agent_runners, session_id, event_sender).await
489 }
490
491 async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
492 let runners = self.agent_runners.read().await;
493 runners.get(session_id).map(|r| r.run_id.clone())
494 }
495
496 async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
497 crate::execution::session_events::get_or_create_event_sender(
498 &self.session_event_senders,
499 session_id,
500 )
501 .await
502 }
503
504 async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
505 let ResumeSpawnRequest {
506 session_id,
507 session,
508 cancel_token,
509 run_id: _,
510 event_sender,
511 config,
512 } = request;
513
514 let Some(root_tools) = self.root_tools.read().await.clone() else {
515 tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
516 return;
517 };
518
519 let model = session.model.clone();
520 let resolved_provider_name = session_effective_model_ref(&session)
521 .map(|model_ref| model_ref.provider)
522 .unwrap_or(config.provider_name);
523 let provider_override = session_effective_model_ref(&session)
524 .and_then(|model_ref| match self.provider_router.route(&model_ref) {
525 Ok(provider) => Some(provider),
526 Err(error) => {
527 tracing::warn!(
528 session_id = %session_id,
529 provider = %model_ref.provider,
530 model = %model_ref.model,
531 error = %error,
532 "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
533 );
534 None
535 }
536 });
537 let config_snapshot = self.config.read().await.clone();
538 let resolved_fast_provider = resolve_fast_model(
539 &config_snapshot,
540 &resolved_provider_name,
541 &self.provider_registry,
542 )
543 .map(|model| model.provider);
544 let reasoning_effort = session.reasoning_effort;
545 let reasoning_effort_source = session
546 .metadata
547 .get("reasoning_effort_source")
548 .cloned()
549 .unwrap_or_default();
550 let gold_config = resolve_gold_config(
551 &config_snapshot,
552 session
553 .metadata
554 .get(GOLD_CONFIG_METADATA_KEY)
555 .map(String::as_str),
556 )
557 .or(config.gold_config.clone());
558
559 let (mpsc_tx, _forwarder) = create_event_forwarder(
560 session_id.clone(),
561 event_sender,
562 self.agent_runners.clone(),
563 self.account_feed_inbox.clone(),
564 );
565
566 let config_handle = self.config.clone();
567 let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
568 let provider_registry = self.provider_registry.clone();
569 let provider_name_for_aux = resolved_provider_name.clone();
570 let auxiliary_model_resolver = std::sync::Arc::new(move || {
571 let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
572 let areas = resolve_global_area_models(
574 &config_snapshot,
575 &provider_name_for_aux,
576 &provider_registry,
577 );
578 crate::AuxiliaryModelConfig {
579 fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
580 fast_model_provider: areas.fast.map(|m| m.provider),
581 background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
582 planning_model_name: None,
583 search_model_name: None,
584 summarization_model_name: areas
585 .summarization
586 .as_ref()
587 .map(|m| m.model_name.clone()),
588 background_model_provider: areas.background.map(|m| m.provider),
589 summarization_model_provider: areas.summarization.map(|m| m.provider),
590 }
591 });
592 let model_roster = crate::ModelRoster {
593 model: Some(model),
594 provider_name: Some(resolved_provider_name),
595 provider_type: config.provider_type.clone(),
596 fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
597 background: crate::RoleModel::from_parts(
598 config.background_model,
599 config.background_model_provider,
600 ),
601 summarization: crate::RoleModel::from_parts(
602 config.summarization_model,
603 config.summarization_model_provider,
604 ),
605 };
606 spawn_session_execution(SessionExecutionArgs {
607 agent: self.agent.clone(),
608 session_id,
609 session,
610 tools_override: Some(root_tools),
611 provider_override,
612 model_roster,
613 reasoning_effort,
614 reasoning_effort_source,
615 auxiliary_model_resolver: Some(auxiliary_model_resolver),
616 disabled_tools: Some(config.disabled_tools),
617 disabled_skill_ids: Some(config.disabled_skill_ids),
618 selected_skill_ids: None,
619 selected_skill_mode: None,
620 cancel_token,
621 mpsc_tx,
622 image_fallback: config.image_fallback,
623 gold_config,
624 app_data_dir: Some(self.app_data_dir.clone()),
625 runners: self.agent_runners.clone(),
626 sessions_cache: self.sessions.clone(),
627 on_complete: None,
628 });
629 }
630}
631
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::Message;

    /// Builds a completion for `child-1` of `parent-1` with the given status
    /// and no error.
    fn make_completion(status: &str) -> ChildCompletion {
        ChildCompletion {
            parent_session_id: String::from("parent-1"),
            child_session_id: String::from("child-1"),
            status: String::from(status),
            error: None,
            completed_at: Utc::now(),
        }
    }

    /// Converts string literals into the owned id lists the helpers expect.
    fn ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| id.to_string()).collect()
    }

    /// Builds a storage stub reporting the given `(child id, status)` pairs.
    fn stub_storage(children: &[(&str, Option<&str>)]) -> Arc<dyn Storage> {
        let children = children
            .iter()
            .map(|(id, status)| (id.to_string(), status.map(str::to_string)))
            .collect();
        Arc::new(StubChildIndex { children })
    }

    /// Reads the `child_final_response_included` flag from message metadata.
    fn response_included_flag(metadata: &serde_json::Value) -> Option<bool> {
        metadata
            .get("child_final_response_included")
            .and_then(|value| value.as_bool())
    }

    /// Storage stub that only answers the child status listing; every other
    /// operation reports "nothing there".
    struct StubChildIndex {
        children: Vec<(String, Option<String>)>,
    }

    #[async_trait]
    impl Storage for StubChildIndex {
        async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
            Ok(())
        }
        async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
            Ok(None)
        }
        async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
            Ok(false)
        }
        async fn list_child_run_statuses(
            &self,
            _parent_session_id: &str,
        ) -> std::io::Result<Vec<(String, Option<String>)>> {
            Ok(self.children.clone())
        }
    }

    #[tokio::test]
    async fn derive_completed_only_includes_terminal_children() {
        let storage = stub_storage(&[
            ("a", Some("completed")),
            ("b", Some("running")),
            ("c", Some("error")),
            ("d", None),
        ]);

        // "b" is still listed as running but is the child that just completed,
        // so it is included; "d" has no status and is left out.
        let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
        assert_eq!(completed, ids(&["a", "b", "c"]));
    }

    #[tokio::test]
    async fn derive_completed_folds_in_just_completed_when_index_lags() {
        let storage = stub_storage(&[("only", Some("running"))]);

        let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
        assert_eq!(completed, ids(&["only"]));
    }

    #[test]
    fn wait_policy_all_uses_derived_completed_set() {
        let waited = ids(&["a", "b"]);

        let partially_done = ids(&["a"]);
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &partially_done,
            "completed"
        ));

        let fully_done = ids(&["a", "b"]);
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &fully_done,
            "completed"
        ));
    }

    #[test]
    fn child_final_assistant_text_returns_last_assistant() {
        let mut child = Session::new("child-1", "gpt-4");
        child.messages.extend([
            Message::user("hi"),
            Message::assistant("first answer", None),
            Message::user("again"),
            Message::assistant("final answer", None),
        ]);

        let text = child_final_assistant_text(&child);
        assert_eq!(text.as_deref(), Some("final answer"));
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_blank() {
        let mut child = Session::new("child-1", "gpt-4");
        child.messages.push(Message::assistant("   ", None));

        assert!(child_final_assistant_text(&child).is_none());
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_no_assistant() {
        let mut child = Session::new("child-1", "gpt-4");
        child.messages.push(Message::user("hi"));

        assert!(child_final_assistant_text(&child).is_none());
    }

    #[test]
    fn truncate_for_resume_passthrough_when_short() {
        let short = "hello world";
        assert_eq!(truncate_for_resume(short), short);
    }

    #[test]
    fn truncate_for_resume_truncates_long_content() {
        let oversized = "a".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS + 100);

        let truncated = truncate_for_resume(&oversized);
        assert!(truncated.len() > RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS);
        assert!(truncated.ends_with(RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER));
    }

    #[test]
    fn runtime_resume_message_includes_child_response_when_provided() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));

        assert!(matches!(message.role, Role::User));
        assert!(message.never_compress);
        assert!(message.content.contains("Child final response:"));
        assert!(message.content.contains("the answer is 42"));

        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            metadata.get("runtime_kind").and_then(|v| v.as_str()),
            Some("child_completion_resume")
        );
        assert_eq!(response_included_flag(&metadata), Some(true));
    }

    #[test]
    fn runtime_resume_message_falls_back_to_error_when_no_response() {
        let completion = ChildCompletion {
            error: Some("boom".to_string()),
            ..make_completion("error")
        };

        let message = runtime_resume_message(&completion, 1, None);
        assert!(message.content.contains("Child error:"));
        assert!(message.content.contains("boom"));

        let metadata = message.metadata.expect("metadata present");
        assert_eq!(response_included_flag(&metadata), Some(false));
    }

    #[test]
    fn runtime_resume_message_minimal_when_no_response_and_no_error() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 2, None);

        assert!(!message.content.contains("Child final response:"));
        assert!(!message.content.contains("Child error:"));
        assert!(message.content.contains("Resume the parent task"));
    }

    #[test]
    fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let live = Arc::new(RwLock::new(Config::default()));
            live.write().await.provider = "copilot".to_string();
            let cache = StdRwLock::new(Config::default());

            let snapshot = read_config_snapshot(&live, &cache);

            assert_eq!(snapshot.provider, "copilot");
            let cached = cache.read().expect("cached snapshot lock");
            assert_eq!(cached.provider, "copilot");
        });
    }

    #[test]
    fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let live = Arc::new(RwLock::new(Config::default()));
            let cache = StdRwLock::new(Config {
                provider: "cached-provider".to_string(),
                ..Default::default()
            });
            // Holding the write guard makes the live lock unavailable to try_read.
            let _write_guard = live.write().await;

            let snapshot = read_config_snapshot(&live, &cache);

            assert_eq!(snapshot.provider, "cached-provider");
        });
    }
}