Skip to main content

meerkat_runtime/driver/
persistent.rs

1//! PersistentRuntimeDriver — wraps EphemeralRuntimeDriver + RuntimeStore.
2//!
3//! Provides a durable-before-ack guarantee: `InputState` is persisted via
4//! `RuntimeStore` BEFORE `AcceptOutcome` is returned. State machine logic is
5//! delegated to the ephemeral driver.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunEvent};
12
13use crate::accept::AcceptOutcome;
14use crate::driver::ephemeral::handling_mode_from_policy;
15use crate::identifiers::LogicalRuntimeId;
16use crate::input::{Input, externalize_input_images};
17use crate::input_state::{
18    InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
19    InputTerminalOutcome,
20};
21use crate::runtime_event::RuntimeEventEnvelope;
22use crate::runtime_ingress_authority::ContentShape;
23use crate::runtime_state::RuntimeState;
24use crate::store::RuntimeStore;
25use crate::traits::{
26    DestroyReport, RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError,
27};
28use meerkat_core::types::HandlingMode;
29
30use super::ephemeral::EphemeralRuntimeDriver;
31
32/// Persistent runtime driver: an `EphemeralRuntimeDriver` paired with a durable `RuntimeStore`.
33pub struct PersistentRuntimeDriver {
34    /// Wrapped ephemeral driver; state machine calls are delegated to it.
35    inner: EphemeralRuntimeDriver,
36    /// Durable store for input states, boundary receipts and runtime state.
37    store: Arc<dyn RuntimeStore>,
38    /// Blob store passed to `externalize_input_images` before an accepted input is persisted.
39    blob_store: Arc<dyn BlobStore>,
40    /// Runtime ID passed to every store operation.
41    runtime_id: LogicalRuntimeId,
42}
43
44impl PersistentRuntimeDriver {
45    /// Create a new persistent runtime driver.
46    pub fn new(
47        runtime_id: LogicalRuntimeId,
48        store: Arc<dyn RuntimeStore>,
49        blob_store: Arc<dyn BlobStore>,
50    ) -> Self {
51        Self {
52            inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
53            store,
54            blob_store,
55            runtime_id,
56        }
57    }
58
59    /// Get immutable reference to the inner ephemeral driver.
60    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
61        &self.inner
62    }
63
64    /// Set the list of comms intents that should be silently accepted (delegates to inner).
65    pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
66        self.inner.set_silent_comms_intents(intents);
67    }
68
69    /// Check if the runtime is idle (delegates to inner).
70    pub fn is_idle(&self) -> bool {
71        self.inner.is_idle()
72    }
73
74    /// Check if the runtime is idle or attached (delegates to inner).
75    pub fn is_idle_or_attached(&self) -> bool {
76        self.inner.is_idle_or_attached()
77    }
78
79    /// Attach an executor (Idle → Attached). Delegates to inner.
80    pub fn attach(&mut self) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
81        self.inner.attach()
82    }
83
84    /// Detach an executor (Attached → Idle). Delegates to inner.
85    pub fn detach(
86        &mut self,
87    ) -> Result<Option<RuntimeState>, crate::runtime_state::RuntimeStateTransitionError> {
88        self.inner.detach()
89    }
90
91    /// Map runtime state for persistence.
92    ///
93    /// Attached must never be persisted — on recovery, the executor is
94    /// re-attached by the surface. Map Attached to Idle for store operations.
95    fn runtime_state_for_persistence(&self) -> RuntimeState {
96        match self.inner.runtime_state() {
97            RuntimeState::Attached => RuntimeState::Idle,
98            other => other,
99        }
100    }
101
102    /// Start a new run (delegates to inner).
103    pub fn start_run(
104        &mut self,
105        run_id: meerkat_core::lifecycle::RunId,
106    ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
107        self.inner.start_run(run_id)
108    }
109
110    /// Complete a run (delegates to inner).
111    pub fn complete_run(
112        &mut self,
113    ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
114    {
115        self.inner.complete_run()
116    }
117
118    /// Get pending events (delegates to inner).
119    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
120        self.inner.drain_events()
121    }
122
123    /// Check and clear wake flag (delegates to inner).
124    pub fn take_wake_requested(&mut self) -> bool {
125        self.inner.take_wake_requested()
126    }
127
128    /// Check and clear immediate processing flag (delegates to inner).
129    pub fn take_process_requested(&mut self) -> bool {
130        self.inner.take_process_requested()
131    }
132
133    /// Dequeue next input (delegates to inner).
134    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
135        self.inner.dequeue_next()
136    }
137
138    /// Dequeue a specific input by ID (delegates to inner).
139    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
140        self.inner.dequeue_by_id(input_id)
141    }
142
143    /// Look up the persisted input for a given ID (delegates to inner).
144    pub fn persisted_input(&self, input_id: &InputId) -> Option<&Input> {
145        self.inner.persisted_input(input_id)
146    }
147
148    pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
149        self.inner.has_queued_input_outside(excluded)
150    }
151
152    /// Stage input (delegates to inner).
153    pub fn stage_input(
154        &mut self,
155        input_id: &InputId,
156        run_id: &meerkat_core::lifecycle::RunId,
157    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
158        self.inner.stage_input(input_id, run_id)
159    }
160
161    /// Stage a batch of inputs atomically (delegates to inner).
162    pub fn stage_batch(
163        &mut self,
164        input_ids: &[InputId],
165        run_id: &meerkat_core::lifecycle::RunId,
166    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
167        self.inner.stage_batch(input_ids, run_id)
168    }
169
170    /// Apply input (delegates to inner).
171    pub fn apply_input(
172        &mut self,
173        input_id: &InputId,
174        run_id: &meerkat_core::lifecycle::RunId,
175    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
176        self.inner.apply_input(input_id, run_id)
177    }
178
179    /// Roll back staged inputs (delegates to inner).
180    pub fn rollback_staged(
181        &mut self,
182        input_ids: &[InputId],
183    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
184        self.inner.rollback_staged(input_ids)
185    }
186
187    /// Consume applied inputs without completing a runtime run.
188    pub fn consume_inputs(
189        &mut self,
190        input_ids: &[InputId],
191        run_id: &meerkat_core::lifecycle::RunId,
192    ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
193        self.inner.consume_inputs(input_ids, run_id)
194    }
195
196    async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
197        self.store
198            .persist_input_state(&self.runtime_id, state)
199            .await
200            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
201    }
202
203    pub async fn abandon_pending_inputs(
204        &mut self,
205        reason: InputAbandonReason,
206    ) -> Result<usize, RuntimeDriverError> {
207        let checkpoint = self.inner.clone();
208        let abandoned = self.inner.abandon_pending_inputs(reason);
209        let input_states = self.inner.input_states_snapshot();
210        if let Err(err) = self
211            .store
212            .atomic_lifecycle_commit(
213                &self.runtime_id,
214                self.runtime_state_for_persistence(),
215                &input_states,
216            )
217            .await
218        {
219            self.inner = checkpoint;
220            return Err(RuntimeDriverError::Internal(format!(
221                "pending input abandon persist failed: {err}"
222            )));
223        }
224        Ok(abandoned)
225    }
226
227    /// Recycle the in-memory driver shell while preserving canonical pending
228    /// work from durable runtime truth.
229    ///
230    /// Unlike `reset()`, this must not abandon queued/staged work.
231    pub async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
232        let silent_intents = self.inner.silent_comms_intents();
233        self.inner = EphemeralRuntimeDriver::new(self.runtime_id.clone());
234        self.inner.set_silent_comms_intents(silent_intents);
235        let _ = RuntimeDriver::recover(self).await?;
236        Ok(self.inner.active_input_ids().len())
237    }
238}
239
240#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
241#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
242impl RuntimeDriver for PersistentRuntimeDriver {
243    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
244        let checkpoint = self.inner.clone();
245        let input_for_recovery = input.clone();
246
247        // Delegate to ephemeral for state machine logic
248        let mut outcome = self.inner.accept_input(input).await?;
249
250        // Durable-before-ack: persist InputState before returning
251        if let AcceptOutcome::Accepted {
252            ref input_id,
253            ref mut state,
254            ..
255        } = outcome
256            && let Some(inner_state) = self.inner.input_state(input_id).cloned()
257        {
258            let mut input_for_recovery = input_for_recovery.clone();
259            if let Err(err) =
260                externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
261            {
262                self.inner = checkpoint;
263                return Err(RuntimeDriverError::Internal(format!(
264                    "failed to externalize runtime input images: {err}"
265                )));
266            }
267            let mut persisted = inner_state;
268            persisted.persisted_input = Some(input_for_recovery);
269            self.inner.ledger_mut().accept(persisted.clone());
270            if let Err(err) = self.persist_state(&persisted).await {
271                self.inner = checkpoint;
272                return Err(err);
273            }
274            *state = persisted;
275        }
276
277        Ok(outcome)
278    }
279
280    async fn on_runtime_event(
281        &mut self,
282        event: RuntimeEventEnvelope,
283    ) -> Result<(), RuntimeDriverError> {
284        self.inner.on_runtime_event(event).await
285    }
286
287    async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
288        match event {
289            // BoundaryApplied persists the receipt and the applied state atomically.
290            RunEvent::BoundaryApplied {
291                ref receipt,
292                ref session_snapshot,
293                ..
294            } => {
295                let checkpoint = self.inner.clone();
296                self.inner.on_run_event(event.clone()).await?;
297                if self
298                    .store
299                    .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
300                    .await
301                    .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
302                    .is_some()
303                {
304                    return Ok(());
305                }
306                let input_updates: Vec<InputState> = receipt
307                    .contributing_input_ids
308                    .iter()
309                    .filter_map(|id| self.inner.input_state(id).cloned())
310                    .collect();
311
312                self.store
313                    .atomic_apply(
314                        &self.runtime_id,
315                        session_snapshot.clone().map(|session_snapshot| {
316                            crate::store::SessionDelta { session_snapshot }
317                        }),
318                        receipt.clone(),
319                        input_updates,
320                        None, // session_store_key — caller provides if dual-store needed
321                    )
322                    .await
323                    .map_err(|e| {
324                        self.inner = checkpoint;
325                        RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
326                    })?;
327            }
328            RunEvent::RunCompleted { .. }
329            | RunEvent::RunFailed { .. }
330            | RunEvent::RunCancelled { .. } => {
331                let checkpoint = self.inner.clone();
332                self.inner.on_run_event(event).await?;
333                let input_states = self.inner.input_states_snapshot();
334                if let Err(err) = self
335                    .store
336                    .atomic_lifecycle_commit(
337                        &self.runtime_id,
338                        self.runtime_state_for_persistence(),
339                        &input_states,
340                    )
341                    .await
342                {
343                    self.inner = checkpoint;
344                    return Err(RuntimeDriverError::Internal(format!(
345                        "terminal event persist failed: {err}"
346                    )));
347                }
348            }
349            _ => {
350                self.inner.on_run_event(event).await?;
351            }
352        }
353
354        Ok(())
355    }
356
357    async fn on_runtime_control(
358        &mut self,
359        command: RuntimeControlCommand,
360    ) -> Result<(), RuntimeDriverError> {
361        let checkpoint = self.inner.clone();
362        self.inner.on_runtime_control(command).await?;
363        let input_states = self.inner.input_states_snapshot();
364        if let Err(err) = self
365            .store
366            .atomic_lifecycle_commit(
367                &self.runtime_id,
368                self.runtime_state_for_persistence(),
369                &input_states,
370            )
371            .await
372        {
373            self.inner = checkpoint;
374            return Err(RuntimeDriverError::Internal(format!(
375                "control op persist failed: {err}"
376            )));
377        }
378        Ok(())
379    }
380
381    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
382        // §24 full recovery: load durable InputState from store
383        let stored_states = self
384            .store
385            .load_input_states(&self.runtime_id)
386            .await
387            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
388
389        let mut recovered_payloads = Vec::new();
390
391        // Inject stored states into the ephemeral driver's ledger.
392        // Uses recover() which also rebuilds the idempotency index for
393        // dedup correctness and filters out Ephemeral inputs.
394        for mut state in stored_states {
395            if matches!(
396                state.current_state(),
397                InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
398            ) {
399                let has_receipt =
400                    match (state.last_run_id().cloned(), state.last_boundary_sequence()) {
401                        (Some(run_id), Some(sequence)) => self
402                            .store
403                            .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
404                            .await
405                            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
406                            .is_some(),
407                        _ => false,
408                    };
409                let now = Utc::now();
410                let from = state.current_state();
411                if has_receipt {
412                    let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
413                        InputLifecycleState::Consumed,
414                        Some(InputTerminalOutcome::Consumed),
415                        state.last_run_id().cloned(),
416                        state.last_boundary_sequence(),
417                        {
418                            let mut h = state.history().to_vec();
419                            h.push(InputStateHistoryEntry {
420                                timestamp: now,
421                                from,
422                                to: InputLifecycleState::Consumed,
423                                reason: Some("recovery: boundary receipt already committed".into()),
424                            });
425                            h
426                        },
427                        now,
428                    );
429                    *state.authority_mut() = auth;
430                } else {
431                    let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
432                        InputLifecycleState::Queued,
433                        None,
434                        state.last_run_id().cloned(),
435                        state.last_boundary_sequence(),
436                        {
437                            let mut h = state.history().to_vec();
438                            h.push(InputStateHistoryEntry {
439                                timestamp: now,
440                                from,
441                                to: InputLifecycleState::Queued,
442                                reason: Some("recovery: missing boundary receipt".into()),
443                            });
444                            h
445                        },
446                        now,
447                    );
448                    *state.authority_mut() = auth;
449                }
450            }
451            if matches!(
452                state.current_state(),
453                InputLifecycleState::Accepted | InputLifecycleState::Staged
454            ) {
455                // Accepted/Staged are pre-run in-flight states. On recovery they
456                // must re-enter the queue explicitly so ingress/ledger/queue
457                // truth stays aligned before Recover effects are evaluated.
458                let now = Utc::now();
459                let from = state.current_state();
460                let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
461                    InputLifecycleState::Queued,
462                    None,
463                    state.last_run_id().cloned(),
464                    state.last_boundary_sequence(),
465                    {
466                        let mut h = state.history().to_vec();
467                        h.push(InputStateHistoryEntry {
468                            timestamp: now,
469                            from,
470                            to: InputLifecycleState::Queued,
471                            reason: Some("recovery: pre-run state normalized to queued".into()),
472                        });
473                        h
474                    },
475                    now,
476                );
477                *state.authority_mut() = auth;
478            }
479
480            // Admit to ingress authority so Recover can see this input.
481            if self.inner.input_state(&state.input_id).is_none() {
482                let handling_mode = state
483                    .policy
484                    .as_ref()
485                    .map(|p| handling_mode_from_policy(&p.decision))
486                    .unwrap_or(HandlingMode::Queue);
487                let content_shape = state
488                    .persisted_input
489                    .as_ref()
490                    .map(|i| ContentShape(i.kind_id().to_string()))
491                    .unwrap_or_else(|| ContentShape("unknown".into()));
492                let policy = match state.policy.as_ref() {
493                    Some(p) => p.decision.clone(),
494                    None => match state.persisted_input.as_ref() {
495                        Some(input) => {
496                            crate::policy_table::DefaultPolicyTable::resolve(input, true)
497                        }
498                        None => {
499                            // No policy and no payload — load into ledger for dedup
500                            // but skip ingress admission (nothing to route).
501                            self.inner.ledger_mut().recover(state);
502                            continue;
503                        }
504                    },
505                };
506                let request_id = None;
507                let reservation_key = None;
508
509                let inserted = self.inner.ledger_mut().recover(state.clone());
510                if !inserted {
511                    // Filtered by ledger recover (e.g. ephemeral durability): do not
512                    // admit to ingress, otherwise ingress queue truth can outlive
513                    // canonical ledger truth.
514                    continue;
515                }
516
517                if let Some(input) = state.persisted_input.clone() {
518                    recovered_payloads.push((state.input_id.clone(), input));
519                }
520
521                let lifecycle_state = state.current_state();
522
523                if let Err(err) = self.inner.admit_recovered_to_ingress(
524                    state.input_id.clone(),
525                    content_shape,
526                    handling_mode,
527                    lifecycle_state,
528                    policy,
529                    request_id,
530                    reservation_key,
531                ) {
532                    return Err(RuntimeDriverError::Internal(format!(
533                        "failed to admit recovered input '{}' to ingress authority: {err}",
534                        state.input_id
535                    )));
536                }
537            }
538        }
539
540        // Then run ephemeral recovery logic to finalize ingress recovery,
541        // execute any remaining per-input recovery effects, and rebuild the
542        // physical queue projections from canonical ingress truth.
543        let report = self.inner.recover().await?;
544
545        for (input_id, _input) in recovered_payloads {
546            let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
547                state.current_state() == crate::input_state::InputLifecycleState::Queued
548            });
549            if should_requeue && !self.inner.has_queued_input(&input_id) {
550                return Err(RuntimeDriverError::Internal(format!(
551                    "persistent recover left queued input '{input_id}' out of the runtime queue projection"
552                )));
553            }
554        }
555
556        if let Some(runtime_state) = self
557            .store
558            .load_runtime_state(&self.runtime_id)
559            .await
560            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
561        {
562            match runtime_state {
563                RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
564                    EphemeralRuntimeDriver::retire(&mut self.inner)?;
565                }
566                RuntimeState::Stopped
567                    if self.inner.runtime_state() != RuntimeState::Stopped
568                        && self.inner.runtime_state() != RuntimeState::Destroyed =>
569                {
570                    // Never revive Destroyed as Stopped
571                    self.inner
572                        .on_runtime_control(RuntimeControlCommand::Stop)
573                        .await?;
574                }
575                RuntimeState::Destroyed
576                    if self.inner.runtime_state() != RuntimeState::Destroyed =>
577                {
578                    self.inner.destroy()?;
579                }
580                _ => {}
581            }
582
583            // Terminal states must not have active inputs. If persisted state
584            // is terminal but active inputs exist, fail closed as store
585            // corruption instead of mutating queue projections in shell code.
586            if runtime_state.is_terminal() {
587                let active = self.inner.active_input_ids();
588                if !active.is_empty() {
589                    return Err(RuntimeDriverError::Internal(format!(
590                        "store corruption: terminal runtime '{}' has {} active inputs",
591                        runtime_state,
592                        active.len()
593                    )));
594                }
595            }
596        }
597
598        // Persist recovered state atomically
599        let input_states = self.inner.input_states_snapshot();
600        self.store
601            .atomic_lifecycle_commit(
602                &self.runtime_id,
603                self.runtime_state_for_persistence(),
604                &input_states,
605            )
606            .await
607            .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
608        Ok(report)
609    }
610
611    async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
612        let checkpoint = self.inner.clone();
613        let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
614        let input_states = self.inner.input_states_snapshot();
615        if let Err(err) = self
616            .store
617            .atomic_lifecycle_commit(
618                &self.runtime_id,
619                self.runtime_state_for_persistence(),
620                &input_states,
621            )
622            .await
623        {
624            self.inner = checkpoint;
625            return Err(RuntimeDriverError::Internal(format!(
626                "retire persist failed: {err}"
627            )));
628        }
629        Ok(report)
630    }
631
632    async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
633        let checkpoint = self.inner.clone();
634        let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
635        let input_states = self.inner.input_states_snapshot();
636        if let Err(err) = self
637            .store
638            .atomic_lifecycle_commit(
639                &self.runtime_id,
640                self.runtime_state_for_persistence(),
641                &input_states,
642            )
643            .await
644        {
645            self.inner = checkpoint;
646            return Err(RuntimeDriverError::Internal(format!(
647                "reset persist failed: {err}"
648            )));
649        }
650        Ok(report)
651    }
652
653    async fn destroy(&mut self) -> Result<DestroyReport, RuntimeDriverError> {
654        let abandoned = self.inner.destroy()?;
655        let input_states = self.inner.input_states_snapshot();
656        if let Err(err) = self
657            .store
658            .atomic_lifecycle_commit(
659                &self.runtime_id,
660                self.runtime_state_for_persistence(),
661                &input_states,
662            )
663            .await
664        {
665            return Err(RuntimeDriverError::Internal(format!(
666                "destroy persist failed: {err}"
667            )));
668        }
669        Ok(DestroyReport {
670            inputs_abandoned: abandoned,
671        })
672    }
673
674    fn runtime_state(&self) -> RuntimeState {
675        self.inner.runtime_state()
676    }
677
678    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
679        self.inner.input_state(input_id)
680    }
681
682    fn active_input_ids(&self) -> Vec<InputId> {
683        self.inner.active_input_ids()
684    }
685}