1use std::sync::Arc;
8use std::sync::RwLock as StdRwLock;
9
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12
13use crate::accept::AcceptOutcome;
14use crate::identifiers::LogicalRuntimeId;
15use crate::input::{Input, externalize_input_images};
16use crate::input_state::{
17 InputAbandonReason, InputLifecycleState, InputState, InputStatePersistenceRecord,
18 StoredInputState,
19};
20use crate::runtime_event::RuntimeEventEnvelope;
21use crate::runtime_state::RuntimeState;
22use crate::store::{MachineLifecycleCommit, RuntimeStore};
23use crate::traits::{DestroyReport, RecoveryReport, RuntimeDriver, RuntimeDriverError};
24
25use super::ephemeral::{
26 EphemeralDriverRollbackSnapshot, EphemeralRuntimeDriver, SharedIngressDslAuthority,
27};
28
/// Runtime driver that pairs an in-memory [`EphemeralRuntimeDriver`] with a
/// [`RuntimeStore`] and a [`BlobStore`].
///
/// Most methods delegate to the wrapped driver; the lifecycle methods
/// additionally write the resulting state through `store` and restore an
/// in-memory rollback snapshot when that write fails.
pub struct PersistentRuntimeDriver {
    /// In-memory driver that the methods of this type delegate to.
    inner: EphemeralRuntimeDriver,
    /// Store receiving lifecycle commits, input-state records and boundary applies.
    store: Arc<dyn RuntimeStore>,
    /// Blob store handed to `externalize_input_images` when an input is accepted.
    blob_store: Arc<dyn BlobStore>,
    /// Logical runtime identity passed to every store call.
    runtime_id: LogicalRuntimeId,
    /// Test-only switch: when `true`, `lifecycle_persistence_payload_with_rollback`
    /// reports a forced input-state snapshot failure.
    #[cfg(test)]
    pub(crate) force_input_snapshot_failure_for_test: bool,
}
45
46impl PersistentRuntimeDriver {
47 pub fn new(
49 runtime_id: LogicalRuntimeId,
50 store: Arc<dyn RuntimeStore>,
51 blob_store: Arc<dyn BlobStore>,
52 ) -> Self {
53 Self::new_with_control(
54 runtime_id,
55 store,
56 blob_store,
57 Arc::new(StdRwLock::new(
58 crate::driver::ephemeral::RuntimeControlProjection::default(),
59 )),
60 crate::driver::ephemeral::new_ingress_dsl_authority(),
61 )
62 }
63
64 pub(crate) fn new_with_control(
65 runtime_id: LogicalRuntimeId,
66 store: Arc<dyn RuntimeStore>,
67 blob_store: Arc<dyn BlobStore>,
68 control: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
69 dsl: SharedIngressDslAuthority,
70 ) -> Self {
71 Self {
72 inner: EphemeralRuntimeDriver::new_with_control_and_dsl(
73 runtime_id.clone(),
74 control,
75 dsl,
76 ),
77 store,
78 blob_store,
79 runtime_id,
80 #[cfg(test)]
81 force_input_snapshot_failure_for_test: false,
82 }
83 }
84
    /// Returns a shared reference to the wrapped in-memory driver.
    pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
        &self.inner
    }
89
    /// Returns the wrapped driver's rollback snapshot.
    pub(crate) fn rollback_snapshot(&self) -> EphemeralDriverRollbackSnapshot {
        self.inner.rollback_snapshot()
    }
93
    /// Hands `snapshot` to the wrapped driver's `restore_rollback_snapshot`.
    pub(crate) fn restore_rollback_snapshot(&mut self, snapshot: EphemeralDriverRollbackSnapshot) {
        self.inner.restore_rollback_snapshot(snapshot);
    }
97
    /// Returns the logical runtime id this driver passes to its store.
    pub fn runtime_id(&self) -> &LogicalRuntimeId {
        &self.runtime_id
    }
102
    /// Returns the wrapped driver's `silent_comms_intents()` list.
    pub fn silent_comms_intents(&self) -> Vec<String> {
        self.inner.silent_comms_intents()
    }
106
    /// Reports whether the wrapped driver considers itself idle.
    pub fn is_idle(&self) -> bool {
        self.inner.is_idle()
    }
111
    /// Classifies the wrapped driver's current runtime state into the state
    /// that is written to the store.
    ///
    /// # Errors
    /// Returns `RuntimeDriverError::Internal` when the classification fails.
    fn runtime_state_for_persistence(&self) -> Result<RuntimeState, RuntimeDriverError> {
        Self::runtime_state_for_persistence_from_inner(&self.inner)
    }
