Skip to main content

bamboo_engine/session_app/
child_completion_coordinator.rs

1//! Child-session completion coordinator.
2//!
3//! Receives terminal child runner notifications from `bamboo-engine`, updates
4//! durable parent wait state, and resumes the parent when the configured wait
5//! policy is satisfied.
6
7use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12    create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13    ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::Agent;
16use async_trait::async_trait;
17use bamboo_agent_core::storage::Storage;
18use bamboo_agent_core::tools::ToolExecutor;
19use bamboo_agent_core::{AgentEvent, Message, Role, Session, SessionKind};
20use bamboo_domain::session::runtime_state::{
21    AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
22};
23use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
24use bamboo_storage::LockedSessionStore;
25use chrono::Utc;
26use tokio::sync::{broadcast, RwLock};
27
28use crate::model_areas::resolve_global_area_models;
29use crate::model_config_helper::{
30    resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
31};
32use crate::session_app::provider_model::session_effective_model_ref;
33use crate::session_app::resume::{
34    resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
35};
36use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
37
/// Session metadata key under which the JSON-serialized `AgentRuntimeState`
/// is mirrored (read by `read_runtime_state`, written by `write_runtime_state`).
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message metadata key set to `true` on the runtime resume message.
/// NOTE(review): presumably consumed by the UI to hide the message — confirm.
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message metadata key carrying the kind tag of a runtime-generated message
/// (`"child_completion_resume"` for the message built in this file).
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
41
42fn read_runtime_state(session: &Session) -> AgentRuntimeState {
43    session
44        .agent_runtime_state
45        .clone()
46        .or_else(|| {
47            session
48                .metadata
49                .get(AGENT_RUNTIME_STATE_METADATA_KEY)
50                .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
51        })
52        .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
53}
54
55fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
56    session.agent_runtime_state = Some(runtime_state.clone());
57    if let Ok(serialized) = serde_json::to_string(runtime_state) {
58        session
59            .metadata
60            .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
61    }
62}
63
/// Returns `true` for child statuses that count as a failure:
/// `"error"`, `"timeout"` or `"cancelled"` (exact, case-sensitive match).
fn is_error_like(status: &str) -> bool {
    const ERROR_LIKE_STATUSES: [&str; 3] = ["error", "timeout", "cancelled"];
    ERROR_LIKE_STATUSES.contains(&status)
}
67
/// Returns `true` when `status` is one of the terminal child run statuses
/// mirrored into the session index: `"completed"`, `"error"`, `"timeout"`,
/// `"cancelled"` or `"skipped"` (exact, case-sensitive match).
fn is_terminal_child_status(status: &str) -> bool {
    const TERMINAL_STATUSES: &[&str] = &["completed", "error", "timeout", "cancelled", "skipped"];
    TERMINAL_STATUSES.iter().any(|terminal| *terminal == status)
}
75
76/// Reconstruct the set of completed child session ids for a parent from the
77/// session index (the single source of truth), folding in the child whose
78/// completion event is being processed so a momentarily-lagging index can never
79/// stall the parent's resume.
80async fn derive_completed_child_ids(
81    storage: &Arc<dyn Storage>,
82    parent_session_id: &str,
83    just_completed_child_id: &str,
84) -> Vec<String> {
85    let mut completed: Vec<String> = storage
86        .list_child_run_statuses(parent_session_id)
87        .await
88        .unwrap_or_default()
89        .into_iter()
90        .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
91        .map(|(id, _)| id)
92        .collect();
93    if !completed.iter().any(|id| id == just_completed_child_id) {
94        completed.push(just_completed_child_id.to_string());
95    }
96    completed.sort();
97    completed.dedup();
98    completed
99}
100
101fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
102    if let Ok(config_guard) = config.try_read() {
103        let snapshot = config_guard.clone();
104
105        if let Ok(mut cached_guard) = cached_config.try_write() {
106            *cached_guard = snapshot.clone();
107        }
108
109        snapshot
110    } else {
111        cached_config
112            .try_read()
113            .map(|guard| guard.clone())
114            .unwrap_or_default()
115    }
116}
117
118/// Per-parent async locks that serialize concurrent `on_child_completed`
119/// invocations for the same parent session.
120///
121/// Race eliminated: when `wait_for=Any` and two child sessions complete
122/// simultaneously, both invocations load the parent with
123/// `waiting_for_children=Some` before either persists the cleared state, so
124/// both pass `wait_policy_satisfied`, both clear `waiting_for_children`, add a
125/// duplicate resume message, and call `resume_parent` — a double resume.
126/// Holding this per-parent `tokio::sync::Mutex` across the load-check-save
127/// critical section makes the second caller observe the already-cleared state.
128///
129/// The inner `std::sync::Mutex` guards only the brief HashMap lookup/insert
130/// (no await inside); the per-parent `tokio::sync::Mutex` is the one held
131/// across the async critical section. Entries accumulate but are small
132/// (`Arc<tokio::sync::Mutex<()>>` ≈ 24 bytes) and bounded by the number of
133/// distinct parent sessions.
134fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
135    static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
136        OnceLock::new();
137    LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
138}
139
140fn wait_policy_satisfied(
141    policy: ChildWaitPolicy,
142    wait_child_ids: &[String],
143    completed_child_ids: &[String],
144    latest_status: &str,
145) -> bool {
146    if wait_child_ids.is_empty() {
147        return false;
148    }
149
150    match policy {
151        ChildWaitPolicy::All => wait_child_ids
152            .iter()
153            .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
154        ChildWaitPolicy::Any => completed_child_ids
155            .iter()
156            .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
157        ChildWaitPolicy::FirstError => {
158            is_error_like(latest_status)
159                || wait_child_ids
160                    .iter()
161                    .all(|id| completed_child_ids.iter().any(|completed| completed == id))
162        }
163    }
164}
165
/// Maximum number of characters of the child's final assistant content that
/// will be embedded into the hidden runtime resume message. Anything longer is
/// truncated with a marker so the root LLM can still call `SubAgent.get` to
/// fetch the full child output if needed.
const RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS: usize = 4000;
/// Suffix appended by `truncate_for_resume` after the retained head of an
/// over-long child response; it is added on top of the character limit.
const RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER: &str =
    "\n\n[... child final response truncated; call SubAgent.get for full content ...]";
