Skip to main content

meerkat_runtime/
session_adapter.rs

1//! RuntimeSessionAdapter — wraps a SessionService with per-session RuntimeDrivers.
2//!
3//! This adapter lives in meerkat-runtime so that meerkat-session doesn't need
4//! to depend on meerkat-runtime. Surfaces use this adapter to get v9 runtime
5//! capabilities on top of any SessionService implementation.
6//!
7//! When a session is registered with a `CoreExecutor`, a background RuntimeLoop
8//! task is spawned per session. `accept_input()` queues the input in the driver
9//! and, if wake is requested, signals the loop. The loop dequeues, stages,
10//! applies via CoreExecutor (which calls SessionService::start_turn()), and
11//! marks inputs as consumed.
12
13use std::collections::{HashMap, HashSet};
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::BlobStore;
18use meerkat_core::comms_drain_lifecycle_authority::{
19    CommsDrainLifecycleAuthority, CommsDrainLifecycleEffect, CommsDrainMode, DrainExitReason,
20};
21use meerkat_core::generated::{protocol_comms_drain_abort, protocol_comms_drain_spawn};
22use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
23use meerkat_core::lifecycle::run_control::RunControlCommand;
24use meerkat_core::lifecycle::{InputId, RunId};
25use meerkat_core::types::SessionId;
26
27use crate::accept::AcceptOutcome;
28use crate::driver::ephemeral::EphemeralRuntimeDriver;
29use crate::driver::persistent::PersistentRuntimeDriver;
30use crate::identifiers::LogicalRuntimeId;
31use crate::input::Input;
32use crate::input_lifecycle_authority::InputLifecycleError;
33use crate::input_state::InputState;
34use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
35use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
36use crate::store::RuntimeStore;
37use crate::tokio;
38use crate::tokio::sync::{Mutex, RwLock, mpsc};
39use crate::traits::{
40    DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
41    RuntimeControlPlaneError, RuntimeDriver, RuntimeDriverError,
42};
43
44/// Shared driver handle used by both the adapter and the RuntimeLoop.
45pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
46
47/// Per-session runtime driver entry.
48pub(crate) enum DriverEntry {
49    Ephemeral(EphemeralRuntimeDriver),
50    Persistent(PersistentRuntimeDriver),
51}
52
53impl DriverEntry {
54    pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
55        match self {
56            DriverEntry::Ephemeral(d) => d,
57            DriverEntry::Persistent(d) => d,
58        }
59    }
60
61    pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
62        match self {
63            DriverEntry::Ephemeral(d) => d,
64            DriverEntry::Persistent(d) => d,
65        }
66    }
67
68    /// Set the silent comms intents for the underlying driver.
69    pub(crate) fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
70        match self {
71            DriverEntry::Ephemeral(d) => d.set_silent_comms_intents(intents),
72            DriverEntry::Persistent(d) => d.set_silent_comms_intents(intents),
73        }
74    }
75
76    /// Check if the runtime is idle or attached (quiescent with or without executor).
77    pub(crate) fn is_idle_or_attached(&self) -> bool {
78        match self {
79            DriverEntry::Ephemeral(d) => d.is_idle_or_attached(),
80            DriverEntry::Persistent(d) => d.is_idle_or_attached(),
81        }
82    }
83
84    /// Attach an executor (Idle → Attached).
85    pub(crate) fn attach(&mut self) -> Result<(), RuntimeStateTransitionError> {
86        match self {
87            DriverEntry::Ephemeral(d) => d.attach(),
88            DriverEntry::Persistent(d) => d.attach(),
89        }
90    }
91
92    /// Detach an executor (Attached → Idle). No-op if not Attached.
93    pub(crate) fn detach(
94        &mut self,
95    ) -> Result<Option<crate::runtime_state::RuntimeState>, RuntimeStateTransitionError> {
96        match self {
97            DriverEntry::Ephemeral(d) => d.detach(),
98            DriverEntry::Persistent(d) => d.detach(),
99        }
100    }
101
102    /// Check if the runtime can process queued inputs (Idle, Attached, or Retired).
103    pub(crate) fn can_process_queue(&self) -> bool {
104        match self {
105            DriverEntry::Ephemeral(d) => d.control().can_process_queue(),
106            DriverEntry::Persistent(d) => d.inner_ref().control().can_process_queue(),
107        }
108    }
109
110    /// Check and clear the wake flag.
111    pub(crate) fn take_wake_requested(&mut self) -> bool {
112        match self {
113            DriverEntry::Ephemeral(d) => d.take_wake_requested(),
114            DriverEntry::Persistent(d) => d.take_wake_requested(),
115        }
116    }
117
118    /// Check and clear the immediate processing flag.
119    pub(crate) fn take_process_requested(&mut self) -> bool {
120        match self {
121            DriverEntry::Ephemeral(d) => d.take_process_requested(),
122            DriverEntry::Persistent(d) => d.take_process_requested(),
123        }
124    }
125
126    /// Dequeue the next input for processing.
127    pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
128        match self {
129            DriverEntry::Ephemeral(d) => d.dequeue_next(),
130            DriverEntry::Persistent(d) => d.dequeue_next(),
131        }
132    }
133
134    /// Dequeue a specific input by ID from whichever queue contains it.
135    pub(crate) fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
136        match self {
137            DriverEntry::Ephemeral(d) => d.dequeue_by_id(input_id),
138            DriverEntry::Persistent(d) => d.dequeue_by_id(input_id),
139        }
140    }
141
142    /// Get a reference to the ingress authority.
143    pub(crate) fn ingress(&self) -> &crate::runtime_ingress_authority::RuntimeIngressAuthority {
144        match self {
145            DriverEntry::Ephemeral(d) => d.ingress(),
146            DriverEntry::Persistent(d) => d.inner_ref().ingress(),
147        }
148    }
149
150    pub(crate) fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
151        match self {
152            DriverEntry::Ephemeral(d) => d.has_queued_input_outside(excluded),
153            DriverEntry::Persistent(d) => d.has_queued_input_outside(excluded),
154        }
155    }
156
157    /// Start a new run (Idle → Running).
158    pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
159        match self {
160            DriverEntry::Ephemeral(d) => d.start_run(run_id),
161            DriverEntry::Persistent(d) => d.start_run(run_id),
162        }
163    }
164
165    /// Complete a run (Running → Idle).
166    pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
167        match self {
168            DriverEntry::Ephemeral(d) => d.complete_run(),
169            DriverEntry::Persistent(d) => d.complete_run(),
170        }
171    }
172
173    /// Stage an input (Queued → Staged).
174    pub(crate) fn stage_input(
175        &mut self,
176        input_id: &InputId,
177        run_id: &RunId,
178    ) -> Result<(), InputLifecycleError> {
179        match self {
180            DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
181            DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
182        }
183    }
184
185    /// Stage a batch of inputs atomically in a single `StageDrainSnapshot`.
186    pub(crate) fn stage_batch(
187        &mut self,
188        input_ids: &[InputId],
189        run_id: &RunId,
190    ) -> Result<(), InputLifecycleError> {
191        match self {
192            DriverEntry::Ephemeral(d) => d.stage_batch(input_ids, run_id),
193            DriverEntry::Persistent(d) => d.stage_batch(input_ids, run_id),
194        }
195    }
196
197    /// Roll back staged inputs after a failed staging attempt.
198    pub(crate) fn rollback_staged(
199        &mut self,
200        input_ids: &[InputId],
201    ) -> Result<(), InputLifecycleError> {
202        match self {
203            DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
204            DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
205        }
206    }
207
208    pub(crate) async fn abandon_pending_inputs(
209        &mut self,
210        reason: crate::input_state::InputAbandonReason,
211    ) -> Result<usize, RuntimeDriverError> {
212        match self {
213            DriverEntry::Ephemeral(d) => Ok(d.abandon_pending_inputs(reason)),
214            DriverEntry::Persistent(d) => d.abandon_pending_inputs(reason).await,
215        }
216    }
217}
218
219/// Shared completion registry (accessed by adapter for registration and loop for resolution).
220pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
221
222/// Per-session state: driver + optional RuntimeLoop.
223struct RuntimeSessionEntry {
224    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
225    driver: SharedDriver,
226    /// Shared async-operation lifecycle registry for this runtime/session.
227    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
228    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
229    completions: SharedCompletionRegistry,
230    /// Runtime-loop capabilities. Presence means a loop is attached.
231    attachment: Option<RuntimeLoopAttachment>,
232}
233
234/// Capability bundle for an attached runtime loop.
235///
236/// Keep all loop-related handles together so "attached vs detached" cannot
237/// drift into partially-populated shell state.
238struct RuntimeLoopAttachment {
239    wake_tx: mpsc::Sender<()>,
240    control_tx: mpsc::Sender<RunControlCommand>,
241    _loop_handle: tokio::task::JoinHandle<()>,
242}
243
244impl RuntimeSessionEntry {
245    fn attachment_is_live(&self) -> bool {
246        self.attachment
247            .as_ref()
248            .map(|attachment| !attachment.wake_tx.is_closed() && !attachment.control_tx.is_closed())
249            .unwrap_or(false)
250    }
251
252    fn has_attachment(&self) -> bool {
253        self.attachment_is_live()
254    }
255
256    fn attach_runtime_loop(
257        &mut self,
258        wake_tx: mpsc::Sender<()>,
259        control_tx: mpsc::Sender<RunControlCommand>,
260        loop_handle: tokio::task::JoinHandle<()>,
261    ) {
262        self.attachment = Some(RuntimeLoopAttachment {
263            wake_tx,
264            control_tx,
265            _loop_handle: loop_handle,
266        });
267    }
268
269    fn clear_dead_attachment(&mut self) -> bool {
270        if self.attachment.is_some() && !self.attachment_is_live() {
271            self.attachment = None;
272            return true;
273        }
274        false
275    }
276
277    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
278        if !self.attachment_is_live() {
279            return None;
280        }
281        self.attachment
282            .as_ref()
283            .map(|attachment| attachment.wake_tx.clone())
284    }
285
286    fn control_sender(&self) -> Option<mpsc::Sender<RunControlCommand>> {
287        if !self.attachment_is_live() {
288            return None;
289        }
290        self.attachment
291            .as_ref()
292            .map(|attachment| attachment.control_tx.clone())
293    }
294}
295
296/// Per-session comms drain slot, driven by `CommsDrainLifecycleAuthority`.
297///
298/// ALL state transitions go through the authority -- no manual
299/// `handle.is_finished()` checks in shell code.
300struct CommsDrainSlot {
301    authority: CommsDrainLifecycleAuthority,
302    handle: Option<tokio::task::JoinHandle<()>>,
303}
304
305impl CommsDrainSlot {
306    fn new() -> Self {
307        Self {
308            authority: CommsDrainLifecycleAuthority::new(),
309            handle: None,
310        }
311    }
312}
313
314fn apply_runtime_drain_effects(slot: &mut CommsDrainSlot, effects: &[CommsDrainLifecycleEffect]) {
315    for effect in effects {
316        if let CommsDrainLifecycleEffect::AbortDrainTask = effect
317            && let Some(handle) = slot.handle.take()
318        {
319            handle.abort();
320        }
321    }
322}
323
324fn abort_slot(slot: &mut CommsDrainSlot) {
325    match protocol_comms_drain_abort::execute_stop_requested(&mut slot.authority) {
326        Ok(result) => {
327            apply_runtime_drain_effects(slot, &result.effects);
328            // Under TerminalClosure policy, the abort obligation is implicitly
329            // satisfied when the machine reaches Stopped phase. Drop it.
330            let _ = result.obligation;
331        }
332        Err(_) => {
333            // Already stopped or inactive — just clean up the handle
334            if let Some(handle) = slot.handle.take() {
335                handle.abort();
336            }
337        }
338    }
339}
340
341/// Wraps a SessionService to provide v9 runtime capabilities.
342///
343/// Maintains a per-session RuntimeDriver registry. When sessions are registered
344/// with a `CoreExecutor`, a RuntimeLoop task is spawned that processes queued
345/// inputs by calling `CoreExecutor::apply()` (which triggers
346/// `SessionService::start_turn()` under the hood).
347pub struct RuntimeSessionAdapter {
348    /// Per-session entries.
349    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
350    /// Runtime mode.
351    mode: RuntimeMode,
352    /// Optional RuntimeStore for persistent drivers.
353    store: Option<Arc<dyn RuntimeStore>>,
354    /// Blob store used by persistent drivers for durable input externalization.
355    blob_store: Option<Arc<dyn BlobStore>>,
356    /// Per-session comms drain lifecycle, driven by machine authority.
357    comms_drain_slots: RwLock<HashMap<SessionId, CommsDrainSlot>>,
358}
359
360impl RuntimeSessionAdapter {
361    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
362    pub fn ephemeral() -> Self {
363        Self {
364            sessions: RwLock::new(HashMap::new()),
365            mode: RuntimeMode::V9Compliant,
366            store: None,
367            blob_store: None,
368            comms_drain_slots: RwLock::new(HashMap::new()),
369        }
370    }
371
372    /// Create a persistent adapter with a RuntimeStore.
373    pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
374        Self {
375            sessions: RwLock::new(HashMap::new()),
376            mode: RuntimeMode::V9Compliant,
377            store: Some(store),
378            blob_store: Some(blob_store),
379            comms_drain_slots: RwLock::new(HashMap::new()),
380        }
381    }
382
383    /// Create a driver entry for a session.
384    fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
385        let runtime_id = LogicalRuntimeId::new(session_id.to_string());
386        match (&self.store, &self.blob_store) {
387            (Some(store), Some(blob_store)) => DriverEntry::Persistent(
388                PersistentRuntimeDriver::new(runtime_id, store.clone(), blob_store.clone()),
389            ),
390            (Some(_store), None) => {
391                tracing::warn!(
392                    %session_id,
393                    "persistent runtime store present but blob store missing; \
394                     falling back to ephemeral driver"
395                );
396                DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id))
397            }
398            _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
399        }
400    }
401
402    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
403    /// nothing processes them automatically). Useful for tests and legacy mode.
404    pub async fn register_session(&self, session_id: SessionId) {
405        {
406            let mut sessions = self.sessions.write().await;
407            if let Some(existing) = sessions.get_mut(&session_id) {
408                existing.clear_dead_attachment();
409                return;
410            }
411        }
412
413        let mut entry = self.make_driver(&session_id);
414        if let Err(err) = entry.as_driver_mut().recover().await {
415            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
416            return;
417        }
418        let session_entry = RuntimeSessionEntry {
419            driver: Arc::new(Mutex::new(entry)),
420            ops_lifecycle: Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
421            completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
422            attachment: None,
423        };
424        let mut sessions = self.sessions.write().await;
425        if let Some(existing) = sessions.get_mut(&session_id) {
426            existing.clear_dead_attachment();
427        } else {
428            sessions.insert(session_id, session_entry);
429        }
430    }
431
432    /// Set the silent comms intents for a session's runtime driver.
433    ///
434    /// Peer requests whose intent matches one of these strings will be accepted
435    /// without triggering an LLM turn (ApplyMode::Ignore, WakeMode::None).
436    pub async fn set_session_silent_intents(&self, session_id: &SessionId, intents: Vec<String>) {
437        let sessions = self.sessions.read().await;
438        if let Some(entry) = sessions.get(session_id) {
439            let mut driver = entry.driver.lock().await;
440            driver.set_silent_comms_intents(intents);
441        }
442    }
443
444    /// Register a runtime driver for a session WITH a RuntimeLoop backed by a
445    /// `CoreExecutor`. When `accept_input()` queues an input and requests wake,
446    /// the loop dequeues it and calls `executor.apply()` (which triggers
447    /// `SessionService::start_turn()`).
448    pub async fn register_session_with_executor(
449        &self,
450        session_id: SessionId,
451        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
452    ) {
453        self.ensure_session_with_executor(session_id, executor)
454            .await;
455    }
456
    /// Ensure a runtime driver with executor exists for the session.
    ///
    /// If a session was already registered without a loop, upgrade the
    /// existing driver in place so queued inputs remain attached to the same
    /// runtime ledger and can start draining immediately.
    ///
    /// If the session already has a live loop attachment, this is a no-op and
    /// `executor` is dropped. Failures (driver recovery, the attach transition)
    /// are logged via `tracing` and end the call early; nothing is reported to
    /// the caller.
    ///
    /// Locking: the `sessions` lock and the driver mutex are never held at the
    /// same time here, and driver recovery runs with neither held. Decisions
    /// made under one `sessions` acquisition are re-checked under the next.
    pub async fn ensure_session_with_executor(
        &self,
        session_id: SessionId,
        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
    ) {
        // Phase 1: snapshot the existing entry, if any. Dead attachments are
        // scrubbed first so `has_attachment` only reports a loop whose
        // channels are still open.
        let existing = {
            let mut sessions = self.sessions.write().await;
            sessions.get_mut(&session_id).map(|entry| {
                entry.clear_dead_attachment();
                (
                    entry.has_attachment(),
                    entry.driver.clone(),
                    entry.completions.clone(),
                )
            })
        };

        let (driver, completions) = if let Some((has_attachment, driver, completions)) = existing {
            // A live loop already drains this session; `executor` is dropped.
            if has_attachment {
                return;
            }
            (driver, completions)
        } else {
            // No entry yet: build and recover a driver outside the lock
            // (recovery is awaited).
            let mut recovered_entry = self.make_driver(&session_id);
            if let Err(err) = recovered_entry.as_driver_mut().recover().await {
                tracing::error!(
                    %session_id,
                    error = %err,
                    "failed to recover runtime driver during registration"
                );
                return;
            }

            // Re-check: another task may have registered the session while we
            // were recovering. If so, reuse its entry and drop ours.
            let mut sessions = self.sessions.write().await;
            if let Some(entry) = sessions.get_mut(&session_id) {
                entry.clear_dead_attachment();
                if entry.has_attachment() {
                    return;
                }
                (entry.driver.clone(), entry.completions.clone())
            } else {
                let driver = Arc::new(Mutex::new(recovered_entry));
                let completions =
                    Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
                sessions.insert(
                    session_id.clone(),
                    RuntimeSessionEntry {
                        driver: driver.clone(),
                        ops_lifecycle: Arc::new(
                            crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new(),
                        ),
                        completions: completions.clone(),
                        attachment: None,
                    },
                );
                (driver, completions)
            }
        };

        // Phase 2: move the driver into Attached before any loop is published.
        // The value of this block records whether work is already pending, so
        // the loop can be woken once it is published.
        let should_wake = {
            let mut driver_guard = driver.lock().await;
            if let Err(error) = driver_guard.attach() {
                // An Attached -> Attached failure means the driver still thinks
                // it is attached although phase 1 saw no live published loop
                // (presumably left over from a loop that went away without
                // detaching). Repair by detaching and attaching again.
                let repaired = if error.from == RuntimeState::Attached
                    && error.to == RuntimeState::Attached
                {
                    tracing::warn!(
                        %session_id,
                        error = %error,
                        "runtime driver remained attached without a live published loop; detaching and retrying attachment"
                    );
                    match driver_guard.detach() {
                        Ok(_) => match driver_guard.attach() {
                            Ok(()) => true,
                            Err(retry_error) => {
                                tracing::warn!(
                                    %session_id,
                                    error = %retry_error,
                                    "failed to re-attach runtime driver after repairing stale attachment state"
                                );
                                false
                            }
                        },
                        Err(detach_error) => {
                            tracing::warn!(
                                %session_id,
                                error = %detach_error,
                                "failed to detach stale attached runtime driver before retrying attachment"
                            );
                            false
                        }
                    }
                } else {
                    false
                };
                // Any other transition error (or a failed repair) aborts the
                // whole operation; no loop has been spawned yet.
                if !repaired {
                    tracing::warn!(
                        %session_id,
                        error = %error,
                        "failed to attach runtime driver before publishing loop attachment"
                    );
                    return;
                }
            }
            !driver_guard.as_driver().active_input_ids().is_empty()
        };

        // Phase 3: spawn the loop before publishing it. The handle stays in
        // `pending_loop_handle` until it is moved into the entry, so the
        // cleanup path below can abort it if publication fails.
        let (wake_tx, wake_rx) = mpsc::channel(16);
        let (control_tx, control_rx) = mpsc::channel(16);
        let mut pending_loop_handle =
            Some(crate::runtime_loop::spawn_runtime_loop_with_completions(
                driver.clone(),
                executor,
                wake_rx,
                control_rx,
                Some(completions.clone()),
            ));

        // Phase 4: publish the attachment, re-validating the entry under the
        // write lock. `detach_after_abort` is true when no published loop owns
        // the Attached transition made in phase 2, so it must be undone.
        let (published, detach_after_abort) = {
            let mut sessions = self.sessions.write().await;
            match sessions.get_mut(&session_id) {
                // The entry vanished (e.g. `unregister_session`) meanwhile.
                None => (false, true),
                Some(entry) => {
                    entry.clear_dead_attachment();
                    // A concurrent call already published a live loop; leave
                    // the driver attached for it.
                    // NOTE(review): this check runs before the `ptr_eq` check,
                    // so if the entry was replaced by one with a different
                    // driver that already has a live loop, the stale `driver`
                    // attached in phase 2 is not detached. Confirm intended.
                    if entry.has_attachment() {
                        (false, false)
                    } else if !Arc::ptr_eq(&entry.driver, &driver)
                        || !Arc::ptr_eq(&entry.completions, &completions)
                    {
                        tracing::warn!(
                            %session_id,
                            "runtime session entry changed while wiring executor; aborting stale loop attachment"
                        );
                        (false, true)
                    } else {
                        match pending_loop_handle.take() {
                            Some(loop_handle) => {
                                entry.attach_runtime_loop(wake_tx.clone(), control_tx, loop_handle);
                                (true, false)
                            }
                            // Defensive: the handle is only taken on this line
                            // or in the cleanup below, so this should not occur.
                            None => {
                                tracing::error!(
                                    %session_id,
                                    "runtime loop handle missing during attachment publish"
                                );
                                (false, true)
                            }
                        }
                    }
                }
            }
        };

        // Publication failed: abort the unpublished loop and, when no
        // published loop depends on it, undo the phase-2 attach.
        if !published {
            if let Some(loop_handle) = pending_loop_handle.take() {
                loop_handle.abort();
            }
            if detach_after_abort {
                let mut driver_guard = driver.lock().await;
                let _ = driver_guard.detach();
            }
            return;
        }

        // Kick the new loop if inputs were already pending. `try_send` errors
        // are ignored: a full channel already holds queued wakes, and a closed
        // one means the loop is gone.
        if should_wake {
            let _ = wake_tx.try_send(());
        }
    }
