Skip to main content

bamboo_engine/session_app/
child_completion_coordinator.rs

1//! Child-session completion coordinator.
2//!
3//! Receives terminal child runner notifications from `bamboo-engine`, updates
4//! durable parent wait state, and resumes the parent when the configured wait
5//! policy is satisfied.
6
7use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12    create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13    ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::runtime::config::GuardianSpawner;
16use crate::runtime::guardian_state::{
17    parse_guardian_verdict, read_guardian_config, read_guardian_state, write_guardian_state,
18    GuardianVerdict,
19};
20use crate::Agent;
21use async_trait::async_trait;
22use bamboo_agent_core::storage::Storage;
23use bamboo_agent_core::tools::ToolExecutor;
24use bamboo_agent_core::{AgentEvent, Message, Role, Session};
25use bamboo_domain::session::runtime_state::{
26    AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
27};
28use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
29use bamboo_storage::LockedSessionStore;
30use chrono::Utc;
31use tokio::sync::{broadcast, RwLock};
32
33use crate::model_areas::resolve_global_area_models;
34use crate::model_config_helper::{
35    resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
36};
37use crate::session_app::provider_model::session_effective_model_ref;
38use crate::session_app::resume::{
39    resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
40};
41use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
42
/// Session metadata key holding the JSON mirror of the session's
/// `AgentRuntimeState` (written by `write_runtime_state`, read back by
/// `read_runtime_state` when the typed field is absent).
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message-metadata key set to `true` on the runtime resume messages built in
/// this module.
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message-metadata key recording which kind of runtime resume message
/// (e.g. `child_completion_resume`, `guardian_review_resume`) was injected.
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
46
47fn read_runtime_state(session: &Session) -> AgentRuntimeState {
48    session
49        .agent_runtime_state
50        .clone()
51        .or_else(|| {
52            session
53                .metadata
54                .get(AGENT_RUNTIME_STATE_METADATA_KEY)
55                .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
56        })
57        .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
58}
59
60fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
61    session.agent_runtime_state = Some(runtime_state.clone());
62    if let Ok(serialized) = serde_json::to_string(runtime_state) {
63        session
64            .metadata
65            .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
66    }
67}
68
/// Whether a child run status counts as a failure for the `FirstError` wait
/// policy: exactly `"error"`, `"timeout"`, or `"cancelled"` (case-sensitive).
fn is_error_like(status: &str) -> bool {
    const ERROR_LIKE_STATUSES: [&str; 3] = ["error", "timeout", "cancelled"];
    ERROR_LIKE_STATUSES.contains(&status)
}
72
/// Whether `status` (as mirrored into the session index) marks a child run as
/// finished: either a success-like end (`completed`, `skipped`) or an
/// error-like end (`error`, `timeout`, `cancelled`).
fn is_terminal_child_status(status: &str) -> bool {
    match status {
        "completed" | "skipped" => true,
        "error" | "timeout" | "cancelled" => true,
        _ => false,
    }
}
80
81/// Reconstruct the set of completed child session ids for a parent from the
82/// session index (the single source of truth), folding in the child whose
83/// completion event is being processed so a momentarily-lagging index can never
84/// stall the parent's resume.
85async fn derive_completed_child_ids(
86    storage: &Arc<dyn Storage>,
87    parent_session_id: &str,
88    just_completed_child_id: &str,
89) -> Vec<String> {
90    let mut completed: Vec<String> = storage
91        .list_child_run_statuses(parent_session_id)
92        .await
93        .unwrap_or_default()
94        .into_iter()
95        .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
96        .map(|(id, _)| id)
97        .collect();
98    if !completed.iter().any(|id| id == just_completed_child_id) {
99        completed.push(just_completed_child_id.to_string());
100    }
101    completed.sort();
102    completed.dedup();
103    completed
104}
105
106fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
107    if let Ok(config_guard) = config.try_read() {
108        let snapshot = config_guard.clone();
109
110        if let Ok(mut cached_guard) = cached_config.try_write() {
111            *cached_guard = snapshot.clone();
112        }
113
114        snapshot
115    } else {
116        cached_config
117            .try_read()
118            .map(|guard| guard.clone())
119            .unwrap_or_default()
120    }
121}
122
123/// Per-parent async locks that serialize concurrent `on_child_completed`
124/// invocations for the same parent session.
125///
126/// Race eliminated: when `wait_for=Any` and two child sessions complete
127/// simultaneously, both invocations load the parent with
128/// `waiting_for_children=Some` before either persists the cleared state, so
129/// both pass `wait_policy_satisfied`, both clear `waiting_for_children`, add a
130/// duplicate resume message, and call `resume_parent` — a double resume.
131/// Holding this per-parent `tokio::sync::Mutex` across the load-check-save
132/// critical section makes the second caller observe the already-cleared state.
133///
134/// The inner `std::sync::Mutex` guards only the brief HashMap lookup/insert
135/// (no await inside); the per-parent `tokio::sync::Mutex` is the one held
136/// across the async critical section. Entries accumulate but are small
137/// (`Arc<tokio::sync::Mutex<()>>` ≈ 24 bytes) and bounded by the number of
138/// distinct parent sessions.
139fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
140    static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
141        OnceLock::new();
142    LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
143}
144
145fn wait_policy_satisfied(
146    policy: ChildWaitPolicy,
147    wait_child_ids: &[String],
148    completed_child_ids: &[String],
149    latest_status: &str,
150) -> bool {
151    if wait_child_ids.is_empty() {
152        return false;
153    }
154
155    match policy {
156        ChildWaitPolicy::All => wait_child_ids
157            .iter()
158            .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
159        ChildWaitPolicy::Any => completed_child_ids
160            .iter()
161            .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
162        ChildWaitPolicy::FirstError => {
163            is_error_like(latest_status)
164                || wait_child_ids
165                    .iter()
166                    .all(|id| completed_child_ids.iter().any(|completed| completed == id))
167        }
168    }
169}
170
171/// Extract the child session's last assistant content, if any. Returns `None`
172/// when the child produced no assistant message (e.g. errored before the first
173/// model response, or only emitted tool messages).
174fn child_final_assistant_text(child: &Session) -> Option<String> {
175    child
176        .messages
177        .iter()
178        .rev()
179        .find(|message| matches!(message.role, Role::Assistant))
180        .map(|message| message.content.clone())
181        .filter(|content| !content.trim().is_empty())
182}
183
184fn runtime_resume_message(
185    completion: &ChildCompletion,
186    remaining_children: usize,
187    child_final_response: Option<&str>,
188) -> Message {
189    let mut body = format!(
190        "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
191        completion.child_session_id, completion.status, remaining_children
192    );
193
194    // Fold the child's full final response back into the parent — no
195    // truncation. Sub-agents are first-class agents whose complete conclusion
196    // should be available to the parent without an extra `SubAgent.get` round
197    // trip. The message is left compressible (see `never_compress` below) so a
198    // long transcript can still be reclaimed under parent compaction.
199    let final_response = child_final_response.map(str::to_string);
200    if let Some(response) = final_response.as_deref() {
201        body.push_str("\n\nChild final response:\n");
202        body.push_str(response);
203    } else if let Some(error) = completion.error.as_deref() {
204        if !error.is_empty() {
205            body.push_str("\n\nChild error:\n");
206            body.push_str(error);
207        }
208    }
209
210    body.push_str(
211        "\n\nResume the parent task using this child result and continue from the previous plan. \
212         If you need the full child transcript, call SubAgent.get(child_session_id).",
213    );
214
215    let mut message = Message::user(body);
216    message.metadata = Some(serde_json::json!({
217        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
218        RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
219        "child_session_id": completion.child_session_id,
220        "child_status": completion.status,
221        "child_error": completion.error,
222        "completed_at": completion.completed_at,
223        "child_final_response_included": final_response.is_some(),
224    }));
225    // Allow parent-side compaction to reclaim this (now untruncated) message if
226    // the parent context grows — important once children nest and fold full
227    // results upward. The `SubAgent.get` hint preserves recoverability.
228    message.never_compress = false;
229    message
230}
231
232/// The hidden resume message for a completed **guardian** review: a directive,
233/// verdict-tailored note that carries the reviewer's findings straight into the
234/// parent (so it can act without a `SubAgent.get`), mirroring
235/// [`runtime_resume_message`]'s hidden/compressible shape.
236fn guardian_resume_message(completion: &ChildCompletion, verdict: &GuardianVerdict) -> Message {
237    let mut body = if verdict.approve {
238        String::from(
239            "Guardian review APPROVED: an independent reviewer verified the work and found no blocking issues. You may finalize the task.",
240        )
241    } else {
242        String::from(
243            "Guardian review REJECTED: an independent reviewer found issues. Address every finding below before completing — do NOT declare the task complete until they are resolved.",
244        )
245    };
246    if let Some(summary) = verdict.summary.as_deref().filter(|s| !s.trim().is_empty()) {
247        body.push_str("\n\nReviewer summary: ");
248        body.push_str(summary);
249    }
250    if !verdict.findings.is_empty() {
251        body.push_str("\n\nFindings:");
252        for (idx, finding) in verdict.findings.iter().enumerate() {
253            body.push_str(&format!("\n{}. {}", idx + 1, finding));
254        }
255    }
256    body.push_str(
257        "\n\nIf you need the full guardian transcript, call SubAgent.get(child_session_id).",
258    );
259
260    let mut message = Message::user(body);
261    message.metadata = Some(serde_json::json!({
262        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
263        RUNTIME_RESUME_MESSAGE_KIND_KEY: "guardian_review_resume",
264        "child_session_id": completion.child_session_id,
265        "child_status": completion.status,
266        "guardian_approved": verdict.approve,
267        "completed_at": completion.completed_at,
268    }));
269    message.never_compress = false;
270    message
271}
272
273#[derive(Clone)]
274pub struct ChildCompletionCoordinator {
275    storage: Arc<dyn Storage>,
276    persistence: Arc<bamboo_storage::LockedSessionStore>,
277    sessions: crate::SessionCache,
278    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
279    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
280    agent: Arc<Agent>,
281    config: Arc<RwLock<Config>>,
282    provider_registry: Arc<ProviderRegistry>,
283    provider_router: Arc<ProviderModelRouter>,
284    app_data_dir: std::path::PathBuf,
285    account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
286    root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
287    /// Late-bound guardian reviewer spawner, set post-construction by the server
288    /// (mirrors `root_tools`). Re-injected into resumed runs so a guardian's
289    /// reject→fix verdict can be re-reviewed across the suspend/resume boundary.
290    guardian_spawner: Arc<RwLock<Option<Arc<dyn GuardianSpawner>>>>,
291}
292
293impl ChildCompletionCoordinator {
294    #[allow(clippy::too_many_arguments)]
295    pub fn new(
296        storage: Arc<dyn Storage>,
297        persistence: Arc<LockedSessionStore>,
298        sessions: crate::SessionCache,
299        agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
300        session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
301        agent: Arc<Agent>,
302        config: Arc<RwLock<Config>>,
303        provider_registry: Arc<ProviderRegistry>,
304        provider_router: Arc<ProviderModelRouter>,
305        app_data_dir: std::path::PathBuf,
306        account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
307    ) -> Self {
308        Self {
309            storage,
310            persistence,
311            sessions,
312            agent_runners,
313            session_event_senders,
314            agent,
315            config,
316            provider_registry,
317            provider_router,
318            app_data_dir,
319            account_feed_inbox,
320            root_tools: Arc::new(RwLock::new(None)),
321            guardian_spawner: Arc::new(RwLock::new(None)),
322        }
323    }
324
325    pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
326        *self.root_tools.write().await = Some(tools);
327    }
328
329    /// Wire the guardian reviewer spawner (server-provided), so resumed runs can
330    /// re-spawn a guardian to re-review a fix after a reject verdict.
331    pub async fn set_guardian_spawner(&self, spawner: Arc<dyn GuardianSpawner>) {
332        *self.guardian_spawner.write().await = Some(spawner);
333    }
334
335    fn build_resume_config(
336        &self,
337        session: &Session,
338        config_snapshot: &Config,
339    ) -> ResumeConfigSnapshot {
340        crate::session_app::resolution::resolve_resume_config_snapshot(
341            config_snapshot,
342            &self.provider_registry,
343            session,
344            None,
345        )
346    }
347
348    async fn resume_parent(&self, parent_session_id: String) {
349        for attempt in 0..=5u8 {
350            if attempt > 0 {
351                tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
352            }
353
354            let Some(session) = self.load_session(&parent_session_id).await else {
355                tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
356                return;
357            };
358            let config_snapshot = self.config.read().await.clone();
359            let resume_config = self.build_resume_config(&session, &config_snapshot);
360            let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
361            tracing::info!(
362                %parent_session_id,
363                attempt,
364                outcome = outcome.as_str(),
365                "child completion requested parent resume"
366            );
367
368            if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
369                return;
370            }
371        }
372    }
373
374    async fn save_and_cache(&self, session: &mut Session) {
375        if let Err(error) = self.persistence.merge_save_runtime(session).await {
376            tracing::warn!(session_id = %session.id, %error, "failed to persist session");
377        }
378        self.sessions.insert(
379            session.id.clone(),
380            Arc::new(parking_lot::RwLock::new(session.clone())),
381        );
382    }
383}
384
385#[async_trait]
386impl ChildCompletionHandler for ChildCompletionCoordinator {
387    async fn on_child_completed(&self, completion: ChildCompletion) {
388        // Acquire a per-parent async lock to eliminate the concurrent
389        // double-resume race (see `parent_locks` for the full scenario). The
390        // inner std::sync::Mutex is released immediately so no sync lock is
391        // held across the await that follows.
392        let per_parent = {
393            let mut map = parent_locks().lock().expect("parent lock map poisoned");
394            map.entry(completion.parent_session_id.clone())
395                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
396                .clone()
397        };
398        let _per_parent_guard = per_parent.lock().await;
399
400        let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
401            tracing::warn!(
402                parent_session_id = %completion.parent_session_id,
403                child_session_id = %completion.child_session_id,
404                "child completion received for missing parent"
405            );
406            return;
407        };
408
409        // A parent may itself be a child (nested sub-agents): the rest of this
410        // handler is kind-agnostic — it operates on `completion.parent_session_id`,
411        // inspects that session's own `waiting_for_children` runtime state, and
412        // resumes it. (Previously this bailed unless the parent was Root, which
413        // silently dropped grandchild completions.)
414        let mut runtime_state = read_runtime_state(&parent);
415
416        // Single source of truth: reconstruct the completed-child set from the
417        // session index rather than from a denormalized copy on the parent file.
418        let completed_child_ids = derive_completed_child_ids(
419            &self.storage,
420            &completion.parent_session_id,
421            &completion.child_session_id,
422        )
423        .await;
424
425        let mut should_resume = false;
426        let mut remaining_children = 0usize;
427        if let Some(wait) = runtime_state.waiting_for_children.clone() {
428            remaining_children = wait
429                .child_session_ids
430                .iter()
431                .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
432                .count();
433            should_resume = wait_policy_satisfied(
434                wait.wait_for,
435                &wait.child_session_ids,
436                &completed_child_ids,
437                &completion.status,
438            );
439            if should_resume {
440                runtime_state.waiting_for_children = None;
441                runtime_state.status = AgentStatusState::Idle;
442                runtime_state.suspension = None;
443            }
444        }
445
446        if should_resume {
447            parent.metadata.remove("runtime.suspend_reason");
448            // Load the completed child once. The guardian branch inspects its
449            // subagent_type + final verdict; the generic path folds its final
450            // assistant content into the hidden resume message (avoiding an extra
451            // `SubAgent.get` round trip after resume).
452            let loaded_child = match self
453                .storage
454                .load_session(&completion.child_session_id)
455                .await
456            {
457                Ok(child) => child,
458                Err(error) => {
459                    tracing::warn!(
460                        child_session_id = %completion.child_session_id,
461                        %error,
462                        "failed to load child session for runtime resume message"
463                    );
464                    None
465                }
466            };
467
468            // Guardian branch: a completing guardian reviewer that matches the
469            // parent's recorded review advances GuardianState (phase → Reviewed)
470            // and resumes with a verdict-tailored, findings-carrying message. Any
471            // id mismatch or unparseable verdict falls through to the generic
472            // resume, so the parent is never stranded.
473            let reviewed_round = runtime_state.round.current_round;
474            let guardian_resume = loaded_child.as_ref().and_then(|child| {
475                if child.subagent_type().as_deref() != Some("guardian") {
476                    return None;
477                }
478                let mut guardian_state = read_guardian_state(&parent)?;
479                if guardian_state.guardian_child_id.as_deref()
480                    != Some(completion.child_session_id.as_str())
481                {
482                    // A *different* guardian is legitimately still in flight —
483                    // leave its Pending state intact and use the generic resume.
484                    tracing::warn!(
485                        parent_session_id = %completion.parent_session_id,
486                        child_session_id = %completion.child_session_id,
487                        expected = ?guardian_state.guardian_child_id,
488                        "guardian completion does not match recorded guardian_child_id; using generic resume"
489                    );
490                    return None;
491                }
492                // This IS the guardian we dispatched, so we MUST advance the
493                // phase out of `Pending` — otherwise the next terminal gate's
494                // `Pending => return None` would let the run complete unreviewed.
495                // A reviewer that errored or produced unparseable output is
496                // treated as a SYNTHETIC REJECT (never a silent pass), so the
497                // budgeted re-review loop governs the outcome: fail-closed, but
498                // still bounded by `max_reviews`.
499                let verdict = child_final_assistant_text(child)
500                    .and_then(|text| match parse_guardian_verdict(&text) {
501                        Ok(verdict) => Some(verdict),
502                        Err(error) => {
503                            tracing::warn!(
504                                child_session_id = %completion.child_session_id,
505                                %error,
506                                "guardian verdict unparseable; recording a synthetic reject"
507                            );
508                            None
509                        }
510                    })
511                    .unwrap_or_else(|| {
512                        GuardianVerdict::rejected(vec![
513                            "The guardian reviewer did not return a usable verdict (it errored or \
514                             emitted unparseable output); the work has NOT been independently \
515                             verified."
516                                .to_string(),
517                        ])
518                    });
519                let approved = verdict.approve;
520                let message = guardian_resume_message(&completion, &verdict);
521                guardian_state.record_verdict(verdict, reviewed_round);
522                write_guardian_state(&mut parent, guardian_state);
523                tracing::info!(
524                    parent_session_id = %completion.parent_session_id,
525                    child_session_id = %completion.child_session_id,
526                    approved,
527                    "guardian verdict recorded; resuming parent"
528                );
529                Some(message)
530            });
531
532            let resume_message = guardian_resume.unwrap_or_else(|| {
533                runtime_resume_message(
534                    &completion,
535                    remaining_children,
536                    loaded_child
537                        .as_ref()
538                        .and_then(child_final_assistant_text)
539                        .as_deref(),
540                )
541            });
542            parent.add_message(resume_message);
543        } else if runtime_state.waiting_for_children.is_some() {
544            runtime_state.status = AgentStatusState::Suspended;
545            runtime_state.suspension = Some(SuspensionState {
546                reason: "waiting_for_children".to_string(),
547                suspended_at: Utc::now(),
548                resumable: true,
549                hook_point: Some("ChildCompletion".to_string()),
550            });
551        }
552
553        parent.updated_at = Utc::now();
554        write_runtime_state(&mut parent, &runtime_state);
555        self.save_and_cache(&mut parent).await;
556
557        // Capture before releasing the per-parent lock so the borrow checker
558        // is satisfied; `resume_parent` has its own retry loop and should not
559        // hold the per-parent lock (it would block other completions for the
560        // same parent, and the state is already durably settled above).
561        let resume_parent_id = parent.id.clone();
562        drop(_per_parent_guard);
563
564        if should_resume {
565            self.resume_parent(resume_parent_id).await;
566        }
567    }
568}
569
570#[async_trait]
571impl ResumeExecutionPort for ChildCompletionCoordinator {
572    async fn load_session(&self, session_id: &str) -> Option<Session> {
573        match self.storage.load_session(session_id).await {
574            Ok(Some(session)) => Some(session),
575            Ok(None) => self
576                .sessions
577                .get(session_id)
578                .map(|e| e.value().clone())
579                .map(|arc| arc.read().clone()),
580            Err(error) => {
581                tracing::warn!(%session_id, %error, "failed to load session from storage");
582                self.sessions
583                    .get(session_id)
584                    .map(|e| e.value().clone())
585                    .map(|arc| arc.read().clone())
586            }
587        }
588    }
589
590    async fn save_and_cache_session(&self, session: &mut Session) {
591        self.save_and_cache(session).await;
592    }
593
594    async fn try_reserve_runner(
595        &self,
596        session_id: &str,
597        event_sender: &broadcast::Sender<AgentEvent>,
598    ) -> Option<RunnerReservation> {
599        try_reserve_runner(&self.agent_runners, session_id, event_sender).await
600    }
601
602    async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
603        let runners = self.agent_runners.read().await;
604        runners.get(session_id).map(|r| r.run_id.clone())
605    }
606
607    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
608        crate::execution::session_events::get_or_create_event_sender(
609            &self.session_event_senders,
610            session_id,
611        )
612        .await
613    }
614
615    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
616        let ResumeSpawnRequest {
617            session_id,
618            session,
619            cancel_token,
620            run_id: _,
621            event_sender,
622            config,
623        } = request;
624
625        let Some(root_tools) = self.root_tools.read().await.clone() else {
626            tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
627            return;
628        };
629
630        let model = session.model.clone();
631        let resolved_provider_name = session_effective_model_ref(&session)
632            .map(|model_ref| model_ref.provider)
633            .unwrap_or(config.provider_name);
634        let provider_override = session_effective_model_ref(&session)
635            .and_then(|model_ref| match self.provider_router.route(&model_ref) {
636                Ok(provider) => Some(provider),
637                Err(error) => {
638                    tracing::warn!(
639                        session_id = %session_id,
640                        provider = %model_ref.provider,
641                        model = %model_ref.model,
642                        error = %error,
643                        "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
644                    );
645                    None
646                }
647            });
648        let config_snapshot = self.config.read().await.clone();
649        let resolved_fast_provider = resolve_fast_model(
650            &config_snapshot,
651            &resolved_provider_name,
652            &self.provider_registry,
653        )
654        .map(|model| model.provider);
655        let reasoning_effort = session.reasoning_effort;
656        let reasoning_effort_source = session
657            .metadata
658            .get("reasoning_effort_source")
659            .cloned()
660            .unwrap_or_default();
661        let gold_config = resolve_gold_config(
662            &config_snapshot,
663            session
664                .metadata
665                .get(GOLD_CONFIG_METADATA_KEY)
666                .map(String::as_str),
667        )
668        .or(config.gold_config.clone());
669
670        let (mpsc_tx, _forwarder) = create_event_forwarder(
671            session_id.clone(),
672            event_sender,
673            self.agent_runners.clone(),
674            self.account_feed_inbox.clone(),
675        );
676
677        let config_handle = self.config.clone();
678        let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
679        let provider_registry = self.provider_registry.clone();
680        let provider_name_for_aux = resolved_provider_name.clone();
681        let auxiliary_model_resolver = std::sync::Arc::new(move || {
682            let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
683            // Auxiliary models are global (config-derived), never session-bound.
684            let areas = resolve_global_area_models(
685                &config_snapshot,
686                &provider_name_for_aux,
687                &provider_registry,
688            );
689            crate::AuxiliaryModelConfig {
690                fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
691                fast_model_provider: areas.fast.map(|m| m.provider),
692                background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
693                planning_model_name: None,
694                search_model_name: None,
695                summarization_model_name: areas
696                    .summarization
697                    .as_ref()
698                    .map(|m| m.model_name.clone()),
699                background_model_provider: areas.background.map(|m| m.provider),
700                summarization_model_provider: areas.summarization.map(|m| m.provider),
701            }
702        });
703        let model_roster = crate::ModelRoster {
704            model: Some(model),
705            provider_name: Some(resolved_provider_name),
706            provider_type: config.provider_type.clone(),
707            fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
708            background: crate::RoleModel::from_parts(
709                config.background_model,
710                config.background_model_provider,
711            ),
712            summarization: crate::RoleModel::from_parts(
713                config.summarization_model,
714                config.summarization_model_provider,
715            ),
716        };
717
718        // Re-inject guardian state on resume so a reject→fix verdict can be
719        // re-reviewed: config from the session (persisted at first spawn),
720        // spawner from the coordinator-held handle. Absent guardian config this
721        // stays `None`, and the approve→complete path is unchanged.
722        let guardian_config = read_guardian_config(&session);
723        let guardian_spawner = self.guardian_spawner.read().await.clone();
724
725        spawn_session_execution(SessionExecutionArgs {
726            agent: self.agent.clone(),
727            session_id,
728            session,
729            tools_override: Some(root_tools),
730            provider_override,
731            model_roster,
732            reasoning_effort,
733            reasoning_effort_source,
734            auxiliary_model_resolver: Some(auxiliary_model_resolver),
735            disabled_tools: Some(config.disabled_tools),
736            disabled_skill_ids: Some(config.disabled_skill_ids),
737            selected_skill_ids: None,
738            selected_skill_mode: None,
739            cancel_token,
740            mpsc_tx,
741            image_fallback: config.image_fallback,
742            gold_config,
743            guardian_config,
744            guardian_spawner,
745            app_data_dir: Some(self.app_data_dir.clone()),
746            runners: self.agent_runners.clone(),
747            sessions_cache: self.sessions.clone(),
748            on_complete: None,
749        });
750    }
751}
752
#[cfg(test)]
mod tests {
    // Everything the tests need (`Message`, `Session`, `Role`, `Arc`, `Utc`,
    // `Storage`, `async_trait`, `RwLock`, `StdRwLock`, ...) is reachable through
    // the parent module's imports via this glob; a separate
    // `use bamboo_agent_core::Message;` would only re-import the same item.
    use super::*;