116
117 fn runtime_state_for_persistence_from_inner(
118 inner: &EphemeralRuntimeDriver,
119 ) -> Result<RuntimeState, RuntimeDriverError> {
120 crate::meerkat_machine::classify_runtime_lifecycle_durable_state(inner.runtime_state())
121 .map_err(|err| {
122 RuntimeDriverError::Internal(format!(
123 "generated runtime lifecycle durability classification failed: {err}"
124 ))
125 })
126 }
127
    /// Builds the lifecycle commit for the wrapped driver's current state.
    ///
    /// # Errors
    /// Propagates the error from `lifecycle_commit_for_persistence_from_inner`.
    fn lifecycle_commit_for_persistence(
        &self,
    ) -> Result<MachineLifecycleCommit, RuntimeDriverError> {
        Self::lifecycle_commit_for_persistence_from_inner(&self.inner)
    }
133
134 fn lifecycle_commit_for_persistence_from_inner(
135 inner: &EphemeralRuntimeDriver,
136 ) -> Result<MachineLifecycleCommit, RuntimeDriverError> {
137 Ok(MachineLifecycleCommit::new_with_binding(
138 Self::runtime_state_for_persistence_from_inner(inner)?,
139 inner.machine_lifecycle_binding_facts(),
140 ))
141 }
142
143 fn lifecycle_persistence_payload_with_rollback(
153 &mut self,
154 checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
155 context: &str,
156 ) -> Result<
157 (
158 super::ephemeral::EphemeralDriverRollbackSnapshot,
159 Vec<InputStatePersistenceRecord>,
160 MachineLifecycleCommit,
161 ),
162 RuntimeDriverError,
163 > {
164 let input_states_result = self.inner.authorized_stored_input_states_snapshot();
165 #[cfg(test)]
166 let input_states_result = if self.force_input_snapshot_failure_for_test {
167 Err(RuntimeDriverError::Internal(
168 "forced input-state snapshot failure for checkpoint-restore contract test"
169 .to_string(),
170 ))
171 } else {
172 input_states_result
173 };
174 let input_states = match input_states_result {
175 Ok(input_states) => input_states,
176 Err(err) => {
177 self.inner.restore_rollback_snapshot(checkpoint);
178 return Err(RuntimeDriverError::Internal(format!(
179 "{context} input-state snapshot failed: {err}"
180 )));
181 }
182 };
183 let commit = match self.lifecycle_commit_for_persistence() {
184 Ok(commit) => commit,
185 Err(err) => {
186 self.inner.restore_rollback_snapshot(checkpoint);
187 return Err(RuntimeDriverError::Internal(format!(
188 "{context} lifecycle commit classification failed: {err}"
189 )));
190 }
191 };
192 Ok((checkpoint, input_states, commit))
193 }
194
195 async fn commit_lifecycle_with_rollback(
196 &mut self,
197 checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
198 target_state: RuntimeState,
199 context: &str,
200 ) -> Result<(), RuntimeDriverError> {
201 let (checkpoint, input_states, commit) =
206 self.lifecycle_persistence_payload_with_rollback(checkpoint, context)?;
207 let target_durable_state =
208 match crate::meerkat_machine::classify_runtime_lifecycle_durable_state(target_state) {
209 Ok(target_durable_state) => target_durable_state,
210 Err(err) => {
211 self.inner.restore_rollback_snapshot(checkpoint);
212 return Err(RuntimeDriverError::Internal(format!(
213 "{context} generated target lifecycle durability classification failed: {err}"
214 )));
215 }
216 };
217 if commit.runtime_state() != target_durable_state {
218 self.inner.restore_rollback_snapshot(checkpoint);
219 return Err(RuntimeDriverError::Internal(format!(
220 "{context} durable persist target {target_durable_state:?} from live {target_state:?} disagreed with generated lifecycle commit {:?}",
221 commit.runtime_state()
222 )));
223 }
224 if let Err(err) = self
225 .store
226 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
227 .await
228 {
229 self.inner.restore_rollback_snapshot(checkpoint);
230 return Err(RuntimeDriverError::Internal(format!(
231 "{context} persist failed: {err}"
232 )));
233 }
234 Ok(())
235 }
236
237 pub(crate) async fn publish_service_turn_terminal_lifecycle(
238 &mut self,
239 checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
240 target_state: RuntimeState,
241 ) -> Result<(), RuntimeDriverError> {
242 self.commit_lifecycle_with_rollback(
243 checkpoint,
244 target_state,
245 "service turn terminal receipt",
246 )
247 .await?;
248 self.inner.sync_control_projection_from_dsl_authority();
249 Ok(())
250 }
251
    /// Forwards the given phase, current run id and pre-run phase to the
    /// wrapped driver's `set_control_projection`.
    pub(crate) fn set_control_projection(
        &mut self,
        next_phase: RuntimeState,
        current_run_id: Option<RunId>,
        pre_run_phase: Option<RuntimeState>,
    ) {
        self.inner
            .set_control_projection(next_phase, current_run_id, pre_run_phase);
    }
