Skip to main content

meerkat_runtime/meerkat_machine/
mod.rs

1//! MeerkatMachine — session-scoped execution kernel.
2//!
3//! One of two kernels in the Meerkat two-kernel architecture:
4//!
5//! - **MeerkatMachine** (this module) owns session-scoped runtime state:
6//!   input ingress, run lifecycle, completion waiters, async-ops registry,
7//!   comms drain, and tool visibility publication. All mutations flow through
8//!   one unified internal reducer, gated by TLA+-derived precondition guards.
9//!
10//! - **MobMachine** (`meerkat-mob`) owns mob-scoped orchestration: roster,
11//!   flow frames, delegation, and inter-member wiring.
12//!
13//! MeerkatMachine lives in `meerkat-runtime` so `meerkat-session` does not
14//! depend on runtime execution internals. When a session registers a
15//! `CoreExecutor`, a background `RuntimeLoop` task is spawned. Input acceptance
16//! queues through the driver; wake signals the loop; the loop dequeues, stages,
17//! applies via `CoreExecutor`, and marks inputs consumed.
18
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::{InputId, RunId};
28use meerkat_core::tool_scope::ToolScopeTurnOverlay;
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32    DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33    ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::{
42    InputAbandonReason, InputLifecycleState, InputStateSeed, InputTerminalOutcome,
43};
44use crate::meerkat_machine_types::{
45    HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatArchiveSnapshot,
46    MeerkatBindingSnapshot, MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot,
47    MeerkatControlSnapshot, MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind,
48    MeerkatFormalStateProjection, MeerkatInputsSnapshot, MeerkatLedgerSnapshot,
49    MeerkatMachineCommand, MeerkatMachineCommandError, MeerkatMachineCommandResult,
50    MeerkatMachineRunFailure, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
51    SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmReconfigureHost,
52    SessionLlmReconfigureReport, SessionLlmReconfigureRequest, SessionToolVisibilityDelta,
53};
54use crate::runtime_state::RuntimeState;
55use crate::service_ext::SessionServiceRuntimeExt;
56use crate::store::RuntimeStore;
57use crate::tokio;
58use crate::tokio::sync::{Mutex, RwLock, mpsc};
59#[cfg(test)]
60use crate::traits::RuntimeDriver;
61use crate::traits::{
62    DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
63    RuntimeControlPlaneError, RuntimeDriverError,
64};
65
66#[allow(clippy::expect_used)]
67pub(crate) fn recover_projected_authority(
68    state: dsl::MeerkatMachineState,
69    context: &'static str,
70) -> dsl::MeerkatMachineAuthority {
71    dsl::MeerkatMachineAuthority::recover_from_state(state).expect(context)
72}
73
/// Private unit marker type used as the runtime side's bridge token.
///
/// Its single `'static` instance is handed out type-erased by
/// `tool_visibility_owner_generated_authority_bridge_token` and recognised by
/// `tool_visibility_owner_generated_authority_bridge_token_is_valid` through
/// an `Any` type check.
struct ToolVisibilityOwnerGeneratedAuthorityBridgeToken;

/// The one static instance of the bridge token.
static TOOL_VISIBILITY_OWNER_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    ToolVisibilityOwnerGeneratedAuthorityBridgeToken =
    ToolVisibilityOwnerGeneratedAuthorityBridgeToken;

