Skip to main content

bamboo_engine/session_app/
child_completion_coordinator.rs

1//! Child-session completion coordinator.
2//!
3//! Receives terminal child runner notifications from `bamboo-engine`, updates
4//! durable parent wait state, and resumes the parent when the configured wait
5//! policy is satisfied.
6
7use std::collections::HashMap;
8use std::sync::{Arc, OnceLock, RwLock as StdRwLock};
9use std::time::Duration;
10
11use crate::execution::{
12    create_event_forwarder, spawn_session_execution, try_reserve_runner, AgentRunner,
13    ChildCompletion, ChildCompletionHandler, RunnerReservation, SessionExecutionArgs,
14};
15use crate::runtime::config::{BashResumeHook, GuardianSpawner, BASH_COMPLETION_RESUME_KIND};
16use crate::runtime::guardian_state::{
17    parse_guardian_verdict, read_guardian_config, read_guardian_state, write_guardian_state,
18    GuardianVerdict,
19};
20use crate::Agent;
21use async_trait::async_trait;
22use bamboo_agent_core::storage::Storage;
23use bamboo_agent_core::tools::ToolExecutor;
24use bamboo_agent_core::{AgentEvent, Message, Role, Session};
25use bamboo_domain::session::runtime_state::{
26    AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState,
27};
28use bamboo_llm::{Config, ProviderModelRouter, ProviderRegistry};
29use bamboo_storage::LockedSessionStore;
30use chrono::Utc;
31use tokio::sync::{broadcast, RwLock};
32
33use crate::model_areas::resolve_global_area_models;
34use crate::model_config_helper::{
35    resolve_fast_model, resolve_gold_config, GOLD_CONFIG_METADATA_KEY,
36};
37use crate::session_app::provider_model::session_effective_model_ref;
38use crate::session_app::resume::{
39    resume_session_execution, ResumeExecutionPort, ResumeSpawnRequest,
40};
41use crate::session_app::types::{ResumeConfigSnapshot, ResumeOutcome};
42
/// Session-metadata key under which a JSON copy of the `AgentRuntimeState` is
/// mirrored next to the typed `agent_runtime_state` field.
const AGENT_RUNTIME_STATE_METADATA_KEY: &str = "agent.runtime.state";
/// Message-metadata flag (set to `true` on runtime-injected resume messages)
/// whose name marks the message as hidden from the UI.
const RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: &str = "hidden_from_ui";
/// Message-metadata key recording which kind of runtime resume message this is
/// (e.g. `child_completion_resume`, `guardian_review_resume`).
const RUNTIME_RESUME_MESSAGE_KIND_KEY: &str = "runtime_kind";
46
47fn read_runtime_state(session: &Session) -> AgentRuntimeState {
48    session
49        .agent_runtime_state
50        .clone()
51        .or_else(|| {
52            session
53                .metadata
54                .get(AGENT_RUNTIME_STATE_METADATA_KEY)
55                .and_then(|raw| serde_json::from_str::<AgentRuntimeState>(raw).ok())
56        })
57        .unwrap_or_else(|| AgentRuntimeState::new(format!("{}-child-wait", session.id)))
58}
59
60fn write_runtime_state(session: &mut Session, runtime_state: &AgentRuntimeState) {
61    session.agent_runtime_state = Some(runtime_state.clone());
62    if let Ok(serialized) = serde_json::to_string(runtime_state) {
63        session
64            .metadata
65            .insert(AGENT_RUNTIME_STATE_METADATA_KEY.to_string(), serialized);
66    }
67}
68
/// Whether a child run status counts as a failure for wait-policy purposes.
///
/// Exactly `error`, `timeout` or `cancelled`; the comparison is case-sensitive.
fn is_error_like(status: &str) -> bool {
    ["error", "timeout", "cancelled"].contains(&status)
}
72
/// Whether a child run status, as mirrored into the session index, is terminal.
///
/// Terminal statuses are `completed`, `error`, `timeout`, `cancelled` and
/// `skipped`; the comparison is exact and case-sensitive.
fn is_terminal_child_status(status: &str) -> bool {
    const TERMINAL_STATUSES: &[&str] = &["completed", "error", "timeout", "cancelled", "skipped"];
    TERMINAL_STATUSES
        .iter()
        .any(|terminal| *terminal == status)
}
80
81/// Reconstruct the set of completed child session ids for a parent from the
82/// session index (the single source of truth), folding in the child whose
83/// completion event is being processed so a momentarily-lagging index can never
84/// stall the parent's resume.
85async fn derive_completed_child_ids(
86    storage: &Arc<dyn Storage>,
87    parent_session_id: &str,
88    just_completed_child_id: &str,
89) -> Vec<String> {
90    let mut completed: Vec<String> = storage
91        .list_child_run_statuses(parent_session_id)
92        .await
93        .unwrap_or_default()
94        .into_iter()
95        .filter(|(_, status)| status.as_deref().is_some_and(is_terminal_child_status))
96        .map(|(id, _)| id)
97        .collect();
98    if !completed.iter().any(|id| id == just_completed_child_id) {
99        completed.push(just_completed_child_id.to_string());
100    }
101    completed.sort();
102    completed.dedup();
103    completed
104}
105
106fn read_config_snapshot(config: &Arc<RwLock<Config>>, cached_config: &StdRwLock<Config>) -> Config {
107    if let Ok(config_guard) = config.try_read() {
108        let snapshot = config_guard.clone();
109
110        if let Ok(mut cached_guard) = cached_config.try_write() {
111            *cached_guard = snapshot.clone();
112        }
113
114        snapshot
115    } else {
116        cached_config
117            .try_read()
118            .map(|guard| guard.clone())
119            .unwrap_or_default()
120    }
121}
122
123/// Per-parent async locks that serialize concurrent `on_child_completed`
124/// invocations for the same parent session.
125///
126/// Race eliminated: when `wait_for=Any` and two child sessions complete
127/// simultaneously, both invocations load the parent with
128/// `waiting_for_children=Some` before either persists the cleared state, so
129/// both pass `wait_policy_satisfied`, both clear `waiting_for_children`, add a
130/// duplicate resume message, and call `resume_parent` — a double resume.
131/// Holding this per-parent `tokio::sync::Mutex` across the load-check-save
132/// critical section makes the second caller observe the already-cleared state.
133///
134/// The inner `std::sync::Mutex` guards only the brief HashMap lookup/insert
135/// (no await inside); the per-parent `tokio::sync::Mutex` is the one held
136/// across the async critical section. Entries accumulate but are small
137/// (`Arc<tokio::sync::Mutex<()>>` ≈ 24 bytes) and bounded by the number of
138/// distinct parent sessions.
139fn parent_locks() -> &'static std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
140    static LOCKS: OnceLock<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
141        OnceLock::new();
142    LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
143}
144
145fn wait_policy_satisfied(
146    policy: ChildWaitPolicy,
147    wait_child_ids: &[String],
148    completed_child_ids: &[String],
149    latest_status: &str,
150) -> bool {
151    if wait_child_ids.is_empty() {
152        return false;
153    }
154
155    match policy {
156        ChildWaitPolicy::All => wait_child_ids
157            .iter()
158            .all(|id| completed_child_ids.iter().any(|completed| completed == id)),
159        ChildWaitPolicy::Any => completed_child_ids
160            .iter()
161            .any(|id| wait_child_ids.iter().any(|wait_id| wait_id == id)),
162        ChildWaitPolicy::FirstError => {
163            is_error_like(latest_status)
164                || wait_child_ids
165                    .iter()
166                    .all(|id| completed_child_ids.iter().any(|completed| completed == id))
167        }
168    }
169}
170
171/// Extract the child session's last assistant content, if any. Returns `None`
172/// when the child produced no assistant message (e.g. errored before the first
173/// model response, or only emitted tool messages).
174fn child_final_assistant_text(child: &Session) -> Option<String> {
175    child
176        .messages
177        .iter()
178        .rev()
179        .find(|message| matches!(message.role, Role::Assistant))
180        .map(|message| message.content.clone())
181        .filter(|content| !content.trim().is_empty())
182}
183
184fn runtime_resume_message(
185    completion: &ChildCompletion,
186    remaining_children: usize,
187    child_final_response: Option<&str>,
188) -> Message {
189    let mut body = format!(
190        "Runtime notification: child session `{}` finished with status `{}`. Remaining child sessions: {}.",
191        completion.child_session_id, completion.status, remaining_children
192    );
193
194    // Fold the child's full final response back into the parent — no
195    // truncation. Sub-agents are first-class agents whose complete conclusion
196    // should be available to the parent without an extra `SubAgent.get` round
197    // trip. The message is left compressible (see `never_compress` below) so a
198    // long transcript can still be reclaimed under parent compaction.
199    let final_response = child_final_response.map(str::to_string);
200    if let Some(response) = final_response.as_deref() {
201        body.push_str("\n\nChild final response:\n");
202        body.push_str(response);
203    } else if let Some(error) = completion.error.as_deref() {
204        if !error.is_empty() {
205            body.push_str("\n\nChild error:\n");
206            body.push_str(error);
207        }
208    }
209
210    body.push_str(
211        "\n\nResume the parent task using this child result and continue from the previous plan. \
212         If you need the full child transcript, call SubAgent.get(child_session_id).",
213    );
214
215    let mut message = Message::user(body);
216    message.metadata = Some(serde_json::json!({
217        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
218        RUNTIME_RESUME_MESSAGE_KIND_KEY: "child_completion_resume",
219        "child_session_id": completion.child_session_id,
220        "child_status": completion.status,
221        "child_error": completion.error,
222        "completed_at": completion.completed_at,
223        "child_final_response_included": final_response.is_some(),
224    }));
225    // Allow parent-side compaction to reclaim this (now untruncated) message if
226    // the parent context grows — important once children nest and fold full
227    // results upward. The `SubAgent.get` hint preserves recoverability.
228    message.never_compress = false;
229    message
230}
231
232/// The hidden resume message for a completed **guardian** review: a directive,
233/// verdict-tailored note that carries the reviewer's findings straight into the
234/// parent (so it can act without a `SubAgent.get`), mirroring
235/// [`runtime_resume_message`]'s hidden/compressible shape.
236fn guardian_resume_message(completion: &ChildCompletion, verdict: &GuardianVerdict) -> Message {
237    let mut body = if verdict.approve {
238        String::from(
239            "Guardian review APPROVED: an independent reviewer verified the work and found no blocking issues. You may finalize the task.",
240        )
241    } else {
242        String::from(
243            "Guardian review REJECTED: an independent reviewer found issues. Address every finding below before completing — do NOT declare the task complete until they are resolved.",
244        )
245    };
246    if let Some(summary) = verdict.summary.as_deref().filter(|s| !s.trim().is_empty()) {
247        body.push_str("\n\nReviewer summary: ");
248        body.push_str(summary);
249    }
250    if !verdict.findings.is_empty() {
251        body.push_str("\n\nFindings:");
252        for (idx, finding) in verdict.findings.iter().enumerate() {
253            body.push_str(&format!("\n{}. {}", idx + 1, finding));
254        }
255    }
256    body.push_str(
257        "\n\nIf you need the full guardian transcript, call SubAgent.get(child_session_id).",
258    );
259
260    let mut message = Message::user(body);
261    message.metadata = Some(serde_json::json!({
262        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
263        RUNTIME_RESUME_MESSAGE_KIND_KEY: "guardian_review_resume",
264        "child_session_id": completion.child_session_id,
265        "child_status": completion.status,
266        "guardian_approved": verdict.approve,
267        "completed_at": completion.completed_at,
268    }));
269    message.never_compress = false;
270    message
271}
272
273#[derive(Clone)]
274pub struct ChildCompletionCoordinator {
275    storage: Arc<dyn Storage>,
276    persistence: Arc<bamboo_storage::LockedSessionStore>,
277    sessions: crate::SessionCache,
278    agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
279    session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
280    agent: Arc<Agent>,
281    config: Arc<RwLock<Config>>,
282    provider_registry: Arc<ProviderRegistry>,
283    provider_router: Arc<ProviderModelRouter>,
284    app_data_dir: std::path::PathBuf,
285    account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
286    root_tools: Arc<RwLock<Option<Arc<dyn ToolExecutor>>>>,
287    /// Late-bound guardian reviewer spawner, set post-construction by the server
288    /// (mirrors `root_tools`). Re-injected into resumed runs so a guardian's
289    /// reject→fix verdict can be re-reviewed across the suspend/resume boundary.
290    guardian_spawner: Arc<RwLock<Option<Arc<dyn GuardianSpawner>>>>,
291}
292
293impl ChildCompletionCoordinator {
294    #[allow(clippy::too_many_arguments)]
295    pub fn new(
296        storage: Arc<dyn Storage>,
297        persistence: Arc<LockedSessionStore>,
298        sessions: crate::SessionCache,
299        agent_runners: Arc<RwLock<HashMap<String, AgentRunner>>>,
300        session_event_senders: Arc<RwLock<HashMap<String, broadcast::Sender<AgentEvent>>>>,
301        agent: Arc<Agent>,
302        config: Arc<RwLock<Config>>,
303        provider_registry: Arc<ProviderRegistry>,
304        provider_router: Arc<ProviderModelRouter>,
305        app_data_dir: std::path::PathBuf,
306        account_feed_inbox: Option<crate::execution::AccountFeedInbox>,
307    ) -> Self {
308        Self {
309            storage,
310            persistence,
311            sessions,
312            agent_runners,
313            session_event_senders,
314            agent,
315            config,
316            provider_registry,
317            provider_router,
318            app_data_dir,
319            account_feed_inbox,
320            root_tools: Arc::new(RwLock::new(None)),
321            guardian_spawner: Arc::new(RwLock::new(None)),
322        }
323    }
324
325    pub async fn set_root_tools(&self, tools: Arc<dyn ToolExecutor>) {
326        *self.root_tools.write().await = Some(tools);
327    }
328
329    /// Wire the guardian reviewer spawner (server-provided), so resumed runs can
330    /// re-spawn a guardian to re-review a fix after a reject verdict.
331    pub async fn set_guardian_spawner(&self, spawner: Arc<dyn GuardianSpawner>) {
332        *self.guardian_spawner.write().await = Some(spawner);
333    }
334
335    fn build_resume_config(
336        &self,
337        session: &Session,
338        config_snapshot: &Config,
339    ) -> ResumeConfigSnapshot {
340        crate::session_app::resolution::resolve_resume_config_snapshot(
341            config_snapshot,
342            &self.provider_registry,
343            session,
344            None,
345        )
346    }
347
348    /// Drive a parent-resume and return the final [`ResumeOutcome`] so callers
349    /// can distinguish a successful spawn (`Started`) from a gate-blocked
350    /// attempt (`Completed`). The bash self-resume poll task uses this to
351    /// detect the finalize-clobber case — its appended resume message was
352    /// reverted by the suspending runner's final `merge_save_runtime`, so the
353    /// resume port's `has_pending_user_message` gate fails and nothing spawns —
354    /// and retry the clear→append→resume (see [`Self::bash_self_resume`]).
355    async fn resume_parent(&self, parent_session_id: String) -> ResumeOutcome {
356        for attempt in 0..=5u8 {
357            if attempt > 0 {
358                tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
359            }
360
361            let Some(session) = self.load_session(&parent_session_id).await else {
362                tracing::warn!(%parent_session_id, "cannot resume parent after child completion: session not found");
363                return ResumeOutcome::NotFound;
364            };
365            let config_snapshot = self.config.read().await.clone();
366            let resume_config = self.build_resume_config(&session, &config_snapshot);
367            let outcome = resume_session_execution(self, &parent_session_id, resume_config).await;
368            tracing::info!(
369                %parent_session_id,
370                attempt,
371                outcome = outcome.as_str(),
372                "child completion requested parent resume"
373            );
374
375            if !matches!(outcome, ResumeOutcome::AlreadyRunning { .. }) {
376                return outcome;
377            }
378        }
379        // Exhausted the AlreadyRunning retry budget; surface the final state.
380        ResumeOutcome::AlreadyRunning {
381            run_id: String::new(),
382        }
383    }
384
385    async fn save_and_cache(&self, session: &mut Session) {
386        if let Err(error) = self.persistence.merge_save_runtime(session).await {
387            tracing::warn!(session_id = %session.id, %error, "failed to persist session");
388        }
389        self.sessions.insert(
390            session.id.clone(),
391            Arc::new(parking_lot::RwLock::new(session.clone())),
392        );
393    }
394}
395
396#[async_trait]
397impl ChildCompletionHandler for ChildCompletionCoordinator {
398    async fn on_child_completed(&self, completion: ChildCompletion) {
399        // Acquire a per-parent async lock to eliminate the concurrent
400        // double-resume race (see `parent_locks` for the full scenario). The
401        // inner std::sync::Mutex is released immediately so no sync lock is
402        // held across the await that follows.
403        let per_parent = {
404            let mut map = parent_locks().lock().expect("parent lock map poisoned");
405            map.entry(completion.parent_session_id.clone())
406                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
407                .clone()
408        };
409        let _per_parent_guard = per_parent.lock().await;
410
411        let Some(mut parent) = self.load_session(&completion.parent_session_id).await else {
412            tracing::warn!(
413                parent_session_id = %completion.parent_session_id,
414                child_session_id = %completion.child_session_id,
415                "child completion received for missing parent"
416            );
417            return;
418        };
419
420        // A parent may itself be a child (nested sub-agents): the rest of this
421        // handler is kind-agnostic — it operates on `completion.parent_session_id`,
422        // inspects that session's own `waiting_for_children` runtime state, and
423        // resumes it. (Previously this bailed unless the parent was Root, which
424        // silently dropped grandchild completions.)
425        let mut runtime_state = read_runtime_state(&parent);
426
427        // Single source of truth: reconstruct the completed-child set from the
428        // session index rather than from a denormalized copy on the parent file.
429        let completed_child_ids = derive_completed_child_ids(
430            &self.storage,
431            &completion.parent_session_id,
432            &completion.child_session_id,
433        )
434        .await;
435
436        let mut should_resume = false;
437        let mut remaining_children = 0usize;
438        if let Some(wait) = runtime_state.waiting_for_children.clone() {
439            remaining_children = wait
440                .child_session_ids
441                .iter()
442                .filter(|id| !completed_child_ids.iter().any(|completed| completed == *id))
443                .count();
444            should_resume = wait_policy_satisfied(
445                wait.wait_for,
446                &wait.child_session_ids,
447                &completed_child_ids,
448                &completion.status,
449            );
450            if should_resume {
451                runtime_state.waiting_for_children = None;
452                runtime_state.status = AgentStatusState::Idle;
453                runtime_state.suspension = None;
454            }
455        }
456
457        if should_resume {
458            parent.metadata.remove("runtime.suspend_reason");
459            // Load the completed child once. The guardian branch inspects its
460            // subagent_type + final verdict; the generic path folds its final
461            // assistant content into the hidden resume message (avoiding an extra
462            // `SubAgent.get` round trip after resume).
463            let loaded_child = match self
464                .storage
465                .load_session(&completion.child_session_id)
466                .await
467            {
468                Ok(child) => child,
469                Err(error) => {
470                    tracing::warn!(
471                        child_session_id = %completion.child_session_id,
472                        %error,
473                        "failed to load child session for runtime resume message"
474                    );
475                    None
476                }
477            };
478
479            // Guardian branch: a completing guardian reviewer that matches the
480            // parent's recorded review advances GuardianState (phase → Reviewed)
481            // and resumes with a verdict-tailored, findings-carrying message. Any
482            // id mismatch or unparseable verdict falls through to the generic
483            // resume, so the parent is never stranded.
484            let reviewed_round = runtime_state.round.current_round;
485            let guardian_resume = loaded_child.as_ref().and_then(|child| {
486                if child.subagent_type().as_deref() != Some("guardian") {
487                    return None;
488                }
489                let mut guardian_state = read_guardian_state(&parent)?;
490                if guardian_state.guardian_child_id.as_deref()
491                    != Some(completion.child_session_id.as_str())
492                {
493                    // A *different* guardian is legitimately still in flight —
494                    // leave its Pending state intact and use the generic resume.
495                    tracing::warn!(
496                        parent_session_id = %completion.parent_session_id,
497                        child_session_id = %completion.child_session_id,
498                        expected = ?guardian_state.guardian_child_id,
499                        "guardian completion does not match recorded guardian_child_id; using generic resume"
500                    );
501                    return None;
502                }
503                // This IS the guardian we dispatched, so we MUST advance the
504                // phase out of `Pending` — otherwise the next terminal gate's
505                // `Pending => return None` would let the run complete unreviewed.
506                // A reviewer that errored or produced unparseable output is
507                // treated as a SYNTHETIC REJECT (never a silent pass), so the
508                // budgeted re-review loop governs the outcome: fail-closed, but
509                // still bounded by `max_reviews`.
510                let verdict = child_final_assistant_text(child)
511                    .and_then(|text| match parse_guardian_verdict(&text) {
512                        Ok(verdict) => Some(verdict),
513                        Err(error) => {
514                            tracing::warn!(
515                                child_session_id = %completion.child_session_id,
516                                %error,
517                                "guardian verdict unparseable; recording a synthetic reject"
518                            );
519                            None
520                        }
521                    })
522                    .unwrap_or_else(|| {
523                        GuardianVerdict::rejected(vec![
524                            "The guardian reviewer did not return a usable verdict (it errored or \
525                             emitted unparseable output); the work has NOT been independently \
526                             verified."
527                                .to_string(),
528                        ])
529                    });
530                let approved = verdict.approve;
531                let message = guardian_resume_message(&completion, &verdict);
532                guardian_state.record_verdict(verdict, reviewed_round);
533                write_guardian_state(&mut parent, guardian_state);
534                tracing::info!(
535                    parent_session_id = %completion.parent_session_id,
536                    child_session_id = %completion.child_session_id,
537                    approved,
538                    "guardian verdict recorded; resuming parent"
539                );
540                Some(message)
541            });
542
543            let resume_message = guardian_resume.unwrap_or_else(|| {
544                runtime_resume_message(
545                    &completion,
546                    remaining_children,
547                    loaded_child
548                        .as_ref()
549                        .and_then(child_final_assistant_text)
550                        .as_deref(),
551                )
552            });
553            parent.add_message(resume_message);
554        } else if runtime_state.waiting_for_children.is_some() {
555            runtime_state.status = AgentStatusState::Suspended;
556            runtime_state.suspension = Some(SuspensionState {
557                reason: "waiting_for_children".to_string(),
558                suspended_at: Utc::now(),
559                resumable: true,
560                hook_point: Some("ChildCompletion".to_string()),
561            });
562        }
563
564        parent.updated_at = Utc::now();
565        write_runtime_state(&mut parent, &runtime_state);
566        self.save_and_cache(&mut parent).await;
567
568        // Capture before releasing the per-parent lock so the borrow checker
569        // is satisfied; `resume_parent` has its own retry loop and should not
570        // hold the per-parent lock (it would block other completions for the
571        // same parent, and the state is already durably settled above).
572        let resume_parent_id = parent.id.clone();
573        drop(_per_parent_guard);
574
575        if should_resume {
576            self.resume_parent(resume_parent_id).await;
577        }
578    }
579}
580
581#[async_trait]
582impl ResumeExecutionPort for ChildCompletionCoordinator {
583    async fn load_session(&self, session_id: &str) -> Option<Session> {
584        match self.storage.load_session(session_id).await {
585            Ok(Some(session)) => Some(session),
586            Ok(None) => self
587                .sessions
588                .get(session_id)
589                .map(|e| e.value().clone())
590                .map(|arc| arc.read().clone()),
591            Err(error) => {
592                tracing::warn!(%session_id, %error, "failed to load session from storage");
593                self.sessions
594                    .get(session_id)
595                    .map(|e| e.value().clone())
596                    .map(|arc| arc.read().clone())
597            }
598        }
599    }
600
601    async fn save_and_cache_session(&self, session: &mut Session) {
602        self.save_and_cache(session).await;
603    }
604
605    async fn try_reserve_runner(
606        &self,
607        session_id: &str,
608        event_sender: &broadcast::Sender<AgentEvent>,
609    ) -> Option<RunnerReservation> {
610        try_reserve_runner(&self.agent_runners, session_id, event_sender).await
611    }
612
613    async fn get_existing_runner_run_id(&self, session_id: &str) -> Option<String> {
614        let runners = self.agent_runners.read().await;
615        runners.get(session_id).map(|r| r.run_id.clone())
616    }
617
618    async fn get_or_create_event_sender(&self, session_id: &str) -> broadcast::Sender<AgentEvent> {
619        crate::execution::session_events::get_or_create_event_sender(
620            &self.session_event_senders,
621            session_id,
622        )
623        .await
624    }
625
626    async fn spawn_resume_execution(&self, request: ResumeSpawnRequest) {
627        let ResumeSpawnRequest {
628            session_id,
629            session,
630            cancel_token,
631            run_id: _,
632            event_sender,
633            config,
634        } = request;
635
636        let Some(root_tools) = self.root_tools.read().await.clone() else {
637            tracing::error!(%session_id, "cannot resume parent after child completion: root tool surface is not initialized");
638            return;
639        };
640
641        let model = session.model.clone();
642        let resolved_provider_name = session_effective_model_ref(&session)
643            .map(|model_ref| model_ref.provider)
644            .unwrap_or(config.provider_name);
645        let provider_override = session_effective_model_ref(&session)
646            .and_then(|model_ref| match self.provider_router.route(&model_ref) {
647                Ok(provider) => Some(provider),
648                Err(error) => {
649                    tracing::warn!(
650                        session_id = %session_id,
651                        provider = %model_ref.provider,
652                        model = %model_ref.model,
653                        error = %error,
654                        "failed to resolve provider override for child-completion parent resume; falling back to runtime provider"
655                    );
656                    None
657                }
658            });
659        let config_snapshot = self.config.read().await.clone();
660        let resolved_fast_provider = resolve_fast_model(
661            &config_snapshot,
662            &resolved_provider_name,
663            &self.provider_registry,
664        )
665        .map(|model| model.provider);
666        let reasoning_effort = session.reasoning_effort;
667        let reasoning_effort_source = session
668            .metadata
669            .get("reasoning_effort_source")
670            .cloned()
671            .unwrap_or_default();
672        let gold_config = resolve_gold_config(
673            &config_snapshot,
674            session
675                .metadata
676                .get(GOLD_CONFIG_METADATA_KEY)
677                .map(String::as_str),
678        )
679        .or(config.gold_config.clone());
680
681        let (mpsc_tx, _forwarder) = create_event_forwarder(
682            session_id.clone(),
683            event_sender,
684            self.agent_runners.clone(),
685            self.account_feed_inbox.clone(),
686        );
687
688        let config_handle = self.config.clone();
689        let cached_config = Arc::new(StdRwLock::new(config_snapshot.clone()));
690        let provider_registry = self.provider_registry.clone();
691        let provider_name_for_aux = resolved_provider_name.clone();
692        let auxiliary_model_resolver = std::sync::Arc::new(move || {
693            let config_snapshot = read_config_snapshot(&config_handle, cached_config.as_ref());
694            // Auxiliary models are global (config-derived), never session-bound.
695            let areas = resolve_global_area_models(
696                &config_snapshot,
697                &provider_name_for_aux,
698                &provider_registry,
699            );
700            crate::AuxiliaryModelConfig {
701                fast_model_name: areas.fast.as_ref().map(|m| m.model_name.clone()),
702                fast_model_provider: areas.fast.map(|m| m.provider),
703                background_model_name: areas.background.as_ref().map(|m| m.model_name.clone()),
704                planning_model_name: None,
705                search_model_name: None,
706                summarization_model_name: areas
707                    .summarization
708                    .as_ref()
709                    .map(|m| m.model_name.clone()),
710                background_model_provider: areas.background.map(|m| m.provider),
711                summarization_model_provider: areas.summarization.map(|m| m.provider),
712            }
713        });
714        let model_roster = crate::ModelRoster {
715            model: Some(model),
716            provider_name: Some(resolved_provider_name),
717            provider_type: config.provider_type.clone(),
718            fast: crate::RoleModel::from_parts(config.fast_model, resolved_fast_provider),
719            background: crate::RoleModel::from_parts(
720                config.background_model,
721                config.background_model_provider,
722            ),
723            summarization: crate::RoleModel::from_parts(
724                config.summarization_model,
725                config.summarization_model_provider,
726            ),
727        };
728
729        // Re-inject guardian state on resume so a reject→fix verdict can be
730        // re-reviewed: config from the session (persisted at first spawn),
731        // spawner from the coordinator-held handle. Absent guardian config this
732        // stays `None`, and the approve→complete path is unchanged.
733        let guardian_config = read_guardian_config(&session);
734        let guardian_spawner = self.guardian_spawner.read().await.clone();
735
736        spawn_session_execution(SessionExecutionArgs {
737            agent: self.agent.clone(),
738            session_id,
739            session,
740            tools_override: Some(root_tools),
741            provider_override,
742            model_roster,
743            reasoning_effort,
744            reasoning_effort_source,
745            auxiliary_model_resolver: Some(auxiliary_model_resolver),
746            disabled_tools: Some(config.disabled_tools),
747            disabled_skill_ids: Some(config.disabled_skill_ids),
748            selected_skill_ids: None,
749            selected_skill_mode: None,
750            cancel_token,
751            mpsc_tx,
752            image_fallback: config.image_fallback,
753            gold_config,
754            guardian_config,
755            guardian_spawner,
756            bash_resume_hook: {
757                let hook: Arc<dyn BashResumeHook> = Arc::new(self.clone());
758                Some(hook)
759            },
760            app_data_dir: Some(self.app_data_dir.clone()),
761            runners: self.agent_runners.clone(),
762            sessions_cache: self.sessions.clone(),
763            on_complete: None,
764        });
765    }
766}
767
768/// Hidden resume message for a bash-completion self-resume (issue #84 Phase 2b).
769/// Mirrors [`runtime_resume_message`]'s hidden/compressible shape so the resume
770/// port's `has_pending_user_message` gate is satisfied.
771///
772/// `timed_out` selects the wording: the normal path (all shells finished)
773/// announces completion; the deadline path (the 6h+10m wait ceiling was hit with
774/// shells STILL running) must NOT claim the shells completed — it says they may
775/// still be running so the model verifies with BashOutput instead of assuming
776/// success on a false premise.
777fn bash_completion_resume_message(bash_ids: &[String], timed_out: bool) -> Message {
778    let body = if timed_out {
779        format!(
780            "Runtime notification: the background-Bash wait ceiling was reached while one or more \
781             shell(s) ({}) may still be running. The session is being resumed so it is not \
782             stranded; verify their actual status with BashOutput before assuming completion.",
783            bash_ids.join(", ")
784        )
785    } else {
786        format!(
787            "Runtime notification: all background Bash shell(s) ({}) have completed. \
788             Review their output with BashOutput and resume the task from where you left off.",
789            bash_ids.join(", ")
790        )
791    };
792    let mut message = Message::user(body);
793    message.metadata = Some(serde_json::json!({
794        RUNTIME_RESUME_MESSAGE_HIDDEN_KEY: true,
795        RUNTIME_RESUME_MESSAGE_KIND_KEY: BASH_COMPLETION_RESUME_KIND,
796    }));
797    message.never_compress = false;
798    message
799}
800
801/// Decide whether the bash self-resume should retry its clear→append→resume
802/// sequence after a resume attempt returned `outcome`, given that the persisted
803/// bash wait is (`true`) / is not (`false`) still set on reload.
804///
805/// Retry **only** when the resume did NOT spawn (`Completed` — no pending user
806/// message, i.e. our resume message was dropped — or `AlreadyRunning`) AND the
807/// persisted bash wait is still set: the signature of the finalize-clobber, where
808/// the suspending runner's one-shot final `merge_save_runtime` lands after our
809/// save and reverts `waiting_for_bash=Some` while dropping our resume message, so
810/// `has_pending_user_message` fails and nothing spawns. `Started` (resume fired)
811/// and `NotFound` (session gone) never retry. Pure helper so the clobber
812/// detection is unit-testable in isolation from async I/O.
813fn bash_resume_should_retry(outcome: &ResumeOutcome, persisted_waiting_for_bash: bool) -> bool {
814    match outcome {
815        ResumeOutcome::Started { .. } | ResumeOutcome::NotFound => false,
816        ResumeOutcome::Completed | ResumeOutcome::AlreadyRunning { .. } => {
817            persisted_waiting_for_bash
818        }
819    }
820}
821
822/// Bash self-resume support (issue #84 Phase 2b).
823impl ChildCompletionCoordinator {
824    /// Poll the live background-shell registry until all captured shells are no
825    /// longer running, then clear the bash wait and resume the session. This is
826    /// the liveness guarantee: **polling** the registry — not the one-shot
827    /// `BashCompleted` event — so even if a shell completed between the suspend
828    /// snapshot and this task's first poll, or before any event subscriber
829    /// existed, the registry reports it as not-running and the session resumes.
830    ///
831    /// The clear→append→resume is a **bounded retry loop** that closes the
832    /// finalize-clobber strand. The suspending runner's `finalize_task_context`
833    /// runs a full `save_runtime_session` (same `merge_save_runtime`, which
834    /// overwrites the whole `messages` array) AFTER this task is spawned; if it
835    /// lands after ours it reverts `waiting_for_bash=Some` and drops our resume
836    /// message, so `has_pending_user_message` fails and `resume_parent` returns
837    /// `Completed` without spawning. We detect that (persisted wait still set
838    /// after a non-`Started` outcome) and re-clear/re-append/re-resume. It
839    /// converges because the runner's finalize persist is one-shot: once landed,
840    /// our retry's save is the last writer, the message sticks, and resume fires.
841    async fn bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
842        let poll_interval = Duration::from_millis(200);
843        // Hard ceiling: the wait lease (6 h) + the registry GC TTL (5 min) +
844        // margin. After this the shells are gone from the registry regardless,
845        // so force-resume to avoid stranding the session on a GC edge case.
846        let max_poll = Duration::from_secs(6 * 3600 + 600);
847        let deadline = tokio::time::Instant::now() + max_poll;
848
849        let mut timed_out = false;
850        loop {
851            let still_running =
852                bamboo_tools::tools::bash_runtime::running_shells_for_session(&session_id);
853            if still_running.is_empty() {
854                break;
855            }
856            if tokio::time::Instant::now() >= deadline {
857                timed_out = true;
858                tracing::warn!(
859                    session_id = %session_id,
860                    "bash self-resume poll exceeded the wait ceiling; forcing resume"
861                );
862                break;
863            }
864            tokio::time::sleep(poll_interval).await;
865        }
866
867        // Clobber-retry loop (see the function doc). Bounded: the runner's
868        // finalize persist is one-shot, so once it has landed our retry's save
869        // is the last writer, the resume message sticks, and the resume fires.
870        const MAX_RESUME_ATTEMPTS: u8 = 5;
871        for attempt in 0..MAX_RESUME_ATTEMPTS {
872            if attempt > 0 {
873                tokio::time::sleep(poll_interval).await;
874            }
875
876            let Some(mut session) = self.load_session(&session_id).await else {
877                tracing::warn!(%session_id, "bash self-resume: session not found; nothing to resume");
878                return;
879            };
880
881            let mut runtime_state = read_runtime_state(&session);
882            if runtime_state.waiting_for_bash.is_none() {
883                // Double-resume guard: the wait was already cleared by another
884                // path (a user-driven resume, or a racing duplicate poll task),
885                // or our own prior clear survived a clobber-retry. Do not append
886                // a duplicate message or request a redundant resume.
887                tracing::info!(
888                    %session_id, attempt,
889                    "bash self-resume: persisted bash wait already cleared; nothing to resume"
890                );
891                return;
892            }
893
894            runtime_state.waiting_for_bash = None;
895            runtime_state.status = AgentStatusState::Idle;
896            runtime_state.suspension = None;
897            write_runtime_state(&mut session, &runtime_state);
898            session.metadata.remove("runtime.suspend_reason");
899            session.add_message(bash_completion_resume_message(&bash_ids, timed_out));
900            session.updated_at = Utc::now();
901            self.save_and_cache(&mut session).await;
902            tracing::info!(
903                session_id = %session_id,
904                shell_count = bash_ids.len(),
905                timed_out, attempt,
906                "bash self-resume: cleared bash wait and appended resume message"
907            );
908
909            let outcome = self.resume_parent(session_id.clone()).await;
910            match outcome {
911                ResumeOutcome::Started { .. } => {
912                    tracing::info!(%session_id, attempt, "bash self-resume: resume fired");
913                    return;
914                }
915                ResumeOutcome::NotFound => {
916                    tracing::warn!(%session_id, "bash self-resume: session vanished during resume");
917                    return;
918                }
919                _ => {
920                    // Completed (no pending user message ⇒ our resume message was
921                    // dropped by the runner's finalize persist) or AlreadyRunning.
922                    // Decide via the persisted bash wait: still set ⇒
923                    // finalize-clobber ⇒ retry; cleared ⇒ the session is being
924                    // handled (by us or a concurrent resume) ⇒ stop.
925                    let clobbered = match self.load_session(&session_id).await {
926                        Some(reloaded) => read_runtime_state(&reloaded).waiting_for_bash.is_some(),
927                        None => {
928                            tracing::warn!(
929                                %session_id,
930                                "bash self-resume: session vanished after resume"
931                            );
932                            return;
933                        }
934                    };
935                    if bash_resume_should_retry(&outcome, clobbered) {
936                        tracing::warn!(
937                            %session_id, attempt,
938                            outcome = outcome.as_str(),
939                            "bash self-resume: persisted wait still set after resume (finalize-clobber); retrying"
940                        );
941                        continue;
942                    }
943                    tracing::info!(
944                        %session_id, attempt,
945                        outcome = outcome.as_str(),
946                        "bash self-resume: wait cleared and resume handled; stopping"
947                    );
948                    return;
949                }
950            }
951        }
952
953        tracing::warn!(
954            %session_id,
955            attempts = MAX_RESUME_ATTEMPTS,
956            "bash self-resume: exhausted clobber-retry budget without confirming resume; giving up"
957        );
958    }
959}
960
961impl BashResumeHook for ChildCompletionCoordinator {
962    fn arrange_bash_self_resume(&self, session_id: String, bash_ids: Vec<String>) {
963        let coordinator = Arc::new(self.clone());
964        tokio::spawn(async move {
965            coordinator.bash_self_resume(session_id, bash_ids).await;
966        });
967    }
968}
969
#[cfg(test)]
mod tests {
    // `use super::*` already brings in everything these tests need from the
    // parent module, including its imports (`Message`, `Role`, `Session`, ...).
    use super::*;