261
    /// Asks the wrapped driver to sync its control projection from the DSL
    /// authority.
    pub(crate) fn sync_control_projection_from_dsl_authority(&mut self) {
        self.inner.sync_control_projection_from_dsl_authority();
    }
269
270 pub(crate) async fn persist_current_machine_lifecycle(
271 &mut self,
272 context: &str,
273 ) -> Result<(), RuntimeDriverError> {
274 let input_states = self.inner.authorized_stored_input_states_snapshot()?;
275 let commit = self.lifecycle_commit_for_persistence()?;
276 self.store
277 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
278 .await
279 .map_err(|err| {
280 RuntimeDriverError::Internal(format!("{context} lifecycle persist failed: {err}"))
281 })
282 }
283
    /// Forwards `run_id` to the wrapped driver's `contract_begin_run_authority`.
    #[doc(hidden)]
    pub fn contract_begin_run_authority(
        &mut self,
        run_id: RunId,
    ) -> Result<(), RuntimeDriverError> {
        self.inner.contract_begin_run_authority(run_id)
    }
293
    /// Test-only: forwards the arguments to the wrapped driver's
    /// `contract_force_runtime_authority`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn contract_force_runtime_authority(
        &mut self,
        next_phase: RuntimeState,
        current_run_id: Option<RunId>,
        pre_run_phase: Option<RuntimeState>,
    ) {
        self.inner
            .contract_force_runtime_authority(next_phase, current_run_id, pre_run_phase);
    }
307
    /// Returns the events produced by the wrapped driver's `drain_events`.
    pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
        self.inner.drain_events()
    }
312
    /// Returns the wrapped driver's `take_post_admission_signal()` result.
    pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
        self.inner.take_post_admission_signal()
    }
317
    /// Returns the wrapped driver's post-admission signal through a shared
    /// borrow (the `take_` variant needs `&mut self`).
    pub fn post_admission_signal(&self) -> crate::driver::ephemeral::PostAdmissionSignal {
        self.inner.post_admission_signal()
    }
322
    /// Returns the wrapped driver's `take_wake_requested()` flag.
    pub fn take_wake_requested(&mut self) -> bool {
        self.inner.take_wake_requested()
    }
327
    /// Returns the wrapped driver's `take_process_requested()` flag.
    pub fn take_process_requested(&mut self) -> bool {
        self.inner.take_process_requested()
    }
332
    /// Returns the wrapped driver's next dequeued input, or `None` when it
    /// yields nothing.
    pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
        self.inner.dequeue_next()
    }
337
    /// Asks the wrapped driver to dequeue the input identified by `input_id`.
    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
        self.inner.dequeue_by_id(input_id)
    }
342
    /// Returns the wrapped driver's `has_queued_input_outside(excluded)` answer.
    pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
        self.inner.has_queued_input_outside(excluded)
    }
346
    /// Forwards `input_ids` to the wrapped driver's
    /// `defer_queued_inputs_behind_backlog` and returns its result.
    pub(crate) fn defer_queued_inputs_behind_backlog(
        &mut self,
        input_ids: &[InputId],
    ) -> Result<(), RuntimeDriverError> {
        self.inner.defer_queued_inputs_behind_backlog(input_ids)
    }
353
    /// Forwards `effects` to the wrapped driver's `absorb_post_admission_effects`.
    pub(crate) fn absorb_post_admission_effects(
        &mut self,
        effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
    ) {
        self.inner.absorb_post_admission_effects(effects);
    }
360
    /// Resolves admission for `input` through the wrapped driver; takes
    /// `&self`, so the driver is only borrowed immutably.
    pub(crate) fn resolve_admission(
        &self,
        input: &Input,
    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
        self.inner.resolve_admission(input)
    }
367
    /// Resolves admission for `input` through the wrapped driver, passing
    /// along whether an active turn boundary is available.
    pub(crate) fn resolve_admission_with_active_turn_boundary(
        &self,
        input: &Input,
        active_turn_boundary_available: bool,
    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
        self.inner
            .resolve_admission_with_active_turn_boundary(input, active_turn_boundary_available)
    }
376
    /// Forwards to the wrapped driver's
    /// `resolve_admission_without_wake_with_active_turn_boundary`.
    pub(crate) fn resolve_admission_without_wake_with_active_turn_boundary(
        &self,
        input: &Input,
        active_turn_boundary_available: bool,
    ) -> Result<crate::accept::ResolvedAdmission, RuntimeDriverError> {
        self.inner
            .resolve_admission_without_wake_with_active_turn_boundary(
                input,
                active_turn_boundary_available,
            )
    }