629
630    /// Unregister a session's runtime driver.
631    ///
632    /// Detaches the executor (Attached → Idle) before removal, then drops
633    /// the wake channel sender, which causes the RuntimeLoop to exit.
634    pub async fn unregister_session(&self, session_id: &SessionId) {
635        let entry = {
636            let mut sessions = self.sessions.write().await;
637            let mut slots = self.comms_drain_slots.write().await;
638            // Remove + abort drain slot before dropping session binding so
639            // slot keys remain a subset of registered-session keys.
640            if let Some(mut slot) = slots.remove(session_id) {
641                abort_slot(&mut slot);
642            }
643            sessions.remove(session_id)
644        };
645
646        if let Some(entry) = entry {
647            let mut driver = entry.driver.lock().await;
648            let _ = driver.detach(); // Attached → Idle (no-op if not Attached)
649            drop(driver);
650
651            let mut completions = entry.completions.lock().await;
652            completions.resolve_all_terminated("runtime session unregistered");
653        }
654    }
655
656    /// Check whether a runtime driver is already registered for a session.
657    pub async fn contains_session(&self, session_id: &SessionId) -> bool {
658        self.sessions.read().await.contains_key(session_id)
659    }
660
661    /// Cancel the currently-running turn for a registered session.
662    pub async fn interrupt_current_run(
663        &self,
664        session_id: &SessionId,
665    ) -> Result<(), RuntimeDriverError> {
666        let (driver, control_tx) = {
667            let sessions = self.sessions.read().await;
668            let entry = sessions
669                .get(session_id)
670                .ok_or(RuntimeDriverError::NotReady {
671                    state: RuntimeState::Destroyed,
672                })?;
673            (entry.driver.clone(), entry.control_sender())
674        };
675
676        let Some(control_tx) = control_tx else {
677            let state = {
678                let driver = driver.lock().await;
679                driver.as_driver().runtime_state()
680            };
681            return Err(RuntimeDriverError::NotReady { state });
682        };
683        control_tx
684            .send(RunControlCommand::CancelCurrentRun {
685                reason: "mob interrupt".to_string(),
686            })
687            .await
688            .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
689    }
690
691    /// Stop the attached runtime executor through the out-of-band control
692    /// channel. When no loop is attached yet, a stop command is applied directly
693    /// against the driver so queued work is still terminated consistently.
694    pub async fn stop_runtime_executor(
695        &self,
696        session_id: &SessionId,
697        command: RunControlCommand,
698    ) -> Result<(), RuntimeDriverError> {
699        let (driver, completions, control_tx) = {
700            let sessions = self.sessions.read().await;
701            let entry = sessions
702                .get(session_id)
703                .ok_or(RuntimeDriverError::NotReady {
704                    state: RuntimeState::Destroyed,
705                })?;
706            (
707                entry.driver.clone(),
708                entry.completions.clone(),
709                entry.control_sender(),
710            )
711        };
712
713        if let Some(control_tx) = control_tx
714            && control_tx.send(command.clone()).await.is_ok()
715        {
716            return Ok(());
717        }
718
719        if matches!(command, RunControlCommand::StopRuntimeExecutor { .. }) {
720            let mut driver = driver.lock().await;
721            driver
722                .as_driver_mut()
723                .on_runtime_control(crate::traits::RuntimeControlCommand::Stop)
724                .await?;
725            drop(driver);
726            let mut completions = completions.lock().await;
727            completions.resolve_all_terminated("runtime stopped");
728            drop(completions);
729
730            // No live control sender was available for this stop path. Scrub any
731            // dead attachment capabilities that may still be published.
732            let mut sessions = self.sessions.write().await;
733            if let Some(entry) = sessions.get_mut(session_id) {
734                entry.clear_dead_attachment();
735            }
736            Ok(())
737        } else {
738            Err(RuntimeDriverError::Internal(
739                "failed to send stop: runtime loop is unavailable".into(),
740            ))
741        }
742    }
743
744    /// Accept an input and execute it synchronously through the runtime driver.
745    ///
746    /// This is useful for surfaces that need the legacy request/response shape
747    /// while still preserving v9 input lifecycle semantics.
748    pub async fn accept_input_and_run<T, F, Fut>(
749        &self,
750        session_id: &SessionId,
751        input: Input,
752        op: F,
753    ) -> Result<T, RuntimeDriverError>
754    where
755        F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
756        Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
757    {
758        let driver = {
759            let sessions = self.sessions.read().await;
760            sessions
761                .get(session_id)
762                .ok_or(RuntimeDriverError::NotReady {
763                    state: RuntimeState::Destroyed,
764                })?
765                .driver
766                .clone()
767        };
768
769        let (input_id, run_id, primitive) = {
770            let mut driver = driver.lock().await;
771            if !driver.is_idle_or_attached() {
772                return Err(RuntimeDriverError::NotReady {
773                    state: driver.as_driver().runtime_state(),
774                });
775            }
776
777            let active_input_ids = driver.as_driver().active_input_ids();
778            if !active_input_ids.is_empty() {
779                let duplicate_active_input =
780                    input.header().idempotency_key.as_ref().and_then(|key| {
781                        active_input_ids.iter().find(|active_id| {
782                            driver
783                                .as_driver()
784                                .input_state(active_id)
785                                .and_then(|state| state.idempotency_key.as_ref())
786                                == Some(key)
787                        })
788                    });
789                if let Some(existing_id) = duplicate_active_input {
790                    return Err(RuntimeDriverError::ValidationFailed {
791                        reason: format!(
792                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
793                        ),
794                    });
795                }
796                return Err(RuntimeDriverError::NotReady {
797                    state: driver.as_driver().runtime_state(),
798                });
799            }
800
801            let outcome = driver.as_driver_mut().accept_input(input).await?;
802            let input_id = match outcome {
803                AcceptOutcome::Accepted { input_id, .. } => input_id,
804                AcceptOutcome::Deduplicated { existing_id, .. } => {
805                    return Err(RuntimeDriverError::ValidationFailed {
806                        reason: format!(
807                            "accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
808                        ),
809                    });
810                }
811                AcceptOutcome::Rejected { reason } => {
812                    return Err(RuntimeDriverError::ValidationFailed { reason });
813                }
814            };
815
816            if !driver.is_idle_or_attached() {
817                return Err(RuntimeDriverError::NotReady {
818                    state: driver.as_driver().runtime_state(),
819                });
820            }
821
822            let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
823                RuntimeDriverError::Internal("accepted input was not queued for execution".into())
824            })?;
825            if dequeued_id != input_id {
826                return Err(RuntimeDriverError::NotReady {
827                    state: driver.as_driver().runtime_state(),
828                });
829            }
830            let run_id = RunId::new();
831            driver.start_run(run_id.clone()).map_err(|err| {
832                RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
833            })?;
834            driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
835                RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
836            })?;
837            let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
838            (input_id, run_id, primitive)
839        };
840
841        match op(run_id.clone(), primitive.clone()).await {
842            Ok((result, output)) => {
843                let mut driver = driver.lock().await;
844                if let Err(err) = driver
845                    .as_driver_mut()
846                    .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
847                        run_id: run_id.clone(),
848                        receipt: output.receipt,
849                        session_snapshot: output.session_snapshot,
850                    })
851                    .await
852                {
853                    if let Err(unwind_err) = driver
854                        .as_driver_mut()
855                        .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
856                            run_id,
857                            error: format!("boundary commit failed: {err}"),
858                            recoverable: true,
859                        })
860                        .await
861                    {
862                        return Err(RuntimeDriverError::Internal(format!(
863                            "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
864                        )));
865                    }
866                    return Err(RuntimeDriverError::Internal(format!(
867                        "runtime boundary commit failed: {err}"
868                    )));
869                }
870                if let Err(err) = driver
871                    .as_driver_mut()
872                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
873                        run_id,
874                        consumed_input_ids: vec![input_id],
875                    })
876                    .await
877                {
878                    drop(driver);
879                    self.unregister_session(session_id).await;
880                    return Err(RuntimeDriverError::Internal(format!(
881                        "failed to persist runtime completion snapshot: {err}"
882                    )));
883                }
884                Ok(result)
885            }
886            Err(err) => {
887                let mut driver = driver.lock().await;
888                if let Err(run_err) = driver
889                    .as_driver_mut()
890                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
891                        run_id,
892                        error: err.to_string(),
893                        recoverable: true,
894                    })
895                    .await
896                {
897                    drop(driver);
898                    self.unregister_session(session_id).await;
899                    return Err(RuntimeDriverError::Internal(format!(
900                        "failed to persist runtime failure snapshot: {run_err}"
901                    )));
902                }
903                Err(err)
904            }
905        }
906    }
907
908    /// Accept an input and return a completion handle that resolves when the
909    /// input reaches a terminal state (Consumed or Abandoned).
910    ///
911    /// Returns `(AcceptOutcome, Option<CompletionHandle>)`:
912    /// - `(Accepted, Some(handle))` — await handle for result
913    /// - `(Accepted, None)` — input reached a terminal state during admission
914    /// - `(Deduplicated, Some(handle))` — joined in-flight waiter
915    /// - `(Deduplicated, None)` — input already terminal; no waiter needed
916    /// - `(Rejected, _)` — returned as `Err(ValidationFailed)`
917    pub async fn accept_input_with_completion(
918        &self,
919        session_id: &SessionId,
920        input: Input,
921    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
922    {
923        let (driver, completions, wake_tx) = {
924            let sessions = self.sessions.read().await;
925            let entry = sessions
926                .get(session_id)
927                .ok_or(RuntimeDriverError::NotReady {
928                    state: RuntimeState::Destroyed,
929                })?;
930            (
931                entry.driver.clone(),
932                entry.completions.clone(),
933                entry.wake_sender(),
934            )
935        };
936
937        let (outcome, should_wake, should_process, handle) = {
938            let mut driver = driver.lock().await;
939            let result = driver.as_driver_mut().accept_input(input).await?;
940
941            match &result {
942                AcceptOutcome::Accepted { input_id, .. } => {
943                    let is_terminal = driver
944                        .as_driver()
945                        .input_state(input_id)
946                        .map(|state| state.current_state().is_terminal())
947                        .unwrap_or(true);
948                    let handle = if is_terminal {
949                        None
950                    } else {
951                        Some({
952                            let mut completions = completions.lock().await;
953                            completions.register(input_id.clone())
954                        })
955                    };
956                    let wake = driver.take_wake_requested();
957                    let process_now = driver.take_process_requested();
958                    (result, wake, process_now, handle)
959                }
960                AcceptOutcome::Deduplicated { existing_id, .. } => {
961                    // Check if the existing input is already terminal
962                    let existing_state = driver.as_driver().input_state(existing_id);
963                    let is_terminal = existing_state
964                        .map(|s| s.current_state().is_terminal())
965                        .unwrap_or(true); // missing state = already cleaned up = terminal
966
967                    if is_terminal {
968                        // Input already processed — no handle, no waiter
969                        (result, false, false, None)
970                    } else {
971                        // In-flight — join existing waiters via multi-waiter Vec
972                        let handle = {
973                            let mut completions = completions.lock().await;
974                            completions.register(existing_id.clone())
975                        };
976                        (result, false, false, Some(handle))
977                    }
978                }
979                AcceptOutcome::Rejected { reason } => {
980                    return Err(RuntimeDriverError::ValidationFailed {
981                        reason: reason.clone(),
982                    });
983                }
984            }
985        };
986
987        if (should_wake || should_process)
988            && let Some(ref wake_tx) = wake_tx
989        {
990            let _ = wake_tx.try_send(());
991        }
992
993        Ok((outcome, handle))
994    }
995
996    /// Accept an input but intentionally do not wake the runtime loop.
997    ///
998    /// This is reserved for explicitly queued-only surface contracts that
999    /// stage work for the next turn boundary instead of waking an idle session
1000    /// immediately.
1001    pub async fn accept_input_without_wake(
1002        &self,
1003        session_id: &SessionId,
1004        input: Input,
1005    ) -> Result<AcceptOutcome, RuntimeDriverError> {
1006        let driver = {
1007            let sessions = self.sessions.read().await;
1008            let entry = sessions
1009                .get(session_id)
1010                .ok_or(RuntimeDriverError::NotReady {
1011                    state: RuntimeState::Destroyed,
1012                })?;
1013            entry.driver.clone()
1014        };
1015
1016        let outcome = {
1017            let mut driver = driver.lock().await;
1018            let result = driver.as_driver_mut().accept_input(input).await?;
1019            let _ = driver.take_wake_requested();
1020            let process_requested = driver.take_process_requested();
1021            debug_assert!(
1022                !process_requested,
1023                "queue-only admission unexpectedly requested immediate processing"
1024            );
1025            result
1026        };
1027
1028        Ok(outcome)
1029    }
1030
1031    /// Get the shared ops lifecycle registry for a session/runtime instance.
1032    pub async fn ops_lifecycle_registry(
1033        &self,
1034        session_id: &SessionId,
1035    ) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
1036        let sessions = self.sessions.read().await;
1037        sessions
1038            .get(session_id)
1039            .map(|e| Arc::clone(&e.ops_lifecycle))
1040    }
1041
    /// Manage the comms drain lifecycle for a session based on keep_alive intent.
    ///
    /// When `keep_alive` is true, asks the session's drain authority to ensure a
    /// drain is running (always requested in `CommsDrainMode::PersistentHost`
    /// mode) and executes the effects the transition emits. When `keep_alive`
    /// is false, aborts any running drain for the session.
    ///
    /// Returns `true` once the transition emitted a spawn obligation and it has
    /// been submitted back as `TaskSpawned`. Returns `false` when `keep_alive`
    /// is false, when `comms_runtime` is `None`, when the session is not
    /// registered, when the authority rejects `EnsureRunning` (presumably
    /// because a drain is already active — confirm in the authority), or when
    /// no obligation is emitted.
    ///
    /// NOTE(review): the return value tracks the obligation, not the
    /// `SpawnDrainTask` effect. If an effect spawned a task but no obligation
    /// followed, this returns `false` even though a drain task exists.
    ///
    /// All state transitions go through `CommsDrainLifecycleAuthority`.
    pub async fn maybe_spawn_comms_drain(
        self: &Arc<Self>,
        session_id: &SessionId,
        keep_alive: bool,
        comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
    ) -> bool {
        if !keep_alive {
            // Explicit disable: stop any running drain for this session.
            self.abort_comms_drain(session_id).await;
            return false;
        }

        // Drains requested through this entry point are always host-persistent.
        let mode = CommsDrainMode::PersistentHost;

        // Without a comms runtime there is nothing to drain.
        let comms = match comms_runtime {
            Some(c) => c,
            None => return false,
        };

        let sessions = self.sessions.read().await;
        if !sessions.contains_key(session_id) {
            tracing::warn!(
                %session_id,
                "refusing to spawn comms drain for unregistered session"
            );
            return false;
        }
        // Keep the session read guard while mutating drain slots so unregister
        // cannot race between registration check and slot publication.
        // Lock order here: sessions (read) -> comms_drain_slots (write).
        let mut slots = self.comms_drain_slots.write().await;
        let slot = slots
            .entry(session_id.clone())
            .or_insert_with(CommsDrainSlot::new);

        let result =
            match protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode) {
                Ok(r) => r,
                Err(e) => {
                    tracing::trace!(error = %e, "comms drain authority rejected EnsureRunning");
                    return false;
                }
            };

        // Execute effects from the transition
        for effect in &result.effects {
            match effect {
                CommsDrainLifecycleEffect::SpawnDrainTask { mode: spawn_mode } => {
                    // PersistentHost passes `Duration::MAX` (presumably "never idle
                    // out"); Timed passes `None` (presumably the drain's default
                    // timeout) — confirm in `comms_drain::spawn_comms_drain`.
                    let idle_timeout = match spawn_mode {
                        CommsDrainMode::PersistentHost => Some(std::time::Duration::MAX),
                        CommsDrainMode::Timed => None,
                    };
                    let handle = crate::comms_drain::spawn_comms_drain(
                        Arc::clone(self),
                        session_id.clone(),
                        comms.clone(),
                        idle_timeout,
                    );
                    // NOTE(review): any previously stored handle is replaced without
                    // being aborted; this relies on the authority never emitting
                    // SpawnDrainTask while an earlier drain task is still live.
                    slot.handle = Some(handle);
                }
                CommsDrainLifecycleEffect::AbortDrainTask => {
                    if let Some(handle) = slot.handle.take() {
                        handle.abort();
                    }
                }
            }
        }

        let Some(obligation) = result.obligation else {
            tracing::warn!(
                %session_id,
                "comms drain spawn transition emitted no obligation"
            );
            return false;
        };

        // The runtime spawns the drain task synchronously (as a tokio task
        // above), so we immediately close the spawn obligation.
        // Any effects returned by TaskSpawned are discarded; a rejection is only
        // traced and still reports `true`.
        match protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation) {
            Ok(_effects) => {}
            Err(e) => {
                tracing::trace!(error = %e, "comms drain authority rejected TaskSpawned");
            }
        }
        true
    }
