Skip to main content

meerkat_runtime/meerkat_machine/
mod.rs

1//! MeerkatMachine — session-scoped execution kernel.
2//!
3//! One of two kernels in the Meerkat two-kernel architecture:
4//!
5//! - **MeerkatMachine** (this module) owns session-scoped runtime state:
6//!   input ingress, run lifecycle, completion waiters, async-ops registry,
7//!   comms drain, and tool visibility publication. All mutations flow through
8//!   one unified internal reducer, gated by TLA+-derived precondition guards.
9//!
10//! - **MobMachine** (`meerkat-mob`) owns mob-scoped orchestration: roster,
11//!   flow frames, delegation, and inter-member wiring.
12//!
13//! MeerkatMachine lives in `meerkat-runtime` so `meerkat-session` does not
14//! depend on runtime execution internals. When a session registers a
15//! `CoreExecutor`, a background `RuntimeLoop` task is spawned. Input acceptance
16//! queues through the driver; wake signals the loop; the loop dequeues, stages,
17//! applies via `CoreExecutor`, and marks inputs consumed.
18
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
28use meerkat_core::lifecycle::{InputId, RunId};
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32    DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33    ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::InputLifecycleState;
42use crate::meerkat_machine_types::{
43    HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatBindingSnapshot,
44    MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot, MeerkatControlSnapshot,
45    MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind, MeerkatFormalStateProjection,
46    MeerkatInputsSnapshot, MeerkatLedgerSnapshot, MeerkatMachineCommand,
47    MeerkatMachineCommandError, MeerkatMachineCommandResult, MeerkatMachineRunFailure,
48    MeerkatMachineRunPrepared, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
49    SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmCapabilitySurfaceStatus,
50    SessionLlmReconfigureHost, SessionLlmReconfigureReport, SessionLlmReconfigureRequest,
51    SessionToolVisibilityDelta,
52};
53use crate::runtime_state::RuntimeState;
54use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
55use crate::store::RuntimeStore;
56use crate::tokio;
57use crate::tokio::sync::{Mutex, RwLock, mpsc};
58#[cfg(test)]
59use crate::traits::RuntimeDriver;
60use crate::traits::{
61    DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
62    RuntimeControlPlaneError, RuntimeDriverError,
63};
64
/// Error type for [`MeerkatMachine::prepare_bindings`].
///
/// Both variants carry the [`SessionId`] of the session whose bindings could
/// not be produced, so the rendered message always names the session.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// Session was not found after registration (should not happen in practice).
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
    /// Machine-owned binding preparation failed before bindings were published.
    ///
    /// The `String` payload is the failure detail rendered after the session
    /// id in the error message.
    #[error("failed to prepare runtime bindings for session {0}: {1}")]
    PrepareFailed(SessionId, String),
}
75
76#[derive(Debug, Default)]
77struct UnavailableBlobStore;
78
79impl UnavailableBlobStore {
80    fn error() -> BlobStoreError {
81        BlobStoreError::Unsupported(
82            "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
83                .to_string(),
84        )
85    }
86}
87
/// Auth authorities cached per runtime-store identity.
///
/// Bundles live in the process-wide `PERSISTENT_AUTH_AUTHORITIES` map and are
/// handed out as `Arc`s, so every caller resolving to the same key shares the
/// same auth-lease and OAuth-flow handles.
#[cfg(not(target_arch = "wasm32"))]
struct PersistentAuthAuthorityBundle {
    /// Weak reference to the store most recently bound to this bundle.
    /// Upgrading it tells whether that store is still alive without keeping
    /// the store alive through this field.
    store: StdMutex<Weak<dyn RuntimeStore>>,
    /// Auth lease handle shared by everything using this bundle.
    auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
    /// OAuth flow handle shared by everything using this bundle.
    oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
}
94
/// Lookup key for the process-wide auth-authority cache.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key reported by the store itself via `auth_authority_key()`.
    Durable(String),
    /// Fallback when the store reports no key: the address of the store's
    /// `Arc` allocation. An address can be reused after the store is dropped,
    /// so holders of this key must check that the store is still alive.
    Process(usize),
}
101
/// Process-wide cache of auth-authority bundles, keyed by store identity.
///
/// Lazily initialised on first use; the inner `StdMutex` guards the map.
#[cfg(not(target_arch = "wasm32"))]
static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
    StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