/// Hand out the bridge token as a type-erased `'static` reference.
fn tool_visibility_owner_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    let token: &'static ToolVisibilityOwnerGeneratedAuthorityBridgeToken =
        &TOOL_VISIBILITY_OWNER_GENERATED_AUTHORITY_BRIDGE_TOKEN;
    token
}
84
85#[doc(hidden)]
86#[allow(improper_ctypes_definitions, unsafe_code)]
87#[unsafe(export_name = concat!(
88    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_tool_visibility_owner_",
89    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
90))]
91pub extern "Rust" fn tool_visibility_owner_generated_authority_bridge_token_is_valid(
92    token: &(dyn std::any::Any + Send + Sync),
93) -> bool {
94    token.is::<ToolVisibilityOwnerGeneratedAuthorityBridgeToken>()
95}
96
97fn generated_tool_visibility_owner(
98    owner: Arc<dyn ToolVisibilityOwner>,
99) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String> {
100    #[allow(improper_ctypes_definitions, unsafe_code)]
101    unsafe extern "Rust" {
102        #[link_name = concat!(
103            "__meerkat_core_runtime_generated_tool_visibility_owner_build_v1_",
104            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
105        )]
106        fn core_runtime_generated_tool_visibility_owner_build(
107            token: &'static (dyn std::any::Any + Send + Sync),
108            owner: Arc<dyn ToolVisibilityOwner>,
109        ) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String>;
110    }
111    #[allow(unsafe_code)]
112    unsafe {
113        core_runtime_generated_tool_visibility_owner_build(
114            tool_visibility_owner_generated_authority_bridge_token(),
115            owner,
116        )
117    }
118}
119
120/// Build a generated visibility owner for standalone facade sessions.
121///
122/// Standalone sessions do not have a runtime loop, but durable tool visibility
123/// is still a machine fact. This owner gives those sessions the same
124/// MeerkatMachine authority path used by runtime-backed sessions instead of
125/// falling back to a handwritten local mutator.
126pub fn standalone_tool_visibility_owner(
127    session_id: &SessionId,
128    current_identity: &meerkat_core::SessionLlmIdentity,
129    model_profile: Option<&meerkat_core::model_profile::ModelProfile>,
130    capability_base_filter: &ToolFilter,
131) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String> {
132    let mut authority = dsl_authority::recover_authority_from_runtime_observation(
133        session_id,
134        RuntimeState::Idle,
135        None,
136        None,
137        None,
138        BTreeSet::new(),
139        None,
140        None,
141        None,
142    )
143    .map_err(|err| dsl_authority::map_error(err, "standalone visibility authority"))?;
144    let (current_capability_surface, current_capability_surface_status) = match model_profile {
145        Some(profile) => (
146            Some(dsl::SessionLlmCapabilitySurface {
147                supports_temperature: profile.supports_temperature,
148                supports_thinking: profile.supports_thinking,
149                supports_reasoning: profile.supports_reasoning,
150                inline_video: profile.inline_video,
151                vision: profile.vision,
152                image_input: profile.image_input,
153                image_tool_results: profile.image_tool_results,
154                supports_web_search: profile.supports_web_search,
155                image_generation: profile.image_generation,
156                realtime: profile.realtime,
157                call_timeout_secs: profile.call_timeout_secs,
158            }),
159            dsl::SessionLlmCapabilitySurfaceStatus::Resolved,
160        ),
161        None => (None, dsl::SessionLlmCapabilitySurfaceStatus::Unresolved),
162    };
163    dsl::MeerkatMachineMutator::apply(
164        &mut authority,
165        dsl::MeerkatMachineInput::HydrateSessionLlmState {
166            current_identity: dsl::SessionLlmIdentity::from_domain(current_identity),
167            current_capability_surface,
168            current_capability_surface_status,
169            current_capability_base_filter: dsl::ToolFilter::from_domain(capability_base_filter),
170        },
171    )
172    .map_err(|err| dsl_authority::map_error(err, "standalone visibility hydration"))?;
173    let authority = Arc::new(std::sync::Mutex::new(authority));
174    let owner = Arc::new(MachineToolVisibilityOwner::new());
175    owner.bind_dsl_authority(authority);
176    generated_tool_visibility_owner(owner as Arc<dyn ToolVisibilityOwner>)
177}
178
/// Error type for [`MeerkatMachine::prepare_bindings`].
///
/// Both variants carry the [`SessionId`] of the affected session as their
/// first field; it is interpolated into the `Display` message.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// Session was not found after registration (should not happen in practice).
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
    /// Machine-owned binding preparation failed before bindings were published.
    ///
    /// The `String` payload is the failure detail appended to the message.
    #[error("failed to prepare runtime bindings for session {0}: {1}")]
    PrepareFailed(SessionId, String),
}
189
/// Generated public projection for an input-state seed.
///
/// Produced by [`resolve_input_public_state_projection`], which resolves both
/// fields through generated MeerkatMachine authority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InputPublicStateProjection {
    /// Public lifecycle class resolved for the seed's phase.
    pub lifecycle_state: dsl::InputPublicLifecycleState,
    /// Public terminal outcome resolved for the seed; `None` when the
    /// authority resolves no terminal outcome.
    pub terminal_outcome: Option<dsl::InputPublicTerminalOutcome>,
}
196
197/// Runtime lifecycle/admission facts emitted by generated MeerkatMachine
198/// authority for a public runtime-state projection.
199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
200pub struct RuntimeLifecycleFacts {
201    pub terminality: dsl::RuntimeLifecycleTerminality,
202    pub input_admission: dsl::RuntimeInputAdmission,
203    pub queue_admission: dsl::RuntimeQueueAdmission,
204    pub prepare_admission: dsl::RuntimePrepareAdmission,
205    pub ingress_admission: dsl::RuntimeIngressAdmission,
206}
207
208impl RuntimeLifecycleFacts {
209    #[must_use]
210    pub fn can_accept_input(self) -> bool {
211        self.input_admission == dsl::RuntimeInputAdmission::AcceptsInput
212    }
213
214    #[must_use]
215    pub fn can_process_queue(self) -> bool {
216        self.queue_admission == dsl::RuntimeQueueAdmission::ProcessesQueue
217    }
218
219    #[must_use]
220    pub fn can_prepare_run(self) -> bool {
221        self.prepare_admission == dsl::RuntimePrepareAdmission::Ready
222    }
223
224    #[must_use]
225    pub fn is_terminal(self) -> bool {
226        self.terminality == dsl::RuntimeLifecycleTerminality::Terminal
227    }
228}
229
230/// Runtime-loop queue-drain admission feedback emitted by generated
231/// MeerkatMachine authority.
232#[derive(Debug, Clone, Copy, PartialEq, Eq)]
233pub struct RuntimeLoopQueueAdmissionPlan {
234    pub queue_admission: dsl::RuntimeQueueAdmission,
235    pub run_binding: dsl::RuntimeLoopRunBinding,
236}
237
238impl RuntimeLoopQueueAdmissionPlan {
239    #[must_use]
240    pub fn can_process_queue(self) -> bool {
241        self.queue_admission == dsl::RuntimeQueueAdmission::ProcessesQueue
242    }
243
244    #[must_use]
245    pub fn uses_prebound_run(self) -> bool {
246        self.run_binding == dsl::RuntimeLoopRunBinding::UsePrebound
247    }
248}
249
250/// Classify runtime lifecycle/admission facts through generated
251/// MeerkatMachine authority. Callers provide only the observed state variant;
252/// all behavior-affecting facts come back as generated typed feedback.
253pub fn classify_runtime_lifecycle_state(
254    state: RuntimeState,
255) -> Result<RuntimeLifecycleFacts, String> {
256    let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
257    let mut authority = projection_authority();
258    let transition = dsl::MeerkatMachineMutator::apply(
259        &mut authority,
260        dsl::MeerkatMachineInput::ClassifyRuntimeLifecycleState {
261            state: observed_state,
262        },
263    )
264    .map_err(|err| {
265        format!("MeerkatMachine rejected runtime lifecycle classification for {state}: {err}")
266    })?;
267
268    transition
269        .into_effects()
270        .into_iter()
271        .find_map(|effect| match effect {
272            dsl::MeerkatMachineEffect::RuntimeLifecycleStateClassified {
273                state,
274                terminality,
275                input_admission,
276                queue_admission,
277                prepare_admission,
278                ingress_admission,
279            } if state == observed_state => Some(RuntimeLifecycleFacts {
280                terminality,
281                input_admission,
282                queue_admission,
283                prepare_admission,
284                ingress_admission,
285            }),
286            _ => None,
287        })
288        .ok_or_else(|| {
289            format!("MeerkatMachine emitted no runtime lifecycle classification for {state}")
290        })
291}
292
293/// Classify the store-visible durable runtime lifecycle state through
294/// generated MeerkatMachine authority. The caller supplies only the live
295/// observed state; generated feedback decides the recovery projection.
296pub fn classify_runtime_lifecycle_durable_state(
297    state: RuntimeState,
298) -> Result<RuntimeState, String> {
299    let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
300    let mut authority = projection_authority();
301    let transition = dsl::MeerkatMachineMutator::apply(
302        &mut authority,
303        dsl::MeerkatMachineInput::ClassifyRuntimeLifecycleDurability {
304            state: observed_state,
305        },
306    )
307    .map_err(|err| {
308        format!(
309            "MeerkatMachine rejected runtime lifecycle durability classification for {state}: {err}"
310        )
311    })?;
312
313    transition
314        .into_effects()
315        .into_iter()
316        .find_map(|effect| match effect {
317            dsl::MeerkatMachineEffect::RuntimeLifecycleDurabilityClassified {
318                state,
319                durable_state,
320            } if state == observed_state => Some(
321                dsl_authority::runtime_state_from_observed_lifecycle_state(durable_state),
322            ),
323            _ => None,
324        })
325        .ok_or_else(|| {
326            format!(
327                "MeerkatMachine emitted no runtime lifecycle durability classification for {state}"
328            )
329        })
330}
331
332/// Classify runtime-loop queue admission through generated MeerkatMachine
333/// authority. The caller provides the observed runtime state and the structural
334/// fact that a current run id is bound; generated feedback decides whether the
335/// queue may drain and whether that bound run id must be reused.
336pub fn classify_runtime_loop_queue_admission(
337    state: RuntimeState,
338    current_run_bound: bool,
339) -> Result<RuntimeLoopQueueAdmissionPlan, String> {
340    let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
341    let mut authority = projection_authority();
342    let transition = dsl::MeerkatMachineMutator::apply(
343        &mut authority,
344        dsl::MeerkatMachineInput::ClassifyRuntimeLoopQueueAdmission {
345            state: observed_state,
346            current_run_bound,
347        },
348    )
349    .map_err(|err| {
350        format!(
351            "MeerkatMachine rejected runtime-loop queue admission for {state} with current_run_bound={current_run_bound}: {err}"
352        )
353    })?;
354
355    transition
356        .into_effects()
357        .into_iter()
358        .find_map(|effect| match effect {
359            dsl::MeerkatMachineEffect::RuntimeLoopQueueAdmissionClassified {
360                state,
361                current_run_bound: observed_current_run_bound,
362                queue_admission,
363                run_binding,
364            } if state == observed_state && observed_current_run_bound == current_run_bound => {
365                Some(RuntimeLoopQueueAdmissionPlan {
366                    queue_admission,
367                    run_binding,
368                })
369            }
370            _ => None,
371        })
372        .ok_or_else(|| {
373            format!(
374                "MeerkatMachine emitted no runtime-loop queue admission for {state} with current_run_bound={current_run_bound}"
375            )
376        })
377}
378
/// Machine-owned arbitration verdict between the live DSL lifecycle phase and
/// the durable control projection, emitted by generated MeerkatMachine
/// authority. `publish_control` is the terminal-precedence decision (the
/// published control projection supersedes the live DSL phase);
/// `selected_raw_phase` is the chosen phase without the visibility rewrite;
/// `visible_phase` is the externally-visible phase after the
/// Running+pre_run(Retired)->Retired rewrite. The shell mirrors all three.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VisibleRuntimePhasePlan {
    /// Terminal-precedence decision: whether the control projection is published.
    pub publish_control: bool,
    /// Phase the machine selected, before the visibility rewrite.
    pub selected_raw_phase: RuntimeState,
    /// Externally-visible phase, after the visibility rewrite.
    pub visible_phase: RuntimeState,
}
392
393/// Resolve the authoritative/visible runtime phase through generated
394/// MeerkatMachine authority. The shell feeds only the five pure
395/// [`RuntimeState`] observations it already holds; the machine owns BOTH the
396/// terminal-precedence `publish_control` policy AND the
397/// Running+pre_run(Retired)->Retired visibility rewrite. The shell mirrors the
398/// emitted verdict and re-derives nothing, failing closed if no verdict is
399/// emitted.
400pub fn resolve_visible_runtime_phase(
401    dsl_phase: RuntimeState,
402    dsl_pre_run_phase: Option<RuntimeState>,
403    control_phase: RuntimeState,
404    control_pre_run_phase: Option<RuntimeState>,
405    has_runtime_persistence: bool,
406) -> Result<VisibleRuntimePhasePlan, String> {
407    let observed_dsl = dsl_authority::observed_runtime_lifecycle_state(dsl_phase);
408    let observed_control = dsl_authority::observed_runtime_lifecycle_state(control_phase);
409    let observed_dsl_pre_run =
410        dsl_pre_run_phase.map(dsl_authority::observed_runtime_lifecycle_state);
411    let observed_control_pre_run =
412        control_pre_run_phase.map(dsl_authority::observed_runtime_lifecycle_state);
413    let mut authority = projection_authority();
414    let transition = dsl::MeerkatMachineMutator::apply(
415        &mut authority,
416        dsl::MeerkatMachineInput::ResolveVisibleRuntimePhase {
417            dsl_phase: observed_dsl,
418            dsl_pre_run_phase: observed_dsl_pre_run,
419            control_phase: observed_control,
420            control_pre_run_phase: observed_control_pre_run,
421            has_runtime_persistence,
422        },
423    )
424    .map_err(|err| {
425        format!(
426            "MeerkatMachine rejected visible runtime phase resolution \
427             (dsl={dsl_phase}, control={control_phase}, persistence={has_runtime_persistence}): {err}"
428        )
429    })?;
430
431    transition
432        .into_effects()
433        .into_iter()
434        .find_map(|effect| match effect {
435            dsl::MeerkatMachineEffect::VisibleRuntimePhaseResolved {
436                publish_control,
437                selected_raw_phase,
438                visible_phase,
439            } => Some(VisibleRuntimePhasePlan {
440                publish_control,
441                selected_raw_phase: dsl_authority::runtime_state_from_observed_lifecycle_state(
442                    selected_raw_phase,
443                ),
444                visible_phase: dsl_authority::runtime_state_from_observed_lifecycle_state(
445                    visible_phase,
446                ),
447            }),
448            _ => None,
449        })
450        .ok_or_else(|| {
451            format!(
452                "MeerkatMachine emitted no visible runtime phase resolution \
453                 (dsl={dsl_phase}, control={control_phase}, persistence={has_runtime_persistence})"
454            )
455        })
456}
457
458/// Resolve the public lifecycle class for a machine-derived input phase
459/// through generated MeerkatMachine authority.
460pub fn resolve_input_public_lifecycle_projection(
461    input_id: &InputId,
462    phase: InputLifecycleState,
463) -> Result<dsl::InputPublicLifecycleState, String> {
464    let input_key = input_id.to_string();
465    let mut authority = projection_authority();
466    let transition = dsl::MeerkatMachineMutator::apply(
467        &mut authority,
468        dsl::MeerkatMachineInput::ResolveInputPublicLifecycle {
469            input_id: input_key.clone(),
470            phase: observed_input_phase(phase),
471        },
472    )
473    .map_err(|err| {
474        format!("MeerkatMachine rejected public lifecycle projection for '{input_id}': {err}")
475    })?;
476
477    transition
478        .into_effects()
479        .into_iter()
480        .find_map(|effect| match effect {
481            dsl::MeerkatMachineEffect::InputPublicLifecycleResolved { input_id, phase }
482                if input_id == input_key =>
483            {
484                Some(phase)
485            }
486            _ => None,
487        })
488        .ok_or_else(|| {
489            format!("MeerkatMachine emitted no public lifecycle projection for '{input_id}'")
490        })
491}
492
493/// Resolve public lifecycle and terminal result classes for a machine-derived
494/// input-state seed through generated MeerkatMachine authority.
495pub fn resolve_input_public_state_projection(
496    input_id: &InputId,
497    seed: &InputStateSeed,
498) -> Result<InputPublicStateProjection, String> {
499    let lifecycle_state = resolve_input_public_lifecycle_projection(input_id, seed.phase)?;
500    let terminal_outcome = resolve_input_public_terminal_projection(input_id, seed)?;
501    Ok(InputPublicStateProjection {
502        lifecycle_state,
503        terminal_outcome,
504    })
505}
506
507pub(crate) fn input_seed_behavioral_terminality_via_authority(
508    input_id: &InputId,
509    seed: &InputStateSeed,
510) -> Result<bool, String> {
511    classify_input_behavioral_terminality(input_id, seed.phase, seed.terminal_outcome.as_ref())
512}
513
514pub(crate) fn input_phase_behavioral_terminality_via_authority(
515    input_id: &InputId,
516    phase: InputLifecycleState,
517    terminal_outcome: Option<InputTerminalOutcome>,
518) -> Result<bool, String> {
519    classify_input_behavioral_terminality(input_id, phase, terminal_outcome.as_ref())
520}
521
522/// Authorize DSL-owned input-state seed facts before they are written to a
523/// runtime store.
524pub(crate) fn authorize_stored_input_state_seed(
525    input_id: &InputId,
526    seed: &InputStateSeed,
527) -> Result<(), String> {
528    let input_key = input_id.to_string();
529    let (terminal_kind, superseded_by, aggregate_id, abandon_reason, abandon_attempt_count) =
530        input_seed_terminal_parts(seed)?;
531    let mut authority = projection_authority();
532    let transition = dsl::MeerkatMachineMutator::apply(
533        &mut authority,
534        dsl::MeerkatMachineInput::AuthorizeStoredInputStateSeed {
535            input_id: input_key.clone(),
536            phase: observed_input_phase(seed.phase),
537            terminal_kind,
538            superseded_by,
539            aggregate_id,
540            abandon_reason,
541            abandon_attempt_count,
542            attempt_count: u64::from(seed.attempt_count),
543            run_id: seed
544                .last_run_id
545                .as_ref()
546                .map(std::string::ToString::to_string),
547            boundary_sequence: seed.last_boundary_sequence,
548            admission_sequence: seed.admission_sequence,
549            recovery_lane: seed.recovery_lane.map(dsl::InputLane::from),
550        },
551    )
552    .map_err(|err| {
553        format!("MeerkatMachine rejected stored input-state seed for '{input_id}': {err}")
554    })?;
555
556    transition
557        .into_effects()
558        .into_iter()
559        .find_map(|effect| match effect {
560            dsl::MeerkatMachineEffect::StoredInputStateSeedAuthorized { input_id }
561                if input_id == input_key =>
562            {
563                Some(())
564            }
565            _ => None,
566        })
567        .ok_or_else(|| {
568            format!("MeerkatMachine emitted no stored input-state seed authority for '{input_id}'")
569        })
570}
571
572fn classify_input_behavioral_terminality(
573    input_id: &InputId,
574    phase: InputLifecycleState,
575    terminal_outcome: Option<&InputTerminalOutcome>,
576) -> Result<bool, String> {
577    let input_key = input_id.to_string();
578    let (terminal_kind, abandon_reason) = input_terminality_parts(terminal_outcome);
579    let mut authority = projection_authority();
580    let transition = dsl::MeerkatMachineMutator::apply(
581        &mut authority,
582        dsl::MeerkatMachineInput::ClassifyInputTerminality {
583            input_id: input_key.clone(),
584            phase: observed_input_phase(phase),
585            terminal_kind,
586            abandon_reason,
587        },
588    )
589    .map_err(|err| {
590        format!("MeerkatMachine rejected behavioral input terminality for '{input_id}': {err}")
591    })?;
592
593    let mut terminality = None;
594    for effect in transition.into_effects() {
595        match effect {
596            dsl::MeerkatMachineEffect::InputBehavioralTerminalityResolved {
597                input_id,
598                terminal,
599            } if input_id == input_key => terminality = Some(terminal),
600            other => {
601                return Err(format!(
602                    "MeerkatMachine emitted unexpected behavioral input terminality effect for '{input_id}': {other:?}"
603                ));
604            }
605        }
606    }
607    terminality.ok_or_else(|| {
608        format!("MeerkatMachine emitted no behavioral input terminality for '{input_id}'")
609    })
610}
611
612fn resolve_input_public_terminal_projection(
613    input_id: &InputId,
614    seed: &InputStateSeed,
615) -> Result<Option<dsl::InputPublicTerminalOutcome>, String> {
616    let input_key = input_id.to_string();
617    let (terminal_kind, abandon_reason) = input_terminality_parts(seed.terminal_outcome.as_ref());
618    let mut authority = projection_authority();
619    let transition = dsl::MeerkatMachineMutator::apply(
620        &mut authority,
621        dsl::MeerkatMachineInput::ResolveInputPublicTerminalOutcome {
622            input_id: input_key.clone(),
623            phase: observed_input_phase(seed.phase),
624            terminal_kind,
625            abandon_reason,
626        },
627    )
628    .map_err(|err| {
629        format!("MeerkatMachine rejected public terminal projection for '{input_id}': {err}")
630    })?;
631
632    transition
633        .into_effects()
634        .into_iter()
635        .find_map(|effect| match effect {
636            dsl::MeerkatMachineEffect::InputPublicTerminalOutcomeResolved {
637                input_id,
638                terminal_outcome,
639            } if input_id == input_key => Some(terminal_outcome),
640            _ => None,
641        })
642        .ok_or_else(|| {
643            format!("MeerkatMachine emitted no public terminal projection for '{input_id}'")
644        })
645}
646
647fn projection_authority() -> dsl::MeerkatMachineAuthority {
648    dsl_authority::new_initialized_authority("projection authority must initialize")
649}
650
/// Create a freshly initialized authority wrapped in `Arc<std::sync::Mutex<_>>`
/// as a `SharedIngressDslAuthority`. Only built with the `live` feature.
#[cfg(feature = "live")]
fn live_unbound_rejection_authority() -> crate::driver::ephemeral::SharedIngressDslAuthority {
    let authority = dsl_authority::new_initialized_authority(
        "live unbound rejection authority must initialize",
    );
    Arc::new(std::sync::Mutex::new(authority))
}
659
660fn observed_input_phase(phase: InputLifecycleState) -> dsl::RecoveredInputObservedPhase {
661    match phase {
662        InputLifecycleState::Accepted => dsl::RecoveredInputObservedPhase::Accepted,
663        InputLifecycleState::Queued => dsl::RecoveredInputObservedPhase::Queued,
664        InputLifecycleState::Staged => dsl::RecoveredInputObservedPhase::Staged,
665        InputLifecycleState::Applied => dsl::RecoveredInputObservedPhase::Applied,
666        InputLifecycleState::AppliedPendingConsumption => {
667            dsl::RecoveredInputObservedPhase::AppliedPendingConsumption
668        }
669        InputLifecycleState::Consumed => dsl::RecoveredInputObservedPhase::Consumed,
670        InputLifecycleState::Superseded => dsl::RecoveredInputObservedPhase::Superseded,
671        InputLifecycleState::Coalesced => dsl::RecoveredInputObservedPhase::Coalesced,
672        InputLifecycleState::Abandoned => dsl::RecoveredInputObservedPhase::Abandoned,
673    }
674}
675
676type InputSeedTerminalParts = (
677    Option<dsl::InputTerminalKind>,
678    Option<String>,
679    Option<String>,
680    Option<dsl::InputAbandonReason>,
681    u64,
682);
683
684fn input_seed_terminal_parts(seed: &InputStateSeed) -> Result<InputSeedTerminalParts, String> {
685    match seed.terminal_outcome.as_ref() {
686        None => Ok((None, None, None, None, 0)),
687        Some(InputTerminalOutcome::Consumed) => {
688            Ok((Some(dsl::InputTerminalKind::Consumed), None, None, None, 0))
689        }
690        Some(InputTerminalOutcome::Superseded { superseded_by }) => Ok((
691            Some(dsl::InputTerminalKind::Superseded),
692            Some(superseded_by.to_string()),
693            None,
694            None,
695            0,
696        )),
697        Some(InputTerminalOutcome::Coalesced { aggregate_id }) => Ok((
698            Some(dsl::InputTerminalKind::Coalesced),
699            None,
700            Some(aggregate_id.to_string()),
701            None,
702            0,
703        )),
704        Some(InputTerminalOutcome::Abandoned { reason }) => {
705            let abandon_attempt_count = match reason {
706                InputAbandonReason::MaxAttemptsExhausted { attempts } => u64::from(*attempts),
707                _ => u64::from(seed.attempt_count),
708            };
709            Ok((
710                Some(dsl::InputTerminalKind::Abandoned),
711                None,
712                None,
713                input_terminality_parts(seed.terminal_outcome.as_ref()).1,
714                abandon_attempt_count,
715            ))
716        }
717    }
718}
719
720fn input_terminality_parts(
721    outcome: Option<&InputTerminalOutcome>,
722) -> (
723    Option<dsl::InputTerminalKind>,
724    Option<dsl::InputAbandonReason>,
725) {
726    match outcome {
727        None => (None, None),
728        Some(InputTerminalOutcome::Consumed) => (Some(dsl::InputTerminalKind::Consumed), None),
729        Some(InputTerminalOutcome::Superseded { .. }) => {
730            (Some(dsl::InputTerminalKind::Superseded), None)
731        }
732        Some(InputTerminalOutcome::Coalesced { .. }) => {
733            (Some(dsl::InputTerminalKind::Coalesced), None)
734        }
735        Some(InputTerminalOutcome::Abandoned { reason }) => (
736            Some(dsl::InputTerminalKind::Abandoned),
737            Some(match reason {
738                InputAbandonReason::Retired => dsl::InputAbandonReason::Retired,
739                InputAbandonReason::Reset => dsl::InputAbandonReason::Reset,
740                InputAbandonReason::Stopped => dsl::InputAbandonReason::Stopped,
741                InputAbandonReason::Destroyed => dsl::InputAbandonReason::Destroyed,
742                InputAbandonReason::Cancelled => dsl::InputAbandonReason::Cancelled,
743                InputAbandonReason::MaxAttemptsExhausted { .. } => {
744                    dsl::InputAbandonReason::MaxAttemptsExhausted
745                }
746            }),
747        ),
748    }
749}
750
751#[derive(Debug, Default)]
752struct UnavailableBlobStore;
753
754impl UnavailableBlobStore {
755    fn error() -> BlobStoreError {
756        BlobStoreError::Unsupported(
757            "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
758                .to_string(),
759        )
760    }
761}
762
/// Auth handles cached per runtime store; values of
/// `PERSISTENT_AUTH_AUTHORITIES` are `Arc`s of this type.
#[cfg(not(target_arch = "wasm32"))]
struct PersistentAuthAuthorityBundle {
    /// Weak reference to the runtime store, behind a std mutex.
    /// NOTE(review): presumably lets the bundle notice a dropped or replaced
    /// store — confirm against `persistent_auth_authorities`.
    store: StdMutex<Weak<dyn RuntimeStore>>,
    /// Shared runtime auth-lease handle.
    auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
    /// Shared runtime OAuth-flow handle.
    oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
}
769
/// Registry key identifying which stores share one auth authority bundle.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key reported by the store itself via `auth_authority_key()`.
    Durable(String),
    /// Fallback for stores without a key: the address of the `Arc` allocation,
    /// as computed by `runtime_store_identity`.
    Process(usize),
}
776
/// Process-wide registry of auth authority bundles, keyed by store identity.
///
/// Lazily initialised on first use by `persistent_auth_authorities`; the
/// mutex guards both lookup and insertion.
#[cfg(not(target_arch = "wasm32"))]
static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
    StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