173
174/// Extract the child session's last assistant content, if any. Returns `None`
175/// when the child produced no assistant message (e.g. errored before the first
176/// model response, or only emitted tool messages).
177fn child_final_assistant_text(child: &Session) -> Option<String> {
178    child
179        .messages
180        .iter()
181        .rev()
182        .find(|message| matches!(message.role, Role::Assistant))
183        .map(|message| message.content.clone())
184        .filter(|content| !content.trim().is_empty())
185}
186
187fn truncate_for_resume(content: &str) -> String {
188    if content.chars().count() <= RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS {
189        return content.to_string();
190    }
191    let head: String = content
192        .chars()
193        .take(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS)
194        .collect();
195    format!("{head}{RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER}")
196}
197
198fn runtime_resume_message(
199    completion: &ChildCompletion,
200    remaining_children: usize,
201    child_final_response: Option<&str>,
202) -> Message {
203    let mut body = format!(
204        "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
205        completion.child_session_id, completion.status, remaining_children
206    );
207
208    let truncated_response = child_final_response.map(truncate_for_resume);
209    if let Some(response) = truncated_response.as_deref() {
210        body.push_str("\n\nChild final response:\n");
211        body.push_str(response);
212    } else if let Some(error) = completion.error.as_deref() {
213        if !error.is_empty() {
214            body.push_str("\n\nChild error:\n");
215            body.push_str(error);
216        }
217    }
218
219    body.push_str(
220        "\n\nResume the parent task using this child result and continue from the previous plan. \
221         If you need the full child transcript, call SubAgent.get(child_session_id).",
222    );
223
224    let mut message = Message::user(body);
225    message.metadata = Some(serde_json::json!({
226        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
227        RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
228        "child_session_id": completion.child_session_id,
229        "child_status": completion.status,
230        "child_error": completion.error,
231        "completed_at": completion.completed_at,
232        "child_final_response_included": truncated_response.is_some(),
233    }));
234    message.never_compress = true;
235    message
236}
237
/// Handles terminal child-session notifications for a parent session: updates
/// the parent's persisted wait state and, once the wait policy is satisfied,
/// resumes the parent's execution.
///
/// All fields are shared handles (or cheap values), so cloning the coordinator
/// is derived.
#[derive(Clone)]
pub struct ChildCompletionCoordinator {
    /// Session storage; used to load sessions and to list child run statuses.
    storage: Arc<dyn Storage>,
    /// Store used to persist session changes (`merge_save_runtime`).
    persistence: Arc<bamboo_storage::LockedSessionStore>,
    /// In-memory session cache, refreshed on save and used as a fallback when
    /// storage cannot supply a session.
    sessions: crate::SessionCache,
    /// Registered runners keyed by session id.
    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
    /// Per-session broadcast senders for agent events.
    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
    /// Agent handed to the spawned resume execution.
    agent: Arc<Agent>,
    /// Live application configuration.
    config: Arc<RwLock<Config>>,
    /// Provider registry used when resolving resume and model configuration.
    provider_registry: Arc<ProviderRegistry>,
    /// Router used to resolve a provider override from the session's model ref.
    provider_router: Arc<ProviderModelRouter>,
    /// Application data directory passed to the spawned execution.
    app_data_dir: std::path::PathBuf,
    /// Optional inbox passed to the event forwarder.
    account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
    /// Root tool surface; `None` until `set_root_tools` is called, in which
    /// case a parent resume cannot be spawned.
    root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
}
253
254impl ChildCompletionCoordinator {
255    #[allow(clippy::too_many_arguments)]
256    pub fn new(
257        storage: Arc<dyn Storage>,
258        persistence: Arc<LockedSessionStore>,
259        sessions: crate::SessionCache,
260        agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
261        session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
262        agent: Arc<Agent>,
263        config: Arc<RwLock<Config>>,
264        provider_registry: Arc<ProviderRegistry>,
265        provider_router: Arc<ProviderModelRouter>,
266        app_data_dir: std::path::PathBuf,
267        account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
268    ) -> Self {
269        Self {
270            storage,
271            persistence,
272            sessions,
273            agent_runners,
274            session_event_senders,
275            agent,
276            config,
277            provider_registry,
278            provider_router,
279            app_data_dir,
280            account_feed_inbox,
281            root_tools: Arc::new(RwLock::new(None)),
282        }
283    }
284
285    pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
286        *self.root_tools.write().await = Some(tools);
287    }
288
289    fn build_resume_config(
290        &self,
291        session: &Session,
292        config_snapshot: &Config,
293    ) -> ResumeConfigSnapshot {
294        crate::session_app::resolution::resolve_resume_config_snapshot(
295            config_snapshot,
296            &self.provider_registry,
297            session,
298            None,
299        )
300    }
301
302    async fn resume_parent(&self, parent_session_id: String) {
303        for attempt in 0..=5u8 {
304            if attempt > 0 {
305                tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
306            }
307
308            let Some(session) = self.load_session(&parent_session_id).await else {
309                tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
310                return;
311            };
312            let config_snapshot = self.config.read().await.clone();
313            let resume_config = self.build_resume_config(&session, &config_snapshot);
314            let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
315            tracing::info!(
316                %parent_session_id,
317                attempt,
318                outcome = outcome.as_str(),
319                "child completion requested parent resume"
320            );
321
322            if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
323                return;
324            }
325        }
326    }
327
328    async fn save_and_cache(&self, session: &mut Session) {
329        if let Err(error) = self.persistence.merge_save_runtime(session).await {
330            tracing::warn!(session_id = %session.id, %error, "failed to persist session");
331        }
332        self.sessions.insert(
333            session.id.clone(),
334            Arc::new(parking_lot::RwLock::new(session.clone())),
335        );
336    }
337}
338
339#[async_trait]
340impl ChildCompletionHandler for ChildCompletionCoordinator {
341    async fn on_child_completed(&self, completion: ChildCompletion) {
342        // Acquire a per-parent async lock to eliminate the concurrent
343        // double-resume race (see `parent_locks` for the full scenario). The
344        // inner std::sync::Mutex is released immediately so no sync lock is
345        // held across the await that follows.
346        let per_parent = {
347            let mut map = parent_locks().lock().expect("parent lock map poisoned");
348            map.entry(completion.parent_session_id.clone())
349                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
350                .clone()
351        };
352        let _per_parent_guard = per_parent.lock().await;
353
354        let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
355            tracing::warn!(
356                parent_session_id = %completion.parent_session_id,
357                child_session_id = %completion.child_session_id,
358                "child completion received for missing parent"
359            );
360            return;
361        };
362
363        if parent.kind != SessionKind::Root {
364            tracing::warn!(
365                parent_session_id = %completion.parent_session_id,
366                child_session_id = %completion.child_session_id,
367                "child completion parent is not a root session"
368            );
369            return;
370        }
371
372        let mut runtime_state = read_runtime_state(&parent);
373
374        // Single source of truth: reconstruct the completed-child set from the
375        // session index rather than from a denormalized copy on the parent file.
376        let completed_child_ids = derive_completed_child_ids(
377            &self.storage,
378            &completion.parent_session_id,
379            &completion.child_session_id,
380        )
381        .await;
382
383        let mut should_resume = false;
384        let mut remaining_children = 0usize;
385        if let Some(wait) = runtime_state.waiting_for_children.clone() {
386            remaining_children = wait
387                .child_session_ids
388                .iter()
389                .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
390                .count();
391            should_resume = wait_policy_satisfied(
392                wait.wait_for,
393                &wait.child_session_ids,
394                &completed_child_ids,
395                &completion.status,
396            );
397            if should_resume {
398                runtime_state.waiting_for_children = None;
399                runtime_state.status = AgentStatusState::Idle;
400                runtime_state.suspension = None;
401            }
402        }
403
404        if should_resume {
405            parent.metadata.remove("runtime.suspend_reason");
406            // Best-effort: load the child session so we can include its final
407            // assistant content in the hidden resume message. This avoids the
408            // root LLM having to make an extra `SubAgent.get` call after
409            // resume just to read what the child concluded — which would cost
410            // an additional LLM round trip.
411            let child_final_response = match self
412                .storage
413                .load_session(&completion.child_session_id)
414                .await
415            {
416                Ok(Some(child)) => child_final_assistant_text(&child),
417                Ok(None) => None,
418                Err(error) => {
419                    tracing::warn!(
420                        child_session_id = %completion.child_session_id,
421                        %error,
422                        "failed to load child session for runtime resume message"
423                    );
424                    None
425                }
426            };
427            parent.add_message(runtime_resume_message(
428                &completion,
429                remaining_children,
430                child_final_response.as_deref(),
431            ));
432        } else if runtime_state.waiting_for_children.is_some() {
433            runtime_state.status = AgentStatusState::Suspended;
434            runtime_state.suspension = Some(SuspensionState {
435                reason: "waiting_for_children".to_string(),
436                suspended_at: Utc::now(),
437                resumable: true,
438                hook_point: Some("ChildCompletion".to_string()),
439            });
440        }
441
442        parent.updated_at = Utc::now();
443        write_runtime_state(&mut parent, &runtime_state);
444        self.save_and_cache(&mut parent).await;
445
446        // Capture before releasing the per-parent lock so the borrow checker
447        // is satisfied; `resume_parent` has its own retry loop and should not
448        // hold the per-parent lock (it would block other completions for the
449        // same parent, and the state is already durably settled above).
450        let resume_parent_id = parent.id.clone();
451        drop(_per_parent_guard);
452
453        if should_resume {
454            self.resume_parent(resume_parent_id).await;
455        }
456    }
457}
458
459#[async_trait]
460impl ResumeExecutionPort for ChildCompletionCoordinator {
461    async fn load_session(&self, session_id: &str) -> Option<Session> {
462        match self.storage.load_session(session_id).await {
463            Ok(Some(session)) => Some(session),
464            Ok(None) => self
465                .sessions
466                .get(session_id)
467                .map(|e| e.value().clone())
468                .map(|arc| arc.read().clone()),
469            Err(error) => {
470                tracing::warn!(%session_id, %error, "failed to load session from storage");
471                self.sessions
472                    .get(session_id)
473                    .map(|e| e.value().clone())
474                    .map(|arc| arc.read().clone())
475            }
476        }
477    }
478
479    async fn save_and_cache_session(&self, session: &mut Session) {
480        self.save_and_cache(session).await;
481    }
482
483    async fn try_reserve_runner(
484        &self,
485        session_id: &str,
486        event_sender: &broadcast::Sender<AgentEvent>,
487    ) -> Option<RunnerReservation> {
488        try_reserve_runner(&self.agent_runners, session_id, event_sender).await
489    }
490
491    async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
492        let runners = self.agent_runners.read().await;
493        runners.get(session_id).map(|r| r.run_id.clone())
494    }
495
496    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
497        crate::execution::session_events::get_or_create_event_sender(
498            &self.session_event_senders,
499            session_id,
500        )
501        .await
502    }
503
504    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
505        let ResumeSpawnRequest {
506            session_id,
507            session,
508            cancel_token,
509            run_id: _,
510            event_sender,
511            config,
512        } = request;
513
514        let Some(root_tools) = self.root_tools.read().await.clone() else {
515            tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
516            return;
517        };
518
519        let model = session.model.clone();
520        let resolved_provider_name = session_effective_model_ref(&session)
521            .map(|model_ref| model_ref.provider)
522            .unwrap_or(config.provider_name);
523        let provider_override = session_effective_model_ref(&session)
524            .and_then(|model_ref| match self.provider_router.route(&model_ref) {
525                Ok(provider) => Some(provider),
526                Err(error) => {
527                    tracing::warn!(
528                        session_id = %session_id,
529                        provider = %model_ref.provider,
530                        model = %model_ref.model,
531                        error = %error,
532                        "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
533                    );
534                    None
535                }
536            });
537        let config_snapshot = self.config.read().await.clone();
538        let resolved_fast_provider = resolve_fast_model(
539            &config_snapshot,
540            &resolved_provider_name,
541            &self.provider_registry,
542        )
543        .map(|model| model.provider);
544        let reasoning_effort = session.reasoning_effort;
545        let reasoning_effort_source = session
546            .metadata
547            .get("reasoning_effort_source")
548            .cloned()
549            .unwrap_or_default();
550        let gold_config = resolve_gold_config(
551            &config_snapshot,
552            session
553                .metadata
554                .get(GOLD_CONFIG_METADATA_KEY)
555                .map(String::as_str),
556        )
557        .or(config.gold_config.clone());
558
559        let (mpsc_tx, _forwarder) = create_event_forwarder(
560            session_id.clone(),
561            event_sender,
562            self.agent_runners.clone(),
563            self.account_feed_inbox.clone(),
564        );
565
566        let config_handle = self.config.clone();
567        let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
568        let provider_registry = self.provider_registry.clone();
569        let provider_name_for_aux = resolved_provider_name.clone();
570        let auxiliary_model_resolver = std::sync::Arc::new(move || {
571            let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
572            // Auxiliary models are global (config-derived), never session-bound.
573            let areas = resolve_global_area_models(
574                &config_snapshot,
575                &provider_name_for_aux,
576                &provider_registry,
577            );
578            crate::AuxiliaryModelConfig {
579                fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
580                fast_model_provider: areas.fast.map(|m| m.provider),
581                background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
582                planning_model_name: None,
583                search_model_name: None,
584                summarization_model_name: areas
585                    .summarization
586                    .as_ref()
587                    .map(|m| m.model_name.clone()),
588                background_model_provider: areas.background.map(|m| m.provider),
589                summarization_model_provider: areas.summarization.map(|m| m.provider),
590            }
591        });
592        let model_roster = crate::ModelRoster {
593            model: Some(model),
594            provider_name: Some(resolved_provider_name),
595            provider_type: config.provider_type.clone(),
596            fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
597            background: crate::RoleModel::from_parts(
598                config.background_model,
599                config.background_model_provider,
600            ),
601            summarization: crate::RoleModel::from_parts(
602                config.summarization_model,
603                config.summarization_model_provider,
604            ),
605        };
606        spawn_session_execution(SessionExecutionArgs {
607            agent: self.agent.clone(),
608            session_id,
609            session,
610            tools_override: Some(root_tools),
611            provider_override,
612            model_roster,
613            reasoning_effort,
614            reasoning_effort_source,
615            auxiliary_model_resolver: Some(auxiliary_model_resolver),
616            disabled_tools: Some(config.disabled_tools),
617            disabled_skill_ids: Some(config.disabled_skill_ids),
618            selected_skill_ids: None,
619            selected_skill_mode: None,
620            cancel_token,
621            mpsc_tx,
622            image_fallback: config.image_fallback,
623            gold_config,
624            app_data_dir: Some(self.app_data_dir.clone()),
625            runners: self.agent_runners.clone(),
626            sessions_cache: self.sessions.clone(),
627            on_complete: None,
628        });
629    }
630}
631
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::Message;

    /// Builds a completion for child `child-1` of `parent-1` with no error.
    fn make_completion(status: &str) -> ChildCompletion {
        ChildCompletion {
            parent_session_id: "parent-1".to_string(),
            child_session_id: "child-1".to_string(),
            status: status.to_string(),
            error: None,
            completed_at: Utc::now(),
        }
    }

    fn ids(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| value.to_string()).collect()
    }

    // ── ② derive completed children from the index ──────────────────────

    /// Storage stub that only answers `list_child_run_statuses`.
    struct StubChildIndex {
        children: Vec<(String, Option<String>)>,
    }

    #[async_trait]
    impl Storage for StubChildIndex {
        async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
            Ok(())
        }
        async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
            Ok(None)
        }
        async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
            Ok(false)
        }
        async fn list_child_run_statuses(
            &self,
            _parent_session_id: &str,
        ) -> std::io::Result<Vec<(String, Option<String>)>> {
            Ok(self.children.clone())
        }
    }

    #[tokio::test]
    async fn derive_completed_only_includes_terminal_children() {
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![
                ("a".into(), Some("completed".into())),
                ("b".into(), Some("running".into())),
                ("c".into(), Some("error".into())),
                ("d".into(), None),
            ],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
        // Terminal from index: a, c. Plus the just-completed child b folded in.
        assert_eq!(completed, ids(&["a", "b", "c"]));
    }

    #[tokio::test]
    async fn derive_completed_folds_in_just_completed_when_index_lags() {
        // Index hasn't caught up — reports the child as still running.
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![("only".into(), Some("running".into()))],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
        assert_eq!(completed, ids(&["only"]));
    }

    #[tokio::test]
    async fn derive_completed_does_not_duplicate_already_terminal_child() {
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![("only".into(), Some("completed".into()))],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
        assert_eq!(completed, ids(&["only"]));
    }

    // ── wait policy ─────────────────────────────────────────────────────

    #[test]
    fn wait_policy_all_uses_derived_completed_set() {
        let waited = ids(&["a", "b"]);
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &ids(&["a"]),
            "completed"
        ));
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &ids(&["a", "b"]),
            "completed"
        ));
    }

    #[test]
    fn wait_policy_any_requires_a_waited_child() {
        let waited = ids(&["a", "b"]);
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::Any,
            &waited,
            &ids(&["b"]),
            "completed"
        ));
        // A completed child that is not being waited on does not count.
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::Any,
            &waited,
            &ids(&["z"]),
            "completed"
        ));
    }

    #[test]
    fn wait_policy_first_error_resumes_on_error_or_when_all_done() {
        let waited = ids(&["a", "b"]);
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::FirstError,
            &waited,
            &ids(&["a"]),
            "error"
        ));
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::FirstError,
            &waited,
            &ids(&["a"]),
            "completed"
        ));
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::FirstError,
            &waited,
            &ids(&["a", "b"]),
            "completed"
        ));
    }

    #[test]
    fn wait_policy_is_never_satisfied_for_empty_wait_list() {
        let completed = ids(&["a"]);
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::All,
            &[],
            &completed,
            "completed"
        ));
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::Any,
            &[],
            &completed,
            "completed"
        ));
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::FirstError,
            &[],
            &completed,
            "error"
        ));
    }

    // ── child final response extraction ─────────────────────────────────

    #[test]
    fn child_final_assistant_text_returns_last_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        session
            .messages
            .push(Message::assistant("first answer", None));
        session.messages.push(Message::user("again"));
        session
            .messages
            .push(Message::assistant("final answer", None));

        assert_eq!(
            child_final_assistant_text(&session).as_deref(),
            Some("final answer")
        );
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_blank() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::assistant("   ", None));
        assert!(child_final_assistant_text(&session).is_none());
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_no_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        assert!(child_final_assistant_text(&session).is_none());
    }

    // ── truncation ──────────────────────────────────────────────────────

    #[test]
    fn truncate_for_resume_passthrough_when_short() {
        let s = "hello world";
        assert_eq!(truncate_for_resume(s), s);
    }

    #[test]
    fn truncate_for_resume_passthrough_at_exact_limit() {
        let exact: String = "a".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS);
        assert_eq!(truncate_for_resume(&exact), exact);
    }

    #[test]
    fn truncate_for_resume_truncates_long_content() {
        let long: String = "a".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS + 100);
        let expected = format!(
            "{}{}",
            "a".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS),
            RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER
        );
        assert_eq!(truncate_for_resume(&long), expected);
    }

    #[test]
    fn truncate_for_resume_counts_chars_not_bytes() {
        // Multi-byte characters must be counted (and cut) per character.
        let long: String = "é".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS + 1);
        let expected = format!(
            "{}{}",
            "é".repeat(RUNTIME_RESUME_CHILD_RESULT_MAX_CHARS),
            RUNTIME_RESUME_CHILD_RESULT_TRUNCATION_MARKER
        );
        assert_eq!(truncate_for_resume(&long), expected);
    }

    // ── runtime resume message ──────────────────────────────────────────

    #[test]
    fn runtime_resume_message_includes_child_response_when_provided() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));

        assert!(matches!(message.role, Role::User));
        assert!(message.never_compress);
        assert!(message.content.contains("Child final response:"));
        assert!(message.content.contains("the answer is 42"));

        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            metadata.get("runtime_kind").and_then(|v| v.as_str()),
            Some("child_completion_resume")
        );
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(true)
        );
    }

    #[test]
    fn runtime_resume_message_falls_back_to_error_when_no_response() {
        let mut completion = make_completion("error");
        completion.error = Some("boom".to_string());

        let message = runtime_resume_message(&completion, 1, None);
        assert!(message.content.contains("Child error:"));
        assert!(message.content.contains("boom"));
        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(false)
        );
    }

    #[test]
    fn runtime_resume_message_minimal_when_no_response_and_no_error() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 2, None);
        assert!(!message.content.contains("Child final response:"));
        assert!(!message.content.contains("Child error:"));
        assert!(message.content.contains("Resume the parent task"));
    }

    // ── config snapshot ─────────────────────────────────────────────────

    #[tokio::test]
    async fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
        let config = Arc::new(RwLock::new(Config::default()));
        config.write().await.provider = "copilot".to_string();
        let cached_config = StdRwLock::new(Config::default());

        let snapshot = read_config_snapshot(&config, &cached_config);

        assert_eq!(snapshot.provider, "copilot");
        assert_eq!(
            cached_config.read().expect("cached snapshot lock").provider,
            "copilot"
        );
    }

    #[tokio::test]
    async fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
        let cached_snapshot = Config {
            provider: "cached-provider".to_string(),
            ..Default::default()
        };

        let config = Arc::new(RwLock::new(Config::default()));
        let cached_config = StdRwLock::new(cached_snapshot);
        // Holding the write guard makes `try_read` on the live config fail.
        let _write_guard = config.write().await;

        let snapshot = read_config_snapshot(&config, &cached_config);

        assert_eq!(snapshot.provider, "cached-provider");
    }
}