> = OnceLock::new();
106
107#[cfg(not(target_arch = "wasm32"))]
108fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
109    store
110        .auth_authority_key()
111        .map(PersistentAuthAuthorityKey::Durable)
112        .unwrap_or_else(|| {
113            PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
114        })
115}
116
117fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
118    match (a.auth_authority_key(), b.auth_authority_key()) {
119        (Some(a), Some(b)) => a == b,
120        _ => Arc::ptr_eq(a, b),
121    }
122}
123
124#[cfg(not(target_arch = "wasm32"))]
125fn persistent_auth_authorities(
126    store: &Arc<dyn RuntimeStore>,
127) -> Arc<PersistentAuthAuthorityBundle> {
128    let key = runtime_store_identity(store);
129    let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
130    let mut authorities = authorities
131        .lock()
132        .unwrap_or_else(std::sync::PoisonError::into_inner);
133    if let Some(existing) = authorities.get(&key) {
134        let stored_store_alive = existing
135            .store
136            .lock()
137            .unwrap_or_else(std::sync::PoisonError::into_inner)
138            .upgrade()
139            .is_some();
140        if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
141            existing.oauth_flows.bind_persistent_store(store);
142            *existing
143                .store
144                .lock()
145                .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
146            return Arc::clone(existing);
147        }
148    }
149    let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
150    let oauth_flows = Arc::new(
151        crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
152            std::time::Duration::from_secs(10 * 60),
153            Arc::clone(&auth_lease),
154            store,
155        ),
156    );
157    let bundle = Arc::new(PersistentAuthAuthorityBundle {
158        store: StdMutex::new(Arc::downgrade(store)),
159        auth_lease,
160        oauth_flows,
161    });
162    authorities.insert(key, Arc::clone(&bundle));
163    bundle
164}
165
/// Test-only helper that empties the process-wide auth-authority cache.
///
/// Does nothing if the cache has never been initialised.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut entries = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    entries.clear();
}
175
/// `BlobStore` implementation in which every blob operation fails with
/// [`UnavailableBlobStore::error`].
///
/// The `async_trait` attribute is applied with `?Send` on wasm32 and with
/// the default (`Send`) bounds everywhere else.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BlobStore for UnavailableBlobStore {
    /// Always fails; the arguments are ignored.
    async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails; no blob can be fetched.
    async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails; there is nothing to delete.
    async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails rather than answering `false`, so callers see the
    /// missing-store condition instead of a plain "not found".
    async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
        Err(Self::error())
    }

    /// This placeholder never persists anything.
    fn is_persistent(&self) -> bool {
        false
    }
}
199
200#[cfg(not(target_arch = "wasm32"))]
201struct UnavailableOAuthFlowAuthority;
202
203#[cfg(not(target_arch = "wasm32"))]
204impl UnavailableOAuthFlowAuthority {
205    fn rejected(operation: &'static str) -> meerkat_auth_core::oauth_flow::OAuthFlowError {
206        meerkat_auth_core::oauth_flow::OAuthFlowError::LifecycleRejected {
207            operation,
208            detail: "custom auth lease handle does not provide a runtime OAuth flow authority"
209                .to_string(),
210        }
211    }
212}
213
/// `OAuthFlowAuthority` implementation that rejects every operation.
///
/// Each method ignores its arguments and returns
/// [`UnavailableOAuthFlowAuthority::rejected`] tagged with an operation name
/// identifying which lifecycle step was refused.
#[cfg(not(target_arch = "wasm32"))]
impl meerkat_auth_core::oauth_flow::OAuthFlowAuthority for UnavailableOAuthFlowAuthority {
    /// Rejects starting a browser flow (`"admit_oauth_browser_flow"`).
    fn start(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: String,
        _pkce_verifier: String,
    ) -> Result<String, meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_browser_flow"))
    }

    /// Rejects verifying a browser flow (`"verify_oauth_browser_flow"`).
    fn verify(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_browser_flow"))
    }

    /// Rejects consuming a browser flow (`"consume_oauth_browser_flow"`).
    fn consume(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("consume_oauth_browser_flow"))
    }

    /// Rejects admitting a device-code flow (`"admit_oauth_device_flow"`).
    fn admit_device_code(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _device_code: String,
        _expires_in: std::time::Duration,
    ) -> Result<(), meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_device_flow"))
    }

    /// Rejects verifying a device-code flow (`"verify_oauth_device_flow"`).
    fn verify_device_code(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDeviceFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_device_flow"))
    }

    /// Rejects beginning a device-code poll (`"begin_oauth_device_poll"`).
    fn begin_device_code_poll(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDevicePollLease,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("begin_oauth_device_poll"))
    }
}
286
/// Boxed, pinned future resolving to the outcome of a machine command.
///
/// Native variant: the future must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
            + Send
            + 'a,
    >,
>;

