Skip to main content

meerkat_runtime/
session_adapter.rs

1//! RuntimeSessionAdapter — wraps a SessionService with per-session RuntimeDrivers.
2//!
3//! This adapter lives in meerkat-runtime so that meerkat-session doesn't need
4//! to depend on meerkat-runtime. Surfaces use this adapter to get v9 runtime
5//! capabilities on top of any SessionService implementation.
6//!
7//! When a session is registered with a `CoreExecutor`, a background RuntimeLoop
8//! task is spawned per session. `accept_input()` queues the input in the driver
9//! and, if wake is requested, signals the loop. The loop dequeues, stages,
10//! applies via CoreExecutor (which calls SessionService::start_turn()), and
11//! marks inputs as consumed.
12
13use std::collections::HashMap;
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
18use meerkat_core::lifecycle::run_control::RunControlCommand;
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::SessionId;
21
22use crate::accept::AcceptOutcome;
23use crate::driver::ephemeral::EphemeralRuntimeDriver;
24use crate::driver::persistent::PersistentRuntimeDriver;
25use crate::identifiers::LogicalRuntimeId;
26use crate::input::Input;
27use crate::input_machine::InputStateMachineError;
28use crate::input_state::InputState;
29use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
30use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
31use crate::store::RuntimeStore;
32use crate::tokio;
33use crate::tokio::sync::{Mutex, RwLock, mpsc};
34use crate::traits::{ResetReport, RetireReport, RuntimeDriver, RuntimeDriverError};
35
36/// Shared driver handle used by both the adapter and the RuntimeLoop.
37pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
38
39/// Per-session runtime driver entry.
40pub(crate) enum DriverEntry {
41    Ephemeral(EphemeralRuntimeDriver),
42    Persistent(PersistentRuntimeDriver),
43}
44
45impl DriverEntry {
46    pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
47        match self {
48            DriverEntry::Ephemeral(d) => d,
49            DriverEntry::Persistent(d) => d,
50        }
51    }
52
53    pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
54        match self {
55            DriverEntry::Ephemeral(d) => d,
56            DriverEntry::Persistent(d) => d,
57        }
58    }
59
60    /// Check if the runtime is idle.
61    pub(crate) fn is_idle(&self) -> bool {
62        match self {
63            DriverEntry::Ephemeral(d) => d.is_idle(),
64            DriverEntry::Persistent(d) => d.is_idle(),
65        }
66    }
67
68    /// Check if the runtime can process queued inputs (Idle or Retired).
69    pub(crate) fn can_process_queue(&self) -> bool {
70        match self {
71            DriverEntry::Ephemeral(d) => d.state_machine_ref().can_process_queue(),
72            DriverEntry::Persistent(d) => d.inner_ref().state_machine_ref().can_process_queue(),
73        }
74    }
75
76    /// Check and clear the wake flag.
77    pub(crate) fn take_wake_requested(&mut self) -> bool {
78        match self {
79            DriverEntry::Ephemeral(d) => d.take_wake_requested(),
80            DriverEntry::Persistent(d) => d.take_wake_requested(),
81        }
82    }
83
84    /// Check and clear the immediate processing flag.
85    pub(crate) fn take_process_requested(&mut self) -> bool {
86        match self {
87            DriverEntry::Ephemeral(d) => d.take_process_requested(),
88            DriverEntry::Persistent(d) => d.take_process_requested(),
89        }
90    }
91
92    /// Dequeue the next input for processing.
93    pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
94        match self {
95            DriverEntry::Ephemeral(d) => d.dequeue_next(),
96            DriverEntry::Persistent(d) => d.dequeue_next(),
97        }
98    }
99
100    /// Start a new run (Idle → Running).
101    pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
102        match self {
103            DriverEntry::Ephemeral(d) => d.start_run(run_id),
104            DriverEntry::Persistent(d) => d.start_run(run_id),
105        }
106    }
107
108    /// Complete a run (Running → Idle).
109    pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
110        match self {
111            DriverEntry::Ephemeral(d) => d.complete_run(),
112            DriverEntry::Persistent(d) => d.complete_run(),
113        }
114    }
115
116    /// Stage an input (Queued → Staged).
117    pub(crate) fn stage_input(
118        &mut self,
119        input_id: &InputId,
120        run_id: &RunId,
121    ) -> Result<(), InputStateMachineError> {
122        match self {
123            DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
124            DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
125        }
126    }
127
128    /// Apply an input after successful immediate execution.
129    #[allow(dead_code)]
130    pub(crate) fn apply_input(
131        &mut self,
132        input_id: &InputId,
133        run_id: &RunId,
134    ) -> Result<(), InputStateMachineError> {
135        match self {
136            DriverEntry::Ephemeral(d) => d.apply_input(input_id, run_id),
137            DriverEntry::Persistent(d) => d.apply_input(input_id, run_id),
138        }
139    }
140
141    /// Consume an input after successful immediate execution.
142    #[allow(dead_code)]
143    pub(crate) fn consume_inputs(
144        &mut self,
145        input_ids: &[InputId],
146        run_id: &RunId,
147    ) -> Result<(), InputStateMachineError> {
148        match self {
149            DriverEntry::Ephemeral(d) => d.consume_inputs(input_ids, run_id),
150            DriverEntry::Persistent(d) => d.consume_inputs(input_ids, run_id),
151        }
152    }
153
154    /// Roll back staged inputs after failed immediate execution.
155    #[allow(dead_code)]
156    pub(crate) fn rollback_staged(
157        &mut self,
158        input_ids: &[InputId],
159    ) -> Result<(), InputStateMachineError> {
160        match self {
161            DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
162            DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
163        }
164    }
165}
166
167/// Shared completion registry (accessed by adapter for registration and loop for resolution).
168pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
169
170/// Per-session state: driver + optional RuntimeLoop.
171struct RuntimeSessionEntry {
172    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
173    driver: SharedDriver,
174    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
175    completions: SharedCompletionRegistry,
176    /// Wake signal sender (if a RuntimeLoop is attached).
177    wake_tx: Option<mpsc::Sender<()>>,
178    /// Run-control sender for cancelling the current run.
179    control_tx: Option<mpsc::Sender<RunControlCommand>>,
180    /// Loop task handle (dropped on unregister, which closes the channel).
181    _loop_handle: Option<tokio::task::JoinHandle<()>>,
182}
183
184/// Wraps a SessionService to provide v9 runtime capabilities.
185///
186/// Maintains a per-session RuntimeDriver registry. When sessions are registered
187/// with a `CoreExecutor`, a RuntimeLoop task is spawned that processes queued
188/// inputs by calling `CoreExecutor::apply()` (which triggers
189/// `SessionService::start_turn()` under the hood).
190pub struct RuntimeSessionAdapter {
191    /// Per-session entries.
192    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
193    /// Runtime mode.
194    mode: RuntimeMode,
195    /// Optional RuntimeStore for persistent drivers.
196    store: Option<Arc<dyn RuntimeStore>>,
197}
198
199impl RuntimeSessionAdapter {
200    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
201    pub fn ephemeral() -> Self {
202        Self {
203            sessions: RwLock::new(HashMap::new()),
204            mode: RuntimeMode::V9Compliant,
205            store: None,
206        }
207    }
208
209    /// Create a persistent adapter with a RuntimeStore.
210    pub fn persistent(store: Arc<dyn RuntimeStore>) -> Self {
211        Self {
212            sessions: RwLock::new(HashMap::new()),
213            mode: RuntimeMode::V9Compliant,
214            store: Some(store),
215        }
216    }
217
218    /// Create a driver entry for a session.
219    fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
220        let runtime_id = LogicalRuntimeId::new(session_id.to_string());
221        match &self.store {
222            Some(store) => {
223                DriverEntry::Persistent(PersistentRuntimeDriver::new(runtime_id, store.clone()))
224            }
225            None => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
226        }
227    }
228
229    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
230    /// nothing processes them automatically). Useful for tests and legacy mode.
231    pub async fn register_session(&self, session_id: SessionId) {
232        if self.contains_session(&session_id).await {
233            return;
234        }
235        let mut entry = self.make_driver(&session_id);
236        if let Err(err) = entry.as_driver_mut().recover().await {
237            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
238            return;
239        }
240        let session_entry = RuntimeSessionEntry {
241            driver: Arc::new(Mutex::new(entry)),
242            completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
243            wake_tx: None,
244            control_tx: None,
245            _loop_handle: None,
246        };
247        let mut sessions = self.sessions.write().await;
248        sessions.entry(session_id).or_insert(session_entry);
249    }
250
251    /// Register a runtime driver for a session WITH a RuntimeLoop backed by a
252    /// `CoreExecutor`. When `accept_input()` queues an input and requests wake,
253    /// the loop dequeues it and calls `executor.apply()` (which triggers
254    /// `SessionService::start_turn()`).
255    pub async fn register_session_with_executor(
256        &self,
257        session_id: SessionId,
258        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
259    ) {
260        self.ensure_session_with_executor(session_id, executor)
261            .await;
262    }
263
264    /// Ensure a runtime driver with executor exists for the session.
265    ///
266    /// If a session was already registered without a loop, upgrade the
267    /// existing driver in place so queued inputs remain attached to the same
268    /// runtime ledger and can start draining immediately.
269    pub async fn ensure_session_with_executor(
270        &self,
271        session_id: SessionId,
272        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
273    ) {
274        let mut executor = Some(executor);
275        let upgrade = {
276            let mut sessions = self.sessions.write().await;
277            if let Some(entry) = sessions.get_mut(&session_id) {
278                if entry.wake_tx.is_some() && entry.control_tx.is_some() {
279                    return;
280                }
281
282                let driver = Arc::clone(&entry.driver);
283                let (wake_tx, wake_rx) = mpsc::channel(16);
284                let (control_tx, control_rx) = mpsc::channel(16);
285                let Some(executor) = executor.take() else {
286                    tracing::error!(%session_id, "executor missing while upgrading existing runtime session");
287                    return;
288                };
289                let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
290                    driver.clone(),
291                    executor,
292                    wake_rx,
293                    control_rx,
294                    Some(entry.completions.clone()),
295                );
296
297                entry.wake_tx = Some(wake_tx.clone());
298                entry.control_tx = Some(control_tx);
299                entry._loop_handle = Some(handle);
300                Some((driver, wake_tx))
301            } else {
302                None
303            }
304        };
305
306        if let Some((driver, wake_tx)) = upgrade {
307            let should_wake = {
308                let driver = driver.lock().await;
309                !driver.as_driver().active_input_ids().is_empty()
310            };
311            if should_wake {
312                let _ = wake_tx.try_send(());
313            }
314            return;
315        }
316
317        let mut recovered_entry = self.make_driver(&session_id);
318        if let Err(err) = recovered_entry.as_driver_mut().recover().await {
319            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
320            return;
321        }
322
323        let driver = {
324            let mut sessions = self.sessions.write().await;
325            if let Some(entry) = sessions.get(&session_id) {
326                if entry.wake_tx.is_some() && entry.control_tx.is_some() {
327                    return;
328                }
329                entry.driver.clone()
330            } else {
331                let driver = Arc::new(Mutex::new(recovered_entry));
332                sessions.insert(
333                    session_id.clone(),
334                    RuntimeSessionEntry {
335                        driver: driver.clone(),
336                        completions: Arc::new(Mutex::new(
337                            crate::completion::CompletionRegistry::new(),
338                        )),
339                        wake_tx: None,
340                        control_tx: None,
341                        _loop_handle: None,
342                    },
343                );
344                driver
345            }
346        };
347
348        let (wake_tx, wake_rx) = mpsc::channel(16);
349        let (control_tx, control_rx) = mpsc::channel(16);
350        let Some(executor) = executor.take() else {
351            tracing::error!(%session_id, "executor missing while registering runtime session");
352            return;
353        };
354
355        // Get or create completions for this session
356        let completions = {
357            let sessions = self.sessions.read().await;
358            sessions.get(&session_id).map(|e| e.completions.clone())
359        };
360
361        let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
362            driver.clone(),
363            executor,
364            wake_rx,
365            control_rx,
366            completions,
367        );
368
369        let mut sessions = self.sessions.write().await;
370        let Some(entry) = sessions.get_mut(&session_id) else {
371            return;
372        };
373        if entry.wake_tx.is_some() && entry.control_tx.is_some() {
374            return;
375        }
376        entry.wake_tx = Some(wake_tx.clone());
377        entry.control_tx = Some(control_tx);
378        entry._loop_handle = Some(handle);
379        drop(sessions);
380
381        let should_wake = {
382            let driver = driver.lock().await;
383            !driver.as_driver().active_input_ids().is_empty()
384        };
385        if should_wake {
386            let _ = wake_tx.try_send(());
387        }
388    }
389
390    /// Unregister a session's runtime driver.
391    ///
392    /// Drops the wake channel sender, which causes the RuntimeLoop to exit.
393    pub async fn unregister_session(&self, session_id: &SessionId) {
394        self.sessions.write().await.remove(session_id);
395    }
396
397    /// Check whether a runtime driver is already registered for a session.
398    pub async fn contains_session(&self, session_id: &SessionId) -> bool {
399        self.sessions.read().await.contains_key(session_id)
400    }
401
402    /// Cancel the currently-running turn for a registered session.
403    pub async fn interrupt_current_run(
404        &self,
405        session_id: &SessionId,
406    ) -> Result<(), RuntimeDriverError> {
407        let sessions = self.sessions.read().await;
408        let entry = sessions
409            .get(session_id)
410            .ok_or(RuntimeDriverError::NotReady {
411                state: RuntimeState::Destroyed,
412            })?;
413        let Some(control_tx) = &entry.control_tx else {
414            return Err(RuntimeDriverError::NotReady {
415                state: RuntimeState::Destroyed,
416            });
417        };
418        control_tx
419            .send(RunControlCommand::CancelCurrentRun {
420                reason: "mob interrupt".to_string(),
421            })
422            .await
423            .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
424    }
425
426    /// Accept an input and execute it synchronously through the runtime driver.
427    ///
428    /// This is useful for surfaces that need the legacy request/response shape
429    /// while still preserving v9 input lifecycle semantics.
430    pub async fn accept_input_and_run<T, F, Fut>(
431        &self,
432        session_id: &SessionId,
433        input: Input,
434        op: F,
435    ) -> Result<T, RuntimeDriverError>
436    where
437        F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
438        Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
439    {
440        let driver = {
441            let sessions = self.sessions.read().await;
442            sessions
443                .get(session_id)
444                .ok_or(RuntimeDriverError::NotReady {
445                    state: RuntimeState::Destroyed,
446                })?
447                .driver
448                .clone()
449        };
450
451        let (input_id, run_id, primitive) = {
452            let mut driver = driver.lock().await;
453            if !driver.is_idle() || !driver.as_driver().active_input_ids().is_empty() {
454                return Err(RuntimeDriverError::NotReady {
455                    state: driver.as_driver().runtime_state(),
456                });
457            }
458            let outcome = driver.as_driver_mut().accept_input(input).await?;
459            let input_id = match outcome {
460                AcceptOutcome::Accepted { input_id, .. } => input_id,
461                AcceptOutcome::Deduplicated { existing_id, .. } => existing_id,
462                AcceptOutcome::Rejected { reason } => {
463                    return Err(RuntimeDriverError::ValidationFailed { reason });
464                }
465            };
466
467            if !driver.is_idle() {
468                return Err(RuntimeDriverError::NotReady {
469                    state: driver.as_driver().runtime_state(),
470                });
471            }
472
473            let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
474                RuntimeDriverError::Internal("accepted input was not queued for execution".into())
475            })?;
476            if dequeued_id != input_id {
477                return Err(RuntimeDriverError::NotReady {
478                    state: driver.as_driver().runtime_state(),
479                });
480            }
481            let run_id = RunId::new();
482            driver.start_run(run_id.clone()).map_err(|err| {
483                RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
484            })?;
485            driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
486                RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
487            })?;
488            let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
489            (input_id, run_id, primitive)
490        };
491
492        match op(run_id.clone(), primitive.clone()).await {
493            Ok((result, output)) => {
494                let mut driver = driver.lock().await;
495                if let Err(err) = driver
496                    .as_driver_mut()
497                    .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
498                        run_id: run_id.clone(),
499                        receipt: output.receipt,
500                        session_snapshot: output.session_snapshot,
501                    })
502                    .await
503                {
504                    if let Err(unwind_err) = driver
505                        .as_driver_mut()
506                        .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
507                            run_id,
508                            error: format!("boundary commit failed: {err}"),
509                            recoverable: true,
510                        })
511                        .await
512                    {
513                        return Err(RuntimeDriverError::Internal(format!(
514                            "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
515                        )));
516                    }
517                    return Err(RuntimeDriverError::Internal(format!(
518                        "runtime boundary commit failed: {err}"
519                    )));
520                }
521                if let Err(err) = driver
522                    .as_driver_mut()
523                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
524                        run_id,
525                        consumed_input_ids: vec![input_id],
526                    })
527                    .await
528                {
529                    drop(driver);
530                    self.unregister_session(session_id).await;
531                    return Err(RuntimeDriverError::Internal(format!(
532                        "failed to persist runtime completion snapshot: {err}"
533                    )));
534                }
535                Ok(result)
536            }
537            Err(err) => {
538                let mut driver = driver.lock().await;
539                if let Err(run_err) = driver
540                    .as_driver_mut()
541                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
542                        run_id,
543                        error: err.to_string(),
544                        recoverable: true,
545                    })
546                    .await
547                {
548                    drop(driver);
549                    self.unregister_session(session_id).await;
550                    return Err(RuntimeDriverError::Internal(format!(
551                        "failed to persist runtime failure snapshot: {run_err}"
552                    )));
553                }
554                Err(err)
555            }
556        }
557    }
558
559    /// Accept an input and return a completion handle that resolves when the
560    /// input reaches a terminal state (Consumed or Abandoned).
561    ///
562    /// Returns `(AcceptOutcome, Option<CompletionHandle>)`:
563    /// - `(Accepted, Some(handle))` — await handle for result
564    /// - `(Deduplicated, Some(handle))` — joined in-flight waiter
565    /// - `(Deduplicated, None)` — input already terminal; no waiter needed
566    /// - `(Rejected, _)` — returned as `Err(ValidationFailed)`
567    pub async fn accept_input_with_completion(
568        &self,
569        session_id: &SessionId,
570        input: Input,
571    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
572    {
573        let sessions = self.sessions.read().await;
574        let entry = sessions
575            .get(session_id)
576            .ok_or(RuntimeDriverError::NotReady {
577                state: RuntimeState::Destroyed,
578            })?;
579
580        let (outcome, should_wake, should_process, handle) = {
581            let mut driver = entry.driver.lock().await;
582            let result = driver.as_driver_mut().accept_input(input).await?;
583
584            match &result {
585                AcceptOutcome::Accepted { input_id, .. } => {
586                    let handle = {
587                        let mut completions = entry.completions.lock().await;
588                        completions.register(input_id.clone())
589                    };
590                    let wake = driver.take_wake_requested();
591                    let process_now = driver.take_process_requested();
592                    (result, wake, process_now, Some(handle))
593                }
594                AcceptOutcome::Deduplicated { existing_id, .. } => {
595                    // Check if the existing input is already terminal
596                    let existing_state = driver.as_driver().input_state(existing_id);
597                    let is_terminal = existing_state
598                        .map(|s| s.current_state.is_terminal())
599                        .unwrap_or(true); // missing state = already cleaned up = terminal
600
601                    if is_terminal {
602                        // Input already processed — no handle, no waiter
603                        (result, false, false, None)
604                    } else {
605                        // In-flight — join existing waiters via multi-waiter Vec
606                        let handle = {
607                            let mut completions = entry.completions.lock().await;
608                            completions.register(existing_id.clone())
609                        };
610                        (result, false, false, Some(handle))
611                    }
612                }
613                AcceptOutcome::Rejected { reason } => {
614                    return Err(RuntimeDriverError::ValidationFailed {
615                        reason: reason.clone(),
616                    });
617                }
618            }
619        };
620
621        if (should_wake || should_process)
622            && let Some(ref wake_tx) = entry.wake_tx
623        {
624            let _ = wake_tx.try_send(());
625        }
626
627        Ok((outcome, handle))
628    }
629
630    /// Get the shared completion registry for a session.
631    ///
632    /// Used by the runtime loop to resolve waiters on input consumption.
633    #[allow(dead_code)]
634    pub(crate) async fn completion_registry(
635        &self,
636        session_id: &SessionId,
637    ) -> Option<SharedCompletionRegistry> {
638        let sessions = self.sessions.read().await;
639        sessions.get(session_id).map(|e| e.completions.clone())
640    }
641}
642
643#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
644#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
645impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
646    fn runtime_mode(&self) -> RuntimeMode {
647        self.mode
648    }
649
650    async fn accept_input(
651        &self,
652        session_id: &SessionId,
653        input: Input,
654    ) -> Result<AcceptOutcome, RuntimeDriverError> {
655        let sessions = self.sessions.read().await;
656        let entry = sessions
657            .get(session_id)
658            .ok_or(RuntimeDriverError::NotReady {
659                state: RuntimeState::Destroyed,
660            })?;
661
662        // Accept input and check wake under the driver lock
663        let (outcome, should_wake, should_process) = {
664            let mut driver = entry.driver.lock().await;
665            let result = driver.as_driver_mut().accept_input(input).await?;
666            let wake = driver.take_wake_requested();
667            let process_now = driver.take_process_requested();
668            (result, wake, process_now)
669        };
670
671        // Signal the RuntimeLoop if wake or immediate processing was requested.
672        if (should_wake || should_process)
673            && let Some(ref wake_tx) = entry.wake_tx
674        {
675            // Non-blocking: if the channel is full, the loop is already processing
676            let _ = wake_tx.try_send(());
677        }
678
679        Ok(outcome)
680    }
681
682    async fn runtime_state(
683        &self,
684        session_id: &SessionId,
685    ) -> Result<RuntimeState, RuntimeDriverError> {
686        let sessions = self.sessions.read().await;
687        let entry = sessions
688            .get(session_id)
689            .ok_or(RuntimeDriverError::NotReady {
690                state: RuntimeState::Destroyed,
691            })?;
692        let driver = entry.driver.lock().await;
693        Ok(driver.as_driver().runtime_state())
694    }
695
696    async fn retire_runtime(
697        &self,
698        session_id: &SessionId,
699    ) -> Result<RetireReport, RuntimeDriverError> {
700        let sessions = self.sessions.read().await;
701        let entry = sessions
702            .get(session_id)
703            .ok_or(RuntimeDriverError::NotReady {
704                state: RuntimeState::Destroyed,
705            })?;
706        let mut driver = entry.driver.lock().await;
707        let report = driver.as_driver_mut().retire().await?;
708        drop(driver); // Release driver lock before waking
709
710        if report.inputs_pending_drain > 0 {
711            // Wake the runtime loop so it drains already-queued inputs.
712            // Retired state allows processing but rejects new accepts.
713            if let Some(ref wake_tx) = entry.wake_tx {
714                let _ = wake_tx.send(()).await;
715            }
716        }
717
718        // If no loop is attached, nothing will drain — resolve all pending waiters
719        if entry.wake_tx.is_none() {
720            let mut completions = entry.completions.lock().await;
721            completions.resolve_all_terminated("retired without runtime loop");
722        }
723
724        Ok(report)
725    }
726
727    async fn reset_runtime(
728        &self,
729        session_id: &SessionId,
730    ) -> Result<ResetReport, RuntimeDriverError> {
731        let sessions = self.sessions.read().await;
732        let entry = sessions
733            .get(session_id)
734            .ok_or(RuntimeDriverError::NotReady {
735                state: RuntimeState::Destroyed,
736            })?;
737        let mut driver = entry.driver.lock().await;
738        if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
739            return Err(RuntimeDriverError::NotReady {
740                state: RuntimeState::Running,
741            });
742        }
743        let report = driver.as_driver_mut().reset().await?;
744
745        // Resolve all pending completion waiters — reset discards all queued work
746        let mut completions = entry.completions.lock().await;
747        completions.resolve_all_terminated("runtime reset");
748
749        Ok(report)
750    }
751
752    async fn input_state(
753        &self,
754        session_id: &SessionId,
755        input_id: &InputId,
756    ) -> Result<Option<InputState>, RuntimeDriverError> {
757        let sessions = self.sessions.read().await;
758        let entry = sessions
759            .get(session_id)
760            .ok_or(RuntimeDriverError::NotReady {
761                state: RuntimeState::Destroyed,
762            })?;
763        let driver = entry.driver.lock().await;
764        Ok(driver.as_driver().input_state(input_id).cloned())
765    }
766
767    async fn list_active_inputs(
768        &self,
769        session_id: &SessionId,
770    ) -> Result<Vec<InputId>, RuntimeDriverError> {
771        let sessions = self.sessions.read().await;
772        let entry = sessions
773            .get(session_id)
774            .ok_or(RuntimeDriverError::NotReady {
775                state: RuntimeState::Destroyed,
776            })?;
777        let driver = entry.driver.lock().await;
778        Ok(driver.as_driver().active_input_ids())
779    }
780}
781
782#[cfg(test)]
783#[allow(clippy::unwrap_used)]
784mod tests {
785    use super::*;
786    use crate::input::*;
787    use crate::input_state::InputState;
788    use crate::runtime_state::RuntimeState;
789    use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
790    use chrono::Utc;
791    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
792    use std::time::Duration;
793
794    fn make_prompt(text: &str) -> Input {
795        Input::Prompt(PromptInput {
796            header: InputHeader {
797                id: InputId::new(),
798                timestamp: Utc::now(),
799                source: InputOrigin::Operator,
800                durability: InputDurability::Durable,
801                visibility: InputVisibility::default(),
802                idempotency_key: None,
803                supersession_key: None,
804                correlation_id: None,
805            },
806            text: text.into(),
807            turn_metadata: None,
808        })
809    }
810
811    struct HarnessRuntimeStore {
812        inner: crate::store::InMemoryRuntimeStore,
813        fail_atomic_apply: bool,
814        /// Fail atomic_lifecycle_commit after N successful calls (None = never fail).
815        fail_atomic_lifecycle_commit_after: Option<usize>,
816        atomic_lifecycle_commit_calls: AtomicUsize,
817        load_input_states_delay: Duration,
818        fail_persist_input_state_after: Option<usize>,
819        persist_input_state_calls: AtomicUsize,
820    }
821
822    impl HarnessRuntimeStore {
823        fn failing_atomic_apply() -> Self {
824            Self {
825                inner: crate::store::InMemoryRuntimeStore::new(),
826                fail_atomic_apply: true,
827                fail_atomic_lifecycle_commit_after: None,
828                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
829                load_input_states_delay: Duration::ZERO,
830                fail_persist_input_state_after: None,
831                persist_input_state_calls: AtomicUsize::new(0),
832            }
833        }
834
835        fn delayed_recover(delay: Duration) -> Self {
836            Self {
837                inner: crate::store::InMemoryRuntimeStore::new(),
838                fail_atomic_apply: false,
839                fail_atomic_lifecycle_commit_after: None,
840                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
841                load_input_states_delay: delay,
842                fail_persist_input_state_after: None,
843                persist_input_state_calls: AtomicUsize::new(0),
844            }
845        }
846
847        fn failing_terminal_snapshot() -> Self {
848            Self {
849                inner: crate::store::InMemoryRuntimeStore::new(),
850                fail_atomic_apply: false,
851                // Recovery calls atomic_lifecycle_commit once (call 0 succeeds),
852                // the terminal event call (call 1) fails.
853                fail_atomic_lifecycle_commit_after: Some(1),
854                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
855                load_input_states_delay: Duration::ZERO,
856                fail_persist_input_state_after: None,
857                persist_input_state_calls: AtomicUsize::new(0),
858            }
859        }
860    }
861
862    #[async_trait::async_trait]
863    impl RuntimeStore for HarnessRuntimeStore {
864        async fn commit_session_boundary(
865            &self,
866            runtime_id: &crate::identifiers::LogicalRuntimeId,
867            session_delta: SessionDelta,
868            run_id: RunId,
869            boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
870            contributing_input_ids: Vec<InputId>,
871            input_updates: Vec<InputState>,
872        ) -> Result<meerkat_core::lifecycle::RunBoundaryReceipt, RuntimeStoreError> {
873            self.inner
874                .commit_session_boundary(
875                    runtime_id,
876                    session_delta,
877                    run_id,
878                    boundary,
879                    contributing_input_ids,
880                    input_updates,
881                )
882                .await
883        }
884
885        async fn atomic_apply(
886            &self,
887            runtime_id: &crate::identifiers::LogicalRuntimeId,
888            session_delta: Option<SessionDelta>,
889            receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
890            input_updates: Vec<InputState>,
891            session_store_key: Option<meerkat_core::types::SessionId>,
892        ) -> Result<(), RuntimeStoreError> {
893            if self.fail_atomic_apply {
894                return Err(RuntimeStoreError::WriteFailed(
895                    "synthetic atomic_apply failure".to_string(),
896                ));
897            }
898            self.inner
899                .atomic_apply(
900                    runtime_id,
901                    session_delta,
902                    receipt,
903                    input_updates,
904                    session_store_key,
905                )
906                .await
907        }
908
909        async fn load_input_states(
910            &self,
911            runtime_id: &crate::identifiers::LogicalRuntimeId,
912        ) -> Result<Vec<InputState>, RuntimeStoreError> {
913            if !self.load_input_states_delay.is_zero() {
914                tokio::time::sleep(self.load_input_states_delay).await;
915            }
916            self.inner.load_input_states(runtime_id).await
917        }
918
919        async fn load_boundary_receipt(
920            &self,
921            runtime_id: &crate::identifiers::LogicalRuntimeId,
922            run_id: &RunId,
923            sequence: u64,
924        ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeStoreError>
925        {
926            self.inner
927                .load_boundary_receipt(runtime_id, run_id, sequence)
928                .await
929        }
930
931        async fn load_session_snapshot(
932            &self,
933            runtime_id: &crate::identifiers::LogicalRuntimeId,
934        ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
935            self.inner.load_session_snapshot(runtime_id).await
936        }
937
938        async fn persist_input_state(
939            &self,
940            runtime_id: &crate::identifiers::LogicalRuntimeId,
941            state: &InputState,
942        ) -> Result<(), RuntimeStoreError> {
943            let call_index = self
944                .persist_input_state_calls
945                .fetch_add(1, Ordering::SeqCst);
946            if self
947                .fail_persist_input_state_after
948                .is_some_and(|fail_after| call_index >= fail_after)
949            {
950                return Err(RuntimeStoreError::WriteFailed(
951                    "synthetic persist_input_state failure".to_string(),
952                ));
953            }
954            self.inner.persist_input_state(runtime_id, state).await
955        }
956
957        async fn load_input_state(
958            &self,
959            runtime_id: &crate::identifiers::LogicalRuntimeId,
960            input_id: &InputId,
961        ) -> Result<Option<InputState>, RuntimeStoreError> {
962            self.inner.load_input_state(runtime_id, input_id).await
963        }
964
965        async fn persist_runtime_state(
966            &self,
967            runtime_id: &crate::identifiers::LogicalRuntimeId,
968            state: RuntimeState,
969        ) -> Result<(), RuntimeStoreError> {
970            self.inner.persist_runtime_state(runtime_id, state).await
971        }
972
973        async fn load_runtime_state(
974            &self,
975            runtime_id: &crate::identifiers::LogicalRuntimeId,
976        ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
977            self.inner.load_runtime_state(runtime_id).await
978        }
979
980        async fn atomic_lifecycle_commit(
981            &self,
982            runtime_id: &crate::identifiers::LogicalRuntimeId,
983            runtime_state: RuntimeState,
984            input_states: &[InputState],
985        ) -> Result<(), RuntimeStoreError> {
986            let call_index = self
987                .atomic_lifecycle_commit_calls
988                .fetch_add(1, Ordering::SeqCst);
989            if self
990                .fail_atomic_lifecycle_commit_after
991                .is_some_and(|fail_after| call_index >= fail_after)
992            {
993                return Err(RuntimeStoreError::WriteFailed(
994                    "synthetic atomic_lifecycle_commit failure".to_string(),
995                ));
996            }
997            self.inner
998                .atomic_lifecycle_commit(runtime_id, runtime_state, input_states)
999                .await
1000        }
1001    }
1002
1003    #[tokio::test]
1004    async fn ephemeral_adapter_accept_and_query() {
1005        let adapter = RuntimeSessionAdapter::ephemeral();
1006        let sid = SessionId::new();
1007        adapter.register_session(sid.clone()).await;
1008
1009        let input = make_prompt("hello");
1010        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1011        assert!(outcome.is_accepted());
1012
1013        let state = adapter.runtime_state(&sid).await.unwrap();
1014        assert_eq!(state, RuntimeState::Idle);
1015
1016        let active = adapter.list_active_inputs(&sid).await.unwrap();
1017        assert_eq!(active.len(), 1);
1018    }
1019
1020    #[tokio::test]
1021    async fn persistent_adapter_accept() {
1022        let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
1023        let adapter = RuntimeSessionAdapter::persistent(store);
1024        let sid = SessionId::new();
1025        adapter.register_session(sid.clone()).await;
1026
1027        let input = make_prompt("hello");
1028        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1029        assert!(outcome.is_accepted());
1030    }
1031
1032    #[tokio::test]
1033    async fn unregistered_session_errors() {
1034        let adapter = RuntimeSessionAdapter::ephemeral();
1035        let sid = SessionId::new();
1036        let result = adapter.accept_input(&sid, make_prompt("hi")).await;
1037        assert!(result.is_err());
1038    }
1039
1040    #[tokio::test]
1041    async fn unregister_removes_driver() {
1042        let adapter = RuntimeSessionAdapter::ephemeral();
1043        let sid = SessionId::new();
1044        adapter.register_session(sid.clone()).await;
1045        adapter.unregister_session(&sid).await;
1046
1047        let result = adapter.runtime_state(&sid).await;
1048        assert!(result.is_err());
1049    }
1050
1051    /// Test that accept_input with a RuntimeLoop triggers input processing.
1052    #[tokio::test]
1053    async fn accept_with_executor_triggers_loop() {
1054        use meerkat_core::lifecycle::RunId;
1055        use meerkat_core::lifecycle::core_executor::{
1056            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1057        };
1058        use meerkat_core::lifecycle::run_control::RunControlCommand;
1059        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1060        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1061        use std::sync::atomic::{AtomicBool, Ordering};
1062
1063        // Track whether apply was called
1064        let apply_called = Arc::new(AtomicBool::new(false));
1065        let apply_called_clone = apply_called.clone();
1066
1067        struct TestExecutor {
1068            called: Arc<AtomicBool>,
1069        }
1070
1071        #[async_trait::async_trait]
1072        impl CoreExecutor for TestExecutor {
1073            async fn apply(
1074                &mut self,
1075                run_id: RunId,
1076                primitive: RunPrimitive,
1077            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1078                self.called.store(true, Ordering::SeqCst);
1079                Ok(CoreApplyOutput {
1080                    receipt: RunBoundaryReceipt {
1081                        run_id,
1082                        boundary: RunApplyBoundary::RunStart,
1083                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1084                        conversation_digest: None,
1085                        message_count: 0,
1086                        sequence: 0,
1087                    },
1088                    session_snapshot: None,
1089                    run_result: None,
1090                })
1091            }
1092
1093            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1094                Ok(())
1095            }
1096        }
1097
1098        let adapter = RuntimeSessionAdapter::ephemeral();
1099        let sid = SessionId::new();
1100        let executor = Box::new(TestExecutor {
1101            called: apply_called_clone,
1102        });
1103        adapter
1104            .register_session_with_executor(sid.clone(), executor)
1105            .await;
1106
1107        // Accept input — should trigger the loop
1108        let input = make_prompt("hello from executor test");
1109        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1110        assert!(outcome.is_accepted());
1111
1112        // Give the loop time to process
1113        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1114
1115        assert!(
1116            apply_called.load(Ordering::SeqCst),
1117            "CoreExecutor::apply() should have been called by the RuntimeLoop"
1118        );
1119
1120        // After processing, the input should be consumed and the runtime back to Idle
1121        let state = adapter.runtime_state(&sid).await.unwrap();
1122        assert_eq!(state, RuntimeState::Idle);
1123
1124        // The input should be consumed (terminal)
1125        let active = adapter.list_active_inputs(&sid).await.unwrap();
1126        assert!(active.is_empty(), "All inputs should be consumed");
1127    }
1128
1129    /// Test that a failed executor re-queues the input (not stranded in APC).
1130    #[tokio::test]
1131    async fn failed_executor_requeues_input() {
1132        use crate::input_state::InputLifecycleState;
1133        use meerkat_core::lifecycle::core_executor::{
1134            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1135        };
1136        use meerkat_core::lifecycle::run_control::RunControlCommand;
1137        use meerkat_core::lifecycle::run_primitive::RunPrimitive;
1138        struct FailingExecutor;
1139
1140        #[async_trait::async_trait]
1141        impl CoreExecutor for FailingExecutor {
1142            async fn apply(
1143                &mut self,
1144                _run_id: RunId,
1145                _primitive: RunPrimitive,
1146            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1147                Err(CoreExecutorError::ApplyFailed {
1148                    reason: "LLM error".into(),
1149                })
1150            }
1151
1152            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1153                Ok(())
1154            }
1155        }
1156
1157        let adapter = RuntimeSessionAdapter::ephemeral();
1158        let sid = SessionId::new();
1159        adapter
1160            .register_session_with_executor(sid.clone(), Box::new(FailingExecutor))
1161            .await;
1162
1163        let input = make_prompt("hello failing");
1164        let input_id = input.id().clone();
1165        adapter.accept_input(&sid, input).await.unwrap();
1166
1167        // Give the loop time to process and fail
1168        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1169
1170        // Runtime should be back to Idle (not stuck in Running)
1171        let state = adapter.runtime_state(&sid).await.unwrap();
1172        assert_eq!(state, RuntimeState::Idle);
1173
1174        // Input should be rolled back to Queued (not stranded in APC)
1175        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1176        assert_eq!(
1177            is.current_state,
1178            InputLifecycleState::Queued,
1179            "Failed execution should roll input back to Queued, not strand in AppliedPendingConsumption"
1180        );
1181    }
1182
1183    #[tokio::test]
1184    async fn failed_executor_continues_processing_backlog() {
1185        use crate::input_state::InputLifecycleState;
1186        use meerkat_core::lifecycle::core_executor::{
1187            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1188        };
1189        use meerkat_core::lifecycle::run_control::RunControlCommand;
1190        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1191        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1192
1193        struct FailThenSucceedExecutor {
1194            calls: Arc<AtomicUsize>,
1195        }
1196
1197        #[async_trait::async_trait]
1198        impl CoreExecutor for FailThenSucceedExecutor {
1199            async fn apply(
1200                &mut self,
1201                run_id: RunId,
1202                primitive: RunPrimitive,
1203            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1204                let call = self.calls.fetch_add(1, Ordering::SeqCst);
1205                tokio::time::sleep(Duration::from_millis(50)).await;
1206                if call == 0 {
1207                    return Err(CoreExecutorError::ApplyFailed {
1208                        reason: "first run fails".into(),
1209                    });
1210                }
1211                Ok(CoreApplyOutput {
1212                    receipt: RunBoundaryReceipt {
1213                        run_id,
1214                        boundary: RunApplyBoundary::RunStart,
1215                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1216                        conversation_digest: None,
1217                        message_count: 0,
1218                        sequence: 0,
1219                    },
1220                    session_snapshot: None,
1221                    run_result: None,
1222                })
1223            }
1224
1225            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1226                Ok(())
1227            }
1228        }
1229
1230        let adapter = RuntimeSessionAdapter::ephemeral();
1231        let sid = SessionId::new();
1232        let calls = Arc::new(AtomicUsize::new(0));
1233        adapter
1234            .register_session_with_executor(
1235                sid.clone(),
1236                Box::new(FailThenSucceedExecutor {
1237                    calls: Arc::clone(&calls),
1238                }),
1239            )
1240            .await;
1241
1242        let first = make_prompt("first");
1243        let first_id = first.id().clone();
1244        let second = make_prompt("second");
1245        let second_id = second.id().clone();
1246        adapter.accept_input(&sid, first).await.unwrap();
1247        tokio::time::sleep(Duration::from_millis(10)).await;
1248        adapter.accept_input(&sid, second).await.unwrap();
1249
1250        tokio::time::sleep(Duration::from_millis(220)).await;
1251
1252        let second_state = adapter
1253            .input_state(&sid, &second_id)
1254            .await
1255            .unwrap()
1256            .unwrap();
1257        assert_eq!(second_state.current_state, InputLifecycleState::Consumed);
1258        assert_eq!(
1259            adapter.runtime_state(&sid).await.unwrap(),
1260            RuntimeState::Idle
1261        );
1262        assert!(
1263            calls.load(Ordering::SeqCst) >= 2,
1264            "the runtime loop should keep draining queued backlog after a failed run"
1265        );
1266        let first_state = adapter.input_state(&sid, &first_id).await.unwrap().unwrap();
1267        assert!(
1268            matches!(
1269                first_state.current_state,
1270                InputLifecycleState::Queued | InputLifecycleState::Consumed
1271            ),
1272            "the initially failed input should have been safely rolled back or retried after the backlog drained"
1273        );
1274    }
1275
1276    #[tokio::test]
1277    async fn ensure_session_with_executor_upgrades_registered_session() {
1278        use crate::input_state::InputLifecycleState;
1279        use meerkat_core::lifecycle::RunId;
1280        use meerkat_core::lifecycle::core_executor::{
1281            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1282        };
1283        use meerkat_core::lifecycle::run_control::RunControlCommand;
1284        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1285        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1286        use std::sync::atomic::{AtomicBool, Ordering};
1287
1288        struct SuccessExecutor {
1289            called: Arc<AtomicBool>,
1290        }
1291
1292        #[async_trait::async_trait]
1293        impl CoreExecutor for SuccessExecutor {
1294            async fn apply(
1295                &mut self,
1296                run_id: RunId,
1297                primitive: RunPrimitive,
1298            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1299                self.called.store(true, Ordering::SeqCst);
1300                Ok(CoreApplyOutput {
1301                    receipt: RunBoundaryReceipt {
1302                        run_id,
1303                        boundary: RunApplyBoundary::RunStart,
1304                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1305                        conversation_digest: None,
1306                        message_count: 0,
1307                        sequence: 0,
1308                    },
1309                    session_snapshot: None,
1310                    run_result: None,
1311                })
1312            }
1313
1314            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1315                Ok(())
1316            }
1317        }
1318
1319        let apply_called = Arc::new(AtomicBool::new(false));
1320        let adapter = RuntimeSessionAdapter::ephemeral();
1321        let sid = SessionId::new();
1322        adapter.register_session(sid.clone()).await;
1323
1324        let input = make_prompt("upgrade me");
1325        let input_id = input.id().clone();
1326        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1327        assert!(outcome.is_accepted());
1328
1329        adapter
1330            .ensure_session_with_executor(
1331                sid.clone(),
1332                Box::new(SuccessExecutor {
1333                    called: Arc::clone(&apply_called),
1334                }),
1335            )
1336            .await;
1337
1338        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1339
1340        assert!(
1341            apply_called.load(Ordering::SeqCst),
1342            "upgrading an already-registered session should attach a live loop"
1343        );
1344
1345        let state = adapter.runtime_state(&sid).await.unwrap();
1346        assert_eq!(state, RuntimeState::Idle);
1347
1348        let active = adapter.list_active_inputs(&sid).await.unwrap();
1349        assert!(active.is_empty(), "queued work should drain after upgrade");
1350
1351        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1352        assert_eq!(
1353            is.current_state,
1354            InputLifecycleState::Consumed,
1355            "the pre-upgrade queued input should be processed once the loop is attached"
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn ensure_session_with_executor_upgrades_racy_registration() {
1361        use crate::input_state::InputLifecycleState;
1362        use meerkat_core::lifecycle::RunId;
1363        use meerkat_core::lifecycle::core_executor::{
1364            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1365        };
1366        use meerkat_core::lifecycle::run_control::RunControlCommand;
1367        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1368        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1369
1370        struct SuccessExecutor {
1371            called: Arc<AtomicBool>,
1372        }
1373
1374        #[async_trait::async_trait]
1375        impl CoreExecutor for SuccessExecutor {
1376            async fn apply(
1377                &mut self,
1378                run_id: RunId,
1379                primitive: RunPrimitive,
1380            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1381                self.called.store(true, Ordering::SeqCst);
1382                Ok(CoreApplyOutput {
1383                    receipt: RunBoundaryReceipt {
1384                        run_id,
1385                        boundary: RunApplyBoundary::RunStart,
1386                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1387                        conversation_digest: None,
1388                        message_count: 0,
1389                        sequence: 0,
1390                    },
1391                    session_snapshot: None,
1392                    run_result: None,
1393                })
1394            }
1395
1396            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1397                Ok(())
1398            }
1399        }
1400
1401        let store = Arc::new(HarnessRuntimeStore::delayed_recover(Duration::from_millis(
1402            75,
1403        )));
1404        let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1405        let sid = SessionId::new();
1406        let apply_called = Arc::new(AtomicBool::new(false));
1407
1408        let ensure_task = {
1409            let adapter = Arc::clone(&adapter);
1410            let sid = sid.clone();
1411            let apply_called = Arc::clone(&apply_called);
1412            tokio::spawn(async move {
1413                adapter
1414                    .ensure_session_with_executor(
1415                        sid,
1416                        Box::new(SuccessExecutor {
1417                            called: apply_called,
1418                        }),
1419                    )
1420                    .await;
1421            })
1422        };
1423
1424        tokio::time::sleep(Duration::from_millis(10)).await;
1425        adapter.register_session(sid.clone()).await;
1426        ensure_task.await.unwrap();
1427
1428        let input = make_prompt("race upgrade");
1429        let input_id = input.id().clone();
1430        adapter.accept_input(&sid, input).await.unwrap();
1431        tokio::time::sleep(Duration::from_millis(120)).await;
1432
1433        assert!(
1434            apply_called.load(Ordering::SeqCst),
1435            "the racy registration path should still attach a live runtime loop"
1436        );
1437        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1438        assert_eq!(state.current_state, InputLifecycleState::Consumed);
1439    }
1440
1441    #[tokio::test]
1442    async fn boundary_commit_failure_unwinds_sync_runtime_state() {
1443        use crate::input_state::InputLifecycleState;
1444        use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1445        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1446        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1447
1448        let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1449        let adapter = RuntimeSessionAdapter::persistent(store);
1450        let sid = SessionId::new();
1451        adapter.register_session(sid.clone()).await;
1452
1453        let input = make_prompt("sync boundary failure");
1454        let input_id = input.id().clone();
1455        let result = adapter
1456            .accept_input_and_run(&sid, input, move |run_id, primitive| async move {
1457                Ok((
1458                    (),
1459                    CoreApplyOutput {
1460                        receipt: RunBoundaryReceipt {
1461                            run_id,
1462                            boundary: RunApplyBoundary::RunStart,
1463                            contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1464                            conversation_digest: None,
1465                            message_count: 0,
1466                            sequence: 0,
1467                        },
1468                        session_snapshot: None,
1469                        run_result: None,
1470                    },
1471                ))
1472            })
1473            .await;
1474        assert!(result.is_err(), "boundary commit failure should surface");
1475        let Err(err) = result else {
1476            unreachable!("asserted runtime boundary commit failure above");
1477        };
1478        assert!(
1479            err.to_string().contains("runtime boundary commit failed"),
1480            "unexpected error: {err}"
1481        );
1482        assert_eq!(
1483            adapter.runtime_state(&sid).await.unwrap(),
1484            RuntimeState::Idle
1485        );
1486        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1487        assert_eq!(state.current_state, InputLifecycleState::Queued);
1488    }
1489
1490    #[tokio::test]
1491    async fn boundary_commit_failure_unwinds_runtime_loop_state() {
1492        use crate::input_state::InputLifecycleState;
1493        use meerkat_core::lifecycle::core_executor::{
1494            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1495        };
1496        use meerkat_core::lifecycle::run_control::RunControlCommand;
1497        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1498        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1499
1500        struct SuccessExecutor {
1501            stop_called: Arc<AtomicBool>,
1502        }
1503
1504        #[async_trait::async_trait]
1505        impl CoreExecutor for SuccessExecutor {
1506            async fn apply(
1507                &mut self,
1508                run_id: RunId,
1509                primitive: RunPrimitive,
1510            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1511                Ok(CoreApplyOutput {
1512                    receipt: RunBoundaryReceipt {
1513                        run_id,
1514                        boundary: RunApplyBoundary::RunStart,
1515                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1516                        conversation_digest: None,
1517                        message_count: 0,
1518                        sequence: 0,
1519                    },
1520                    session_snapshot: None,
1521                    run_result: None,
1522                })
1523            }
1524
1525            async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1526                if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1527                    self.stop_called.store(true, Ordering::SeqCst);
1528                }
1529                Ok(())
1530            }
1531        }
1532
1533        let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1534        let adapter = RuntimeSessionAdapter::persistent(store);
1535        let sid = SessionId::new();
1536        let stop_called = Arc::new(AtomicBool::new(false));
1537        adapter
1538            .register_session_with_executor(
1539                sid.clone(),
1540                Box::new(SuccessExecutor {
1541                    stop_called: Arc::clone(&stop_called),
1542                }),
1543            )
1544            .await;
1545
1546        let input = make_prompt("loop boundary failure");
1547        let input_id = input.id().clone();
1548        adapter.accept_input(&sid, input).await.unwrap();
1549        tokio::time::sleep(Duration::from_millis(120)).await;
1550
1551        assert!(
1552            stop_called.load(Ordering::SeqCst),
1553            "boundary commit failures should stop the dead executor path"
1554        );
1555        assert_eq!(
1556            adapter.runtime_state(&sid).await.unwrap(),
1557            RuntimeState::Idle
1558        );
1559        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1560        assert_eq!(state.current_state, InputLifecycleState::Queued);
1561    }
1562
1563    #[tokio::test]
1564    async fn terminal_snapshot_failure_unregisters_runtime_loop_session() {
1565        use meerkat_core::lifecycle::core_executor::{
1566            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1567        };
1568        use meerkat_core::lifecycle::run_control::RunControlCommand;
1569        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1570        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1571
1572        struct SuccessExecutor {
1573            adapter: Arc<RuntimeSessionAdapter>,
1574            session_id: SessionId,
1575            stop_called: Arc<AtomicBool>,
1576        }
1577
1578        #[async_trait::async_trait]
1579        impl CoreExecutor for SuccessExecutor {
1580            async fn apply(
1581                &mut self,
1582                run_id: RunId,
1583                primitive: RunPrimitive,
1584            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1585                Ok(CoreApplyOutput {
1586                    receipt: RunBoundaryReceipt {
1587                        run_id,
1588                        boundary: RunApplyBoundary::RunStart,
1589                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1590                        conversation_digest: None,
1591                        message_count: 0,
1592                        sequence: 0,
1593                    },
1594                    session_snapshot: None,
1595                    run_result: None,
1596                })
1597            }
1598
1599            async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1600                if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1601                    self.stop_called.store(true, Ordering::SeqCst);
1602                    self.adapter.unregister_session(&self.session_id).await;
1603                }
1604                Ok(())
1605            }
1606        }
1607
1608        let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1609        let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1610        let sid = SessionId::new();
1611        let stop_called = Arc::new(AtomicBool::new(false));
1612        adapter
1613            .register_session_with_executor(
1614                sid.clone(),
1615                Box::new(SuccessExecutor {
1616                    adapter: Arc::clone(&adapter),
1617                    session_id: sid.clone(),
1618                    stop_called: Arc::clone(&stop_called),
1619                }),
1620            )
1621            .await;
1622
1623        adapter
1624            .accept_input(&sid, make_prompt("terminal snapshot failure"))
1625            .await
1626            .unwrap();
1627        tokio::time::sleep(Duration::from_millis(120)).await;
1628
1629        assert!(
1630            stop_called.load(Ordering::SeqCst),
1631            "terminal snapshot persistence failures should stop the runtime loop"
1632        );
1633        let state_result = adapter.runtime_state(&sid).await;
1634        assert!(
1635            state_result.is_err(),
1636            "stopped runtime sessions should be unregistered"
1637        );
1638        let Err(err) = state_result else {
1639            unreachable!("asserted stopped runtime unregistration above");
1640        };
1641        assert!(matches!(
1642            err,
1643            RuntimeDriverError::NotReady {
1644                state: RuntimeState::Destroyed
1645            }
1646        ));
1647    }
1648
1649    #[tokio::test]
1650    async fn terminal_snapshot_failure_unregisters_sync_runtime_session() {
1651        use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1652        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1653        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1654
1655        let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1656        let adapter = RuntimeSessionAdapter::persistent(store);
1657        let sid = SessionId::new();
1658        adapter.register_session(sid.clone()).await;
1659
1660        let result = adapter
1661            .accept_input_and_run(
1662                &sid,
1663                make_prompt("sync terminal snapshot failure"),
1664                move |run_id, primitive| async move {
1665                    Ok((
1666                        (),
1667                        CoreApplyOutput {
1668                            receipt: RunBoundaryReceipt {
1669                                run_id,
1670                                boundary: RunApplyBoundary::RunStart,
1671                                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1672                                conversation_digest: None,
1673                                message_count: 0,
1674                                sequence: 0,
1675                            },
1676                            session_snapshot: None,
1677                            run_result: None,
1678                        },
1679                    ))
1680                },
1681            )
1682            .await;
1683        assert!(
1684            result.is_err(),
1685            "terminal snapshot persistence failure should surface"
1686        );
1687        let Err(err) = result else {
1688            unreachable!("asserted terminal snapshot failure above");
1689        };
1690
1691        assert!(
1692            err.to_string().contains("terminal event persist failed")
1693                || err
1694                    .to_string()
1695                    .contains("failed to persist runtime completion snapshot"),
1696            "unexpected error: {err}"
1697        );
1698        let runtime_state = adapter.runtime_state(&sid).await;
1699        assert!(
1700            matches!(
1701                runtime_state,
1702                Err(RuntimeDriverError::NotReady {
1703                    state: RuntimeState::Destroyed
1704                })
1705            ),
1706            "sync path should unregister the broken runtime session"
1707        );
1708    }
1709
1710    // ─── Phase A gate tests ───
1711
1712    /// Gate A2: Dedup on terminal input returns (Deduplicated, None) — no hang.
1713    #[tokio::test]
1714    async fn dedup_terminal_input_returns_none_handle() {
1715        use crate::identifiers::IdempotencyKey;
1716        use meerkat_core::lifecycle::core_executor::{
1717            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1718        };
1719        use meerkat_core::lifecycle::run_control::RunControlCommand;
1720        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1721        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1722        use meerkat_core::types::{RunResult, Usage};
1723
1724        struct ResultExecutor;
1725        #[async_trait::async_trait]
1726        impl CoreExecutor for ResultExecutor {
1727            async fn apply(
1728                &mut self,
1729                run_id: RunId,
1730                primitive: RunPrimitive,
1731            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1732                Ok(CoreApplyOutput {
1733                    receipt: RunBoundaryReceipt {
1734                        run_id,
1735                        boundary: RunApplyBoundary::RunStart,
1736                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1737                        conversation_digest: None,
1738                        message_count: 0,
1739                        sequence: 0,
1740                    },
1741                    session_snapshot: None,
1742                    run_result: Some(RunResult {
1743                        text: "done".into(),
1744                        session_id: SessionId::new(),
1745                        usage: Usage::default(),
1746                        turns: 1,
1747                        tool_calls: 0,
1748                        structured_output: None,
1749                        schema_warnings: None,
1750                        skill_diagnostics: None,
1751                    }),
1752                })
1753            }
1754            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1755                Ok(())
1756            }
1757        }
1758
1759        let adapter = RuntimeSessionAdapter::ephemeral();
1760        let sid = SessionId::new();
1761        adapter
1762            .register_session_with_executor(sid.clone(), Box::new(ResultExecutor))
1763            .await;
1764
1765        // Accept first input with idempotency key
1766        let key = IdempotencyKey::new("gate-a2");
1767        let mut input1 = make_prompt("first");
1768        if let Input::Prompt(ref mut p) = input1 {
1769            p.header.idempotency_key = Some(key.clone());
1770        }
1771        let (outcome1, handle1) = adapter
1772            .accept_input_with_completion(&sid, input1)
1773            .await
1774            .unwrap();
1775        assert!(outcome1.is_accepted());
1776        assert!(handle1.is_some(), "accepted input should have a handle");
1777
1778        // Wait for it to complete
1779        let result = handle1.unwrap().wait().await;
1780        assert!(
1781            matches!(result, crate::completion::CompletionOutcome::Completed(_)),
1782            "first input should complete successfully"
1783        );
1784
1785        // Now send duplicate — input is already terminal (Consumed)
1786        let mut input2 = make_prompt("duplicate");
1787        if let Input::Prompt(ref mut p) = input2 {
1788            p.header.idempotency_key = Some(key);
1789        }
1790        let (outcome2, handle2) = adapter
1791            .accept_input_with_completion(&sid, input2)
1792            .await
1793            .unwrap();
1794        assert!(
1795            outcome2.is_deduplicated(),
1796            "second input with same key should be deduplicated"
1797        );
1798        assert!(
1799            handle2.is_none(),
1800            "dedup on terminal input should return None handle"
1801        );
1802    }
1803
1804    /// Gate A3: Dedup on in-flight input returns (Deduplicated, Some(handle))
1805    /// that resolves when the original completes.
1806    #[tokio::test]
1807    async fn dedup_inflight_input_returns_handle_that_resolves() {
1808        use crate::identifiers::IdempotencyKey;
1809        use meerkat_core::lifecycle::core_executor::{
1810            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1811        };
1812        use meerkat_core::lifecycle::run_control::RunControlCommand;
1813        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1814        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1815        use meerkat_core::types::{RunResult, Usage};
1816
1817        struct SlowExecutor;
1818        #[async_trait::async_trait]
1819        impl CoreExecutor for SlowExecutor {
1820            async fn apply(
1821                &mut self,
1822                run_id: RunId,
1823                primitive: RunPrimitive,
1824            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1825                // Simulate slow execution so duplicate arrives while in-flight
1826                tokio::time::sleep(Duration::from_millis(200)).await;
1827                Ok(CoreApplyOutput {
1828                    receipt: RunBoundaryReceipt {
1829                        run_id,
1830                        boundary: RunApplyBoundary::RunStart,
1831                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1832                        conversation_digest: None,
1833                        message_count: 0,
1834                        sequence: 0,
1835                    },
1836                    session_snapshot: None,
1837                    run_result: Some(RunResult {
1838                        text: "slow done".into(),
1839                        session_id: SessionId::new(),
1840                        usage: Usage::default(),
1841                        turns: 1,
1842                        tool_calls: 0,
1843                        structured_output: None,
1844                        schema_warnings: None,
1845                        skill_diagnostics: None,
1846                    }),
1847                })
1848            }
1849            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1850                Ok(())
1851            }
1852        }
1853
1854        let adapter = RuntimeSessionAdapter::ephemeral();
1855        let sid = SessionId::new();
1856        adapter
1857            .register_session_with_executor(sid.clone(), Box::new(SlowExecutor))
1858            .await;
1859
1860        // Accept first input with idempotency key
1861        let key = IdempotencyKey::new("gate-a3");
1862        let mut input1 = make_prompt("original");
1863        if let Input::Prompt(ref mut p) = input1 {
1864            p.header.idempotency_key = Some(key.clone());
1865        }
1866        let (outcome1, handle1) = adapter
1867            .accept_input_with_completion(&sid, input1)
1868            .await
1869            .unwrap();
1870        assert!(outcome1.is_accepted());
1871
1872        // Wait briefly so the input is in-flight (Staged/Running), not yet terminal
1873        tokio::time::sleep(Duration::from_millis(50)).await;
1874
1875        // Send duplicate while original is still running
1876        let mut input2 = make_prompt("duplicate");
1877        if let Input::Prompt(ref mut p) = input2 {
1878            p.header.idempotency_key = Some(key);
1879        }
1880        let (outcome2, handle2) = adapter
1881            .accept_input_with_completion(&sid, input2)
1882            .await
1883            .unwrap();
1884        assert!(
1885            outcome2.is_deduplicated(),
1886            "second input should be deduplicated"
1887        );
1888        assert!(
1889            handle2.is_some(),
1890            "dedup on in-flight input should return Some(handle)"
1891        );
1892
1893        // Both handles should resolve when the original completes
1894        let result1 = handle1.unwrap().wait().await;
1895        let result2 = handle2.unwrap().wait().await;
1896        assert!(
1897            matches!(result1, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1898            "original handle should complete with result"
1899        );
1900        assert!(
1901            matches!(result2, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1902            "duplicate handle should also complete with same result"
1903        );
1904    }
1905
1906    /// Gate A4 (part 1): resolve_without_result sends CompletedWithoutResult
1907    /// when executor returns run_result: None.
1908    #[tokio::test]
1909    async fn completion_handle_resolves_without_result() {
1910        use meerkat_core::lifecycle::core_executor::{
1911            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1912        };
1913        use meerkat_core::lifecycle::run_control::RunControlCommand;
1914        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1915        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1916
1917        struct NoResultExecutor;
1918        #[async_trait::async_trait]
1919        impl CoreExecutor for NoResultExecutor {
1920            async fn apply(
1921                &mut self,
1922                run_id: RunId,
1923                primitive: RunPrimitive,
1924            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1925                Ok(CoreApplyOutput {
1926                    receipt: RunBoundaryReceipt {
1927                        run_id,
1928                        boundary: RunApplyBoundary::RunStart,
1929                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1930                        conversation_digest: None,
1931                        message_count: 0,
1932                        sequence: 0,
1933                    },
1934                    session_snapshot: None,
1935                    run_result: None, // No RunResult
1936                })
1937            }
1938            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1939                Ok(())
1940            }
1941        }
1942
1943        let adapter = RuntimeSessionAdapter::ephemeral();
1944        let sid = SessionId::new();
1945        adapter
1946            .register_session_with_executor(sid.clone(), Box::new(NoResultExecutor))
1947            .await;
1948
1949        let input = make_prompt("context append");
1950        let (outcome, handle) = adapter
1951            .accept_input_with_completion(&sid, input)
1952            .await
1953            .unwrap();
1954        assert!(outcome.is_accepted());
1955
1956        let result = handle.unwrap().wait().await;
1957        assert!(
1958            matches!(
1959                result,
1960                crate::completion::CompletionOutcome::CompletedWithoutResult
1961            ),
1962            "executor returning run_result: None should resolve as CompletedWithoutResult, got {result:?}"
1963        );
1964    }
1965
1966    /// Gate A5: reset_runtime resolves all pending waiters.
1967    #[tokio::test]
1968    async fn reset_runtime_resolves_pending_waiters() {
1969        // Register without executor so inputs queue but don't process
1970        let adapter = RuntimeSessionAdapter::ephemeral();
1971        let sid = SessionId::new();
1972        adapter.register_session(sid.clone()).await;
1973
1974        let input = make_prompt("pending");
1975        let (outcome, handle) = adapter
1976            .accept_input_with_completion(&sid, input)
1977            .await
1978            .unwrap();
1979        assert!(outcome.is_accepted());
1980        assert!(handle.is_some());
1981
1982        // Reset the runtime
1983        adapter.reset_runtime(&sid).await.unwrap();
1984
1985        // Handle should resolve as terminated
1986        let result = handle.unwrap().wait().await;
1987        assert!(
1988            matches!(
1989                result,
1990                crate::completion::CompletionOutcome::RuntimeTerminated(_)
1991            ),
1992            "reset should resolve pending waiters as terminated, got {result:?}"
1993        );
1994    }
1995
1996    /// Gate A6: retire_runtime without loop resolves waiters.
1997    #[tokio::test]
1998    async fn retire_without_loop_resolves_waiters() {
1999        // Register without executor (no RuntimeLoop)
2000        let adapter = RuntimeSessionAdapter::ephemeral();
2001        let sid = SessionId::new();
2002        adapter.register_session(sid.clone()).await;
2003
2004        let input = make_prompt("will be retired");
2005        let (outcome, handle) = adapter
2006            .accept_input_with_completion(&sid, input)
2007            .await
2008            .unwrap();
2009        assert!(outcome.is_accepted());
2010        assert!(handle.is_some());
2011
2012        // Retire without loop attached
2013        adapter.retire_runtime(&sid).await.unwrap();
2014
2015        // Handle should resolve as terminated since no loop will drain
2016        let result = handle.unwrap().wait().await;
2017        assert!(
2018            matches!(
2019                result,
2020                crate::completion::CompletionOutcome::RuntimeTerminated(_)
2021            ),
2022            "retire without loop should resolve pending waiters as terminated, got {result:?}"
2023        );
2024    }
2025
2026    /// Test that BoundaryApplied fires with correct receipt on success.
2027    #[tokio::test]
2028    async fn successful_execution_fires_boundary_applied() {
2029        use crate::input_state::InputLifecycleState;
2030        use meerkat_core::lifecycle::RunId;
2031        use meerkat_core::lifecycle::core_executor::{
2032            CoreApplyOutput, CoreExecutor, CoreExecutorError,
2033        };
2034        use meerkat_core::lifecycle::run_control::RunControlCommand;
2035        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
2036        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
2037
2038        struct SuccessExecutor;
2039
2040        #[async_trait::async_trait]
2041        impl CoreExecutor for SuccessExecutor {
2042            async fn apply(
2043                &mut self,
2044                run_id: RunId,
2045                primitive: RunPrimitive,
2046            ) -> Result<CoreApplyOutput, CoreExecutorError> {
2047                Ok(CoreApplyOutput {
2048                    receipt: RunBoundaryReceipt {
2049                        run_id,
2050                        boundary: RunApplyBoundary::RunStart,
2051                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
2052                        conversation_digest: None,
2053                        message_count: 0,
2054                        sequence: 0,
2055                    },
2056                    session_snapshot: None,
2057                    run_result: None,
2058                })
2059            }
2060
2061            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
2062                Ok(())
2063            }
2064        }
2065
2066        let adapter = RuntimeSessionAdapter::ephemeral();
2067        let sid = SessionId::new();
2068        adapter
2069            .register_session_with_executor(sid.clone(), Box::new(SuccessExecutor))
2070            .await;
2071
2072        let input = make_prompt("hello success");
2073        let input_id = input.id().clone();
2074        adapter.accept_input(&sid, input).await.unwrap();
2075
2076        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2077
2078        // Input should have gone through full lifecycle: Queued → Staged → Applied → APC → Consumed
2079        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
2080        assert_eq!(
2081            is.current_state,
2082            InputLifecycleState::Consumed,
2083            "Successful execution should consume the input"
2084        );
2085
2086        // Runtime should be back to Idle
2087        let state = adapter.runtime_state(&sid).await.unwrap();
2088        assert_eq!(state, RuntimeState::Idle);
2089    }
2090}