388
    /// Accepts an already-resolved input, persisting its stored state before
    /// the live in-memory driver is mutated.
    ///
    /// The sequence is:
    /// 1. Run the acceptance against a clone of the inner driver obtained from
    ///    `clone_with_isolated_dsl_authority`.
    /// 2. If the staged outcome is not `Accepted`, replay the call on the live
    ///    driver and return its result without persisting anything.
    /// 3. Otherwise externalize the input's images through the blob store,
    ///    attach the externalized input to the staged stored state and write
    ///    it with `persist_state`.
    /// 4. Repeat the acceptance on the live driver, verify that it produced
    ///    the same input id, and return an outcome whose `state` and `seed`
    ///    come from the live driver's stored bundle.
    ///
    /// # Errors
    /// Returns `RuntimeDriverError::Internal` when the stored state is missing
    /// after acceptance, image externalization fails, or the staged and live
    /// input ids differ; other errors are propagated from the inner driver and
    /// from `persist_state`.
    pub(crate) async fn accept_resolved_input(
        &mut self,
        input: Input,
        resolved: crate::accept::ResolvedAdmission,
    ) -> Result<AcceptOutcome, RuntimeDriverError> {
        let flags = resolved.coarse_flags();
        // Dry-run on a clone so the live driver is untouched until persistence succeeds.
        let mut staged = self.inner.clone_with_isolated_dsl_authority();
        staged.ensure_contract_session_authority()?;
        let staged_outcome = staged
            .accept_resolved_input(input.clone(), resolved.clone())
            .await?;

        let AcceptOutcome::Accepted {
            input_id: staged_input_id,
            ..
        } = staged_outcome
        else {
            // Non-accepted outcomes need no persistence; let the live driver answer.
            return self.inner.accept_resolved_input(input, resolved).await;
        };

        staged.machine_apply_accept_with_completion_signal(&staged_input_id, flags)?;
        let Some(mut staged_bundle) = staged.stored_input_state(&staged_input_id) else {
            return Err(RuntimeDriverError::Internal(format!(
                "generated input lifecycle phase missing for accepted input {staged_input_id}"
            )));
        };
        // The persisted copy of the input has its images externalized to the blob store.
        let mut input_for_recovery = input.clone();
        externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery)
            .await
            .map_err(|err| {
                RuntimeDriverError::Internal(format!(
                    "failed to externalize runtime input images: {err}"
                ))
            })?;
        staged_bundle.state.persisted_input = Some(input_for_recovery.clone());
        self.persist_state(&staged_bundle).await?;

        // Store write succeeded: now apply the same acceptance to the live driver.
        self.inner.ensure_contract_session_authority()?;
        let mut outcome = self.inner.accept_resolved_input(input, resolved).await?;
        if let AcceptOutcome::Accepted {
            ref input_id,
            ref mut state,
            ref mut seed,
            ..
        } = outcome
        {
            if input_id != &staged_input_id {
                return Err(RuntimeDriverError::Internal(format!(
                    "staged accepted input {staged_input_id} differed from committed input {input_id}"
                )));
            }
            self.inner
                .machine_apply_accept_with_completion_signal(input_id, flags)?;
            let Some(mut bundle) = self.inner.stored_input_state(input_id) else {
                return Err(RuntimeDriverError::Internal(format!(
                    "generated input lifecycle phase missing for accepted input {input_id}"
                )));
            };
            // Record the externalized input in the live ledger and in the returned outcome.
            bundle.state.persisted_input = Some(input_for_recovery);
            self.inner.ledger_mut().accept(bundle.state.clone());
            *state = bundle.state;
            *seed = bundle.seed;
        }

        Ok(outcome)
    }
455
    /// Runs the acceptance against a clone of the inner driver obtained from
    /// `clone_with_isolated_dsl_authority` and returns that clone's outcome.
    ///
    /// Takes `&self`; this method does not call the store.
    pub(crate) async fn preview_accept_resolved_input(
        &self,
        input: Input,
        resolved: crate::accept::ResolvedAdmission,
    ) -> Result<AcceptOutcome, RuntimeDriverError> {
        let mut staged = self.inner.clone_with_isolated_dsl_authority();
        staged.ensure_contract_session_authority()?;
        staged.accept_resolved_input(input, resolved).await
    }
