Skip to main content

meerkat_runtime/meerkat_machine/
mod.rs

1//! MeerkatMachine — session-scoped execution kernel.
2//!
3//! One of two kernels in the Meerkat two-kernel architecture:
4//!
5//! - **MeerkatMachine** (this module) owns session-scoped runtime state:
6//!   input ingress, run lifecycle, completion waiters, async-ops registry,
7//!   comms drain, and tool visibility publication. All mutations flow through
8//!   one unified internal reducer, gated by TLA+-derived precondition guards.
9//!
10//! - **MobMachine** (`meerkat-mob`) owns mob-scoped orchestration: roster,
11//!   flow frames, delegation, and inter-member wiring.
12//!
13//! MeerkatMachine lives in `meerkat-runtime` so `meerkat-session` does not
14//! depend on runtime execution internals. When a session registers a
15//! `CoreExecutor`, a background `RuntimeLoop` task is spawned. Input acceptance
16//! queues through the driver; wake signals the loop; the loop dequeues, stages,
17//! applies via `CoreExecutor`, and marks inputs consumed.
18
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
28use meerkat_core::lifecycle::{InputId, RunId};
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32    DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33    ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::InputLifecycleState;
42use crate::meerkat_machine_types::{
43    HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatBindingSnapshot,
44    MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot, MeerkatControlSnapshot,
45    MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind, MeerkatFormalStateProjection,
46    MeerkatInputsSnapshot, MeerkatLedgerSnapshot, MeerkatMachineCommand,
47    MeerkatMachineCommandError, MeerkatMachineCommandResult, MeerkatMachineRunFailure,
48    MeerkatMachineRunPrepared, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
49    SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmCapabilitySurfaceStatus,
50    SessionLlmReconfigureHost, SessionLlmReconfigureReport, SessionLlmReconfigureRequest,
51    SessionToolVisibilityDelta,
52};
53use crate::runtime_state::RuntimeState;
54use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
55use crate::store::RuntimeStore;
56use crate::tokio;
57use crate::tokio::sync::{Mutex, RwLock, mpsc};
58#[cfg(test)]
59use crate::traits::RuntimeDriver;
60use crate::traits::{
61    DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
62    RuntimeControlPlaneError, RuntimeDriverError,
63};
64
/// Error type for [`MeerkatMachine::prepare_bindings`].
///
/// Every variant carries the [`SessionId`] the failure relates to; it is
/// rendered as the first placeholder of the error message.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// Session was not found after registration (should not happen in practice).
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
    /// Machine-owned binding preparation failed before bindings were published.
    ///
    /// The `String` payload is the failure detail rendered after the session
    /// id in the error message.
    #[error("failed to prepare runtime bindings for session {0}: {1}")]
    PrepareFailed(SessionId, String),
}
75
76#[derive(Debug, Default)]
77struct UnavailableBlobStore;
78
79impl UnavailableBlobStore {
80    fn error() -> BlobStoreError {
81        BlobStoreError::Unsupported(
82            "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
83                .to_string(),
84        )
85    }
86}
87
/// Auth lifecycle handles shared by every caller that resolves to the same
/// [`PersistentAuthAuthorityKey`].
///
/// Instances are created and cached by `persistent_auth_authorities`.
#[cfg(not(target_arch = "wasm32"))]
struct PersistentAuthAuthorityBundle {
    /// Weak reference to the runtime store most recently bound to this bundle.
    /// It is weak so the bundle itself does not keep the store alive.
    store: StdMutex<Weak<dyn RuntimeStore>>,
    /// Auth lease handle created together with the bundle.
    auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
    /// OAuth flow handle constructed from the same `auth_lease` and the
    /// store that first created the bundle.
    oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
}
94
/// Registry key identifying which auth authority bundle a runtime store maps to.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key reported by the store itself via `auth_authority_key()`.
    Durable(String),
    /// Fallback for stores that report no key: the address of the store's
    /// `Arc` payload, so it is only meaningful within the current process.
    Process(usize),
}
101
/// Process-wide registry of auth authority bundles, keyed by store identity.
///
/// Initialised lazily (to an empty map) on the first call to
/// `persistent_auth_authorities`.
#[cfg(not(target_arch = "wasm32"))]
static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
    StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