> = OnceLock::new();
781
782#[cfg(not(target_arch = "wasm32"))]
783fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
784    store
785        .auth_authority_key()
786        .map(PersistentAuthAuthorityKey::Durable)
787        .unwrap_or_else(|| {
788            PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
789        })
790}
791
792fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
793    match (a.auth_authority_key(), b.auth_authority_key()) {
794        (Some(a), Some(b)) => a == b,
795        _ => Arc::ptr_eq(a, b),
796    }
797}
798
/// Wraps a runtime auth-lease handle into the generated
/// `GeneratedAuthLeaseHandle` form.
///
/// # Panics
///
/// Panics if `generated_auth_lease_handle` rejects the handle; the `expect`
/// message treats that as a broken invariant rather than a recoverable error.
fn generated_runtime_auth_lease_handle(
    handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
) -> meerkat_core::handles::GeneratedAuthLeaseHandle {
    // The lint is silenced deliberately: failure here is considered a bug.
    #[allow(clippy::expect_used)]
    crate::protocol_auth_lease_lifecycle_publication::generated_auth_lease_handle(handle)
        .expect("runtime AuthLeaseHandle must be certified by generated AuthMachine authority")
}
806
807#[cfg(not(target_arch = "wasm32"))]
808fn persistent_auth_authorities(
809    store: &Arc<dyn RuntimeStore>,
810) -> Arc<PersistentAuthAuthorityBundle> {
811    let key = runtime_store_identity(store);
812    let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
813    let mut authorities = authorities
814        .lock()
815        .unwrap_or_else(std::sync::PoisonError::into_inner);
816    if let Some(existing) = authorities.get(&key) {
817        let stored_store_alive = existing
818            .store
819            .lock()
820            .unwrap_or_else(std::sync::PoisonError::into_inner)
821            .upgrade()
822            .is_some();
823        if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
824            existing.oauth_flows.bind_persistent_store(store);
825            *existing
826                .store
827                .lock()
828                .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
829            return Arc::clone(existing);
830        }
831    }
832    let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
833    let oauth_flows = Arc::new(
834        crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
835            std::time::Duration::from_secs(10 * 60),
836            Arc::clone(&auth_lease),
837            store,
838        ),
839    );
840    let bundle = Arc::new(PersistentAuthAuthorityBundle {
841        store: StdMutex::new(Arc::downgrade(store)),
842        auth_lease,
843        oauth_flows,
844    });
845    authorities.insert(key, Arc::clone(&bundle));
846    bundle
847}
848
/// Test helper: empties the process-wide auth authority registry.
///
/// Does nothing when the registry has never been initialised.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut entries = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    entries.clear();
}
858
859#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
860#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
861impl BlobStore for UnavailableBlobStore {
862    async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
863        Err(Self::error())
864    }
865
866    async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
867        Err(Self::error())
868    }
869
870    async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
871        Err(Self::error())
872    }
873
874    async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
875        Err(Self::error())
876    }
877
878    fn is_persistent(&self) -> bool {
879        false
880    }
881}
882
/// Boxed, pinned future resolving to a `MeerkatMachineCommandResult` or a
/// `MeerkatMachineCommandError`. On non-wasm targets the future must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
            + Send
            + 'a,
    >,
>;