465
466 pub fn stage_input(
468 &mut self,
469 input_id: &InputId,
470 run_id: &meerkat_core::lifecycle::RunId,
471 ) -> Result<(), crate::traits::RuntimeDriverError> {
472 self.inner.stage_input(input_id, run_id)
473 }
474
475 pub fn stage_batch(
477 &mut self,
478 input_ids: &[InputId],
479 run_id: &meerkat_core::lifecycle::RunId,
480 ) -> Result<(), crate::traits::RuntimeDriverError> {
481 self.inner.stage_batch(input_ids, run_id)
482 }
483
484 pub(crate) fn machine_realize_stage_batch(
485 &mut self,
486 input_ids: &[InputId],
487 run_id: &meerkat_core::lifecycle::RunId,
488 ) -> Result<(), crate::traits::RuntimeDriverError> {
489 self.inner.machine_realize_stage_batch(input_ids, run_id)
490 }
491
492 pub fn apply_input(
494 &mut self,
495 input_id: &InputId,
496 run_id: &meerkat_core::lifecycle::RunId,
497 ) -> Result<(), crate::traits::RuntimeDriverError> {
498 self.inner.apply_input(input_id, run_id)
499 }
500
501 pub fn rollback_staged(
503 &mut self,
504 input_ids: &[InputId],
505 ) -> Result<(), crate::traits::RuntimeDriverError> {
506 self.inner.rollback_staged(input_ids)
507 }
508
509 async fn persist_state(&self, state: &StoredInputState) -> Result<(), RuntimeDriverError> {
510 let state = InputStatePersistenceRecord::from_machine_snapshot(state.clone())
511 .map_err(RuntimeDriverError::Internal)?;
512 self.store
513 .persist_input_state(&self.runtime_id, &state)
514 .await
515 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
516 }
517
518 pub(crate) async fn abandon_pending_inputs(
519 &mut self,
520 reason: InputAbandonReason,
521 ) -> Result<usize, RuntimeDriverError> {
522 let checkpoint = self.inner.rollback_snapshot();
523 let abandoned = match self.inner.abandon_pending_inputs(reason) {
524 Ok(abandoned) => abandoned,
525 Err(err) => {
526 self.inner.restore_rollback_snapshot(checkpoint);
527 return Err(err);
528 }
529 };
530 let (checkpoint, input_states, commit) =
531 self.lifecycle_persistence_payload_with_rollback(checkpoint, "pending input abandon")?;
532 if let Err(err) = self
533 .store
534 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
535 .await
536 {
537 self.inner.restore_rollback_snapshot(checkpoint);
538 return Err(RuntimeDriverError::Internal(format!(
539 "pending input abandon persist failed: {err}"
540 )));
541 }
542 Ok(abandoned)
543 }
544
545 pub(crate) async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
550 let checkpoint = self.inner.rollback_snapshot();
551 let transferred = match self.inner.recycle_preserving_work() {
552 Ok(transferred) => transferred,
553 Err(err) => {
554 self.inner.restore_rollback_snapshot(checkpoint);
555 return Err(err);
556 }
557 };
558 let (checkpoint, input_states, commit) =
559 self.lifecycle_persistence_payload_with_rollback(checkpoint, "recycle")?;
560 if let Err(err) = self
561 .store
562 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
563 .await
564 {
565 self.inner.restore_rollback_snapshot(checkpoint);
566 return Err(RuntimeDriverError::Internal(format!(
567 "recycle persist failed: {err}"
568 )));
569 }
570
571 self.inner.sync_control_projection_from_dsl_authority();
572 Ok(transferred)
573 }
574
575 pub(crate) async fn realize_retire_lifecycle(
576 &mut self,
577 ) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
578 let checkpoint = self.inner.rollback_snapshot();
579 let report = self.inner.finalize_retire();
580 let target_state = match self.runtime_state_for_persistence() {
583 Ok(target_state) => target_state,
584 Err(err) => {
585 self.inner.restore_rollback_snapshot(checkpoint);
586 return Err(err);
587 }
588 };
589 self.commit_lifecycle_with_rollback(checkpoint, target_state, "retire")
590 .await?;
591 self.inner.sync_control_projection_from_dsl_authority();
592 Ok(report)
593 }
594
595 pub(crate) async fn realize_reset_lifecycle(
596 &mut self,
597 ) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
598 let checkpoint = self.inner.rollback_snapshot();
599 let report = match self.inner.reset_cleanup() {
600 Ok(report) => report,
601 Err(err) => {
602 self.inner.restore_rollback_snapshot(checkpoint);
603 return Err(err);
604 }
605 };
606 let target_state = match self.runtime_state_for_persistence() {
609 Ok(target_state) => target_state,
610 Err(err) => {
611 self.inner.restore_rollback_snapshot(checkpoint);
612 return Err(err);
613 }
614 };
615 self.commit_lifecycle_with_rollback(checkpoint, target_state, "reset")
616 .await?;
617 self.inner.sync_control_projection_from_dsl_authority();
618 Ok(report)
619 }
620
621 pub(crate) fn prepare_destroy_lifecycle(
622 &mut self,
623 ) -> Result<(EphemeralDriverRollbackSnapshot, DestroyReport), RuntimeDriverError> {
624 let checkpoint = self.inner.rollback_snapshot();
625 let abandoned = match self.inner.destroy_cleanup() {
626 Ok(abandoned) => abandoned,
627 Err(err) => {
628 self.inner.restore_rollback_snapshot(checkpoint);
629 return Err(err);
630 }
631 };
632 Ok((
633 checkpoint,
634 DestroyReport {
635 inputs_abandoned: abandoned,
636 },
637 ))
638 }
639
640 pub(crate) async fn commit_prepared_destroy_lifecycle(
641 &mut self,
642 checkpoint: EphemeralDriverRollbackSnapshot,
643 ) -> Result<(), RuntimeDriverError> {
644 let target_state = match self.runtime_state_for_persistence() {
649 Ok(target_state) => target_state,
650 Err(err) => {
651 self.inner.restore_rollback_snapshot(checkpoint);
652 return Err(err);
653 }
654 };
655 self.commit_lifecycle_with_rollback(checkpoint, target_state, "destroy")
656 .await
657 }
658
    /// Restores the wrapped driver to `checkpoint`; takes the same snapshot
    /// type that `prepare_destroy_lifecycle` returns.
    pub(crate) fn rollback_prepared_destroy_lifecycle(
        &mut self,
        checkpoint: EphemeralDriverRollbackSnapshot,
    ) {
        self.inner.restore_rollback_snapshot(checkpoint);
    }