    /// Minimal `ChildCompletion` for `parent-1`/`child-1` with the given status.
    fn make_completion(status: &str) -> ChildCompletion {
        ChildCompletion {
            parent_session_id: "parent-1".to_string(),
            child_session_id: "child-1".to_string(),
            status: status.to_string(),
            error: None,
            completed_at: Utc::now(),
        }
    }

    // ── ② derive completed children from the index ──────────────────────

    /// Storage stub whose only meaningful behavior is the child-run index.
    struct StubChildIndex {
        children: Vec<(String, Option<String>)>,
    }

    #[async_trait]
    impl Storage for StubChildIndex {
        async fn save_session(&self, _session: &Session) -> std::io::Result<()> {
            Ok(())
        }
        async fn load_session(&self, _id: &str) -> std::io::Result<Option<Session>> {
            Ok(None)
        }
        async fn delete_session(&self, _id: &str) -> std::io::Result<bool> {
            Ok(false)
        }
        async fn list_child_run_statuses(
            &self,
            _parent_session_id: &str,
        ) -> std::io::Result<Vec<(String, Option<String>)>> {
            Ok(self.children.clone())
        }
    }

    #[tokio::test]
    async fn derive_completed_only_includes_terminal_children() {
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![
                ("a".into(), Some("completed".into())),
                ("b".into(), Some("running".into())),
                ("c".into(), Some("error".into())),
                ("d".into(), None),
            ],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "b").await;
        // Terminal from index: a, c. Plus the just-completed child b folded in.
        assert_eq!(
            completed,
            vec!["a".to_string(), "b".to_string(), "c".to_string()]
        );
    }

    #[tokio::test]
    async fn derive_completed_folds_in_just_completed_when_index_lags() {
        // Index hasn't caught up — reports the child as still running.
        let storage: Arc<dyn Storage> = Arc::new(StubChildIndex {
            children: vec![("only".into(), Some("running".into()))],
        });
        let completed = derive_completed_child_ids(&storage, "parent-1", "only").await;
        assert_eq!(completed, vec!["only".to_string()]);
    }

    #[test]
    fn wait_policy_all_uses_derived_completed_set() {
        let waited = vec!["a".to_string(), "b".to_string()];
        assert!(!wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &["a".to_string()],
            "completed"
        ));
        assert!(wait_policy_satisfied(
            ChildWaitPolicy::All,
            &waited,
            &["a".to_string(), "b".to_string()],
            "completed"
        ));
    }

    #[test]
    fn child_final_assistant_text_returns_last_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        session
            .messages
            .push(Message::assistant("first answer", None));
        session.messages.push(Message::user("again"));
        session
            .messages
            .push(Message::assistant("final answer", None));

        assert_eq!(
            child_final_assistant_text(&session).as_deref(),
            Some("final answer")
        );
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_blank() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::assistant("   ", None));
        assert!(child_final_assistant_text(&session).is_none());
    }

    #[test]
    fn child_final_assistant_text_returns_none_when_no_assistant() {
        let mut session = Session::new("child-1", "gpt-4");
        session.messages.push(Message::user("hi"));
        assert!(child_final_assistant_text(&session).is_none());
    }

    #[test]
    fn runtime_resume_message_folds_full_response_without_truncation() {
        // A very long child final response is folded in verbatim (no 4000-char
        // cap, no truncation marker).
        let completion = make_completion("completed");
        let long: String = "a".repeat(10_000);
        let message = runtime_resume_message(&completion, 0, Some(&long));
        assert!(message.content.contains(&long));
        assert!(!message.content.contains("truncated"));
    }

    #[test]
    fn runtime_resume_message_includes_child_response_when_provided() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 0, Some("the answer is 42"));

        assert!(matches!(message.role, Role::User));
        // Folded child results are now compressible so the parent context can
        // reclaim them under compaction.
        assert!(!message.never_compress);
        assert!(message.content.contains("Child final response:"));
        assert!(message.content.contains("the answer is 42"));

        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata.get("hidden_from_ui").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            metadata.get("runtime_kind").and_then(|v| v.as_str()),
            Some("child_completion_resume")
        );
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(true)
        );
    }

    #[test]
    fn runtime_resume_message_falls_back_to_error_when_no_response() {
        let mut completion = make_completion("error");
        completion.error = Some("boom".to_string());

        let message = runtime_resume_message(&completion, 1, None);
        assert!(message.content.contains("Child error:"));
        assert!(message.content.contains("boom"));
        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata
                .get("child_final_response_included")
                .and_then(|v| v.as_bool()),
            Some(false)
        );
    }

    #[test]
    fn runtime_resume_message_minimal_when_no_response_and_no_error() {
        let completion = make_completion("completed");
        let message = runtime_resume_message(&completion, 2, None);
        assert!(!message.content.contains("Child final response:"));
        assert!(!message.content.contains("Child error:"));
        assert!(message.content.contains("Resume the parent task"));
    }

    #[test]
    fn read_config_snapshot_refreshes_cached_snapshot_from_live_config() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let config = Arc::new(RwLock::new(Config::default()));
            config.write().await.provider = "copilot".to_string();
            let cached_config = StdRwLock::new(Config::default());

            let snapshot = read_config_snapshot(&config, &cached_config);

            assert_eq!(snapshot.provider, "copilot");
            assert_eq!(
                cached_config.read().expect("cached snapshot lock").provider,
                "copilot"
            );
        });
    }

    #[test]
    fn read_config_snapshot_uses_cached_snapshot_when_live_lock_is_busy() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");

        runtime.block_on(async {
            let cached_snapshot = Config {
                provider: "cached-provider".to_string(),
                ..Default::default()
            };

            let config = Arc::new(RwLock::new(Config::default()));
            let cached_config = StdRwLock::new(cached_snapshot);
            // Hold the live write lock so the snapshot read cannot acquire it.
            let _write_guard = config.write().await;

            let snapshot = read_config_snapshot(&config, &cached_config);

            assert_eq!(snapshot.provider, "cached-provider");
        });
    }

    // ── Bash self-resume (issue #84 Phase 2b): deadline message + clobber-retry ──

    #[test]
    fn bash_completion_resume_message_normal_announces_completion() {
        let ids = vec!["bg-1".to_string(), "bg-2".to_string()];
        let message = bash_completion_resume_message(&ids, false);
        // Normal path: the shells genuinely finished.
        assert!(
            message.content.contains("have completed"),
            "normal resume message must announce completion: {}",
            message.content
        );
        // Hidden + compressible so the resume gate sees it but the UI hides it.
        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata
                .get(RUNTIME_RESUME_MESSAGE_HIDDEN_KEY)
                .and_then(|v| v.as_bool()),
            Some(true),
            "resume message must be hidden from the UI"
        );
        assert_eq!(
            metadata
                .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
                .and_then(|v| v.as_str()),
            Some(BASH_COMPLETION_RESUME_KIND),
            "resume message must carry the bash-completion kind discriminant"
        );
    }

    #[test]
    fn bash_completion_resume_message_deadline_does_not_claim_completion() {
        // The 6h+10m deadline force-breaks with shells STILL running. The message
        // must NOT say "have completed" — that would let the model assume success
        // on a false premise. It must direct the model to verify with BashOutput.
        let ids = vec!["bg-long".to_string()];
        let message = bash_completion_resume_message(&ids, true);
        assert!(
            !message.content.contains("have completed"),
            "deadline resume message must NOT claim the shells completed: {}",
            message.content
        );
        assert!(
            message.content.contains("may still be running"),
            "deadline resume message must warn shells may still be running: {}",
            message.content
        );
        assert!(
            message.content.contains("BashOutput"),
            "deadline resume message must direct verification via BashOutput: {}",
            message.content
        );
        // Same hidden/kind shape so the resume gate is satisfied identically.
        let metadata = message.metadata.expect("metadata present");
        assert_eq!(
            metadata
                .get(RUNTIME_RESUME_MESSAGE_KIND_KEY)
                .and_then(|v| v.as_str()),
            Some(BASH_COMPLETION_RESUME_KIND)
        );
    }

    #[test]
    fn bash_resume_should_retry_matrix() {
        // The finalize-clobber retry predicate (issue #84 Phase 2b). Retry only
        // when the resume did NOT spawn (Completed / AlreadyRunning) AND the
        // persisted bash wait is still set on reload — the clobber signature.
        //
        // (label, outcome, persisted_waiting_for_bash, expected_retry)
        let cases = [
            // Started: the resume fired — never retry, regardless of persisted state.
            ("Started", ResumeOutcome::Started { run_id: "r".into() }, true, false),
            ("Started", ResumeOutcome::Started { run_id: "r".into() }, false, false),
            // NotFound: session gone — never retry.
            ("NotFound", ResumeOutcome::NotFound, true, false),
            ("NotFound", ResumeOutcome::NotFound, false, false),
            // Completed + persisted wait still set ⇒ finalize-clobber ⇒ retry.
            ("Completed", ResumeOutcome::Completed, true, true),
            // Completed + persisted wait cleared ⇒ handled (our message stuck, or a
            // concurrent resume finished) ⇒ stop.
            ("Completed", ResumeOutcome::Completed, false, false),
            // AlreadyRunning + persisted wait still set ⇒ clobbered while a runner is
            // (stale-)active ⇒ retry to re-establish the resume message.
            (
                "AlreadyRunning",
                ResumeOutcome::AlreadyRunning { run_id: "r".into() },
                true,
                true,
            ),
            // AlreadyRunning + wait cleared ⇒ a runner owns the session ⇒ stop.
            (
                "AlreadyRunning",
                ResumeOutcome::AlreadyRunning { run_id: "r".into() },
                false,
                false,
            ),
        ];

        for (label, outcome, persisted_wait, expected) in &cases {
            assert_eq!(
                bash_resume_should_retry(outcome, *persisted_wait),
                *expected,
                "outcome={label} persisted_waiting_for_bash={persisted_wait}"
            );
        }
    }
}