    /// Builds a `ChildCompletion` for `parent-1` / `child-1` with the given
    /// `status`, no error, and a completion timestamp of "now".
    fn make_completion(status: &str) -> ChildCompletion {
        ChildCompletion {
            parent_session_id: "parent-1".to_string(),
            child_session_id: "child-1".to_string(),
            status: status.to_string(),
            error: None,
            completed_at: Utc::now(),
        }
    }

    // ── ② derive completed children from the index ──────────────────────

    /// Minimal `Storage` stub: save/load/delete are no-ops, and
    /// `list_child_run_statuses` returns the configured `(child_id, status)`
    /// pairs regardless of the parent id it is asked about.
    struct StubChildIndex {
        children: Vec<(String, Option<String>)>,
    }

    #[async_trait]
    impl Storage for StubChildIndex {
        async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
            Ok(())
        }
        async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
            Ok(None)
        }
        async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
            Ok(false)
        }
        async fn list_child_run_statuses(
            &self,
            _parent_session_id: &str,
        ) -> std::io::Result<Vec<(String, Option<String>)>> {
            Ok(self.children.clone())
        }
    }

    #[tokio::test]
    async fn derive_completed_only_includes_terminal_children() {
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![
                ("a".into(), Some("completed".into())),
                ("b".into(), Some("running".into())),
                ("c".into(), Some("error".into())),
                ("d".into(), None),
            ],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
        // Terminal from index: a, c. Plus the just-completed child b folded in.
        assert_eq!(
            completed,
            vec!["a".to_string(), "b".to_string(), "c".to_string()]
        );
    }

    #[tokio::test]
    async fn derive_completed_folds_in_just_completed_when_index_lags() {
        // Index hasn't caught up — reports the child as still running.
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![("only".into(), Some("running".into()))],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
        assert_eq!(completed, vec!["only".to_string()]);
    }

    #[test]
    fn wait_policy_all_uses_derived_completed_set() {
        // With `All`, the policy stays unsatisfied until every waited child
        // appears in the completed set.
        let waited = vec!["a".to_string(), "b".to_string()];
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &["a".to_string()],
            "completed"
        ));
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &["a".to_string(), "b".to_string()],
            "completed"
        ));
    }

    #[test]
    fn child_final_assistant_text_returns_last_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        session
            .messages
            .push(Message::assistant("first answer", None));
        session.messages.push(Message::user("again"));
        session
            .messages
            .push(Message::assistant("final answer", None));

        assert_eq!(
            child_final_assistant_text(&session).as_deref(),
            Some("final answer")
        );
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_blank() {
        // Whitespace-only assistant content counts as "no response".
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::assistant("   ", None));
        assert!(child_final_assistant_text(&session).is_none());
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_no_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        assert!(child_final_assistant_text(&session).is_none());
    }

    #[test]
    fn runtime_resume_message_folds_full_response_without_truncation() {
        // A very long child final response is folded in verbatim (no 4000-char
        // cap, no truncation marker).
        let completion = make_completion("completed");
        let long: String = "a".repeat(10_000);
        let message = runtime_resume_message(&completion, 0, Some(&long));
        assert!(message.content.contains(&long));
        assert!(!message.content.contains("truncated"));
    }

    #[test]
    fn runtime_resume_message_includes_child_response_when_provided() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));

        assert!(matches!(message.role, Role::User));
        // Folded child results are now compressible so the parent context can
        // reclaim them under compaction.
        assert!(!message.never_compress);
        assert!(message.content.contains("Child final response:"));
        assert!(message.content.contains("the answer is 42"));

        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            metadata.get("runtime_kind").and_then(|v| v.as_str()),
            Some("child_completion_resume")
        );
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(true)
        );
    }

    #[test]
    fn runtime_resume_message_falls_back_to_error_when_no_response() {
        let mut completion = make_completion("error");
        completion.error = Some("boom".to_string());

        let message = runtime_resume_message(&completion, 1, None);
        assert!(message.content.contains("Child error:"));
        assert!(message.content.contains("boom"));
        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(false)
        );
    }

    #[test]
    fn runtime_resume_message_minimal_when_no_response_and_no_error() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 2, None);
        assert!(!message.content.contains("Child final response:"));
        assert!(!message.content.contains("Child error:"));
        assert!(message.content.contains("Resume the parent task"));
    }

    #[test]
    fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let config = Arc::new(RwLock::new(Config::default()));
            config.write().await.provider = "copilot".to_string();
            let cached_config = StdRwLock::new(Config::default());

            // The live lock is free, so the snapshot reflects the live config
            // and the cached copy is updated to match.
            let snapshot = read_config_snapshot(&config, &cached_config);

            assert_eq!(snapshot.provider, "copilot");
            assert_eq!(
                cached_config.read().expect("cached snapshot lock").provider,
                "copilot"
            );
        });
    }

    #[test]
    fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let cached_snapshot = Config {
                provider: "cached-provider".to_string(),
                ..Default::default()
            };

            let config = Arc::new(RwLock::new(Config::default()));
            let cached_config = StdRwLock::new(cached_snapshot);
            // Hold the live write guard across the call so the live config is
            // unavailable and the cached snapshot must be used instead.
            let _write_guard = config.write().await;

            let snapshot = read_config_snapshot(&config, &cached_config);

            assert_eq!(snapshot.provider, "cached-provider");
        });
    }
}