665
666 pub(crate) async fn finalize_runtime_executor_exit(
667 &mut self,
668 ) -> Result<(), RuntimeDriverError> {
669 let checkpoint = self.inner.rollback_snapshot();
670 if let Err(err) = self.inner.apply_runtime_executor_exited_authority() {
671 self.inner.restore_rollback_snapshot(checkpoint);
672 return Err(err);
673 }
674 if let Err(err) = self.inner.stop_runtime_cleanup() {
675 self.inner.restore_rollback_snapshot(checkpoint);
676 return Err(err);
677 }
678 let target_state = match self.runtime_state_for_persistence() {
682 Ok(target_state) => target_state,
683 Err(err) => {
684 self.inner.restore_rollback_snapshot(checkpoint);
685 return Err(err);
686 }
687 };
688 self.commit_lifecycle_with_rollback(checkpoint, target_state, "stop")
689 .await?;
690 self.inner.sync_control_projection_from_dsl_authority();
691 Ok(())
692 }
693
    /// Forwards to the wrapped driver's `machine_realize_boundary_applied`;
    /// this method does not call the store.
    pub(crate) fn machine_realize_boundary_applied_in_memory(
        &mut self,
        run_id: &RunId,
        receipt: &RunBoundaryReceipt,
    ) -> Result<(), RuntimeDriverError> {
        self.inner.machine_realize_boundary_applied(run_id, receipt)
    }
701
    /// Forwards to the wrapped driver's `machine_realize_run_completed`;
    /// this method does not call the store.
    pub(crate) fn machine_realize_run_completed_in_memory(
        &mut self,
        run_id: &RunId,
        consumed_input_ids: &[InputId],
    ) -> Result<(), RuntimeDriverError> {
        self.inner
            .machine_realize_run_completed(run_id, consumed_input_ids)
    }
