Skip to main content

meerkat_runtime/
session_adapter.rs

1//! RuntimeSessionAdapter — wraps a SessionService with per-session RuntimeDrivers.
2//!
3//! This adapter lives in meerkat-runtime so that meerkat-session doesn't need
4//! to depend on meerkat-runtime. Surfaces use this adapter to get v9 runtime
5//! capabilities on top of any SessionService implementation.
6//!
7//! When a session is registered with a `CoreExecutor`, a background RuntimeLoop
8//! task is spawned per session. `accept_input()` queues the input in the driver
9//! and, if wake is requested, signals the loop. The loop dequeues, stages,
10//! applies via CoreExecutor (which calls SessionService::start_turn()), and
11//! marks inputs as consumed.
12
13use std::collections::HashMap;
14use std::future::Future;
15use std::sync::Arc;
16
17use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
18use meerkat_core::lifecycle::run_control::RunControlCommand;
19use meerkat_core::lifecycle::{InputId, RunId};
20use meerkat_core::types::SessionId;
21
22use crate::accept::AcceptOutcome;
23use crate::driver::ephemeral::EphemeralRuntimeDriver;
24use crate::driver::persistent::PersistentRuntimeDriver;
25use crate::identifiers::LogicalRuntimeId;
26use crate::input::Input;
27use crate::input_machine::InputStateMachineError;
28use crate::input_state::InputState;
29use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
30use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
31use crate::store::RuntimeStore;
32use crate::tokio;
33use crate::tokio::sync::{Mutex, RwLock, mpsc};
34use crate::traits::{ResetReport, RetireReport, RuntimeDriver, RuntimeDriverError};
35
36/// Shared driver handle used by both the adapter and the RuntimeLoop.
37pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
38
39/// Per-session runtime driver entry.
40pub(crate) enum DriverEntry {
41    Ephemeral(EphemeralRuntimeDriver),
42    Persistent(PersistentRuntimeDriver),
43}
44
45impl DriverEntry {
46    pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
47        match self {
48            DriverEntry::Ephemeral(d) => d,
49            DriverEntry::Persistent(d) => d,
50        }
51    }
52
53    pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
54        match self {
55            DriverEntry::Ephemeral(d) => d,
56            DriverEntry::Persistent(d) => d,
57        }
58    }
59
60    /// Check if the runtime is idle.
61    pub(crate) fn is_idle(&self) -> bool {
62        match self {
63            DriverEntry::Ephemeral(d) => d.is_idle(),
64            DriverEntry::Persistent(d) => d.is_idle(),
65        }
66    }
67
68    /// Check if the runtime can process queued inputs (Idle or Retired).
69    pub(crate) fn can_process_queue(&self) -> bool {
70        match self {
71            DriverEntry::Ephemeral(d) => d.state_machine_ref().can_process_queue(),
72            DriverEntry::Persistent(d) => d.inner_ref().state_machine_ref().can_process_queue(),
73        }
74    }
75
76    /// Check and clear the wake flag.
77    pub(crate) fn take_wake_requested(&mut self) -> bool {
78        match self {
79            DriverEntry::Ephemeral(d) => d.take_wake_requested(),
80            DriverEntry::Persistent(d) => d.take_wake_requested(),
81        }
82    }
83
84    /// Check and clear the immediate processing flag.
85    pub(crate) fn take_process_requested(&mut self) -> bool {
86        match self {
87            DriverEntry::Ephemeral(d) => d.take_process_requested(),
88            DriverEntry::Persistent(d) => d.take_process_requested(),
89        }
90    }
91
92    /// Dequeue the next input for processing.
93    pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
94        match self {
95            DriverEntry::Ephemeral(d) => d.dequeue_next(),
96            DriverEntry::Persistent(d) => d.dequeue_next(),
97        }
98    }
99
100    /// Start a new run (Idle → Running).
101    pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
102        match self {
103            DriverEntry::Ephemeral(d) => d.start_run(run_id),
104            DriverEntry::Persistent(d) => d.start_run(run_id),
105        }
106    }
107
108    /// Complete a run (Running → Idle).
109    pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
110        match self {
111            DriverEntry::Ephemeral(d) => d.complete_run(),
112            DriverEntry::Persistent(d) => d.complete_run(),
113        }
114    }
115
116    /// Stage an input (Queued → Staged).
117    pub(crate) fn stage_input(
118        &mut self,
119        input_id: &InputId,
120        run_id: &RunId,
121    ) -> Result<(), InputStateMachineError> {
122        match self {
123            DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
124            DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
125        }
126    }
127
128    /// Apply an input after successful immediate execution.
129    #[allow(dead_code)]
130    pub(crate) fn apply_input(
131        &mut self,
132        input_id: &InputId,
133        run_id: &RunId,
134    ) -> Result<(), InputStateMachineError> {
135        match self {
136            DriverEntry::Ephemeral(d) => d.apply_input(input_id, run_id),
137            DriverEntry::Persistent(d) => d.apply_input(input_id, run_id),
138        }
139    }
140
141    /// Consume an input after successful immediate execution.
142    #[allow(dead_code)]
143    pub(crate) fn consume_inputs(
144        &mut self,
145        input_ids: &[InputId],
146        run_id: &RunId,
147    ) -> Result<(), InputStateMachineError> {
148        match self {
149            DriverEntry::Ephemeral(d) => d.consume_inputs(input_ids, run_id),
150            DriverEntry::Persistent(d) => d.consume_inputs(input_ids, run_id),
151        }
152    }
153
154    /// Roll back staged inputs after failed immediate execution.
155    #[allow(dead_code)]
156    pub(crate) fn rollback_staged(
157        &mut self,
158        input_ids: &[InputId],
159    ) -> Result<(), InputStateMachineError> {
160        match self {
161            DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
162            DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
163        }
164    }
165}
166
167/// Shared completion registry (accessed by adapter for registration and loop for resolution).
168pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
169
170/// Per-session state: driver + optional RuntimeLoop.
171struct RuntimeSessionEntry {
172    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
173    driver: SharedDriver,
174    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
175    completions: SharedCompletionRegistry,
176    /// Wake signal sender (if a RuntimeLoop is attached).
177    wake_tx: Option<mpsc::Sender<()>>,
178    /// Run-control sender for cancelling the current run.
179    control_tx: Option<mpsc::Sender<RunControlCommand>>,
180    /// Loop task handle (dropped on unregister, which closes the channel).
181    _loop_handle: Option<tokio::task::JoinHandle<()>>,
182}
183
184/// Wraps a SessionService to provide v9 runtime capabilities.
185///
186/// Maintains a per-session RuntimeDriver registry. When sessions are registered
187/// with a `CoreExecutor`, a RuntimeLoop task is spawned that processes queued
188/// inputs by calling `CoreExecutor::apply()` (which triggers
189/// `SessionService::start_turn()` under the hood).
190pub struct RuntimeSessionAdapter {
191    /// Per-session entries.
192    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
193    /// Runtime mode.
194    mode: RuntimeMode,
195    /// Optional RuntimeStore for persistent drivers.
196    store: Option<Arc<dyn RuntimeStore>>,
197}
198
199impl RuntimeSessionAdapter {
200    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
201    pub fn ephemeral() -> Self {
202        Self {
203            sessions: RwLock::new(HashMap::new()),
204            mode: RuntimeMode::V9Compliant,
205            store: None,
206        }
207    }
208
209    /// Create a persistent adapter with a RuntimeStore.
210    pub fn persistent(store: Arc<dyn RuntimeStore>) -> Self {
211        Self {
212            sessions: RwLock::new(HashMap::new()),
213            mode: RuntimeMode::V9Compliant,
214            store: Some(store),
215        }
216    }
217
218    /// Create a driver entry for a session.
219    fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
220        let runtime_id = LogicalRuntimeId::new(session_id.to_string());
221        match &self.store {
222            Some(store) => {
223                DriverEntry::Persistent(PersistentRuntimeDriver::new(runtime_id, store.clone()))
224            }
225            None => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
226        }
227    }
228
229    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
230    /// nothing processes them automatically). Useful for tests and legacy mode.
231    pub async fn register_session(&self, session_id: SessionId) {
232        if self.contains_session(&session_id).await {
233            return;
234        }
235        let mut entry = self.make_driver(&session_id);
236        if let Err(err) = entry.as_driver_mut().recover().await {
237            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
238            return;
239        }
240        let session_entry = RuntimeSessionEntry {
241            driver: Arc::new(Mutex::new(entry)),
242            completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
243            wake_tx: None,
244            control_tx: None,
245            _loop_handle: None,
246        };
247        let mut sessions = self.sessions.write().await;
248        sessions.entry(session_id).or_insert(session_entry);
249    }
250
251    /// Register a runtime driver for a session WITH a RuntimeLoop backed by a
252    /// `CoreExecutor`. When `accept_input()` queues an input and requests wake,
253    /// the loop dequeues it and calls `executor.apply()` (which triggers
254    /// `SessionService::start_turn()`).
255    pub async fn register_session_with_executor(
256        &self,
257        session_id: SessionId,
258        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
259    ) {
260        self.ensure_session_with_executor(session_id, executor)
261            .await;
262    }
263
264    /// Ensure a runtime driver with executor exists for the session.
265    ///
266    /// If a session was already registered without a loop, upgrade the
267    /// existing driver in place so queued inputs remain attached to the same
268    /// runtime ledger and can start draining immediately.
269    pub async fn ensure_session_with_executor(
270        &self,
271        session_id: SessionId,
272        executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
273    ) {
274        let mut executor = Some(executor);
275        let upgrade = {
276            let mut sessions = self.sessions.write().await;
277            if let Some(entry) = sessions.get_mut(&session_id) {
278                if entry.wake_tx.is_some() && entry.control_tx.is_some() {
279                    return;
280                }
281
282                let driver = Arc::clone(&entry.driver);
283                let (wake_tx, wake_rx) = mpsc::channel(16);
284                let (control_tx, control_rx) = mpsc::channel(16);
285                let Some(executor) = executor.take() else {
286                    tracing::error!(%session_id, "executor missing while upgrading existing runtime session");
287                    return;
288                };
289                let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
290                    driver.clone(),
291                    executor,
292                    wake_rx,
293                    control_rx,
294                    Some(entry.completions.clone()),
295                );
296
297                entry.wake_tx = Some(wake_tx.clone());
298                entry.control_tx = Some(control_tx);
299                entry._loop_handle = Some(handle);
300                Some((driver, wake_tx))
301            } else {
302                None
303            }
304        };
305
306        if let Some((driver, wake_tx)) = upgrade {
307            let should_wake = {
308                let driver = driver.lock().await;
309                !driver.as_driver().active_input_ids().is_empty()
310            };
311            if should_wake {
312                let _ = wake_tx.try_send(());
313            }
314            return;
315        }
316
317        let mut recovered_entry = self.make_driver(&session_id);
318        if let Err(err) = recovered_entry.as_driver_mut().recover().await {
319            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
320            return;
321        }
322
323        let driver = {
324            let mut sessions = self.sessions.write().await;
325            if let Some(entry) = sessions.get(&session_id) {
326                if entry.wake_tx.is_some() && entry.control_tx.is_some() {
327                    return;
328                }
329                entry.driver.clone()
330            } else {
331                let driver = Arc::new(Mutex::new(recovered_entry));
332                sessions.insert(
333                    session_id.clone(),
334                    RuntimeSessionEntry {
335                        driver: driver.clone(),
336                        completions: Arc::new(Mutex::new(
337                            crate::completion::CompletionRegistry::new(),
338                        )),
339                        wake_tx: None,
340                        control_tx: None,
341                        _loop_handle: None,
342                    },
343                );
344                driver
345            }
346        };
347
348        let (wake_tx, wake_rx) = mpsc::channel(16);
349        let (control_tx, control_rx) = mpsc::channel(16);
350        let Some(executor) = executor.take() else {
351            tracing::error!(%session_id, "executor missing while registering runtime session");
352            return;
353        };
354
355        // Get or create completions for this session
356        let completions = {
357            let sessions = self.sessions.read().await;
358            sessions.get(&session_id).map(|e| e.completions.clone())
359        };
360
361        let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
362            driver.clone(),
363            executor,
364            wake_rx,
365            control_rx,
366            completions,
367        );
368
369        let mut sessions = self.sessions.write().await;
370        let Some(entry) = sessions.get_mut(&session_id) else {
371            return;
372        };
373        if entry.wake_tx.is_some() && entry.control_tx.is_some() {
374            return;
375        }
376        entry.wake_tx = Some(wake_tx.clone());
377        entry.control_tx = Some(control_tx);
378        entry._loop_handle = Some(handle);
379        drop(sessions);
380
381        let should_wake = {
382            let driver = driver.lock().await;
383            !driver.as_driver().active_input_ids().is_empty()
384        };
385        if should_wake {
386            let _ = wake_tx.try_send(());
387        }
388    }
389
390    /// Unregister a session's runtime driver.
391    ///
392    /// Drops the wake channel sender, which causes the RuntimeLoop to exit.
393    pub async fn unregister_session(&self, session_id: &SessionId) {
394        self.sessions.write().await.remove(session_id);
395    }
396
397    /// Check whether a runtime driver is already registered for a session.
398    pub async fn contains_session(&self, session_id: &SessionId) -> bool {
399        self.sessions.read().await.contains_key(session_id)
400    }
401
402    /// Cancel the currently-running turn for a registered session.
403    pub async fn interrupt_current_run(
404        &self,
405        session_id: &SessionId,
406    ) -> Result<(), RuntimeDriverError> {
407        let sessions = self.sessions.read().await;
408        let entry = sessions
409            .get(session_id)
410            .ok_or(RuntimeDriverError::NotReady {
411                state: RuntimeState::Destroyed,
412            })?;
413        let Some(control_tx) = &entry.control_tx else {
414            return Err(RuntimeDriverError::NotReady {
415                state: RuntimeState::Destroyed,
416            });
417        };
418        control_tx
419            .send(RunControlCommand::CancelCurrentRun {
420                reason: "mob interrupt".to_string(),
421            })
422            .await
423            .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
424    }
425
426    /// Accept an input and execute it synchronously through the runtime driver.
427    ///
428    /// This is useful for surfaces that need the legacy request/response shape
429    /// while still preserving v9 input lifecycle semantics.
430    pub async fn accept_input_and_run<T, F, Fut>(
431        &self,
432        session_id: &SessionId,
433        input: Input,
434        op: F,
435    ) -> Result<T, RuntimeDriverError>
436    where
437        F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
438        Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
439    {
440        let driver = {
441            let sessions = self.sessions.read().await;
442            sessions
443                .get(session_id)
444                .ok_or(RuntimeDriverError::NotReady {
445                    state: RuntimeState::Destroyed,
446                })?
447                .driver
448                .clone()
449        };
450
451        let (input_id, run_id, primitive) = {
452            let mut driver = driver.lock().await;
453            if !driver.is_idle() || !driver.as_driver().active_input_ids().is_empty() {
454                return Err(RuntimeDriverError::NotReady {
455                    state: driver.as_driver().runtime_state(),
456                });
457            }
458            let outcome = driver.as_driver_mut().accept_input(input).await?;
459            let input_id = match outcome {
460                AcceptOutcome::Accepted { input_id, .. } => input_id,
461                AcceptOutcome::Deduplicated { existing_id, .. } => existing_id,
462                AcceptOutcome::Rejected { reason } => {
463                    return Err(RuntimeDriverError::ValidationFailed { reason });
464                }
465            };
466
467            if !driver.is_idle() {
468                return Err(RuntimeDriverError::NotReady {
469                    state: driver.as_driver().runtime_state(),
470                });
471            }
472
473            let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
474                RuntimeDriverError::Internal("accepted input was not queued for execution".into())
475            })?;
476            if dequeued_id != input_id {
477                return Err(RuntimeDriverError::NotReady {
478                    state: driver.as_driver().runtime_state(),
479                });
480            }
481            let run_id = RunId::new();
482            driver.start_run(run_id.clone()).map_err(|err| {
483                RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
484            })?;
485            driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
486                RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
487            })?;
488            let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
489            (input_id, run_id, primitive)
490        };
491
492        match op(run_id.clone(), primitive.clone()).await {
493            Ok((result, output)) => {
494                let mut driver = driver.lock().await;
495                if let Err(err) = driver
496                    .as_driver_mut()
497                    .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
498                        run_id: run_id.clone(),
499                        receipt: output.receipt,
500                        session_snapshot: output.session_snapshot,
501                    })
502                    .await
503                {
504                    if let Err(unwind_err) = driver
505                        .as_driver_mut()
506                        .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
507                            run_id,
508                            error: format!("boundary commit failed: {err}"),
509                            recoverable: true,
510                        })
511                        .await
512                    {
513                        return Err(RuntimeDriverError::Internal(format!(
514                            "runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
515                        )));
516                    }
517                    return Err(RuntimeDriverError::Internal(format!(
518                        "runtime boundary commit failed: {err}"
519                    )));
520                }
521                if let Err(err) = driver
522                    .as_driver_mut()
523                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
524                        run_id,
525                        consumed_input_ids: vec![input_id],
526                    })
527                    .await
528                {
529                    drop(driver);
530                    self.unregister_session(session_id).await;
531                    return Err(RuntimeDriverError::Internal(format!(
532                        "failed to persist runtime completion snapshot: {err}"
533                    )));
534                }
535                Ok(result)
536            }
537            Err(err) => {
538                let mut driver = driver.lock().await;
539                if let Err(run_err) = driver
540                    .as_driver_mut()
541                    .on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
542                        run_id,
543                        error: err.to_string(),
544                        recoverable: true,
545                    })
546                    .await
547                {
548                    drop(driver);
549                    self.unregister_session(session_id).await;
550                    return Err(RuntimeDriverError::Internal(format!(
551                        "failed to persist runtime failure snapshot: {run_err}"
552                    )));
553                }
554                Err(err)
555            }
556        }
557    }
558
559    /// Accept an input and return a completion handle that resolves when the
560    /// input reaches a terminal state (Consumed or Abandoned).
561    ///
562    /// Returns `(AcceptOutcome, Option<CompletionHandle>)`:
563    /// - `(Accepted, Some(handle))` — await handle for result
564    /// - `(Deduplicated, Some(handle))` — joined in-flight waiter
565    /// - `(Deduplicated, None)` — input already terminal; no waiter needed
566    /// - `(Rejected, _)` — returned as `Err(ValidationFailed)`
567    pub async fn accept_input_with_completion(
568        &self,
569        session_id: &SessionId,
570        input: Input,
571    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
572    {
573        let sessions = self.sessions.read().await;
574        let entry = sessions
575            .get(session_id)
576            .ok_or(RuntimeDriverError::NotReady {
577                state: RuntimeState::Destroyed,
578            })?;
579
580        let (outcome, should_wake, should_process, handle) = {
581            let mut driver = entry.driver.lock().await;
582            let result = driver.as_driver_mut().accept_input(input).await?;
583
584            match &result {
585                AcceptOutcome::Accepted { input_id, .. } => {
586                    let handle = {
587                        let mut completions = entry.completions.lock().await;
588                        completions.register(input_id.clone())
589                    };
590                    let wake = driver.take_wake_requested();
591                    let process_now = driver.take_process_requested();
592                    (result, wake, process_now, Some(handle))
593                }
594                AcceptOutcome::Deduplicated { existing_id, .. } => {
595                    // Check if the existing input is already terminal
596                    let existing_state = driver.as_driver().input_state(existing_id);
597                    let is_terminal = existing_state
598                        .map(|s| s.current_state.is_terminal())
599                        .unwrap_or(true); // missing state = already cleaned up = terminal
600
601                    if is_terminal {
602                        // Input already processed — no handle, no waiter
603                        (result, false, false, None)
604                    } else {
605                        // In-flight — join existing waiters via multi-waiter Vec
606                        let handle = {
607                            let mut completions = entry.completions.lock().await;
608                            completions.register(existing_id.clone())
609                        };
610                        (result, false, false, Some(handle))
611                    }
612                }
613                AcceptOutcome::Rejected { reason } => {
614                    return Err(RuntimeDriverError::ValidationFailed {
615                        reason: reason.clone(),
616                    });
617                }
618            }
619        };
620
621        if (should_wake || should_process)
622            && let Some(ref wake_tx) = entry.wake_tx
623        {
624            let _ = wake_tx.try_send(());
625        }
626
627        Ok((outcome, handle))
628    }
629
630    /// Get the shared completion registry for a session.
631    ///
632    /// Used by the runtime loop to resolve waiters on input consumption.
633    #[allow(dead_code)]
634    pub(crate) async fn completion_registry(
635        &self,
636        session_id: &SessionId,
637    ) -> Option<SharedCompletionRegistry> {
638        let sessions = self.sessions.read().await;
639        sessions.get(session_id).map(|e| e.completions.clone())
640    }
641}
642
643#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
644#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
645impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
646    fn runtime_mode(&self) -> RuntimeMode {
647        self.mode
648    }
649
650    async fn accept_input(
651        &self,
652        session_id: &SessionId,
653        input: Input,
654    ) -> Result<AcceptOutcome, RuntimeDriverError> {
655        let sessions = self.sessions.read().await;
656        let entry = sessions
657            .get(session_id)
658            .ok_or(RuntimeDriverError::NotReady {
659                state: RuntimeState::Destroyed,
660            })?;
661
662        // Accept input and check wake under the driver lock
663        let (outcome, should_wake, should_process) = {
664            let mut driver = entry.driver.lock().await;
665            let result = driver.as_driver_mut().accept_input(input).await?;
666            let wake = driver.take_wake_requested();
667            let process_now = driver.take_process_requested();
668            (result, wake, process_now)
669        };
670
671        // Signal the RuntimeLoop if wake or immediate processing was requested.
672        if (should_wake || should_process)
673            && let Some(ref wake_tx) = entry.wake_tx
674        {
675            // Non-blocking: if the channel is full, the loop is already processing
676            let _ = wake_tx.try_send(());
677        }
678
679        Ok(outcome)
680    }
681
682    async fn runtime_state(
683        &self,
684        session_id: &SessionId,
685    ) -> Result<RuntimeState, RuntimeDriverError> {
686        let sessions = self.sessions.read().await;
687        let entry = sessions
688            .get(session_id)
689            .ok_or(RuntimeDriverError::NotReady {
690                state: RuntimeState::Destroyed,
691            })?;
692        let driver = entry.driver.lock().await;
693        Ok(driver.as_driver().runtime_state())
694    }
695
696    async fn retire_runtime(
697        &self,
698        session_id: &SessionId,
699    ) -> Result<RetireReport, RuntimeDriverError> {
700        let sessions = self.sessions.read().await;
701        let entry = sessions
702            .get(session_id)
703            .ok_or(RuntimeDriverError::NotReady {
704                state: RuntimeState::Destroyed,
705            })?;
706        let mut driver = entry.driver.lock().await;
707        let report = driver.as_driver_mut().retire().await?;
708        drop(driver); // Release driver lock before waking
709
710        if report.inputs_pending_drain > 0 {
711            // Wake the runtime loop so it drains already-queued inputs.
712            // Retired state allows processing but rejects new accepts.
713            if let Some(ref wake_tx) = entry.wake_tx {
714                let _ = wake_tx.send(()).await;
715            }
716        }
717
718        // If no loop is attached, nothing will drain — resolve all pending waiters
719        if entry.wake_tx.is_none() {
720            let mut completions = entry.completions.lock().await;
721            completions.resolve_all_terminated("retired without runtime loop");
722        }
723
724        Ok(report)
725    }
726
727    async fn reset_runtime(
728        &self,
729        session_id: &SessionId,
730    ) -> Result<ResetReport, RuntimeDriverError> {
731        let sessions = self.sessions.read().await;
732        let entry = sessions
733            .get(session_id)
734            .ok_or(RuntimeDriverError::NotReady {
735                state: RuntimeState::Destroyed,
736            })?;
737        let mut driver = entry.driver.lock().await;
738        if matches!(driver.as_driver().runtime_state(), RuntimeState::Running) {
739            return Err(RuntimeDriverError::NotReady {
740                state: RuntimeState::Running,
741            });
742        }
743        let report = driver.as_driver_mut().reset().await?;
744
745        // Resolve all pending completion waiters — reset discards all queued work
746        let mut completions = entry.completions.lock().await;
747        completions.resolve_all_terminated("runtime reset");
748
749        Ok(report)
750    }
751
752    async fn input_state(
753        &self,
754        session_id: &SessionId,
755        input_id: &InputId,
756    ) -> Result<Option<InputState>, RuntimeDriverError> {
757        let sessions = self.sessions.read().await;
758        let entry = sessions
759            .get(session_id)
760            .ok_or(RuntimeDriverError::NotReady {
761                state: RuntimeState::Destroyed,
762            })?;
763        let driver = entry.driver.lock().await;
764        Ok(driver.as_driver().input_state(input_id).cloned())
765    }
766
767    async fn list_active_inputs(
768        &self,
769        session_id: &SessionId,
770    ) -> Result<Vec<InputId>, RuntimeDriverError> {
771        let sessions = self.sessions.read().await;
772        let entry = sessions
773            .get(session_id)
774            .ok_or(RuntimeDriverError::NotReady {
775                state: RuntimeState::Destroyed,
776            })?;
777        let driver = entry.driver.lock().await;
778        Ok(driver.as_driver().active_input_ids())
779    }
780}
781
782#[cfg(test)]
783#[allow(clippy::unwrap_used)]
784mod tests {
785    use super::*;
786    use crate::input::*;
787    use crate::input_state::InputState;
788    use crate::runtime_state::RuntimeState;
789    use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
790    use chrono::Utc;
791    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
792    use std::time::Duration;
793
794    fn make_prompt(text: &str) -> Input {
795        Input::Prompt(PromptInput {
796            header: InputHeader {
797                id: InputId::new(),
798                timestamp: Utc::now(),
799                source: InputOrigin::Operator,
800                durability: InputDurability::Durable,
801                visibility: InputVisibility::default(),
802                idempotency_key: None,
803                supersession_key: None,
804                correlation_id: None,
805            },
806            text: text.into(),
807            blocks: None,
808            turn_metadata: None,
809        })
810    }
811
812    struct HarnessRuntimeStore {
813        inner: crate::store::InMemoryRuntimeStore,
814        fail_atomic_apply: bool,
815        /// Fail atomic_lifecycle_commit after N successful calls (None = never fail).
816        fail_atomic_lifecycle_commit_after: Option<usize>,
817        atomic_lifecycle_commit_calls: AtomicUsize,
818        load_input_states_delay: Duration,
819        fail_persist_input_state_after: Option<usize>,
820        persist_input_state_calls: AtomicUsize,
821    }
822
823    impl HarnessRuntimeStore {
824        fn failing_atomic_apply() -> Self {
825            Self {
826                inner: crate::store::InMemoryRuntimeStore::new(),
827                fail_atomic_apply: true,
828                fail_atomic_lifecycle_commit_after: None,
829                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
830                load_input_states_delay: Duration::ZERO,
831                fail_persist_input_state_after: None,
832                persist_input_state_calls: AtomicUsize::new(0),
833            }
834        }
835
836        fn delayed_recover(delay: Duration) -> Self {
837            Self {
838                inner: crate::store::InMemoryRuntimeStore::new(),
839                fail_atomic_apply: false,
840                fail_atomic_lifecycle_commit_after: None,
841                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
842                load_input_states_delay: delay,
843                fail_persist_input_state_after: None,
844                persist_input_state_calls: AtomicUsize::new(0),
845            }
846        }
847
848        fn failing_terminal_snapshot() -> Self {
849            Self {
850                inner: crate::store::InMemoryRuntimeStore::new(),
851                fail_atomic_apply: false,
852                // Recovery calls atomic_lifecycle_commit once (call 0 succeeds),
853                // the terminal event call (call 1) fails.
854                fail_atomic_lifecycle_commit_after: Some(1),
855                atomic_lifecycle_commit_calls: AtomicUsize::new(0),
856                load_input_states_delay: Duration::ZERO,
857                fail_persist_input_state_after: None,
858                persist_input_state_calls: AtomicUsize::new(0),
859            }
860        }
861    }
862
863    #[async_trait::async_trait]
864    impl RuntimeStore for HarnessRuntimeStore {
865        async fn commit_session_boundary(
866            &self,
867            runtime_id: &crate::identifiers::LogicalRuntimeId,
868            session_delta: SessionDelta,
869            run_id: RunId,
870            boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
871            contributing_input_ids: Vec<InputId>,
872            input_updates: Vec<InputState>,
873        ) -> Result<meerkat_core::lifecycle::RunBoundaryReceipt, RuntimeStoreError> {
874            self.inner
875                .commit_session_boundary(
876                    runtime_id,
877                    session_delta,
878                    run_id,
879                    boundary,
880                    contributing_input_ids,
881                    input_updates,
882                )
883                .await
884        }
885
886        async fn atomic_apply(
887            &self,
888            runtime_id: &crate::identifiers::LogicalRuntimeId,
889            session_delta: Option<SessionDelta>,
890            receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
891            input_updates: Vec<InputState>,
892            session_store_key: Option<meerkat_core::types::SessionId>,
893        ) -> Result<(), RuntimeStoreError> {
894            if self.fail_atomic_apply {
895                return Err(RuntimeStoreError::WriteFailed(
896                    "synthetic atomic_apply failure".to_string(),
897                ));
898            }
899            self.inner
900                .atomic_apply(
901                    runtime_id,
902                    session_delta,
903                    receipt,
904                    input_updates,
905                    session_store_key,
906                )
907                .await
908        }
909
910        async fn load_input_states(
911            &self,
912            runtime_id: &crate::identifiers::LogicalRuntimeId,
913        ) -> Result<Vec<InputState>, RuntimeStoreError> {
914            if !self.load_input_states_delay.is_zero() {
915                tokio::time::sleep(self.load_input_states_delay).await;
916            }
917            self.inner.load_input_states(runtime_id).await
918        }
919
920        async fn load_boundary_receipt(
921            &self,
922            runtime_id: &crate::identifiers::LogicalRuntimeId,
923            run_id: &RunId,
924            sequence: u64,
925        ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeStoreError>
926        {
927            self.inner
928                .load_boundary_receipt(runtime_id, run_id, sequence)
929                .await
930        }
931
932        async fn load_session_snapshot(
933            &self,
934            runtime_id: &crate::identifiers::LogicalRuntimeId,
935        ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
936            self.inner.load_session_snapshot(runtime_id).await
937        }
938
939        async fn persist_input_state(
940            &self,
941            runtime_id: &crate::identifiers::LogicalRuntimeId,
942            state: &InputState,
943        ) -> Result<(), RuntimeStoreError> {
944            let call_index = self
945                .persist_input_state_calls
946                .fetch_add(1, Ordering::SeqCst);
947            if self
948                .fail_persist_input_state_after
949                .is_some_and(|fail_after| call_index >= fail_after)
950            {
951                return Err(RuntimeStoreError::WriteFailed(
952                    "synthetic persist_input_state failure".to_string(),
953                ));
954            }
955            self.inner.persist_input_state(runtime_id, state).await
956        }
957
958        async fn load_input_state(
959            &self,
960            runtime_id: &crate::identifiers::LogicalRuntimeId,
961            input_id: &InputId,
962        ) -> Result<Option<InputState>, RuntimeStoreError> {
963            self.inner.load_input_state(runtime_id, input_id).await
964        }
965
966        async fn persist_runtime_state(
967            &self,
968            runtime_id: &crate::identifiers::LogicalRuntimeId,
969            state: RuntimeState,
970        ) -> Result<(), RuntimeStoreError> {
971            self.inner.persist_runtime_state(runtime_id, state).await
972        }
973
974        async fn load_runtime_state(
975            &self,
976            runtime_id: &crate::identifiers::LogicalRuntimeId,
977        ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
978            self.inner.load_runtime_state(runtime_id).await
979        }
980
981        async fn atomic_lifecycle_commit(
982            &self,
983            runtime_id: &crate::identifiers::LogicalRuntimeId,
984            runtime_state: RuntimeState,
985            input_states: &[InputState],
986        ) -> Result<(), RuntimeStoreError> {
987            let call_index = self
988                .atomic_lifecycle_commit_calls
989                .fetch_add(1, Ordering::SeqCst);
990            if self
991                .fail_atomic_lifecycle_commit_after
992                .is_some_and(|fail_after| call_index >= fail_after)
993            {
994                return Err(RuntimeStoreError::WriteFailed(
995                    "synthetic atomic_lifecycle_commit failure".to_string(),
996                ));
997            }
998            self.inner
999                .atomic_lifecycle_commit(runtime_id, runtime_state, input_states)
1000                .await
1001        }
1002    }
1003
1004    #[tokio::test]
1005    async fn ephemeral_adapter_accept_and_query() {
1006        let adapter = RuntimeSessionAdapter::ephemeral();
1007        let sid = SessionId::new();
1008        adapter.register_session(sid.clone()).await;
1009
1010        let input = make_prompt("hello");
1011        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1012        assert!(outcome.is_accepted());
1013
1014        let state = adapter.runtime_state(&sid).await.unwrap();
1015        assert_eq!(state, RuntimeState::Idle);
1016
1017        let active = adapter.list_active_inputs(&sid).await.unwrap();
1018        assert_eq!(active.len(), 1);
1019    }
1020
1021    #[tokio::test]
1022    async fn persistent_adapter_accept() {
1023        let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
1024        let adapter = RuntimeSessionAdapter::persistent(store);
1025        let sid = SessionId::new();
1026        adapter.register_session(sid.clone()).await;
1027
1028        let input = make_prompt("hello");
1029        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1030        assert!(outcome.is_accepted());
1031    }
1032
1033    #[tokio::test]
1034    async fn unregistered_session_errors() {
1035        let adapter = RuntimeSessionAdapter::ephemeral();
1036        let sid = SessionId::new();
1037        let result = adapter.accept_input(&sid, make_prompt("hi")).await;
1038        assert!(result.is_err());
1039    }
1040
1041    #[tokio::test]
1042    async fn unregister_removes_driver() {
1043        let adapter = RuntimeSessionAdapter::ephemeral();
1044        let sid = SessionId::new();
1045        adapter.register_session(sid.clone()).await;
1046        adapter.unregister_session(&sid).await;
1047
1048        let result = adapter.runtime_state(&sid).await;
1049        assert!(result.is_err());
1050    }
1051
1052    /// Test that accept_input with a RuntimeLoop triggers input processing.
1053    #[tokio::test]
1054    async fn accept_with_executor_triggers_loop() {
1055        use meerkat_core::lifecycle::RunId;
1056        use meerkat_core::lifecycle::core_executor::{
1057            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1058        };
1059        use meerkat_core::lifecycle::run_control::RunControlCommand;
1060        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1061        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1062        use std::sync::atomic::{AtomicBool, Ordering};
1063
1064        // Track whether apply was called
1065        let apply_called = Arc::new(AtomicBool::new(false));
1066        let apply_called_clone = apply_called.clone();
1067
1068        struct TestExecutor {
1069            called: Arc<AtomicBool>,
1070        }
1071
1072        #[async_trait::async_trait]
1073        impl CoreExecutor for TestExecutor {
1074            async fn apply(
1075                &mut self,
1076                run_id: RunId,
1077                primitive: RunPrimitive,
1078            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1079                self.called.store(true, Ordering::SeqCst);
1080                Ok(CoreApplyOutput {
1081                    receipt: RunBoundaryReceipt {
1082                        run_id,
1083                        boundary: RunApplyBoundary::RunStart,
1084                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1085                        conversation_digest: None,
1086                        message_count: 0,
1087                        sequence: 0,
1088                    },
1089                    session_snapshot: None,
1090                    run_result: None,
1091                })
1092            }
1093
1094            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1095                Ok(())
1096            }
1097        }
1098
1099        let adapter = RuntimeSessionAdapter::ephemeral();
1100        let sid = SessionId::new();
1101        let executor = Box::new(TestExecutor {
1102            called: apply_called_clone,
1103        });
1104        adapter
1105            .register_session_with_executor(sid.clone(), executor)
1106            .await;
1107
1108        // Accept input — should trigger the loop
1109        let input = make_prompt("hello from executor test");
1110        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1111        assert!(outcome.is_accepted());
1112
1113        // Give the loop time to process
1114        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1115
1116        assert!(
1117            apply_called.load(Ordering::SeqCst),
1118            "CoreExecutor::apply() should have been called by the RuntimeLoop"
1119        );
1120
1121        // After processing, the input should be consumed and the runtime back to Idle
1122        let state = adapter.runtime_state(&sid).await.unwrap();
1123        assert_eq!(state, RuntimeState::Idle);
1124
1125        // The input should be consumed (terminal)
1126        let active = adapter.list_active_inputs(&sid).await.unwrap();
1127        assert!(active.is_empty(), "All inputs should be consumed");
1128    }
1129
1130    /// Test that a failed executor re-queues the input (not stranded in APC).
1131    #[tokio::test]
1132    async fn failed_executor_requeues_input() {
1133        use crate::input_state::InputLifecycleState;
1134        use meerkat_core::lifecycle::core_executor::{
1135            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1136        };
1137        use meerkat_core::lifecycle::run_control::RunControlCommand;
1138        use meerkat_core::lifecycle::run_primitive::RunPrimitive;
1139        struct FailingExecutor;
1140
1141        #[async_trait::async_trait]
1142        impl CoreExecutor for FailingExecutor {
1143            async fn apply(
1144                &mut self,
1145                _run_id: RunId,
1146                _primitive: RunPrimitive,
1147            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1148                Err(CoreExecutorError::ApplyFailed {
1149                    reason: "LLM error".into(),
1150                })
1151            }
1152
1153            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1154                Ok(())
1155            }
1156        }
1157
1158        let adapter = RuntimeSessionAdapter::ephemeral();
1159        let sid = SessionId::new();
1160        adapter
1161            .register_session_with_executor(sid.clone(), Box::new(FailingExecutor))
1162            .await;
1163
1164        let input = make_prompt("hello failing");
1165        let input_id = input.id().clone();
1166        adapter.accept_input(&sid, input).await.unwrap();
1167
1168        // Give the loop time to process and fail
1169        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1170
1171        // Runtime should be back to Idle (not stuck in Running)
1172        let state = adapter.runtime_state(&sid).await.unwrap();
1173        assert_eq!(state, RuntimeState::Idle);
1174
1175        // Input should be rolled back to Queued (not stranded in APC)
1176        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1177        assert_eq!(
1178            is.current_state,
1179            InputLifecycleState::Queued,
1180            "Failed execution should roll input back to Queued, not strand in AppliedPendingConsumption"
1181        );
1182    }
1183
1184    #[tokio::test]
1185    async fn failed_executor_continues_processing_backlog() {
1186        use crate::input_state::InputLifecycleState;
1187        use meerkat_core::lifecycle::core_executor::{
1188            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1189        };
1190        use meerkat_core::lifecycle::run_control::RunControlCommand;
1191        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1192        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1193
1194        struct FailThenSucceedExecutor {
1195            calls: Arc<AtomicUsize>,
1196        }
1197
1198        #[async_trait::async_trait]
1199        impl CoreExecutor for FailThenSucceedExecutor {
1200            async fn apply(
1201                &mut self,
1202                run_id: RunId,
1203                primitive: RunPrimitive,
1204            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1205                let call = self.calls.fetch_add(1, Ordering::SeqCst);
1206                tokio::time::sleep(Duration::from_millis(50)).await;
1207                if call == 0 {
1208                    return Err(CoreExecutorError::ApplyFailed {
1209                        reason: "first run fails".into(),
1210                    });
1211                }
1212                Ok(CoreApplyOutput {
1213                    receipt: RunBoundaryReceipt {
1214                        run_id,
1215                        boundary: RunApplyBoundary::RunStart,
1216                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1217                        conversation_digest: None,
1218                        message_count: 0,
1219                        sequence: 0,
1220                    },
1221                    session_snapshot: None,
1222                    run_result: None,
1223                })
1224            }
1225
1226            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1227                Ok(())
1228            }
1229        }
1230
1231        let adapter = RuntimeSessionAdapter::ephemeral();
1232        let sid = SessionId::new();
1233        let calls = Arc::new(AtomicUsize::new(0));
1234        adapter
1235            .register_session_with_executor(
1236                sid.clone(),
1237                Box::new(FailThenSucceedExecutor {
1238                    calls: Arc::clone(&calls),
1239                }),
1240            )
1241            .await;
1242
1243        let first = make_prompt("first");
1244        let first_id = first.id().clone();
1245        let second = make_prompt("second");
1246        let second_id = second.id().clone();
1247        adapter.accept_input(&sid, first).await.unwrap();
1248        tokio::time::sleep(Duration::from_millis(10)).await;
1249        adapter.accept_input(&sid, second).await.unwrap();
1250
1251        tokio::time::sleep(Duration::from_millis(220)).await;
1252
1253        let second_state = adapter
1254            .input_state(&sid, &second_id)
1255            .await
1256            .unwrap()
1257            .unwrap();
1258        assert_eq!(second_state.current_state, InputLifecycleState::Consumed);
1259        assert_eq!(
1260            adapter.runtime_state(&sid).await.unwrap(),
1261            RuntimeState::Idle
1262        );
1263        assert!(
1264            calls.load(Ordering::SeqCst) >= 2,
1265            "the runtime loop should keep draining queued backlog after a failed run"
1266        );
1267        let first_state = adapter.input_state(&sid, &first_id).await.unwrap().unwrap();
1268        assert!(
1269            matches!(
1270                first_state.current_state,
1271                InputLifecycleState::Queued | InputLifecycleState::Consumed
1272            ),
1273            "the initially failed input should have been safely rolled back or retried after the backlog drained"
1274        );
1275    }
1276
1277    #[tokio::test]
1278    async fn ensure_session_with_executor_upgrades_registered_session() {
1279        use crate::input_state::InputLifecycleState;
1280        use meerkat_core::lifecycle::RunId;
1281        use meerkat_core::lifecycle::core_executor::{
1282            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1283        };
1284        use meerkat_core::lifecycle::run_control::RunControlCommand;
1285        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1286        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1287        use std::sync::atomic::{AtomicBool, Ordering};
1288
1289        struct SuccessExecutor {
1290            called: Arc<AtomicBool>,
1291        }
1292
1293        #[async_trait::async_trait]
1294        impl CoreExecutor for SuccessExecutor {
1295            async fn apply(
1296                &mut self,
1297                run_id: RunId,
1298                primitive: RunPrimitive,
1299            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1300                self.called.store(true, Ordering::SeqCst);
1301                Ok(CoreApplyOutput {
1302                    receipt: RunBoundaryReceipt {
1303                        run_id,
1304                        boundary: RunApplyBoundary::RunStart,
1305                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1306                        conversation_digest: None,
1307                        message_count: 0,
1308                        sequence: 0,
1309                    },
1310                    session_snapshot: None,
1311                    run_result: None,
1312                })
1313            }
1314
1315            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1316                Ok(())
1317            }
1318        }
1319
1320        let apply_called = Arc::new(AtomicBool::new(false));
1321        let adapter = RuntimeSessionAdapter::ephemeral();
1322        let sid = SessionId::new();
1323        adapter.register_session(sid.clone()).await;
1324
1325        let input = make_prompt("upgrade me");
1326        let input_id = input.id().clone();
1327        let outcome = adapter.accept_input(&sid, input).await.unwrap();
1328        assert!(outcome.is_accepted());
1329
1330        adapter
1331            .ensure_session_with_executor(
1332                sid.clone(),
1333                Box::new(SuccessExecutor {
1334                    called: Arc::clone(&apply_called),
1335                }),
1336            )
1337            .await;
1338
1339        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1340
1341        assert!(
1342            apply_called.load(Ordering::SeqCst),
1343            "upgrading an already-registered session should attach a live loop"
1344        );
1345
1346        let state = adapter.runtime_state(&sid).await.unwrap();
1347        assert_eq!(state, RuntimeState::Idle);
1348
1349        let active = adapter.list_active_inputs(&sid).await.unwrap();
1350        assert!(active.is_empty(), "queued work should drain after upgrade");
1351
1352        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1353        assert_eq!(
1354            is.current_state,
1355            InputLifecycleState::Consumed,
1356            "the pre-upgrade queued input should be processed once the loop is attached"
1357        );
1358    }
1359
1360    #[tokio::test]
1361    async fn ensure_session_with_executor_upgrades_racy_registration() {
1362        use crate::input_state::InputLifecycleState;
1363        use meerkat_core::lifecycle::RunId;
1364        use meerkat_core::lifecycle::core_executor::{
1365            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1366        };
1367        use meerkat_core::lifecycle::run_control::RunControlCommand;
1368        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1369        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1370
1371        struct SuccessExecutor {
1372            called: Arc<AtomicBool>,
1373        }
1374
1375        #[async_trait::async_trait]
1376        impl CoreExecutor for SuccessExecutor {
1377            async fn apply(
1378                &mut self,
1379                run_id: RunId,
1380                primitive: RunPrimitive,
1381            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1382                self.called.store(true, Ordering::SeqCst);
1383                Ok(CoreApplyOutput {
1384                    receipt: RunBoundaryReceipt {
1385                        run_id,
1386                        boundary: RunApplyBoundary::RunStart,
1387                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1388                        conversation_digest: None,
1389                        message_count: 0,
1390                        sequence: 0,
1391                    },
1392                    session_snapshot: None,
1393                    run_result: None,
1394                })
1395            }
1396
1397            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1398                Ok(())
1399            }
1400        }
1401
1402        let store = Arc::new(HarnessRuntimeStore::delayed_recover(Duration::from_millis(
1403            75,
1404        )));
1405        let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1406        let sid = SessionId::new();
1407        let apply_called = Arc::new(AtomicBool::new(false));
1408
1409        let ensure_task = {
1410            let adapter = Arc::clone(&adapter);
1411            let sid = sid.clone();
1412            let apply_called = Arc::clone(&apply_called);
1413            tokio::spawn(async move {
1414                adapter
1415                    .ensure_session_with_executor(
1416                        sid,
1417                        Box::new(SuccessExecutor {
1418                            called: apply_called,
1419                        }),
1420                    )
1421                    .await;
1422            })
1423        };
1424
1425        tokio::time::sleep(Duration::from_millis(10)).await;
1426        adapter.register_session(sid.clone()).await;
1427        ensure_task.await.unwrap();
1428
1429        let input = make_prompt("race upgrade");
1430        let input_id = input.id().clone();
1431        adapter.accept_input(&sid, input).await.unwrap();
1432        tokio::time::sleep(Duration::from_millis(120)).await;
1433
1434        assert!(
1435            apply_called.load(Ordering::SeqCst),
1436            "the racy registration path should still attach a live runtime loop"
1437        );
1438        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1439        assert_eq!(state.current_state, InputLifecycleState::Consumed);
1440    }
1441
1442    #[tokio::test]
1443    async fn boundary_commit_failure_unwinds_sync_runtime_state() {
1444        use crate::input_state::InputLifecycleState;
1445        use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1446        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1447        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1448
1449        let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1450        let adapter = RuntimeSessionAdapter::persistent(store);
1451        let sid = SessionId::new();
1452        adapter.register_session(sid.clone()).await;
1453
1454        let input = make_prompt("sync boundary failure");
1455        let input_id = input.id().clone();
1456        let result = adapter
1457            .accept_input_and_run(&sid, input, move |run_id, primitive| async move {
1458                Ok((
1459                    (),
1460                    CoreApplyOutput {
1461                        receipt: RunBoundaryReceipt {
1462                            run_id,
1463                            boundary: RunApplyBoundary::RunStart,
1464                            contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1465                            conversation_digest: None,
1466                            message_count: 0,
1467                            sequence: 0,
1468                        },
1469                        session_snapshot: None,
1470                        run_result: None,
1471                    },
1472                ))
1473            })
1474            .await;
1475        assert!(result.is_err(), "boundary commit failure should surface");
1476        let Err(err) = result else {
1477            unreachable!("asserted runtime boundary commit failure above");
1478        };
1479        assert!(
1480            err.to_string().contains("runtime boundary commit failed"),
1481            "unexpected error: {err}"
1482        );
1483        assert_eq!(
1484            adapter.runtime_state(&sid).await.unwrap(),
1485            RuntimeState::Idle
1486        );
1487        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1488        assert_eq!(state.current_state, InputLifecycleState::Queued);
1489    }
1490
1491    #[tokio::test]
1492    async fn boundary_commit_failure_unwinds_runtime_loop_state() {
1493        use crate::input_state::InputLifecycleState;
1494        use meerkat_core::lifecycle::core_executor::{
1495            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1496        };
1497        use meerkat_core::lifecycle::run_control::RunControlCommand;
1498        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1499        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1500
1501        struct SuccessExecutor {
1502            stop_called: Arc<AtomicBool>,
1503        }
1504
1505        #[async_trait::async_trait]
1506        impl CoreExecutor for SuccessExecutor {
1507            async fn apply(
1508                &mut self,
1509                run_id: RunId,
1510                primitive: RunPrimitive,
1511            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1512                Ok(CoreApplyOutput {
1513                    receipt: RunBoundaryReceipt {
1514                        run_id,
1515                        boundary: RunApplyBoundary::RunStart,
1516                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1517                        conversation_digest: None,
1518                        message_count: 0,
1519                        sequence: 0,
1520                    },
1521                    session_snapshot: None,
1522                    run_result: None,
1523                })
1524            }
1525
1526            async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1527                if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1528                    self.stop_called.store(true, Ordering::SeqCst);
1529                }
1530                Ok(())
1531            }
1532        }
1533
1534        let store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
1535        let adapter = RuntimeSessionAdapter::persistent(store);
1536        let sid = SessionId::new();
1537        let stop_called = Arc::new(AtomicBool::new(false));
1538        adapter
1539            .register_session_with_executor(
1540                sid.clone(),
1541                Box::new(SuccessExecutor {
1542                    stop_called: Arc::clone(&stop_called),
1543                }),
1544            )
1545            .await;
1546
1547        let input = make_prompt("loop boundary failure");
1548        let input_id = input.id().clone();
1549        adapter.accept_input(&sid, input).await.unwrap();
1550        tokio::time::sleep(Duration::from_millis(120)).await;
1551
1552        assert!(
1553            stop_called.load(Ordering::SeqCst),
1554            "boundary commit failures should stop the dead executor path"
1555        );
1556        assert_eq!(
1557            adapter.runtime_state(&sid).await.unwrap(),
1558            RuntimeState::Idle
1559        );
1560        let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
1561        assert_eq!(state.current_state, InputLifecycleState::Queued);
1562    }
1563
1564    #[tokio::test]
1565    async fn terminal_snapshot_failure_unregisters_runtime_loop_session() {
1566        use meerkat_core::lifecycle::core_executor::{
1567            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1568        };
1569        use meerkat_core::lifecycle::run_control::RunControlCommand;
1570        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1571        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1572
1573        struct SuccessExecutor {
1574            adapter: Arc<RuntimeSessionAdapter>,
1575            session_id: SessionId,
1576            stop_called: Arc<AtomicBool>,
1577        }
1578
1579        #[async_trait::async_trait]
1580        impl CoreExecutor for SuccessExecutor {
1581            async fn apply(
1582                &mut self,
1583                run_id: RunId,
1584                primitive: RunPrimitive,
1585            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1586                Ok(CoreApplyOutput {
1587                    receipt: RunBoundaryReceipt {
1588                        run_id,
1589                        boundary: RunApplyBoundary::RunStart,
1590                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1591                        conversation_digest: None,
1592                        message_count: 0,
1593                        sequence: 0,
1594                    },
1595                    session_snapshot: None,
1596                    run_result: None,
1597                })
1598            }
1599
1600            async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1601                if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
1602                    self.stop_called.store(true, Ordering::SeqCst);
1603                    self.adapter.unregister_session(&self.session_id).await;
1604                }
1605                Ok(())
1606            }
1607        }
1608
1609        let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1610        let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
1611        let sid = SessionId::new();
1612        let stop_called = Arc::new(AtomicBool::new(false));
1613        adapter
1614            .register_session_with_executor(
1615                sid.clone(),
1616                Box::new(SuccessExecutor {
1617                    adapter: Arc::clone(&adapter),
1618                    session_id: sid.clone(),
1619                    stop_called: Arc::clone(&stop_called),
1620                }),
1621            )
1622            .await;
1623
1624        adapter
1625            .accept_input(&sid, make_prompt("terminal snapshot failure"))
1626            .await
1627            .unwrap();
1628        tokio::time::sleep(Duration::from_millis(120)).await;
1629
1630        assert!(
1631            stop_called.load(Ordering::SeqCst),
1632            "terminal snapshot persistence failures should stop the runtime loop"
1633        );
1634        let state_result = adapter.runtime_state(&sid).await;
1635        assert!(
1636            state_result.is_err(),
1637            "stopped runtime sessions should be unregistered"
1638        );
1639        let Err(err) = state_result else {
1640            unreachable!("asserted stopped runtime unregistration above");
1641        };
1642        assert!(matches!(
1643            err,
1644            RuntimeDriverError::NotReady {
1645                state: RuntimeState::Destroyed
1646            }
1647        ));
1648    }
1649
1650    #[tokio::test]
1651    async fn terminal_snapshot_failure_unregisters_sync_runtime_session() {
1652        use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
1653        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
1654        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1655
1656        let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
1657        let adapter = RuntimeSessionAdapter::persistent(store);
1658        let sid = SessionId::new();
1659        adapter.register_session(sid.clone()).await;
1660
1661        let result = adapter
1662            .accept_input_and_run(
1663                &sid,
1664                make_prompt("sync terminal snapshot failure"),
1665                move |run_id, primitive| async move {
1666                    Ok((
1667                        (),
1668                        CoreApplyOutput {
1669                            receipt: RunBoundaryReceipt {
1670                                run_id,
1671                                boundary: RunApplyBoundary::RunStart,
1672                                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1673                                conversation_digest: None,
1674                                message_count: 0,
1675                                sequence: 0,
1676                            },
1677                            session_snapshot: None,
1678                            run_result: None,
1679                        },
1680                    ))
1681                },
1682            )
1683            .await;
1684        assert!(
1685            result.is_err(),
1686            "terminal snapshot persistence failure should surface"
1687        );
1688        let Err(err) = result else {
1689            unreachable!("asserted terminal snapshot failure above");
1690        };
1691
1692        assert!(
1693            err.to_string().contains("terminal event persist failed")
1694                || err
1695                    .to_string()
1696                    .contains("failed to persist runtime completion snapshot"),
1697            "unexpected error: {err}"
1698        );
1699        let runtime_state = adapter.runtime_state(&sid).await;
1700        assert!(
1701            matches!(
1702                runtime_state,
1703                Err(RuntimeDriverError::NotReady {
1704                    state: RuntimeState::Destroyed
1705                })
1706            ),
1707            "sync path should unregister the broken runtime session"
1708        );
1709    }
1710
1711    // ─── Phase A gate tests ───
1712
1713    /// Gate A2: Dedup on terminal input returns (Deduplicated, None) — no hang.
1714    #[tokio::test]
1715    async fn dedup_terminal_input_returns_none_handle() {
1716        use crate::identifiers::IdempotencyKey;
1717        use meerkat_core::lifecycle::core_executor::{
1718            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1719        };
1720        use meerkat_core::lifecycle::run_control::RunControlCommand;
1721        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1722        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1723        use meerkat_core::types::{RunResult, Usage};
1724
1725        struct ResultExecutor;
1726        #[async_trait::async_trait]
1727        impl CoreExecutor for ResultExecutor {
1728            async fn apply(
1729                &mut self,
1730                run_id: RunId,
1731                primitive: RunPrimitive,
1732            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1733                Ok(CoreApplyOutput {
1734                    receipt: RunBoundaryReceipt {
1735                        run_id,
1736                        boundary: RunApplyBoundary::RunStart,
1737                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1738                        conversation_digest: None,
1739                        message_count: 0,
1740                        sequence: 0,
1741                    },
1742                    session_snapshot: None,
1743                    run_result: Some(RunResult {
1744                        text: "done".into(),
1745                        session_id: SessionId::new(),
1746                        usage: Usage::default(),
1747                        turns: 1,
1748                        tool_calls: 0,
1749                        structured_output: None,
1750                        schema_warnings: None,
1751                        skill_diagnostics: None,
1752                    }),
1753                })
1754            }
1755            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1756                Ok(())
1757            }
1758        }
1759
1760        let adapter = RuntimeSessionAdapter::ephemeral();
1761        let sid = SessionId::new();
1762        adapter
1763            .register_session_with_executor(sid.clone(), Box::new(ResultExecutor))
1764            .await;
1765
1766        // Accept first input with idempotency key
1767        let key = IdempotencyKey::new("gate-a2");
1768        let mut input1 = make_prompt("first");
1769        if let Input::Prompt(ref mut p) = input1 {
1770            p.header.idempotency_key = Some(key.clone());
1771        }
1772        let (outcome1, handle1) = adapter
1773            .accept_input_with_completion(&sid, input1)
1774            .await
1775            .unwrap();
1776        assert!(outcome1.is_accepted());
1777        assert!(handle1.is_some(), "accepted input should have a handle");
1778
1779        // Wait for it to complete
1780        let result = handle1.unwrap().wait().await;
1781        assert!(
1782            matches!(result, crate::completion::CompletionOutcome::Completed(_)),
1783            "first input should complete successfully"
1784        );
1785
1786        // Now send duplicate — input is already terminal (Consumed)
1787        let mut input2 = make_prompt("duplicate");
1788        if let Input::Prompt(ref mut p) = input2 {
1789            p.header.idempotency_key = Some(key);
1790        }
1791        let (outcome2, handle2) = adapter
1792            .accept_input_with_completion(&sid, input2)
1793            .await
1794            .unwrap();
1795        assert!(
1796            outcome2.is_deduplicated(),
1797            "second input with same key should be deduplicated"
1798        );
1799        assert!(
1800            handle2.is_none(),
1801            "dedup on terminal input should return None handle"
1802        );
1803    }
1804
1805    /// Gate A3: Dedup on in-flight input returns (Deduplicated, Some(handle))
1806    /// that resolves when the original completes.
1807    #[tokio::test]
1808    async fn dedup_inflight_input_returns_handle_that_resolves() {
1809        use crate::identifiers::IdempotencyKey;
1810        use meerkat_core::lifecycle::core_executor::{
1811            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1812        };
1813        use meerkat_core::lifecycle::run_control::RunControlCommand;
1814        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1815        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1816        use meerkat_core::types::{RunResult, Usage};
1817
1818        struct SlowExecutor;
1819        #[async_trait::async_trait]
1820        impl CoreExecutor for SlowExecutor {
1821            async fn apply(
1822                &mut self,
1823                run_id: RunId,
1824                primitive: RunPrimitive,
1825            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1826                // Simulate slow execution so duplicate arrives while in-flight
1827                tokio::time::sleep(Duration::from_millis(200)).await;
1828                Ok(CoreApplyOutput {
1829                    receipt: RunBoundaryReceipt {
1830                        run_id,
1831                        boundary: RunApplyBoundary::RunStart,
1832                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1833                        conversation_digest: None,
1834                        message_count: 0,
1835                        sequence: 0,
1836                    },
1837                    session_snapshot: None,
1838                    run_result: Some(RunResult {
1839                        text: "slow done".into(),
1840                        session_id: SessionId::new(),
1841                        usage: Usage::default(),
1842                        turns: 1,
1843                        tool_calls: 0,
1844                        structured_output: None,
1845                        schema_warnings: None,
1846                        skill_diagnostics: None,
1847                    }),
1848                })
1849            }
1850            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1851                Ok(())
1852            }
1853        }
1854
1855        let adapter = RuntimeSessionAdapter::ephemeral();
1856        let sid = SessionId::new();
1857        adapter
1858            .register_session_with_executor(sid.clone(), Box::new(SlowExecutor))
1859            .await;
1860
1861        // Accept first input with idempotency key
1862        let key = IdempotencyKey::new("gate-a3");
1863        let mut input1 = make_prompt("original");
1864        if let Input::Prompt(ref mut p) = input1 {
1865            p.header.idempotency_key = Some(key.clone());
1866        }
1867        let (outcome1, handle1) = adapter
1868            .accept_input_with_completion(&sid, input1)
1869            .await
1870            .unwrap();
1871        assert!(outcome1.is_accepted());
1872
1873        // Wait briefly so the input is in-flight (Staged/Running), not yet terminal
1874        tokio::time::sleep(Duration::from_millis(50)).await;
1875
1876        // Send duplicate while original is still running
1877        let mut input2 = make_prompt("duplicate");
1878        if let Input::Prompt(ref mut p) = input2 {
1879            p.header.idempotency_key = Some(key);
1880        }
1881        let (outcome2, handle2) = adapter
1882            .accept_input_with_completion(&sid, input2)
1883            .await
1884            .unwrap();
1885        assert!(
1886            outcome2.is_deduplicated(),
1887            "second input should be deduplicated"
1888        );
1889        assert!(
1890            handle2.is_some(),
1891            "dedup on in-flight input should return Some(handle)"
1892        );
1893
1894        // Both handles should resolve when the original completes
1895        let result1 = handle1.unwrap().wait().await;
1896        let result2 = handle2.unwrap().wait().await;
1897        assert!(
1898            matches!(result1, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1899            "original handle should complete with result"
1900        );
1901        assert!(
1902            matches!(result2, crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"),
1903            "duplicate handle should also complete with same result"
1904        );
1905    }
1906
1907    /// Gate A4 (part 1): resolve_without_result sends CompletedWithoutResult
1908    /// when executor returns run_result: None.
1909    #[tokio::test]
1910    async fn completion_handle_resolves_without_result() {
1911        use meerkat_core::lifecycle::core_executor::{
1912            CoreApplyOutput, CoreExecutor, CoreExecutorError,
1913        };
1914        use meerkat_core::lifecycle::run_control::RunControlCommand;
1915        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
1916        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
1917
1918        struct NoResultExecutor;
1919        #[async_trait::async_trait]
1920        impl CoreExecutor for NoResultExecutor {
1921            async fn apply(
1922                &mut self,
1923                run_id: RunId,
1924                primitive: RunPrimitive,
1925            ) -> Result<CoreApplyOutput, CoreExecutorError> {
1926                Ok(CoreApplyOutput {
1927                    receipt: RunBoundaryReceipt {
1928                        run_id,
1929                        boundary: RunApplyBoundary::RunStart,
1930                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
1931                        conversation_digest: None,
1932                        message_count: 0,
1933                        sequence: 0,
1934                    },
1935                    session_snapshot: None,
1936                    run_result: None, // No RunResult
1937                })
1938            }
1939            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
1940                Ok(())
1941            }
1942        }
1943
1944        let adapter = RuntimeSessionAdapter::ephemeral();
1945        let sid = SessionId::new();
1946        adapter
1947            .register_session_with_executor(sid.clone(), Box::new(NoResultExecutor))
1948            .await;
1949
1950        let input = make_prompt("context append");
1951        let (outcome, handle) = adapter
1952            .accept_input_with_completion(&sid, input)
1953            .await
1954            .unwrap();
1955        assert!(outcome.is_accepted());
1956
1957        let result = handle.unwrap().wait().await;
1958        assert!(
1959            matches!(
1960                result,
1961                crate::completion::CompletionOutcome::CompletedWithoutResult
1962            ),
1963            "executor returning run_result: None should resolve as CompletedWithoutResult, got {result:?}"
1964        );
1965    }
1966
1967    /// Gate A5: reset_runtime resolves all pending waiters.
1968    #[tokio::test]
1969    async fn reset_runtime_resolves_pending_waiters() {
1970        // Register without executor so inputs queue but don't process
1971        let adapter = RuntimeSessionAdapter::ephemeral();
1972        let sid = SessionId::new();
1973        adapter.register_session(sid.clone()).await;
1974
1975        let input = make_prompt("pending");
1976        let (outcome, handle) = adapter
1977            .accept_input_with_completion(&sid, input)
1978            .await
1979            .unwrap();
1980        assert!(outcome.is_accepted());
1981        assert!(handle.is_some());
1982
1983        // Reset the runtime
1984        adapter.reset_runtime(&sid).await.unwrap();
1985
1986        // Handle should resolve as terminated
1987        let result = handle.unwrap().wait().await;
1988        assert!(
1989            matches!(
1990                result,
1991                crate::completion::CompletionOutcome::RuntimeTerminated(_)
1992            ),
1993            "reset should resolve pending waiters as terminated, got {result:?}"
1994        );
1995    }
1996
1997    /// Gate A6: retire_runtime without loop resolves waiters.
1998    #[tokio::test]
1999    async fn retire_without_loop_resolves_waiters() {
2000        // Register without executor (no RuntimeLoop)
2001        let adapter = RuntimeSessionAdapter::ephemeral();
2002        let sid = SessionId::new();
2003        adapter.register_session(sid.clone()).await;
2004
2005        let input = make_prompt("will be retired");
2006        let (outcome, handle) = adapter
2007            .accept_input_with_completion(&sid, input)
2008            .await
2009            .unwrap();
2010        assert!(outcome.is_accepted());
2011        assert!(handle.is_some());
2012
2013        // Retire without loop attached
2014        adapter.retire_runtime(&sid).await.unwrap();
2015
2016        // Handle should resolve as terminated since no loop will drain
2017        let result = handle.unwrap().wait().await;
2018        assert!(
2019            matches!(
2020                result,
2021                crate::completion::CompletionOutcome::RuntimeTerminated(_)
2022            ),
2023            "retire without loop should resolve pending waiters as terminated, got {result:?}"
2024        );
2025    }
2026
2027    /// Test that BoundaryApplied fires with correct receipt on success.
2028    #[tokio::test]
2029    async fn successful_execution_fires_boundary_applied() {
2030        use crate::input_state::InputLifecycleState;
2031        use meerkat_core::lifecycle::RunId;
2032        use meerkat_core::lifecycle::core_executor::{
2033            CoreApplyOutput, CoreExecutor, CoreExecutorError,
2034        };
2035        use meerkat_core::lifecycle::run_control::RunControlCommand;
2036        use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
2037        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
2038
2039        struct SuccessExecutor;
2040
2041        #[async_trait::async_trait]
2042        impl CoreExecutor for SuccessExecutor {
2043            async fn apply(
2044                &mut self,
2045                run_id: RunId,
2046                primitive: RunPrimitive,
2047            ) -> Result<CoreApplyOutput, CoreExecutorError> {
2048                Ok(CoreApplyOutput {
2049                    receipt: RunBoundaryReceipt {
2050                        run_id,
2051                        boundary: RunApplyBoundary::RunStart,
2052                        contributing_input_ids: primitive.contributing_input_ids().to_vec(),
2053                        conversation_digest: None,
2054                        message_count: 0,
2055                        sequence: 0,
2056                    },
2057                    session_snapshot: None,
2058                    run_result: None,
2059                })
2060            }
2061
2062            async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
2063                Ok(())
2064            }
2065        }
2066
2067        let adapter = RuntimeSessionAdapter::ephemeral();
2068        let sid = SessionId::new();
2069        adapter
2070            .register_session_with_executor(sid.clone(), Box::new(SuccessExecutor))
2071            .await;
2072
2073        let input = make_prompt("hello success");
2074        let input_id = input.id().clone();
2075        adapter.accept_input(&sid, input).await.unwrap();
2076
2077        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2078
2079        // Input should have gone through full lifecycle: Queued → Staged → Applied → APC → Consumed
2080        let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
2081        assert_eq!(
2082            is.current_state,
2083            InputLifecycleState::Consumed,
2084            "Successful execution should consume the input"
2085        );
2086
2087        // Runtime should be back to Idle
2088        let state = adapter.runtime_state(&sid).await.unwrap();
2089        assert_eq!(state, RuntimeState::Idle);
2090    }
2091}