Skip to main content

meerkat_runtime/driver/
persistent.rs

1//! PersistentRuntimeDriver — wraps EphemeralRuntimeDriver + RuntimeStore.
2//!
3//! Provides durable-before-ack guarantee: InputState is persisted via
4//! RuntimeStore BEFORE returning AcceptOutcome. Delegates state machine
5//! logic to the ephemeral driver.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::lifecycle::{InputId, RunEvent};
11
12use crate::accept::AcceptOutcome;
13use crate::identifiers::LogicalRuntimeId;
14use crate::input::Input;
15use crate::input_state::{
16    InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
17    InputTerminalOutcome,
18};
19use crate::runtime_event::RuntimeEventEnvelope;
20use crate::runtime_state::RuntimeState;
21use crate::store::RuntimeStore;
22use crate::traits::{RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError};
23
24use super::ephemeral::EphemeralRuntimeDriver;
25
26/// Persistent runtime driver — durable InputState via RuntimeStore.
27pub struct PersistentRuntimeDriver {
28    /// Underlying ephemeral driver for state machine logic.
29    inner: EphemeralRuntimeDriver,
30    /// Durable store for InputState + receipts.
31    store: Arc<dyn RuntimeStore>,
32    /// Runtime ID for store operations.
33    runtime_id: LogicalRuntimeId,
34}
35
36impl PersistentRuntimeDriver {
37    /// Create a new persistent runtime driver.
38    pub fn new(runtime_id: LogicalRuntimeId, store: Arc<dyn RuntimeStore>) -> Self {
39        Self {
40            inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
41            store,
42            runtime_id,
43        }
44    }
45
46    /// Get immutable reference to the inner ephemeral driver.
47    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
48        &self.inner
49    }
50
51    /// Check if the runtime is idle (delegates to inner).
52    pub fn is_idle(&self) -> bool {
53        self.inner.is_idle()
54    }
55
56    /// Start a new run (delegates to inner).
57    pub fn start_run(
58        &mut self,
59        run_id: meerkat_core::lifecycle::RunId,
60    ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
61        self.inner.start_run(run_id)
62    }
63
64    /// Complete a run (delegates to inner).
65    pub fn complete_run(
66        &mut self,
67    ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
68    {
69        self.inner.complete_run()
70    }
71
72    /// Get pending events (delegates to inner).
73    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
74        self.inner.drain_events()
75    }
76
77    /// Check and clear wake flag (delegates to inner).
78    pub fn take_wake_requested(&mut self) -> bool {
79        self.inner.take_wake_requested()
80    }
81
82    /// Check and clear immediate processing flag (delegates to inner).
83    pub fn take_process_requested(&mut self) -> bool {
84        self.inner.take_process_requested()
85    }
86
87    /// Dequeue next input (delegates to inner).
88    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
89        self.inner.dequeue_next()
90    }
91
92    /// Stage input (delegates to inner).
93    pub fn stage_input(
94        &mut self,
95        input_id: &InputId,
96        run_id: &meerkat_core::lifecycle::RunId,
97    ) -> Result<(), crate::input_machine::InputStateMachineError> {
98        self.inner.stage_input(input_id, run_id)
99    }
100
101    /// Apply input (delegates to inner).
102    pub fn apply_input(
103        &mut self,
104        input_id: &InputId,
105        run_id: &meerkat_core::lifecycle::RunId,
106    ) -> Result<(), crate::input_machine::InputStateMachineError> {
107        self.inner.apply_input(input_id, run_id)
108    }
109
110    /// Roll back staged inputs (delegates to inner).
111    pub fn rollback_staged(
112        &mut self,
113        input_ids: &[InputId],
114    ) -> Result<(), crate::input_machine::InputStateMachineError> {
115        self.inner.rollback_staged(input_ids)
116    }
117
118    /// Consume applied inputs without completing a runtime run.
119    pub fn consume_inputs(
120        &mut self,
121        input_ids: &[InputId],
122        run_id: &meerkat_core::lifecycle::RunId,
123    ) -> Result<(), crate::input_machine::InputStateMachineError> {
124        self.inner.consume_inputs(input_ids, run_id)
125    }
126
127    /// Remove a previously accepted input from the ledger/queue.
128    pub fn forget_input(&mut self, input_id: &InputId) {
129        self.inner.forget_input(input_id);
130    }
131
132    async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
133        self.store
134            .persist_input_state(&self.runtime_id, state)
135            .await
136            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
137    }
138}
139
140#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
141#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
142impl RuntimeDriver for PersistentRuntimeDriver {
143    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
144        let input_for_recovery = input.clone();
145
146        // Delegate to ephemeral for state machine logic
147        let mut outcome = self.inner.accept_input(input).await?;
148
149        // Durable-before-ack: persist InputState before returning
150        if let AcceptOutcome::Accepted {
151            ref input_id,
152            ref mut state,
153            ..
154        } = outcome
155            && let Some(inner_state) = self.inner.input_state(input_id).cloned()
156        {
157            let mut persisted = inner_state;
158            persisted.persisted_input = Some(input_for_recovery);
159            self.inner.ledger_mut().accept(persisted.clone());
160            if let Err(err) = self.persist_state(&persisted).await {
161                self.forget_input(input_id);
162                return Err(err);
163            }
164            *state = persisted;
165        }
166
167        Ok(outcome)
168    }
169
170    async fn on_runtime_event(
171        &mut self,
172        event: RuntimeEventEnvelope,
173    ) -> Result<(), RuntimeDriverError> {
174        self.inner.on_runtime_event(event).await
175    }
176
177    async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
178        match event {
179            // BoundaryApplied persists the receipt and the applied state atomically.
180            RunEvent::BoundaryApplied {
181                ref receipt,
182                ref session_snapshot,
183                ..
184            } => {
185                let checkpoint = self.inner.clone();
186                self.inner.on_run_event(event.clone()).await?;
187                if self
188                    .store
189                    .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
190                    .await
191                    .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
192                    .is_some()
193                {
194                    return Ok(());
195                }
196                let input_updates: Vec<InputState> = receipt
197                    .contributing_input_ids
198                    .iter()
199                    .filter_map(|id| self.inner.input_state(id).cloned())
200                    .collect();
201
202                self.store
203                    .atomic_apply(
204                        &self.runtime_id,
205                        session_snapshot.clone().map(|session_snapshot| {
206                            crate::store::SessionDelta { session_snapshot }
207                        }),
208                        receipt.clone(),
209                        input_updates,
210                        None, // session_store_key — caller provides if dual-store needed
211                    )
212                    .await
213                    .map_err(|e| {
214                        self.inner = checkpoint;
215                        RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
216                    })?;
217            }
218            RunEvent::RunCompleted { .. }
219            | RunEvent::RunFailed { .. }
220            | RunEvent::RunCancelled { .. } => {
221                let checkpoint = self.inner.clone();
222                self.inner.on_run_event(event).await?;
223                let input_states = self.inner.input_states_snapshot();
224                if let Err(err) = self
225                    .store
226                    .atomic_lifecycle_commit(
227                        &self.runtime_id,
228                        self.inner.runtime_state(),
229                        &input_states,
230                    )
231                    .await
232                {
233                    self.inner = checkpoint;
234                    return Err(RuntimeDriverError::Internal(format!(
235                        "terminal event persist failed: {err}"
236                    )));
237                }
238            }
239            _ => {
240                self.inner.on_run_event(event).await?;
241            }
242        }
243
244        Ok(())
245    }
246
247    async fn on_runtime_control(
248        &mut self,
249        command: RuntimeControlCommand,
250    ) -> Result<(), RuntimeDriverError> {
251        let checkpoint = self.inner.clone();
252        self.inner.on_runtime_control(command).await?;
253        let input_states = self.inner.input_states_snapshot();
254        if let Err(err) = self
255            .store
256            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
257            .await
258        {
259            self.inner = checkpoint;
260            return Err(RuntimeDriverError::Internal(format!(
261                "control op persist failed: {err}"
262            )));
263        }
264        Ok(())
265    }
266
267    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
268        // §24 full recovery: load durable InputState from store
269        let stored_states = self
270            .store
271            .load_input_states(&self.runtime_id)
272            .await
273            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
274
275        let mut recovered_payloads = Vec::new();
276
277        // Inject stored states into the ephemeral driver's ledger.
278        // Uses recover() which also rebuilds the idempotency index for
279        // dedup correctness and filters out Ephemeral inputs.
280        for mut state in stored_states {
281            if matches!(
282                state.current_state,
283                InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
284            ) {
285                let has_receipt = match (state.last_run_id.clone(), state.last_boundary_sequence) {
286                    (Some(run_id), Some(sequence)) => self
287                        .store
288                        .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
289                        .await
290                        .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
291                        .is_some(),
292                    _ => false,
293                };
294                let now = Utc::now();
295                if has_receipt {
296                    state.history.push(InputStateHistoryEntry {
297                        timestamp: now,
298                        from: state.current_state,
299                        to: InputLifecycleState::Consumed,
300                        reason: Some("recovery: boundary receipt already committed".into()),
301                    });
302                    state.current_state = InputLifecycleState::Consumed;
303                    state.terminal_outcome = Some(InputTerminalOutcome::Consumed);
304                    state.updated_at = now;
305                } else {
306                    state.history.push(InputStateHistoryEntry {
307                        timestamp: now,
308                        from: state.current_state,
309                        to: InputLifecycleState::Queued,
310                        reason: Some("recovery: missing boundary receipt".into()),
311                    });
312                    state.current_state = InputLifecycleState::Queued;
313                    state.updated_at = now;
314                }
315            }
316
317            if let Some(input) = state.persisted_input.clone() {
318                recovered_payloads.push((state.input_id.clone(), input));
319            }
320            let ledger = &mut self.inner;
321            if ledger.input_state(&state.input_id).is_none() {
322                ledger.ledger_mut().recover(state);
323            }
324        }
325
326        // Then run ephemeral recovery logic (requeue Accepted/Staged)
327        let report = self.inner.recover().await?;
328
329        for (input_id, input) in recovered_payloads {
330            let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
331                state.current_state == crate::input_state::InputLifecycleState::Queued
332            });
333            if should_requeue && !self.inner.has_queued_input(&input_id) {
334                self.inner.enqueue_recovered_input(input_id, input);
335            }
336        }
337
338        if let Some(runtime_state) = self
339            .store
340            .load_runtime_state(&self.runtime_id)
341            .await
342            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
343        {
344            match runtime_state {
345                RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
346                    EphemeralRuntimeDriver::retire(&mut self.inner)?;
347                }
348                RuntimeState::Stopped
349                    if self.inner.runtime_state() != RuntimeState::Stopped
350                        && self.inner.runtime_state() != RuntimeState::Destroyed =>
351                {
352                    // Never revive Destroyed as Stopped
353                    self.inner
354                        .on_runtime_control(RuntimeControlCommand::Stop)
355                        .await?;
356                }
357                RuntimeState::Destroyed
358                    if self.inner.runtime_state() != RuntimeState::Destroyed =>
359                {
360                    self.inner.destroy()?;
361                }
362                _ => {}
363            }
364
365            // Terminal states must not have active inputs. If persisted state
366            // is terminal but active inputs exist, treat as store corruption:
367            // terminalize those inputs instead of resurrecting work.
368            if runtime_state.is_terminal() {
369                let active = self.inner.active_input_ids();
370                if !active.is_empty() {
371                    tracing::warn!(
372                        runtime_id = %self.runtime_id,
373                        active_count = active.len(),
374                        persisted_state = %runtime_state,
375                        "terminal runtime has active inputs — terminalizing as corrupted"
376                    );
377                    let abandoned = self
378                        .inner
379                        .abandon_all_non_terminal(InputAbandonReason::Destroyed);
380                    self.inner.queue_mut().drain();
381                    tracing::warn!(
382                        runtime_id = %self.runtime_id,
383                        abandoned,
384                        "force-abandoned active inputs from terminal runtime"
385                    );
386                }
387            }
388        }
389
390        // Persist recovered state atomically
391        let input_states = self.inner.input_states_snapshot();
392        self.store
393            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
394            .await
395            .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
396        Ok(report)
397    }
398
399    async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
400        let checkpoint = self.inner.clone();
401        let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
402        let input_states = self.inner.input_states_snapshot();
403        if let Err(err) = self
404            .store
405            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
406            .await
407        {
408            self.inner = checkpoint;
409            return Err(RuntimeDriverError::Internal(format!(
410                "retire persist failed: {err}"
411            )));
412        }
413        Ok(report)
414    }
415
416    async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
417        let checkpoint = self.inner.clone();
418        let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
419        let input_states = self.inner.input_states_snapshot();
420        if let Err(err) = self
421            .store
422            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
423            .await
424        {
425            self.inner = checkpoint;
426            return Err(RuntimeDriverError::Internal(format!(
427                "reset persist failed: {err}"
428            )));
429        }
430        Ok(report)
431    }
432
433    fn runtime_state(&self) -> RuntimeState {
434        self.inner.runtime_state()
435    }
436
437    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
438        self.inner.input_state(input_id)
439    }
440
441    fn active_input_ids(&self) -> Vec<InputId> {
442        self.inner.active_input_ids()
443    }
444}
445
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use crate::store::InMemoryRuntimeStore;
    use chrono::Utc;

    /// Durable operator prompt with a fresh id and every optional key unset.
    fn make_prompt(text: &str) -> Input {
        let header = InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::Operator,
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        };
        Input::Prompt(PromptInput {
            header,
            text: text.into(),
            turn_metadata: None,
        })
    }

    /// Prompt from `make_prompt` carrying the given idempotency key.
    fn make_keyed_prompt(text: &str, key: crate::identifiers::IdempotencyKey) -> Input {
        let mut input = make_prompt(text);
        if let Input::Prompt(ref mut prompt) = input {
            prompt.header.idempotency_key = Some(key);
        }
        input
    }

    /// Empty in-memory store together with the runtime id used by every test.
    fn store_and_runtime_id() -> (Arc<InMemoryRuntimeStore>, LogicalRuntimeId) {
        (
            Arc::new(InMemoryRuntimeStore::new()),
            LogicalRuntimeId::new("test"),
        )
    }

    /// An accepted input must be in the store, payload included, once
    /// `accept_input` has returned.
    #[tokio::test]
    async fn durable_before_ack() {
        let (store, rid) = store_and_runtime_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let prompt = make_prompt("hello");
        let prompt_id = prompt.id().clone();
        let outcome = driver.accept_input(prompt).await.unwrap();
        assert!(outcome.is_accepted());

        // Verify state was persisted to store BEFORE we returned
        let stored = store.load_input_state(&rid, &prompt_id).await.unwrap();
        assert!(stored.is_some());
        assert!(stored.unwrap().persisted_input.is_some());
    }

    /// A deduplicated input must not add a second state to the store.
    #[tokio::test]
    async fn dedup_not_persisted() {
        let (store, rid) = store_and_runtime_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let key = crate::identifiers::IdempotencyKey::new("req-1");
        driver
            .accept_input(make_keyed_prompt("hello", key.clone()))
            .await
            .unwrap();

        let outcome = driver
            .accept_input(make_keyed_prompt("hello again", key))
            .await
            .unwrap();
        assert!(outcome.is_deduplicated());

        // Only one state in store
        let states = store.load_input_states(&rid).await.unwrap();
        assert_eq!(states.len(), 1);
    }

    /// A stored accepted input is loaded and re-enqueued by a fresh driver.
    #[tokio::test]
    async fn recover_from_store() {
        let (store, rid) = store_and_runtime_id();

        // Pre-populate store with a state (simulating crash recovery)
        let prompt = make_prompt("hello");
        let prompt_id = prompt.id().clone();
        let mut seeded = InputState::new_accepted(prompt_id.clone());
        seeded.persisted_input = Some(prompt);
        seeded.durability = Some(InputDurability::Durable);
        store.persist_input_state(&rid, &seeded).await.unwrap();

        // Create a fresh driver (simulating restart)
        let mut driver = PersistentRuntimeDriver::new(rid, store);

        // Recover
        let report = driver.recover().await.unwrap();
        assert_eq!(report.inputs_recovered, 1);

        // State should now be in the driver
        assert!(driver.input_state(&prompt_id).is_some());
        let dequeued = driver.dequeue_next();
        assert!(
            dequeued.is_some(),
            "Recovered queued input should be re-enqueued"
        );
        let (queued_id, queued_input) = dequeued.unwrap();
        assert_eq!(queued_id, prompt_id);
        assert_eq!(queued_input.id(), &prompt_id);
    }

    /// After recovery, an input reusing a stored idempotency key is
    /// deduplicated.
    #[tokio::test]
    async fn recover_rebuilds_dedup_index() {
        let (store, rid) = store_and_runtime_id();
        let key = crate::identifiers::IdempotencyKey::new("dedup-key");

        // Pre-populate store with a state that has an idempotency key
        let mut seeded = InputState::new_accepted(InputId::new());
        seeded.idempotency_key = Some(key.clone());
        seeded.durability = Some(InputDurability::Durable);
        store.persist_input_state(&rid, &seeded).await.unwrap();

        // Create a fresh driver and recover
        let mut driver = PersistentRuntimeDriver::new(rid, store);
        driver.recover().await.unwrap();

        // Now try to accept a new input with the same idempotency key
        let outcome = driver
            .accept_input(make_keyed_prompt("duplicate", key))
            .await
            .unwrap();
        assert!(
            outcome.is_deduplicated(),
            "After recovery, dedup index should be rebuilt so duplicates are caught"
        );
    }

    /// A stored state marked `Ephemeral` is not brought back by recovery.
    #[tokio::test]
    async fn recover_filters_ephemeral_inputs() {
        let (store, rid) = store_and_runtime_id();

        // Pre-populate with an ephemeral input state
        let ephemeral_id = InputId::new();
        let mut seeded = InputState::new_accepted(ephemeral_id.clone());
        seeded.durability = Some(InputDurability::Ephemeral);
        store.persist_input_state(&rid, &seeded).await.unwrap();

        // Create fresh driver and recover
        let mut driver = PersistentRuntimeDriver::new(rid, store);
        let report = driver.recover().await.unwrap();

        // Ephemeral input should NOT be recovered (it shouldn't survive restart)
        assert!(
            driver.input_state(&ephemeral_id).is_none(),
            "Ephemeral inputs should be filtered during recovery"
        );
        assert_eq!(report.inputs_recovered, 0);
    }

    /// `BoundaryApplied` must leave the receipt in the store.
    #[tokio::test]
    async fn boundary_applied_persists_atomically() {
        use meerkat_core::lifecycle::RunId;
        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

        let (store, rid) = store_and_runtime_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        // Accept and manually process an input
        let prompt = make_prompt("hello");
        let prompt_id = prompt.id().clone();
        driver.accept_input(prompt).await.unwrap();

        let run_id = RunId::new();
        driver.start_run(run_id.clone()).unwrap();
        driver.stage_input(&prompt_id, &run_id).unwrap();

        // Fire BoundaryApplied — this should persist atomically
        let receipt = RunBoundaryReceipt {
            run_id: run_id.clone(),
            boundary: RunApplyBoundary::RunStart,
            contributing_input_ids: vec![prompt_id.clone()],
            conversation_digest: None,
            message_count: 1,
            sequence: 0,
        };
        let boundary_event = meerkat_core::lifecycle::RunEvent::BoundaryApplied {
            run_id: run_id.clone(),
            receipt: receipt.clone(),
            session_snapshot: Some(b"session-data".to_vec()),
        };
        driver.on_run_event(boundary_event).await.unwrap();

        // Verify the receipt was persisted via atomic_apply
        let loaded = store.load_boundary_receipt(&rid, &run_id, 0).await.unwrap();
        assert!(
            loaded.is_some(),
            "BoundaryApplied should persist the receipt via atomic_apply"
        );
    }

    /// Retiring keeps a queued input queued, in the report and in the store.
    #[tokio::test]
    async fn retire_preserves_inputs_for_drain() {
        let (store, rid) = store_and_runtime_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let prompt = make_prompt("hello");
        let prompt_id = prompt.id().clone();
        driver.accept_input(prompt).await.unwrap();

        let report = driver.retire().await.unwrap();
        assert_eq!(report.inputs_abandoned, 0);
        assert_eq!(report.inputs_pending_drain, 1);

        // Input is still queued, not abandoned
        let stored = store.load_input_state(&rid, &prompt_id).await.unwrap();
        let stored = stored.unwrap();
        assert_eq!(stored.current_state, InputLifecycleState::Queued);
    }

    /// Resetting abandons the queued input and stores that state.
    #[tokio::test]
    async fn reset_persists_abandoned_inputs() {
        let (store, rid) = store_and_runtime_id();
        let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());

        let prompt = make_prompt("hello");
        let prompt_id = prompt.id().clone();
        driver.accept_input(prompt).await.unwrap();

        let report = driver.reset().await.unwrap();
        assert_eq!(report.inputs_abandoned, 1);

        let stored = store.load_input_state(&rid, &prompt_id).await.unwrap();
        let stored = stored.unwrap();
        assert_eq!(stored.current_state, InputLifecycleState::Abandoned);
    }

    /// An `AppliedPendingConsumption` input whose receipt is stored comes
    /// back as `Consumed` and is neither active nor queued.
    #[tokio::test]
    async fn recover_consumes_committed_applied_pending_inputs() {
        use meerkat_core::lifecycle::RunId;
        use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
        use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

        let (store, rid) = store_and_runtime_id();
        let prompt = make_prompt("already committed");
        let prompt_id = prompt.id().clone();
        let run_id = RunId::new();

        let mut seeded = InputState::new_accepted(prompt_id.clone());
        seeded.persisted_input = Some(prompt);
        seeded.durability = Some(InputDurability::Durable);
        seeded.current_state = InputLifecycleState::AppliedPendingConsumption;
        seeded.last_run_id = Some(run_id.clone());
        seeded.last_boundary_sequence = Some(0);
        store.persist_input_state(&rid, &seeded).await.unwrap();

        let committed_receipt = RunBoundaryReceipt {
            run_id: run_id.clone(),
            boundary: RunApplyBoundary::RunStart,
            contributing_input_ids: vec![prompt_id.clone()],
            conversation_digest: None,
            message_count: 1,
            sequence: 0,
        };
        store
            .atomic_apply(&rid, None, committed_receipt, vec![seeded.clone()], None)
            .await
            .unwrap();

        let mut driver = PersistentRuntimeDriver::new(rid, store);
        driver.recover().await.unwrap();

        let recovered = driver.input_state(&prompt_id);
        assert!(
            recovered.is_some(),
            "committed input should remain queryable after recovery"
        );
        let Some(recovered) = recovered else {
            unreachable!("asserted some recovery state above");
        };
        assert_eq!(recovered.current_state, InputLifecycleState::Consumed);
        assert!(
            driver.active_input_ids().is_empty(),
            "committed applied inputs should not stay active after recovery"
        );
        assert!(
            driver.dequeue_next().is_none(),
            "committed applied inputs should not be replayed after recovery"
        );
    }
}