710
711 pub(crate) async fn machine_realize_live_boundary_context_injected(
712 &mut self,
713 run_id: &RunId,
714 input_ids: &[InputId],
715 session_snapshot: Option<Vec<u8>>,
716 ) -> Result<(), RuntimeDriverError> {
717 let checkpoint = self.inner.rollback_snapshot();
718 let receipt = match self
719 .inner
720 .machine_realize_live_boundary_context_injected(run_id, input_ids)
721 {
722 Ok(receipt) => receipt,
723 Err(err) => {
724 self.inner.restore_rollback_snapshot(checkpoint);
725 return Err(err);
726 }
727 };
728 let input_updates = match self.inner.authorized_stored_input_states_snapshot() {
729 Ok(input_updates) => input_updates,
730 Err(err) => {
731 self.inner.restore_rollback_snapshot(checkpoint);
732 return Err(err);
733 }
734 };
735 if let Err(err) = self
736 .store
737 .atomic_apply(
738 &self.runtime_id,
739 session_snapshot
740 .as_ref()
741 .map(|session_snapshot| crate::store::SessionDelta {
742 session_snapshot: session_snapshot.clone(),
743 }),
744 receipt.clone(),
745 input_updates,
746 session_snapshot
747 .as_deref()
748 .and_then(|snapshot| {
749 serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
750 })
751 .map(|session| session.id().clone()),
752 )
753 .await
754 {
755 self.inner.restore_rollback_snapshot(checkpoint);
756 return Err(RuntimeDriverError::Internal(format!(
757 "runtime live-boundary context commit failed: {err}"
758 )));
759 }
760 Ok(())
761 }
762
763 pub(crate) async fn machine_commit_completed_boundary_snapshot(
764 &mut self,
765 receipt: &RunBoundaryReceipt,
766 session_snapshot: Option<&Vec<u8>>,
767 ) -> Result<(), RuntimeDriverError> {
768 let input_updates = self.inner.authorized_stored_input_states_snapshot()?;
769 self.store
770 .atomic_apply(
771 &self.runtime_id,
772 session_snapshot.map(|session_snapshot| crate::store::SessionDelta {
773 session_snapshot: session_snapshot.clone(),
774 }),
775 receipt.clone(),
776 input_updates,
777 session_snapshot
778 .and_then(|snapshot| {
779 serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
780 })
781 .map(|session| session.id().clone()),
782 )
783 .await
784 .map_err(|e| {
785 RuntimeDriverError::Internal(format!(
786 "runtime completed-boundary commit failed: {e}"
787 ))
788 })
789 }
790
791 pub(crate) async fn machine_realize_run_failed(
792 &mut self,
793 run_id: &RunId,
794 contributing_input_ids: &[InputId],
795 replay_plan: &super::ephemeral::ReplayQueuedContributorsPlan,
796 terminal_error: &str,
797 runtime_apply_failure: Option<&meerkat_core::lifecycle::CoreApplyFailureCause>,
798 recoverable: bool,
799 ) -> Result<(), RuntimeDriverError> {
800 let checkpoint = self.inner.rollback_snapshot();
801 if let Err(err) =
802 self.inner
803 .machine_realize_run_failed(run_id, contributing_input_ids, replay_plan)
804 {
805 self.inner.restore_rollback_snapshot(checkpoint);
806 return Err(err);
807 }
808 let failure_cause = runtime_apply_failure.map(|failure| failure.kind);
809 tracing::debug!(
810 run_id = ?run_id,
811 recoverable,
812 error = terminal_error,
813 failure_cause = ?failure_cause,
814 "persistent driver realized machine-owned failed-run replay"
815 );
816 let (checkpoint, input_states, commit) = self
817 .lifecycle_persistence_payload_with_rollback(checkpoint, "failed-run terminal event")?;
818 if let Err(err) = self
819 .store
820 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
821 .await
822 {
823 self.inner.restore_rollback_snapshot(checkpoint);
824 return Err(RuntimeDriverError::Internal(format!(
825 "terminal event persist failed: {err}"
826 )));
827 }
828 Ok(())
829 }
830
831 pub(crate) async fn machine_realize_run_cancelled(
832 &mut self,
833 run_id: &RunId,
834 contributing_input_ids: &[InputId],
835 ) -> Result<(), RuntimeDriverError> {
836 let checkpoint = self.inner.rollback_snapshot();
837 if let Err(err) = self
838 .inner
839 .machine_realize_run_cancelled(run_id, contributing_input_ids)
840 {
841 self.inner.restore_rollback_snapshot(checkpoint);
842 return Err(err);
843 }
844 tracing::debug!(
845 run_id = ?run_id,
846 contributors = contributing_input_ids.len(),
847 "persistent driver realized machine-owned cancelled run"
848 );
849 let (checkpoint, input_states, commit) = self.lifecycle_persistence_payload_with_rollback(
850 checkpoint,
851 "cancelled-run terminal event",
852 )?;
853 if let Err(err) = self
854 .store
855 .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
856 .await
857 {
858 self.inner.restore_rollback_snapshot(checkpoint);
859 return Err(RuntimeDriverError::Internal(format!(
860 "terminal cancellation persist failed: {err}"
861 )));
862 }
863 Ok(())
864 }
865}
866
// `RuntimeDriver` implementation: input acceptance and recovery write through
// the store; the read-only accessors delegate to the wrapped in-memory driver.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl RuntimeDriver for PersistentRuntimeDriver {
    /// Resolves admission for `input`, then accepts it via
    /// `accept_resolved_input`.
    async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
        let resolved = self.resolve_admission(&input)?;
        self.accept_resolved_input(input, resolved).await
    }

    /// Forwards the event to the wrapped driver; this method does not call
    /// the store.
    async fn on_runtime_event(
        &mut self,
        event: RuntimeEventEnvelope,
    ) -> Result<(), RuntimeDriverError> {
        self.inner.on_runtime_event(event).await
    }

    /// Recovers driver state from the store.
    ///
    /// Recovery first runs against a clone of the inner driver obtained from
    /// `clone_with_isolated_dsl_authority`; the clone's lifecycle and input
    /// states are committed to the store, and only then is recovery run
    /// against the live driver. The report returned is the one produced by
    /// the first (staged) recovery.
    async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
        let mut staged = self.inner.clone_with_isolated_dsl_authority();
        let report = crate::meerkat_machine::machine_recover_persistent_driver(
            self.store.as_ref(),
            &self.runtime_id,
            &mut staged,
        )
        .await?;

        // Persist the recovered state before touching the live driver.
        let input_states = staged.authorized_stored_input_states_snapshot()?;
        let commit = Self::lifecycle_commit_for_persistence_from_inner(&staged)?;
        self.store
            .commit_machine_lifecycle(&self.runtime_id, commit, &input_states)
            .await
            .map_err(|err| {
                RuntimeDriverError::Internal(format!("recovery persist failed: {err}"))
            })?;
        // Second pass mutates the live driver; its report is discarded.
        let _ = crate::meerkat_machine::machine_recover_persistent_driver(
            self.store.as_ref(),
            &self.runtime_id,
            &mut self.inner,
        )
        .await?;
        Ok(report)
    }

    /// Returns the wrapped driver's live runtime state.
    fn runtime_state(&self) -> RuntimeState {
        self.inner.runtime_state()
    }

    /// Returns the wrapped driver's `input_state` for `input_id`.
    fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
        self.inner.input_state(input_id)
    }

    /// Returns the wrapped driver's `input_phase` for `input_id`.
    fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
        self.inner.input_phase(input_id)
    }

    /// Returns the wrapped driver's `input_last_run_id` for `input_id`.
    fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
        self.inner.input_last_run_id(input_id)
    }

    /// Returns the wrapped driver's `input_last_boundary_sequence` for `input_id`.
    fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
        self.inner.input_last_boundary_sequence(input_id)
    }

    /// Returns the wrapped driver's `stored_input_state` for `input_id`.
    fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
        self.inner.stored_input_state(input_id)
    }

    /// Returns the wrapped driver's `active_input_ids()` list.
    fn active_input_ids(&self) -> Vec<InputId> {
        self.inner.active_input_ids()
    }
}
936
// Contract tests for the checkpoint-restore behavior of the persistence paths.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use chrono::Utc;
    use meerkat_core::lifecycle::InputId;

    /// Builds a durable operator prompt input with a fresh id and `text` as content.
    fn make_prompt(text: &str) -> Input {
        Input::Prompt(crate::input::PromptInput {
            header: crate::input::InputHeader {
                id: InputId::new(),
                timestamp: Utc::now(),
                source: crate::input::InputOrigin::Operator,
                durability: crate::input::InputDurability::Durable,
                visibility: crate::input::InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            content: text.into(),
            typed_turn_appends: Vec::new(),
            turn_metadata: None,
        })
    }

    /// A forced input-state snapshot failure inside
    /// `commit_lifecycle_with_rollback` must restore the supplied checkpoint.
    #[tokio::test]
    async fn commit_lifecycle_snapshot_failure_restores_checkpoint() {
        let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
        let blob_store: Arc<dyn BlobStore> = Arc::new(meerkat_store::MemoryBlobStore::new());
        let rid = LogicalRuntimeId::new("commit-lifecycle-rollback-contract");
        let mut driver = PersistentRuntimeDriver::new(rid, store, blob_store);

        // Checkpoint taken while the driver has no inputs.
        let checkpoint = driver.rollback_snapshot();

        // Mutate the driver after the checkpoint by accepting an input.
        let input = make_prompt("staged work");
        let input_id = input.id().clone();
        let outcome = driver.accept_input(input).await.unwrap();
        assert!(outcome.is_accepted());
        assert!(driver.input_phase(&input_id).is_some());

        driver.force_input_snapshot_failure_for_test = true;
        let target_state = driver.inner_ref().runtime_state();
        let result = driver
            .commit_lifecycle_with_rollback(checkpoint, target_state, "test destroy")
            .await;

        assert!(result.is_err(), "forced snapshot failure must propagate");
        // The accepted input must be gone again after the rollback.
        assert!(
            driver.input_phase(&input_id).is_none(),
            "staged driver state must be restored to the pre-stage checkpoint"
        );
        assert!(driver.active_input_ids().is_empty());
    }

    /// A forced input-state snapshot failure inside `abandon_pending_inputs`
    /// must undo the in-memory abandon.
    #[tokio::test]
    async fn abandon_pending_inputs_snapshot_failure_restores_checkpoint() {
        let store = Arc::new(crate::store::InMemoryRuntimeStore::new());
        let blob_store: Arc<dyn BlobStore> = Arc::new(meerkat_store::MemoryBlobStore::new());
        let rid = LogicalRuntimeId::new("abandon-rollback-contract");
        let mut driver = PersistentRuntimeDriver::new(rid, store, blob_store);

        let input = make_prompt("pending work");
        let input_id = input.id().clone();
        let outcome = driver.accept_input(input).await.unwrap();
        assert!(outcome.is_accepted());
        assert!(driver.input_phase(&input_id).is_some());

        driver.force_input_snapshot_failure_for_test = true;
        let result = driver
            .abandon_pending_inputs(InputAbandonReason::Reset)
            .await;

        assert!(result.is_err(), "forced snapshot failure must propagate");
        assert!(
            driver.input_phase(&input_id).is_some(),
            "staged abandon must be rolled back: the pending input must still be live"
        );
    }
}