Skip to main content

meerkat_runtime/driver/
persistent.rs

1//! PersistentRuntimeDriver — wraps EphemeralRuntimeDriver + RuntimeStore.
2//!
3//! Provides durable-before-ack guarantee: InputState is persisted via
4//! RuntimeStore BEFORE returning AcceptOutcome. Delegates state machine
5//! logic to the ephemeral driver.
6
7use std::sync::Arc;
8use std::sync::RwLock as StdRwLock;
9
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12
13use crate::accept::AcceptOutcome;
14use crate::identifiers::LogicalRuntimeId;
15use crate::input::{Input, externalize_input_images};
16use crate::input_state::{InputAbandonReason, InputLifecycleState, InputState, StoredInputState};
17use crate::runtime_event::RuntimeEventEnvelope;
18use crate::runtime_state::RuntimeState;
19use crate::store::{MachineLifecycleCommit, RuntimeStore};
20use crate::traits::{DestroyReport, RecoveryReport, RuntimeDriver, RuntimeDriverError};
21
22use super::ephemeral::{
23    EphemeralDriverRollbackSnapshot, EphemeralRuntimeDriver, SharedIngressDslAuthority,
24};
25
26/// Persistent runtime driver — durable InputState via RuntimeStore.
27pub struct PersistentRuntimeDriver {
28    /// Underlying ephemeral driver for state machine logic.
29    inner: EphemeralRuntimeDriver,
30    /// Durable store for InputState + receipts.
31    store: Arc<dyn RuntimeStore>,
32    /// Blob store used to externalize durable input payloads.
33    blob_store: Arc<dyn BlobStore>,
34    /// Runtime ID for store operations.
35    runtime_id: LogicalRuntimeId,
36}
37
38impl PersistentRuntimeDriver {
39    /// Create a new persistent runtime driver.
40    pub fn new(
41        runtime_id: LogicalRuntimeId,
42        store: Arc<dyn RuntimeStore>,
43        blob_store: Arc<dyn BlobStore>,
44    ) -> Self {
45        Self::new_with_control(
46            runtime_id,
47            store,
48            blob_store,
49            Arc::new(StdRwLock::new(
50                crate::driver::ephemeral::RuntimeControlProjection::default(),
51            )),
52            crate::driver::ephemeral::new_ingress_dsl_authority(),
53        )
54    }
55
56    pub(crate) fn new_with_control(
57        runtime_id: LogicalRuntimeId,
58        store: Arc<dyn RuntimeStore>,
59        blob_store: Arc<dyn BlobStore>,
60        control: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
61        dsl: SharedIngressDslAuthority,
62    ) -> Self {
63        Self {
64            inner: EphemeralRuntimeDriver::new_with_control_and_dsl(
65                runtime_id.clone(),
66                control,
67                dsl,
68            ),
69            store,
70            blob_store,
71            runtime_id,
72        }
73    }
74
75    /// Get immutable reference to the inner ephemeral driver.
76    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
77        &self.inner
78    }
79
80    pub(crate) fn rollback_snapshot(&self) -> EphemeralDriverRollbackSnapshot {
81        self.inner.rollback_snapshot()
82    }
83
84    pub(crate) fn restore_rollback_snapshot(&mut self, snapshot: EphemeralDriverRollbackSnapshot) {
85        self.inner.restore_rollback_snapshot(snapshot);
86    }
87
88    /// Get the logical runtime ID for this driver.
89    pub fn runtime_id(&self) -> &LogicalRuntimeId {
90        &self.runtime_id
91    }
92
93    /// Set the list of comms intents that should be silently accepted (delegates to inner).
94    pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
95        self.inner.set_silent_comms_intents(intents);
96    }
97
98    pub fn silent_comms_intents(&self) -> Vec<String> {
99        self.inner.silent_comms_intents()
100    }
101
102    /// Check if the runtime is idle (delegates to inner).
103    pub fn is_idle(&self) -> bool {
104        self.inner.is_idle()
105    }
106
107    /// Check if the runtime is idle or attached (delegates to inner).
108    pub fn is_idle_or_attached(&self) -> bool {
109        self.inner.is_idle_or_attached()
110    }
111
112    /// Map runtime state for persistence.
113    ///
114    /// Attached must never be persisted — on recovery, the executor is
115    /// re-attached by the surface. Map Attached to Idle for store operations.
116    fn runtime_state_for_persistence(&self) -> RuntimeState {
117        match self.inner.runtime_state() {
118            RuntimeState::Attached => RuntimeState::Idle,
119            other => other,
120        }
121    }
122
123    async fn commit_lifecycle_with_rollback(
124        &mut self,
125        checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
126        target_state: RuntimeState,
127        context: &str,
128    ) -> Result<(), RuntimeDriverError> {
129        let input_states = self.inner.stored_input_states_snapshot();
130        if let Err(err) = self
131            .store
132            .commit_machine_lifecycle(
133                &self.runtime_id,
134                MachineLifecycleCommit::new(target_state),
135                &input_states,
136            )
137            .await
138        {
139            self.inner.restore_rollback_snapshot(checkpoint);
140            return Err(RuntimeDriverError::Internal(format!(
141                "{context} persist failed: {err}"
142            )));
143        }
144        Ok(())
145    }
146
147    pub(crate) async fn publish_service_turn_terminal_lifecycle(
148        &mut self,
149        checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
150        target_state: RuntimeState,
151    ) -> Result<(), RuntimeDriverError> {
152        self.commit_lifecycle_with_rollback(
153            checkpoint,
154            target_state,
155            "service turn terminal receipt",
156        )
157        .await?;
158        self.inner.set_control_projection(target_state, None, None);
159        Ok(())
160    }
161
162    pub(crate) fn set_control_projection(
163        &mut self,
164        next_phase: RuntimeState,
165        current_run_id: Option<RunId>,
166        pre_run_phase: Option<RuntimeState>,
167    ) {
168        self.inner
169            .set_control_projection(next_phase, current_run_id, pre_run_phase);
170    }
171
172    /// Low-level control projection shim for external contract tests.
173    ///
174    /// This does not decide lifecycle legality; it only applies an already
175    /// chosen MeerkatMachine control projection to the concrete driver shell.
176    pub(crate) fn sync_control_projection_from_dsl_authority(&mut self) {
177        self.inner.sync_control_projection_from_dsl_authority();
178    }
179
180    /// Contract helper for external tests that need to start a run through the
181    /// same DSL authority used by the runtime loop.
182    #[doc(hidden)]
183    pub fn contract_begin_run_authority(
184        &mut self,
185        run_id: RunId,
186    ) -> Result<(), RuntimeDriverError> {
187        self.inner.contract_begin_run_authority(run_id)
188    }
189
190    /// Test-only authority override for crate-unit tests that need to seed
191    /// impossible or already-realized runtime phases.
192    #[cfg(test)]
193    #[doc(hidden)]
194    pub(crate) fn contract_force_runtime_authority(
195        &mut self,
196        next_phase: RuntimeState,
197        current_run_id: Option<RunId>,
198        pre_run_phase: Option<RuntimeState>,
199    ) {
200        self.inner
201            .contract_force_runtime_authority(next_phase, current_run_id, pre_run_phase);
202    }
203
204    /// Get pending events (delegates to inner).
205    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
206        self.inner.drain_events()
207    }
208
209    /// Drain the typed post-admission signal (delegates to inner).
210    pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
211        self.inner.take_post_admission_signal()
212    }
213
214    /// Inspect the current typed post-admission signal without draining it.
215    pub fn post_admission_signal(&self) -> crate::driver::ephemeral::PostAdmissionSignal {
216        self.inner.post_admission_signal()
217    }
218
219    /// Check and clear wake flag (backward-compat, delegates to inner).
220    pub fn take_wake_requested(&mut self) -> bool {
221        self.inner.take_wake_requested()
222    }
223
224    /// Check and clear immediate processing flag (backward-compat, delegates to inner).
225    pub fn take_process_requested(&mut self) -> bool {
226        self.inner.take_process_requested()
227    }
228
229    /// Dequeue next input (delegates to inner).
230    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
231        self.inner.dequeue_next()
232    }
233
234    /// Dequeue a specific input by ID (delegates to inner).
235    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
236        self.inner.dequeue_by_id(input_id)
237    }
238
239    pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
240        self.inner.has_queued_input_outside(excluded)
241    }
242
243    pub(crate) fn defer_queued_inputs_behind_backlog(&mut self, input_ids: &[InputId]) {
244        self.inner.defer_queued_inputs_behind_backlog(input_ids);
245    }
246
247    pub(crate) fn absorb_post_admission_effects(
248        &mut self,
249        effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
250    ) {
251        self.inner.absorb_post_admission_effects(effects);
252    }
253
254    pub(crate) fn resolve_admission_for_runtime_idle(
255        &self,
256        input: &Input,
257        runtime_idle: bool,
258    ) -> crate::accept::ResolvedAdmission {
259        self.inner
260            .resolve_admission_for_runtime_idle(input, runtime_idle)
261    }
262
263    pub(crate) fn resolve_admission(&self, input: &Input) -> crate::accept::ResolvedAdmission {
264        self.inner.resolve_admission(input)
265    }
266
267    pub(crate) async fn accept_resolved_input(
268        &mut self,
269        input: Input,
270        resolved: crate::accept::ResolvedAdmission,
271    ) -> Result<AcceptOutcome, RuntimeDriverError> {
272        let checkpoint = self.inner.rollback_snapshot();
273        let input_for_recovery = input.clone();
274
275        let mut outcome = self.inner.accept_resolved_input(input, resolved).await?;
276
277        if let AcceptOutcome::Accepted {
278            ref input_id,
279            ref mut state,
280            ..
281        } = outcome
282            && let Some(mut bundle) = self.inner.stored_input_state(input_id)
283        {
284            let mut input_for_recovery = input_for_recovery.clone();
285            if let Err(err) =
286                externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
287            {
288                self.inner.restore_rollback_snapshot(checkpoint);
289                return Err(RuntimeDriverError::Internal(format!(
290                    "failed to externalize runtime input images: {err}"
291                )));
292            }
293            bundle.state.persisted_input = Some(input_for_recovery);
294            self.inner.ledger_mut().accept(bundle.state.clone());
295            if let Err(err) = self.persist_state(&bundle).await {
296                self.inner.restore_rollback_snapshot(checkpoint);
297                return Err(err);
298            }
299            *state = bundle.state;
300        }
301
302        Ok(outcome)
303    }
304
305    /// Stage input (delegates to inner).
306    pub fn stage_input(
307        &mut self,
308        input_id: &InputId,
309        run_id: &meerkat_core::lifecycle::RunId,
310    ) -> Result<(), crate::traits::RuntimeDriverError> {
311        self.inner.stage_input(input_id, run_id)
312    }
313
314    /// Stage a batch of inputs atomically (delegates to inner).
315    pub fn stage_batch(
316        &mut self,
317        input_ids: &[InputId],
318        run_id: &meerkat_core::lifecycle::RunId,
319    ) -> Result<(), crate::traits::RuntimeDriverError> {
320        self.inner.stage_batch(input_ids, run_id)
321    }
322
323    pub(crate) fn machine_realize_stage_batch(
324        &mut self,
325        input_ids: &[InputId],
326        run_id: &meerkat_core::lifecycle::RunId,
327    ) -> Result<(), crate::traits::RuntimeDriverError> {
328        self.inner.machine_realize_stage_batch(input_ids, run_id)
329    }
330
331    /// Apply input (delegates to inner).
332    pub fn apply_input(
333        &mut self,
334        input_id: &InputId,
335        run_id: &meerkat_core::lifecycle::RunId,
336    ) -> Result<(), crate::traits::RuntimeDriverError> {
337        self.inner.apply_input(input_id, run_id)
338    }
339
340    /// Roll back staged inputs (delegates to inner).
341    pub fn rollback_staged(
342        &mut self,
343        input_ids: &[InputId],
344    ) -> Result<(), crate::traits::RuntimeDriverError> {
345        self.inner.rollback_staged(input_ids)
346    }
347
348    async fn persist_state(&self, state: &StoredInputState) -> Result<(), RuntimeDriverError> {
349        self.store
350            .persist_input_state(&self.runtime_id, state)
351            .await
352            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
353    }
354
355    pub(crate) async fn abandon_pending_inputs(
356        &mut self,
357        reason: InputAbandonReason,
358    ) -> Result<usize, RuntimeDriverError> {
359        let checkpoint = self.inner.rollback_snapshot();
360        let abandoned = self.inner.abandon_pending_inputs(reason);
361        let input_states = self.inner.stored_input_states_snapshot();
362        if let Err(err) = self
363            .store
364            .commit_machine_lifecycle(
365                &self.runtime_id,
366                MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
367                &input_states,
368            )
369            .await
370        {
371            self.inner.restore_rollback_snapshot(checkpoint);
372            return Err(RuntimeDriverError::Internal(format!(
373                "pending input abandon persist failed: {err}"
374            )));
375        }
376        Ok(abandoned)
377    }
378
379    /// Recycle the in-memory driver shell while preserving canonical pending
380    /// work from durable runtime truth.
381    ///
382    /// Unlike `reset()`, this must not abandon queued/staged work.
383    pub(crate) async fn recycle_preserving_work(
384        &mut self,
385        target_phase: RuntimeState,
386    ) -> Result<usize, RuntimeDriverError> {
387        let checkpoint = self.inner.rollback_snapshot();
388        let transferred = match self.inner.recycle_preserving_work() {
389            Ok(transferred) => transferred,
390            Err(err) => {
391                self.inner.restore_rollback_snapshot(checkpoint);
392                return Err(err);
393            }
394        };
395        let input_states = self.inner.stored_input_states_snapshot();
396        if let Err(err) = self
397            .store
398            .commit_machine_lifecycle(
399                &self.runtime_id,
400                MachineLifecycleCommit::new(target_phase),
401                &input_states,
402            )
403            .await
404        {
405            self.inner.restore_rollback_snapshot(checkpoint);
406            return Err(RuntimeDriverError::Internal(format!(
407                "recycle persist failed: {err}"
408            )));
409        }
410
411        self.inner.set_control_projection(target_phase, None, None);
412        Ok(transferred)
413    }
414
415    pub(crate) async fn realize_retire_lifecycle(
416        &mut self,
417    ) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
418        let checkpoint = self.inner.rollback_snapshot();
419        let report = self.inner.finalize_retire();
420        self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Retired, "retire")
421            .await?;
422        self.inner
423            .set_control_projection(RuntimeState::Retired, None, None);
424        Ok(report)
425    }
426
427    pub(crate) async fn realize_reset_lifecycle(
428        &mut self,
429    ) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
430        let checkpoint = self.inner.rollback_snapshot();
431        let report = self.inner.reset_cleanup();
432        self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Idle, "reset")
433            .await?;
434        self.inner
435            .set_control_projection(RuntimeState::Idle, None, None);
436        Ok(report)
437    }
438
439    pub(crate) fn prepare_destroy_lifecycle(
440        &mut self,
441    ) -> (EphemeralDriverRollbackSnapshot, DestroyReport) {
442        let checkpoint = self.inner.rollback_snapshot();
443        let abandoned = self.inner.destroy_cleanup();
444        (
445            checkpoint,
446            DestroyReport {
447                inputs_abandoned: abandoned,
448            },
449        )
450    }
451
452    pub(crate) async fn commit_prepared_destroy_lifecycle(
453        &mut self,
454        checkpoint: EphemeralDriverRollbackSnapshot,
455    ) -> Result<(), RuntimeDriverError> {
456        self.commit_lifecycle_with_rollback(
457            checkpoint,
458            self.runtime_state_for_persistence(),
459            "destroy",
460        )
461        .await
462    }
463
464    pub(crate) fn rollback_prepared_destroy_lifecycle(
465        &mut self,
466        checkpoint: EphemeralDriverRollbackSnapshot,
467    ) {
468        self.inner.restore_rollback_snapshot(checkpoint);
469    }
470
471    pub(crate) async fn finalize_runtime_executor_exit(
472        &mut self,
473    ) -> Result<(), RuntimeDriverError> {
474        let checkpoint = self.inner.rollback_snapshot();
475        if let Err(err) = self.inner.apply_runtime_executor_exited_authority() {
476            self.inner.restore_rollback_snapshot(checkpoint);
477            return Err(err);
478        }
479        self.inner.stop_runtime_cleanup();
480        self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Stopped, "stop")
481            .await?;
482        self.inner.sync_control_projection_from_dsl_authority();
483        Ok(())
484    }
485
486    pub(crate) fn machine_realize_boundary_applied_in_memory(
487        &mut self,
488        run_id: &RunId,
489        receipt: &RunBoundaryReceipt,
490    ) -> Result<(), RuntimeDriverError> {
491        self.inner.machine_realize_boundary_applied(run_id, receipt)
492    }
493
494    pub(crate) fn machine_realize_run_completed_in_memory(
495        &mut self,
496        run_id: &RunId,
497        consumed_input_ids: &[InputId],
498    ) -> Result<(), RuntimeDriverError> {
499        self.inner
500            .machine_realize_run_completed(run_id, consumed_input_ids)
501    }
502
503    pub(crate) fn next_live_boundary_context_sequence(&self, run_id: &RunId) -> u64 {
504        self.inner.next_live_boundary_context_sequence(run_id)
505    }
506
507    pub(crate) async fn machine_realize_live_boundary_context_injected(
508        &mut self,
509        run_id: &RunId,
510        input_ids: &[InputId],
511        receipt: &RunBoundaryReceipt,
512        session_snapshot: Option<Vec<u8>>,
513    ) -> Result<(), RuntimeDriverError> {
514        let checkpoint = self.inner.rollback_snapshot();
515        if let Err(err) = self
516            .inner
517            .machine_realize_live_boundary_context_injected(run_id, input_ids, receipt)
518        {
519            self.inner.restore_rollback_snapshot(checkpoint);
520            return Err(err);
521        }
522        let input_updates = self.inner.stored_input_states_snapshot();
523        if let Err(err) = self
524            .store
525            .atomic_apply(
526                &self.runtime_id,
527                session_snapshot
528                    .as_ref()
529                    .map(|session_snapshot| crate::store::SessionDelta {
530                        session_snapshot: session_snapshot.clone(),
531                    }),
532                receipt.clone(),
533                input_updates,
534                session_snapshot
535                    .as_deref()
536                    .and_then(|snapshot| {
537                        serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
538                    })
539                    .map(|session| session.id().clone()),
540            )
541            .await
542        {
543            self.inner.restore_rollback_snapshot(checkpoint);
544            return Err(RuntimeDriverError::Internal(format!(
545                "runtime live-boundary context commit failed: {err}"
546            )));
547        }
548        Ok(())
549    }
550
551    pub(crate) async fn machine_commit_completed_boundary_snapshot(
552        &mut self,
553        receipt: &RunBoundaryReceipt,
554        session_snapshot: Option<&Vec<u8>>,
555    ) -> Result<(), RuntimeDriverError> {
556        let input_updates = self.inner.stored_input_states_snapshot();
557        self.store
558            .atomic_apply(
559                &self.runtime_id,
560                session_snapshot.map(|session_snapshot| crate::store::SessionDelta {
561                    session_snapshot: session_snapshot.clone(),
562                }),
563                receipt.clone(),
564                input_updates,
565                session_snapshot
566                    .and_then(|snapshot| {
567                        serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
568                    })
569                    .map(|session| session.id().clone()),
570            )
571            .await
572            .map_err(|e| {
573                RuntimeDriverError::Internal(format!(
574                    "runtime completed-boundary commit failed: {e}"
575                ))
576            })
577    }
578
579    pub(crate) async fn machine_realize_run_failed(
580        &mut self,
581        run_id: &RunId,
582        contributing_input_ids: &[InputId],
583        replay_plan: &super::ephemeral::ReplayQueuedContributorsPlan,
584        terminal_error: &str,
585        runtime_apply_failure: Option<&meerkat_core::lifecycle::CoreApplyFailureCause>,
586        recoverable: bool,
587    ) -> Result<(), RuntimeDriverError> {
588        let checkpoint = self.inner.rollback_snapshot();
589        self.inner
590            .machine_realize_run_failed(run_id, contributing_input_ids, replay_plan)?;
591        let failure_cause = runtime_apply_failure.map(|failure| failure.kind);
592        tracing::debug!(
593            run_id = ?run_id,
594            recoverable,
595            error = terminal_error,
596            failure_cause = ?failure_cause,
597            "persistent driver realized machine-owned failed-run replay"
598        );
599        let input_states = self.inner.stored_input_states_snapshot();
600        if let Err(err) = self
601            .store
602            .commit_machine_lifecycle(
603                &self.runtime_id,
604                MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
605                &input_states,
606            )
607            .await
608        {
609            self.inner.restore_rollback_snapshot(checkpoint);
610            return Err(RuntimeDriverError::Internal(format!(
611                "terminal event persist failed: {err}"
612            )));
613        }
614        Ok(())
615    }
616
617    pub(crate) async fn machine_realize_run_cancelled(
618        &mut self,
619        run_id: &RunId,
620        contributing_input_ids: &[InputId],
621    ) -> Result<(), RuntimeDriverError> {
622        let checkpoint = self.inner.rollback_snapshot();
623        self.inner
624            .machine_realize_run_cancelled(run_id, contributing_input_ids)?;
625        tracing::debug!(
626            run_id = ?run_id,
627            contributors = contributing_input_ids.len(),
628            "persistent driver realized machine-owned cancelled run"
629        );
630        let input_states = self.inner.stored_input_states_snapshot();
631        if let Err(err) = self
632            .store
633            .commit_machine_lifecycle(
634                &self.runtime_id,
635                MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
636                &input_states,
637            )
638            .await
639        {
640            self.inner.restore_rollback_snapshot(checkpoint);
641            return Err(RuntimeDriverError::Internal(format!(
642                "terminal cancellation persist failed: {err}"
643            )));
644        }
645        Ok(())
646    }
647}
648
649#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
650#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
651impl RuntimeDriver for PersistentRuntimeDriver {
652    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
653        let resolved = self.resolve_admission(&input);
654        self.accept_resolved_input(input, resolved).await
655    }
656
657    async fn on_runtime_event(
658        &mut self,
659        event: RuntimeEventEnvelope,
660    ) -> Result<(), RuntimeDriverError> {
661        self.inner.on_runtime_event(event).await
662    }
663
664    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
665        let checkpoint = self.inner.rollback_snapshot();
666        let report = match crate::meerkat_machine::machine_recover_persistent_driver(
667            self.store.as_ref(),
668            &self.runtime_id,
669            &mut self.inner,
670        )
671        .await
672        {
673            Ok(report) => report,
674            Err(err) => {
675                self.inner.restore_rollback_snapshot(checkpoint.clone());
676                return Err(err);
677            }
678        };
679
680        // Persist recovered state atomically
681        let runtime_state_for_persistence = self.runtime_state_for_persistence();
682        self.commit_lifecycle_with_rollback(checkpoint, runtime_state_for_persistence, "recovery")
683            .await?;
684        Ok(report)
685    }
686
687    fn runtime_state(&self) -> RuntimeState {
688        self.inner.runtime_state()
689    }
690
691    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
692        self.inner.input_state(input_id)
693    }
694
695    fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
696        self.inner.input_phase(input_id)
697    }
698
699    fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
700        self.inner.input_last_run_id(input_id)
701    }
702
703    fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
704        self.inner.input_last_boundary_sequence(input_id)
705    }
706
707    fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
708        self.inner.stored_input_state(input_id)
709    }
710
711    fn active_input_ids(&self) -> Vec<InputId> {
712        self.inner.active_input_ids()
713    }
714}