1use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::lifecycle::{InputId, RunEvent};
11
12use crate::accept::AcceptOutcome;
13use crate::identifiers::LogicalRuntimeId;
14use crate::input::Input;
15use crate::input_state::{
16 InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
17 InputTerminalOutcome,
18};
19use crate::runtime_event::RuntimeEventEnvelope;
20use crate::runtime_state::RuntimeState;
21use crate::store::RuntimeStore;
22use crate::traits::{RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError};
23
24use super::ephemeral::EphemeralRuntimeDriver;
25
/// Runtime driver that pairs an in-memory [`EphemeralRuntimeDriver`] with a
/// [`RuntimeStore`] so that input and lifecycle state can be written to, and
/// reloaded from, durable storage.
pub struct PersistentRuntimeDriver {
    /// In-memory driver that owns the live state; most methods forward to it.
    inner: EphemeralRuntimeDriver,
    /// Store used for every persist and load operation issued by this driver.
    store: Arc<dyn RuntimeStore>,
    /// Identifier passed to the store on each call to scope reads and writes.
    runtime_id: LogicalRuntimeId,
}
35
36impl PersistentRuntimeDriver {
37 pub fn new(runtime_id: LogicalRuntimeId, store: Arc<dyn RuntimeStore>) -> Self {
39 Self {
40 inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
41 store,
42 runtime_id,
43 }
44 }
45
46 pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
48 &self.inner
49 }
50
51 pub fn is_idle(&self) -> bool {
53 self.inner.is_idle()
54 }
55
56 pub fn start_run(
58 &mut self,
59 run_id: meerkat_core::lifecycle::RunId,
60 ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
61 self.inner.start_run(run_id)
62 }
63
64 pub fn complete_run(
66 &mut self,
67 ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
68 {
69 self.inner.complete_run()
70 }
71
72 pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
74 self.inner.drain_events()
75 }
76
77 pub fn take_wake_requested(&mut self) -> bool {
79 self.inner.take_wake_requested()
80 }
81
82 pub fn take_process_requested(&mut self) -> bool {
84 self.inner.take_process_requested()
85 }
86
87 pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
89 self.inner.dequeue_next()
90 }
91
92 pub fn stage_input(
94 &mut self,
95 input_id: &InputId,
96 run_id: &meerkat_core::lifecycle::RunId,
97 ) -> Result<(), crate::input_machine::InputStateMachineError> {
98 self.inner.stage_input(input_id, run_id)
99 }
100
101 pub fn apply_input(
103 &mut self,
104 input_id: &InputId,
105 run_id: &meerkat_core::lifecycle::RunId,
106 ) -> Result<(), crate::input_machine::InputStateMachineError> {
107 self.inner.apply_input(input_id, run_id)
108 }
109
110 pub fn rollback_staged(
112 &mut self,
113 input_ids: &[InputId],
114 ) -> Result<(), crate::input_machine::InputStateMachineError> {
115 self.inner.rollback_staged(input_ids)
116 }
117
118 pub fn consume_inputs(
120 &mut self,
121 input_ids: &[InputId],
122 run_id: &meerkat_core::lifecycle::RunId,
123 ) -> Result<(), crate::input_machine::InputStateMachineError> {
124 self.inner.consume_inputs(input_ids, run_id)
125 }
126
127 pub fn forget_input(&mut self, input_id: &InputId) {
129 self.inner.forget_input(input_id);
130 }
131
132 async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
133 self.store
134 .persist_input_state(&self.runtime_id, state)
135 .await
136 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
137 }
138}
139
140#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
141#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
142impl RuntimeDriver for PersistentRuntimeDriver {
143 async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
144 let input_for_recovery = input.clone();
145
146 let mut outcome = self.inner.accept_input(input).await?;
148
149 if let AcceptOutcome::Accepted {
151 ref input_id,
152 ref mut state,
153 ..
154 } = outcome
155 && let Some(inner_state) = self.inner.input_state(input_id).cloned()
156 {
157 let mut persisted = inner_state;
158 persisted.persisted_input = Some(input_for_recovery);
159 self.inner.ledger_mut().accept(persisted.clone());
160 if let Err(err) = self.persist_state(&persisted).await {
161 self.forget_input(input_id);
162 return Err(err);
163 }
164 *state = persisted;
165 }
166
167 Ok(outcome)
168 }
169
170 async fn on_runtime_event(
171 &mut self,
172 event: RuntimeEventEnvelope,
173 ) -> Result<(), RuntimeDriverError> {
174 self.inner.on_runtime_event(event).await
175 }
176
177 async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
178 match event {
179 RunEvent::BoundaryApplied {
181 ref receipt,
182 ref session_snapshot,
183 ..
184 } => {
185 let checkpoint = self.inner.clone();
186 self.inner.on_run_event(event.clone()).await?;
187 if self
188 .store
189 .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
190 .await
191 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
192 .is_some()
193 {
194 return Ok(());
195 }
196 let input_updates: Vec<InputState> = receipt
197 .contributing_input_ids
198 .iter()
199 .filter_map(|id| self.inner.input_state(id).cloned())
200 .collect();
201
202 self.store
203 .atomic_apply(
204 &self.runtime_id,
205 session_snapshot.clone().map(|session_snapshot| {
206 crate::store::SessionDelta { session_snapshot }
207 }),
208 receipt.clone(),
209 input_updates,
210 None, )
212 .await
213 .map_err(|e| {
214 self.inner = checkpoint;
215 RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
216 })?;
217 }
218 RunEvent::RunCompleted { .. }
219 | RunEvent::RunFailed { .. }
220 | RunEvent::RunCancelled { .. } => {
221 let checkpoint = self.inner.clone();
222 self.inner.on_run_event(event).await?;
223 let input_states = self.inner.input_states_snapshot();
224 if let Err(err) = self
225 .store
226 .atomic_lifecycle_commit(
227 &self.runtime_id,
228 self.inner.runtime_state(),
229 &input_states,
230 )
231 .await
232 {
233 self.inner = checkpoint;
234 return Err(RuntimeDriverError::Internal(format!(
235 "terminal event persist failed: {err}"
236 )));
237 }
238 }
239 _ => {
240 self.inner.on_run_event(event).await?;
241 }
242 }
243
244 Ok(())
245 }
246
247 async fn on_runtime_control(
248 &mut self,
249 command: RuntimeControlCommand,
250 ) -> Result<(), RuntimeDriverError> {
251 let checkpoint = self.inner.clone();
252 self.inner.on_runtime_control(command).await?;
253 let input_states = self.inner.input_states_snapshot();
254 if let Err(err) = self
255 .store
256 .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
257 .await
258 {
259 self.inner = checkpoint;
260 return Err(RuntimeDriverError::Internal(format!(
261 "control op persist failed: {err}"
262 )));
263 }
264 Ok(())
265 }
266
    /// Rebuilds the in-memory driver from the store and writes the reconciled
    /// result back.
    ///
    /// Steps, in order:
    /// 1. Load every stored input state for this runtime.
    /// 2. For states in `Applied` / `AppliedPendingConsumption`, look for the
    ///    boundary receipt named by `last_run_id` + `last_boundary_sequence`:
    ///    mark the input `Consumed` if the receipt exists, otherwise move it
    ///    back to `Queued`.
    /// 3. Hand each state to the in-memory ledger unless that id is already
    ///    known, then run the in-memory driver's own `recover`.
    /// 4. Re-enqueue stored payloads whose state is `Queued` and that are not
    ///    already in the queue.
    /// 5. Re-apply a stored `Retired` / `Stopped` / `Destroyed` runtime state,
    ///    and abandon any still-active inputs if the stored state is terminal.
    /// 6. Commit the runtime state and all input states to the store.
    ///
    /// Returns the report produced by the in-memory driver's `recover`.
    ///
    /// # Errors
    /// Store failures become [`RuntimeDriverError::Internal`]; errors from the
    /// in-memory driver are propagated with `?`. No rollback is attempted here.
    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
        let stored_states = self
            .store
            .load_input_states(&self.runtime_id)
            .await
            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;

        // Payloads are collected here and only re-enqueued after the
        // in-memory driver has finished its own recovery pass.
        let mut recovered_payloads = Vec::new();

        for mut state in stored_states {
            // An input stored as applied may or may not have had its boundary
            // committed; the receipt in the store decides which.
            if matches!(
                state.current_state,
                InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
            ) {
                // Without both a run id and a sequence there is nothing to
                // look up, so the input is treated as having no receipt.
                let has_receipt = match (state.last_run_id.clone(), state.last_boundary_sequence) {
                    (Some(run_id), Some(sequence)) => self
                        .store
                        .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
                        .await
                        .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
                        .is_some(),
                    _ => false,
                };
                let now = Utc::now();
                if has_receipt {
                    // Receipt found: record the transition and finish the input.
                    state.history.push(InputStateHistoryEntry {
                        timestamp: now,
                        from: state.current_state,
                        to: InputLifecycleState::Consumed,
                        reason: Some("recovery: boundary receipt already committed".into()),
                    });
                    state.current_state = InputLifecycleState::Consumed;
                    state.terminal_outcome = Some(InputTerminalOutcome::Consumed);
                    state.updated_at = now;
                } else {
                    // No receipt: record the transition and put the input back
                    // in the queued state.
                    state.history.push(InputStateHistoryEntry {
                        timestamp: now,
                        from: state.current_state,
                        to: InputLifecycleState::Queued,
                        reason: Some("recovery: missing boundary receipt".into()),
                    });
                    state.current_state = InputLifecycleState::Queued;
                    state.updated_at = now;
                }
            }

            // The payload is remembered for every state that has one; whether
            // it is actually re-enqueued is decided after inner recovery.
            if let Some(input) = state.persisted_input.clone() {
                recovered_payloads.push((state.input_id.clone(), input));
            }
            // State already held in memory takes precedence over the stored copy.
            let ledger = &mut self.inner;
            if ledger.input_state(&state.input_id).is_none() {
                ledger.ledger_mut().recover(state);
            }
        }

        // NOTE(review): the tests in this file suggest this pass drops
        // ephemeral inputs and rebuilds idempotency lookups — confirm in
        // EphemeralRuntimeDriver.
        let report = self.inner.recover().await?;

        for (input_id, input) in recovered_payloads {
            // Only inputs that are still known and currently `Queued` go back
            // on the queue, and never twice.
            let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
                state.current_state == crate::input_state::InputLifecycleState::Queued
            });
            if should_requeue && !self.inner.has_queued_input(&input_id) {
                self.inner.enqueue_recovered_input(input_id, input);
            }
        }

        if let Some(runtime_state) = self
            .store
            .load_runtime_state(&self.runtime_id)
            .await
            .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
        {
            // Bring the in-memory runtime state in line with the stored one;
            // each guard skips the transition when it is already in effect.
            match runtime_state {
                RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
                    EphemeralRuntimeDriver::retire(&mut self.inner)?;
                }
                RuntimeState::Stopped
                    if self.inner.runtime_state() != RuntimeState::Stopped
                        && self.inner.runtime_state() != RuntimeState::Destroyed =>
                {
                    self.inner
                        .on_runtime_control(RuntimeControlCommand::Stop)
                        .await?;
                }
                RuntimeState::Destroyed
                    if self.inner.runtime_state() != RuntimeState::Destroyed =>
                {
                    self.inner.destroy()?;
                }
                _ => {}
            }

            // The check uses the *stored* state. Active inputs on a runtime
            // whose stored state is terminal are abandoned and the queue is
            // emptied.
            if runtime_state.is_terminal() {
                let active = self.inner.active_input_ids();
                if !active.is_empty() {
                    tracing::warn!(
                        runtime_id = %self.runtime_id,
                        active_count = active.len(),
                        persisted_state = %runtime_state,
                        "terminal runtime has active inputs — terminalizing as corrupted"
                    );
                    // The `Destroyed` reason is used for every terminal state here.
                    let abandoned = self
                        .inner
                        .abandon_all_non_terminal(InputAbandonReason::Destroyed);
                    self.inner.queue_mut().drain();
                    tracing::warn!(
                        runtime_id = %self.runtime_id,
                        abandoned,
                        "force-abandoned active inputs from terminal runtime"
                    );
                }
            }
        }

        // Write the reconciled view back so the store matches memory.
        let input_states = self.inner.input_states_snapshot();
        self.store
            .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
            .await
            .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
        Ok(report)
    }