1134
1135    /// Notify the authority that a drain task has exited with the given reason.
1136    ///
1137    /// Called from drain task exit paths (or by wrappers that detect task
1138    /// completion). The authority decides whether to enter ExitedRespawnable
1139    /// (PersistentHost + Failed) or Stopped.
1140    pub async fn notify_comms_drain_exited(&self, session_id: &SessionId, reason: DrainExitReason) {
1141        let mut slots = self.comms_drain_slots.write().await;
1142        if let Some(slot) = slots.get_mut(session_id) {
1143            slot.handle.take(); // clean up finished handle
1144            match protocol_comms_drain_spawn::notify_task_exited(&mut slot.authority, reason) {
1145                Ok(_effects) => {}
1146                Err(e) => {
1147                    tracing::warn!(error = %e, "comms drain authority rejected TaskExited");
1148                }
1149            }
1150        }
1151    }
1152
1153    /// Abort all active comms drain tasks.
1154    pub async fn abort_comms_drains(&self) {
1155        let mut slots = self.comms_drain_slots.write().await;
1156        for (_, slot) in slots.iter_mut() {
1157            abort_slot(slot);
1158        }
1159    }
1160
1161    /// Abort the comms drain task for a specific session.
1162    pub async fn abort_comms_drain(&self, session_id: &SessionId) {
1163        let mut slots = self.comms_drain_slots.write().await;
1164        if let Some(slot) = slots.get_mut(session_id) {
1165            abort_slot(slot);
1166        }
1167    }
1168
1169    /// Wait for a session's comms drain task to finish.
1170    ///
1171    /// Returns immediately if no drain is active for the session.
1172    /// If the task already notified the authority (normal exit), this is a no-op
1173    /// for authority state. If the task panicked without notifying, this submits
1174    /// `TaskExited { Failed }` as a safety net.
1175    pub async fn wait_comms_drain(&self, session_id: &SessionId) {
1176        let handle = {
1177            let mut slots = self.comms_drain_slots.write().await;
1178            slots
1179                .get_mut(session_id)
1180                .and_then(|slot| slot.handle.take())
1181        };
1182        if let Some(handle) = handle {
1183            let _ = handle.await;
1184        }
1185        // Safety net: if the authority is still Running after the task exited,
1186        // the task panicked without notifying. Submit Failed to prevent the
1187        // authority from being stuck in Running forever.
1188        let mut slots = self.comms_drain_slots.write().await;
1189        if let Some(slot) = slots.get_mut(session_id)
1190            && slot.authority.phase()
1191                == meerkat_core::comms_drain_lifecycle_authority::CommsDrainPhase::Running
1192        {
1193            tracing::warn!(
1194                "comms_drain: task exited without notifying authority (likely panicked), \
1195                 submitting Failed safety net"
1196            );
1197            match protocol_comms_drain_spawn::notify_task_exited(
1198                &mut slot.authority,
1199                DrainExitReason::Failed,
1200            ) {
1201                Ok(effects) => {
1202                    apply_runtime_drain_effects(slot, &effects);
1203                }
1204                Err(e) => {
1205                    tracing::warn!(error = %e, "comms drain authority rejected safety-net TaskExited");
1206                }
1207            }
1208        }
1209    }
1210}
1211
1212#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1213#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1214impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
1215    fn runtime_mode(&self) -> RuntimeMode {
1216        self.mode
1217    }
1218
1219    async fn accept_input(
1220        &self,
1221        session_id: &SessionId,
1222        input: Input,
1223    ) -> Result<AcceptOutcome, RuntimeDriverError> {
1224        let (driver, wake_tx) = {
1225            let sessions = self.sessions.read().await;
1226            let entry = sessions
1227                .get(session_id)
1228                .ok_or(RuntimeDriverError::NotReady {
1229                    state: RuntimeState::Destroyed,
1230                })?;
1231            (entry.driver.clone(), entry.wake_sender())
1232        };
1233
1234        // Accept input and check wake under the driver lock
1235        let (outcome, should_wake, should_process) = {
1236            let mut driver = driver.lock().await;
1237            let result = driver.as_driver_mut().accept_input(input).await?;
1238            let wake = driver.take_wake_requested();
1239            let process_now = driver.take_process_requested();
1240            (result, wake, process_now)
1241        };
1242
1243        // Signal the RuntimeLoop if wake or immediate processing was requested.
1244        if (should_wake || should_process)
1245            && let Some(ref wake_tx) = wake_tx
1246        {
1247            // Non-blocking: if the channel is full, the loop is already processing
1248            let _ = wake_tx.try_send(());
1249        }
1250
1251        Ok(outcome)
1252    }
1253
1254    async fn accept_input_with_completion(
1255        &self,
1256        session_id: &SessionId,
1257        input: Input,
1258    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
1259    {
1260        RuntimeSessionAdapter::accept_input_with_completion(self, session_id, input).await
1261    }
1262
1263    async fn runtime_state(
1264        &self,
1265        session_id: &SessionId,
1266    ) -> Result<RuntimeState, RuntimeDriverError> {
1267        let driver = {
1268            let sessions = self.sessions.read().await;
1269            let entry = sessions
1270                .get(session_id)
1271                .ok_or(RuntimeDriverError::NotReady {
1272                    state: RuntimeState::Destroyed,
1273                })?;
1274            entry.driver.clone()
1275        };
1276        let driver = driver.lock().await;
1277        Ok(driver.as_driver().runtime_state())
1278    }
1279
1280    async fn retire_runtime(
1281        &self,
1282        session_id: &SessionId,
1283    ) -> Result<RetireReport, RuntimeDriverError> {
1284        let (driver_handle, completions, wake_tx) = {
1285            let sessions = self.sessions.read().await;
1286            let entry = sessions
1287                .get(session_id)
1288                .ok_or(RuntimeDriverError::NotReady {
1289                    state: RuntimeState::Destroyed,
1290                })?;
1291            (
1292                entry.driver.clone(),
1293                entry.completions.clone(),
1294                entry.wake_sender(),
1295            )
1296        };
1297        let mut driver = driver_handle.lock().await;
1298        let mut report = driver.as_driver_mut().retire().await?;
1299        drop(driver); // Release driver lock before waking
1300
1301        if report.inputs_pending_drain > 0 {
1302            // Wake the runtime loop so it drains already-queued inputs.
1303            // Retired state allows processing but rejects new accepts.
1304            if let Some(ref wake_tx) = wake_tx
1305                && wake_tx.send(()).await.is_ok()
1306            {
1307                return Ok(report);
1308            }
1309
1310            // No live loop can drain this retired queue. Abandon the queued work
1311            // now so later recovery/upgrade paths do not execute inputs whose
1312            // waiters were already terminated.
1313            let mut driver = driver_handle.lock().await;
1314            let abandoned = driver
1315                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1316                .await?;
1317            drop(driver);
1318            let mut completions = completions.lock().await;
1319            completions.resolve_all_terminated("retired without runtime loop");
1320            report.inputs_abandoned += abandoned;
1321            report.inputs_pending_drain = 0;
1322        }
1323
1324        Ok(report)
1325    }
1326
1327    async fn reset_runtime(
1328        &self,
1329        session_id: &SessionId,
1330    ) -> Result<ResetReport, RuntimeDriverError> {
1331        let (driver, completions) = {
1332            let sessions = self.sessions.read().await;
1333            let entry = sessions
1334                .get(session_id)
1335                .ok_or(RuntimeDriverError::NotReady {
1336                    state: RuntimeState::Destroyed,
1337                })?;
1338            (entry.driver.clone(), entry.completions.clone())
1339        };
1340        let mut driver = driver.lock().await;
1341        if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
1342            return Err(RuntimeDriverError::NotReady {
1343                state: RuntimeState::Running,
1344            });
1345        }
1346        let report = driver.as_driver_mut().reset().await?;
1347        drop(driver);
1348
1349        // Resolve all pending completion waiters — reset discards all queued work
1350        let mut completions = completions.lock().await;
1351        completions.resolve_all_terminated("runtime reset");
1352
1353        Ok(report)
1354    }
1355
1356    async fn input_state(
1357        &self,
1358        session_id: &SessionId,
1359        input_id: &InputId,
1360    ) -> Result<Option<InputState>, RuntimeDriverError> {
1361        let driver = {
1362            let sessions = self.sessions.read().await;
1363            let entry = sessions
1364                .get(session_id)
1365                .ok_or(RuntimeDriverError::NotReady {
1366                    state: RuntimeState::Destroyed,
1367                })?;
1368            entry.driver.clone()
1369        };
1370        let driver = driver.lock().await;
1371        Ok(driver.as_driver().input_state(input_id).cloned())
1372    }
1373
1374    async fn list_active_inputs(
1375        &self,
1376        session_id: &SessionId,
1377    ) -> Result<Vec<InputId>, RuntimeDriverError> {
1378        let driver = {
1379            let sessions = self.sessions.read().await;
1380            let entry = sessions
1381                .get(session_id)
1382                .ok_or(RuntimeDriverError::NotReady {
1383                    state: RuntimeState::Destroyed,
1384                })?;
1385            entry.driver.clone()
1386        };
1387        let driver = driver.lock().await;
1388        Ok(driver.as_driver().active_input_ids())
1389    }
1390}
1391
1392// ---------------------------------------------------------------------------
1393// RuntimeControlPlane implementation
1394// ---------------------------------------------------------------------------
1395
1396impl RuntimeSessionAdapter {
1397    /// Resolve a LogicalRuntimeId to a SessionId for internal lookup.
1398    ///
1399    /// The adapter uses `LogicalRuntimeId::new(session_id.to_string())` when
1400    /// creating drivers, so runtime IDs are UUID strings that parse back to
1401    /// SessionId.
1402    fn resolve_session_id(
1403        runtime_id: &LogicalRuntimeId,
1404    ) -> Result<SessionId, RuntimeControlPlaneError> {
1405        runtime_id
1406            .0
1407            .parse::<uuid::Uuid>()
1408            .map(SessionId)
1409            .map_err(|_| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
1410    }
1411
1412    /// Look up the session entry for a runtime ID, returning a control-plane error
1413    /// if not found.
1414    async fn lookup_entry(
1415        &self,
1416        runtime_id: &LogicalRuntimeId,
1417    ) -> Result<
1418        (
1419            SessionId,
1420            SharedDriver,
1421            SharedCompletionRegistry,
1422            Option<mpsc::Sender<()>>,
1423        ),
1424        RuntimeControlPlaneError,
1425    > {
1426        let session_id = Self::resolve_session_id(runtime_id)?;
1427        let sessions = self.sessions.read().await;
1428        let entry = sessions
1429            .get(&session_id)
1430            .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
1431        Ok((
1432            session_id,
1433            entry.driver.clone(),
1434            entry.completions.clone(),
1435            entry.wake_sender(),
1436        ))
1437    }
1438}
1439
1440#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
1441#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
1442impl crate::traits::RuntimeControlPlane for RuntimeSessionAdapter {
1443    async fn ingest(
1444        &self,
1445        runtime_id: &LogicalRuntimeId,
1446        input: Input,
1447    ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
1448        let (session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1449        let _ = session_id;
1450
1451        let (outcome, should_wake, should_process) = {
1452            let mut drv = driver.lock().await;
1453            let result = drv
1454                .as_driver_mut()
1455                .accept_input(input)
1456                .await
1457                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1458            let wake = drv.take_wake_requested();
1459            let process_now = drv.take_process_requested();
1460            (result, wake, process_now)
1461        };
1462
1463        if (should_wake || should_process)
1464            && let Some(ref tx) = wake_tx
1465        {
1466            let _ = tx.try_send(());
1467        }
1468
1469        Ok(outcome)
1470    }
1471
1472    async fn publish_event(
1473        &self,
1474        event: crate::runtime_event::RuntimeEventEnvelope,
1475    ) -> Result<(), RuntimeControlPlaneError> {
1476        let runtime_id = event.runtime_id.clone();
1477        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(&runtime_id).await?;
1478
1479        let mut drv = driver.lock().await;
1480        drv.as_driver_mut()
1481            .on_runtime_event(event)
1482            .await
1483            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))
1484    }
1485
1486    async fn retire(
1487        &self,
1488        runtime_id: &LogicalRuntimeId,
1489    ) -> Result<RetireReport, RuntimeControlPlaneError> {
1490        let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1491        let _ = session_id;
1492
1493        let mut drv = driver.lock().await;
1494        let mut report = drv
1495            .as_driver_mut()
1496            .retire()
1497            .await
1498            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1499        drop(drv);
1500
1501        if report.inputs_pending_drain > 0 {
1502            if let Some(ref tx) = wake_tx
1503                && tx.send(()).await.is_ok()
1504            {
1505                return Ok(report);
1506            }
1507
1508            // No live loop — abandon queued work
1509            let mut drv = driver.lock().await;
1510            let abandoned = drv
1511                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
1512                .await
1513                .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1514            drop(drv);
1515            let mut comp = completions.lock().await;
1516            comp.resolve_all_terminated("retired without runtime loop");
1517            report.inputs_abandoned += abandoned;
1518            report.inputs_pending_drain = 0;
1519        }
1520
1521        Ok(report)
1522    }
1523
1524    async fn recycle(
1525        &self,
1526        runtime_id: &LogicalRuntimeId,
1527    ) -> Result<RecycleReport, RuntimeControlPlaneError> {
1528        let (_session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1529
1530        let (transferred, active_after_recycle) = {
1531            let mut drv = driver.lock().await;
1532            let state = drv.as_driver().runtime_state();
1533            if matches!(state, RuntimeState::Running) {
1534                return Err(RuntimeControlPlaneError::InvalidState { state });
1535            }
1536            let should_restore_attached = matches!(state, RuntimeState::Attached);
1537
1538            let transferred = match &mut *drv {
1539                DriverEntry::Ephemeral(driver) => driver
1540                    .recycle_preserving_work()
1541                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1542                DriverEntry::Persistent(driver) => driver
1543                    .recycle_preserving_work()
1544                    .await
1545                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
1546            };
1547
1548            if should_restore_attached
1549                && matches!(drv.as_driver().runtime_state(), RuntimeState::Idle)
1550            {
1551                drv.attach()
1552                    .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1553            }
1554
1555            let active_after_recycle = drv.as_driver().active_input_ids();
1556            (transferred, active_after_recycle)
1557        };
1558
1559        // Reconcile existing waiters: keep waiting only for inputs that remain
1560        // active after recycle; terminate stale waiters so they cannot hang.
1561        {
1562            let pending_after: HashSet<InputId> = active_after_recycle.into_iter().collect();
1563            let mut comp = completions.lock().await;
1564            comp.resolve_not_pending(
1565                |input_id| pending_after.contains(input_id),
1566                "recycled input no longer pending",
1567            );
1568        }
1569
1570        // Wake the runtime loop to process re-queued inputs
1571        if let Some(ref tx) = wake_tx {
1572            let _ = tx.try_send(());
1573        }
1574
1575        Ok(RecycleReport {
1576            inputs_transferred: transferred,
1577        })
1578    }
1579
1580    async fn reset(
1581        &self,
1582        runtime_id: &LogicalRuntimeId,
1583    ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
1584        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1585
1586        let mut drv = driver.lock().await;
1587        if matches!(drv.as_driver().runtime_state(), RuntimeState::Running) {
1588            return Err(RuntimeControlPlaneError::InvalidState {
1589                state: RuntimeState::Running,
1590            });
1591        }
1592        let report = drv
1593            .as_driver_mut()
1594            .reset()
1595            .await
1596            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1597        drop(drv);
1598
1599        let mut comp = completions.lock().await;
1600        comp.resolve_all_terminated("runtime reset");
1601
1602        Ok(report)
1603    }
1604
1605    async fn recover(
1606        &self,
1607        runtime_id: &LogicalRuntimeId,
1608    ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
1609        let (_session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
1610
1611        let mut drv = driver.lock().await;
1612        let report = drv
1613            .as_driver_mut()
1614            .recover()
1615            .await
1616            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1617        drop(drv);
1618
1619        if let Some(ref tx) = wake_tx {
1620            let _ = tx.try_send(());
1621        }
1622
1623        Ok(report)
1624    }
1625
1626    async fn destroy(
1627        &self,
1628        runtime_id: &LogicalRuntimeId,
1629    ) -> Result<DestroyReport, RuntimeControlPlaneError> {
1630        let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1631
1632        let mut drv = driver.lock().await;
1633        let report = drv
1634            .as_driver_mut()
1635            .destroy()
1636            .await
1637            .map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
1638        drop(drv);
1639
1640        let mut comp = completions.lock().await;
1641        comp.resolve_all_terminated("runtime destroyed");
1642
1643        Ok(report)
1644    }
1645
1646    async fn runtime_state(
1647        &self,
1648        runtime_id: &LogicalRuntimeId,
1649    ) -> Result<RuntimeState, RuntimeControlPlaneError> {
1650        let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
1651
1652        let drv = driver.lock().await;
1653        Ok(drv.as_driver().runtime_state())
1654    }
1655
1656    async fn load_boundary_receipt(
1657        &self,
1658        runtime_id: &LogicalRuntimeId,
1659        run_id: &RunId,
1660        sequence: u64,
1661    ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
1662        match &self.store {
1663            Some(store) => store
1664                .load_boundary_receipt(runtime_id, run_id, sequence)
1665                .await
1666                .map_err(|e| RuntimeControlPlaneError::StoreError(e.to_string())),
1667            None => {
1668                // Ephemeral mode — no persisted receipts
1669                Ok(None)
1670            }
1671        }
1672    }
1673}
1674
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use meerkat_core::agent::{CommsCapabilityError, CommsRuntime};
    use meerkat_core::comms_drain_lifecycle_authority::{CommsDrainMode, CommsDrainPhase};
    use tokio::sync::Notify;

    /// Minimal `CommsRuntime` double for driving the comms drain task.
    ///
    /// It never yields messages or classified interactions. The only knob is
    /// whether `dismiss_received()` reports `true`.
    struct FakeDrainRuntime {
        /// Handed out by `inbox_notify()`; these tests never notify it.
        notify: Arc<Notify>,
        /// Value reported by `dismiss_received()`.
        dismiss: AtomicBool,
    }

    impl FakeDrainRuntime {
        /// A runtime that reports a dismiss as soon as it is asked.
        fn dismissing() -> Self {
            Self {
                notify: Arc::new(Notify::new()),
                dismiss: AtomicBool::new(true),
            }
        }

        /// A runtime that never reports a dismiss and never has inbox traffic.
        fn idle() -> Self {
            Self {
                notify: Arc::new(Notify::new()),
                dismiss: AtomicBool::new(false),
            }
        }
    }

    #[async_trait::async_trait]
    impl CommsRuntime for FakeDrainRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            Vec::new()
        }

        fn inbox_notify(&self) -> Arc<Notify> {
            Arc::clone(&self.notify)
        }

        fn dismiss_received(&self) -> bool {
            self.dismiss.load(Ordering::Acquire)
        }

        async fn drain_classified_inbox_interactions(
            &self,
        ) -> Result<Vec<meerkat_core::interaction::ClassifiedInboxInteraction>, CommsCapabilityError>
        {
            Ok(Vec::new())
        }
    }

    /// Register `session_id` and start a comms drain for it through the
    /// spawn protocol.
    ///
    /// Mirrors the production sequence while holding the `comms_drain_slots`
    /// write lock:
    /// 1. `execute_ensure_running`, which must yield a spawn obligation;
    /// 2. apply its effects, spawning the drain task for every
    ///    `SpawnDrainTask` effect and storing the handle in the slot;
    /// 3. `submit_task_spawned` and apply the feedback effects.
    async fn spawn_test_comms_drain(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        mode: CommsDrainMode,
        comms_runtime: Arc<dyn CommsRuntime>,
        idle_timeout: Duration,
    ) {
        adapter.register_session(session_id.clone()).await;
        let mut slots = adapter.comms_drain_slots.write().await;
        let slot = slots
            .entry(session_id.clone())
            .or_insert_with(CommsDrainSlot::new);
        let result = protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode)
            .expect("ensure running");
        let obligation = result
            .obligation
            .expect("spawn obligation should be present");

        apply_runtime_drain_effects(slot, &result.effects);
        for effect in &result.effects {
            if let CommsDrainLifecycleEffect::SpawnDrainTask { .. } = effect {
                slot.handle = Some(crate::comms_drain::spawn_comms_drain(
                    Arc::clone(adapter),
                    session_id.clone(),
                    Arc::clone(&comms_runtime),
                    Some(idle_timeout),
                ));
            }
        }

        let feedback_effects =
            protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation)
                .expect("task spawned");
        apply_runtime_drain_effects(slot, &feedback_effects);
    }

    /// Current drain-authority phase for the session, or `None` if it has no slot.
    async fn current_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
    ) -> Option<CommsDrainPhase> {
        let slots = adapter.comms_drain_slots.read().await;
        slots.get(session_id).map(|slot| slot.authority.phase())
    }

    /// Whether the session's slot exists and still holds a drain task handle.
    async fn handle_present(adapter: &Arc<RuntimeSessionAdapter>, session_id: &SessionId) -> bool {
        let slots = adapter.comms_drain_slots.read().await;
        slots
            .get(session_id)
            .and_then(|slot| slot.handle.as_ref())
            .is_some()
    }

    /// Poll the session's phase every 5 ms until it equals `expected`.
    ///
    /// Panics with "phase transition" if that takes longer than one second.
    async fn wait_for_phase(
        adapter: &Arc<RuntimeSessionAdapter>,
        session_id: &SessionId,
        expected: CommsDrainPhase,
    ) {
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if current_phase(adapter, session_id).await == Some(expected) {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        })
        .await
        .expect("phase transition");
    }

    /// A drain whose runtime reports a dismiss should reach `Stopped` and
    /// clear its own handle before `wait_comms_drain` joins it. The phase
    /// must stay `Stopped` after the join.
    #[tokio::test]
    async fn dismiss_exit_updates_authority_before_join() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::dismissing());

        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
            Duration::from_millis(25),
        )
        .await;

        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
        assert!(
            !handle_present(&adapter, &session_id).await,
            "drain task should clear its slot before wait_comms_drain joins"
        );

        adapter.wait_comms_drain(&session_id).await;
        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Stopped)
        );
    }

    /// With a quiet inbox and a 25 ms idle timeout, a `Timed` drain is
    /// expected to stop by itself, clearing its handle before
    /// `wait_comms_drain` joins it.
    #[tokio::test]
    async fn idle_timeout_updates_authority_before_join() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());

        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::Timed,
            comms_runtime,
            Duration::from_millis(25),
        )
        .await;

        wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
        assert!(
            !handle_present(&adapter, &session_id).await,
            "drain task should clear its slot before wait_comms_drain joins"
        );

        adapter.wait_comms_drain(&session_id).await;
        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Stopped)
        );
    }

    /// Unregistering a session whose drain is still running must remove its
    /// drain slot entirely.
    #[tokio::test]
    async fn unregister_session_aborts_and_removes_drain_slot() {
        let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
        let session_id = SessionId::new();
        let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());

        // `spawn_test_comms_drain` registers the session itself; registering
        // here as well would only duplicate setup the helper already does.
        spawn_test_comms_drain(
            &adapter,
            &session_id,
            CommsDrainMode::PersistentHost,
            comms_runtime,
            Duration::from_secs(60),
        )
        .await;

        assert_eq!(
            current_phase(&adapter, &session_id).await,
            Some(CommsDrainPhase::Running)
        );
        assert!(handle_present(&adapter, &session_id).await);

        adapter.unregister_session(&session_id).await;

        let slots = adapter.comms_drain_slots.read().await;
        assert!(
            !slots.contains_key(&session_id),
            "unregister must remove the comms drain slot entirely"
        );
    }
}