Skip to main content

meerkat_runtime/driver/
persistent.rs

//! PersistentRuntimeDriver — wraps EphemeralRuntimeDriver + RuntimeStore.
//!
//! Provides a durable-before-ack guarantee: InputState is persisted via
//! RuntimeStore BEFORE AcceptOutcome is returned. State machine logic is
//! delegated to the ephemeral driver.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunEvent};
12
13use crate::accept::AcceptOutcome;
14use crate::driver::ephemeral::handling_mode_from_policy;
15use crate::identifiers::LogicalRuntimeId;
16use crate::input::{Input, externalize_input_images};
17use crate::input_state::{
18    InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
19    InputTerminalOutcome,
20};
21use crate::runtime_event::RuntimeEventEnvelope;
22use crate::runtime_ingress_authority::ContentShape;
23use crate::runtime_state::RuntimeState;
24use crate::store::RuntimeStore;
25use crate::traits::{
26    DestroyReport, RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError,
27};
28use meerkat_core::types::HandlingMode;
29
30use super::ephemeral::EphemeralRuntimeDriver;
31
32/// Persistent runtime driver — durable InputState via RuntimeStore.
33pub struct PersistentRuntimeDriver {
34    /// Underlying ephemeral driver for state machine logic.
35    inner: EphemeralRuntimeDriver,
36    /// Durable store for InputState + receipts.
37    store: Arc<dyn RuntimeStore>,
38    /// Blob store used to externalize durable input payloads.
39    blob_store: Arc<dyn BlobStore>,
40    /// Runtime ID for store operations.
41    runtime_id: LogicalRuntimeId,
42}
43
44impl PersistentRuntimeDriver {
45    /// Create a new persistent runtime driver.
46    pub fn new(
47        runtime_id: LogicalRuntimeId,
48        store: Arc<dyn RuntimeStore>,
49        blob_store: Arc<dyn BlobStore>,
50    ) -> Self {
51        Self {
52            inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
53            store,
54            blob_store,
55            runtime_id,
56        }
57    }
58
59    /// Get immutable reference to the inner ephemeral driver.
60    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
61        &self.inner
62    }
63
64    /// Set the list of comms intents that should be silently accepted (delegates to inner).
65    pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
66        self.inner.set_silent_comms_intents(intents);
67    }
68
69    /// Check if the runtime is idle (delegates to inner).
70    pub fn is_idle(&self) -> bool {
71        self.inner.is_idle()
72    }
73
74    /// Check if the runtime is idle or attached (delegates to inner).
75    pub fn is_idle_or_attached(&self) -> bool {
76        self.inner.is_idle_or_attached()
77    }
78
79    /// Attach an executor (Idle → Attached). Delegates to inner.
80    pub fn attach(&mut self) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
81        self.inner.attach()
82    }
83
84    /// Detach an executor (Attached → Idle). Delegates to inner.
85    pub fn detach(
86        &mut self,
87    ) -> Result<Option<RuntimeState>, crate::runtime_state::RuntimeStateTransitionError> {
88        self.inner.detach()
89    }
90
91    /// Map runtime state for persistence.
92    ///
93    /// Attached must never be persisted — on recovery, the executor is
94    /// re-attached by the surface. Map Attached to Idle for store operations.
95    fn runtime_state_for_persistence(&self) -> RuntimeState {
96        match self.inner.runtime_state() {
97            RuntimeState::Attached => RuntimeState::Idle,
98            other => other,
99        }
100    }
101
102    /// Start a new run (delegates to inner).
103    pub fn start_run(
104        &mut self,
105        run_id: meerkat_core::lifecycle::RunId,
106    ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
107        self.inner.start_run(run_id)
108    }
109
110    /// Complete a run (delegates to inner).
111    pub fn complete_run(
112        &mut self,
113    ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
114    {
115        self.inner.complete_run()
116    }
117
118    /// Get pending events (delegates to inner).
119    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
120        self.inner.drain_events()
121    }
122
123    /// Drain the typed post-admission signal (delegates to inner).
124    pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
125        self.inner.take_post_admission_signal()
126    }
127
128    /// Check and clear wake flag (backward-compat, delegates to inner).
129    pub fn take_wake_requested(&mut self) -> bool {
130        self.inner.take_wake_requested()
131    }
132
133    /// Check and clear immediate processing flag (backward-compat, delegates to inner).
134    pub fn take_process_requested(&mut self) -> bool {
135        self.inner.take_process_requested()
136    }
137
138    /// Dequeue next input (delegates to inner).
139    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
140        self.inner.dequeue_next()
141    }
142
143    /// Dequeue a specific input by ID (delegates to inner).
144    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
145        self.inner.dequeue_by_id(input_id)
146    }
147
148    /// Look up the persisted input for a given ID (delegates to inner).
149    pub fn persisted_input(&self, input_id: &InputId) -> Option<&Input> {
150        self.inner.persisted_input(input_id)
151    }
152
153    pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
154        self.inner.has_queued_input_outside(excluded)
155    }
156
157    /// Stage input (delegates to inner).
158    pub fn stage_input(
159        &mut self,
160        input_id: &InputId,
161        run_id: &meerkat_core::lifecycle::RunId,
162    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
163        self.inner.stage_input(input_id, run_id)
164    }
165
166    /// Stage a batch of inputs atomically (delegates to inner).
167    pub fn stage_batch(
168        &mut self,
169        input_ids: &[InputId],
170        run_id: &meerkat_core::lifecycle::RunId,
171    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
172        self.inner.stage_batch(input_ids, run_id)
173    }
174
175    /// Apply input (delegates to inner).
176    pub fn apply_input(
177        &mut self,
178        input_id: &InputId,
179        run_id: &meerkat_core::lifecycle::RunId,
180    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
181        self.inner.apply_input(input_id, run_id)
182    }
183
184    /// Roll back staged inputs (delegates to inner).
185    pub fn rollback_staged(
186        &mut self,
187        input_ids: &[InputId],
188    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
189        self.inner.rollback_staged(input_ids)
190    }
191
192    /// Consume applied inputs without completing a runtime run.
193    pub fn consume_inputs(
194        &mut self,
195        input_ids: &[InputId],
196        run_id: &meerkat_core::lifecycle::RunId,
197    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
198        self.inner.consume_inputs(input_ids, run_id)
199    }
200
201    async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
202        self.store
203            .persist_input_state(&self.runtime_id, state)
204            .await
205            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
206    }
207
208    pub async fn abandon_pending_inputs(
209        &mut self,
210        reason: InputAbandonReason,
211    ) -> Result<usize, RuntimeDriverError> {
212        let checkpoint = self.inner.clone();
213        let abandoned = self.inner.abandon_pending_inputs(reason);
214        let input_states = self.inner.input_states_snapshot();
215        if let Err(err) = self
216            .store
217            .atomic_lifecycle_commit(
218                &self.runtime_id,
219                self.runtime_state_for_persistence(),
220                &input_states,
221            )
222            .await
223        {
224            self.inner = checkpoint;
225            return Err(RuntimeDriverError::Internal(format!(
226                "pending input abandon persist failed: {err}"
227            )));
228        }
229        Ok(abandoned)
230    }
231
232    /// Recycle the in-memory driver shell while preserving canonical pending
233    /// work from durable runtime truth.
234    ///
235    /// Unlike `reset()`, this must not abandon queued/staged work.
236    pub async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
237        let silent_intents = self.inner.silent_comms_intents();
238        self.inner = EphemeralRuntimeDriver::new(self.runtime_id.clone());
239        self.inner.set_silent_comms_intents(silent_intents);
240        let _ = RuntimeDriver::recover(self).await?;
241        Ok(self.inner.active_input_ids().len())
242    }
243}
244
245#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
246#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
247impl RuntimeDriver for PersistentRuntimeDriver {
248    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
249        let checkpoint = self.inner.clone();
250        let input_for_recovery = input.clone();
251
252        // Delegate to ephemeral for state machine logic
253        let mut outcome = self.inner.accept_input(input).await?;
254
255        // Durable-before-ack: persist InputState before returning
256        if let AcceptOutcome::Accepted {
257            ref input_id,
258            ref mut state,
259            ..
260        } = outcome
261            && let Some(inner_state) = self.inner.input_state(input_id).cloned()
262        {
263            let mut input_for_recovery = input_for_recovery.clone();
264            if let Err(err) =
265                externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
266            {
267                self.inner = checkpoint;
268                return Err(RuntimeDriverError::Internal(format!(
269                    "failed to externalize runtime input images: {err}"
270                )));
271            }
272            let mut persisted = inner_state;
273            persisted.persisted_input = Some(input_for_recovery);
274            self.inner.ledger_mut().accept(persisted.clone());
275            if let Err(err) = self.persist_state(&persisted).await {
276                self.inner = checkpoint;
277                return Err(err);
278            }
279            *state = persisted;
280        }
281
282        Ok(outcome)
283    }
284
285    async fn on_runtime_event(
286        &mut self,
287        event: RuntimeEventEnvelope,
288    ) -> Result<(), RuntimeDriverError> {
289        self.inner.on_runtime_event(event).await
290    }
291
292    async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
293        match event {
294            // BoundaryApplied persists the receipt and the applied state atomically.
295            RunEvent::BoundaryApplied {
296                ref receipt,
297                ref session_snapshot,
298                ..
299            } => {
300                let checkpoint = self.inner.clone();
301                self.inner.on_run_event(event.clone()).await?;
302                if self
303                    .store
304                    .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
305                    .await
306                    .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
307                    .is_some()
308                {
309                    return Ok(());
310                }
311                let input_updates: Vec<InputState> = receipt
312                    .contributing_input_ids
313                    .iter()
314                    .filter_map(|id| self.inner.input_state(id).cloned())
315                    .collect();
316
317                self.store
318                    .atomic_apply(
319                        &self.runtime_id,
320                        session_snapshot.clone().map(|session_snapshot| {
321                            crate::store::SessionDelta { session_snapshot }
322                        }),
323                        receipt.clone(),
324                        input_updates,
325                        None, // session_store_key — caller provides if dual-store needed
326                    )
327                    .await
328                    .map_err(|e| {
329                        self.inner = checkpoint;
330                        RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
331                    })?;
332            }
333            RunEvent::RunCompleted { .. }
334            | RunEvent::RunFailed { .. }
335            | RunEvent::RunCancelled { .. } => {
336                let checkpoint = self.inner.clone();
337                self.inner.on_run_event(event).await?;
338                let input_states = self.inner.input_states_snapshot();
339                if let Err(err) = self
340                    .store
341                    .atomic_lifecycle_commit(
342                        &self.runtime_id,
343                        self.runtime_state_for_persistence(),
344                        &input_states,
345                    )
346                    .await
347                {
348                    self.inner = checkpoint;
349                    return Err(RuntimeDriverError::Internal(format!(
350                        "terminal event persist failed: {err}"
351                    )));
352                }
353            }
354            _ => {
355                self.inner.on_run_event(event).await?;
356            }
357        }
358
359        Ok(())
360    }
361
362    async fn on_runtime_control(
363        &mut self,
364        command: RuntimeControlCommand,
365    ) -> Result<(), RuntimeDriverError> {
366        let checkpoint = self.inner.clone();
367        self.inner.on_runtime_control(command).await?;
368        let input_states = self.inner.input_states_snapshot();
369        if let Err(err) = self
370            .store
371            .atomic_lifecycle_commit(
372                &self.runtime_id,
373                self.runtime_state_for_persistence(),
374                &input_states,
375            )
376            .await
377        {
378            self.inner = checkpoint;
379            return Err(RuntimeDriverError::Internal(format!(
380                "control op persist failed: {err}"
381            )));
382        }
383        Ok(())
384    }
385
386    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
387        // §24 full recovery: load durable InputState from store
388        let stored_states = self
389            .store
390            .load_input_states(&self.runtime_id)
391            .await
392            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
393
394        let mut recovered_payloads = Vec::new();
395
396        // Inject stored states into the ephemeral driver's ledger.
397        // Uses recover() which also rebuilds the idempotency index for
398        // dedup correctness and filters out Ephemeral inputs.
399        for mut state in stored_states {
400            if matches!(
401                state.current_state(),
402                InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
403            ) {
404                let has_receipt =
405                    match (state.last_run_id().cloned(), state.last_boundary_sequence()) {
406                        (Some(run_id), Some(sequence)) => self
407                            .store
408                            .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
409                            .await
410                            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
411                            .is_some(),
412                        _ => false,
413                    };
414                let now = Utc::now();
415                let from = state.current_state();
416                if has_receipt {
417                    let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
418                        InputLifecycleState::Consumed,
419                        Some(InputTerminalOutcome::Consumed),
420                        state.last_run_id().cloned(),
421                        state.last_boundary_sequence(),
422                        state.attempt_count(),
423                        {
424                            let mut h = state.history().to_vec();
425                            h.push(InputStateHistoryEntry {
426                                timestamp: now,
427                                from,
428                                to: InputLifecycleState::Consumed,
429                                reason: Some("recovery: boundary receipt already committed".into()),
430                            });
431                            h
432                        },
433                        now,
434                    );
435                    *state.authority_mut() = auth;
436                } else {
437                    let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
438                        InputLifecycleState::Queued,
439                        None,
440                        state.last_run_id().cloned(),
441                        state.last_boundary_sequence(),
442                        state.attempt_count(),
443                        {
444                            let mut h = state.history().to_vec();
445                            h.push(InputStateHistoryEntry {
446                                timestamp: now,
447                                from,
448                                to: InputLifecycleState::Queued,
449                                reason: Some("recovery: missing boundary receipt".into()),
450                            });
451                            h
452                        },
453                        now,
454                    );
455                    *state.authority_mut() = auth;
456                }
457            }
458            if matches!(
459                state.current_state(),
460                InputLifecycleState::Accepted | InputLifecycleState::Staged
461            ) {
462                // Accepted/Staged are pre-run in-flight states. On recovery they
463                // must re-enter the queue explicitly so ingress/ledger/queue
464                // truth stays aligned before Recover effects are evaluated.
465                let now = Utc::now();
466                let from = state.current_state();
467                let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
468                    InputLifecycleState::Queued,
469                    None,
470                    state.last_run_id().cloned(),
471                    state.last_boundary_sequence(),
472                    state.attempt_count(),
473                    {
474                        let mut h = state.history().to_vec();
475                        h.push(InputStateHistoryEntry {
476                            timestamp: now,
477                            from,
478                            to: InputLifecycleState::Queued,
479                            reason: Some("recovery: pre-run state normalized to queued".into()),
480                        });
481                        h
482                    },
483                    now,
484                );
485                *state.authority_mut() = auth;
486            }
487
488            // Admit to ingress authority so Recover can see this input.
489            if self.inner.input_state(&state.input_id).is_none() {
490                let handling_mode = state
491                    .policy
492                    .as_ref()
493                    .map(|p| handling_mode_from_policy(&p.decision))
494                    .unwrap_or(HandlingMode::Queue);
495                let content_shape = state
496                    .persisted_input
497                    .as_ref()
498                    .map(|i| ContentShape(i.kind_id().to_string()))
499                    .unwrap_or_else(|| ContentShape("unknown".into()));
500                let policy = match state.policy.as_ref() {
501                    Some(p) => p.decision.clone(),
502                    None => match state.persisted_input.as_ref() {
503                        Some(input) => {
504                            crate::policy_table::DefaultPolicyTable::resolve(input, true)
505                        }
506                        None => {
507                            // No policy and no payload — load into ledger for dedup
508                            // but skip ingress admission (nothing to route).
509                            self.inner.ledger_mut().recover(state);
510                            continue;
511                        }
512                    },
513                };
514                let request_id = None;
515                let reservation_key = None;
516
517                let inserted = self.inner.ledger_mut().recover(state.clone());
518                if !inserted {
519                    // Filtered by ledger recover (e.g. ephemeral durability): do not
520                    // admit to ingress, otherwise ingress queue truth can outlive
521                    // canonical ledger truth.
522                    continue;
523                }
524
525                if let Some(input) = state.persisted_input.clone() {
526                    recovered_payloads.push((state.input_id.clone(), input));
527                }
528
529                let lifecycle_state = state.current_state();
530
531                if let Err(err) = self.inner.admit_recovered_to_ingress(
532                    state.input_id.clone(),
533                    content_shape,
534                    handling_mode,
535                    lifecycle_state,
536                    policy,
537                    request_id,
538                    reservation_key,
539                ) {
540                    return Err(RuntimeDriverError::Internal(format!(
541                        "failed to admit recovered input '{}' to ingress authority: {err}",
542                        state.input_id
543                    )));
544                }
545            }
546        }
547
548        // Then run ephemeral recovery logic to finalize ingress recovery,
549        // execute any remaining per-input recovery effects, and rebuild the
550        // physical queue projections from canonical ingress truth.
551        let report = self.inner.recover().await?;
552
553        for (input_id, _input) in recovered_payloads {
554            let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
555                state.current_state() == crate::input_state::InputLifecycleState::Queued
556            });
557            if should_requeue && !self.inner.has_queued_input(&input_id) {
558                return Err(RuntimeDriverError::Internal(format!(
559                    "persistent recover left queued input '{input_id}' out of the runtime queue projection"
560                )));
561            }
562        }
563
564        if let Some(runtime_state) = self
565            .store
566            .load_runtime_state(&self.runtime_id)
567            .await
568            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
569        {
570            match runtime_state {
571                RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
572                    EphemeralRuntimeDriver::retire(&mut self.inner)?;
573                }
574                RuntimeState::Stopped
575                    if self.inner.runtime_state() != RuntimeState::Stopped
576                        && self.inner.runtime_state() != RuntimeState::Destroyed =>
577                {
578                    // Never revive Destroyed as Stopped
579                    self.inner
580                        .on_runtime_control(RuntimeControlCommand::Stop)
581                        .await?;
582                }
583                RuntimeState::Destroyed
584                    if self.inner.runtime_state() != RuntimeState::Destroyed =>
585                {
586                    self.inner.destroy()?;
587                }
588                _ => {}
589            }
590
591            // Terminal states must not have active inputs. If persisted state
592            // is terminal but active inputs exist, fail closed as store
593            // corruption instead of mutating queue projections in shell code.
594            if runtime_state.is_terminal() {
595                let active = self.inner.active_input_ids();
596                if !active.is_empty() {
597                    return Err(RuntimeDriverError::Internal(format!(
598                        "store corruption: terminal runtime '{}' has {} active inputs",
599                        runtime_state,
600                        active.len()
601                    )));
602                }
603            }
604        }
605
606        // Persist recovered state atomically
607        let input_states = self.inner.input_states_snapshot();
608        self.store
609            .atomic_lifecycle_commit(
610                &self.runtime_id,
611                self.runtime_state_for_persistence(),
612                &input_states,
613            )
614            .await
615            .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
616        Ok(report)
617    }
618
619    async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
620        let checkpoint = self.inner.clone();
621        let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
622        let input_states = self.inner.input_states_snapshot();
623        if let Err(err) = self
624            .store
625            .atomic_lifecycle_commit(
626                &self.runtime_id,
627                self.runtime_state_for_persistence(),
628                &input_states,
629            )
630            .await
631        {
632            self.inner = checkpoint;
633            return Err(RuntimeDriverError::Internal(format!(
634                "retire persist failed: {err}"
635            )));
636        }
637        Ok(report)
638    }
639
640    async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
641        let checkpoint = self.inner.clone();
642        let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
643        let input_states = self.inner.input_states_snapshot();
644        if let Err(err) = self
645            .store
646            .atomic_lifecycle_commit(
647                &self.runtime_id,
648                self.runtime_state_for_persistence(),
649                &input_states,
650            )
651            .await
652        {
653            self.inner = checkpoint;
654            return Err(RuntimeDriverError::Internal(format!(
655                "reset persist failed: {err}"
656            )));
657        }
658        Ok(report)
659    }
660
661    async fn destroy(&mut self) -> Result<DestroyReport, RuntimeDriverError> {
662        let abandoned = self.inner.destroy()?;
663        let input_states = self.inner.input_states_snapshot();
664        if let Err(err) = self
665            .store
666            .atomic_lifecycle_commit(
667                &self.runtime_id,
668                self.runtime_state_for_persistence(),
669                &input_states,
670            )
671            .await
672        {
673            return Err(RuntimeDriverError::Internal(format!(
674                "destroy persist failed: {err}"
675            )));
676        }
677        Ok(DestroyReport {
678            inputs_abandoned: abandoned,
679        })
680    }
681
682    fn runtime_state(&self) -> RuntimeState {
683        self.inner.runtime_state()
684    }
685
686    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
687        self.inner.input_state(input_id)
688    }
689
690    fn active_input_ids(&self) -> Vec<InputId> {
691        self.inner.active_input_ids()
692    }
693}