/// Boxed, pinned future resolving to the outcome of a machine command.
///
/// wasm32 variant: identical to the native alias but without the `Send` bound.
#[cfg(target_arch = "wasm32")]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
>;
300
301pub(crate) use driver::{
302    DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
303    commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
304    machine_apply_run_return_projection, machine_batch_primitive_projections,
305    machine_batch_runtime_semantics, machine_begin_run, machine_commit_prepared_destroy,
306    machine_commit_service_turn_terminal_receipt, machine_executor_attach_projection,
307    machine_input_boundary, machine_prepare_bindings_projection, machine_prepare_destroy,
308    machine_recover_ephemeral_driver, machine_recover_persistent_driver,
309    machine_recycle_preserving_work, machine_reset, machine_retire,
310    machine_select_runtime_loop_batch, machine_stop_runtime, machine_unregister_session_projection,
311    prepare_runtime_loop_batch_start, rollback_runtime_loop_run_after_boundary_commit_failure,
312};
313
314pub(crate) mod driver;
315
316mod comms_drain;
317pub mod composition;
318mod dispatch_control;
319mod dispatch_drain;
320mod dispatch_ingress;
321mod dispatch_session;
322#[allow(unused_variables, dead_code, clippy::cmp_owned)]
323#[allow(clippy::assign_op_pattern)]
324pub mod dsl;
325pub(crate) mod dsl_authority;
326mod dsl_effects;
327mod llm_reconfigure;
328mod runtime_control;
329mod session_management;
330mod traits;
331mod visibility;
332
333pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
334
335pub use comms_drain::{
336    CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
337    SupervisorBinding, SupervisorBindingStageError,
338};
339pub(crate) use comms_drain::{CommsDrainSlot, abort_slot};
340pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
341pub(crate) use visibility::MachineToolVisibilityOwner;
342
/// A DSL transition handed to the `commit_session_dsl_transition*` methods.
struct StagedSessionDslInput {
    /// DSL state captured for rollback; the commit path reinstalls it when
    /// effect dispatch fails and the restore policy is selected.
    previous_state: Box<dsl::MeerkatMachineState>,
    /// Effects produced by the transition; the commit path dispatches the
    /// routed signals lifted from them.
    effects: DslTransitionEffects,
}
347
/// Policy for the DSL state when dispatching a transition's effects fails.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    /// Leave the DSL state as it is; only the error is reported.
    PreserveCommittedDslState,
    /// Reinstall the staged `previous_state` before reporting the error.
    RestorePreviousDslState,
}
353
/// Per-session state: driver + registration phase.
///
/// One entry exists per registered session in `MeerkatMachine::sessions`.
/// Besides the driver and phase it bundles the shared handles (control
/// projection, ops registry, cursor state, completions, visibility owner,
/// DSL authority, drain slot) that belong to that session.
struct RuntimeSessionEntry {
    /// Canonical runtime control-plane identity for this registered session.
    runtime_id: LogicalRuntimeId,
    /// Per-session mutation gate.
    ///
    /// Serializes same-session mutating commands across the full
    /// DSL-stage → driver-mutate → DSL-sync span. Without this gate,
    /// two concurrent commands on the same session can interleave between
    /// the DSL projection sync (which releases `sessions` lock) and the
    /// driver mutation (which acquires `driver` lock independently).
    ///
    /// This is NOT a replacement for `sessions` RwLock or `driver` Mutex —
    /// it is an additional serialization point that spans the entire
    /// multi-step mutation window.
    mutation_gate: Arc<Mutex<()>>,
    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
    driver: SharedDriver,
    /// Canonical coarse control projection for this session.
    ///
    /// The driver reads this to realize shell mechanics, but machine-facing
    /// queries should publish from this shared cell rather than treating the
    /// driver shell as the source of lifecycle truth.
    control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
    /// Shared async-operation lifecycle registry for this runtime/session.
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Runtime epoch identity — stable across rebuilds, rotated on reset/restart-without-recovery.
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Shared consumer cursor state for the epoch.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
    completions: SharedCompletionRegistry,
    /// Canonical durable visibility owner for this session.
    tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
    /// Machine-owned current durable/live LLM identity for the registered session.
    current_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
    /// Machine-owned current capability surface for the registered session.
    current_capability_surface: Option<SessionLlmCapabilitySurface>,
    /// Whether the machine has a resolved capability surface for the current identity.
    capability_surface_status: SessionLlmCapabilitySurfaceStatus,
    /// Registration phase — explicit type-level distinction between
    /// "registered but inert," "attachment in progress," and
    /// "executor attached."
    phase: RegistrationPhase,
    /// Temporary live interrupt capability for prepared, session-owned turns
    /// that run before the runtime loop attachment is published.
    ///
    /// Cleared when a runtime loop is attached; returned by
    /// `interrupt_handle()` only while no live attachment exists.
    provisional_interrupt_handle:
        Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// DSL authority for coarse lifecycle phase transitions.
    /// Sync field — validates transitions, writes back phase.
    ///
    /// `Arc<std::sync::Mutex<_>>` so cross-crate handle impls
    /// (`meerkat-runtime::handles::*`) can share the same underlying authority
    /// from a sync context without awaiting the outer `sessions` tokio lock.
    /// The Arc heap-allocates the authority's large expanded state (31 fields
    /// including several Maps/Sets) so holding a reference to a
    /// `RuntimeSessionEntry` does not bloat async future sizes.
    dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    /// Per-session comms drain lifecycle slot.
    ///
    /// Collapsed from the sibling `MeerkatMachine.comms_drain_slots:
    /// RwLock<HashMap<SessionId, CommsDrainSlot>>` in wave-c C-H2 (F5 in
    /// docs/wave-c-prep/state-scope-audit.md) — keeping the slot here
    /// makes "session exists" a single HashMap insertion and eliminates
    /// the class of bugs where the sibling map and the session map
    /// could fall out of sync across a registration/unregistration
    /// boundary.
    drain_slot: CommsDrainSlot,
}
422
/// Capability bundle for an attached runtime loop.
///
/// Keep all loop-related handles together so "attached vs detached" cannot
/// drift into partially-populated shell state.
struct RuntimeLoopAttachment {
    /// Sender for wake signals. A closed channel marks the attachment as dead.
    wake_tx: mpsc::Sender<()>,
    /// Sender for runtime effects. A closed channel marks the attachment as dead.
    effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
    /// Optional executor boundary handle published with the attachment.
    boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
    /// Optional executor interrupt handle published with the attachment.
    interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Join handle of the loop task; stored with the attachment but never
    /// read in this section (hence the leading underscore).
    _loop_handle: tokio::task::JoinHandle<()>,
}
434
/// Explicit registration phase — the type-level distinction between
/// "registered but inert," "attachment in progress," and "executor attached."
///
/// Replaces the implicit `Option<RuntimeLoopAttachment>` discriminant
/// (Dogma §8: Option must not hide ownership uncertainty).
///
/// Note that `Active` only records that an attachment was published; whether
/// its channels are still open is checked separately by the
/// `RuntimeSessionEntry` accessors.
enum RegistrationPhase {
    /// Registered via `prepare_bindings()`. No executor — inputs queue
    /// but are not processed until an executor attaches.
    Queuing,
    /// `ensure_session_with_executor()` is in progress — another task is
    /// wiring the runtime loop. Concurrent callers must treat this as
    /// "attachment pending" and not race a second loop spawn.
    Attaching,
    /// Executor attached with live channels. Inputs are processed
    /// by the RuntimeLoop.
    Active(RuntimeLoopAttachment),
}
452
453impl RuntimeSessionEntry {
454    fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
455        self.control_projection
456            .read()
457            .map(|guard| guard.clone())
458            .unwrap_or_else(|poisoned| {
459                tracing::error!("runtime control projection lock poisoned");
460                poisoned.into_inner().clone()
461            })
462    }
463
464    fn attachment_is_live(&self) -> bool {
465        match &self.phase {
466            RegistrationPhase::Active(attachment) => {
467                !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
468            }
469            RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
470        }
471    }
472
473    /// Returns `true` if an executor is attached with live channels, OR if
474    /// attachment is in progress (another task is wiring the loop).
475    /// Used by external-facing queries (`session_has_executor`) to prevent
476    /// concurrent callers from racing a second loop spawn.
477    fn has_attachment_or_attaching(&self) -> bool {
478        matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
479    }
480
481    /// Returns `true` only if the executor is fully attached with live channels.
482    /// Used by internal publish logic within `ensure_session_with_executor`
483    /// where the caller itself may have set `Attaching`.
484    fn has_live_attachment(&self) -> bool {
485        self.attachment_is_live()
486    }
487
488    fn attach_runtime_loop(
489        &mut self,
490        wake_tx: mpsc::Sender<()>,
491        effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
492        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
493        interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
494        loop_handle: tokio::task::JoinHandle<()>,
495    ) {
496        self.provisional_interrupt_handle = None;
497        self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
498            wake_tx,
499            effect_tx,
500            boundary_handle,
501            interrupt_handle,
502            _loop_handle: loop_handle,
503        });
504    }
505
506    fn clear_dead_attachment(&mut self) -> bool {
507        if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
508            // Don't regress to Queuing if another task is mid-attach;
509            // Active with dead channels goes back to Queuing for retry.
510            self.phase = RegistrationPhase::Queuing;
511            return true;
512        }
513        false
514    }
515
516    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
517        match &self.phase {
518            RegistrationPhase::Active(attachment)
519                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
520            {
521                Some(attachment.wake_tx.clone())
522            }
523            _ => None,
524        }
525    }
526
527    fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
528        match &self.phase {
529            RegistrationPhase::Active(attachment)
530                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
531            {
532                Some(attachment.effect_tx.clone())
533            }
534            _ => None,
535        }
536    }
537
538    fn boundary_handle(
539        &self,
540    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
541        match &self.phase {
542            RegistrationPhase::Active(attachment)
543                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
544            {
545                attachment.boundary_handle.clone()
546            }
547            _ => None,
548        }
549    }
550
551    fn interrupt_handle(
552        &self,
553    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
554        match &self.phase {
555            RegistrationPhase::Active(attachment)
556                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
557            {
558                attachment.interrupt_handle.clone()
559            }
560            _ => self.provisional_interrupt_handle.clone(),
561        }
562    }
563
564    fn install_provisional_interrupt_handle(
565        &mut self,
566        handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
567    ) {
568        if !self.attachment_is_live() {
569            self.provisional_interrupt_handle = Some(handle);
570        }
571    }
572}
573
574impl MeerkatMachine {
575    /// Acquire the per-session mutation gate.
576    ///
577    /// Returns an `Arc<Mutex<()>>` that the caller must `.lock().await` and
578    /// hold across the full DSL-stage → driver-mutate → DSL-sync span.
579    /// Returns `None` if the session is not registered.
580    async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
581        let sessions = self.sessions.read().await;
582        sessions
583            .get(session_id)
584            .map(|entry| Arc::clone(&entry.mutation_gate))
585    }
586
587    async fn session_dsl_authority(
588        &self,
589        session_id: &SessionId,
590    ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
591        let sessions = self.sessions.read().await;
592        sessions
593            .get(session_id)
594            .map(|entry| Arc::clone(&entry.dsl_authority))
595            .ok_or_else(|| {
596                RuntimeDriverError::NotReady {
597                    state: RuntimeState::Destroyed,
598                }
599                .to_string()
600            })
601    }
602
603    fn preview_dsl_input_on_state(
604        state: &dsl::MeerkatMachineState,
605        input: dsl::MeerkatMachineInput,
606        context: &str,
607    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
608        let mut preview = dsl::MeerkatMachineAuthority::from_state(state.clone());
609        dsl::MeerkatMachineMutator::apply(&mut preview, input)
610            .map(|transition| transition.effects)
611            .map_err(|err| dsl_authority::map_error(err, context))
612    }
613
614    async fn preview_session_dsl_input(
615        &self,
616        session_id: &SessionId,
617        input: dsl::MeerkatMachineInput,
618        context: &str,
619    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
620        let authority = self.session_dsl_authority(session_id).await?;
621        let state = {
622            let authority = authority
623                .lock()
624                .unwrap_or_else(std::sync::PoisonError::into_inner);
625            authority.state.clone()
626        };
627        Self::preview_dsl_input_on_state(&state, input, context)
628    }
629
630    async fn session_dsl_state(
631        &self,
632        session_id: &SessionId,
633    ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
634        let authority = self
635            .session_dsl_authority(session_id)
636            .await
637            .map_err(RuntimeControlPlaneError::Internal)?;
638        let authority = authority
639            .lock()
640            .unwrap_or_else(std::sync::PoisonError::into_inner);
641        Ok(authority.state.clone())
642    }
643
644    async fn commit_session_dsl_transition(
645        &self,
646        session_id: &SessionId,
647        staged: StagedSessionDslInput,
648        context: &str,
649    ) -> Result<(), String> {
650        self.commit_session_dsl_transition_with_dispatch_failure(
651            session_id,
652            staged,
653            context,
654            CommittedEffectDispatchFailure::RestorePreviousDslState,
655        )
656        .await
657    }
658
659    async fn commit_session_dsl_transition_preserving_committed_state(
660        &self,
661        session_id: &SessionId,
662        staged: StagedSessionDslInput,
663        context: &str,
664    ) -> Result<(), String> {
665        self.commit_session_dsl_transition_with_dispatch_failure(
666            session_id,
667            staged,
668            context,
669            CommittedEffectDispatchFailure::PreserveCommittedDslState,
670        )
671        .await
672    }
673
674    async fn commit_session_dsl_transition_with_dispatch_failure(
675        &self,
676        session_id: &SessionId,
677        staged: StagedSessionDslInput,
678        context: &str,
679        dispatch_failure: CommittedEffectDispatchFailure,
680    ) -> Result<(), String> {
681        if let Err(error) = self
682            .dispatch_routed_signals_from_effects(&staged.effects)
683            .await
684        {
685            match dispatch_failure {
686                CommittedEffectDispatchFailure::PreserveCommittedDslState => {}
687                CommittedEffectDispatchFailure::RestorePreviousDslState => {
688                    self.restore_session_dsl_state(session_id, staged.previous_state)
689                        .await;
690                }
691            }
692            return Err(format!(
693                "DSL authority ({context}): committed effect dispatch failed: {error}"
694            ));
695        }
696        Ok(())
697    }
698
699    async fn dispatch_routed_signals_from_effects(
700        &self,
701        effects: &[dsl::MeerkatMachineEffect],
702    ) -> Result<(), String> {
703        let dispatcher = {
704            self.composition_signal_dispatcher
705                .read()
706                .unwrap_or_else(std::sync::PoisonError::into_inner)
707                .clone()
708        };
709        let Some(dispatcher) = dispatcher else {
710            return Ok(());
711        };
712
713        for effect in effects {
714            if let Some(signal) = composition::lift_routed_signal(effect) {
715                composition::dispatch_routed_signal(&dispatcher, signal).await?;
716            }
717        }
718        Ok(())
719    }
720
721    async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
722        let mut sessions = self.sessions.write().await;
723        if let Some(entry) = sessions.get_mut(session_id) {
724            entry.clear_dead_attachment();
725        }
726    }
727
728    async fn dispatch_cancel_after_boundary_runtime_effect(
729        &self,
730        session_id: &SessionId,
731        effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
732        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
733        projected_effect: crate::effect::ProjectedRuntimeEffect,
734        context: &str,
735    ) -> Result<(), RuntimeDriverError> {
736        let Some(effect_tx) = effect_tx else {
737            let state = self
738                .existing_session_runtime_state(session_id)
739                .await
740                .unwrap_or(RuntimeState::Destroyed);
741            return Err(RuntimeDriverError::NotReady { state });
742        };
743
744        let reason = projected_effect.reason().to_string();
745        if let Some(boundary_handle) = boundary_handle {
746            boundary_handle
747                .cancel_after_boundary(reason)
748                .await
749                .map_err(|err| {
750                    RuntimeDriverError::Internal(format!(
751                        "{context}: failed to apply live boundary cancel: {err}"
752                    ))
753                })?;
754        }
755
756        match effect_tx.send(projected_effect.into_effect()).await {
757            Ok(()) => Ok(()),
758            Err(_) => {
759                self.clear_dead_runtime_attachment(session_id).await;
760                Err(RuntimeDriverError::NotReady {
761                    state: RuntimeState::Idle,
762                })
763            }
764        }
765    }
766
767    async fn restore_session_dsl_state(
768        &self,
769        session_id: &SessionId,
770        state: Box<dsl::MeerkatMachineState>,
771    ) {
772        if let Ok(authority) = self.session_dsl_authority(session_id).await {
773            Self::restore_dsl_authority_state(&authority, state);
774        }
775    }
776
777    fn restore_dsl_authority_state(
778        authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
779        state: Box<dsl::MeerkatMachineState>,
780    ) {
781        let mut authority = authority
782            .lock()
783            .unwrap_or_else(std::sync::PoisonError::into_inner);
784        *authority = dsl::MeerkatMachineAuthority::from_state(*state);
785    }
786}
787
/// Capability token proving a session-control mutation is routed through
/// `MeerkatMachine` authority instead of a public store-only service path.
///
/// The only field is private, so code outside this module cannot build the
/// token with a struct literal; in this file it is minted by
/// [`MeerkatMachine::session_control_authority`]. The token is zero-sized and
/// `Copy`, so passing it around is free.
#[derive(Debug, Clone, Copy)]
pub struct MachineSessionControlAuthority {
    // Private unit marker: carries no data, exists only to keep construction
    // restricted to this module.
    _private: (),
}
794
/// Session-scoped execution kernel for the Meerkat runtime.
///
/// Owns per-session runtime state (driver, ops registry, completion waiters,
/// comms drain, epoch bindings) and routes all internal mutations through one
/// canonical command reducer, with smaller group handlers retained only as
/// implementation detail helpers.
///
/// Instances are built with [`MeerkatMachine::ephemeral`],
/// [`MeerkatMachine::persistent`] or
/// [`MeerkatMachine::persistent_without_blobs`]; the auth-related handles can
/// be swapped afterwards through the `set_*` methods, which is why those
/// fields sit behind locks.
pub struct MeerkatMachine {
    /// Per-session entries, keyed by session id and guarded by an async
    /// read/write lock.
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Runtime mode. Every constructor in this file sets
    /// `RuntimeMode::V9Compliant`.
    mode: RuntimeMode,
    /// Optional RuntimeStore for persistent drivers. `None` for ephemeral
    /// adapters.
    store: Option<Arc<dyn RuntimeStore>>,
    /// Blob store used by persistent drivers for durable input externalization.
    blob_store: Option<Arc<dyn BlobStore>>,
    /// Runtime-owned shell seam for live session LLM reconfiguration I/O.
    /// Starts out as `None` in every constructor.
    llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
    /// AuthMachine lifecycle authority shared by runtime-backed auth
    /// resolution/refresh paths and public auth-status surfaces.
    auth_lease: StdRwLock<Arc<dyn meerkat_core::handles::AuthLeaseHandle>>,
    /// OAuth login-flow lifecycle authority shared by public auth surfaces
    /// that operate through this runtime adapter. Not compiled on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
    /// Canonical owner of "this session id is currently active" — replaces
    /// the deleted process-global `SESSION_IDENTITY_CLAIMS` static in the
    /// comms shell (dogma #2). Comms runtimes acquire a typed
    /// [`meerkat_core::handles::SessionClaim`] through this handle and hold
    /// it for their lifetime; the registry is scoped to this `MeerkatMachine`
    /// instance, so tests / multi-runtime processes get clean isolation.
    session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
    /// Optional typed signal dispatcher for MeerkatMachine lifecycle
    /// effects routed by `meerkat_mob_seam` into MobMachine observation
    /// signals. Unset until `set_composition_signal_dispatcher` is called.
    composition_signal_dispatcher:
        StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
}
832
833impl MeerkatMachine {
    /// Capability token for store-only session-control mutations routed
    /// through this machine authority.
    ///
    /// The returned token is a zero-sized marker: it holds no data and no
    /// reference back to `self`. Its value lies in the fact that the struct's
    /// private field prevents construction outside this module.
    #[must_use]
    pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
        MachineSessionControlAuthority { _private: () }
    }