/// wasm32 variant of the command future alias: identical output type, but
/// without the `Send` bound.
#[cfg(target_arch = "wasm32")]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
>;
896
897pub(crate) use driver::{
898    DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
899    commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
900    machine_batch_primitive_projections, machine_batch_runtime_semantics,
901    machine_commit_prepared_destroy, machine_commit_service_turn_terminal_receipt,
902    machine_prepare_bindings_projection, machine_prepare_destroy, machine_recover_ephemeral_driver,
903    machine_recover_persistent_driver, machine_recycle_preserving_work, machine_reset,
904    machine_retire, machine_select_runtime_loop_batch, machine_stop_runtime,
905    prepare_runtime_loop_batch_start,
906};
907
908pub(crate) mod driver;
909
910mod comms_drain;
911pub mod composition;
912mod dispatch_control;
913mod dispatch_drain;
914mod dispatch_ingress;
915mod dispatch_session;
916#[allow(unused_variables, dead_code, clippy::cmp_owned)]
917#[allow(clippy::assign_op_pattern)]
918pub mod dsl;
919pub(crate) mod dsl_authority;
920mod dsl_effects;
921mod llm_reconfigure;
922mod runtime_control;
923mod session_management;
924mod traits;
925mod visibility;
926
927pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
928
929pub use comms_drain::{
930    CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
931    SupervisorBinding, SupervisorBindingStageError,
932};
933pub(crate) use comms_drain::{
934    CommsDrainSlot, SupervisorAuthorizeAdmission, SupervisorBindAdmission,
935    SupervisorBridgeCommandAdmission, abort_slot,
936};
937pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
938pub(crate) use visibility::MachineToolVisibilityOwner;
939
/// Result of staging one DSL input against a session's authority.
struct StagedSessionDslInput {
    /// Authority snapshot taken before the input was applied; callers restore
    /// it to roll the staged transition back.
    previous_snapshot: dsl::MeerkatMachineAuthoritySnapshot,
    /// Authority snapshot after the input was applied.
    /// NOTE(review): consumers of this field are outside this view — confirm usage.
    committed_snapshot: dsl::MeerkatMachineAuthoritySnapshot,
    /// Effects produced by the transition; routed signals are dispatched from
    /// these when the transition is committed.
    effects: DslTransitionEffects,
}
945
/// Policy applied when dispatching the effects of an already-committed DSL
/// transition fails.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    /// Report the dispatch error to the caller without rolling the DSL
    /// authority back to its previous snapshot.
    PreserveCommittedDslState,
}
950
/// Per-session state: driver + generated authority binding + shell handles.
///
/// One entry exists per registered session in the machine's `sessions` map.
/// Everything shared with other tasks or crates is behind an `Arc`, so
/// accessors can clone a handle out and release the `sessions` lock early.
struct RuntimeSessionEntry {
    /// Canonical runtime control-plane identity for this registered session.
    runtime_id: LogicalRuntimeId,
    /// Per-session mutation gate.
    ///
    /// Serializes same-session mutating commands across the full
    /// DSL-stage → driver-mutate → DSL-sync span. Without this gate,
    /// two concurrent commands on the same session can interleave between
    /// the DSL projection sync (which releases `sessions` lock) and the
    /// driver mutation (which acquires `driver` lock independently).
    ///
    /// This is NOT a replacement for `sessions` RwLock or `driver` Mutex —
    /// it is an additional serialization point that spans the entire
    /// multi-step mutation window.
    mutation_gate: Arc<Mutex<()>>,
    /// Shared driver handle (accessed by both adapter methods and RuntimeLoop).
    driver: SharedDriver,
    /// Canonical coarse control projection for this session.
    ///
    /// The driver reads this to realize shell mechanics, but machine-facing
    /// queries should publish from this shared cell rather than treating the
    /// driver shell as the source of lifecycle truth.
    control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
    /// Shared async-operation lifecycle registry for this runtime/session.
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Runtime epoch identity — stable across rebuilds, rotated on reset/restart-without-recovery.
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Mechanical close gate for handles minted from this session entry.
    ///
    /// The DSL still owns runtime terminality; this gate only invalidates cloned
    /// cross-crate handles after the entry is torn down.
    handle_teardown_gate: Arc<crate::handles::HandleTeardownGate>,
    /// Shared consumer cursor state for the epoch.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Completion waiters (accessed by accept_input_with_completion and RuntimeLoop).
    completions: SharedCompletionRegistry,
    /// Canonical durable visibility owner for this session.
    tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
    /// Runtime-loop channel publication slot.
    ///
    /// This is mechanical shell state only. The generated `MeerkatMachine`
    /// `registration_phase` is the semantic executor registration authority.
    attachment_slot: RuntimeLoopAttachmentSlot,
    /// Temporary live interrupt capability for prepared, session-owned turns
    /// that run before the runtime loop attachment is published.
    ///
    /// Cleared when a runtime loop is attached, and only consulted while no
    /// live attachment exists.
    provisional_interrupt_handle:
        Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// DSL authority for coarse lifecycle phase transitions.
    /// Sync field — validates transitions, writes back phase.
    ///
    /// `Arc<std::sync::Mutex<_>>` so cross-crate handle impls
    /// (`meerkat-runtime::handles::*`) can share the same underlying authority
    /// from a sync context without awaiting the outer `sessions` tokio lock.
    /// The Arc heap-allocates the authority's large expanded state (31 fields
    /// including several Maps/Sets) so holding a reference to a
    /// `RuntimeSessionEntry` does not bloat async future sizes.
    dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    /// Per-session comms drain lifecycle slot.
    ///
    /// Collapsed from the sibling `MeerkatMachine.comms_drain_slots:
    /// RwLock<HashMap<SessionId, CommsDrainSlot>>` in wave-c C-H2 (F5 in
    /// docs/wave-c-prep/state-scope-audit.md) — keeping the slot here
    /// makes "session exists" a single HashMap insertion and eliminates
    /// the class of bugs where the sibling map and the session map
    /// could fall out of sync across a registration/unregistration
    /// boundary.
    drain_slot: CommsDrainSlot,
}
1020
/// Capability bundle for an attached runtime loop.
///
/// Keep all loop-related handles together so "attached vs detached" cannot
/// drift into partially-populated shell state.
struct RuntimeLoopAttachment {
    /// Sender half of the loop's wake channel. The attachment counts as live
    /// only while this channel and `effect_tx` are both open.
    wake_tx: mpsc::Sender<()>,
    /// Sender half of the loop's runtime-effect channel.
    effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
    /// Optional executor boundary capability (e.g. cancel-after-boundary).
    boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
    /// Optional executor interrupt capability.
    interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Join handle of the spawned loop task, handed out on detach so the
    /// caller can await the loop's exit.
    loop_handle: tokio::task::JoinHandle<()>,
}
1032
/// Mechanical runtime-loop channel slot.
///
/// Two-state holder so a session either has a complete attachment or none.
enum RuntimeLoopAttachmentSlot {
    /// No runtime loop is attached.
    Empty,
    /// A runtime loop was attached; its channels may still have closed since.
    Attached(RuntimeLoopAttachment),
}
1038
1039impl RuntimeSessionEntry {
1040    fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
1041        self.control_projection
1042            .read()
1043            .map(|guard| guard.clone())
1044            .unwrap_or_else(|poisoned| {
1045                tracing::error!("runtime control projection lock poisoned");
1046                poisoned.into_inner().clone()
1047            })
1048    }
1049
1050    fn attachment_is_live(&self) -> bool {
1051        match &self.attachment_slot {
1052            RuntimeLoopAttachmentSlot::Attached(attachment) => {
1053                !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
1054            }
1055            RuntimeLoopAttachmentSlot::Empty => false,
1056        }
1057    }
1058
1059    fn generated_executor_registration_active(&self) -> bool {
1060        let authority = self
1061            .dsl_authority
1062            .lock()
1063            .unwrap_or_else(std::sync::PoisonError::into_inner);
1064        matches!(
1065            authority.state().registration_phase,
1066            dsl::RegistrationPhase::Active
1067        )
1068    }
1069
1070    fn close_handle_teardown_gate(&self) {
1071        let _guard = self
1072            .dsl_authority
1073            .lock()
1074            .unwrap_or_else(std::sync::PoisonError::into_inner);
1075        self.handle_teardown_gate.close();
1076    }
1077
1078    /// True while the runtime-loop executor registration is `Active` *or*
1079    /// `Draining`. The drain window (`BeginUnregisterSession` → final
1080    /// `UnregisterSession`) keeps the session registered so the in-flight run
1081    /// can still commit and resolve its completion waiters; the runtime-loop
1082    /// driver-authority gate must therefore admit `Draining`, while the
1083    /// registration *claim* check stays `Active`-only (no new attachment may be
1084    /// granted inside the drain window).
1085    fn generated_executor_registration_active_or_draining(&self) -> bool {
1086        let authority = self
1087            .dsl_authority
1088            .lock()
1089            .unwrap_or_else(std::sync::PoisonError::into_inner);
1090        matches!(
1091            authority.state().registration_phase,
1092            dsl::RegistrationPhase::Active | dsl::RegistrationPhase::Draining
1093        )
1094    }
1095
1096    fn generated_stop_deferred(&self) -> bool {
1097        self.dsl_authority
1098            .lock()
1099            .unwrap_or_else(std::sync::PoisonError::into_inner)
1100            .state()
1101            .runtime_stop_deferred
1102    }
1103
1104    fn stage_generated_executor_registration_claim(
1105        &self,
1106        session_id: &SessionId,
1107    ) -> Result<StagedSessionDslInput, String> {
1108        let staged = MeerkatMachine::stage_dsl_transition_on_authority(
1109            &self.dsl_authority,
1110            dsl::MeerkatMachineInput::EnsureSessionWithExecutor {
1111                session_id: dsl::SessionId::from_domain(session_id),
1112            },
1113            "EnsureSessionWithExecutor",
1114        )?;
1115        if self.generated_executor_registration_active() {
1116            Ok(staged)
1117        } else {
1118            let mut authority = self
1119                .dsl_authority
1120                .lock()
1121                .unwrap_or_else(std::sync::PoisonError::into_inner);
1122            authority.restore_snapshot(staged.previous_snapshot);
1123            Err("generated MeerkatMachine did not grant active executor registration".into())
1124        }
1125    }
1126
1127    fn stage_generated_executor_exit_observation(&self) -> Result<StagedSessionDslInput, String> {
1128        MeerkatMachine::stage_runtime_internal_dsl_transition_on_authority(
1129            &self.dsl_authority,
1130            crate::meerkat_machine_types::MeerkatMachineFieldlessRuntimeInternalInput::RuntimeExecutorExited,
1131        )
1132    }
1133
1134    /// Returns `true` only if the executor is fully attached with live channels.
1135    /// Used by internal publish logic within `ensure_session_with_executor`.
1136    fn has_live_attachment(&self) -> bool {
1137        self.attachment_is_live()
1138    }
1139
1140    fn attach_runtime_loop(
1141        &mut self,
1142        wake_tx: mpsc::Sender<()>,
1143        effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
1144        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
1145        interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
1146        loop_handle: tokio::task::JoinHandle<()>,
1147    ) {
1148        self.provisional_interrupt_handle = None;
1149        self.attachment_slot = RuntimeLoopAttachmentSlot::Attached(RuntimeLoopAttachment {
1150            wake_tx,
1151            effect_tx,
1152            boundary_handle,
1153            interrupt_handle,
1154            loop_handle,
1155        });
1156    }
1157
1158    /// Detach the runtime-loop channels, returning the loop's `JoinHandle` so a
1159    /// caller can await its quiescence.
1160    ///
1161    /// Dropping the returned `wake_tx`/`effect_tx` (held inside the attachment)
1162    /// closes the loop's receivers, which drives the loop through its canonical
1163    /// `StopRuntimeExecutor` + `RuntimeExecutorExited` exit. The slot is left
1164    /// `Empty`. Returns `None` when no loop is attached.
1165    fn take_loop_join_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
1166        match std::mem::replace(&mut self.attachment_slot, RuntimeLoopAttachmentSlot::Empty) {
1167            RuntimeLoopAttachmentSlot::Attached(attachment) => Some(attachment.loop_handle),
1168            RuntimeLoopAttachmentSlot::Empty => None,
1169        }
1170    }
1171
1172    fn clear_dead_attachment(&mut self) -> bool {
1173        if matches!(self.attachment_slot, RuntimeLoopAttachmentSlot::Attached(_))
1174            && !self.attachment_is_live()
1175        {
1176            self.attachment_slot = RuntimeLoopAttachmentSlot::Empty;
1177            return true;
1178        }
1179        false
1180    }
1181
1182    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
1183        match &self.attachment_slot {
1184            RuntimeLoopAttachmentSlot::Attached(attachment)
1185                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1186            {
1187                Some(attachment.wake_tx.clone())
1188            }
1189            _ => None,
1190        }
1191    }
1192
1193    fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
1194        match &self.attachment_slot {
1195            RuntimeLoopAttachmentSlot::Attached(attachment)
1196                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1197            {
1198                Some(attachment.effect_tx.clone())
1199            }
1200            _ => None,
1201        }
1202    }
1203
1204    fn boundary_handle(
1205        &self,
1206    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
1207        match &self.attachment_slot {
1208            RuntimeLoopAttachmentSlot::Attached(attachment)
1209                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1210            {
1211                attachment.boundary_handle.clone()
1212            }
1213            _ => None,
1214        }
1215    }
1216
1217    fn interrupt_handle(
1218        &self,
1219    ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
1220        match &self.attachment_slot {
1221            RuntimeLoopAttachmentSlot::Attached(attachment)
1222                if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1223            {
1224                attachment.interrupt_handle.clone()
1225            }
1226            _ => self.provisional_interrupt_handle.clone(),
1227        }
1228    }
1229
1230    fn install_provisional_interrupt_handle(
1231        &mut self,
1232        handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
1233    ) {
1234        if !self.attachment_is_live() {
1235            self.provisional_interrupt_handle = Some(handle);
1236        }
1237    }
1238}
1239
1240impl MeerkatMachine {
1241    /// Acquire the per-session mutation gate.
1242    ///
1243    /// Returns an `Arc<Mutex<()>>` that the caller must `.lock().await` and
1244    /// hold across the full DSL-stage → driver-mutate → DSL-sync span.
1245    /// Returns `None` if the session is not registered.
1246    async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
1247        let sessions = self.sessions.read().await;
1248        sessions
1249            .get(session_id)
1250            .map(|entry| Arc::clone(&entry.mutation_gate))
1251    }
1252
    /// Lock the mutation gate that belongs to the *currently registered* entry
    /// for `session_id`.
    ///
    /// The gate is looked up and locked without holding the `sessions` lock, so
    /// the entry may have been replaced in between. After acquiring the guard
    /// the entry is re-read: if it still owns the same gate the guard is
    /// returned, otherwise the stale guard is dropped and the lookup retried.
    /// Returns `None` as soon as the session is found to be unregistered.
    async fn lock_current_session_mutation_gate(
        &self,
        session_id: &SessionId,
    ) -> Option<crate::tokio::sync::OwnedMutexGuard<()>> {
        loop {
            let gate = self.session_mutation_gate(session_id).await?;
            let gate_guard = Arc::clone(&gate).lock_owned().await;
            // Re-validate under the sessions lock: the gate we locked must
            // still be the one installed on the registered entry.
            let sessions = self.sessions.read().await;
            let entry = sessions.get(session_id)?;
            if Arc::ptr_eq(&entry.mutation_gate, &gate) {
                return Some(gate_guard);
            }
        }
    }