398
399 async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
400 let checkpoint = self.inner.clone();
401 let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
402 let input_states = self.inner.input_states_snapshot();
403 if let Err(err) = self
404 .store
405 .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
406 .await
407 {
408 self.inner = checkpoint;
409 return Err(RuntimeDriverError::Internal(format!(
410 "retire persist failed: {err}"
411 )));
412 }
413 Ok(report)
414 }
415
416 async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
417 let checkpoint = self.inner.clone();
418 let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
419 let input_states = self.inner.input_states_snapshot();
420 if let Err(err) = self
421 .store
422 .atomic_lifecycle_commit(&self.runtime_id, self.inner.runtime_state(), &input_states)
423 .await
424 {
425 self.inner = checkpoint;
426 return Err(RuntimeDriverError::Internal(format!(
427 "reset persist failed: {err}"
428 )));
429 }
430 Ok(report)
431 }
432
433 fn runtime_state(&self) -> RuntimeState {
434 self.inner.runtime_state()
435 }
436
437 fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
438 self.inner.input_state(input_id)
439 }
440
441 fn active_input_ids(&self) -> Vec<InputId> {
442 self.inner.active_input_ids()
443 }
444}
445
446#[cfg(test)]
447#[allow(clippy::unwrap_used)]
448mod tests {
449 use super::*;
450 use crate::input::*;
451 use crate::store::InMemoryRuntimeStore;
452 use chrono::Utc;
453
454 fn make_prompt(text: &str) -> Input {
455 Input::Prompt(PromptInput {
456 header: InputHeader {
457 id: InputId::new(),
458 timestamp: Utc::now(),
459 source: InputOrigin::Operator,
460 durability: InputDurability::Durable,
461 visibility: InputVisibility::default(),
462 idempotency_key: None,
463 supersession_key: None,
464 correlation_id: None,
465 },
466 text: text.into(),
467 blocks: None,
468 turn_metadata: None,
469 })
470 }
471
472 #[tokio::test]
473 async fn durable_before_ack() {
474 let store = Arc::new(InMemoryRuntimeStore::new());
475 let rid = LogicalRuntimeId::new("test");
476 let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());
477
478 let input = make_prompt("hello");
479 let input_id = input.id().clone();
480 let outcome = driver.accept_input(input).await.unwrap();
481 assert!(outcome.is_accepted());
482
483 let stored = store.load_input_state(&rid, &input_id).await.unwrap();
485 assert!(stored.is_some());
486 assert!(stored.unwrap().persisted_input.is_some());
487 }
488
489 #[tokio::test]
490 async fn dedup_not_persisted() {
491 let store = Arc::new(InMemoryRuntimeStore::new());
492 let rid = LogicalRuntimeId::new("test");
493 let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());
494
495 let key = crate::identifiers::IdempotencyKey::new("req-1");
496 let mut input1 = make_prompt("hello");
497 if let Input::Prompt(ref mut p) = input1 {
498 p.header.idempotency_key = Some(key.clone());
499 }
500 driver.accept_input(input1).await.unwrap();
501
502 let mut input2 = make_prompt("hello again");
503 if let Input::Prompt(ref mut p) = input2 {
504 p.header.idempotency_key = Some(key);
505 }
506 let outcome = driver.accept_input(input2).await.unwrap();
507 assert!(outcome.is_deduplicated());
508
509 let states = store.load_input_states(&rid).await.unwrap();
511 assert_eq!(states.len(), 1);
512 }
513
514 #[tokio::test]
515 async fn recover_from_store() {
516 let store = Arc::new(InMemoryRuntimeStore::new());
517 let rid = LogicalRuntimeId::new("test");
518
519 let input = make_prompt("hello");
521 let input_id = input.id().clone();
522 let mut state = InputState::new_accepted(input_id.clone());
523 state.persisted_input = Some(input.clone());
524 state.durability = Some(InputDurability::Durable);
525 store.persist_input_state(&rid, &state).await.unwrap();
526
527 let mut driver = PersistentRuntimeDriver::new(rid, store);
529
530 let report = driver.recover().await.unwrap();
532 assert_eq!(report.inputs_recovered, 1);
533
534 assert!(driver.input_state(&input_id).is_some());
536 let dequeued = driver.dequeue_next();
537 assert!(
538 dequeued.is_some(),
539 "Recovered queued input should be re-enqueued"
540 );
541 let (queued_id, queued_input) = dequeued.unwrap();
542 assert_eq!(queued_id, input_id);
543 assert_eq!(queued_input.id(), &input_id);
544 }
545
546 #[tokio::test]
547 async fn recover_rebuilds_dedup_index() {
548 let store = Arc::new(InMemoryRuntimeStore::new());
549 let rid = LogicalRuntimeId::new("test");
550 let key = crate::identifiers::IdempotencyKey::new("dedup-key");
551
552 let input_id = InputId::new();
554 let mut state = InputState::new_accepted(input_id.clone());
555 state.idempotency_key = Some(key.clone());
556 state.durability = Some(InputDurability::Durable);
557 store.persist_input_state(&rid, &state).await.unwrap();
558
559 let mut driver = PersistentRuntimeDriver::new(rid, store);
561 driver.recover().await.unwrap();
562
563 let mut dup_input = make_prompt("duplicate");
565 if let Input::Prompt(ref mut p) = dup_input {
566 p.header.idempotency_key = Some(key);
567 }
568 let outcome = driver.accept_input(dup_input).await.unwrap();
569 assert!(
570 outcome.is_deduplicated(),
571 "After recovery, dedup index should be rebuilt so duplicates are caught"
572 );
573 }
574
575 #[tokio::test]
576 async fn recover_filters_ephemeral_inputs() {
577 let store = Arc::new(InMemoryRuntimeStore::new());
578 let rid = LogicalRuntimeId::new("test");
579
580 let input_id = InputId::new();
582 let mut state = InputState::new_accepted(input_id.clone());
583 state.durability = Some(InputDurability::Ephemeral);
584 store.persist_input_state(&rid, &state).await.unwrap();
585
586 let mut driver = PersistentRuntimeDriver::new(rid, store);
588 let report = driver.recover().await.unwrap();
589
590 assert!(
592 driver.input_state(&input_id).is_none(),
593 "Ephemeral inputs should be filtered during recovery"
594 );
595 assert_eq!(report.inputs_recovered, 0);
596 }
597
598 #[tokio::test]
599 async fn boundary_applied_persists_atomically() {
600 use meerkat_core::lifecycle::RunId;
601 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
602 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
603
604 let store = Arc::new(InMemoryRuntimeStore::new());
605 let rid = LogicalRuntimeId::new("test");
606 let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());
607
608 let input = make_prompt("hello");
610 let input_id = input.id().clone();
611 driver.accept_input(input).await.unwrap();
612
613 let run_id = RunId::new();
614 driver.start_run(run_id.clone()).unwrap();
615 driver.stage_input(&input_id, &run_id).unwrap();
616
617 let receipt = RunBoundaryReceipt {
619 run_id: run_id.clone(),
620 boundary: RunApplyBoundary::RunStart,
621 contributing_input_ids: vec![input_id.clone()],
622 conversation_digest: None,
623 message_count: 1,
624 sequence: 0,
625 };
626 driver
627 .on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
628 run_id: run_id.clone(),
629 receipt: receipt.clone(),
630 session_snapshot: Some(b"session-data".to_vec()),
631 })
632 .await
633 .unwrap();
634
635 let loaded = store.load_boundary_receipt(&rid, &run_id, 0).await.unwrap();
637 assert!(
638 loaded.is_some(),
639 "BoundaryApplied should persist the receipt via atomic_apply"
640 );
641 }
642
643 #[tokio::test]
644 async fn retire_preserves_inputs_for_drain() {
645 let store = Arc::new(InMemoryRuntimeStore::new());
646 let rid = LogicalRuntimeId::new("test");
647 let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());
648
649 let input = make_prompt("hello");
650 let input_id = input.id().clone();
651 driver.accept_input(input).await.unwrap();
652
653 let report = driver.retire().await.unwrap();
654 assert_eq!(report.inputs_abandoned, 0);
655 assert_eq!(report.inputs_pending_drain, 1);
656
657 let stored = store
659 .load_input_state(&rid, &input_id)
660 .await
661 .unwrap()
662 .unwrap();
663 assert_eq!(
664 stored.current_state,
665 crate::input_state::InputLifecycleState::Queued
666 );
667 }
668
669 #[tokio::test]
670 async fn reset_persists_abandoned_inputs() {
671 let store = Arc::new(InMemoryRuntimeStore::new());
672 let rid = LogicalRuntimeId::new("test");
673 let mut driver = PersistentRuntimeDriver::new(rid.clone(), store.clone());
674
675 let input = make_prompt("hello");
676 let input_id = input.id().clone();
677 driver.accept_input(input).await.unwrap();
678
679 let report = driver.reset().await.unwrap();
680 assert_eq!(report.inputs_abandoned, 1);
681
682 let stored = store
683 .load_input_state(&rid, &input_id)
684 .await
685 .unwrap()
686 .unwrap();
687 assert_eq!(
688 stored.current_state,
689 crate::input_state::InputLifecycleState::Abandoned
690 );
691 }
692
693 #[tokio::test]
694 async fn recover_consumes_committed_applied_pending_inputs() {
695 use meerkat_core::lifecycle::RunId;
696 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
697 use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
698
699 let store = Arc::new(InMemoryRuntimeStore::new());
700 let rid = LogicalRuntimeId::new("test");
701 let input = make_prompt("already committed");
702 let input_id = input.id().clone();
703 let run_id = RunId::new();
704
705 let mut state = InputState::new_accepted(input_id.clone());
706 state.persisted_input = Some(input);
707 state.durability = Some(InputDurability::Durable);
708 state.current_state = InputLifecycleState::AppliedPendingConsumption;
709 state.last_run_id = Some(run_id.clone());
710 state.last_boundary_sequence = Some(0);
711 store.persist_input_state(&rid, &state).await.unwrap();
712 store
713 .atomic_apply(
714 &rid,
715 None,
716 RunBoundaryReceipt {
717 run_id: run_id.clone(),
718 boundary: RunApplyBoundary::RunStart,
719 contributing_input_ids: vec![input_id.clone()],
720 conversation_digest: None,
721 message_count: 1,
722 sequence: 0,
723 },
724 vec![state.clone()],
725 None,
726 )
727 .await
728 .unwrap();
729
730 let mut driver = PersistentRuntimeDriver::new(rid, store);
731 driver.recover().await.unwrap();
732
733 let recovered = driver.input_state(&input_id);
734 assert!(
735 recovered.is_some(),
736 "committed input should remain queryable after recovery"
737 );
738 let Some(recovered) = recovered else {
739 unreachable!("asserted some recovery state above");
740 };
741 assert_eq!(recovered.current_state, InputLifecycleState::Consumed);
742 assert!(
743 driver.active_input_ids().is_empty(),
744 "committed applied inputs should not stay active after recovery"
745 );
746 assert!(
747 driver.dequeue_next().is_none(),
748 "committed applied inputs should not be replayed after recovery"
749 );
750 }
751}