840
841    /// Whether this adapter shares the same runtime persistence authority as
842    /// another adapter. Runtime-backed composition surfaces use this to reject
843    /// mismatched adapters before visible terminal events can outrun the store
844    /// that owns their durable commit.
845    #[must_use]
846    pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
847        match (&self.store, &other.store) {
848            (None, None) => true,
849            (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
850            _ => false,
851        }
852    }
853
854    /// Whether this adapter owns the same runtime persistence authority as a
855    /// concrete runtime store handle.
856    #[must_use]
857    pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
858        self.store
859            .as_ref()
860            .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
861    }
862
    /// Whether this adapter has a runtime persistence store.
    ///
    /// Of the constructors in this file, `persistent` and
    /// `persistent_without_blobs` install a store (so this returns `true`),
    /// while `ephemeral` leaves it unset (so this returns `false`).
    #[must_use]
    pub fn has_runtime_persistence(&self) -> bool {
        self.store.is_some()
    }
868
869    fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
870        match err {
871            RuntimeDriverError::NotReady {
872                state: RuntimeState::Destroyed,
873            } => RuntimeDriverError::Destroyed,
874            other => other,
875        }
876    }
877
878    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
879    pub fn ephemeral() -> Self {
880        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
881        #[cfg(not(target_arch = "wasm32"))]
882        let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
883            std::time::Duration::from_secs(10 * 60),
884            Arc::clone(&auth_lease),
885        ));
886        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
887        Self {
888            sessions: RwLock::new(HashMap::new()),
889            mode: RuntimeMode::V9Compliant,
890            store: None,
891            blob_store: None,
892            llm_reconfigure_host: StdRwLock::new(None),
893            auth_lease: StdRwLock::new(auth_lease),
894            #[cfg(not(target_arch = "wasm32"))]
895            oauth_flows: StdRwLock::new(oauth_flows),
896            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
897            composition_signal_dispatcher: StdRwLock::new(None),
898        }
899    }
900
901    /// Create a persistent adapter with a RuntimeStore.
902    pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
903        #[cfg(not(target_arch = "wasm32"))]
904        let (auth_lease, oauth_flows) = {
905            let authorities = persistent_auth_authorities(&store);
906            (
907                Arc::clone(&authorities.auth_lease),
908                Arc::clone(&authorities.oauth_flows),
909            )
910        };
911        #[cfg(target_arch = "wasm32")]
912        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
913        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
914        Self {
915            sessions: RwLock::new(HashMap::new()),
916            mode: RuntimeMode::V9Compliant,
917            store: Some(store),
918            blob_store: Some(blob_store),
919            llm_reconfigure_host: StdRwLock::new(None),
920            auth_lease: StdRwLock::new(auth_lease),
921            #[cfg(not(target_arch = "wasm32"))]
922            oauth_flows: StdRwLock::new(oauth_flows),
923            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
924            composition_signal_dispatcher: StdRwLock::new(None),
925        }
926    }
927
928    /// Create a persistent adapter with a RuntimeStore but no blob store.
929    ///
930    /// The driver remains persistent for session state. Blob-backed inputs fail
931    /// explicitly at the blob-store boundary until a real [`BlobStore`] is
932    /// supplied.
933    pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
934        #[cfg(not(target_arch = "wasm32"))]
935        let (auth_lease, oauth_flows) = {
936            let authorities = persistent_auth_authorities(&store);
937            (
938                Arc::clone(&authorities.auth_lease),
939                Arc::clone(&authorities.oauth_flows),
940            )
941        };
942        #[cfg(target_arch = "wasm32")]
943        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
944        let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
945        Self {
946            sessions: RwLock::new(HashMap::new()),
947            mode: RuntimeMode::V9Compliant,
948            store: Some(store),
949            blob_store: Some(Arc::new(UnavailableBlobStore)),
950            llm_reconfigure_host: StdRwLock::new(None),
951            auth_lease: StdRwLock::new(auth_lease),
952            #[cfg(not(target_arch = "wasm32"))]
953            oauth_flows: StdRwLock::new(oauth_flows),
954            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
955            composition_signal_dispatcher: StdRwLock::new(None),
956        }
957    }
958
959    /// Shared auth lifecycle handle used by all runtime-backed session
960    /// bindings created by this adapter.
961    pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
962        Arc::clone(
963            &self
964                .auth_lease
965                .read()
966                .unwrap_or_else(std::sync::PoisonError::into_inner),
967        )
968    }
969
970    /// Install the auth lifecycle authority that public surfaces also read.
971    ///
972    /// Surfaces construct the adapter before all state fields are available, so
973    /// this setter lets them align the adapter's runtime-backed traffic with
974    /// the surface-visible status handle without creating a competing registry.
975    pub fn set_auth_lease_handle(&self, handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>) {
976        #[cfg(not(target_arch = "wasm32"))]
977        let handle = {
978            let erased: Arc<dyn std::any::Any + Send + Sync> = handle.clone();
979            if let Ok(runtime_handle) =
980                Arc::downcast::<crate::handles::RuntimeAuthLeaseHandle>(erased)
981            {
982                self.set_runtime_auth_lease_handle(runtime_handle);
983                return;
984            }
985            *self
986                .oauth_flows
987                .write()
988                .unwrap_or_else(std::sync::PoisonError::into_inner) =
989                Arc::new(UnavailableOAuthFlowAuthority);
990            handle
991        };
992        *self
993            .auth_lease
994            .write()
995            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
996    }
997
998    /// Install a custom credential lifecycle handle together with an explicit
999    /// OAuth login-flow authority.
1000    ///
1001    /// This is the opt-in seam for tests/embedders that intentionally want a
1002    /// non-runtime credential handle. The plain setter does not silently create
1003    /// a second hidden AuthMachine authority for OAuth flow membership.
1004    #[cfg(not(target_arch = "wasm32"))]
1005    pub fn set_auth_lease_handle_with_oauth_flow_authority(
1006        &self,
1007        handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>,
1008        oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
1009    ) {
1010        *self
1011            .oauth_flows
1012            .write()
1013            .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
1014        *self
1015            .auth_lease
1016            .write()
1017            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1018    }
1019
1020    /// Install a runtime AuthMachine authority shared by auth leases and OAuth
1021    /// login-flow lifecycle transitions.
1022    pub fn set_runtime_auth_lease_handle(
1023        &self,
1024        handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
1025    ) {
1026        #[cfg(not(target_arch = "wasm32"))]
1027        {
1028            *self
1029                .oauth_flows
1030                .write()
1031                .unwrap_or_else(std::sync::PoisonError::into_inner) =
1032                Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
1033                    std::time::Duration::from_secs(10 * 60),
1034                    Arc::clone(&handle),
1035                ));
1036        }
1037        let handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = handle;
1038        *self
1039            .auth_lease
1040            .write()
1041            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1042    }
1043
1044    /// Shared OAuth login-flow authority used by all auth surfaces that are
1045    /// backed by this runtime adapter.
1046    #[cfg(not(target_arch = "wasm32"))]
1047    pub fn oauth_flow_authority(
1048        &self,
1049    ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
1050        Arc::clone(
1051            &self
1052                .oauth_flows
1053                .read()
1054                .unwrap_or_else(std::sync::PoisonError::into_inner),
1055        )
1056    }
1057
1058    /// The canonical session-identity claim handle owned by this
1059    /// `MeerkatMachine`. Comms runtimes wired through this machine acquire
1060    /// their session-id claim through it; the registry is scoped to this
1061    /// machine instance so tests / parallel runtimes do not collide.
1062    pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
1063        Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
1064    }
1065
1066    /// Attach the typed composition signal dispatcher used for
1067    /// MeerkatMachine -> MobMachine lifecycle observation routes.
1068    pub fn set_composition_signal_dispatcher(
1069        &self,
1070        dispatcher: composition::MeerkatCompositionSignalDispatcher,
1071    ) {
1072        let mut slot = self
1073            .composition_signal_dispatcher
1074            .write()
1075            .unwrap_or_else(std::sync::PoisonError::into_inner);
1076        *slot = Some(dispatcher);
1077    }
1078
1079    /// Apply a routed-input variant delivered by the `meerkat_mob_seam`
1080    /// composition dispatcher against the session's shared DSL authority.
1081    ///
1082    /// The caller is
1083    /// [`crate::meerkat_machine::composition::MeerkatConsumerSurface::apply_routed_input`];
1084    /// it has already projected producer fields into the typed
1085    /// [`dsl::MeerkatMachineInput`] shape. This method performs the
1086    /// session lookup + DSL-lock-scoped apply. A typed transition error
1087    /// from the kernel is surfaced as a `String` so the dispatcher can
1088    /// map it onto `DispatchRefusal::ConsumerRefused`.
1089    pub async fn apply_routed_meerkat_input(
1090        &self,
1091        session_id: &SessionId,
1092        input: dsl::MeerkatMachineInput,
1093    ) -> Result<(), String> {
1094        Self::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;
1095        let sessions = self.sessions.read().await;
1096        let entry = sessions.get(session_id).ok_or_else(|| {
1097            format!(
1098                "session `{session_id}` is not registered with this MeerkatMachine; \
1099                 cannot deliver routed input"
1100            )
1101        })?;
1102        let authority = Arc::clone(&entry.dsl_authority);
1103        drop(sessions);
1104
1105        let (previous_state, effects) = {
1106            let mut guard = authority
1107                .lock()
1108                .unwrap_or_else(std::sync::PoisonError::into_inner);
1109            let previous_state = Box::new(guard.state.clone());
1110            let effects = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
1111                .map(|transition| transition.effects)
1112                .map_err(|err| format!("{err}"))?;
1113            (previous_state, effects)
1114        };
1115        if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
1116            self.restore_session_dsl_state(session_id, previous_state)
1117                .await;
1118            return Err(format!(
1119                "routed MeerkatMachine input committed effect dispatch failed: {error}"
1120            ));
1121        }
1122        Ok(())
1123    }
1124
1125    #[cfg(test)]
1126    pub(crate) async fn debug_shared_ingress_authorities(
1127        &self,
1128        session_id: &SessionId,
1129    ) -> Option<(
1130        Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1131        crate::driver::ephemeral::SharedIngressDslAuthority,
1132    )> {
1133        let sessions = self.sessions.read().await;
1134        let entry = sessions.get(session_id)?;
1135        let session_authority = Arc::clone(&entry.dsl_authority);
1136        let driver = entry.driver.lock().await;
1137        Some((session_authority, driver.shared_dsl_authority()))
1138    }
1139
1140    /// Create a driver entry for a session.
1141    fn make_driver(
1142        &self,
1143        runtime_id: LogicalRuntimeId,
1144        dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
1145        initial_runtime_state: RuntimeState,
1146    ) -> DriverEntry {
1147        let control_projection = Arc::new(StdRwLock::new(
1148            crate::driver::ephemeral::RuntimeControlProjection {
1149                phase: initial_runtime_state,
1150                current_run_id: None,
1151                pre_run_phase: None,
1152            },
1153        ));
1154        match (&self.store, &self.blob_store) {
1155            (Some(store), Some(blob_store)) => {
1156                DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
1157                    runtime_id,
1158                    store.clone(),
1159                    blob_store.clone(),
1160                    control_projection,
1161                    dsl_authority,
1162                ))
1163            }
1164            _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
1165                runtime_id,
1166                control_projection,
1167                dsl_authority,
1168            )),
1169        }
1170    }
1171
1172    /// Recover or create fresh ops lifecycle state for a session.
1173    ///
1174    /// This is the single canonical recovery seam. Both `register_session()`
1175    /// and `ensure_session_with_executor()`'s cold path call this to create
1176    /// epoch-local state. If a durable store is available, attempts to load
1177    /// the persisted snapshot; otherwise creates fresh state with a new epoch.
1178    async fn recover_or_create_ops_state(
1179        &self,
1180        session_id: &SessionId,
1181        runtime_id: &LogicalRuntimeId,
1182    ) -> (
1183        Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
1184        meerkat_core::RuntimeEpochId,
1185        Arc<meerkat_core::EpochCursorState>,
1186    ) {
1187        if let Some(ref store) = self.store {
1188            match store.load_ops_lifecycle(runtime_id).await {
1189                Ok(Some(snapshot)) => {
1190                    let recovered_epoch = snapshot.epoch_id.clone();
1191                    let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
1192                        snapshot.cursors.agent_applied_cursor,
1193                        snapshot.cursors.runtime_observed_seq,
1194                        snapshot.cursors.runtime_last_injected_seq,
1195                    );
1196                    let recovered_ops_count = snapshot.completion_entries.len();
1197                    let registry =
1198                        crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
1199                    tracing::info!(
1200                        %session_id,
1201                        %runtime_id,
1202                        epoch_id = %recovered_epoch,
1203                        recovered_ops = recovered_ops_count,
1204                        "ops lifecycle recovered from durable store (same epoch)"
1205                    );
1206                    return (
1207                        Arc::new(registry),
1208                        recovered_epoch,
1209                        Arc::new(recovered_cursors),
1210                    );
1211                }
1212                Ok(None) => {}
1213                Err(err) => {
1214                    tracing::warn!(
1215                        %session_id,
1216                        %runtime_id,
1217                        error = %err,
1218                        "failed to load ops lifecycle; epoch rotated"
1219                    );
1220                    return (
1221                        Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1222                        meerkat_core::RuntimeEpochId::new(),
1223                        Arc::new(meerkat_core::EpochCursorState::new()),
1224                    );
1225                }
1226            }
1227            tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
1228            (
1229                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1230                meerkat_core::RuntimeEpochId::new(),
1231                Arc::new(meerkat_core::EpochCursorState::new()),
1232            )
1233        } else {
1234            (
1235                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1236                meerkat_core::RuntimeEpochId::new(),
1237                Arc::new(meerkat_core::EpochCursorState::new()),
1238            )
1239        }
1240    }
1241
1242    #[allow(clippy::large_futures)]
1243    fn execute_meerkat_machine_command(
1244        &self,
1245        self_handle: Option<Arc<Self>>,
1246        command: MeerkatMachineCommand,
1247    ) -> MeerkatMachineCommandFuture<'_> {
1248        Box::pin(async move {
1249            match command {
1250                MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
1251                    let self_handle = self_handle.ok_or_else(|| {
1252                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1253                            "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
1254                        ))
1255                    })?;
1256                    self_handle
1257                        .execute_meerkat_machine_ensure_session_command(command)
1258                        .await
1259                        .map_err(Into::into)
1260                }
1261                MeerkatMachineCommand::RegisterSession { .. }
1262                | MeerkatMachineCommand::UnregisterSession { .. }
1263                | MeerkatMachineCommand::SetSilentIntents { .. }
1264                | MeerkatMachineCommand::CancelAfterBoundary { .. }
1265                | MeerkatMachineCommand::StopRuntimeExecutor { .. }
1266                | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
1267                | MeerkatMachineCommand::ContainsSession { .. }
1268                | MeerkatMachineCommand::SessionHasExecutor { .. }
1269                | MeerkatMachineCommand::SessionHasComms { .. }
1270                | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
1271                | MeerkatMachineCommand::PrepareBindings { .. }
1272                | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
1273                | MeerkatMachineCommand::InputState { .. }
1274                | MeerkatMachineCommand::ListActiveInputs { .. }
1275                | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
1276                | MeerkatMachineCommand::StagePersistentFilter { .. }
1277                | MeerkatMachineCommand::RequestDeferredTools { .. }
1278                | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
1279                    .execute_meerkat_machine_session_command(command)
1280                    .await
1281                    .map_err(Into::into),
1282                MeerkatMachineCommand::SetPeerIngressContext { .. }
1283                | MeerkatMachineCommand::NotifyDrainExited { .. } => {
1284                    let self_handle = self_handle.ok_or_else(|| {
1285                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1286                            "drain command requires Arc<Self> machine handle".into(),
1287                        ))
1288                    })?;
1289                    self_handle
1290                        .execute_meerkat_machine_drain_command(command)
1291                        .await
1292                        .map_err(Into::into)
1293                }
1294                MeerkatMachineCommand::AbortAll
1295                | MeerkatMachineCommand::Abort { .. }
1296                | MeerkatMachineCommand::Wait { .. } => self
1297                    .execute_meerkat_machine_drain_local_command(command)
1298                    .await
1299                    .map_err(Into::into),
1300                MeerkatMachineCommand::Ingest { .. }
1301                | MeerkatMachineCommand::PublishEvent { .. }
1302                | MeerkatMachineCommand::Retire { .. }
1303                | MeerkatMachineCommand::Recycle { .. }
1304                | MeerkatMachineCommand::Reset { .. }
1305                | MeerkatMachineCommand::Recover { .. }
1306                | MeerkatMachineCommand::Destroy { .. }
1307                | MeerkatMachineCommand::RuntimeState { .. }
1308                | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
1309                | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
1310                | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
1311                | MeerkatMachineCommand::RequestSwitchTurn { .. }
1312                | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
1313                | MeerkatMachineCommand::BeginImageOperation { .. }
1314                | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
1315                | MeerkatMachineCommand::CompleteImageOperation { .. }
1316                | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
1317                | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
1318                    .execute_meerkat_machine_control_command(command)
1319                    .await
1320                    .map_err(Into::into),
1321                MeerkatMachineCommand::AcceptWithCompletion { .. }
1322                | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
1323                    .execute_meerkat_machine_ingress_command(command)
1324                    .await
1325                    .map_err(Into::into),
1326                MeerkatMachineCommand::Prepare { .. }
1327                | MeerkatMachineCommand::Commit { .. }
1328                | MeerkatMachineCommand::Fail { .. } => self
1329                    .execute_meerkat_machine_legacy_run_command(command)
1330                    .await
1331                    .map_err(Into::into),
1332            }
1333        })
1334    }
1335
1336    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
1337    /// nothing processes them automatically). Useful for tests and legacy mode.
1338    pub async fn register_session(&self, session_id: SessionId) {
1339        let _ = self
1340            .execute_meerkat_machine_command(
1341                None,
1342                MeerkatMachineCommand::RegisterSession { session_id },
1343            )
1344            .await;
1345    }
1346}
1347
// Unit tests for this module live in `meerkat_machine_tests.rs`, one directory
// up from this file. The lint allowances let test code use `expect`,
// `unwrap` and `panic!` freely.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
#[path = "../meerkat_machine_tests.rs"]
mod tests;