1267
1268    pub(crate) async fn lock_current_session_driver_gate(
1269        &self,
1270        session_id: &SessionId,
1271        driver: &SharedDriver,
1272    ) -> Result<crate::tokio::sync::OwnedMutexGuard<()>, RuntimeDriverError> {
1273        let gate_guard = self
1274            .lock_current_session_mutation_gate(session_id)
1275            .await
1276            .ok_or(RuntimeDriverError::NotReady {
1277                state: RuntimeState::Destroyed,
1278            })?;
1279        {
1280            let sessions = self.sessions.read().await;
1281            let entry = sessions
1282                .get(session_id)
1283                .ok_or(RuntimeDriverError::NotReady {
1284                    state: RuntimeState::Destroyed,
1285                })?;
1286            if !Arc::ptr_eq(&entry.driver, driver) {
1287                return Err(RuntimeDriverError::NotReady {
1288                    state: RuntimeState::Destroyed,
1289                });
1290            }
1291        }
1292        Ok(gate_guard)
1293    }
1294
1295    pub(crate) async fn lock_current_runtime_loop_driver_authority(
1296        &self,
1297        session_id: &SessionId,
1298        driver: &SharedDriver,
1299    ) -> Result<crate::tokio::sync::OwnedMutexGuard<()>, RuntimeDriverError> {
1300        let gate_guard = self
1301            .lock_current_session_driver_gate(session_id, driver)
1302            .await?;
1303        {
1304            let sessions = self.sessions.read().await;
1305            let entry = sessions
1306                .get(session_id)
1307                .ok_or(RuntimeDriverError::NotReady {
1308                    state: RuntimeState::Destroyed,
1309                })?;
1310            if !entry.generated_executor_registration_active_or_draining() {
1311                return Err(RuntimeDriverError::ValidationFailed {
1312                    reason:
1313                        "generated MeerkatMachine has no active runtime-loop executor registration"
1314                            .to_string(),
1315                });
1316            }
1317        }
1318        Ok(gate_guard)
1319    }
1320
1321    async fn current_session_driver_with_authority(
1322        &self,
1323        session_id: &SessionId,
1324    ) -> Result<(SharedDriver, crate::tokio::sync::OwnedMutexGuard<()>), RuntimeDriverError> {
1325        let gate_guard = self
1326            .lock_current_session_mutation_gate(session_id)
1327            .await
1328            .ok_or(RuntimeDriverError::NotReady {
1329                state: RuntimeState::Destroyed,
1330            })?;
1331        let driver = {
1332            let sessions = self.sessions.read().await;
1333            sessions
1334                .get(session_id)
1335                .ok_or(RuntimeDriverError::NotReady {
1336                    state: RuntimeState::Destroyed,
1337                })?
1338                .driver
1339                .clone()
1340        };
1341        Ok((driver, gate_guard))
1342    }
1343
1344    async fn session_dsl_authority(
1345        &self,
1346        session_id: &SessionId,
1347    ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
1348        let sessions = self.sessions.read().await;
1349        sessions
1350            .get(session_id)
1351            .map(|entry| Arc::clone(&entry.dsl_authority))
1352            .ok_or_else(|| {
1353                RuntimeDriverError::NotReady {
1354                    state: RuntimeState::Destroyed,
1355                }
1356                .to_string()
1357            })
1358    }
1359
1360    #[cfg(any(test, feature = "test-support"))]
1361    async fn session_handle_teardown_gate(
1362        &self,
1363        session_id: &SessionId,
1364    ) -> Result<Arc<crate::handles::HandleTeardownGate>, String> {
1365        let sessions = self.sessions.read().await;
1366        sessions
1367            .get(session_id)
1368            .map(|entry| Arc::clone(&entry.handle_teardown_gate))
1369            .ok_or_else(|| {
1370                RuntimeDriverError::NotReady {
1371                    state: RuntimeState::Destroyed,
1372                }
1373                .to_string()
1374            })
1375    }
1376
1377    /// Test-support: install the session's generated peer-comms handle (and its
1378    /// owner token) onto a comms runtime, so the runtime accepts generated trust
1379    /// mutations minted from THIS adapter's session dsl authority. Mirrors what
1380    /// `prepare_session_runtime_bindings` does in production via
1381    /// `SessionRuntimeBindings`, for tests/harnesses that construct external
1382    /// member runtimes directly (e.g. the external-TCP production-drain smoke
1383    /// lane). Gated behind `test-support` so it never reaches a production build.
1384    #[cfg(any(test, feature = "test-support"))]
1385    pub async fn test_install_session_peer_comms_handle_on_runtime(
1386        &self,
1387        session_id: &SessionId,
1388        runtime: &(dyn meerkat_core::handles::PeerCommsInstallTarget + '_),
1389    ) -> Result<(), String> {
1390        let dsl = self
1391            .session_dsl_authority(session_id)
1392            .await
1393            .map_err(|error| format!("session dsl authority unavailable: {error}"))?;
1394        let teardown_gate = self
1395            .session_handle_teardown_gate(session_id)
1396            .await
1397            .map_err(|error| format!("session handle teardown gate unavailable: {error}"))?;
1398        let handle = std::sync::Arc::new(
1399            crate::handles::HandleDslAuthority::from_shared_with_teardown_gate(dsl, teardown_gate),
1400        );
1401        crate::handles::RuntimePeerCommsHandle::install_generated_on(handle, runtime)
1402    }
1403
1404    fn preview_dsl_input_on_state(
1405        state: &dsl::MeerkatMachineState,
1406        input: dsl::MeerkatMachineInput,
1407        context: &str,
1408    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
1409        let mut preview = dsl::MeerkatMachineAuthority::recover_from_state(state.clone())
1410            .map_err(|err| dsl_authority::map_error(err, context))?;
1411        dsl::MeerkatMachineMutator::apply(&mut preview, input)
1412            .map(|transition| transition.into_effects())
1413            .map_err(|err| dsl_authority::map_error(err, context))
1414    }
1415
1416    async fn preview_session_dsl_input(
1417        &self,
1418        session_id: &SessionId,
1419        input: dsl::MeerkatMachineInput,
1420        context: &str,
1421    ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
1422        let authority = self.session_dsl_authority(session_id).await?;
1423        let state = {
1424            let authority = authority
1425                .lock()
1426                .unwrap_or_else(std::sync::PoisonError::into_inner);
1427            authority.state().clone()
1428        };
1429        Self::preview_dsl_input_on_state(&state, input, context)
1430    }
1431
1432    async fn session_dsl_state(
1433        &self,
1434        session_id: &SessionId,
1435    ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
1436        let authority = self
1437            .session_dsl_authority(session_id)
1438            .await
1439            .map_err(RuntimeControlPlaneError::Internal)?;
1440        let authority = authority
1441            .lock()
1442            .unwrap_or_else(std::sync::PoisonError::into_inner);
1443        Ok(authority.state().clone())
1444    }
1445
1446    async fn commit_session_dsl_transition(
1447        &self,
1448        session_id: &SessionId,
1449        staged: StagedSessionDslInput,
1450        context: &str,
1451    ) -> Result<(), String> {
1452        self.commit_session_dsl_transition_with_dispatch_failure(
1453            session_id,
1454            staged,
1455            context,
1456            CommittedEffectDispatchFailure::PreserveCommittedDslState,
1457        )
1458        .await
1459    }
1460
1461    async fn commit_session_dsl_transition_preserving_committed_state(
1462        &self,
1463        session_id: &SessionId,
1464        staged: StagedSessionDslInput,
1465        context: &str,
1466    ) -> Result<(), String> {
1467        self.commit_session_dsl_transition_with_dispatch_failure(
1468            session_id,
1469            staged,
1470            context,
1471            CommittedEffectDispatchFailure::PreserveCommittedDslState,
1472        )
1473        .await
1474    }
1475
1476    async fn commit_session_dsl_transition_with_dispatch_failure(
1477        &self,
1478        _session_id: &SessionId,
1479        staged: StagedSessionDslInput,
1480        context: &str,
1481        dispatch_failure: CommittedEffectDispatchFailure,
1482    ) -> Result<(), String> {
1483        if let Err(error) = self
1484            .dispatch_routed_signals_from_effects(&staged.effects)
1485            .await
1486        {
1487            let CommittedEffectDispatchFailure::PreserveCommittedDslState = dispatch_failure;
1488            return Err(format!(
1489                "DSL authority ({context}): committed effect dispatch failed: {error}"
1490            ));
1491        }
1492        Ok(())
1493    }
1494
1495    async fn dispatch_routed_signals_from_effects(
1496        &self,
1497        effects: &[dsl::MeerkatMachineEffect],
1498    ) -> Result<(), String> {
1499        let dispatcher = {
1500            self.composition_signal_dispatcher
1501                .read()
1502                .unwrap_or_else(std::sync::PoisonError::into_inner)
1503                .clone()
1504        };
1505        let Some(dispatcher) = dispatcher else {
1506            return Ok(());
1507        };
1508
1509        for effect in effects {
1510            if let Some(signal) = composition::lift_routed_signal(effect) {
1511                composition::dispatch_routed_signal(&dispatcher, signal).await?;
1512            }
1513        }
1514        Ok(())
1515    }
1516
1517    async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
1518        let mut sessions = self.sessions.write().await;
1519        if let Some(entry) = sessions.get_mut(session_id) {
1520            let cleared = entry.clear_dead_attachment();
1521            if cleared && let Err(error) = entry.stage_generated_executor_exit_observation() {
1522                tracing::warn!(
1523                    %session_id,
1524                    error = %error,
1525                    "generated MeerkatMachine rejected executor-exit observation while clearing dead attachment"
1526                );
1527            }
1528        }
1529    }
1530
1531    async fn dispatch_cancel_after_boundary_runtime_effect(
1532        &self,
1533        session_id: &SessionId,
1534        effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
1535        boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
1536        projected_effect: crate::effect::ProjectedRuntimeEffect,
1537        context: &str,
1538    ) -> Result<(), RuntimeDriverError> {
1539        let Some(effect_tx) = effect_tx else {
1540            let state = self
1541                .existing_session_runtime_state(session_id)
1542                .await
1543                .unwrap_or(RuntimeState::Destroyed);
1544            return Err(RuntimeDriverError::NotReady { state });
1545        };
1546
1547        let reason = projected_effect.reason().to_string();
1548        if let Some(boundary_handle) = boundary_handle {
1549            boundary_handle
1550                .cancel_after_boundary(reason)
1551                .await
1552                .map_err(|err| {
1553                    RuntimeDriverError::Internal(format!(
1554                        "{context}: failed to apply live boundary cancel: {err}"
1555                    ))
1556                })?;
1557        }
1558
1559        match effect_tx.send(projected_effect.into_effect()).await {
1560            Ok(()) => Ok(()),
1561            Err(_) => {
1562                self.clear_dead_runtime_attachment(session_id).await;
1563                Err(RuntimeDriverError::NotReady {
1564                    state: RuntimeState::Idle,
1565                })
1566            }
1567        }
1568    }
1569
1570    async fn restore_session_dsl_state(
1571        &self,
1572        session_id: &SessionId,
1573        snapshot: dsl::MeerkatMachineAuthoritySnapshot,
1574    ) {
1575        if let Ok(authority) = self.session_dsl_authority(session_id).await {
1576            Self::restore_dsl_authority_snapshot(&authority, snapshot);
1577        }
1578    }
1579
1580    async fn restore_session_dsl_state_if_current(
1581        &self,
1582        session_id: &SessionId,
1583        expected_current: dsl::MeerkatMachineAuthoritySnapshot,
1584        restore: dsl::MeerkatMachineAuthoritySnapshot,
1585    ) -> bool {
1586        let Ok(authority) = self.session_dsl_authority(session_id).await else {
1587            return false;
1588        };
1589        Self::restore_dsl_authority_snapshot_if_current(&authority, expected_current, restore)
1590    }
1591
1592    fn restore_dsl_authority_snapshot(
1593        authority: &Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1594        snapshot: dsl::MeerkatMachineAuthoritySnapshot,
1595    ) {
1596        let mut authority = authority
1597            .lock()
1598            .unwrap_or_else(std::sync::PoisonError::into_inner);
1599        authority.restore_snapshot(snapshot);
1600    }
1601
1602    fn restore_dsl_authority_snapshot_if_current(
1603        authority: &Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1604        expected_current: dsl::MeerkatMachineAuthoritySnapshot,
1605        restore: dsl::MeerkatMachineAuthoritySnapshot,
1606    ) -> bool {
1607        let mut authority = authority
1608            .lock()
1609            .unwrap_or_else(std::sync::PoisonError::into_inner);
1610        let current = authority.snapshot();
1611        if current.state() == expected_current.state() {
1612            authority.restore_snapshot(restore);
1613            true
1614        } else {
1615            false
1616        }
1617    }
1618}
1619
/// Capability token proving a session-control mutation is routed through
/// `MeerkatMachine` authority instead of a public store-only service path.
///
/// The type is `Copy`, so a token can be passed by value freely once obtained.
#[derive(Debug, Clone, Copy)]
pub struct MachineSessionControlAuthority {
    // Private unit field: code outside this module cannot build the token
    // with a struct literal.
    _private: (),
}
1626
// Bridge token marker types (feature `live`).
//
// Each marker is a private unit struct. The matching `*_is_valid` function in
// this file recognises a token by checking its concrete type with `Any::is`.

// Marker for the live-open admission bridge.
#[cfg(feature = "live")]
struct LiveOpenAdmissionGeneratedAuthorityBridgeToken;

// Marker for the live-close result bridge.
#[cfg(feature = "live")]
struct LiveCloseResultGeneratedAuthorityBridgeToken;

// Marker for the live channel-status result bridge.
#[cfg(feature = "live")]
struct LiveChannelStatusResultGeneratedAuthorityBridgeToken;

// One static instance per marker type; the getter functions below hand these
// out as `&'static dyn Any` references.
#[cfg(feature = "live")]
static LIVE_OPEN_ADMISSION_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveOpenAdmissionGeneratedAuthorityBridgeToken = LiveOpenAdmissionGeneratedAuthorityBridgeToken;

#[cfg(feature = "live")]
static LIVE_CLOSE_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveCloseResultGeneratedAuthorityBridgeToken = LiveCloseResultGeneratedAuthorityBridgeToken;

#[cfg(feature = "live")]
static LIVE_CHANNEL_STATUS_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveChannelStatusResultGeneratedAuthorityBridgeToken =
    LiveChannelStatusResultGeneratedAuthorityBridgeToken;