> = OnceLock::new();
106
107#[cfg(not(target_arch = "wasm32"))]
108fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
109    store
110        .auth_authority_key()
111        .map(PersistentAuthAuthorityKey::Durable)
112        .unwrap_or_else(|| {
113            PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
114        })
115}
116
117fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
118    match (a.auth_authority_key(), b.auth_authority_key()) {
119        (Some(a), Some(b)) => a == b,
120        _ => Arc::ptr_eq(a, b),
121    }
122}
123
124#[cfg(not(target_arch = "wasm32"))]
125fn persistent_auth_authorities(
126    store: &Arc<dyn RuntimeStore>,
127) -> Arc<PersistentAuthAuthorityBundle> {
128    let key = runtime_store_identity(store);
129    let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
130    let mut authorities = authorities
131        .lock()
132        .unwrap_or_else(std::sync::PoisonError::into_inner);
133    if let Some(existing) = authorities.get(&key) {
134        let stored_store_alive = existing
135            .store
136            .lock()
137            .unwrap_or_else(std::sync::PoisonError::into_inner)
138            .upgrade()
139            .is_some();
140        if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
141            existing.oauth_flows.bind_persistent_store(store);
142            *existing
143                .store
144                .lock()
145                .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
146            return Arc::clone(existing);
147        }
148    }
149    let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
150    let oauth_flows = Arc::new(
151        crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
152            std::time::Duration::from_secs(10 * 60),
153            Arc::clone(&auth_lease),
154            store,
155        ),
156    );
157    let bundle = Arc::new(PersistentAuthAuthorityBundle {
158        store: StdMutex::new(Arc::downgrade(store)),
159        auth_lease,
160        oauth_flows,
161    });
162    authorities.insert(key, Arc::clone(&bundle));
163    bundle
164}
165
/// Test-only helper: empty the process-wide auth authority registry.
///
/// Does nothing when the registry has not been initialised yet.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut entries = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    entries.clear();
}
175
// `BlobStore` implementation in which every fallible operation returns
// `UnavailableBlobStore::error()`. The `?Send` form of `async_trait` is
// selected on wasm32 and the default form elsewhere.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BlobStore for UnavailableBlobStore {
    /// Always fails; no image is stored.
    async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails; no blob can be read.
    async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails; nothing is deleted.
    async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails rather than reporting `false`.
    async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
        Err(Self::error())
    }

    /// This store is never persistent.
    fn is_persistent(&self) -> bool {
        false
    }
}
199
200#[cfg(not(target_arch = "wasm32"))]
201struct UnavailableOAuthFlowAuthority;
202
203#[cfg(not(target_arch = "wasm32"))]
204impl UnavailableOAuthFlowAuthority {
205    fn rejected(operation: &'static str) -> meerkat_auth_core::oauth_flow::OAuthFlowError {
206        meerkat_auth_core::oauth_flow::OAuthFlowError::LifecycleRejected {
207            operation,
208            detail: "custom auth lease handle does not provide a runtime OAuth flow authority"
209                .to_string(),
210        }
211    }
212}
213
// Every method ignores its arguments and returns the `LifecycleRejected`
// error built by `UnavailableOAuthFlowAuthority::rejected`, tagged with an
// operation name specific to that method.
#[cfg(not(target_arch = "wasm32"))]
impl meerkat_auth_core::oauth_flow::OAuthFlowAuthority for UnavailableOAuthFlowAuthority {
    /// Always rejected as `admit_oauth_browser_flow`.
    fn start(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: String,
        _pkce_verifier: String,
    ) -> Result<String, meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_browser_flow"))
    }

    /// Always rejected as `verify_oauth_browser_flow`.
    fn verify(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_browser_flow"))
    }

    /// Always rejected as `consume_oauth_browser_flow`.
    fn consume(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("consume_oauth_browser_flow"))
    }

    /// Always rejected as `admit_oauth_device_flow`.
    fn admit_device_code(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _device_code: String,
        _expires_in: std::time::Duration,
    ) -> Result<(), meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_device_flow"))
    }

    /// Always rejected as `verify_oauth_device_flow`.
    fn verify_device_code(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDeviceFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_device_flow"))
    }

    /// Always rejected as `begin_oauth_device_poll`.
    fn begin_device_code_poll(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDevicePollLease,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("begin_oauth_device_poll"))
    }
}
286
/// Boxed, pinned future that resolves to a [`MeerkatMachineCommandResult`] or
/// a [`MeerkatMachineCommandError`].
///
/// This non-wasm variant additionally requires the future to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
            + Send
            + 'a,
    >,
>;

