// meerkat_runtime/driver/persistent.rs

//! PersistentRuntimeDriver — wraps EphemeralRuntimeDriver + RuntimeStore.
//!
//! Provides a durable-before-ack guarantee: InputState is persisted via
//! RuntimeStore BEFORE an AcceptOutcome is returned. State machine logic is
//! delegated to the ephemeral driver.

7use std::sync::Arc;
8use std::sync::RwLock as StdRwLock;
9
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12
13use crate::accept::AcceptOutcome;
14use crate::identifiers::LogicalRuntimeId;
15use crate::input::{Input, externalize_input_images};
16use crate::input_state::{
17    InputAbandonReason, InputLifecycleState, InputState, InputStatePersistenceRecord,
18    StoredInputState,
19};
20use crate::runtime_event::RuntimeEventEnvelope;
21use crate::runtime_state::RuntimeState;
22use crate::store::{MachineLifecycleCommit, RuntimeStore};
23use crate::traits::{DestroyReport, RecoveryReport, RuntimeDriver, RuntimeDriverError};
24
25use super::ephemeral::{
26    EphemeralDriverRollbackSnapshot, EphemeralRuntimeDriver, SharedIngressDslAuthority,
27};
28
29/// Persistent runtime driver — durable InputState via RuntimeStore.
30pub struct PersistentRuntimeDriver {
31    /// Underlying ephemeral driver for state machine logic.
32    inner: EphemeralRuntimeDriver,
33    /// Durable store for InputState + receipts.
34    store: Arc<dyn RuntimeStore>,
35    /// Blob store used to externalize durable input payloads.
36    blob_store: Arc<dyn BlobStore>,
37    /// Runtime ID for store operations.
38    runtime_id: LogicalRuntimeId,
39    /// Test-only fault injection: forces the input-state snapshot step of
40    /// [`Self::commit_lifecycle_with_rollback`] to fail so tests can pin the
41    /// checkpoint-restore contract for that arm.
42    #[cfg(test)]
43    pub(crate) force_input_snapshot_failure_for_test: bool,
44}
45
46impl PersistentRuntimeDriver {
47    /// Create a new persistent runtime driver.
48    pub fn new(
49        runtime_id: LogicalRuntimeId,
50        store: Arc<dyn RuntimeStore>,
51        blob_store: Arc<dyn BlobStore>,
52    ) -> Self {
53        Self::new_with_control(
54            runtime_id,
55            store,
56            blob_store,
57            Arc::new(StdRwLock::new(
58                crate::driver::ephemeral::RuntimeControlProjection::default(),
59            )),
60            crate::driver::ephemeral::new_ingress_dsl_authority(),
61        )
62    }
63
64    pub(crate) fn new_with_control(
65        runtime_id: LogicalRuntimeId,
66        store: Arc<dyn RuntimeStore>,
67        blob_store: Arc<dyn BlobStore>,
68        control: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
69        dsl: SharedIngressDslAuthority,
70    ) -> Self {
71        Self {
72            inner: EphemeralRuntimeDriver::new_with_control_and_dsl(
73                runtime_id.clone(),
74                control,
75                dsl,
76            ),
77            store,
78            blob_store,
79            runtime_id,
80            #[cfg(test)]
81            force_input_snapshot_failure_for_test: false,
82        }
83    }
84
85    /// Get immutable reference to the inner ephemeral driver.
86    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
87        &self.inner
88    }
89
90    pub(crate) fn rollback_snapshot(&self) -> EphemeralDriverRollbackSnapshot {
91        self.inner.rollback_snapshot()
92    }
93
94    pub(crate) fn restore_rollback_snapshot(&mut self, snapshot: EphemeralDriverRollbackSnapshot) {
95        self.inner.restore_rollback_snapshot(snapshot);
96    }
97
98    /// Get the logical runtime ID for this driver.
99    pub fn runtime_id(&self) -> &LogicalRuntimeId {
100        &self.runtime_id
101    }
102
103    pub fn silent_comms_intents(&self) -> Vec<String> {
104        self.inner.silent_comms_intents()
105    }
106
107    /// Check if the runtime is idle (delegates to inner).
108    pub fn is_idle(&self) -> bool {
109        self.inner.is_idle()
110    }
111
112    /// Ask generated MeerkatMachine authority for the store-visible lifecycle.
113    fn runtime_state_for_persistence(&self) -> Result<RuntimeState, RuntimeDriverError> {
114        Self::runtime_state_for_persistence_from_inner(&self.inner)
115    }
116
117    fn runtime_state_for_persistence_from_inner(
118        inner: &EphemeralRuntimeDriver,
119    ) -> Result<RuntimeState, RuntimeDriverError> {
120        crate::meerkat_machine::classify_runtime_lifecycle_durable_state(inner.runtime_state())
121            .map_err(|err| {
122                RuntimeDriverError::Internal(format!(
123                    "generated runtime lifecycle durability classification failed: {err}"
124                ))
125            })
126    }
127
128    fn lifecycle_commit_for_persistence(
129        &self,
130    ) -> Result<MachineLifecycleCommit, RuntimeDriverError> {
131        Self::lifecycle_commit_for_persistence_from_inner(&self.inner)
132    }
133
134    fn lifecycle_commit_for_persistence_from_inner(
135        inner: &EphemeralRuntimeDriver,
136    ) -> Result<MachineLifecycleCommit, RuntimeDriverError> {
137        Ok(MachineLifecycleCommit::new_with_binding(
138            Self::runtime_state_for_persistence_from_inner(inner)?,
139            inner.machine_lifecycle_binding_facts(),
140        ))
141    }
142
143    /// Snapshot + classify the lifecycle persistence payload, restoring the
144    /// caller's checkpoint on failure.
145    ///
146    /// Contract (Dogma K11): every fallible step between a staged `&mut` DSL
147    /// transition and the rollback-guarded durable commit restores the
148    /// caller's checkpoint. A bare `?` here would leave the staged lifecycle
149    /// live in driver state while reporting failure to the caller. The
150    /// checkpoint is returned on success so the durable commit arm can keep
151    /// using it.
152    fn lifecycle_persistence_payload_with_rollback(
153        &mut self,
154        checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
155        context: &str,
156    ) -> Result<
157        (
158            super::ephemeral::EphemeralDriverRollbackSnapshot,
159            Vec<InputStatePersistenceRecord>,
160            MachineLifecycleCommit,
161        ),
162        RuntimeDriverError,
163    > {
164        let input_states_result = self.inner.authorized_stored_input_states_snapshot();
165        #[cfg(test)]
166        let input_states_result = if self.force_input_snapshot_failure_for_test {
167            Err(RuntimeDriverError::Internal(
168                "forced input-state snapshot failure for checkpoint-restore contract test"
169                    .to_string(),
170            ))
171        } else {
172            input_states_result
173        };
174        let input_states = match input_states_result {
175            Ok(input_states) => input_states,
176            Err(err) => {
177                self.inner.restore_rollback_snapshot(checkpoint);
178                return Err(RuntimeDriverError::Internal(format!(
179                    "{context} input-state snapshot failed: {err}"
180                )));
181            }
182        };
183        let commit = match self.lifecycle_commit_for_persistence() {
184            Ok(commit) => commit,
185            Err(err) => {
186                self.inner.restore_rollback_snapshot(checkpoint);
187                return Err(RuntimeDriverError::Internal(format!(
188                    "{context} lifecycle commit classification failed: {err}"
189                )));
190            }
191        };
192        Ok((checkpoint, input_states, commit))
193    }
194
195    async fn commit_lifecycle_with_rollback(
196        &mut self,
197        checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
198        target_state: RuntimeState,
199        context: &str,
200    ) -> Result<(), RuntimeDriverError> {
201        // Contract: every fallible step between the staged DSL transition and
202        // the durable commit restores the caller's checkpoint on failure. A
203        // bare `?` here would leave the staged lifecycle (e.g. Destroy) live
204        // in driver state while reporting failure to the caller.
205        let (checkpoint, input_states, commit) =
206            self.lifecycle_persistence_payload_with_rollback(checkpoint, context)?;
207        let target_durable_state =
208            match crate::meerkat_machine::classify_runtime_lifecycle_durable_state(target_state) {
209                Ok(target_durable_state) => target_durable_state,
210                Err(err) => {
211                    self.inner.restore_rollback_snapshot(checkpoint);
212                    return Err(RuntimeDriverError::Internal(format!(
213                        "{context} generated target lifecycle durability classification failed: {err}"
214                    )));
215                }
216            };
217        if commit.runtime_state() != target_durable_state {
218            self.inner.restore_rollback_snapshot(checkpoint);
219            return Err(RuntimeDriverError::Internal(format!(
220                "{context} durable persist target {target_durable_state:?} from live {target_state:?} disagreed with generated lifecycle commit {:?}",
221                commit.runtime_state()
222            )));
223        }
224        if let Err(err) = self
225            .store
226            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
227            .await
228        {
229            self.inner.restore_rollback_snapshot(checkpoint);
230            return Err(RuntimeDriverError::Internal(format!(
231                "{context} persist failed: {err}"
232            )));
233        }
234        Ok(())
235    }
236
237    pub(crate) async fn publish_service_turn_terminal_lifecycle(
238        &mut self,
239        checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
240        target_state: RuntimeState,
241    ) -> Result<(), RuntimeDriverError> {
242        self.commit_lifecycle_with_rollback(
243            checkpoint,
244            target_state,
245            "service turn terminal receipt",
246        )
247        .await?;
248        self.inner.sync_control_projection_from_dsl_authority();
249        Ok(())
250    }
251
252    pub(crate) fn set_control_projection(
253        &mut self,
254        next_phase: RuntimeState,
255        current_run_id: Option<RunId>,
256        pre_run_phase: Option<RuntimeState>,
257    ) {
258        self.inner
259            .set_control_projection(next_phase, current_run_id, pre_run_phase);
260    }
261
262    /// Low-level control projection shim for external contract tests.
263    ///
264    /// This does not decide lifecycle legality; it only applies an already
265    /// chosen MeerkatMachine control projection to the concrete driver shell.
266    pub(crate) fn sync_control_projection_from_dsl_authority(&mut self) {
267        self.inner.sync_control_projection_from_dsl_authority();
268    }
269
270    pub(crate) async fn persist_current_machine_lifecycle(
271        &mut self,
272        context: &str,
273    ) -> Result<(), RuntimeDriverError> {
274        let input_states = self.inner.authorized_stored_input_states_snapshot()?;
275        let commit = self.lifecycle_commit_for_persistence()?;
276        self.store
277            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
278            .await
279            .map_err(|err| {
280                RuntimeDriverError::Internal(format!("{context} lifecycle persist failed: {err}"))
281            })
282    }
283
284    /// Contract helper for external tests that need to start a run through the
285    /// same DSL authority used by the runtime loop.
286    #[doc(hidden)]
287    pub fn contract_begin_run_authority(
288        &mut self,
289        run_id: RunId,
290    ) -> Result<(), RuntimeDriverError> {
291        self.inner.contract_begin_run_authority(run_id)
292    }
293
294    /// Test-only authority override for crate-unit tests that need to seed
295    /// impossible or already-realized runtime phases.
296    #[cfg(test)]
297    #[doc(hidden)]
298    pub(crate) fn contract_force_runtime_authority(
299        &mut self,
300        next_phase: RuntimeState,
301        current_run_id: Option<RunId>,
302        pre_run_phase: Option<RuntimeState>,
303    ) {
304        self.inner
305            .contract_force_runtime_authority(next_phase, current_run_id, pre_run_phase);
306    }
307
308    /// Get pending events (delegates to inner).
309    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
310        self.inner.drain_events()
311    }
312
313    /// Drain the typed post-admission signal (delegates to inner).
314    pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
315        self.inner.take_post_admission_signal()
316    }
317
318    /// Inspect the current typed post-admission signal without draining it.
319    pub fn post_admission_signal(&self) -> crate::driver::ephemeral::PostAdmissionSignal {
320        self.inner.post_admission_signal()
321    }
322
323    /// Check and clear wake flag (backward-compat, delegates to inner).
324    pub fn take_wake_requested(&mut self) -> bool {
325        self.inner.take_wake_requested()
326    }
327
328    /// Check and clear immediate processing flag (backward-compat, delegates to inner).
329    pub fn take_process_requested(&mut self) -> bool {
330        self.inner.take_process_requested()
331    }
332
333    /// Dequeue next input (delegates to inner).
334    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
335        self.inner.dequeue_next()
336    }
337
338    /// Dequeue a specific input by ID (delegates to inner).
339    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
340        self.inner.dequeue_by_id(input_id)
341    }
342
343    pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
344        self.inner.has_queued_input_outside(excluded)
345    }
346
347    pub(crate) fn defer_queued_inputs_behind_backlog(
348        &mut self,
349        input_ids: &[InputId],
350    ) -> Result<(), RuntimeDriverError> {
351        self.inner.defer_queued_inputs_behind_backlog(input_ids)
352    }
353
354    pub(crate) fn absorb_post_admission_effects(
355        &mut self,
356        effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
357    ) {
358        self.inner.absorb_post_admission_effects(effects);
359    }
360
361    pub(crate) fn resolve_admission(
362        &self,
363        input: &Input,
364    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
365        self.inner.resolve_admission(input)
366    }
367
368    pub(crate) fn resolve_admission_with_active_turn_boundary(
369        &self,
370        input: &Input,
371        active_turn_boundary_available: bool,
372    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
373        self.inner
374            .resolve_admission_with_active_turn_boundary(input, active_turn_boundary_available)
375    }
376
377    pub(crate) fn resolve_admission_without_wake_with_active_turn_boundary(
378        &self,
379        input: &Input,
380        active_turn_boundary_available: bool,
381    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
382        self.inner
383            .resolve_admission_without_wake_with_active_turn_boundary(
384                input,
385                active_turn_boundary_available,
386            )
387    }
388
389    pub(crate) async fn accept_resolved_input(
390        &mut self,
391        input: Input,
392        resolved: crate::accept::ResolvedAdmission,
393    ) -> Result<AcceptOutcome, RuntimeDriverError> {
394        let flags = resolved.coarse_flags();
395        let mut staged = self.inner.clone_with_isolated_dsl_authority();
396        staged.ensure_contract_session_authority()?;
397        let staged_outcome = staged
398            .accept_resolved_input(input.clone(), resolved.clone())
399            .await?;
400
401        let AcceptOutcome::Accepted {
402            input_id: staged_input_id,
403            ..
404        } = staged_outcome
405        else {
406            return self.inner.accept_resolved_input(input, resolved).await;
407        };
408
409        staged.machine_apply_accept_with_completion_signal(&staged_input_id, flags)?;
410        let Some(mut staged_bundle) = staged.stored_input_state(&staged_input_id) else {
411            return Err(RuntimeDriverError::Internal(format!(
412                "generated input lifecycle phase missing for accepted input {staged_input_id}"
413            )));
414        };
415        let mut input_for_recovery = input.clone();
416        externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery)
417            .await
418            .map_err(|err| {
419                RuntimeDriverError::Internal(format!(
420                    "failed to externalize runtime input images: {err}"
421                ))
422            })?;
423        staged_bundle.state.persisted_input = Some(input_for_recovery.clone());
424        self.persist_state(&staged_bundle).await?;
425
426        self.inner.ensure_contract_session_authority()?;
427        let mut outcome = self.inner.accept_resolved_input(input, resolved).await?;
428        if let AcceptOutcome::Accepted {
429            ref input_id,
430            ref mut state,
431            ref mut seed,
432            ..
433        } = outcome
434        {
435            if input_id != &staged_input_id {
436                return Err(RuntimeDriverError::Internal(format!(
437                    "staged accepted input {staged_input_id} differed from committed input {input_id}"
438                )));
439            }
440            self.inner
441                .machine_apply_accept_with_completion_signal(input_id, flags)?;
442            let Some(mut bundle) = self.inner.stored_input_state(input_id) else {
443                return Err(RuntimeDriverError::Internal(format!(
444                    "generated input lifecycle phase missing for accepted input {input_id}"
445                )));
446            };
447            bundle.state.persisted_input = Some(input_for_recovery);
448            self.inner.ledger_mut().accept(bundle.state.clone());
449            *state = bundle.state;
450            *seed = bundle.seed;
451        }
452
453        Ok(outcome)
454    }
455
456    pub(crate) async fn preview_accept_resolved_input(
457        &self,
458        input: Input,
459        resolved: crate::accept::ResolvedAdmission,
460    ) -> Result<AcceptOutcome, RuntimeDriverError> {
461        let mut staged = self.inner.clone_with_isolated_dsl_authority();
462        staged.ensure_contract_session_authority()?;
463        staged.accept_resolved_input(input, resolved).await
464    }
465
466    /// Stage input (delegates to inner).
467    pub fn stage_input(
468        &mut self,
469        input_id: &InputId,
470        run_id: &meerkat_core::lifecycle::RunId,
471    ) -> Result<(), crate::traits::RuntimeDriverError> {
472        self.inner.stage_input(input_id, run_id)
473    }
474
475    /// Stage a batch of inputs atomically (delegates to inner).
476    pub fn stage_batch(
477        &mut self,
478        input_ids: &[InputId],
479        run_id: &meerkat_core::lifecycle::RunId,
480    ) -> Result<(), crate::traits::RuntimeDriverError> {
481        self.inner.stage_batch(input_ids, run_id)
482    }
483
484    pub(crate) fn machine_realize_stage_batch(
485        &mut self,
486        input_ids: &[InputId],
487        run_id: &meerkat_core::lifecycle::RunId,
488    ) -> Result<(), crate::traits::RuntimeDriverError> {
489        self.inner.machine_realize_stage_batch(input_ids, run_id)
490    }
491
492    /// Apply input (delegates to inner).
493    pub fn apply_input(
494        &mut self,
495        input_id: &InputId,
496        run_id: &meerkat_core::lifecycle::RunId,
497    ) -> Result<(), crate::traits::RuntimeDriverError> {
498        self.inner.apply_input(input_id, run_id)
499    }
500
501    /// Roll back staged inputs (delegates to inner).
502    pub fn rollback_staged(
503        &mut self,
504        input_ids: &[InputId],
505    ) -> Result<(), crate::traits::RuntimeDriverError> {
506        self.inner.rollback_staged(input_ids)
507    }
508
509    async fn persist_state(&self, state: &StoredInputState) -> Result<(), RuntimeDriverError> {
510        let state = InputStatePersistenceRecord::from_machine_snapshot(state.clone())
511            .map_err(RuntimeDriverError::Internal)?;
512        self.store
513            .persist_input_state(&self.runtime_id, &state)
514            .await
515            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
516    }
517
518    pub(crate) async fn abandon_pending_inputs(
519        &mut self,
520        reason: InputAbandonReason,
521    ) -> Result<usize, RuntimeDriverError> {
522        let checkpoint = self.inner.rollback_snapshot();
523        let abandoned = match self.inner.abandon_pending_inputs(reason) {
524            Ok(abandoned) => abandoned,
525            Err(err) => {
526                self.inner.restore_rollback_snapshot(checkpoint);
527                return Err(err);
528            }
529        };
530        let (checkpoint, input_states, commit) =
531            self.lifecycle_persistence_payload_with_rollback(checkpoint, "pending input abandon")?;
532        if let Err(err) = self
533            .store
534            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
535            .await
536        {
537            self.inner.restore_rollback_snapshot(checkpoint);
538            return Err(RuntimeDriverError::Internal(format!(
539                "pending input abandon persist failed: {err}"
540            )));
541        }
542        Ok(abandoned)
543    }
544
545    /// Recycle the in-memory driver shell while preserving canonical pending
546    /// work from durable runtime truth.
547    ///
548    /// Unlike `reset()`, this must not abandon queued/staged work.
549    pub(crate) async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
550        let checkpoint = self.inner.rollback_snapshot();
551        let transferred = match self.inner.recycle_preserving_work() {
552            Ok(transferred) => transferred,
553            Err(err) => {
554                self.inner.restore_rollback_snapshot(checkpoint);
555                return Err(err);
556            }
557        };
558        let (checkpoint, input_states, commit) =
559            self.lifecycle_persistence_payload_with_rollback(checkpoint, "recycle")?;
560        if let Err(err) = self
561            .store
562            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
563            .await
564        {
565            self.inner.restore_rollback_snapshot(checkpoint);
566            return Err(RuntimeDriverError::Internal(format!(
567                "recycle persist failed: {err}"
568            )));
569        }
570
571        self.inner.sync_control_projection_from_dsl_authority();
572        Ok(transferred)
573    }
574
575    pub(crate) async fn realize_retire_lifecycle(
576        &mut self,
577    ) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
578        let checkpoint = self.inner.rollback_snapshot();
579        let report = self.inner.finalize_retire();
580        // Restore the checkpoint on classification failure: an early `?` here
581        // would leave the finalized retire state live without rollback.
582        let target_state = match self.runtime_state_for_persistence() {
583            Ok(target_state) => target_state,
584            Err(err) => {
585                self.inner.restore_rollback_snapshot(checkpoint);
586                return Err(err);
587            }
588        };
589        self.commit_lifecycle_with_rollback(checkpoint, target_state, "retire")
590            .await?;
591        self.inner.sync_control_projection_from_dsl_authority();
592        Ok(report)
593    }
594
595    pub(crate) async fn realize_reset_lifecycle(
596        &mut self,
597    ) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
598        let checkpoint = self.inner.rollback_snapshot();
599        let report = match self.inner.reset_cleanup() {
600            Ok(report) => report,
601            Err(err) => {
602                self.inner.restore_rollback_snapshot(checkpoint);
603                return Err(err);
604            }
605        };
606        // Restore the checkpoint on classification failure: an early `?` here
607        // would leave the reset-cleaned state live without rollback.
608        let target_state = match self.runtime_state_for_persistence() {
609            Ok(target_state) => target_state,
610            Err(err) => {
611                self.inner.restore_rollback_snapshot(checkpoint);
612                return Err(err);
613            }
614        };
615        self.commit_lifecycle_with_rollback(checkpoint, target_state, "reset")
616            .await?;
617        self.inner.sync_control_projection_from_dsl_authority();
618        Ok(report)
619    }
620
621    pub(crate) fn prepare_destroy_lifecycle(
622        &mut self,
623    ) -> Result<(EphemeralDriverRollbackSnapshot, DestroyReport), RuntimeDriverError> {
624        let checkpoint = self.inner.rollback_snapshot();
625        let abandoned = match self.inner.destroy_cleanup() {
626            Ok(abandoned) => abandoned,
627            Err(err) => {
628                self.inner.restore_rollback_snapshot(checkpoint);
629                return Err(err);
630            }
631        };
632        Ok((
633            checkpoint,
634            DestroyReport {
635                inputs_abandoned: abandoned,
636            },
637        ))
638    }
639
640    pub(crate) async fn commit_prepared_destroy_lifecycle(
641        &mut self,
642        checkpoint: EphemeralDriverRollbackSnapshot,
643    ) -> Result<(), RuntimeDriverError> {
644        // Resolve the durable target BEFORE handing the checkpoint to the
645        // commit helper: an early `?` here would otherwise leave the staged
646        // destroy state live without restoring the checkpoint (driver-side
647        // shadow truth with no rollback).
648        let target_state = match self.runtime_state_for_persistence() {
649            Ok(target_state) => target_state,
650            Err(err) => {
651                self.inner.restore_rollback_snapshot(checkpoint);
652                return Err(err);
653            }
654        };
655        self.commit_lifecycle_with_rollback(checkpoint, target_state, "destroy")
656            .await
657    }
658
659    pub(crate) fn rollback_prepared_destroy_lifecycle(
660        &mut self,
661        checkpoint: EphemeralDriverRollbackSnapshot,
662    ) {
663        self.inner.restore_rollback_snapshot(checkpoint);
664    }
665
666    pub(crate) async fn finalize_runtime_executor_exit(
667        &mut self,
668    ) -> Result<(), RuntimeDriverError> {
669        let checkpoint = self.inner.rollback_snapshot();
670        if let Err(err) = self.inner.apply_runtime_executor_exited_authority() {
671            self.inner.restore_rollback_snapshot(checkpoint);
672            return Err(err);
673        }
674        if let Err(err) = self.inner.stop_runtime_cleanup() {
675            self.inner.restore_rollback_snapshot(checkpoint);
676            return Err(err);
677        }
678        // Resolve the durable target BEFORE handing the checkpoint to the
679        // commit helper, so a classification failure restores the staged
680        // executor-exit state instead of leaving it live without rollback.
681        let target_state = match self.runtime_state_for_persistence() {
682            Ok(target_state) => target_state,
683            Err(err) => {
684                self.inner.restore_rollback_snapshot(checkpoint);
685                return Err(err);
686            }
687        };
688        self.commit_lifecycle_with_rollback(checkpoint, target_state, "stop")
689            .await?;
690        self.inner.sync_control_projection_from_dsl_authority();
691        Ok(())
692    }
693
694    pub(crate) fn machine_realize_boundary_applied_in_memory(
695        &mut self,
696        run_id: &RunId,
697        receipt: &RunBoundaryReceipt,
698    ) -> Result<(), RuntimeDriverError> {
699        self.inner.machine_realize_boundary_applied(run_id, receipt)
700    }
701
702    pub(crate) fn machine_realize_run_completed_in_memory(
703        &mut self,
704        run_id: &RunId,
705        consumed_input_ids: &[InputId],
706    ) -> Result<(), RuntimeDriverError> {
707        self.inner
708            .machine_realize_run_completed(run_id, consumed_input_ids)
709    }
710
711    pub(crate) async fn machine_realize_live_boundary_context_injected(
712        &mut self,
713        run_id: &RunId,
714        input_ids: &[InputId],
715        session_snapshot: Option<Vec<u8>>,
716    ) -> Result<(), RuntimeDriverError> {
717        let checkpoint = self.inner.rollback_snapshot();
718        let receipt = match self
719            .inner
720            .machine_realize_live_boundary_context_injected(run_id, input_ids)
721        {
722            Ok(receipt) => receipt,
723            Err(err) => {
724                self.inner.restore_rollback_snapshot(checkpoint);
725                return Err(err);
726            }
727        };
728        let input_updates = match self.inner.authorized_stored_input_states_snapshot() {
729            Ok(input_updates) => input_updates,
730            Err(err) => {
731                self.inner.restore_rollback_snapshot(checkpoint);
732                return Err(err);
733            }
734        };
735        if let Err(err) = self
736            .store
737            .atomic_apply(
738                &self.runtime_id,
739                session_snapshot
740                    .as_ref()
741                    .map(|session_snapshot| crate::store::SessionDelta {
742                        session_snapshot: session_snapshot.clone(),
743                    }),
744                receipt.clone(),
745                input_updates,
746                session_snapshot
747                    .as_deref()
748                    .and_then(|snapshot| {
749                        serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
750                    })
751                    .map(|session| session.id().clone()),
752            )
753            .await
754        {
755            self.inner.restore_rollback_snapshot(checkpoint);
756            return Err(RuntimeDriverError::Internal(format!(
757                "runtime live-boundary context commit failed: {err}"
758            )));
759        }
760        Ok(())
761    }
762
763    pub(crate) async fn machine_commit_completed_boundary_snapshot(
764        &mut self,
765        receipt: &RunBoundaryReceipt,
766        session_snapshot: Option<&Vec<u8>>,
767    ) -> Result<(), RuntimeDriverError> {
768        let input_updates = self.inner.authorized_stored_input_states_snapshot()?;
769        self.store
770            .atomic_apply(
771                &self.runtime_id,
772                session_snapshot.map(|session_snapshot| crate::store::SessionDelta {
773                    session_snapshot: session_snapshot.clone(),
774                }),
775                receipt.clone(),
776                input_updates,
777                session_snapshot
778                    .and_then(|snapshot| {
779                        serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
780                    })
781                    .map(|session| session.id().clone()),
782            )
783            .await
784            .map_err(|e| {
785                RuntimeDriverError::Internal(format!(
786                    "runtime completed-boundary commit failed: {e}"
787                ))
788            })
789    }
790
791    pub(crate) async fn machine_realize_run_failed(
792        &mut self,
793        run_id: &RunId,
794        contributing_input_ids: &[InputId],
795        replay_plan: &super::ephemeral::ReplayQueuedContributorsPlan,
796        terminal_error: &str,
797        runtime_apply_failure: Option<&meerkat_core::lifecycle::CoreApplyFailureCause>,
798        recoverable: bool,
799    ) -> Result<(), RuntimeDriverError> {
800        let checkpoint = self.inner.rollback_snapshot();
801        if let Err(err) =
802            self.inner
803                .machine_realize_run_failed(run_id, contributing_input_ids, replay_plan)
804        {
805            self.inner.restore_rollback_snapshot(checkpoint);
806            return Err(err);
807        }
808        let failure_cause = runtime_apply_failure.map(|failure| failure.kind);
809        tracing::debug!(
810            run_id = ?run_id,
811            recoverable,
812            error = terminal_error,
813            failure_cause = ?failure_cause,
814            "persistent driver realized machine-owned failed-run replay"
815        );
816        let (checkpoint, input_states, commit) = self
817            .lifecycle_persistence_payload_with_rollback(checkpoint, "failed-run terminal event")?;
818        if let Err(err) = self
819            .store
820            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
821            .await
822        {
823            self.inner.restore_rollback_snapshot(checkpoint);
824            return Err(RuntimeDriverError::Internal(format!(
825                "terminal event persist failed: {err}"
826            )));
827        }
828        Ok(())
829    }
830
831    pub(crate) async fn machine_realize_run_cancelled(
832        &mut self,
833        run_id: &RunId,
834        contributing_input_ids: &[InputId],
835    ) -> Result<(), RuntimeDriverError> {
836        let checkpoint = self.inner.rollback_snapshot();
837        if let Err(err) = self
838            .inner
839            .machine_realize_run_cancelled(run_id, contributing_input_ids)
840        {
841            self.inner.restore_rollback_snapshot(checkpoint);
842            return Err(err);
843        }
844        tracing::debug!(
845            run_id = ?run_id,
846            contributors = contributing_input_ids.len(),
847            "persistent driver realized machine-owned cancelled run"
848        );
849        let (checkpoint, input_states, commit) = self.lifecycle_persistence_payload_with_rollback(
850            checkpoint,
851            "cancelled-run terminal event",
852        )?;
853        if let Err(err) = self
854            .store
855            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
856            .await
857        {
858            self.inner.restore_rollback_snapshot(checkpoint);
859            return Err(RuntimeDriverError::Internal(format!(
860                "terminal cancellation persist failed: {err}"
861            )));
862        }
863        Ok(())
864    }
865}
866
867#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
868#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
869impl RuntimeDriver for PersistentRuntimeDriver {
870    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
871        let resolved = self.resolve_admission(&input)?;
872        self.accept_resolved_input(input, resolved).await
873    }
874
875    async fn on_runtime_event(
876        &mut self,
877        event: RuntimeEventEnvelope,
878    ) -> Result<(), RuntimeDriverError> {
879        self.inner.on_runtime_event(event).await
880    }
881
882    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
883        let mut staged = self.inner.clone_with_isolated_dsl_authority();
884        let report = crate::meerkat_machine::machine_recover_persistent_driver(
885            self.store.as_ref(),
886            &self.runtime_id,
887            &mut staged,
888        )
889        .await?;
890
891        let input_states = staged.authorized_stored_input_states_snapshot()?;
892        let commit = Self::lifecycle_commit_for_persistence_from_inner(&staged)?;
893        self.store
894            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
895            .await
896            .map_err(|err| {
897                RuntimeDriverError::Internal(format!("recovery persist failed: {err}"))
898            })?;
899        let _ = crate::meerkat_machine::machine_recover_persistent_driver(
900            self.store.as_ref(),
901            &self.runtime_id,
902            &mut self.inner,
903        )
904        .await?;
905        Ok(report)
906    }
907
908    fn runtime_state(&self) -> RuntimeState {
909        self.inner.runtime_state()
910    }
911
912    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
913        self.inner.input_state(input_id)
914    }
915
916    fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
917        self.inner.input_phase(input_id)
918    }
919
920    fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
921        self.inner.input_last_run_id(input_id)
922    }
923
924    fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
925        self.inner.input_last_boundary_sequence(input_id)
926    }
927
928    fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
929        self.inner.stored_input_state(input_id)
930    }
931
932    fn active_input_ids(&self) -> Vec<InputId> {
933        self.inner.active_input_ids()
934    }
935}
936
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use chrono::Utc;
    use meerkat_core::lifecycle::InputId;

    /// Durable operator prompt carrying `text` under a freshly minted id.
    fn make_prompt(text: &str) -> Input {
        let header = crate::input::InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: crate::input::InputOrigin::Operator,
            durability: crate::input::InputDurability::Durable,
            visibility: crate::input::InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        };
        let prompt = crate::input::PromptInput {
            header,
            content: text.into(),
            typed_turn_appends: Vec::new(),
            turn_metadata: None,
        };
        Input::Prompt(prompt)
    }

    /// Persistent driver over fresh in-memory runtime and blob stores.
    fn driver_with_memory_stores(runtime_name: &'static str) -> PersistentRuntimeDriver {
        let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
        let blob_store: Arc<dyn BlobStore> = Arc::new(meerkat_store::MemoryBlobStore::new());
        PersistentRuntimeDriver::new(LogicalRuntimeId::new(runtime_name), store, blob_store)
    }

    /// Dogma K11 (persistent destroy / no driver-side shadow truth).
    ///
    /// Once a caller has staged a DSL lifecycle transition, every fallible step
    /// of `commit_lifecycle_with_rollback` must restore the caller's checkpoint.
    /// This test pins the input-state snapshot step, which previously escaped
    /// via a bare `?` and left the staged lifecycle live while reporting failure.
    #[tokio::test]
    async fn commit_lifecycle_snapshot_failure_restores_checkpoint() {
        let mut driver = driver_with_memory_stores("commit-lifecycle-rollback-contract");

        // The caller's pre-stage view, captured before any mutation.
        let checkpoint = driver.rollback_snapshot();

        // Move driver state past the checkpoint. This stands in for a staged
        // Destroy/lifecycle transition that is awaiting its durable commit.
        let input = make_prompt("staged work");
        let input_id = input.id().clone();
        let outcome = driver.accept_input(input).await.unwrap();
        assert!(outcome.is_accepted());
        assert!(driver.input_phase(&input_id).is_some());

        // Arm the snapshot-step fault, then attempt the commit.
        driver.force_input_snapshot_failure_for_test = true;
        let target_state = driver.inner_ref().runtime_state();
        let result = driver
            .commit_lifecycle_with_rollback(checkpoint, target_state, "test destroy")
            .await;

        // The error must surface, and the driver must be back at the checkpoint.
        assert!(result.is_err(), "forced snapshot failure must propagate");
        assert!(
            driver.input_phase(&input_id).is_none(),
            "staged driver state must be restored to the pre-stage checkpoint"
        );
        assert!(driver.active_input_ids().is_empty());
    }

    /// K11 checkpoint-restore contract, applied to `abandon_pending_inputs`.
    ///
    /// The snapshot and classification steps run between the in-memory abandon
    /// and the durable commit. They previously escaped via a bare `?`, which
    /// left the abandon applied in memory but never persisted.
    #[tokio::test]
    async fn abandon_pending_inputs_snapshot_failure_restores_checkpoint() {
        let mut driver = driver_with_memory_stores("abandon-rollback-contract");

        // A pending input gives the abandon something to mutate.
        let input = make_prompt("pending work");
        let input_id = input.id().clone();
        let outcome = driver.accept_input(input).await.unwrap();
        assert!(outcome.is_accepted());
        assert!(driver.input_phase(&input_id).is_some());

        // Fail the snapshot step that follows the staged abandon.
        driver.force_input_snapshot_failure_for_test = true;
        let result = driver
            .abandon_pending_inputs(InputAbandonReason::Reset)
            .await;

        assert!(result.is_err(), "forced snapshot failure must propagate");
        assert!(
            driver.input_phase(&input_id).is_some(),
            "staged abandon must be rolled back: the pending input must still be live"
        );
    }
}