Skip to main content

meerkat_runtime/
session_adapter.rs

1//! RuntimeSessionAdapter — wraps a SessionService with per-session RuntimeDrivers.
2//!
3//! This adapter lives in meerkat-runtime so that meerkat-session doesn't need
4//! to depend on meerkat-runtime. Surfaces use this adapter to get v9 runtime
5//! capabilities on top of any SessionService implementation.
6//!
7//! When a session is registered with a `CoreExecutor`, a background RuntimeLoop
8//! task is spawned per session. `accept_input()` queues the input in the driver
9//! and, if wake is requested, signals the loop. The loop dequeues, stages,
10//! applies via CoreExecutor (which calls SessionService::start_turn()), and
11//! marks inputs as consumed.
12
13use std::collections::{HashMap, HashSet};
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::BlobStore;
18use meerkat_core::comms_drain_lifecycle_authority::{
19    CommsDrainLifecycleAuthority, CommsDrainLifecycleEffect, CommsDrainMode, DrainExitReason,
20};
21use meerkat_core::generated::{protocol_comms_drain_abort, protocol_comms_drain_spawn};
22use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
23use meerkat_core::lifecycle::run_control::RunControlCommand;
24use meerkat_core::lifecycle::{InputId, RunId};
25use meerkat_core::types::SessionId;
26
27use crate::accept::AcceptOutcome;
28use crate::driver::ephemeral::EphemeralRuntimeDriver;
29use crate::driver::persistent::PersistentRuntimeDriver;
30use crate::identifiers::LogicalRuntimeId;
31use crate::input::Input;
32use crate::input_lifecycle_authority::InputLifecycleError;
33use crate::input_state::InputState;
34use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
35use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
36use crate::store::RuntimeStore;
37use crate::tokio;
38use crate::tokio::sync::{Mutex, RwLock, mpsc};
39use crate::traits::{
40    DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
41    RuntimeControlPlaneError, RuntimeDriver, RuntimeDriverError,
42};
43
44/// Error type for [`RuntimeSessionAdapter::prepare_bindings`].
45#[derive(Debug, thiserror::Error)]
46pub enum RuntimeBindingsError {
47    /// Session was not found after registration (should not happen in practice).
48    #[error("session {0} not found in runtime adapter after registration")]
49    SessionNotFound(SessionId),
50}
51
52/// Shared driver handle used by both the adapter and the RuntimeLoop.
53pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
54
55/// Per-session runtime driver entry.
56pub(crate) enum DriverEntry {
57    Ephemeral(EphemeralRuntimeDriver),
58    Persistent(PersistentRuntimeDriver),
59}
60
61impl DriverEntry {
62    pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
63        match self {
64            DriverEntry::Ephemeral(d) => d,
65            DriverEntry::Persistent(d) => d,
66        }
67    }
68
69    pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
70        match self {
71            DriverEntry::Ephemeral(d) => d,
72            DriverEntry::Persistent(d) => d,
73        }
74    }
75
76    /// Set the silent comms intents for the underlying driver.
77    pub(crate) fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
78        match self {
79            DriverEntry::Ephemeral(d) => d.set_silent_comms_intents(intents),
80            DriverEntry::Persistent(d) => d.set_silent_comms_intents(intents),
81        }
82    }
83
84    /// Check if the runtime is idle or attached (quiescent with or without executor).
85    pub(crate) fn is_idle_or_attached(&self) -> bool {
86        match self {
87            DriverEntry::Ephemeral(d) => d.is_idle_or_attached(),
88            DriverEntry::Persistent(d) => d.is_idle_or_attached(),
89        }
90    }
91
92    /// Whether this session is quiescent for detached-wake purposes.
93    ///
94    /// A session is quiescent when it is idle/attached (not running) AND has
95    /// no non-terminal inputs in its ledger. Queued-only inputs intentionally
96    /// block quiescence — `accept_input_without_wake` stages work without
97    /// waking, so detached-wake must not race with pending queue processing.
98    pub(crate) fn is_quiescent_for_detached_wake(&self) -> bool {
99        self.is_idle_or_attached() && self.as_driver().active_input_ids().is_empty()
100    }
101
102    /// Attach an executor (Idle → Attached).
103    pub(crate) fn attach(&mut self) -> Result<(), RuntimeStateTransitionError> {
104        match self {
105            DriverEntry::Ephemeral(d) => d.attach(),
106            DriverEntry::Persistent(d) => d.attach(),
107        }
108    }
109
110    /// Detach an executor (Attached → Idle). No-op if not Attached.
111    pub(crate) fn detach(
112        &mut self,
113    ) -> Result<Option<crate::runtime_state::RuntimeState>, RuntimeStateTransitionError> {
114        match self {
115            DriverEntry::Ephemeral(d) => d.detach(),
116            DriverEntry::Persistent(d) => d.detach(),
117        }
118    }
119
120    /// Check if the runtime can process queued inputs (Idle, Attached, or Retired).
121    pub(crate) fn can_process_queue(&self) -> bool {
122        match self {
123            DriverEntry::Ephemeral(d) => d.control().can_process_queue(),
124            DriverEntry::Persistent(d) => d.inner_ref().control().can_process_queue(),
125        }
126    }
127
128    /// Drain and return the typed post-admission signal.
129    pub(crate) fn take_post_admission_signal(
130        &mut self,
131    ) -> crate::driver::ephemeral::PostAdmissionSignal {
132        match self {
133            DriverEntry::Ephemeral(d) => d.take_post_admission_signal(),
134            DriverEntry::Persistent(d) => d.take_post_admission_signal(),
135        }
136    }
137
138    /// Check and clear the wake flag (backward-compat wrapper).
139    pub(crate) fn take_wake_requested(&mut self) -> bool {
140        match self {
141            DriverEntry::Ephemeral(d) => d.take_wake_requested(),
142            DriverEntry::Persistent(d) => d.take_wake_requested(),
143        }
144    }
145
146    /// Dequeue the next input for processing.
147    pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
148        match self {
149            DriverEntry::Ephemeral(d) => d.dequeue_next(),
150            DriverEntry::Persistent(d) => d.dequeue_next(),
151        }
152    }
153
154    /// Dequeue a specific input by ID from whichever queue contains it.
155    pub(crate) fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
156        match self {
157            DriverEntry::Ephemeral(d) => d.dequeue_by_id(input_id),
158            DriverEntry::Persistent(d) => d.dequeue_by_id(input_id),
159        }
160    }
161
162    /// Get a reference to the ingress authority.
163    pub(crate) fn ingress(&self) -> &crate::runtime_ingress_authority::RuntimeIngressAuthority {
164        match self {
165            DriverEntry::Ephemeral(d) => d.ingress(),
166            DriverEntry::Persistent(d) => d.inner_ref().ingress(),
167        }
168    }
169
170    pub(crate) fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
171        match self {
172            DriverEntry::Ephemeral(d) => d.has_queued_input_outside(excluded),
173            DriverEntry::Persistent(d) => d.has_queued_input_outside(excluded),
174        }
175    }
176
177    /// Start a new run (Idle → Running).
178    pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
179        match self {
180            DriverEntry::Ephemeral(d) => d.start_run(run_id),
181            DriverEntry::Persistent(d) => d.start_run(run_id),
182        }
183    }
184
185    /// Complete a run (Running → Idle).
186    pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
187        match self {
188            DriverEntry::Ephemeral(d) => d.complete_run(),
189            DriverEntry::Persistent(d) => d.complete_run(),
190        }
191    }
192
193    /// Stage an input (Queued → Staged).
194    pub(crate) fn stage_input(
195        &mut self,
196        input_id: &InputId,
197        run_id: &RunId,
198    ) -> Result<(), InputLifecycleError> {
199        match self {
200            DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
201            DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
202        }
203    }
204
205    /// Stage a batch of inputs atomically in a single `StageDrainSnapshot`.
206    pub(crate) fn stage_batch(
207        &mut self,
208        input_ids: &[InputId],
209        run_id: &RunId,
210    ) -> Result<(), InputLifecycleError> {
211        match self {
212            DriverEntry::Ephemeral(d) => d.stage_batch(input_ids, run_id),
213            DriverEntry::Persistent(d) => d.stage_batch(input_ids, run_id),
214        }
215    }
216
217    /// Roll back staged inputs after a failed staging attempt.
218    pub(crate) fn rollback_staged(
219        &mut self,
220        input_ids: &[InputId],
221    ) -> Result<(), InputLifecycleError> {
222        match self {
223            DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
224            DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
225        }
226    }
227
228    pub(crate) async fn abandon_pending_inputs(
229        &mut self,
230        reason: crate::input_state::InputAbandonReason,
231    ) -> Result<usize, RuntimeDriverError> {
232        match self {
233            DriverEntry::Ephemeral(d) => Ok(d.abandon_pending_inputs(reason)),
234            DriverEntry::Persistent(d) => d.abandon_pending_inputs(reason).await,
235        }
236    }
237}
238
239/// Shared completion registry (accessed by adapter for registration and loop for resolution).
240pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
241
242/// Per-session state: driver + registration phase.
243struct RuntimeSessionEntry {
244    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
245    driver: SharedDriver,
246    /// Shared async-operation lifecycle registry for this runtime/session.
247    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
248    /// Runtime epoch identity — stable across rebuilds, rotated on reset/restart-without-recovery.
249    epoch_id: meerkat_core::RuntimeEpochId,
250    /// Shared consumer cursor state for the epoch.
251    cursor_state: Arc<meerkat_core::EpochCursorState>,
252    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
253    completions: SharedCompletionRegistry,
254    /// Registration phase — explicit type-level distinction between
255    /// "registered but inert" and "executor attached."
256    phase: RegistrationPhase,
257    /// Detached-wake state for background op completions.
258    /// Shared with the runtime loop which selects on the Notify directly.
259    detached_wake: Option<Arc<crate::detached_wake::DetachedWakeState>>,
260    /// Whether this session should remain hosted for peer ingress while idle.
261    keep_alive: bool,
262    /// Comms runtime used to realize peer ingress for this session, if any.
263    comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
264}
265
266/// Capability bundle for an attached runtime loop.
267///
268/// Keep all loop-related handles together so "attached vs detached" cannot
269/// drift into partially-populated shell state.
270struct RuntimeLoopAttachment {
271    wake_tx: mpsc::Sender<()>,
272    control_tx: mpsc::Sender<RunControlCommand>,
273    _loop_handle: tokio::task::JoinHandle<()>,
274}
275
276/// Explicit registration phase — the type-level distinction between
277/// "registered but inert," "attachment in progress," and "executor attached."
278///
279/// Replaces the implicit `Option<RuntimeLoopAttachment>` discriminant
280/// (Dogma §8: Option must not hide ownership uncertainty).
281enum RegistrationPhase {
282    /// Registered via `prepare_bindings()`. No executor — inputs queue
283    /// but are not processed until an executor attaches.
284    Queuing,
285    /// `ensure_session_with_executor()` is in progress — another task is
286    /// wiring the runtime loop. Concurrent callers must treat this as
287    /// "attachment pending" and not race a second loop spawn.
288    Attaching,
289    /// Executor attached with live channels. Inputs are processed
290    /// by the RuntimeLoop.
291    Active(RuntimeLoopAttachment),
292}
293
294impl RuntimeSessionEntry {
295    fn attachment_is_live(&self) -> bool {
296        match &self.phase {
297            RegistrationPhase::Active(attachment) => {
298                !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed()
299            }
300            RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
301        }
302    }
303
304    /// Returns `true` if an executor is attached with live channels, OR if
305    /// attachment is in progress (another task is wiring the loop).
306    /// Used by external-facing queries (`session_has_executor`) to prevent
307    /// concurrent callers from racing a second loop spawn.
308    fn has_attachment_or_attaching(&self) -> bool {
309        matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
310    }
311
312    /// Returns `true` only if the executor is fully attached with live channels.
313    /// Used by internal publish logic within `ensure_session_with_executor`
314    /// where the caller itself may have set `Attaching`.
315    fn has_live_attachment(&self) -> bool {
316        self.attachment_is_live()
317    }
318
319    fn attach_runtime_loop(
320        &mut self,
321        wake_tx: mpsc::Sender<()>,
322        control_tx: mpsc::Sender<RunControlCommand>,
323        loop_handle: tokio::task::JoinHandle<()>,
324    ) {
325        self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
326            wake_tx,
327            control_tx,
328            _loop_handle: loop_handle,
329        });
330    }
331
332    fn clear_dead_attachment(&mut self) -> bool {
333        if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
334            // Don't regress to Queuing if another task is mid-attach;
335            // Active with dead channels goes back to Queuing for retry.
336            self.phase = RegistrationPhase::Queuing;
337            // Clear detached wake state — it will be re-created on
338            // re-registration along with the new runtime loop.
339            self.detached_wake = None;
340            return true;
341        }
342        false
343    }
344
345    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
346        match &self.phase {
347            RegistrationPhase::Active(attachment)
348                if !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed() =>
349            {
350                Some(attachment.wake_tx.clone())
351            }
352            _ => None,
353        }
354    }
355
356    fn control_sender(&self) -> Option<mpsc::Sender<RunControlCommand>> {
357        match &self.phase {
358            RegistrationPhase::Active(attachment)
359                if !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed() =>
360            {
361                Some(attachment.control_tx.clone())
362            }
363            _ => None,
364        }
365    }
366}
367
368/// Per-session comms drain slot, driven by `CommsDrainLifecycleAuthority`.
369///
370/// ALL state transitions go through the authority -- no manual
371/// `handle.is_finished()` checks in shell code.
372struct CommsDrainSlot {
373    authority: CommsDrainLifecycleAuthority,
374    handle: Option<tokio::task::JoinHandle<()>>,
375}
376
377impl CommsDrainSlot {
378    fn new() -> Self {
379        Self {
380            authority: CommsDrainLifecycleAuthority::new(),
381            handle: None,
382        }
383    }
384}
385
386fn apply_runtime_drain_effects(slot: &mut CommsDrainSlot, effects: &[CommsDrainLifecycleEffect]) {
387    for effect in effects {
388        if let CommsDrainLifecycleEffect::AbortDrainTask = effect
389            && let Some(handle) = slot.handle.take()
390        {
391            handle.abort();
392        }
393    }
394}
395
396fn abort_slot(slot: &mut CommsDrainSlot) {
397    match protocol_comms_drain_abort::execute_stop_requested(&mut slot.authority) {
398        Ok(result) => {
399            apply_runtime_drain_effects(slot, &result.effects);
400            // Under TerminalClosure policy, the abort obligation is implicitly
401            // satisfied when the machine reaches Stopped phase. Drop it.
402            let _ = result.obligation;
403        }
404        Err(_) => {
405            // Already stopped or inactive — just clean up the handle
406            if let Some(handle) = slot.handle.take() {
407                handle.abort();
408            }
409        }
410    }
411}
412
413fn desired_peer_ingress_mode(
414    runtime_state: RuntimeState,
415    comms_enabled: bool,
416    keep_alive: bool,
417) -> Option<CommsDrainMode> {
418    if !comms_enabled {
419        return None;
420    }
421
422    match runtime_state {
423        RuntimeState::Attached | RuntimeState::Running | RuntimeState::Recovering => {
424            Some(CommsDrainMode::AttachedSession)
425        }
426        RuntimeState::Idle if keep_alive => Some(CommsDrainMode::PersistentHost),
427        RuntimeState::Initializing
428        | RuntimeState::Idle
429        | RuntimeState::Retired
430        | RuntimeState::Stopped
431        | RuntimeState::Destroyed => None,
432    }
433}
434
435/// Wraps a SessionService to provide v9 runtime capabilities.
436///
437/// Maintains a per-session RuntimeDriver registry. When sessions are registered
438/// with a `CoreExecutor`, a RuntimeLoop task is spawned that processes queued
439/// inputs by calling `CoreExecutor::apply()` (which triggers
440/// `SessionService::start_turn()` under the hood).
441pub struct RuntimeSessionAdapter {
442    /// Per-session entries.
443    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
444    /// Runtime mode.
445    mode: RuntimeMode,
446    /// Optional RuntimeStore for persistent drivers.
447    store: Option<Arc<dyn RuntimeStore>>,
448    /// Blob store used by persistent drivers for durable input externalization.
449    blob_store: Option<Arc<dyn BlobStore>>,
450    /// Per-session comms drain lifecycle, driven by machine authority.
451    comms_drain_slots: RwLock<HashMap<SessionId, CommsDrainSlot>>,
452}
453
454impl RuntimeSessionAdapter {
455    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
456    pub fn ephemeral() -> Self {
457        Self {
458            sessions: RwLock::new(HashMap::new()),
459            mode: RuntimeMode::V9Compliant,
460            store: None,
461            blob_store: None,
462            comms_drain_slots: RwLock::new(HashMap::new()),
463        }
464    }
465
466    /// Create a persistent adapter with a RuntimeStore.
467    pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
468        Self {
469            sessions: RwLock::new(HashMap::new()),
470            mode: RuntimeMode::V9Compliant,
471            store: Some(store),
472            blob_store: Some(blob_store),
473            comms_drain_slots: RwLock::new(HashMap::new()),
474        }
475    }
476
477    /// Create a persistent adapter with a RuntimeStore but no blob store.
478    ///
479    /// The driver will fall back to ephemeral mode for sessions (no durable
480    /// boundary commits), but ops lifecycle recovery from the store still works.
481    /// Primarily useful for tests that need to verify recovery without needing
482    /// a full blob store.
483    pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
484        Self {
485            sessions: RwLock::new(HashMap::new()),
486            mode: RuntimeMode::V9Compliant,
487            store: Some(store),
488            blob_store: None,
489            comms_drain_slots: RwLock::new(HashMap::new()),
490        }
491    }
492
493    /// Create a driver entry for a session.
494    fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
495        let runtime_id = LogicalRuntimeId::new(session_id.to_string());
496        match (&self.store, &self.blob_store) {
497            (Some(store), Some(blob_store)) => DriverEntry::Persistent(
498                PersistentRuntimeDriver::new(runtime_id, store.clone(), blob_store.clone()),
499            ),
500            (Some(_store), None) => {
501                tracing::warn!(
502                    %session_id,
503                    "persistent runtime store present but blob store missing; \
504                     falling back to ephemeral driver"
505                );
506                DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id))
507            }
508            _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
509        }
510    }
511
512    /// Recover or create fresh ops lifecycle state for a session.
513    ///
514    /// This is the single canonical recovery seam. Both `register_session()`
515    /// and `ensure_session_with_executor()`'s cold path call this to create
516    /// epoch-local state. If a durable store is available, attempts to load
517    /// the persisted snapshot; otherwise creates fresh state with a new epoch.
518    async fn recover_or_create_ops_state(
519        &self,
520        session_id: &SessionId,
521    ) -> (
522        Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
523        meerkat_core::RuntimeEpochId,
524        Arc<meerkat_core::EpochCursorState>,
525    ) {
526        if let Some(ref store) = self.store {
527            let runtime_id = crate::identifiers::LogicalRuntimeId::new(session_id.to_string());
528            match store.load_ops_lifecycle(&runtime_id).await {
529                Ok(Some(snapshot)) => {
530                    let recovered_epoch = snapshot.epoch_id.clone();
531                    let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
532                        snapshot.cursors.agent_applied_cursor,
533                        snapshot.cursors.runtime_observed_seq,
534                        snapshot.cursors.runtime_last_injected_seq,
535                    );
536                    let recovered_ops_count = snapshot.completion_entries.len();
537                    let registry =
538                        crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
539                    tracing::info!(
540                        %session_id,
541                        epoch_id = %recovered_epoch,
542                        recovered_ops = recovered_ops_count,
543                        "ops lifecycle recovered from durable store (same epoch)"
544                    );
545                    (
546                        Arc::new(registry),
547                        recovered_epoch,
548                        Arc::new(recovered_cursors),
549                    )
550                }
551                Ok(None) => {
552                    tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
553                    (
554                        Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
555                        meerkat_core::RuntimeEpochId::new(),
556                        Arc::new(meerkat_core::EpochCursorState::new()),
557                    )
558                }
559                Err(err) => {
560                    tracing::warn!(
561                        %session_id,
562                        error = %err,
563                        "failed to load ops lifecycle; epoch rotated"
564                    );
565                    (
566                        Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
567                        meerkat_core::RuntimeEpochId::new(),
568                        Arc::new(meerkat_core::EpochCursorState::new()),
569                    )
570                }
571            }
572        } else {
573            (
574                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
575                meerkat_core::RuntimeEpochId::new(),
576                Arc::new(meerkat_core::EpochCursorState::new()),
577            )
578        }
579    }
580
581    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
582    /// nothing processes them automatically). Useful for tests and legacy mode.
583    pub async fn register_session(&self, session_id: SessionId) {
584        {
585            let mut sessions = self.sessions.write().await;
586            if let Some(existing) = sessions.get_mut(&session_id) {
587                existing.clear_dead_attachment();
588                return;
589            }
590        }
591
592        let mut entry = self.make_driver(&session_id);
593        if let Err(err) = entry.as_driver_mut().recover().await {
594            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
595            return;
596        }
597
598        let (ops_lifecycle, epoch_id, cursor_state) =
599            self.recover_or_create_ops_state(&session_id).await;
600
601        let session_entry = RuntimeSessionEntry {
602            driver: Arc::new(Mutex::new(entry)),
603            ops_lifecycle,
604            epoch_id,
605            cursor_state,
606            completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
607            phase: RegistrationPhase::Queuing,
608            detached_wake: None,
609            keep_alive: false,
610            comms_runtime: None,
611        };
612        let mut sessions = self.sessions.write().await;
613        if let Some(existing) = sessions.get_mut(&session_id) {
614            existing.clear_dead_attachment();
615        } else {
616            sessions.insert(session_id, session_entry);
617        }
618    }
619
620    /// Set the silent comms intents for a session's runtime driver.
621    ///
622    /// Peer requests whose intent matches one of these strings will be accepted
623    /// without triggering an LLM turn (ApplyMode::Ignore, WakeMode::None).
624    pub async fn set_session_silent_intents(&self, session_id: &SessionId, intents: Vec<String>) {
625        let sessions = self.sessions.read().await;
626        if let Some(entry) = sessions.get(session_id) {
627            let mut driver = entry.driver.lock().await;
628            driver.set_silent_comms_intents(intents);
629        }
630    }
631
632    /// Register a runtime driver for a session WITH a RuntimeLoop backed by a
633    /// `CoreExecutor`. When `accept_input()` queues an input and requests wake,
634    /// the loop dequeues it and calls `executor.apply()` (which triggers
635    /// `SessionService::start_turn()`).
636    pub async fn register_session_with_executor(
637        &self,
638        session_id: SessionId,
639        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
640    ) {
641        self.ensure_session_with_executor(session_id, executor)
642            .await;
643    }
644
645    /// Ensure a runtime driver with executor exists for the session.
646    ///
647    /// If a session was already registered without a loop, upgrade the
648    /// existing driver in place so queued inputs remain attached to the same
649    /// runtime ledger and can start draining immediately.
650    pub async fn ensure_session_with_executor(
651        &self,
652        session_id: SessionId,
653        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
654    ) {
655        let existing = {
656            let mut sessions = self.sessions.write().await;
657            sessions.get_mut(&session_id).map(|entry| {
658                entry.clear_dead_attachment();
659                let occupied = entry.has_attachment_or_attaching();
660                if !occupied {
661                    // Claim the attachment slot so concurrent callers see
662                    // Attaching and return early instead of racing a second
663                    // loop spawn (which would cross-wire detached-wake state).
664                    entry.phase = RegistrationPhase::Attaching;
665                }
666                (
667                    occupied,
668                    entry.driver.clone(),
669                    entry.completions.clone(),
670                    entry.ops_lifecycle.clone(),
671                )
672            })
673        };
674
675        let (driver, completions, ops_lifecycle) =
676            if let Some((has_attachment, driver, completions, ops_lifecycle)) = existing {
677                if has_attachment {
678                    return;
679                }
680                (driver, completions, ops_lifecycle)
681            } else {
682                let mut recovered_entry = self.make_driver(&session_id);
683                if let Err(err) = recovered_entry.as_driver_mut().recover().await {
684                    tracing::error!(
685                        %session_id,
686                        error = %err,
687                        "failed to recover runtime driver during registration"
688                    );
689                    return;
690                }
691
692                // Recover ops state OUTSIDE the sessions lock to avoid blocking
693                // other adapter operations behind potentially slow disk I/O.
694                let (recovered_ops, recovered_epoch, recovered_cursors) =
695                    self.recover_or_create_ops_state(&session_id).await;
696
697                // Double-check under the lock — another task may have inserted
698                // the entry while we were recovering.
699                let mut sessions = self.sessions.write().await;
700                if let Some(entry) = sessions.get_mut(&session_id) {
701                    entry.clear_dead_attachment();
702                    if entry.has_attachment_or_attaching() {
703                        return;
704                    }
705                    entry.phase = RegistrationPhase::Attaching;
706                    (
707                        entry.driver.clone(),
708                        entry.completions.clone(),
709                        entry.ops_lifecycle.clone(),
710                    )
711                } else {
712                    let driver = Arc::new(Mutex::new(recovered_entry));
713                    let completions =
714                        Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
715                    sessions.insert(
716                        session_id.clone(),
717                        RuntimeSessionEntry {
718                            driver: driver.clone(),
719                            ops_lifecycle: recovered_ops.clone(),
720                            epoch_id: recovered_epoch,
721                            cursor_state: recovered_cursors,
722                            completions: completions.clone(),
723                            phase: RegistrationPhase::Queuing,
724                            detached_wake: None,
725                            keep_alive: false,
726                            comms_runtime: None,
727                        },
728                    );
729                    (driver, completions, recovered_ops)
730                }
731            };
732
733        let should_wake = {
734            let mut driver_guard = driver.lock().await;
735            if let Err(error) = driver_guard.attach() {
736                let repaired = if error.from == RuntimeState::Attached
737                    && error.to == RuntimeState::Attached
738                {
739                    tracing::warn!(
740                        %session_id,
741                        error = %error,
742                        "runtime driver remained attached without a live published loop; detaching and retrying attachment"
743                    );
744                    match driver_guard.detach() {
745                        Ok(_) => match driver_guard.attach() {
746                            Ok(()) => true,
747                            Err(retry_error) => {
748                                tracing::warn!(
749                                    %session_id,
750                                    error = %retry_error,
751                                    "failed to re-attach runtime driver after repairing stale attachment state"
752                                );
753                                false
754                            }
755                        },
756                        Err(detach_error) => {
757                            tracing::warn!(
758                                %session_id,
759                                error = %detach_error,
760                                "failed to detach stale attached runtime driver before retrying attachment"
761                            );
762                            false
763                        }
764                    }
765                } else {
766                    false
767                };
768                if !repaired {
769                    tracing::warn!(
770                        %session_id,
771                        error = %error,
772                        "failed to attach runtime driver before publishing loop attachment"
773                    );
774                    self.revert_attaching(&session_id).await;
775                    return;
776                }
777            }
778            !driver_guard.as_driver().active_input_ids().is_empty()
779        };
780
781        // Wire detached-op wake state: the ops lifecycle registry will set
782        // `pending = true` and fire `notify` when a BackgroundToolOp reaches
783        // terminal. The waker task (spawned after attachment below) then injects
784        // a continuation through the canonical ingress seam.
785        let detached_wake_state = Arc::new(crate::detached_wake::DetachedWakeState::new());
786        ops_lifecycle.set_detached_wake(Arc::clone(&detached_wake_state));
787
788        // Wire persistence channel if a durable store is available.
789        if let Some(ref store) = self.store {
790            let (persist_tx, mut persist_rx) =
791                crate::tokio::sync::mpsc::channel::<crate::ops_lifecycle::PersistedOpsSnapshot>(16);
792            let entry_epoch_id = {
793                let sessions = self.sessions.read().await;
794                sessions
795                    .get(&session_id)
796                    .map(|e| e.epoch_id.clone())
797                    .unwrap_or_else(meerkat_core::RuntimeEpochId::new)
798            };
799            let entry_cursor = {
800                let sessions = self.sessions.read().await;
801                sessions
802                    .get(&session_id)
803                    .map(|e| Arc::clone(&e.cursor_state))
804                    .unwrap_or_else(|| Arc::new(meerkat_core::EpochCursorState::new()))
805            };
806            ops_lifecycle.set_persistence_channel(persist_tx, entry_epoch_id, entry_cursor);
807
808            // Spawn persistence task
809            let store_clone = Arc::clone(store);
810            let runtime_id = crate::identifiers::LogicalRuntimeId::new(session_id.to_string());
811            crate::tokio::spawn(async move {
812                while let Some(snapshot) = persist_rx.recv().await {
813                    if let Err(e) = store_clone
814                        .persist_ops_lifecycle(&runtime_id, &snapshot)
815                        .await
816                    {
817                        tracing::warn!(
818                            error = %e,
819                            "failed to persist ops lifecycle snapshot"
820                        );
821                    }
822                }
823            });
824        }
825
826        // Get the completion feed from the registry for feed-based idle wake.
827        let completion_feed = ops_lifecycle.completion_feed_handle();
828
829        let (wake_tx, wake_rx) = mpsc::channel(16);
830        let (control_tx, control_rx) = mpsc::channel(16);
831        let entry_cursor_state = {
832            let sessions = self.sessions.read().await;
833            sessions
834                .get(&session_id)
835                .map(|e| Arc::clone(&e.cursor_state))
836        };
837        let mut pending_loop_handle =
838            Some(crate::runtime_loop::spawn_runtime_loop_with_completions(
839                driver.clone(),
840                executor,
841                wake_rx,
842                control_rx,
843                Some(completions.clone()),
844                Some(Arc::clone(&detached_wake_state)),
845                Some(completion_feed),
846                entry_cursor_state,
847            ));
848
849        let (published, detach_after_abort) = {
850            let mut sessions = self.sessions.write().await;
851            match sessions.get_mut(&session_id) {
852                None => (false, true),
853                Some(entry) => {
854                    entry.clear_dead_attachment();
855                    if entry.has_live_attachment() {
856                        (false, false)
857                    } else if !Arc::ptr_eq(&entry.driver, &driver)
858                        || !Arc::ptr_eq(&entry.completions, &completions)
859                    {
860                        tracing::warn!(
861                            %session_id,
862                            "runtime session entry changed while wiring executor; aborting stale loop attachment"
863                        );
864                        (false, true)
865                    } else {
866                        match pending_loop_handle.take() {
867                            Some(loop_handle) => {
868                                entry.attach_runtime_loop(wake_tx.clone(), control_tx, loop_handle);
869                                entry.detached_wake = Some(Arc::clone(&detached_wake_state));
870                                (true, false)
871                            }
872                            None => {
873                                tracing::error!(
874                                    %session_id,
875                                    "runtime loop handle missing during attachment publish"
876                                );
877                                (false, true)
878                            }
879                        }
880                    }
881                }
882            }
883        };
884
885        if !published {
886            if let Some(loop_handle) = pending_loop_handle.take() {
887                loop_handle.abort();
888            }
889            if detach_after_abort {
890                let mut driver_guard = driver.lock().await;
891                let _ = driver_guard.detach();
892            }
893            self.revert_attaching(&session_id).await;
894            return;
895        }
896
897        if should_wake {
898            let _ = wake_tx.try_send(());
899        }
900    }
901
902    /// Revert `Attaching → Queuing` if attachment failed. This unblocks
903    /// future `ensure_session_with_executor` callers that would otherwise
904    /// see `Attaching` forever and return early.
905    async fn revert_attaching(&self, session_id: &SessionId) {
906        let mut sessions = self.sessions.write().await;
907        if let Some(entry) = sessions.get_mut(session_id)
908            && matches!(entry.phase, RegistrationPhase::Attaching)
909        {
910            entry.phase = RegistrationPhase::Queuing;
911        }
912    }
913
914    /// Unregister a session's runtime driver.
915    ///
916    /// Detaches the executor (Attached → Idle) before removal, then drops
917    /// the wake channel sender, which causes the RuntimeLoop to exit.
918    pub async fn unregister_session(&self, session_id: &SessionId) {
919        let entry = {
920            let mut sessions = self.sessions.write().await;
921            let mut slots = self.comms_drain_slots.write().await;
922            // Remove + abort drain slot before dropping session binding so
923            // slot keys remain a subset of registered-session keys.
924            if let Some(mut slot) = slots.remove(session_id) {
925                abort_slot(&mut slot);
926            }
927            sessions.remove(session_id)
928        };
929
930        if let Some(entry) = entry {
931            let mut driver = entry.driver.lock().await;
932            let _ = driver.detach(); // Attached → Idle (no-op if not Attached)
933            drop(driver);
934
935            let mut completions = entry.completions.lock().await;
936            completions.resolve_all_terminated("runtime session unregistered");
937        }
938    }
939
940    /// Check whether a runtime driver is already registered for a session.
941    pub async fn contains_session(&self, session_id: &SessionId) -> bool {
942        self.sessions.read().await.contains_key(session_id)
943    }
944
945    /// Check whether a session has an active RuntimeLoop or attachment in
946    /// progress. Returns `false` only for `Queuing` sessions (registered via
947    /// `prepare_bindings()` with no executor) and unknown sessions.
948    pub async fn session_has_executor(&self, session_id: &SessionId) -> bool {
949        let sessions = self.sessions.read().await;
950        sessions
951            .get(session_id)
952            .map(RuntimeSessionEntry::has_attachment_or_attaching)
953            .unwrap_or(false)
954    }
955
956    /// Check whether a session already has a comms runtime configured.
957    ///
958    /// Returns `true` if `update_peer_ingress_context` was previously called
959    /// with a non-None comms runtime for this session (e.g., via
960    /// `SessionRuntime::enable_comms_drain`).
961    pub async fn session_has_comms(&self, session_id: &SessionId) -> bool {
962        let sessions = self.sessions.read().await;
963        sessions
964            .get(session_id)
965            .map(|entry| entry.comms_runtime.is_some())
966            .unwrap_or(false)
967    }
968
969    /// Cancel the currently-running turn for a registered session.
970    pub async fn interrupt_current_run(
971        &self,
972        session_id: &SessionId,
973    ) -> Result<(), RuntimeDriverError> {
974        let (driver, control_tx) = {
975            let sessions = self.sessions.read().await;
976            let entry = sessions
977                .get(session_id)
978                .ok_or(RuntimeDriverError::NotReady {
979                    state: RuntimeState::Destroyed,
980                })?;
981            (entry.driver.clone(), entry.control_sender())
982        };
983
984        let Some(control_tx) = control_tx else {
985            let state = {
986                let driver = driver.lock().await;
987                driver.as_driver().runtime_state()
988            };
989            return Err(RuntimeDriverError::NotReady { state });
990        };
991        control_tx
992            .send(RunControlCommand::CancelCurrentRun {
993                reason: "mob interrupt".to_string(),
994            })
995            .await
996            .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
997    }
998
999    /// Stop the attached runtime executor through the out-of-band control
1000    /// channel. When no loop is attached yet, a stop command is applied directly
1001    /// against the driver so queued work is still terminated consistently.
1002    pub async fn stop_runtime_executor(
1003        &self,
1004        session_id: &SessionId,
1005        command: RunControlCommand,
1006    ) -> Result<(), RuntimeDriverError> {
1007        let (driver, completions, control_tx) = {
1008            let sessions = self.sessions.read().await;
1009            let entry = sessions
1010                .get(session_id)
1011                .ok_or(RuntimeDriverError::NotReady {
1012                    state: RuntimeState::Destroyed,
1013                })?;
1014            (
1015                entry.driver.clone(),
1016                entry.completions.clone(),
1017                entry.control_sender(),
1018            )
1019        };
1020
1021        if let Some(control_tx) = control_tx
1022            && control_tx.send(command.clone()).await.is_ok()
1023        {
1024            return Ok(());
1025        }
1026
1027        if matches!(command, RunControlCommand::StopRuntimeExecutor { .. }) {
1028            let mut driver = driver.lock().await;
1029            driver
1030                .as_driver_mut()
1031                .on_runtime_control(crate::traits::RuntimeControlCommand::Stop)
1032                .await?;
1033            drop(driver);
1034            let mut completions = completions.lock().await;
1035            completions.resolve_all_terminated("runtime stopped");
1036            drop(completions);
1037
1038            // No live control sender was available for this stop path. Scrub any
1039            // dead attachment capabilities that may still be published.
1040            let mut sessions = self.sessions.write().await;
1041            if let Some(entry) = sessions.get_mut(session_id) {
1042                entry.clear_dead_attachment();
1043            }
1044            Ok(())
1045        } else {
1046            Err(RuntimeDriverError::Internal(
1047                "failed to send stop: runtime loop is unavailable".into(),
1048            ))
1049        }
1050    }
1051
1052    /// Accept an input and execute it synchronously through the runtime driver.
1053    ///
1054    /// This is useful for surfaces that need the legacy request/response shape
1055    /// while still preserving v9 input lifecycle semantics.
1056    pub async fn accept_input_and_run<T, F, Fut>(
1057        &self,
1058        session_id: &SessionId,
1059        input: Input,
1060        op: F,
1061    ) -> Result<T, RuntimeDriverError>
1062    where
1063        F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
1064        Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
1065    {
1066        let driver = {
1067            let sessions = self.sessions.read().await;
1068            sessions
1069                .get(session_id)
1070                .ok_or(RuntimeDriverError::NotReady {
1071                    state: RuntimeState::Destroyed,
1072                })?
1073                .driver
1074                .clone()
1075        };
1076
1077        let (input_id, run_id, primitive) = {
1078            let mut driver = driver.lock().await;
1079            if !driver.is_idle_or_attached() {
1080                return Err(RuntimeDriverError::NotReady {
1081                    state: driver.as_driver().runtime_state(),
1082                });
1083            }
1084
1085            let active_input_ids = driver.as_driver().active_input_ids();
1086            if !active_input_ids.is_empty() {
1087                let duplicate_active_input =
1088                    input.header().idempotency_key.as_ref().and_then(|key| {
1089                        active_input_ids.iter().find(|active_id| {
1090                            driver
1091                                .as_driver()
1092                                .input_state(active_id)
1093                                .and_then(|state| state.idempotency_key.as_ref())
1094                                == Some(key)
1095                        })
1096                    });
1097                if let Some(existing_id) = duplicate_active_input {
1098                    return Err(RuntimeDriverError::ValidationFailed {
1099                        reason: format!(
1100                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
1101                        ),
1102                    });
1103                }
1104                return Err(RuntimeDriverError::NotReady {
1105                    state: driver.as_driver().runtime_state(),
1106                });
1107            }
1108
1109            let outcome = driver.as_driver_mut().accept_input(input).await?;
1110            let input_id = match outcome {
1111                AcceptOutcome::Accepted { input_id, .. } => input_id,
1112                AcceptOutcome::Deduplicated { existing_id, .. } => {
1113                    return Err(RuntimeDriverError::ValidationFailed {
1114                        reason: format!(
1115                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
1116                        ),
1117                    });
1118                }
1119                AcceptOutcome::Rejected { reason } => {
1120                    return Err(RuntimeDriverError::ValidationFailed {
1121                        reason: reason.to_string(),
1122                    });
1123                }
1124            };
1125
1126            if !driver.is_idle_or_attached() {
1127                return Err(RuntimeDriverError::NotReady {
1128                    state: driver.as_driver().runtime_state(),
1129                });
1130            }
1131
1132            let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
1133                RuntimeDriverError::Internal("accepted input was not queued for execution".into())
1134            })?;
1135            if dequeued_id != input_id {
1136                return Err(RuntimeDriverError::NotReady {
1137                    state: driver.as_driver().runtime_state(),
1138                });
1139            }
1140            let run_id = RunId::new();
1141            driver.start_run(run_id.clone()).map_err(|err| {
1142                RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
1143            })?;
1144            driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
1145                RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
1146            })?;
1147            let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
1148            (input_id, run_id, primitive)
1149        };
1150
1151        match op(run_id.clone(), primitive.clone()).await {
1152            Ok((result, output)) => {
1153                let mut driver = driver.lock().await;
1154                if let Err(err) = driver
1155                    .as_driver_mut()
1156                    .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
1157                        run_id: run_id.clone(),
1158                        receipt: output.receipt,
1159                        session_snapshot: output.session_snapshot,
1160                    })
1161                    .await
1162                {
1163                    if let Err(unwind_err) = driver
1164                        .as_driver_mut()
1165                        .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
1166                            run_id,
1167                            error: format!("boundary commit failed: {err}"),
1168                            recoverable: true,
1169                        })
1170                        .await
1171                    {
1172                        return Err(RuntimeDriverError::Internal(format!(
1173                            "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
1174                        )));
1175                    }
1176                    return Err(RuntimeDriverError::Internal(format!(
1177                        "runtime boundary commit failed: {err}"
1178                    )));
1179                }
1180                if let Err(err) = driver
1181                    .as_driver_mut()
1182                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
1183                        run_id,
1184                        consumed_input_ids: vec![input_id],
1185                    })
1186                    .await
1187                {
1188                    drop(driver);
1189                    self.unregister_session(session_id).await;
1190                    return Err(RuntimeDriverError::Internal(format!(
1191                        "failed to persist runtime completion snapshot: {err}"
1192                    )));
1193                }
1194                Ok(result)
1195            }
1196            Err(err) => {
1197                let mut driver = driver.lock().await;
1198                if let Err(run_err) = driver
1199                    .as_driver_mut()
1200                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
1201                        run_id,
1202                        error: err.to_string(),
1203                        recoverable: true,
1204                    })
1205                    .await
1206                {
1207                    drop(driver);
1208                    self.unregister_session(session_id).await;
1209                    return Err(RuntimeDriverError::Internal(format!(
1210                        "failed to persist runtime failure snapshot: {run_err}"
1211                    )));
1212                }
1213                Err(err)
1214            }
1215        }
1216    }
1217
1218    /// Accept an input and return a completion handle that resolves when the
1219    /// input reaches a terminal state (Consumed or Abandoned).
1220    ///
1221    /// Returns `(AcceptOutcome, Option<CompletionHandle>)`:
1222    /// - `(Accepted, Some(handle))` — await handle for result
1223    /// - `(Accepted, None)` — input reached a terminal state during admission
1224    /// - `(Deduplicated, Some(handle))` — joined in-flight waiter
1225    /// - `(Deduplicated, None)` — input already terminal; no waiter needed
1226    /// - `(Rejected, _)` — returned as `Err(ValidationFailed)`
1227    pub async fn accept_input_with_completion(
1228        &self,
1229        session_id: &SessionId,
1230        input: Input,
1231    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1232    {
1233        let (driver, completions, wake_tx, _control_tx) = {
1234            let sessions = self.sessions.read().await;
1235            let entry = sessions
1236                .get(session_id)
1237                .ok_or(RuntimeDriverError::NotReady {
1238                    state: RuntimeState::Destroyed,
1239                })?;
1240            (
1241                entry.driver.clone(),
1242                entry.completions.clone(),
1243                entry.wake_sender(),
1244                entry.control_sender(),
1245            )
1246        };
1247
1248        let (outcome, signal, handle) = {
1249            let mut driver = driver.lock().await;
1250            let result = driver.as_driver_mut().accept_input(input).await?;
1251
1252            match &result {
1253                AcceptOutcome::Accepted { input_id, .. } => {
1254                    let is_terminal = driver
1255                        .as_driver()
1256                        .input_state(input_id)
1257                        .map(|state| state.current_state().is_terminal())
1258                        .unwrap_or(true);
1259                    let handle = if is_terminal {
1260                        None
1261                    } else {
1262                        Some({
1263                            let mut completions = completions.lock().await;
1264                            completions.register(input_id.clone())
1265                        })
1266                    };
1267                    let signal = driver.take_post_admission_signal();
1268                    (result, signal, handle)
1269                }
1270                AcceptOutcome::Deduplicated { existing_id, .. } => {
1271                    // Check if the existing input is already terminal
1272                    let existing_state = driver.as_driver().input_state(existing_id);
1273                    let is_terminal = existing_state
1274                        .map(|s| s.current_state().is_terminal())
1275                        .unwrap_or(true); // missing state = already cleaned up = terminal
1276
1277                    if is_terminal {
1278                        // Input already processed — no handle, no waiter
1279                        (
1280                            result,
1281                            crate::driver::ephemeral::PostAdmissionSignal::None,
1282                            None,
1283                        )
1284                    } else {
1285                        // In-flight — join existing waiters via multi-waiter Vec
1286                        let handle = {
1287                            let mut completions = completions.lock().await;
1288                            completions.register(existing_id.clone())
1289                        };
1290                        (
1291                            result,
1292                            crate::driver::ephemeral::PostAdmissionSignal::None,
1293                            Some(handle),
1294                        )
1295                    }
1296                }
1297                AcceptOutcome::Rejected { reason } => {
1298                    return Err(RuntimeDriverError::ValidationFailed {
1299                        reason: reason.to_string(),
1300                    });
1301                }
1302            }
1303        };
1304
1305        if signal.should_wake()
1306            && let Some(ref wake_tx) = wake_tx
1307        {
1308            let _ = wake_tx.try_send(());
1309        }
1310
1311        Ok((outcome, handle))
1312    }
1313
1314    /// Accept an input but intentionally do not wake the runtime loop.
1315    ///
1316    /// This is reserved for explicitly queued-only surface contracts that
1317    /// stage work for the next turn boundary instead of waking an idle session
1318    /// immediately.
1319    pub async fn accept_input_without_wake(
1320        &self,
1321        session_id: &SessionId,
1322        input: Input,
1323    ) -> Result<AcceptOutcome, RuntimeDriverError> {
1324        let driver = {
1325            let sessions = self.sessions.read().await;
1326            let entry = sessions
1327                .get(session_id)
1328                .ok_or(RuntimeDriverError::NotReady {
1329                    state: RuntimeState::Destroyed,
1330                })?;
1331            entry.driver.clone()
1332        };
1333
1334        let outcome = {
1335            let mut driver = driver.lock().await;
1336            let result = driver.as_driver_mut().accept_input(input).await?;
1337            let signal = driver.take_post_admission_signal();
1338            debug_assert!(
1339                !signal.should_process_immediately(),
1340                "queue-only admission unexpectedly requested immediate processing"
1341            );
1342            // Intentionally discard the signal — this is the no-wake path.
1343            result
1344        };
1345
1346        Ok(outcome)
1347    }
1348
1349    /// Get the shared ops lifecycle registry for a session/runtime instance.
1350    pub async fn ops_lifecycle_registry(
1351        &self,
1352        session_id: &SessionId,
1353    ) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
1354        let sessions = self.sessions.read().await;
1355        sessions
1356            .get(session_id)
1357            .map(|e| Arc::clone(&e.ops_lifecycle))
1358    }
1359
1360    /// Prepare canonical runtime bindings for a session.
1361    ///
1362    /// This is the single canonical helper that replaces the hand-rolled
1363    /// `register_session()` + `ops_lifecycle_registry()` + manual threading
1364    /// dance. All runtime-backed surfaces should call this instead.
1365    ///
1366    /// The method is idempotent: if the session is already registered, it
1367    /// returns bindings from the existing entry. The epoch_id is stable
1368    /// across repeated calls for the same session.
1369    pub async fn prepare_bindings(
1370        &self,
1371        session_id: SessionId,
1372    ) -> Result<meerkat_core::SessionRuntimeBindings, RuntimeBindingsError> {
1373        self.register_session(session_id.clone()).await;
1374        let sessions = self.sessions.read().await;
1375        let entry = sessions
1376            .get(&session_id)
1377            .ok_or(RuntimeBindingsError::SessionNotFound(session_id.clone()))?;
1378        Ok(meerkat_core::SessionRuntimeBindings {
1379            session_id,
1380            epoch_id: entry.epoch_id.clone(),
1381            ops_lifecycle: Arc::clone(&entry.ops_lifecycle)
1382                as Arc<dyn meerkat_core::OpsLifecycleRegistry>,
1383            cursor_state: Arc::clone(&entry.cursor_state),
1384        })
1385    }
1386
1387    /// Update the session's peer-ingress context, then reconcile the canonical
1388    /// drain lifecycle.
1389    ///
1390    /// Surfaces may call this when they learn or change keep_alive/comms
1391    /// context, but the adapter owns the actual drain-mode decision.
1392    pub async fn update_peer_ingress_context(
1393        self: &Arc<Self>,
1394        session_id: &SessionId,
1395        keep_alive: bool,
1396        comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
1397    ) -> bool {
1398        {
1399            let mut sessions = self.sessions.write().await;
1400            let Some(entry) = sessions.get_mut(session_id) else {
1401                tracing::warn!(
1402                    %session_id,
1403                    "refusing to configure comms drain for unregistered session"
1404                );
1405                return false;
1406            };
1407            entry.keep_alive = keep_alive;
1408            entry.comms_runtime = comms_runtime;
1409        }
1410        self.reconcile_peer_ingress(session_id).await
1411    }
1412
1413    async fn reconcile_peer_ingress(self: &Arc<Self>, session_id: &SessionId) -> bool {
1414        let (keep_alive, comms_runtime) = {
1415            let mut sessions = self.sessions.write().await;
1416            let Some(entry) = sessions.get_mut(session_id) else {
1417                return false;
1418            };
1419            entry.clear_dead_attachment();
1420            (entry.keep_alive, entry.comms_runtime.clone())
1421        };
1422
1423        let state = match self.runtime_state(session_id).await {
1424            Ok(state) => state,
1425            Err(_) => RuntimeState::Destroyed,
1426        };
1427        let desired = desired_peer_ingress_mode(state, comms_runtime.is_some(), keep_alive);
1428
1429        let Some(comms) = comms_runtime else {
1430            if desired.is_none() {
1431                self.abort_comms_drain(session_id).await;
1432            }
1433            return false;
1434        };
1435
1436        let mut slots = self.comms_drain_slots.write().await;
1437        let slot = slots
1438            .entry(session_id.clone())
1439            .or_insert_with(CommsDrainSlot::new);
1440
1441        let Some(mode) = desired else {
1442            abort_slot(slot);
1443            return false;
1444        };
1445
1446        let result =
1447            match protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode) {
1448                Ok(r) => r,
1449                Err(e) => {
1450                    tracing::trace!(error = %e, "comms drain authority rejected EnsureRunning");
1451                    return false;
1452                }
1453            };
1454
1455        // Execute effects from the transition
1456        for effect in &result.effects {
1457            match effect {
1458                CommsDrainLifecycleEffect::SpawnDrainTask { .. } => {
1459                    let handle = crate::comms_drain::spawn_comms_drain(
1460                        Arc::clone(self),
1461                        session_id.clone(),
1462                        comms.clone(),
1463                    );
1464                    slot.handle = Some(handle);
1465                }
1466                CommsDrainLifecycleEffect::AbortDrainTask => {
1467                    if let Some(handle) = slot.handle.take() {
1468                        handle.abort();
1469                    }
1470                }
1471            }
1472        }
1473
1474        let Some(obligation) = result.obligation else {
1475            tracing::warn!(
1476                %session_id,
1477                "comms drain spawn transition emitted no obligation"
1478            );
1479            return false;
1480        };
1481
1482        // The runtime spawns the drain task synchronously (as a tokio task
1483        // above), so we immediately close the spawn obligation.
1484        match protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation) {
1485            Ok(_effects) => {}
1486            Err(e) => {
1487                tracing::trace!(error = %e, "comms drain authority rejected TaskSpawned");
1488            }
1489        }
1490        true
1491    }
1492
1493    /// Notify the authority that a drain task has exited with the given reason.
1494    ///
1495    /// Called from drain task exit paths (or by wrappers that detect task
1496    /// completion). The authority decides whether to enter ExitedRespawnable
1497    /// (PersistentHost + Failed) or Stopped.
1498    pub async fn notify_comms_drain_exited(
1499        self: &Arc<Self>,
1500        session_id: &SessionId,
1501        reason: DrainExitReason,
1502    ) {
1503        {
1504            let mut slots = self.comms_drain_slots.write().await;
1505            if let Some(slot) = slots.get_mut(session_id) {
1506                slot.handle.take(); // clean up finished handle
1507                match protocol_comms_drain_spawn::notify_task_exited(&mut slot.authority, reason) {
1508                    Ok(_effects) => {}
1509                    Err(e) => {
1510                        tracing::warn!(error = %e, "comms drain authority rejected TaskExited");
1511                    }
1512                }
1513            }
1514        }
1515        let _ = self.reconcile_peer_ingress(session_id).await;
1516    }
1517
1518    /// Abort all active comms drain tasks.
1519    pub async fn abort_comms_drains(&self) {
1520        let mut slots = self.comms_drain_slots.write().await;
1521        for (_, slot) in slots.iter_mut() {
1522            abort_slot(slot);
1523        }
1524    }
1525
1526    /// Abort the comms drain task for a specific session.
1527    pub async fn abort_comms_drain(&self, session_id: &SessionId) {
1528        let mut slots = self.comms_drain_slots.write().await;
1529        if let Some(slot) = slots.get_mut(session_id) {
1530            abort_slot(slot);
1531        }
1532    }
1533
1534    /// Wait for a session's comms drain task to finish.
1535    ///
1536    /// Returns immediately if no drain is active for the session.
1537    /// If the task already notified the authority (normal exit), this is a no-op
1538    /// for authority state. If the task panicked without notifying, this submits
1539    /// `TaskExited { Failed }` as a safety net.
1540    pub async fn wait_comms_drain(&self, session_id: &SessionId) {
1541        let handle = {
1542            let mut slots = self.comms_drain_slots.write().await;
1543            slots
1544                .get_mut(session_id)
1545                .and_then(|slot| slot.handle.take())
1546        };
1547        if let Some(handle) = handle {
1548            let _ = handle.await;
1549        }
1550        // Safety net: if the authority is still Running after the task exited,
1551        // the task panicked without notifying. Submit Failed to prevent the
1552        // authority from being stuck in Running forever.
1553        let mut slots = self.comms_drain_slots.write().await;
1554        if let Some(slot) = slots.get_mut(session_id)
1555            && slot.authority.phase()
1556                == meerkat_core::comms_drain_lifecycle_authority::CommsDrainPhase::Running
1557        {
1558            tracing::warn!(
1559                "comms_drain: task exited without notifying authority (likely panicked), \
1560                 submitting Failed safety net"
1561            );
1562            match protocol_comms_drain_spawn::notify_task_exited(
1563                &mut slot.authority,
1564                DrainExitReason::Failed,
1565            ) {
1566                Ok(effects) => {
1567                    apply_runtime_drain_effects(slot, &effects);
1568                }
1569                Err(e) => {
1570                    tracing::warn!(error = %e, "comms drain authority rejected safety-net TaskExited");
1571                }
1572            }
1573        }
1574    }
1575}
1576
1577#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1578#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1579impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
1580    fn runtime_mode(&self) -> RuntimeMode {
1581        self.mode
1582    }
1583
1584    async fn accept_input(
1585        &self,
1586        session_id: &SessionId,
1587        input: Input,
1588    ) -> Result<AcceptOutcome, RuntimeDriverError> {
1589        let (driver, wake_tx, _control_tx) = {
1590            let sessions = self.sessions.read().await;
1591            let entry = sessions
1592                .get(session_id)
1593                .ok_or(RuntimeDriverError::NotReady {
1594                    state: RuntimeState::Destroyed,
1595                })?;
1596            (
1597                entry.driver.clone(),
1598                entry.wake_sender(),
1599                entry.control_sender(),
1600            )
1601        };
1602
1603        // Accept input and drain the typed post-admission signal under the driver lock.
1604        let (outcome, signal) = {
1605            let mut driver = driver.lock().await;
1606            let result = driver.as_driver_mut().accept_input(input).await?;
1607            let signal = driver.take_post_admission_signal();
1608            (result, signal)
1609        };
1610
1611        // Deliver typed post-admission signals.
1612        if signal.should_wake() {
1613            match wake_tx {
1614                Some(ref wake_tx) => {
1615                    let _ = wake_tx.try_send(());
1616                }
1617                None => {
1618                    tracing::warn!(
1619                        %session_id,
1620                        "input accepted but runtime loop is not attached — \
1621                         wake signal dropped, input will remain queued until \
1622                         a loop is re-attached"
1623                    );
1624                }
1625            }
1626        }
1627
1628        Ok(outcome)
1629    }
1630
1631    async fn accept_input_with_completion(
1632        &self,
1633        session_id: &SessionId,
1634        input: Input,
1635    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1636    {
1637        RuntimeSessionAdapter::accept_input_with_completion(self, session_id, input).await
1638    }
1639
1640    async fn runtime_state(
1641        &self,
1642        session_id: &SessionId,
1643    ) -> Result<RuntimeState, RuntimeDriverError> {
1644        let driver = {
1645            let sessions = self.sessions.read().await;
1646            let entry = sessions
1647                .get(session_id)
1648                .ok_or(RuntimeDriverError::NotReady {
1649                    state: RuntimeState::Destroyed,
1650                })?;
1651            entry.driver.clone()
1652        };
1653        let driver = driver.lock().await;
1654        Ok(driver.as_driver().runtime_state())
1655    }
1656
1657    async fn retire_runtime(
1658        &self,
1659        session_id: &SessionId,
1660    ) -> Result<RetireReport, RuntimeDriverError> {
1661        let (driver_handle, completions, wake_tx) = {
1662            let sessions = self.sessions.read().await;
1663            let entry = sessions
1664                .get(session_id)
1665                .ok_or(RuntimeDriverError::NotReady {
1666                    state: RuntimeState::Destroyed,
1667                })?;
1668            (
1669                entry.driver.clone(),
1670                entry.completions.clone(),
1671                entry.wake_sender(),
1672            )
1673        };
1674        let mut driver = driver_handle.lock().await;
1675        let mut report = driver.as_driver_mut().retire().await?;
1676        drop(driver); // Release driver lock before waking
1677
1678        if report.inputs_pending_drain > 0 {
1679            // Wake the runtime loop so it drains already-queued inputs.
1680            // Retired state allows processing but rejects new accepts.
1681            if let Some(ref wake_tx) = wake_tx
1682                && wake_tx.send(()).await.is_ok()
1683            {
1684                return Ok(report);
1685            }
1686
1687            // No live loop can drain this retired queue. Abandon the queued work
1688            // now so later recovery/upgrade paths do not execute inputs whose
1689            // waiters were already terminated.
1690            let mut driver = driver_handle.lock().await;
1691            let abandoned = driver
1692                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1693                .await?;
1694            drop(driver);
1695            let mut completions = completions.lock().await;
1696            completions.resolve_all_terminated("retired without runtime loop");
1697            report.inputs_abandoned += abandoned;
1698            report.inputs_pending_drain = 0;
1699        }
1700
1701        Ok(report)
1702    }
1703
1704    async fn reset_runtime(
1705        &self,
1706        session_id: &SessionId,
1707    ) -> Result<ResetReport, RuntimeDriverError> {
1708        let (driver, completions) = {
1709            let sessions = self.sessions.read().await;
1710            let entry = sessions
1711                .get(session_id)
1712                .ok_or(RuntimeDriverError::NotReady {
1713                    state: RuntimeState::Destroyed,
1714                })?;
1715            (entry.driver.clone(), entry.completions.clone())
1716        };
1717        let mut driver = driver.lock().await;
1718        if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
1719            return Err(RuntimeDriverError::NotReady {
1720                state: RuntimeState::Running,
1721            });
1722        }
1723        let report = driver.as_driver_mut().reset().await?;
1724        drop(driver);
1725
1726        // Resolve all pending completion waiters — reset discards all queued work
1727        let mut completions = completions.lock().await;
1728        completions.resolve_all_terminated("runtime reset");
1729
1730        Ok(report)
1731    }
1732
1733    async fn input_state(
1734        &self,
1735        session_id: &SessionId,
1736        input_id: &InputId,
1737    ) -> Result<Option<InputState>, RuntimeDriverError> {
1738        let driver = {
1739            let sessions = self.sessions.read().await;
1740            let entry = sessions
1741                .get(session_id)
1742                .ok_or(RuntimeDriverError::NotReady {
1743                    state: RuntimeState::Destroyed,
1744                })?;
1745            entry.driver.clone()
1746        };
1747        let driver = driver.lock().await;
1748        Ok(driver.as_driver().input_state(input_id).cloned())
1749    }
1750
1751    async fn list_active_inputs(
1752        &self,
1753        session_id: &SessionId,
1754    ) -> Result<Vec<InputId>, RuntimeDriverError> {
1755        let driver = {
1756            let sessions = self.sessions.read().await;
1757            let entry = sessions
1758                .get(session_id)
1759                .ok_or(RuntimeDriverError::NotReady {
1760                    state: RuntimeState::Destroyed,
1761                })?;
1762            entry.driver.clone()
1763        };
1764        let driver = driver.lock().await;
1765        Ok(driver.as_driver().active_input_ids())
1766    }
1767}
1768
1769// ---------------------------------------------------------------------------
1770// RuntimeControlPlane implementation
1771// ---------------------------------------------------------------------------
1772
1773impl RuntimeSessionAdapter {
1774    /// Resolve a LogicalRuntimeId to a SessionId for internal lookup.
1775    ///
1776    /// The adapter uses `LogicalRuntimeId::new(session_id.to_string())` when
1777    /// creating drivers, so runtime IDs are UUID strings that parse back to
1778    /// SessionId.
1779    fn resolve_session_id(
1780        runtime_id: &LogicalRuntimeId,
1781    ) -> Result<SessionId, RuntimeControlPlaneError> {
1782        runtime_id
1783            .0
1784            .parse::<uuid::Uuid>()
1785            .map(SessionId)
1786            .map_err(|_| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
1787    }
1788
1789    /// Look up the session entry for a runtime ID, returning a control-plane error
1790    /// if not found.
1791    async fn lookup_entry(
1792        &self,
1793        runtime_id: &LogicalRuntimeId,
1794    ) -> Result<
1795        (
1796            SessionId,
1797            SharedDriver,
1798            SharedCompletionRegistry,
1799            Option<mpsc::Sender<()>>,
1800        ),
1801        RuntimeControlPlaneError,
1802    > {
1803        let session_id = Self::resolve_session_id(runtime_id)?;
1804        let sessions = self.sessions.read().await;
1805        let entry = sessions
1806            .get(&session_id)
1807            .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
1808        Ok((
1809            session_id,
1810            entry.driver.clone(),
1811            entry.completions.clone(),
1812            entry.wake_sender(),
1813        ))
1814    }
1815}
1816
1817#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1818#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1819impl crate::traits::RuntimeControlPlane for RuntimeSessionAdapter {
1820    async fn ingest(
1821        &self,
1822        runtime_id: &LogicalRuntimeId,
1823        input: Input,
1824    ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
1825        let (session_id, driver, _completions, wake_tx, _control_tx) = {
1826            let (sid, d, c, w) = self.lookup_entry(runtime_id).await?;
1827            let ctrl = {
1828                let sessions = self.sessions.read().await;
1829                sessions
1830                    .get(&sid)
1831                    .and_then(RuntimeSessionEntry::control_sender)
1832            };
1833            (sid, d, c, w, ctrl)
1834        };
1835        let _ = session_id;
1836
1837        let (outcome, signal) = {
1838            let mut drv = driver.lock().await;
1839            let result = drv
1840                .as_driver_mut()
1841                .accept_input(input)
1842                .await
1843                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1844            let signal = drv.take_post_admission_signal();
1845            (result, signal)
1846        };
1847
1848        if signal.should_wake()
1849            && let Some(ref tx) = wake_tx
1850        {
1851            let _ = tx.try_send(());
1852        }
1853
1854        Ok(outcome)
1855    }
1856
1857    async fn publish_event(
1858        &self,
1859        event: crate::runtime_event::RuntimeEventEnvelope,
1860    ) -> Result<(), RuntimeControlPlaneError> {
1861        let runtime_id = event.runtime_id.clone();
1862        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(&runtime_id).await?;
1863
1864        let mut drv = driver.lock().await;
1865        drv.as_driver_mut()
1866            .on_runtime_event(event)
1867            .await
1868            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))
1869    }
1870
1871    async fn retire(
1872        &self,
1873        runtime_id: &LogicalRuntimeId,
1874    ) -> Result<RetireReport, RuntimeControlPlaneError> {
1875        let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1876        let _ = session_id;
1877
1878        let mut drv = driver.lock().await;
1879        let mut report = drv
1880            .as_driver_mut()
1881            .retire()
1882            .await
1883            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1884        drop(drv);
1885
1886        if report.inputs_pending_drain > 0 {
1887            if let Some(ref tx) = wake_tx
1888                && tx.send(()).await.is_ok()
1889            {
1890                return Ok(report);
1891            }
1892
1893            // No live loop — abandon queued work
1894            let mut drv = driver.lock().await;
1895            let abandoned = drv
1896                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1897                .await
1898                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1899            drop(drv);
1900            let mut comp = completions.lock().await;
1901            comp.resolve_all_terminated("retired without runtime loop");
1902            report.inputs_abandoned += abandoned;
1903            report.inputs_pending_drain = 0;
1904        }
1905
1906        Ok(report)
1907    }
1908
1909    async fn recycle(
1910        &self,
1911        runtime_id: &LogicalRuntimeId,
1912    ) -> Result<RecycleReport, RuntimeControlPlaneError> {
1913        let (_session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1914
1915        let (transferred, active_after_recycle) = {
1916            let mut drv = driver.lock().await;
1917            let state = drv.as_driver().runtime_state();
1918            if matches!(state, RuntimeState::Running) {
1919                return Err(RuntimeControlPlaneError::InvalidState { state });
1920            }
1921            let should_restore_attached = matches!(state, RuntimeState::Attached);
1922
1923            let transferred = match &mut *drv {
1924                DriverEntry::Ephemeral(driver) => driver
1925                    .recycle_preserving_work()
1926                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1927                DriverEntry::Persistent(driver) => driver
1928                    .recycle_preserving_work()
1929                    .await
1930                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1931            };
1932
1933            if should_restore_attached
1934                && matches!(drv.as_driver().runtime_state(), RuntimeState::Idle)
1935            {
1936                drv.attach()
1937                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1938            }
1939
1940            let active_after_recycle = drv.as_driver().active_input_ids();
1941            (transferred, active_after_recycle)
1942        };
1943
1944        // Reconcile existing waiters: keep waiting only for inputs that remain
1945        // active after recycle; terminate stale waiters so they cannot hang.
1946        {
1947            let pending_after: HashSet<InputId> = active_after_recycle.into_iter().collect();
1948            let mut comp = completions.lock().await;
1949            comp.resolve_not_pending(
1950                |input_id| pending_after.contains(input_id),
1951                "recycled input no longer pending",
1952            );
1953        }
1954
1955        // Wake the runtime loop to process re-queued inputs
1956        if let Some(ref tx) = wake_tx {
1957            let _ = tx.try_send(());
1958        }
1959
1960        Ok(RecycleReport {
1961            inputs_transferred: transferred,
1962        })
1963    }
1964
1965    async fn reset(
1966        &self,
1967        runtime_id: &LogicalRuntimeId,
1968    ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
1969        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1970
1971        let mut drv = driver.lock().await;
1972        if matches!(drv.as_driver().runtime_state(), RuntimeState::Running) {
1973            return Err(RuntimeControlPlaneError::InvalidState {
1974                state: RuntimeState::Running,
1975            });
1976        }
1977        let report = drv
1978            .as_driver_mut()
1979            .reset()
1980            .await
1981            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1982        drop(drv);
1983
1984        let mut comp = completions.lock().await;
1985        comp.resolve_all_terminated("runtime reset");
1986
1987        Ok(report)
1988    }
1989
1990    async fn recover(
1991        &self,
1992        runtime_id: &LogicalRuntimeId,
1993    ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
1994        let (_session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1995
1996        let mut drv = driver.lock().await;
1997        let report = drv
1998            .as_driver_mut()
1999            .recover()
2000            .await
2001            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
2002        drop(drv);
2003
2004        if let Some(ref tx) = wake_tx {
2005            let _ = tx.try_send(());
2006        }
2007
2008        Ok(report)
2009    }
2010
2011    async fn destroy(
2012        &self,
2013        runtime_id: &LogicalRuntimeId,
2014    ) -> Result<DestroyReport, RuntimeControlPlaneError> {
2015        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
2016
2017        let mut drv = driver.lock().await;
2018        let report = drv
2019            .as_driver_mut()
2020            .destroy()
2021            .await
2022            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
2023        drop(drv);
2024
2025        let mut comp = completions.lock().await;
2026        comp.resolve_all_terminated("runtime destroyed");
2027
2028        Ok(report)
2029    }
2030
2031    async fn runtime_state(
2032        &self,
2033        runtime_id: &LogicalRuntimeId,
2034    ) -> Result<RuntimeState, RuntimeControlPlaneError> {
2035        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
2036
2037        let drv = driver.lock().await;
2038        Ok(drv.as_driver().runtime_state())
2039    }
2040
2041    async fn load_boundary_receipt(
2042        &self,
2043        runtime_id: &LogicalRuntimeId,
2044        run_id: &RunId,
2045        sequence: u64,
2046    ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
2047        match &self.store {
2048            Some(store) => store
2049                .load_boundary_receipt(runtime_id, run_id, sequence)
2050                .await
2051                .map_err(|e| RuntimeControlPlaneError::StoreError(e.to_string())),
2052            None => {
2053                // Ephemeral mode — no persisted receipts
2054                Ok(None)
2055            }
2056        }
2057    }
2058}
2059
2060#[cfg(test)]
2061#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
2062mod tests {
2063    use super::*;
2064    use std::sync::atomic::{AtomicBool, Ordering};
2065    use std::time::Duration;
2066
2067    use meerkat_core::agent::CommsRuntime;
2068    use meerkat_core::comms_drain_lifecycle_authority::{CommsDrainMode, CommsDrainPhase};
2069    use tokio::sync::Notify;
2070
2071    struct FakeDrainRuntime {
2072        notify: Arc<Notify>,
2073        dismiss: AtomicBool,
2074    }
2075
2076    impl FakeDrainRuntime {
2077        fn dismissing() -> Self {
2078            Self {
2079                notify: Arc::new(Notify::new()),
2080                dismiss: AtomicBool::new(true),
2081            }
2082        }
2083
2084        fn idle() -> Self {
2085            Self {
2086                notify: Arc::new(Notify::new()),
2087                dismiss: AtomicBool::new(false),
2088            }
2089        }
2090    }
2091
2092    #[async_trait::async_trait]
2093    impl CommsRuntime for FakeDrainRuntime {
2094        async fn drain_messages(&self) -> Vec<String> {
2095            Vec::new()
2096        }
2097
2098        fn inbox_notify(&self) -> Arc<Notify> {
2099            Arc::clone(&self.notify)
2100        }
2101
2102        fn dismiss_received(&self) -> bool {
2103            self.dismiss.load(Ordering::Acquire)
2104        }
2105
2106        async fn drain_peer_input_candidates(
2107            &self,
2108        ) -> Vec<meerkat_core::interaction::PeerInputCandidate> {
2109            Vec::new()
2110        }
2111    }
2112
2113    async fn spawn_test_comms_drain(
2114        adapter: &Arc<RuntimeSessionAdapter>,
2115        session_id: &SessionId,
2116        mode: CommsDrainMode,
2117        comms_runtime: Arc<dyn CommsRuntime>,
2118    ) {
2119        adapter.register_session(session_id.clone()).await;
2120        let mut slots = adapter.comms_drain_slots.write().await;
2121        let slot = slots
2122            .entry(session_id.clone())
2123            .or_insert_with(CommsDrainSlot::new);
2124        let result = protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode)
2125            .expect("ensure running");
2126        let obligation = result
2127            .obligation
2128            .expect("spawn obligation should be present");
2129
2130        apply_runtime_drain_effects(slot, &result.effects);
2131        for effect in &result.effects {
2132            if let CommsDrainLifecycleEffect::SpawnDrainTask { .. } = effect {
2133                slot.handle = Some(crate::comms_drain::spawn_comms_drain(
2134                    Arc::clone(adapter),
2135                    session_id.clone(),
2136                    Arc::clone(&comms_runtime),
2137                ));
2138            }
2139        }
2140
2141        let feedback_effects =
2142            protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation)
2143                .expect("task spawned");
2144        apply_runtime_drain_effects(slot, &feedback_effects);
2145    }
2146
2147    async fn current_phase(
2148        adapter: &Arc<RuntimeSessionAdapter>,
2149        session_id: &SessionId,
2150    ) -> Option<CommsDrainPhase> {
2151        let slots = adapter.comms_drain_slots.read().await;
2152        slots.get(session_id).map(|slot| slot.authority.phase())
2153    }
2154
2155    async fn handle_present(adapter: &Arc<RuntimeSessionAdapter>, session_id: &SessionId) -> bool {
2156        let slots = adapter.comms_drain_slots.read().await;
2157        slots
2158            .get(session_id)
2159            .and_then(|slot| slot.handle.as_ref())
2160            .is_some()
2161    }
2162
2163    async fn wait_for_phase(
2164        adapter: &Arc<RuntimeSessionAdapter>,
2165        session_id: &SessionId,
2166        expected: CommsDrainPhase,
2167    ) {
2168        tokio::time::timeout(Duration::from_secs(1), async {
2169            loop {
2170                if current_phase(adapter, session_id).await == Some(expected) {
2171                    break;
2172                }
2173                tokio::time::sleep(Duration::from_millis(5)).await;
2174            }
2175        })
2176        .await
2177        .expect("phase transition");
2178    }
2179
2180    #[tokio::test]
2181    async fn dismiss_exit_updates_authority_before_join() {
2182        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
2183        let session_id = SessionId::new();
2184        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::dismissing());
2185
2186        spawn_test_comms_drain(
2187            &adapter,
2188            &session_id,
2189            CommsDrainMode::PersistentHost,
2190            comms_runtime,
2191        )
2192        .await;
2193
2194        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
2195        assert!(
2196            !handle_present(&adapter, &session_id).await,
2197            "drain task should clear its slot before wait_comms_drain joins"
2198        );
2199
2200        adapter.wait_comms_drain(&session_id).await;
2201        assert_eq!(
2202            current_phase(&adapter, &session_id).await,
2203            Some(CommsDrainPhase::Stopped)
2204        );
2205    }
2206
2207    #[test]
2208    fn desired_peer_ingress_mode_tracks_live_attachment_and_keep_alive() {
2209        assert_eq!(
2210            desired_peer_ingress_mode(RuntimeState::Attached, true, false),
2211            Some(CommsDrainMode::AttachedSession)
2212        );
2213        assert_eq!(
2214            desired_peer_ingress_mode(RuntimeState::Running, true, false),
2215            Some(CommsDrainMode::AttachedSession)
2216        );
2217        assert_eq!(
2218            desired_peer_ingress_mode(RuntimeState::Recovering, true, false),
2219            Some(CommsDrainMode::AttachedSession)
2220        );
2221        assert_eq!(
2222            desired_peer_ingress_mode(RuntimeState::Idle, true, true),
2223            Some(CommsDrainMode::PersistentHost)
2224        );
2225        assert_eq!(
2226            desired_peer_ingress_mode(RuntimeState::Idle, true, false),
2227            None
2228        );
2229        assert_eq!(
2230            desired_peer_ingress_mode(RuntimeState::Attached, false, true),
2231            None
2232        );
2233        assert_eq!(
2234            desired_peer_ingress_mode(RuntimeState::Initializing, true, true),
2235            None
2236        );
2237        assert_eq!(
2238            desired_peer_ingress_mode(RuntimeState::Retired, true, true),
2239            None
2240        );
2241        assert_eq!(
2242            desired_peer_ingress_mode(RuntimeState::Stopped, true, true),
2243            None
2244        );
2245        assert_eq!(
2246            desired_peer_ingress_mode(RuntimeState::Destroyed, true, true),
2247            None
2248        );
2249    }
2250
2251    #[tokio::test]
2252    async fn unregister_session_aborts_and_removes_drain_slot() {
2253        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
2254        let session_id = SessionId::new();
2255        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());
2256
2257        adapter.register_session(session_id.clone()).await;
2258        spawn_test_comms_drain(
2259            &adapter,
2260            &session_id,
2261            CommsDrainMode::PersistentHost,
2262            comms_runtime,
2263        )
2264        .await;
2265
2266        assert_eq!(
2267            current_phase(&adapter, &session_id).await,
2268            Some(CommsDrainPhase::Running)
2269        );
2270        assert!(handle_present(&adapter, &session_id).await);
2271
2272        adapter.unregister_session(&session_id).await;
2273
2274        let slots = adapter.comms_drain_slots.read().await;
2275        assert!(
2276            !slots.contains_key(&session_id),
2277            "unregister must remove the comms drain slot entirely"
2278        );
2279    }
2280}