/// Boxed, pinned future that resolves to a [`MeerkatMachineCommandResult`] or
/// a [`MeerkatMachineCommandError`].
///
/// This wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
>;
300
301pub(crate) use driver::{
302    DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
303    commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
304    machine_apply_run_return_projection, machine_batch_primitive_projections,
305    machine_batch_runtime_semantics, machine_begin_run, machine_commit_prepared_destroy,
306    machine_commit_service_turn_terminal_receipt, machine_executor_attach_projection,
307    machine_input_boundary, machine_prepare_bindings_projection, machine_prepare_destroy,
308    machine_recover_ephemeral_driver, machine_recover_persistent_driver,
309    machine_recycle_preserving_work, machine_reset, machine_retire,
310    machine_select_runtime_loop_batch, machine_stop_runtime, machine_unregister_session_projection,
311    prepare_runtime_loop_batch_start, rollback_runtime_loop_run_after_boundary_commit_failure,
312};
313
314pub(crate) mod driver;
315
316mod comms_drain;
317pub mod composition;
318mod dispatch_control;
319mod dispatch_drain;
320mod dispatch_ingress;
321mod dispatch_session;
322#[allow(unused_variables, dead_code, clippy::cmp_owned)]
323#[allow(clippy::assign_op_pattern)]
324pub mod dsl;
325pub(crate) mod dsl_authority;
326mod dsl_effects;
327mod llm_reconfigure;
328mod runtime_control;
329mod session_management;
330mod traits;
331mod visibility;
332
333pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
334
335pub use comms_drain::{
336    CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
337    SupervisorBinding, SupervisorBindingStageError,
338};
339pub(crate) use comms_drain::{CommsDrainSlot, abort_slot};
340pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
341pub(crate) use visibility::MachineToolVisibilityOwner;
342
/// A DSL transition whose effects still need to be dispatched.
struct StagedSessionDslInput {
    /// DSL state captured before the transition; used to restore the session
    /// authority when effect dispatch fails and the caller asked for rollback.
    previous_state: Box<dsl::MeerkatMachineState>,
    /// Effects produced by the transition; routed signals are lifted from
    /// these and dispatched at commit time.
    effects: DslTransitionEffects,
}
347
/// What to do with the session's DSL state when dispatching the effects of a
/// committed transition fails.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    /// Leave the DSL state as it is and only report the error.
    PreserveCommittedDslState,
    /// Restore the DSL state captured before the transition, then report the error.
    RestorePreviousDslState,
}
353
/// Per-session runtime state held in the machine's session map.
///
/// Bundles the shared driver handle, the registration phase, the mutation
/// gate, the control projection, the ops-lifecycle registry, epoch/cursor
/// state, completion waiters, tool-visibility and LLM capability state, the
/// DSL authority, and the comms drain slot for one registered session.
struct RuntimeSessionEntry {
    /// Canonical runtime control-plane identity for this registered session.
    runtime_id: LogicalRuntimeId,
    /// Per-session mutation gate.
    ///
    /// Serializes same-session mutating commands across the full
    /// DSL-stage → driver-mutate → DSL-sync span. Without this gate,
    /// two concurrent commands on the same session can interleave between
    /// the DSL projection sync (which releases `sessions` lock) and the
    /// driver mutation (which acquires `driver` lock independently).
    ///
    /// This is NOT a replacement for `sessions` RwLock or `driver` Mutex —
    /// it is an additional serialization point that spans the entire
    /// multi-step mutation window.
    mutation_gate: Arc<Mutex<()>>,
    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
    driver: SharedDriver,
    /// Canonical coarse control projection for this session.
    ///
    /// The driver reads this to realize shell mechanics, but machine-facing
    /// queries should publish from this shared cell rather than treating the
    /// driver shell as the source of lifecycle truth.
    control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
    /// Shared async-operation lifecycle registry for this runtime/session.
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Runtime epoch identity — stable across rebuilds, rotated on reset/restart-without-recovery.
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Shared consumer cursor state for the epoch.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
    completions: SharedCompletionRegistry,
    /// Canonical durable visibility owner for this session.
    tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
    /// Machine-owned current durable/live LLM identity for the registered session.
    current_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
    /// Machine-owned current capability surface for the registered session.
    current_capability_surface: Option<SessionLlmCapabilitySurface>,
    /// Whether the machine has a resolved capability surface for the current identity.
    capability_surface_status: SessionLlmCapabilitySurfaceStatus,
    /// Registration phase — explicit type-level distinction between
    /// "registered but inert" and "executor attached."
    phase: RegistrationPhase,
    /// Temporary live interrupt capability for prepared, session-owned turns
    /// that run before the runtime loop attachment is published.
    ///
    /// Cleared when a runtime loop is attached, and returned by
    /// `interrupt_handle()` only while no live attachment exists.
    provisional_interrupt_handle:
        Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// DSL authority for coarse lifecycle phase transitions.
    /// Sync field — validates transitions, writes back phase.
    ///
    /// `Arc<std::sync::Mutex<_>>` so cross-crate handle impls
    /// (`meerkat-runtime::handles::*`) can share the same underlying authority
    /// from a sync context without awaiting the outer `sessions` tokio lock.
    /// The Arc heap-allocates the authority's large expanded state (31 fields
    /// including several Maps/Sets) so holding a reference to a
    /// `RuntimeSessionEntry` does not bloat async future sizes.
    dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    /// Per-session comms drain lifecycle slot.
    ///
    /// Collapsed from the sibling `MeerkatMachine.comms_drain_slots:
    /// RwLock<HashMap<SessionId, CommsDrainSlot>>` in wave-c C-H2 (F5 in
    /// docs/wave-c-prep/state-scope-audit.md) — keeping the slot here
    /// makes "session exists" a single HashMap insertion and eliminates
    /// the class of bugs where the sibling map and the session map
    /// could fall out of sync across a registration/unregistration
    /// boundary.
    drain_slot: CommsDrainSlot,
}
422
/// Capability bundle for an attached runtime loop.
///
/// Keep all loop-related handles together so "attached vs detached" cannot
/// drift into partially-populated shell state.
struct RuntimeLoopAttachment {
    /// Sender used to wake the loop; a closed channel marks the attachment dead.
    wake_tx: mpsc::Sender<()>,
    /// Sender for runtime effects; a closed channel marks the attachment dead.
    effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
    /// Optional executor boundary handle, exposed only while the attachment is live.
    boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
    /// Optional executor interrupt handle, exposed only while the attachment is live.
    interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Join handle of the spawned loop task. Stored alongside the channels
    /// but never read here — presumably retained only for ownership
    /// (NOTE(review): confirm intended drop/detach semantics).
    _loop_handle: tokio::task::JoinHandle<()>,
}
434
/// Explicit registration phase — the type-level distinction between
/// "registered but inert," "attachment in progress," and "executor attached."
///
/// Replaces the implicit `Option<RuntimeLoopAttachment>` discriminant
/// (Dogma §8: Option must not hide ownership uncertainty).
enum RegistrationPhase {
    /// Registered via `prepare_bindings()`. No executor — inputs queue
    /// but are not processed until an executor attaches.
    ///
    /// Also the phase an `Active` entry is reset to once its channels are
    /// found closed (see `RuntimeSessionEntry::clear_dead_attachment`).
    Queuing,
    /// `ensure_session_with_executor()` is in progress — another task is
    /// wiring the runtime loop. Concurrent callers must treat this as
    /// "attachment pending" and not race a second loop spawn.
    Attaching,
    /// Executor attached with live channels. Inputs are processed
    /// by the RuntimeLoop.
    Active(RuntimeLoopAttachment),
}
452
453impl RuntimeSessionEntry {
454    fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
455        self.control_projection
456            .read()
457            .map(|guard| guard.clone())
458            .unwrap_or_else(|poisoned| {
459                tracing::error!("runtime control projection lock poisoned");
460                poisoned.into_inner().clone()
461            })
462    }
463
464    fn attachment_is_live(&self) -> bool {
465        match &self.phase {
466            RegistrationPhase::Active(attachment) => {
467                !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
468            }
469            RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
470        }
471    }
472
473    /// Returns `true` if an executor is attached with live channels, OR if
474    /// attachment is in progress (another task is wiring the loop).
475    /// Used by external-facing queries (`session_has_executor`) to prevent
476    /// concurrent callers from racing a second loop spawn.
477    fn has_attachment_or_attaching(&self) -> bool {
478        matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
479    }
480
481    /// Returns `true` only if the executor is fully attached with live channels.
482    /// Used by internal publish logic within `ensure_session_with_executor`
483    /// where the caller itself may have set `Attaching`.
484    fn has_live_attachment(&self) -> bool {
485        self.attachment_is_live()
486    }
487
488    fn attach_runtime_loop(
489        &mut self,
490        wake_tx: mpsc::Sender<()>,
491        effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
492        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
493        interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
494        loop_handle: tokio::task::JoinHandle<()>,
495    ) {
496        self.provisional_interrupt_handle = None;
497        self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
498            wake_tx,
499            effect_tx,
500            boundary_handle,
501            interrupt_handle,
502            _loop_handle: loop_handle,
503        });
504    }
505
506    fn clear_dead_attachment(&mut self) -> bool {
507        if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
508            // Don't regress to Queuing if another task is mid-attach;
509            // Active with dead channels goes back to Queuing for retry.
510            self.phase = RegistrationPhase::Queuing;
511            return true;
512        }
513        false
514    }
515
516    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
517        match &self.phase {
518            RegistrationPhase::Active(attachment)
519                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
520            {
521                Some(attachment.wake_tx.clone())
522            }
523            _ => None,
524        }
525    }
526
527    fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
528        match &self.phase {
529            RegistrationPhase::Active(attachment)
530                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
531            {
532                Some(attachment.effect_tx.clone())
533            }
534            _ => None,
535        }
536    }
537
538    fn boundary_handle(
539        &self,
540    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
541        match &self.phase {
542            RegistrationPhase::Active(attachment)
543                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
544            {
545                attachment.boundary_handle.clone()
546            }
547            _ => None,
548        }
549    }
550
551    fn interrupt_handle(
552        &self,
553    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
554        match &self.phase {
555            RegistrationPhase::Active(attachment)
556                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
557            {
558                attachment.interrupt_handle.clone()
559            }
560            _ => self.provisional_interrupt_handle.clone(),
561        }
562    }
563
564    fn install_provisional_interrupt_handle(
565        &mut self,
566        handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
567    ) {
568        if !self.attachment_is_live() {
569            self.provisional_interrupt_handle = Some(handle);
570        }
571    }
572}
573
574impl MeerkatMachine {
575    /// Acquire the per-session mutation gate.
576    ///
577    /// Returns an `Arc<Mutex<()>>` that the caller must `.lock().await` and
578    /// hold across the full DSL-stage → driver-mutate → DSL-sync span.
579    /// Returns `None` if the session is not registered.
580    async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
581        let sessions = self.sessions.read().await;
582        sessions
583            .get(session_id)
584            .map(|entry| Arc::clone(&entry.mutation_gate))
585    }
586
587    async fn session_dsl_authority(
588        &self,
589        session_id: &SessionId,
590    ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
591        let sessions = self.sessions.read().await;
592        sessions
593            .get(session_id)
594            .map(|entry| Arc::clone(&entry.dsl_authority))
595            .ok_or_else(|| {
596                RuntimeDriverError::NotReady {
597                    state: RuntimeState::Destroyed,
598                }
599                .to_string()
600            })
601    }
602
603    fn preview_dsl_input_on_state(
604        state: &dsl::MeerkatMachineState,
605        input: dsl::MeerkatMachineInput,
606        context: &str,
607    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
608        let mut preview = dsl::MeerkatMachineAuthority::from_state(state.clone());
609        dsl::MeerkatMachineMutator::apply(&mut preview, input)
610            .map(|transition| transition.effects)
611            .map_err(|err| dsl_authority::map_error(err, context))
612    }
613
614    async fn preview_session_dsl_input(
615        &self,
616        session_id: &SessionId,
617        input: dsl::MeerkatMachineInput,
618        context: &str,
619    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
620        let authority = self.session_dsl_authority(session_id).await?;
621        let state = {
622            let authority = authority
623                .lock()
624                .unwrap_or_else(std::sync::PoisonError::into_inner);
625            authority.state.clone()
626        };
627        Self::preview_dsl_input_on_state(&state, input, context)
628    }
629
630    async fn session_dsl_state(
631        &self,
632        session_id: &SessionId,
633    ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
634        let authority = self
635            .session_dsl_authority(session_id)
636            .await
637            .map_err(RuntimeControlPlaneError::Internal)?;
638        let authority = authority
639            .lock()
640            .unwrap_or_else(std::sync::PoisonError::into_inner);
641        Ok(authority.state.clone())
642    }
643
644    async fn commit_session_dsl_transition(
645        &self,
646        session_id: &SessionId,
647        staged: StagedSessionDslInput,
648        context: &str,
649    ) -> Result<(), String> {
650        self.commit_session_dsl_transition_with_dispatch_failure(
651            session_id,
652            staged,
653            context,
654            CommittedEffectDispatchFailure::RestorePreviousDslState,
655        )
656        .await
657    }
658
659    async fn commit_session_dsl_transition_preserving_committed_state(
660        &self,
661        session_id: &SessionId,
662        staged: StagedSessionDslInput,
663        context: &str,
664    ) -> Result<(), String> {
665        self.commit_session_dsl_transition_with_dispatch_failure(
666            session_id,
667            staged,
668            context,
669            CommittedEffectDispatchFailure::PreserveCommittedDslState,
670        )
671        .await
672    }
673
674    async fn commit_session_dsl_transition_with_dispatch_failure(
675        &self,
676        session_id: &SessionId,
677        staged: StagedSessionDslInput,
678        context: &str,
679        dispatch_failure: CommittedEffectDispatchFailure,
680    ) -> Result<(), String> {
681        if let Err(error) = self
682            .dispatch_routed_signals_from_effects(&staged.effects)
683            .await
684        {
685            match dispatch_failure {
686                CommittedEffectDispatchFailure::PreserveCommittedDslState => {}
687                CommittedEffectDispatchFailure::RestorePreviousDslState => {
688                    self.restore_session_dsl_state(session_id, staged.previous_state)
689                        .await;
690                }
691            }
692            return Err(format!(
693                "DSL authority ({context}): committed effect dispatch failed: {error}"
694            ));
695        }
696        Ok(())
697    }
698
699    async fn dispatch_routed_signals_from_effects(
700        &self,
701        effects: &[dsl::MeerkatMachineEffect],
702    ) -> Result<(), String> {
703        let dispatcher = {
704            self.composition_signal_dispatcher
705                .read()
706                .unwrap_or_else(std::sync::PoisonError::into_inner)
707                .clone()
708        };
709        let Some(dispatcher) = dispatcher else {
710            return Ok(());
711        };
712
713        for effect in effects {
714            if let Some(signal) = composition::lift_routed_signal(effect) {
715                composition::dispatch_routed_signal(&dispatcher, signal).await?;
716            }
717        }
718        Ok(())
719    }
720
721    async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
722        let mut sessions = self.sessions.write().await;
723        if let Some(entry) = sessions.get_mut(session_id) {
724            entry.clear_dead_attachment();
725        }
726    }
727
    /// Apply a live boundary cancel (when a boundary handle is supplied) and
    /// then enqueue the projected runtime effect on the session's effect
    /// channel without blocking.
    ///
    /// # Errors
    ///
    /// - `NotReady` with the session's existing runtime state (or `Destroyed`
    ///   when none is available) if `effect_tx` is `None`.
    /// - `Internal` if the boundary cancel fails; nothing is enqueued then.
    /// - `Internal` if the effect channel is full. The boundary cancel has
    ///   already been accepted at that point.
    /// - `NotReady` with state `Idle` if the effect channel is closed; the
    ///   dead attachment is cleared first.
    async fn dispatch_interrupt_yielding_runtime_effect(
        &self,
        session_id: &SessionId,
        effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
        projected_effect: crate::effect::ProjectedRuntimeEffect,
        context: &str,
    ) -> Result<(), RuntimeDriverError> {
        // No effect channel means there is no attachment to deliver to.
        let Some(effect_tx) = effect_tx else {
            let state = self
                .existing_session_runtime_state(session_id)
                .await
                .unwrap_or(RuntimeState::Destroyed);
            return Err(RuntimeDriverError::NotReady { state });
        };

        // Capture the reason before `projected_effect` is consumed below.
        let reason = projected_effect.reason().to_string();
        // The boundary cancel runs before the effect is enqueued.
        if let Some(boundary_handle) = boundary_handle {
            boundary_handle
                .cancel_after_boundary(reason)
                .await
                .map_err(|err| {
                    RuntimeDriverError::Internal(format!(
                        "{context}: failed to apply live boundary cancel: {err}"
                    ))
                })?;
        }

        // `try_send` never waits for capacity; a full channel is an error.
        match effect_tx.try_send(projected_effect.into_effect()) {
            Ok(()) => Ok(()),
            Err(mpsc::error::TrySendError::Full(_)) => Err(RuntimeDriverError::Internal(format!(
                "{context}: runtime effect channel full after accepted boundary cancel"
            ))),
            Err(mpsc::error::TrySendError::Closed(_)) => {
                // The receiving side is gone: drop the stale attachment so the
                // session can be re-attached later.
                self.clear_dead_runtime_attachment(session_id).await;
                Err(RuntimeDriverError::NotReady {
                    state: RuntimeState::Idle,
                })
            }
        }
    }
