Skip to main content

meerkat_runtime/driver/
persistent.rs

//! `PersistentRuntimeDriver` — wraps `EphemeralRuntimeDriver` + `RuntimeStore`.
//!
//! Provides a durable-before-ack guarantee: `InputState` is persisted via
//! `RuntimeStore` BEFORE `AcceptOutcome` is returned. State machine logic is
//! delegated to the ephemeral driver.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::lifecycle::{InputId, RunEvent};
11
12use crate::accept::AcceptOutcome;
13use crate::identifiers::LogicalRuntimeId;
14use crate::input::Input;
15use crate::input_state::{
16    InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
17    InputTerminalOutcome,
18};
19use crate::runtime_event::RuntimeEventEnvelope;
20use crate::runtime_state::RuntimeState;
21use crate::store::RuntimeStore;
22use crate::traits::{RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError};
23
24use super::ephemeral::EphemeralRuntimeDriver;
25
26/// Persistent runtime driver — durable InputState via RuntimeStore.
27pub struct PersistentRuntimeDriver {
28    /// Underlying ephemeral driver for state machine logic.
29    inner: EphemeralRuntimeDriver,
30    /// Durable store for InputState + receipts.
31    store: Arc<dyn RuntimeStore>,
32    /// Runtime ID for store operations.
33    runtime_id: LogicalRuntimeId,
34}
35
36impl PersistentRuntimeDriver {
37    /// Create a new persistent runtime driver.
38    pub fn new(runtime_id: LogicalRuntimeId, store: Arc<dyn RuntimeStore>) -> Self {
39        Self {
40            inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
41            store,
42            runtime_id,
43        }
44    }
45
46    /// Get immutable reference to the inner ephemeral driver.
47    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
48        &self.inner
49    }
50
51    /// Check if the runtime is idle (delegates to inner).
52    pub fn is_idle(&self) -> bool {
53        self.inner.is_idle()
54    }
55
56    /// Start a new run (delegates to inner).
57    pub fn start_run(
58        &mut self,
59        run_id: meerkat_core::lifecycle::RunId,
60    ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
61        self.inner.start_run(run_id)
62    }
63
64    /// Complete a run (delegates to inner).
65    pub fn complete_run(
66        &mut self,
67    ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
68    {
69        self.inner.complete_run()
70    }
71
72    /// Get pending events (delegates to inner).
73    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
74        self.inner.drain_events()
75    }
76
77    /// Check and clear wake flag (delegates to inner).
78    pub fn take_wake_requested(&mut self) -> bool {
79        self.inner.take_wake_requested()
80    }
81
82    /// Check and clear immediate processing flag (delegates to inner).
83    pub fn take_process_requested(&mut self) -> bool {
84        self.inner.take_process_requested()
85    }
86
87    /// Dequeue next input (delegates to inner).
88    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
89        self.inner.dequeue_next()
90    }
91
92    /// Stage input (delegates to inner).
93    pub fn stage_input(
94        &mut self,
95        input_id: &InputId,
96        run_id: &meerkat_core::lifecycle::RunId,
97    ) -> Result<(), crate::input_machine::InputStateMachineError> {
98        self.inner.stage_input(input_id, run_id)
99    }
100
101    /// Apply input (delegates to inner).
102    pub fn apply_input(
103        &mut self,
104        input_id: &InputId,
105        run_id: &meerkat_core::lifecycle::RunId,
106    ) -> Result<(), crate::input_machine::InputStateMachineError> {
107        self.inner.apply_input(input_id, run_id)
108    }
109
110    /// Roll back staged inputs (delegates to inner).
111    pub fn rollback_staged(
112        &mut self,
113        input_ids: &[InputId],
114    ) -> Result<(), crate::input_machine::InputStateMachineError> {
115        self.inner.rollback_staged(input_ids)
116    }
117
118    /// Consume applied inputs without completing a runtime run.
119    pub fn consume_inputs(
120        &mut self,
121        input_ids: &[InputId],
122        run_id: &meerkat_core::lifecycle::RunId,
123    ) -> Result<(), crate::input_machine::InputStateMachineError> {
124        self.inner.consume_inputs(input_ids, run_id)
125    }
126
127    /// Remove a previously accepted input from the ledger/queue.
128    pub fn forget_input(&mut self, input_id: &InputId) {
129        self.inner.forget_input(input_id);
130    }
131
132    async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
133        self.store
134            .persist_input_state(&self.runtime_id, state)
135            .await
136            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
137    }
138}
139
140#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
141#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
142impl RuntimeDriver for PersistentRuntimeDriver {
143    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
144        let input_for_recovery = input.clone();
145
146        // Delegate to ephemeral for state machine logic
147        let mut outcome = self.inner.accept_input(input).await?;
148
149        // Durable-before-ack: persist InputState before returning
150        if let AcceptOutcome::Accepted {
151            ref input_id,
152            ref mut state,
153            ..
154        } = outcome
155            && let Some(inner_state) = self.inner.input_state(input_id).cloned()
156        {
157            let mut persisted = inner_state;
158            persisted.persisted_input = Some(input_for_recovery);
159            self.inner.ledger_mut().accept(persisted.clone());
160            if let Err(err) = self.persist_state(&persisted).await {
161                self.forget_input(input_id);
162                return Err(err);
163            }
164            *state = persisted;
165        }
166
167        Ok(outcome)
168    }
169
170    async fn on_runtime_event(
171        &mut self,
172        event: RuntimeEventEnvelope,
173    ) -> Result<(), RuntimeDriverError> {
174        self.inner.on_runtime_event(event).await
175    }
176
177    async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
178        match event {
179            // BoundaryApplied persists the receipt and the applied state atomically.
180            RunEvent::BoundaryApplied {
181                ref receipt,
182                ref session_snapshot,
183                ..
184            } => {
185                let checkpoint = self.inner.clone();
186                self.inner.on_run_event(event.clone()).await?;
187                if self
188                    .store
189                    .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
190                    .await
191                    .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
192                    .is_some()
193                {
194                    return Ok(());
195                }
196                let input_updates: Vec<InputState> = receipt
197                    .contributing_input_ids
198                    .iter()
199                    .filter_map(|id| self.inner.input_state(id).cloned())
200                    .collect();
201
202                self.store
203                    .atomic_apply(
204                        &self.runtime_id,
205                        session_snapshot.clone().map(|session_snapshot| {
206                            crate::store::SessionDelta { session_snapshot }
207                        }),
208                        receipt.clone(),
209                        input_updates,
210                        None, // session_store_key — caller provides if dual-store needed
211                    )
212                    .await
213                    .map_err(|e| {
214                        self.inner = checkpoint;
215                        RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
216                    })?;
217            }
218            RunEvent::RunCompleted { .. }
219            | RunEvent::RunFailed { .. }
220            | RunEvent::RunCancelled { .. } => {
221                let checkpoint = self.inner.clone();
222                self.inner.on_run_event(event).await?;
223                let input_states = self.inner.input_states_snapshot();
224                if let Err(err) = self
225                    .store
226                    .atomic_lifecycle_commit(
227                        &self.runtime_id,
228                        self.inner.runtime_state(),
229                        &input_states,
230                    )
231                    .await
232                {
233                    self.inner = checkpoint;
234                    return Err(RuntimeDriverError::Internal(format!(
235                        "terminal event persist failed: {err}"
236                    )));
237                }
238            }
239            _ => {
240                self.inner.on_run_event(event).await?;
241            }
242        }
243
244        Ok(())
245    }
246
247    async fn on_runtime_control(
248        &mut self,
249        command: RuntimeControlCommand,
250    ) -> Result<(), RuntimeDriverError> {
251        let checkpoint = self.inner.clone();
252        self.inner.on_runtime_control(command).await?;
253        let input_states = self.inner.input_states_snapshot();
254        if let Err(err) = self
255            .store
256            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
257            .await
258        {
259            self.inner = checkpoint;
260            return Err(RuntimeDriverError::Internal(format!(
261                "control op persist failed: {err}"
262            )));
263        }
264        Ok(())
265    }
266
267    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
268        // §24 full recovery: load durable InputState from store
269        let stored_states = self
270            .store
271            .load_input_states(&self.runtime_id)
272            .await
273            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
274
275        let mut recovered_payloads = Vec::new();
276
277        // Inject stored states into the ephemeral driver's ledger.
278        // Uses recover() which also rebuilds the idempotency index for
279        // dedup correctness and filters out Ephemeral inputs.
280        for mut state in stored_states {
281            if matches!(
282                state.current_state,
283                InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
284            ) {
285                let has_receipt = match (state.last_run_id.clone(), state.last_boundary_sequence) {
286                    (Some(run_id), Some(sequence)) => self
287                        .store
288                        .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
289                        .await
290                        .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
291                        .is_some(),
292                    _ => false,
293                };
294                let now = Utc::now();
295                if has_receipt {
296                    state.history.push(InputStateHistoryEntry {
297                        timestamp: now,
298                        from: state.current_state,
299                        to: InputLifecycleState::Consumed,
300                        reason: Some("recovery: boundary receipt already committed".into()),
301                    });
302                    state.current_state = InputLifecycleState::Consumed;
303                    state.terminal_outcome = Some(InputTerminalOutcome::Consumed);
304                    state.updated_at = now;
305                } else {
306                    state.history.push(InputStateHistoryEntry {
307                        timestamp: now,
308                        from: state.current_state,
309                        to: InputLifecycleState::Queued,
310                        reason: Some("recovery: missing boundary receipt".into()),
311                    });
312                    state.current_state = InputLifecycleState::Queued;
313                    state.updated_at = now;
314                }
315            }
316
317            if let Some(input) = state.persisted_input.clone() {
318                recovered_payloads.push((state.input_id.clone(), input));
319            }
320            let ledger = &mut self.inner;
321            if ledger.input_state(&state.input_id).is_none() {
322                ledger.ledger_mut().recover(state);
323            }
324        }
325
326        // Then run ephemeral recovery logic (requeue Accepted/Staged)
327        let report = self.inner.recover().await?;
328
329        for (input_id, input) in recovered_payloads {
330            let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
331                state.current_state == crate::input_state::InputLifecycleState::Queued
332            });
333            if should_requeue && !self.inner.has_queued_input(&input_id) {
334                self.inner.enqueue_recovered_input(input_id, input);
335            }
336        }
337
338        if let Some(runtime_state) = self
339            .store
340            .load_runtime_state(&self.runtime_id)
341            .await
342            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
343        {
344            match runtime_state {
345                RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
346                    EphemeralRuntimeDriver::retire(&mut self.inner)?;
347                }
348                RuntimeState::Stopped
349                    if self.inner.runtime_state() != RuntimeState::Stopped
350                        && self.inner.runtime_state() != RuntimeState::Destroyed =>
351                {
352                    // Never revive Destroyed as Stopped
353                    self.inner
354                        .on_runtime_control(RuntimeControlCommand::Stop)
355                        .await?;
356                }
357                RuntimeState::Destroyed
358                    if self.inner.runtime_state() != RuntimeState::Destroyed =>
359                {
360                    self.inner.destroy()?;
361                }
362                _ => {}
363            }
364
365            // Terminal states must not have active inputs. If persisted state
366            // is terminal but active inputs exist, treat as store corruption:
367            // terminalize those inputs instead of resurrecting work.
368            if runtime_state.is_terminal() {
369                let active = self.inner.active_input_ids();
370                if !active.is_empty() {
371                    tracing::warn!(
372                        runtime_id = %self.runtime_id,
373                        active_count = active.len(),
374                        persisted_state = %runtime_state,
375                        "terminal runtime has active inputs — terminalizing as corrupted"
376                    );
377                    let abandoned = self
378                        .inner
379                        .abandon_all_non_terminal(InputAbandonReason::Destroyed);
380                    self.inner.queue_mut().drain();
381                    tracing::warn!(
382                        runtime_id = %self.runtime_id,
383                        abandoned,
384                        "force-abandoned active inputs from terminal runtime"
385                    );
386                }
387            }
388        }
389
390        // Persist recovered state atomically
391        let input_states = self.inner.input_states_snapshot();
392        self.store
393            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
394            .await
395            .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
396        Ok(report)
397    }
398
399    async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
400        let checkpoint = self.inner.clone();
401        let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
402        let input_states = self.inner.input_states_snapshot();
403        if let Err(err) = self
404            .store
405            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
406            .await
407        {
408            self.inner = checkpoint;
409            return Err(RuntimeDriverError::Internal(format!(
410                "retire persist failed: {err}"
411            )));
412        }
413        Ok(report)
414    }
415
416    async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
417        let checkpoint = self.inner.clone();
418        let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
419        let input_states = self.inner.input_states_snapshot();
420        if let Err(err) = self
421            .store
422            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
423            .await
424        {
425            self.inner = checkpoint;
426            return Err(RuntimeDriverError::Internal(format!(
427                "reset persist failed: {err}"
428            )));
429        }
430        Ok(report)
431    }
432
433    fn runtime_state(&self) -> RuntimeState {
434        self.inner.runtime_state()
435    }
436
437    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
438        self.inner.input_state(input_id)
439    }
440
441    fn active_input_ids(&self) -> Vec<InputId> {
442        self.inner.active_input_ids()
443    }
444}
445
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use crate::store::InMemoryRuntimeStore;
    use chrono::Utc;

    /// Fresh in-memory store plus the runtime id every test uses.
    fn store_and_id() -> (Arc<InMemoryRuntimeStore>, LogicalRuntimeId) {
        let store = Arc::new(InMemoryRuntimeStore::new());
        let rid = LogicalRuntimeId::new("test");
        (store, rid)
    }

    /// Durable operator prompt with a fresh id and no keys set.
    fn make_prompt(text: &str) -> Input {
        let header = InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        };
        Input::Prompt(PromptInput {
            header,
            text: text.into(),
            blocks: None,
            turn_metadata: None,
        })
    }

    /// Same as `make_prompt`, with `key` set as the idempotency key.
    fn make_keyed_prompt(text: &str, key: crate::identifiers::IdempotencyKey) -> Input {
        let mut input = make_prompt(text);
        if let Input::Prompt(ref mut prompt) = input {
            prompt.header.idempotency_key = Some(key);
        }
        input
    }

    /// Run-start receipt at sequence 0 with `input_id` as its only contributor.
    fn run_start_receipt(
        run_id: &meerkat_core::lifecycle::RunId,
        input_id: &InputId,
    ) -> meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt {
        meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt {
            run_id: run_id.clone(),
            boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary::RunStart,
            contributing_input_ids: vec![input_id.clone()],
            conversation_digest: None,
            message_count: 1,
            sequence: 0,
        }
    }

    #[tokio::test]
    async fn durable_before_ack() {
        let (store, rid) = store_and_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let input = make_prompt("hello");
        let input_id = input.id().clone();
        let outcome = driver.accept_input(input).await.unwrap();
        assert!(outcome.is_accepted());

        // The state, including its payload, must already be in the store.
        let stored = store.load_input_state(&rid, &input_id).await.unwrap();
        assert!(stored.is_some());
        assert!(stored.unwrap().persisted_input.is_some());
    }

    #[tokio::test]
    async fn dedup_not_persisted() {
        let (store, rid) = store_and_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let key = crate::identifiers::IdempotencyKey::new("req-1");
        let first = make_keyed_prompt("hello", key.clone());
        driver.accept_input(first).await.unwrap();

        let second = make_keyed_prompt("hello again", key);
        let outcome = driver.accept_input(second).await.unwrap();
        assert!(outcome.is_deduplicated());

        // The duplicate must not have added a second stored state.
        let states = store.load_input_states(&rid).await.unwrap();
        assert_eq!(states.len(), 1);
    }

    #[tokio::test]
    async fn recover_from_store() {
        let (store, rid) = store_and_id();

        // Seed the store as if a previous process had accepted the input.
        let input = make_prompt("hello");
        let input_id = input.id().clone();
        let mut state = InputState::new_accepted(input_id.clone());
        state.persisted_input = Some(input);
        state.durability = Some(InputDurability::Durable);
        store.persist_input_state(&rid, &state).await.unwrap();

        // A brand-new driver stands in for the restarted process.
        let mut driver = PersistentRuntimeDriver::new(rid, store);

        let report = driver.recover().await.unwrap();
        assert_eq!(report.inputs_recovered, 1);

        // The state is known again and its payload is back in the queue.
        assert!(driver.input_state(&input_id).is_some());
        let dequeued = driver.dequeue_next();
        assert!(
            dequeued.is_some(),
            "Recovered queued input should be re-enqueued"
        );
        let (queued_id, queued_input) = dequeued.unwrap();
        assert_eq!(queued_id, input_id);
        assert_eq!(queued_input.id(), &input_id);
    }

    #[tokio::test]
    async fn recover_rebuilds_dedup_index() {
        let (store, rid) = store_and_id();
        let key = crate::identifiers::IdempotencyKey::new("dedup-key");

        // Seed the store with a state that carries an idempotency key.
        let input_id = InputId::new();
        let mut state = InputState::new_accepted(input_id.clone());
        state.idempotency_key = Some(key.clone());
        state.durability = Some(InputDurability::Durable);
        store.persist_input_state(&rid, &state).await.unwrap();

        let mut driver = PersistentRuntimeDriver::new(rid, store);
        driver.recover().await.unwrap();

        // A new input reusing the key must be recognised as a duplicate.
        let dup_input = make_keyed_prompt("duplicate", key);
        let outcome = driver.accept_input(dup_input).await.unwrap();
        assert!(
            outcome.is_deduplicated(),
            "After recovery, dedup index should be rebuilt so duplicates are caught"
        );
    }

    #[tokio::test]
    async fn recover_filters_ephemeral_inputs() {
        let (store, rid) = store_and_id();

        // Seed the store with an ephemeral input state.
        let input_id = InputId::new();
        let mut state = InputState::new_accepted(input_id.clone());
        state.durability = Some(InputDurability::Ephemeral);
        store.persist_input_state(&rid, &state).await.unwrap();

        let mut driver = PersistentRuntimeDriver::new(rid, store);
        let report = driver.recover().await.unwrap();

        // Ephemeral inputs are not expected to survive a restart.
        assert!(
            driver.input_state(&input_id).is_none(),
            "Ephemeral inputs should be filtered during recovery"
        );
        assert_eq!(report.inputs_recovered, 0);
    }

    #[tokio::test]
    async fn boundary_applied_persists_atomically() {
        use meerkat_core::lifecycle::RunId;

        let (store, rid) = store_and_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        // Accept an input and walk it to the staged state by hand.
        let input = make_prompt("hello");
        let input_id = input.id().clone();
        driver.accept_input(input).await.unwrap();

        let run_id = RunId::new();
        driver.start_run(run_id.clone()).unwrap();
        driver.stage_input(&input_id, &run_id).unwrap();

        // BoundaryApplied is the event expected to trigger the atomic write.
        let boundary_event = meerkat_core::lifecycle::RunEvent::BoundaryApplied {
            run_id: run_id.clone(),
            receipt: run_start_receipt(&run_id, &input_id),
            session_snapshot: Some(b"session-data".to_vec()),
        };
        driver.on_run_event(boundary_event).await.unwrap();

        // The receipt must now be loadable from the store.
        let loaded = store.load_boundary_receipt(&rid, &run_id, 0).await.unwrap();
        assert!(
            loaded.is_some(),
            "BoundaryApplied should persist the receipt via atomic_apply"
        );
    }

    #[tokio::test]
    async fn retire_preserves_inputs_for_drain() {
        let (store, rid) = store_and_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let input = make_prompt("hello");
        let input_id = input.id().clone();
        driver.accept_input(input).await.unwrap();

        let report = driver.retire().await.unwrap();
        assert_eq!(report.inputs_abandoned, 0);
        assert_eq!(report.inputs_pending_drain, 1);

        // The stored state is still queued rather than abandoned.
        let stored = store.load_input_state(&rid, &input_id).await.unwrap();
        let stored = stored.unwrap();
        assert_eq!(
            stored.current_state,
            crate::input_state::InputLifecycleState::Queued
        );
    }

    #[tokio::test]
    async fn reset_persists_abandoned_inputs() {
        let (store, rid) = store_and_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let input = make_prompt("hello");
        let input_id = input.id().clone();
        driver.accept_input(input).await.unwrap();

        let report = driver.reset().await.unwrap();
        assert_eq!(report.inputs_abandoned, 1);

        // The abandonment must be visible in the store, not just in memory.
        let stored = store.load_input_state(&rid, &input_id).await.unwrap();
        let stored = stored.unwrap();
        assert_eq!(
            stored.current_state,
            crate::input_state::InputLifecycleState::Abandoned
        );
    }

    #[tokio::test]
    async fn recover_consumes_committed_applied_pending_inputs() {
        use meerkat_core::lifecycle::RunId;

        let (store, rid) = store_and_id();
        let input = make_prompt("already committed");
        let input_id = input.id().clone();
        let run_id = RunId::new();

        // Seed an applied-but-unconsumed state together with its receipt.
        let mut state = InputState::new_accepted(input_id.clone());
        state.persisted_input = Some(input);
        state.durability = Some(InputDurability::Durable);
        state.current_state = InputLifecycleState::AppliedPendingConsumption;
        state.last_run_id = Some(run_id.clone());
        state.last_boundary_sequence = Some(0);
        store.persist_input_state(&rid, &state).await.unwrap();

        let receipt = run_start_receipt(&run_id, &input_id);
        store
            .atomic_apply(&rid, None, receipt, vec![state.clone()], None)
            .await
            .unwrap();

        let mut driver = PersistentRuntimeDriver::new(rid, store);
        driver.recover().await.unwrap();

        let recovered = driver.input_state(&input_id);
        assert!(
            recovered.is_some(),
            "committed input should remain queryable after recovery"
        );
        let Some(recovered) = recovered else {
            unreachable!("asserted some recovery state above");
        };
        assert_eq!(recovered.current_state, InputLifecycleState::Consumed);
        assert!(
            driver.active_input_ids().is_empty(),
            "committed applied inputs should not stay active after recovery"
        );
        assert!(
            driver.dequeue_next().is_none(),
            "committed applied inputs should not be replayed after recovery"
        );
    }
}
751}