1648
/// Returns the static live-open admission bridge token as a type-erased
/// `&'static dyn Any` reference.
#[cfg(feature = "live")]
fn live_open_admission_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    &LIVE_OPEN_ADMISSION_GENERATED_AUTHORITY_BRIDGE_TOKEN
}
1654
/// Returns the static live-close result bridge token as a type-erased
/// `&'static dyn Any` reference.
#[cfg(feature = "live")]
fn live_close_result_generated_authority_bridge_token() -> &'static (dyn std::any::Any + Send + Sync)
{
    &LIVE_CLOSE_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN
}
1660
/// Returns the static live channel-status result bridge token as a
/// type-erased `&'static dyn Any` reference.
#[cfg(feature = "live")]
fn live_channel_status_result_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    &LIVE_CHANNEL_STATUS_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN
}
1666
/// Reports whether `token` is the live-open admission bridge token, i.e.
/// whether its concrete type is the private
/// `LiveOpenAdmissionGeneratedAuthorityBridgeToken` marker.
///
/// Exported under a symbol name made of a fixed `..._v1_live_open_admission_`
/// prefix plus the `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`
/// compile-time environment variable.
// NOTE(review): presumably resolved by name from the `meerkat-live` crate at
// link time — confirm against that crate's extern declaration.
#[cfg(feature = "live")]
#[doc(hidden)]
// `unsafe_code` is allowed for the `#[unsafe(export_name)]` attribute below.
// NOTE(review): `improper_ctypes_definitions` presumably covers the `dyn Any`
// reference parameter on an `extern` fn — confirm it is still required.
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_open_admission_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_open_admission_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    token.is::<LiveOpenAdmissionGeneratedAuthorityBridgeToken>()
}
1679
/// Reports whether `token` is the live-close result bridge token, i.e.
/// whether its concrete type is the private
/// `LiveCloseResultGeneratedAuthorityBridgeToken` marker.
///
/// Exported under a symbol name made of a fixed `..._v1_live_close_result_`
/// prefix plus the `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`
/// compile-time environment variable.
// NOTE(review): presumably resolved by name from the `meerkat-live` crate at
// link time — confirm against that crate's extern declaration.
#[cfg(feature = "live")]
#[doc(hidden)]
// `unsafe_code` is allowed for the `#[unsafe(export_name)]` attribute below.
// NOTE(review): `improper_ctypes_definitions` presumably covers the `dyn Any`
// reference parameter on an `extern` fn — confirm it is still required.
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_close_result_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_close_result_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    token.is::<LiveCloseResultGeneratedAuthorityBridgeToken>()
}
1692
/// Reports whether `token` is the live channel-status result bridge token,
/// i.e. whether its concrete type is the private
/// `LiveChannelStatusResultGeneratedAuthorityBridgeToken` marker.
///
/// Exported under a symbol name made of a fixed
/// `..._v1_live_channel_status_result_` prefix plus the
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` compile-time
/// environment variable.
// NOTE(review): presumably resolved by name from the `meerkat-live` crate at
// link time — confirm against that crate's extern declaration.
#[cfg(feature = "live")]
#[doc(hidden)]
// `unsafe_code` is allowed for the `#[unsafe(export_name)]` attribute below.
// NOTE(review): `improper_ctypes_definitions` presumably covers the `dyn Any`
// reference parameter on an `extern` fn — confirm it is still required.
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_channel_status_result_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_channel_status_result_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    token.is::<LiveChannelStatusResultGeneratedAuthorityBridgeToken>()
}
1705
/// Builds a `meerkat_live::LiveChannelOpenAuthority` by calling a builder
/// function that is resolved by symbol name at link time.
///
/// The builder's link name is a fixed prefix plus the
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` compile-time environment
/// variable. The static live-open admission bridge token is passed as the
/// first argument, followed by `session_id`, `channel_id` and `sequence`.
///
/// # Errors
///
/// Returns whatever `Err(String)` the linked builder returns.
#[cfg(feature = "live")]
fn build_live_channel_open_authority(
    session_id: SessionId,
    channel_id: meerkat_live::LiveChannelId,
    sequence: u64,
) -> Result<meerkat_live::LiveChannelOpenAuthority, String> {
    // Declaration of the externally defined builder; the signature here must
    // match the definition exactly, the compiler cannot check it.
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_open_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_open_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            session_id: SessionId,
            channel_id: meerkat_live::LiveChannelId,
            sequence: u64,
        ) -> Result<meerkat_live::LiveChannelOpenAuthority, String>;
    }
    // SAFETY: NOTE(review) — not verifiable from this file. Sound only if the
    // symbol linked under the name above is a Rust function with exactly the
    // declared signature; presumably exported by `meerkat-live` — confirm.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_open_authority_build(
            live_open_admission_generated_authority_bridge_token(),
            session_id,
            channel_id,
            sequence,
        )
    }
}
1735
/// Builds a `meerkat_live::LiveChannelCloseCommitAuthority` by calling a
/// builder function that is resolved by symbol name at link time.
///
/// The builder's link name is a fixed prefix plus the
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` compile-time environment
/// variable. The static live-close result bridge token is passed as the first
/// argument, followed by `channel_id` and `close_sequence`.
///
/// # Errors
///
/// Returns whatever `Err(String)` the linked builder returns.
#[cfg(feature = "live")]
fn build_live_channel_close_commit_authority(
    channel_id: String,
    close_sequence: u64,
) -> Result<meerkat_live::LiveChannelCloseCommitAuthority, String> {
    // Declaration of the externally defined builder; the signature here must
    // match the definition exactly, the compiler cannot check it.
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_close_commit_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_close_commit_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            channel_id: String,
            close_sequence: u64,
        ) -> Result<meerkat_live::LiveChannelCloseCommitAuthority, String>;
    }
    // SAFETY: NOTE(review) — not verifiable from this file. Sound only if the
    // symbol linked under the name above is a Rust function with exactly the
    // declared signature; presumably exported by `meerkat-live` — confirm.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_close_commit_authority_build(
            live_close_result_generated_authority_bridge_token(),
            channel_id,
            close_sequence,
        )
    }
}
1762
/// Builds a `meerkat_live::LiveChannelStatusCommitAuthority` by calling a
/// builder function that is resolved by symbol name at link time.
///
/// The builder's link name is a fixed prefix plus the
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` compile-time environment
/// variable. The static live channel-status result bridge token is passed as
/// the first argument, followed by `channel_id` and
/// `status_observation_sequence`.
///
/// # Errors
///
/// Returns whatever `Err(String)` the linked builder returns.
#[cfg(feature = "live")]
fn build_live_channel_status_commit_authority(
    channel_id: String,
    status_observation_sequence: u64,
) -> Result<meerkat_live::LiveChannelStatusCommitAuthority, String> {
    // Declaration of the externally defined builder; the signature here must
    // match the definition exactly, the compiler cannot check it.
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_status_commit_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_status_commit_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            channel_id: String,
            status_observation_sequence: u64,
        ) -> Result<meerkat_live::LiveChannelStatusCommitAuthority, String>;
    }
    // SAFETY: NOTE(review) — not verifiable from this file. Sound only if the
    // symbol linked under the name above is a Rust function with exactly the
    // declared signature; presumably exported by `meerkat-live` — confirm.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_status_commit_authority_build(
            live_channel_status_result_generated_authority_bridge_token(),
            channel_id,
            status_observation_sequence,
        )
    }
}
1789
/// Generated authority output for `live/open` admission.
///
/// Constructed only from `MeerkatMachineEffect::LiveOpenAdmissionResolved`.
/// The live host accepts this as a typed handoff before materializing
/// transport resources; it does not decide duplicate-session admission from
/// its local maps.
///
/// All fields are private; values are read through the accessor methods.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveOpenAdmissionAuthority {
    /// Session the admission decision was resolved for.
    session_id: SessionId,
    /// Live channel the admission decision was resolved for.
    channel_id: meerkat_live::LiveChannelId,
    /// Whether the open request was admitted.
    admitted: bool,
    /// Rejection carried by the generated effect, stored as given.
    /// NOTE(review): presumably `Some` only when `admitted` is false; the
    /// constructor does not enforce that — confirm.
    rejection: Option<dsl::LiveOpenAdmissionRejection>,
    /// Bound LLM identity; `from_generated_effect` sets this to `Some` only
    /// for admitted results and to `None` otherwise.
    bound_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
    /// Sequence value carried by the generated effect.
    sequence: u64,
    /// Channel-open authority; `from_generated_effect` builds this only for
    /// admitted results and leaves it `None` otherwise.
    channel_open_authority: Option<meerkat_live::LiveChannelOpenAuthority>,
}
1807
#[cfg(feature = "live")]
impl LiveOpenAdmissionAuthority {
    /// Assembles the authority from the fields of a generated live-open
    /// admission effect.
    ///
    /// For an admitted result the bound LLM identity is required and is
    /// converted to the core identity type, then the channel-open authority
    /// is built. For a non-admitted result both optional values are `None`
    /// and any supplied identity is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error when the result is admitted without a bound LLM
    /// identity, when the identity conversion fails, or when building the
    /// channel-open authority fails.
    pub(crate) fn from_generated_effect(
        session_id: SessionId,
        channel_id: meerkat_live::LiveChannelId,
        admitted: bool,
        rejection: Option<dsl::LiveOpenAdmissionRejection>,
        bound_llm_identity: Option<dsl::SessionLlmIdentity>,
        sequence: u64,
    ) -> Result<Self, String> {
        let (bound_llm_identity, channel_open_authority) = if admitted {
            // Identity is validated and converted before the bridge is called.
            let generated_identity = bound_llm_identity.ok_or_else(|| {
                "generated live-open admission was admitted without bound LLM identity"
                    .to_string()
            })?;
            let core_identity: meerkat_core::SessionLlmIdentity =
                generated_identity.try_into()?;
            let open_authority = build_live_channel_open_authority(
                session_id.clone(),
                channel_id.clone(),
                sequence,
            )?;
            (Some(core_identity), Some(open_authority))
        } else {
            (None, None)
        };

        Ok(Self {
            session_id,
            channel_id,
            admitted,
            rejection,
            bound_llm_identity,
            sequence,
            channel_open_authority,
        })
    }

    /// Session the admission decision was resolved for.
    #[must_use]
    pub fn session_id(&self) -> &SessionId {
        &self.session_id
    }

    /// Live channel the admission decision was resolved for.
    #[must_use]
    pub fn channel_id(&self) -> &meerkat_live::LiveChannelId {
        &self.channel_id
    }

    /// Whether the open request was admitted.
    #[must_use]
    pub fn admitted(&self) -> bool {
        self.admitted
    }

    /// Rejection carried by the generated effect, if any.
    #[must_use]
    pub fn rejection(&self) -> Option<dsl::LiveOpenAdmissionRejection> {
        self.rejection
    }

    /// Bound LLM identity; present only for admitted results.
    #[must_use]
    pub fn bound_llm_identity(&self) -> Option<&meerkat_core::SessionLlmIdentity> {
        self.bound_llm_identity.as_ref()
    }

    /// Sequence value carried by the generated effect.
    #[must_use]
    pub fn sequence(&self) -> u64 {
        self.sequence
    }

    /// Channel-open authority; present only for admitted results.
    #[must_use]
    pub fn channel_open_authority(&self) -> Option<&meerkat_live::LiveChannelOpenAuthority> {
        self.channel_open_authority.as_ref()
    }
}
1883
/// Generated authority output for the public `live/refresh` success result.
///
/// Constructed only from a `MeerkatMachineEffect::LiveRefreshResultResolved`
/// emitted after the live adapter command queue has accepted the refresh
/// handoff. RPC/SDK surfaces project this value to their wire result instead
/// of classifying the public status from host queue mechanics.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveRefreshResultAuthority {
    /// Public refresh status to project onto the wire result.
    pub status: dsl::LiveRefreshPublicStatus,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
    /// NOTE(review): presumably identifies the queue-acceptance evidence this
    /// result was resolved from — confirm.
    pub queue_acceptance_sequence: u64,
}
1897
/// Generated authority output for the public `live/close` success result.
///
/// Constructed only from a `MeerkatMachineEffect::LiveCloseResultResolved`
/// emitted after the live host supplies typed close-observation evidence.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveCloseResultAuthority {
    /// Public close status carried by the generated effect.
    pub status: dsl::LiveClosePublicStatus,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
    /// Close-observation sequence; `from_generated_effect` also passes this
    /// value to the close-commit authority builder.
    pub close_observation_sequence: u64,
    /// Close-commit authority built by `from_generated_effect`. Private, so
    /// it is only reachable through the accessor methods.
    channel_close_commit_authority: Option<meerkat_live::LiveChannelCloseCommitAuthority>,
}
1910
#[cfg(feature = "live")]
impl LiveCloseResultAuthority {
    /// Assembles the authority from the fields of a generated live-close
    /// result effect.
    ///
    /// For the `Closed` status a close-commit authority is built for
    /// `channel_id` using `close_observation_sequence`.
    ///
    /// # Errors
    ///
    /// Returns the builder's error when the close-commit authority cannot be
    /// built.
    pub(crate) fn from_generated_effect(
        channel_id: String,
        status: dsl::LiveClosePublicStatus,
        sequence: u64,
        close_observation_sequence: u64,
    ) -> Result<Self, String> {
        // Kept as an exhaustive match so that a new status variant forces a
        // decision here at compile time.
        let commit_authority = match status {
            dsl::LiveClosePublicStatus::Closed => {
                build_live_channel_close_commit_authority(channel_id, close_observation_sequence)?
            }
        };

        Ok(Self {
            status,
            sequence,
            close_observation_sequence,
            channel_close_commit_authority: Some(commit_authority),
        })
    }

    /// Borrows the close-commit authority, if one was built.
    #[must_use]
    pub fn channel_close_commit_authority(
        &self,
    ) -> Option<&meerkat_live::LiveChannelCloseCommitAuthority> {
        self.channel_close_commit_authority.as_ref()
    }

    /// Consumes the result and yields the close-commit authority, if one was
    /// built.
    #[must_use]
    pub fn into_channel_close_commit_authority(
        self,
    ) -> Option<meerkat_live::LiveChannelCloseCommitAuthority> {
        self.channel_close_commit_authority
    }
}
1947
/// Generated authority output for public live command success results.
///
/// Constructed only from a `MeerkatMachineEffect::LiveCommandResultResolved`
/// emitted after the live host supplies typed command queue-acceptance
/// evidence.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveCommandResultAuthority {
    /// Public kind of the command this result belongs to.
    pub command: dsl::LiveCommandPublicKind,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
    /// NOTE(review): presumably identifies the command queue-acceptance
    /// evidence this result was resolved from — confirm.
    pub command_acceptance_sequence: u64,
}
1960
/// Generated authority output for public live command rejection results.
///
/// Constructed only from a `MeerkatMachineEffect::LiveCommandRejectionResolved`
/// emitted after the live host supplies typed rejection evidence. RPC/SDK
/// surfaces project error classes from this value instead of matching host
/// errors directly.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveCommandRejectionAuthority {
    /// Public kind of the command that was rejected.
    pub command: dsl::LiveCommandPublicKind,
    /// Typed reason for the rejection.
    pub rejection: dsl::LiveCommandRejectionReason,
    /// Error class that RPC/SDK surfaces project to callers.
    pub public_error_class: dsl::LiveCommandRejectionPublicErrorClass,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
1975
/// Generated authority output for public live channel control request
/// rejections.
///
/// Constructed only from a
/// `MeerkatMachineEffect::LiveChannelRequestRejectionResolved` emitted after
/// the live host supplies typed rejection evidence.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveChannelRequestRejectionAuthority {
    /// Public kind of the channel control request that was rejected.
    pub request: dsl::LiveChannelRequestPublicKind,
    /// Typed reason for the rejection.
    pub rejection: dsl::LiveChannelRequestRejectionReason,
    /// Public error class associated with the rejection.
    pub public_error_class: dsl::LiveChannelRequestRejectionPublicErrorClass,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
1990
/// Generated authority output for a WebRTC answer token issued by
/// MeerkatMachine.
///
/// Constructed only from `MeerkatMachineEffect::LiveWebrtcTokenIssued`.
/// The transport supplies random bearer material, but it is not returned to a
/// caller until the generated machine records the channel binding and expiry.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcTokenAuthority {
    /// Bearer token material. Note that the derived `Debug` output includes
    /// this value.
    pub token: String,
    /// Token expiry in milliseconds, per the field name.
    /// NOTE(review): the time base (e.g. Unix epoch) is not visible here —
    /// confirm.
    pub expires_at_ms: u64,
    /// NOTE(review): presumably the sequence value of the issuing effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
2004
/// Generated authority output for WebRTC answer token admission.
///
/// Constructed only from
/// `MeerkatMachineEffect::LiveWebrtcAnswerAdmissionResolved`. RPC signaling
/// proceeds to peer setup only when this effect admits the token.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcAnswerAdmissionAuthority {
    /// Whether the token was admitted.
    pub admitted: bool,
    /// Typed rejection, if any.
    /// NOTE(review): presumably `Some` only when `admitted` is false —
    /// confirm; nothing in this type enforces it.
    pub rejection: Option<dsl::LiveWebrtcAnswerAdmissionRejection>,
    /// Public error class for a rejection, if any. The type is shared with
    /// channel request rejections.
    pub public_error_class: Option<dsl::LiveChannelRequestRejectionPublicErrorClass>,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
2018
/// Generated authority output for the public `live/webrtc/answer` success
/// class.
///
/// Constructed only from
/// `MeerkatMachineEffect::LiveWebrtcAnswerResultResolved` emitted after the
/// WebRTC transport supplies answer-observation evidence.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcAnswerResultAuthority {
    /// Public answer status carried by the generated effect.
    pub status: dsl::LiveWebrtcAnswerPublicStatus,
    /// NOTE(review): presumably mirrors `status` as a plain flag — confirm
    /// how the two are kept consistent.
    pub answered: bool,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
    /// NOTE(review): presumably identifies the answer-observation evidence
    /// this result was resolved from — confirm.
    pub answer_observation_sequence: u64,
}
2033
/// Generated authority output for a WebSocket transport token issued by
/// MeerkatMachine.
///
/// Constructed only from `MeerkatMachineEffect::LiveWebsocketTokenIssued`.
/// The WebSocket transport supplies random bearer material, but it is not
/// returned until generated authority records channel binding and expiry.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebsocketTokenAuthority {
    /// Bearer token material. Note that the derived `Debug` output includes
    /// this value.
    pub token: String,
    /// Token expiry in milliseconds, per the field name.
    /// NOTE(review): the time base (e.g. Unix epoch) is not visible here —
    /// confirm.
    pub expires_at_ms: u64,
    /// NOTE(review): presumably the sequence value of the issuing effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
2047
/// Generated authority output for WebSocket token admission.
///
/// Constructed only from
/// `MeerkatMachineEffect::LiveWebsocketTokenAdmissionResolved`. The WebSocket
/// transport upgrade proceeds only when this effect admits the token.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebsocketTokenAdmissionAuthority {
    /// Whether the token was admitted.
    pub admitted: bool,
    /// Typed rejection, if any.
    /// NOTE(review): presumably `Some` only when `admitted` is false —
    /// confirm; nothing in this type enforces it.
    pub rejection: Option<dsl::LiveWebsocketTokenAdmissionRejection>,
    /// Public error class for a rejection, if any.
    pub public_error_class: Option<dsl::LiveWebsocketTokenAdmissionPublicErrorClass>,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
}
2061
/// Generated authority output for the public `live/status` result.
///
/// Constructed only from a `MeerkatMachineEffect::LiveChannelStatusResolved`
/// emitted after the live host supplies typed adapter-status observation
/// evidence.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveChannelStatusAuthority {
    /// Public channel status carried by the generated effect.
    pub status: dsl::LiveChannelPublicStatus,
    /// NOTE(review): presumably the sequence value of the resolved effect —
    /// confirm against the generated machine.
    pub sequence: u64,
    /// Status-observation sequence; `from_generated_effect` also passes this
    /// value to the status-commit authority builder.
    pub status_observation_sequence: u64,
    /// Typed degradation reason, if any.
    pub degradation_reason: Option<dsl::LiveChannelDegradationReason>,
    /// Free-form degradation detail, if any.
    pub degradation_detail: Option<String>,
    /// Status-commit authority; `from_generated_effect` always sets this to
    /// `Some`. Unlike the close result's commit authority, this field is
    /// public.
    pub channel_status_commit_authority: Option<meerkat_live::LiveChannelStatusCommitAuthority>,
}
2077
#[cfg(feature = "live")]
impl LiveChannelStatusAuthority {
    /// Assembles the authority from the fields of a generated live
    /// channel-status effect.
    ///
    /// A status-commit authority is always built for `channel_id` using
    /// `status_observation_sequence`.
    ///
    /// # Errors
    ///
    /// Returns the builder's error when the status-commit authority cannot
    /// be built.
    pub(crate) fn from_generated_effect(
        channel_id: String,
        status: dsl::LiveChannelPublicStatus,
        sequence: u64,
        status_observation_sequence: u64,
        degradation_reason: Option<dsl::LiveChannelDegradationReason>,
        degradation_detail: Option<String>,
    ) -> Result<Self, String> {
        let commit_authority =
            build_live_channel_status_commit_authority(channel_id, status_observation_sequence)?;

        Ok(Self {
            status,
            sequence,
            status_observation_sequence,
            degradation_reason,
            degradation_detail,
            channel_status_commit_authority: Some(commit_authority),
        })
    }

    /// Borrows the status-commit authority, if present.
    #[must_use]
    pub fn channel_status_commit_authority(
        &self,
    ) -> Option<&meerkat_live::LiveChannelStatusCommitAuthority> {
        self.channel_status_commit_authority.as_ref()
    }

    /// Consumes the result and yields the status-commit authority, if
    /// present.
    #[must_use]
    pub fn into_channel_status_commit_authority(
        self,
    ) -> Option<meerkat_live::LiveChannelStatusCommitAuthority> {
        self.channel_status_commit_authority
    }
}
2115
/// Session-scoped execution kernel for the Meerkat runtime.
///
/// Owns per-session runtime state (driver, ops registry, completion waiters,
/// comms drain, epoch bindings) and routes all internal mutations through one
/// canonical command reducer, with smaller group handlers retained only as
/// implementation detail helpers.
pub struct MeerkatMachine {
    /// Per-session entries, keyed by session id. Guarded by an async lock
    /// (acquisitions are awaited).
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Optional RuntimeStore for persistent drivers. `None` for adapters
    /// built with `ephemeral()`.
    store: Option<Arc<dyn RuntimeStore>>,
    /// Blob store used by persistent drivers for durable input externalization.
    /// `None` for `ephemeral()`; `persistent_without_blobs()` installs an
    /// `UnavailableBlobStore` here.
    blob_store: Option<Arc<dyn BlobStore>>,
    /// Runtime-owned shell seam for live session LLM reconfiguration I/O.
    /// Starts as `None` in the constructors in this file.
    llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
    /// AuthMachine lifecycle authority shared by runtime-backed auth
    /// resolution/refresh paths and public auth-status surfaces.
    auth_lease: StdRwLock<meerkat_core::handles::GeneratedAuthLeaseHandle>,
    /// OAuth login-flow lifecycle authority shared by public auth surfaces
    /// that operate through this runtime adapter. Not compiled on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
    /// Runtime-scoped generated authority for live control/command rejections
    /// that cannot be attributed to a session because generated active-channel
    /// ownership has no binding for the requested channel.
    #[cfg(feature = "live")]
    live_unbound_rejection_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
    /// Canonical owner of "this session id is currently active" — replaces
    /// the deleted process-global `SESSION_IDENTITY_CLAIMS` static in the
    /// comms shell (dogma #2). Comms runtimes acquire a typed
    /// [`meerkat_core::handles::SessionClaim`] through this handle and hold
    /// it for their lifetime; the registry is scoped to this `MeerkatMachine`
    /// instance, so tests / multi-runtime processes get clean isolation.
    session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
    /// Optional typed signal dispatcher for MeerkatMachine lifecycle
    /// effects routed by `meerkat_mob_seam` into MobMachine observation
    /// signals. While this is `None`, routed-signal dispatch is a no-op.
    composition_signal_dispatcher:
        StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
}
2156
2157impl MeerkatMachine {
    /// Capability token for store-only session-control mutations routed
    /// through this machine authority.
    ///
    /// Every call returns a fresh token; the token carries no data and does
    /// not borrow from `self`.
    #[must_use]
    pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
        MachineSessionControlAuthority { _private: () }
    }
2164
2165    /// Whether this adapter shares the same runtime persistence authority as
2166    /// another adapter. Runtime-backed composition surfaces use this to reject
2167    /// mismatched adapters before visible terminal events can outrun the store
2168    /// that owns their durable commit.
2169    #[must_use]
2170    pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
2171        match (&self.store, &other.store) {
2172            (None, None) => true,
2173            (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
2174            _ => false,
2175        }
2176    }
2177
2178    /// Whether this adapter owns the same runtime persistence authority as a
2179    /// concrete runtime store handle.
2180    #[must_use]
2181    pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
2182        self.store
2183            .as_ref()
2184            .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
2185    }
2186
    /// Whether this adapter has a runtime persistence store.
    ///
    /// `false` for adapters built with `ephemeral()`, `true` for those built
    /// with `persistent()` or `persistent_without_blobs()`.
    #[must_use]
    pub fn has_runtime_persistence(&self) -> bool {
        self.store.is_some()
    }
2192
2193    fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
2194        match err {
2195            RuntimeDriverError::NotReady {
2196                state: RuntimeState::Destroyed,
2197            } => RuntimeDriverError::Destroyed,
2198            other => other,
2199        }
2200    }
2201
2202    /// Create an ephemeral adapter (all sessions use EphemeralRuntimeDriver).
2203    pub fn ephemeral() -> Self {
2204        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2205        #[cfg(not(target_arch = "wasm32"))]
2206        let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
2207            std::time::Duration::from_secs(10 * 60),
2208            Arc::clone(&auth_lease),
2209        ));
2210        let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2211        Self {
2212            sessions: RwLock::new(HashMap::new()),
2213            store: None,
2214            blob_store: None,
2215            llm_reconfigure_host: StdRwLock::new(None),
2216            auth_lease: StdRwLock::new(auth_lease),
2217            #[cfg(not(target_arch = "wasm32"))]
2218            oauth_flows: StdRwLock::new(oauth_flows),
2219            #[cfg(feature = "live")]
2220            live_unbound_rejection_authority: live_unbound_rejection_authority(),
2221            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2222            composition_signal_dispatcher: StdRwLock::new(None),
2223        }
2224    }
2225
2226    /// Create a persistent adapter with a RuntimeStore.
2227    pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
2228        #[cfg(not(target_arch = "wasm32"))]
2229        let (auth_lease, oauth_flows) = {
2230            let authorities = persistent_auth_authorities(&store);
2231            (
2232                Arc::clone(&authorities.auth_lease),
2233                Arc::clone(&authorities.oauth_flows),
2234            )
2235        };
2236        #[cfg(target_arch = "wasm32")]
2237        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2238        let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2239        Self {
2240            sessions: RwLock::new(HashMap::new()),
2241            store: Some(store),
2242            blob_store: Some(blob_store),
2243            llm_reconfigure_host: StdRwLock::new(None),
2244            auth_lease: StdRwLock::new(auth_lease),
2245            #[cfg(not(target_arch = "wasm32"))]
2246            oauth_flows: StdRwLock::new(oauth_flows),
2247            #[cfg(feature = "live")]
2248            live_unbound_rejection_authority: live_unbound_rejection_authority(),
2249            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2250            composition_signal_dispatcher: StdRwLock::new(None),
2251        }
2252    }
2253
2254    /// Create a persistent adapter with a RuntimeStore but no blob store.
2255    ///
2256    /// The driver remains persistent for session state. Blob-backed inputs fail
2257    /// explicitly at the blob-store boundary until a real [`BlobStore`] is
2258    /// supplied.
2259    pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
2260        #[cfg(not(target_arch = "wasm32"))]
2261        let (auth_lease, oauth_flows) = {
2262            let authorities = persistent_auth_authorities(&store);
2263            (
2264                Arc::clone(&authorities.auth_lease),
2265                Arc::clone(&authorities.oauth_flows),
2266            )
2267        };
2268        #[cfg(target_arch = "wasm32")]
2269        let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2270        let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2271        Self {
2272            sessions: RwLock::new(HashMap::new()),
2273            store: Some(store),
2274            blob_store: Some(Arc::new(UnavailableBlobStore)),
2275            llm_reconfigure_host: StdRwLock::new(None),
2276            auth_lease: StdRwLock::new(auth_lease),
2277            #[cfg(not(target_arch = "wasm32"))]
2278            oauth_flows: StdRwLock::new(oauth_flows),
2279            #[cfg(feature = "live")]
2280            live_unbound_rejection_authority: live_unbound_rejection_authority(),
2281            session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2282            composition_signal_dispatcher: StdRwLock::new(None),
2283        }
2284    }
2285
2286    /// Shared auth lifecycle handle used by all runtime-backed session
2287    /// bindings created by this adapter.
2288    pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
2289        self.generated_auth_lease_handle().clone_handle()
2290    }
2291
2292    /// Generated-authority-certified auth lifecycle handle used at factory and
2293    /// resolver seams that must reject arbitrary handwritten handles.
2294    pub fn generated_auth_lease_handle(&self) -> meerkat_core::handles::GeneratedAuthLeaseHandle {
2295        self.auth_lease
2296            .read()
2297            .unwrap_or_else(std::sync::PoisonError::into_inner)
2298            .clone()
2299    }
2300
2301    /// Install the auth lifecycle authority that public surfaces also read.
2302    ///
2303    /// Surfaces construct the adapter before all state fields are available, so
2304    /// this setter lets them align the adapter's runtime-backed traffic with
2305    /// the surface-visible status handle without creating a competing registry.
2306    pub fn set_auth_lease_handle(&self, handle: Arc<crate::handles::RuntimeAuthLeaseHandle>) {
2307        self.set_runtime_auth_lease_handle(handle);
2308    }
2309
2310    /// Install the runtime credential lifecycle handle together with an
2311    /// explicit OAuth login-flow authority.
2312    ///
2313    /// The credential side still has to be a generated AuthMachine authority;
2314    /// the explicit OAuth authority only controls login-flow test seams.
2315    #[cfg(not(target_arch = "wasm32"))]
2316    pub fn set_auth_lease_handle_with_oauth_flow_authority(
2317        &self,
2318        handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
2319        oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
2320    ) {
2321        *self
2322            .oauth_flows
2323            .write()
2324            .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
2325        let handle = generated_runtime_auth_lease_handle(handle);
2326        *self
2327            .auth_lease
2328            .write()
2329            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
2330    }
2331
2332    /// Install a runtime AuthMachine authority shared by auth leases and OAuth
2333    /// login-flow lifecycle transitions.
2334    pub fn set_runtime_auth_lease_handle(
2335        &self,
2336        handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
2337    ) {
2338        #[cfg(not(target_arch = "wasm32"))]
2339        {
2340            *self
2341                .oauth_flows
2342                .write()
2343                .unwrap_or_else(std::sync::PoisonError::into_inner) =
2344                Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
2345                    std::time::Duration::from_secs(10 * 60),
2346                    Arc::clone(&handle),
2347                ));
2348        }
2349        let handle = generated_runtime_auth_lease_handle(handle);
2350        *self
2351            .auth_lease
2352            .write()
2353            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
2354    }
2355
2356    /// Shared OAuth login-flow authority used by all auth surfaces that are
2357    /// backed by this runtime adapter.
2358    #[cfg(not(target_arch = "wasm32"))]
2359    pub fn oauth_flow_authority(
2360        &self,
2361    ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
2362        Arc::clone(
2363            &self
2364                .oauth_flows
2365                .read()
2366                .unwrap_or_else(std::sync::PoisonError::into_inner),
2367        )
2368    }
2369
2370    /// The canonical session-identity claim handle owned by this
2371    /// `MeerkatMachine`. Comms runtimes wired through this machine acquire
2372    /// their session-id claim through it; the registry is scoped to this
2373    /// machine instance so tests / parallel runtimes do not collide.
2374    pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
2375        Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
2376    }
2377
2378    /// Attach the typed composition signal dispatcher used for
2379    /// MeerkatMachine -> MobMachine lifecycle observation routes.
2380    pub fn set_composition_signal_dispatcher(
2381        &self,
2382        dispatcher: composition::MeerkatCompositionSignalDispatcher,
2383    ) {
2384        let mut slot = self
2385            .composition_signal_dispatcher
2386            .write()
2387            .unwrap_or_else(std::sync::PoisonError::into_inner);
2388        *slot = Some(dispatcher);
2389    }
2390
2391    /// Apply a routed-input variant delivered by the `meerkat_mob_seam`
2392    /// composition dispatcher against the session's shared DSL authority.
2393    ///
2394    /// The caller is
2395    /// [`crate::meerkat_machine::composition::MeerkatConsumerSurface::apply_routed_input`];
2396    /// it has already projected producer fields into the typed
2397    /// [`dsl::MeerkatMachineInput`] shape. This method performs the
2398    /// session lookup + DSL-lock-scoped apply. A typed transition error
2399    /// from the kernel is surfaced as a `String` so the dispatcher can
2400    /// map it onto `DispatchRefusal::ConsumerRefused`.
2401    pub(crate) async fn apply_routed_meerkat_input(
2402        &self,
2403        session_id: &SessionId,
2404        input: dsl::MeerkatMachineInput,
2405    ) -> Result<(), dsl_authority::DslTransitionRefusal> {
2406        let _gate_guard = self
2407            .lock_current_session_mutation_gate(session_id)
2408            .await
2409            .ok_or_else(|| {
2410                dsl_authority::DslTransitionRefusal::other(
2411                    "routed_session_not_registered",
2412                    format!(
2413                        "session `{session_id}` is not registered with this MeerkatMachine; \
2414                         cannot deliver routed input"
2415                    ),
2416                )
2417            })?;
2418        self.apply_routed_session_dsl_input(session_id, input, "RoutedMeerkatInput")
2419            .await
2420            .map(|_| ())
2421    }
2422
2423    #[cfg(test)]
2424    pub(crate) async fn debug_shared_ingress_authorities(
2425        &self,
2426        session_id: &SessionId,
2427    ) -> Option<(
2428        Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
2429        crate::driver::ephemeral::SharedIngressDslAuthority,
2430    )> {
2431        let sessions = self.sessions.read().await;
2432        let entry = sessions.get(session_id)?;
2433        let session_authority = Arc::clone(&entry.dsl_authority);
2434        let driver = entry.driver.lock().await;
2435        Some((session_authority, driver.shared_dsl_authority()))
2436    }
2437
2438    /// Create a driver entry for a session.
2439    fn make_driver(
2440        &self,
2441        runtime_id: LogicalRuntimeId,
2442        dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
2443        initial_runtime_state: RuntimeState,
2444    ) -> DriverEntry {
2445        let control_projection = Arc::new(StdRwLock::new(
2446            crate::driver::ephemeral::RuntimeControlProjection {
2447                phase: initial_runtime_state,
2448                current_run_id: None,
2449                pre_run_phase: None,
2450            },
2451        ));
2452        match (&self.store, &self.blob_store) {
2453            (Some(store), Some(blob_store)) => {
2454                DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
2455                    runtime_id,
2456                    store.clone(),
2457                    blob_store.clone(),
2458                    control_projection,
2459                    dsl_authority,
2460                ))
2461            }
2462            _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
2463                runtime_id,
2464                control_projection,
2465                dsl_authority,
2466            )),
2467        }
2468    }
2469
2470    /// Recover or create fresh ops lifecycle state for a session.
2471    ///
2472    /// This is the single canonical recovery seam. Both `register_session()`
2473    /// and `ensure_session_with_executor()`'s cold path call this to create
2474    /// epoch-local state. If a durable store is available, attempts to load
2475    /// the persisted snapshot; otherwise creates fresh state with a new epoch.
2476    async fn recover_or_create_ops_state(
2477        &self,
2478        session_id: &SessionId,
2479        runtime_id: &LogicalRuntimeId,
2480    ) -> Result<
2481        (
2482            Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
2483            meerkat_core::RuntimeEpochId,
2484            Arc<meerkat_core::EpochCursorState>,
2485        ),
2486        RuntimeDriverError,
2487    > {
2488        if let Some(ref store) = self.store {
2489            match store.load_ops_lifecycle(runtime_id).await {
2490                Ok(Some(snapshot)) => {
2491                    let recovered_epoch = snapshot.epoch_id.clone();
2492                    let recovered_ops_count = snapshot.completion_entries.len();
2493                    let registry =
2494                        match crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(
2495                            snapshot,
2496                        ) {
2497                            Ok(registry) => registry,
2498                            Err(err) => {
2499                                tracing::error!(
2500                                    %session_id,
2501                                    %runtime_id,
2502                                    error = %err,
2503                                    "failed to recover ops lifecycle through generated authority"
2504                                );
2505                                return Err(RuntimeDriverError::Internal(format!(
2506                                    "failed to recover ops lifecycle through generated authority: {err}"
2507                                )));
2508                            }
2509                        };
2510                    let recovered_cursor_snapshot = registry.completion_cursor_snapshot();
2511                    let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
2512                        recovered_cursor_snapshot.agent_applied_cursor,
2513                        recovered_cursor_snapshot.runtime_observed_seq,
2514                        recovered_cursor_snapshot.runtime_last_injected_seq,
2515                    );
2516                    tracing::info!(
2517                        %session_id,
2518                        %runtime_id,
2519                        epoch_id = %recovered_epoch,
2520                        recovered_ops = recovered_ops_count,
2521                        "ops lifecycle recovered from durable store (same epoch)"
2522                    );
2523                    return Ok((
2524                        Arc::new(registry),
2525                        recovered_epoch,
2526                        Arc::new(recovered_cursors),
2527                    ));
2528                }
2529                Ok(None) => {}
2530                Err(err) => {
2531                    tracing::error!(
2532                        %session_id,
2533                        %runtime_id,
2534                        error = %err,
2535                        "failed to load ops lifecycle from durable store"
2536                    );
2537                    return Err(RuntimeDriverError::Internal(format!(
2538                        "failed to load ops lifecycle from durable store: {err}"
2539                    )));
2540                }
2541            }
2542            tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
2543            Ok((
2544                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
2545                meerkat_core::RuntimeEpochId::new(),
2546                Arc::new(meerkat_core::EpochCursorState::new()),
2547            ))
2548        } else {
2549            Ok((
2550                Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
2551                meerkat_core::RuntimeEpochId::new(),
2552                Arc::new(meerkat_core::EpochCursorState::new()),
2553            ))
2554        }
2555    }
2556
2557    fn fresh_ops_state() -> (
2558        Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
2559        meerkat_core::RuntimeEpochId,
2560        Arc<meerkat_core::EpochCursorState>,
2561    ) {
2562        let registry = Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new());
2563        let epoch = meerkat_core::RuntimeEpochId::new();
2564        let cursors = Arc::new(meerkat_core::EpochCursorState::new());
2565        (registry, epoch, cursors)
2566    }
2567
2568    #[allow(clippy::large_futures)]
2569    fn execute_meerkat_machine_command(
2570        &self,
2571        self_handle: Option<Arc<Self>>,
2572        command: MeerkatMachineCommand,
2573    ) -> MeerkatMachineCommandFuture<'_> {
2574        Box::pin(async move {
2575            match command {
2576                MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
2577                    let self_handle = self_handle.ok_or_else(|| {
2578                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
2579                            "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
2580                        ))
2581                    })?;
2582                    self_handle
2583                        .execute_meerkat_machine_ensure_session_command(command)
2584                        .await
2585                        .map_err(Into::into)
2586                }
2587                MeerkatMachineCommand::RegisterSession { .. }
2588                | MeerkatMachineCommand::UnregisterSession { .. }
2589                | MeerkatMachineCommand::SetSilentIntents { .. }
2590                | MeerkatMachineCommand::CancelAfterBoundary { .. }
2591                | MeerkatMachineCommand::StopRuntimeExecutor { .. }
2592                | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
2593                | MeerkatMachineCommand::ContainsSession { .. }
2594                | MeerkatMachineCommand::SessionHasExecutor { .. }
2595                | MeerkatMachineCommand::SessionHasComms { .. }
2596                | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
2597                | MeerkatMachineCommand::PrepareBindings { .. }
2598                | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
2599                | MeerkatMachineCommand::InputState { .. }
2600                | MeerkatMachineCommand::ListActiveInputs { .. }
2601                | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
2602                | MeerkatMachineCommand::StagePersistentFilter { .. }
2603                | MeerkatMachineCommand::RequestDeferredTools { .. }
2604                | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
2605                    .execute_meerkat_machine_session_command(command)
2606                    .await
2607                    .map_err(Into::into),
2608                MeerkatMachineCommand::SetPeerIngressContext { .. }
2609                | MeerkatMachineCommand::NotifyDrainExited { .. } => {
2610                    let self_handle = self_handle.ok_or_else(|| {
2611                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
2612                            "drain command requires Arc<Self> machine handle".into(),
2613                        ))
2614                    })?;
2615                    self_handle
2616                        .execute_meerkat_machine_drain_command(command)
2617                        .await
2618                        .map_err(Into::into)
2619                }
2620                MeerkatMachineCommand::AbortAll
2621                | MeerkatMachineCommand::Abort { .. }
2622                | MeerkatMachineCommand::Wait { .. } => self
2623                    .execute_meerkat_machine_drain_local_command(command)
2624                    .await
2625                    .map_err(Into::into),
2626                MeerkatMachineCommand::Ingest { .. }
2627                | MeerkatMachineCommand::PublishEvent { .. }
2628                | MeerkatMachineCommand::Retire { .. }
2629                | MeerkatMachineCommand::Recycle { .. }
2630                | MeerkatMachineCommand::Reset { .. }
2631                | MeerkatMachineCommand::Recover { .. }
2632                | MeerkatMachineCommand::Destroy { .. }
2633                | MeerkatMachineCommand::RuntimeState { .. }
2634                | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
2635                | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
2636                | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
2637                | MeerkatMachineCommand::RequestSwitchTurn { .. }
2638                | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
2639                | MeerkatMachineCommand::BeginImageOperation { .. }
2640                | MeerkatMachineCommand::DenyImageOperationPlan { .. }
2641                | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
2642                | MeerkatMachineCommand::ClassifyImageOperationTerminal { .. }
2643                | MeerkatMachineCommand::CompleteImageOperation { .. }
2644                | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
2645                | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
2646                    .execute_meerkat_machine_control_command(command)
2647                    .await
2648                    .map_err(Into::into),
2649                MeerkatMachineCommand::AcceptWithCompletion { .. }
2650                | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
2651                    .execute_meerkat_machine_ingress_command(command)
2652                    .await
2653                    .map_err(Into::into),
2654            }
2655        })
2656    }
2657
2658    /// Register a runtime driver for a session (no RuntimeLoop — inputs queue but
2659    /// nothing processes them automatically). Useful for tests and legacy mode.
2660    ///
2661    /// Registration is a control-plane prerequisite: a failed register must not be
2662    /// laundered to success. The inner command can fail recovery, so the typed
2663    /// error is propagated to the caller rather than discarded.
2664    pub async fn register_session(
2665        &self,
2666        session_id: SessionId,
2667    ) -> Result<(), RuntimeControlPlaneError> {
2668        match self
2669            .execute_meerkat_machine_command(
2670                None,
2671                MeerkatMachineCommand::RegisterSession { session_id },
2672            )
2673            .await
2674            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
2675        {
2676            MeerkatMachineCommandResult::Unit => Ok(()),
2677            other => Err(RuntimeControlPlaneError::Internal(format!(
2678                "register_session: unexpected command result variant: {other:?}"
2679            ))),
2680        }
2681    }
2682}
2683
2684#[cfg(test)]
2685#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
2686#[path = "../meerkat_machine_tests.rs"]
2687mod tests;