769
770    async fn restore_session_dsl_state(
771        &self,
772        session_id: &SessionId,
773        state: Box<dsl::MeerkatMachineState>,
774    ) {
775        if let Ok(authority) = self.session_dsl_authority(session_id).await {
776            Self::restore_dsl_authority_state(&authority, state);
777        }
778    }
779
780    fn restore_dsl_authority_state(
781        authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
782        state: Box<dsl::MeerkatMachineState>,
783    ) {
784        let mut authority = authority
785            .lock()
786            .unwrap_or_else(std::sync::PoisonError::into_inner);
787        *authority = dsl::MeerkatMachineAuthority::from_state(*state);
788    }
789}
790
/// Zero-sized capability token for session-control mutations.
///
/// Holding a value shows that the mutation is being routed through
/// `MeerkatMachine` authority rather than a public store-only service path.
/// The only field is private, so the struct literal cannot be written
/// outside this module tree.
#[derive(Clone, Copy, Debug)]
pub struct MachineSessionControlAuthority {
    _private: (),
}
797
798/// Session-scoped execution kernel for the Meerkat runtime.
799///
800/// Owns per-session runtime state (driver, ops registry, completion waiters,
801/// comms drain, epoch bindings) and routes all internal mutations through one
802/// canonical command reducer, with smaller group handlers retained only as
803/// implementation detail helpers.
804pub struct MeerkatMachine {
805    /// Per-session entries.
806    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
807    /// Runtime mode.
808    mode: RuntimeMode,
809    /// Optional RuntimeStore for persistent drivers.
810    store: Option<Arc<dyn RuntimeStore>>,
811    /// Blob store used by persistent drivers for durable input externalization.
812    blob_store: Option<Arc<dyn BlobStore>>,
813    /// Runtime-owned shell seam for live session LLM reconfiguration I/O.
814    llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
815    /// AuthMachine lifecycle authority shared by runtime-backed auth
816    /// resolution/refresh paths and public auth-status surfaces.
817    auth_lease: StdRwLock<Arc<dyn meerkat_core::handles::AuthLeaseHandle>>,
818    /// OAuth login-flow lifecycle authority shared by public auth surfaces
819    /// that operate through this runtime adapter.
820    #[cfg(not(target_arch = "wasm32"))]
821    oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
822    /// Canonical owner of "this session id is currently active" — replaces
823    /// the deleted process-global `SESSION_IDENTITY_CLAIMS` static in the
824    /// comms shell (dogma #2). Comms runtimes acquire a typed
825    /// [`meerkat_core::handles::SessionClaim`] through this handle and hold
826    /// it for their lifetime; the registry is scoped to this `MeerkatMachine`
827    /// instance, so tests / multi-runtime processes get clean isolation.
828    session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
829    /// Optional typed signal dispatcher for MeerkatMachine lifecycle
830    /// effects routed by `meerkat_mob_seam` into MobMachine observation
831    /// signals.
832    composition_signal_dispatcher:
833        StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
834}
835
836impl MeerkatMachine {
837    /// Capability token for store-only session-control mutations routed
838    /// through this machine authority.
839    #[must_use]
840    pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
841        MachineSessionControlAuthority { _private: () }
842    }
843
844    /// Whether this adapter shares the same runtime persistence authority as
845    /// another adapter. Runtime-backed composition surfaces use this to reject
846    /// mismatched adapters before visible terminal events can outrun the store
847    /// that owns their durable commit.
848    #[must_use]
849    pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
850        match (&self.store, &other.store) {
851            (None, None) => true,
852            (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
853            _ => false,
854        }
855    }
856
857    /// Whether this adapter owns the same runtime persistence authority as a
858    /// concrete runtime store handle.
859    #[must_use]
860    pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
861        self.store
862            .as_ref()
863            .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
864    }
865
866    /// Whether this adapter has a runtime persistence store.
867    #[must_use]
868    pub fn has_runtime_persistence(&self) -> bool {
869        self.store.is_some()
870    }
871
872    fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
873        match err {
874            RuntimeDriverError::NotReady {
875                state: RuntimeState::Destroyed,
876            } => RuntimeDriverError::Destroyed,
877            other => other,
878        }
879    }
880
881    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
882    pub fn ephemeral() -> Self {
883        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
884        #[cfg(not(target_arch = "wasm32"))]
885        let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
886            std::time::Duration::from_secs(10 * 60),
887            Arc::clone(&auth_lease),
888        ));
889        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
890        Self {
891            sessions: RwLock::new(HashMap::new()),
892            mode: RuntimeMode::V9Compliant,
893            store: None,
894            blob_store: None,
895            llm_reconfigure_host: StdRwLock::new(None),
896            auth_lease: StdRwLock::new(auth_lease),
897            #[cfg(not(target_arch = "wasm32"))]
898            oauth_flows: StdRwLock::new(oauth_flows),
899            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
900            composition_signal_dispatcher: StdRwLock::new(None),
901        }
902    }
903
904    /// Create a persistent adapter with a RuntimeStore.
905    pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
906        #[cfg(not(target_arch = "wasm32"))]
907        let (auth_lease, oauth_flows) = {
908            let authorities = persistent_auth_authorities(&store);
909            (
910                Arc::clone(&authorities.auth_lease),
911                Arc::clone(&authorities.oauth_flows),
912            )
913        };
914        #[cfg(target_arch = "wasm32")]
915        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
916        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
917        Self {
918            sessions: RwLock::new(HashMap::new()),
919            mode: RuntimeMode::V9Compliant,
920            store: Some(store),
921            blob_store: Some(blob_store),
922            llm_reconfigure_host: StdRwLock::new(None),
923            auth_lease: StdRwLock::new(auth_lease),
924            #[cfg(not(target_arch = "wasm32"))]
925            oauth_flows: StdRwLock::new(oauth_flows),
926            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
927            composition_signal_dispatcher: StdRwLock::new(None),
928        }
929    }
930
931    /// Create a persistent adapter with a RuntimeStore but no blob store.
932    ///
933    /// The driver remains persistent for session state. Blob-backed inputs fail
934    /// explicitly at the blob-store boundary until a real [`BlobStore`] is
935    /// supplied.
936    pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
937        #[cfg(not(target_arch = "wasm32"))]
938        let (auth_lease, oauth_flows) = {
939            let authorities = persistent_auth_authorities(&store);
940            (
941                Arc::clone(&authorities.auth_lease),
942                Arc::clone(&authorities.oauth_flows),
943            )
944        };
945        #[cfg(target_arch = "wasm32")]
946        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
947        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
948        Self {
949            sessions: RwLock::new(HashMap::new()),
950            mode: RuntimeMode::V9Compliant,
951            store: Some(store),
952            blob_store: Some(Arc::new(UnavailableBlobStore)),
953            llm_reconfigure_host: StdRwLock::new(None),
954            auth_lease: StdRwLock::new(auth_lease),
955            #[cfg(not(target_arch = "wasm32"))]
956            oauth_flows: StdRwLock::new(oauth_flows),
957            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
958            composition_signal_dispatcher: StdRwLock::new(None),
959        }
960    }
961
962    /// Shared auth lifecycle handle used by all runtime-backed session
963    /// bindings created by this adapter.
964    pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
965        Arc::clone(
966            &self
967                .auth_lease
968                .read()
969                .unwrap_or_else(std::sync::PoisonError::into_inner),
970        )
971    }
972
973    /// Install the auth lifecycle authority that public surfaces also read.
974    ///
975    /// Surfaces construct the adapter before all state fields are available, so
976    /// this setter lets them align the adapter's runtime-backed traffic with
977    /// the surface-visible status handle without creating a competing registry.
978    pub fn set_auth_lease_handle(&self, handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>) {
979        #[cfg(not(target_arch = "wasm32"))]
980        let handle = {
981            let erased: Arc<dyn std::any::Any + Send + Sync> = handle.clone();
982            if let Ok(runtime_handle) =
983                Arc::downcast::<crate::handles::RuntimeAuthLeaseHandle>(erased)
984            {
985                self.set_runtime_auth_lease_handle(runtime_handle);
986                return;
987            }
988            *self
989                .oauth_flows
990                .write()
991                .unwrap_or_else(std::sync::PoisonError::into_inner) =
992                Arc::new(UnavailableOAuthFlowAuthority);
993            handle
994        };
995        *self
996            .auth_lease
997            .write()
998            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
999    }
1000
1001    /// Install a custom credential lifecycle handle together with an explicit
1002    /// OAuth login-flow authority.
1003    ///
1004    /// This is the opt-in seam for tests/embedders that intentionally want a
1005    /// non-runtime credential handle. The plain setter does not silently create
1006    /// a second hidden AuthMachine authority for OAuth flow membership.
1007    #[cfg(not(target_arch = "wasm32"))]
1008    pub fn set_auth_lease_handle_with_oauth_flow_authority(
1009        &self,
1010        handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>,
1011        oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
1012    ) {
1013        *self
1014            .oauth_flows
1015            .write()
1016            .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
1017        *self
1018            .auth_lease
1019            .write()
1020            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1021    }
1022
1023    /// Install a runtime AuthMachine authority shared by auth leases and OAuth
1024    /// login-flow lifecycle transitions.
1025    pub fn set_runtime_auth_lease_handle(
1026        &self,
1027        handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
1028    ) {
1029        #[cfg(not(target_arch = "wasm32"))]
1030        {
1031            *self
1032                .oauth_flows
1033                .write()
1034                .unwrap_or_else(std::sync::PoisonError::into_inner) =
1035                Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
1036                    std::time::Duration::from_secs(10 * 60),
1037                    Arc::clone(&handle),
1038                ));
1039        }
1040        let handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = handle;
1041        *self
1042            .auth_lease
1043            .write()
1044            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1045    }
1046
1047    /// Shared OAuth login-flow authority used by all auth surfaces that are
1048    /// backed by this runtime adapter.
1049    #[cfg(not(target_arch = "wasm32"))]
1050    pub fn oauth_flow_authority(
1051        &self,
1052    ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
1053        Arc::clone(
1054            &self
1055                .oauth_flows
1056                .read()
1057                .unwrap_or_else(std::sync::PoisonError::into_inner),
1058        )
1059    }
1060
1061    /// The canonical session-identity claim handle owned by this
1062    /// `MeerkatMachine`. Comms runtimes wired through this machine acquire
1063    /// their session-id claim through it; the registry is scoped to this
1064    /// machine instance so tests / parallel runtimes do not collide.
1065    pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
1066        Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
1067    }
1068
1069    /// Attach the typed composition signal dispatcher used for
1070    /// MeerkatMachine -> MobMachine lifecycle observation routes.
1071    pub fn set_composition_signal_dispatcher(
1072        &self,
1073        dispatcher: composition::MeerkatCompositionSignalDispatcher,
1074    ) {
1075        let mut slot = self
1076            .composition_signal_dispatcher
1077            .write()
1078            .unwrap_or_else(std::sync::PoisonError::into_inner);
1079        *slot = Some(dispatcher);
1080    }
1081
1082    /// Apply a routed-input variant delivered by the `meerkat_mob_seam`
1083    /// composition dispatcher against the session's shared DSL authority.
1084    ///
1085    /// The caller is
1086    /// [`crate::meerkat_machine::composition::MeerkatConsumerSurface::apply_routed_input`];
1087    /// it has already projected producer fields into the typed
1088    /// [`dsl::MeerkatMachineInput`] shape. This method performs the
1089    /// session lookup + DSL-lock-scoped apply. A typed transition error
1090    /// from the kernel is surfaced as a `String` so the dispatcher can
1091    /// map it onto `DispatchRefusal::ConsumerRefused`.
1092    pub async fn apply_routed_meerkat_input(
1093        &self,
1094        session_id: &SessionId,
1095        input: dsl::MeerkatMachineInput,
1096    ) -> Result<(), String> {
1097        Self::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;
1098        let sessions = self.sessions.read().await;
1099        let entry = sessions.get(session_id).ok_or_else(|| {
1100            format!(
1101                "session `{session_id}` is not registered with this MeerkatMachine; \
1102                 cannot deliver routed input"
1103            )
1104        })?;
1105        let authority = Arc::clone(&entry.dsl_authority);
1106        drop(sessions);
1107
1108        let (previous_state, effects) = {
1109            let mut guard = authority
1110                .lock()
1111                .unwrap_or_else(std::sync::PoisonError::into_inner);
1112            let previous_state = Box::new(guard.state.clone());
1113            let effects = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
1114                .map(|transition| transition.effects)
1115                .map_err(|err| format!("{err}"))?;
1116            (previous_state, effects)
1117        };
1118        if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
1119            self.restore_session_dsl_state(session_id, previous_state)
1120                .await;
1121            return Err(format!(
1122                "routed MeerkatMachine input committed effect dispatch failed: {error}"
1123            ));
1124        }
1125        Ok(())
1126    }
1127
1128    #[cfg(test)]
1129    pub(crate) async fn debug_shared_ingress_authorities(
1130        &self,
1131        session_id: &SessionId,
1132    ) -> Option<(
1133        Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1134        crate::driver::ephemeral::SharedIngressDslAuthority,
1135    )> {
1136        let sessions = self.sessions.read().await;
1137        let entry = sessions.get(session_id)?;
1138        let session_authority = Arc::clone(&entry.dsl_authority);
1139        let driver = entry.driver.lock().await;
1140        Some((session_authority, driver.shared_dsl_authority()))
1141    }
1142
1143    /// Create a driver entry for a session.
1144    fn make_driver(
1145        &self,
1146        runtime_id: LogicalRuntimeId,
1147        dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
1148        initial_runtime_state: RuntimeState,
1149    ) -> DriverEntry {
1150        let control_projection = Arc::new(StdRwLock::new(
1151            crate::driver::ephemeral::RuntimeControlProjection {
1152                phase: initial_runtime_state,
1153                current_run_id: None,
1154                pre_run_phase: None,
1155            },
1156        ));
1157        match (&self.store, &self.blob_store) {
1158            (Some(store), Some(blob_store)) => {
1159                DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
1160                    runtime_id,
1161                    store.clone(),
1162                    blob_store.clone(),
1163                    control_projection,
1164                    dsl_authority,
1165                ))
1166            }
1167            _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
1168                runtime_id,
1169                control_projection,
1170                dsl_authority,
1171            )),
1172        }
1173    }
1174
1175    /// Recover or create fresh ops lifecycle state for a session.
1176    ///
1177    /// This is the single canonical recovery seam. Both `register_session()`
1178    /// and `ensure_session_with_executor()`'s cold path call this to create
1179    /// epoch-local state. If a durable store is available, attempts to load
1180    /// the persisted snapshot; otherwise creates fresh state with a new epoch.
1181    async fn recover_or_create_ops_state(
1182        &self,
1183        session_id: &SessionId,
1184        runtime_id: &LogicalRuntimeId,
1185    ) -> (
1186        Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
1187        meerkat_core::RuntimeEpochId,
1188        Arc<meerkat_core::EpochCursorState>,
1189    ) {
1190        if let Some(ref store) = self.store {
1191            match store.load_ops_lifecycle(runtime_id).await {
1192                Ok(Some(snapshot)) => {
1193                    let recovered_epoch = snapshot.epoch_id.clone();
1194                    let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
1195                        snapshot.cursors.agent_applied_cursor,
1196                        snapshot.cursors.runtime_observed_seq,
1197                        snapshot.cursors.runtime_last_injected_seq,
1198                    );
1199                    let recovered_ops_count = snapshot.completion_entries.len();
1200                    let registry =
1201                        crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
1202                    tracing::info!(
1203                        %session_id,
1204                        %runtime_id,
1205                        epoch_id = %recovered_epoch,
1206                        recovered_ops = recovered_ops_count,
1207                        "ops lifecycle recovered from durable store (same epoch)"
1208                    );
1209                    return (
1210                        Arc::new(registry),
1211                        recovered_epoch,
1212                        Arc::new(recovered_cursors),
1213                    );
1214                }
1215                Ok(None) => {}
1216                Err(err) => {
1217                    tracing::warn!(
1218                        %session_id,
1219                        %runtime_id,
1220                        error = %err,
1221                        "failed to load ops lifecycle; epoch rotated"
1222                    );
1223                    return (
1224                        Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1225                        meerkat_core::RuntimeEpochId::new(),
1226                        Arc::new(meerkat_core::EpochCursorState::new()),
1227                    );
1228                }
1229            }
1230            tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
1231            (
1232                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1233                meerkat_core::RuntimeEpochId::new(),
1234                Arc::new(meerkat_core::EpochCursorState::new()),
1235            )
1236        } else {
1237            (
1238                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1239                meerkat_core::RuntimeEpochId::new(),
1240                Arc::new(meerkat_core::EpochCursorState::new()),
1241            )
1242        }
1243    }
1244
1245    #[allow(clippy::large_futures)]
1246    fn execute_meerkat_machine_command(
1247        &self,
1248        self_handle: Option<Arc<Self>>,
1249        command: MeerkatMachineCommand,
1250    ) -> MeerkatMachineCommandFuture<'_> {
1251        Box::pin(async move {
1252            match command {
1253                MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
1254                    let self_handle = self_handle.ok_or_else(|| {
1255                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1256                            "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
1257                        ))
1258                    })?;
1259                    self_handle
1260                        .execute_meerkat_machine_ensure_session_command(command)
1261                        .await
1262                        .map_err(Into::into)
1263                }
1264                MeerkatMachineCommand::RegisterSession { .. }
1265                | MeerkatMachineCommand::UnregisterSession { .. }
1266                | MeerkatMachineCommand::SetSilentIntents { .. }
1267                | MeerkatMachineCommand::CancelAfterBoundary { .. }
1268                | MeerkatMachineCommand::StopRuntimeExecutor { .. }
1269                | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
1270                | MeerkatMachineCommand::ContainsSession { .. }
1271                | MeerkatMachineCommand::SessionHasExecutor { .. }
1272                | MeerkatMachineCommand::SessionHasComms { .. }
1273                | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
1274                | MeerkatMachineCommand::PrepareBindings { .. }
1275                | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
1276                | MeerkatMachineCommand::InputState { .. }
1277                | MeerkatMachineCommand::ListActiveInputs { .. }
1278                | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
1279                | MeerkatMachineCommand::StagePersistentFilter { .. }
1280                | MeerkatMachineCommand::RequestDeferredTools { .. }
1281                | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
1282                    .execute_meerkat_machine_session_command(command)
1283                    .await
1284                    .map_err(Into::into),
1285                MeerkatMachineCommand::SetPeerIngressContext { .. }
1286                | MeerkatMachineCommand::NotifyDrainExited { .. } => {
1287                    let self_handle = self_handle.ok_or_else(|| {
1288                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1289                            "drain command requires Arc<Self> machine handle".into(),
1290                        ))
1291                    })?;
1292                    self_handle
1293                        .execute_meerkat_machine_drain_command(command)
1294                        .await
1295                        .map_err(Into::into)
1296                }
1297                MeerkatMachineCommand::AbortAll
1298                | MeerkatMachineCommand::Abort { .. }
1299                | MeerkatMachineCommand::Wait { .. } => self
1300                    .execute_meerkat_machine_drain_local_command(command)
1301                    .await
1302                    .map_err(Into::into),
1303                MeerkatMachineCommand::Ingest { .. }
1304                | MeerkatMachineCommand::PublishEvent { .. }
1305                | MeerkatMachineCommand::Retire { .. }
1306                | MeerkatMachineCommand::Recycle { .. }
1307                | MeerkatMachineCommand::Reset { .. }
1308                | MeerkatMachineCommand::Recover { .. }
1309                | MeerkatMachineCommand::Destroy { .. }
1310                | MeerkatMachineCommand::RuntimeState { .. }
1311                | MeerkatMachineCommand::RuntimeRealtimeAttachmentStatus { .. }
1312                | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
1313                | MeerkatMachineCommand::RuntimeRealtimeChannelStatus { .. }
1314                | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
1315                | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
1316                | MeerkatMachineCommand::RequestSwitchTurn { .. }
1317                | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
1318                | MeerkatMachineCommand::BeginImageOperation { .. }
1319                | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
1320                | MeerkatMachineCommand::CompleteImageOperation { .. }
1321                | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
1322                | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
1323                    .execute_meerkat_machine_control_command(command)
1324                    .await
1325                    .map_err(Into::into),
1326                MeerkatMachineCommand::AcceptWithCompletion { .. }
1327                | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
1328                    .execute_meerkat_machine_ingress_command(command)
1329                    .await
1330                    .map_err(Into::into),
1331                MeerkatMachineCommand::Prepare { .. }
1332                | MeerkatMachineCommand::Commit { .. }
1333                | MeerkatMachineCommand::Fail { .. } => self
1334                    .execute_meerkat_machine_legacy_run_command(command)
1335                    .await
1336                    .map_err(Into::into),
1337            }
1338        })
1339    }
1340
1341    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
1342    /// nothing processes them automatically). Useful for tests and legacy mode.
1343    pub async fn register_session(&self, session_id: SessionId) {
1344        let _ = self
1345            .execute_meerkat_machine_command(
1346                None,
1347                MeerkatMachineCommand::RegisterSession { session_id },
1348            )
1349            .await;
1350    }
1351}
1352
1353#[cfg(test)]
1354#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
1355#[path = "../meerkat_machine_tests.rs"]
1356mod tests;