1use std::sync::Arc;
8use std::sync::RwLock as StdRwLock;
9
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12
13use crate::accept::AcceptOutcome;
14use crate::identifiers::LogicalRuntimeId;
15use crate::input::{Input, externalize_input_images};
16use crate::input_state::{InputAbandonReason, InputLifecycleState, InputState, StoredInputState};
17use crate::runtime_event::RuntimeEventEnvelope;
18use crate::runtime_state::RuntimeState;
19use crate::store::{MachineLifecycleCommit, RuntimeStore};
20use crate::traits::{DestroyReport, RecoveryReport, RuntimeDriver, RuntimeDriverError};
21
22use super::ephemeral::{
23 EphemeralDriverRollbackSnapshot, EphemeralRuntimeDriver, SharedIngressDslAuthority,
24};
25
26pub struct PersistentRuntimeDriver {
28 inner: EphemeralRuntimeDriver,
30 store: Arc<dyn RuntimeStore>,
32 blob_store: Arc<dyn BlobStore>,
34 runtime_id: LogicalRuntimeId,
36}
37
38impl PersistentRuntimeDriver {
39 pub fn new(
41 runtime_id: LogicalRuntimeId,
42 store: Arc<dyn RuntimeStore>,
43 blob_store: Arc<dyn BlobStore>,
44 ) -> Self {
45 Self::new_with_control(
46 runtime_id,
47 store,
48 blob_store,
49 Arc::new(StdRwLock::new(
50 crate::driver::ephemeral::RuntimeControlProjection::default(),
51 )),
52 crate::driver::ephemeral::new_ingress_dsl_authority(),
53 )
54 }
55
56 pub(crate) fn new_with_control(
57 runtime_id: LogicalRuntimeId,
58 store: Arc<dyn RuntimeStore>,
59 blob_store: Arc<dyn BlobStore>,
60 control: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
61 dsl: SharedIngressDslAuthority,
62 ) -> Self {
63 Self {
64 inner: EphemeralRuntimeDriver::new_with_control_and_dsl(
65 runtime_id.clone(),
66 control,
67 dsl,
68 ),
69 store,
70 blob_store,
71 runtime_id,
72 }
73 }
74
75 pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
77 &self.inner
78 }
79
80 pub(crate) fn rollback_snapshot(&self) -> EphemeralDriverRollbackSnapshot {
81 self.inner.rollback_snapshot()
82 }
83
84 pub(crate) fn restore_rollback_snapshot(&mut self, snapshot: EphemeralDriverRollbackSnapshot) {
85 self.inner.restore_rollback_snapshot(snapshot);
86 }
87
88 pub fn runtime_id(&self) -> &LogicalRuntimeId {
90 &self.runtime_id
91 }
92
93 pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
95 self.inner.set_silent_comms_intents(intents);
96 }
97
98 pub fn silent_comms_intents(&self) -> Vec<String> {
99 self.inner.silent_comms_intents()
100 }
101
102 pub fn is_idle(&self) -> bool {
104 self.inner.is_idle()
105 }
106
107 pub fn is_idle_or_attached(&self) -> bool {
109 self.inner.is_idle_or_attached()
110 }
111
112 fn runtime_state_for_persistence(&self) -> RuntimeState {
117 match self.inner.runtime_state() {
118 RuntimeState::Attached => RuntimeState::Idle,
119 other => other,
120 }
121 }
122
123 async fn commit_lifecycle_with_rollback(
124 &mut self,
125 checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
126 target_state: RuntimeState,
127 context: &str,
128 ) -> Result<(), RuntimeDriverError> {
129 let input_states = self.inner.stored_input_states_snapshot();
130 if let Err(err) = self
131 .store
132 .commit_machine_lifecycle(
133 &self.runtime_id,
134 MachineLifecycleCommit::new(target_state),
135 &input_states,
136 )
137 .await
138 {
139 self.inner.restore_rollback_snapshot(checkpoint);
140 return Err(RuntimeDriverError::Internal(format!(
141 "{context} persist failed: {err}"
142 )));
143 }
144 Ok(())
145 }
146
147 pub(crate) async fn publish_service_turn_terminal_lifecycle(
148 &mut self,
149 checkpoint: super::ephemeral::EphemeralDriverRollbackSnapshot,
150 target_state: RuntimeState,
151 ) -> Result<(), RuntimeDriverError> {
152 self.commit_lifecycle_with_rollback(
153 checkpoint,
154 target_state,
155 "service turn terminal receipt",
156 )
157 .await?;
158 self.inner.set_control_projection(target_state, None, None);
159 Ok(())
160 }
161
162 pub(crate) fn set_control_projection(
163 &mut self,
164 next_phase: RuntimeState,
165 current_run_id: Option<RunId>,
166 pre_run_phase: Option<RuntimeState>,
167 ) {
168 self.inner
169 .set_control_projection(next_phase, current_run_id, pre_run_phase);
170 }
171
172 pub(crate) fn sync_control_projection_from_dsl_authority(&mut self) {
177 self.inner.sync_control_projection_from_dsl_authority();
178 }
179
180 #[doc(hidden)]
183 pub fn contract_begin_run_authority(
184 &mut self,
185 run_id: RunId,
186 ) -> Result<(), RuntimeDriverError> {
187 self.inner.contract_begin_run_authority(run_id)
188 }
189
190 #[cfg(test)]
193 #[doc(hidden)]
194 pub(crate) fn contract_force_runtime_authority(
195 &mut self,
196 next_phase: RuntimeState,
197 current_run_id: Option<RunId>,
198 pre_run_phase: Option<RuntimeState>,
199 ) {
200 self.inner
201 .contract_force_runtime_authority(next_phase, current_run_id, pre_run_phase);
202 }
203
204 pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
206 self.inner.drain_events()
207 }
208
209 pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
211 self.inner.take_post_admission_signal()
212 }
213
214 pub fn post_admission_signal(&self) -> crate::driver::ephemeral::PostAdmissionSignal {
216 self.inner.post_admission_signal()
217 }
218
219 pub fn take_wake_requested(&mut self) -> bool {
221 self.inner.take_wake_requested()
222 }
223
224 pub fn take_process_requested(&mut self) -> bool {
226 self.inner.take_process_requested()
227 }
228
229 pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
231 self.inner.dequeue_next()
232 }
233
234 pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
236 self.inner.dequeue_by_id(input_id)
237 }
238
239 pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
240 self.inner.has_queued_input_outside(excluded)
241 }
242
243 pub(crate) fn defer_queued_inputs_behind_backlog(&mut self, input_ids: &[InputId]) {
244 self.inner.defer_queued_inputs_behind_backlog(input_ids);
245 }
246
247 pub(crate) fn absorb_post_admission_effects(
248 &mut self,
249 effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
250 ) {
251 self.inner.absorb_post_admission_effects(effects);
252 }
253
254 pub(crate) fn resolve_admission_for_runtime_idle(
255 &self,
256 input: &Input,
257 runtime_idle: bool,
258 ) -> crate::accept::ResolvedAdmission {
259 self.inner
260 .resolve_admission_for_runtime_idle(input, runtime_idle)
261 }
262
263 pub(crate) fn resolve_admission(&self, input: &Input) -> crate::accept::ResolvedAdmission {
264 self.inner.resolve_admission(input)
265 }
266
267 pub(crate) async fn accept_resolved_input(
268 &mut self,
269 input: Input,
270 resolved: crate::accept::ResolvedAdmission,
271 ) -> Result<AcceptOutcome, RuntimeDriverError> {
272 let checkpoint = self.inner.rollback_snapshot();
273 let input_for_recovery = input.clone();
274
275 let mut outcome = self.inner.accept_resolved_input(input, resolved).await?;
276
277 if let AcceptOutcome::Accepted {
278 ref input_id,
279 ref mut state,
280 ..
281 } = outcome
282 && let Some(mut bundle) = self.inner.stored_input_state(input_id)
283 {
284 let mut input_for_recovery = input_for_recovery.clone();
285 if let Err(err) =
286 externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
287 {
288 self.inner.restore_rollback_snapshot(checkpoint);
289 return Err(RuntimeDriverError::Internal(format!(
290 "failed to externalize runtime input images: {err}"
291 )));
292 }
293 bundle.state.persisted_input = Some(input_for_recovery);
294 self.inner.ledger_mut().accept(bundle.state.clone());
295 if let Err(err) = self.persist_state(&bundle).await {
296 self.inner.restore_rollback_snapshot(checkpoint);
297 return Err(err);
298 }
299 *state = bundle.state;
300 }
301
302 Ok(outcome)
303 }
304
305 pub fn stage_input(
307 &mut self,
308 input_id: &InputId,
309 run_id: &meerkat_core::lifecycle::RunId,
310 ) -> Result<(), crate::traits::RuntimeDriverError> {
311 self.inner.stage_input(input_id, run_id)
312 }
313
314 pub fn stage_batch(
316 &mut self,
317 input_ids: &[InputId],
318 run_id: &meerkat_core::lifecycle::RunId,
319 ) -> Result<(), crate::traits::RuntimeDriverError> {
320 self.inner.stage_batch(input_ids, run_id)
321 }
322
323 pub(crate) fn machine_realize_stage_batch(
324 &mut self,
325 input_ids: &[InputId],
326 run_id: &meerkat_core::lifecycle::RunId,
327 ) -> Result<(), crate::traits::RuntimeDriverError> {
328 self.inner.machine_realize_stage_batch(input_ids, run_id)
329 }
330
331 pub fn apply_input(
333 &mut self,
334 input_id: &InputId,
335 run_id: &meerkat_core::lifecycle::RunId,
336 ) -> Result<(), crate::traits::RuntimeDriverError> {
337 self.inner.apply_input(input_id, run_id)
338 }
339
340 pub fn rollback_staged(
342 &mut self,
343 input_ids: &[InputId],
344 ) -> Result<(), crate::traits::RuntimeDriverError> {
345 self.inner.rollback_staged(input_ids)
346 }
347
348 async fn persist_state(&self, state: &StoredInputState) -> Result<(), RuntimeDriverError> {
349 self.store
350 .persist_input_state(&self.runtime_id, state)
351 .await
352 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
353 }
354
355 pub(crate) async fn abandon_pending_inputs(
356 &mut self,
357 reason: InputAbandonReason,
358 ) -> Result<usize, RuntimeDriverError> {
359 let checkpoint = self.inner.rollback_snapshot();
360 let abandoned = self.inner.abandon_pending_inputs(reason);
361 let input_states = self.inner.stored_input_states_snapshot();
362 if let Err(err) = self
363 .store
364 .commit_machine_lifecycle(
365 &self.runtime_id,
366 MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
367 &input_states,
368 )
369 .await
370 {
371 self.inner.restore_rollback_snapshot(checkpoint);
372 return Err(RuntimeDriverError::Internal(format!(
373 "pending input abandon persist failed: {err}"
374 )));
375 }
376 Ok(abandoned)
377 }
378
379 pub(crate) async fn recycle_preserving_work(
384 &mut self,
385 target_phase: RuntimeState,
386 ) -> Result<usize, RuntimeDriverError> {
387 let checkpoint = self.inner.rollback_snapshot();
388 let transferred = match self.inner.recycle_preserving_work() {
389 Ok(transferred) => transferred,
390 Err(err) => {
391 self.inner.restore_rollback_snapshot(checkpoint);
392 return Err(err);
393 }
394 };
395 let input_states = self.inner.stored_input_states_snapshot();
396 if let Err(err) = self
397 .store
398 .commit_machine_lifecycle(
399 &self.runtime_id,
400 MachineLifecycleCommit::new(target_phase),
401 &input_states,
402 )
403 .await
404 {
405 self.inner.restore_rollback_snapshot(checkpoint);
406 return Err(RuntimeDriverError::Internal(format!(
407 "recycle persist failed: {err}"
408 )));
409 }
410
411 self.inner.set_control_projection(target_phase, None, None);
412 Ok(transferred)
413 }
414
415 pub(crate) async fn realize_retire_lifecycle(
416 &mut self,
417 ) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
418 let checkpoint = self.inner.rollback_snapshot();
419 let report = self.inner.finalize_retire();
420 self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Retired, "retire")
421 .await?;
422 self.inner
423 .set_control_projection(RuntimeState::Retired, None, None);
424 Ok(report)
425 }
426
427 pub(crate) async fn realize_reset_lifecycle(
428 &mut self,
429 ) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
430 let checkpoint = self.inner.rollback_snapshot();
431 let report = self.inner.reset_cleanup();
432 self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Idle, "reset")
433 .await?;
434 self.inner
435 .set_control_projection(RuntimeState::Idle, None, None);
436 Ok(report)
437 }
438
439 pub(crate) fn prepare_destroy_lifecycle(
440 &mut self,
441 ) -> (EphemeralDriverRollbackSnapshot, DestroyReport) {
442 let checkpoint = self.inner.rollback_snapshot();
443 let abandoned = self.inner.destroy_cleanup();
444 (
445 checkpoint,
446 DestroyReport {
447 inputs_abandoned: abandoned,
448 },
449 )
450 }
451
452 pub(crate) async fn commit_prepared_destroy_lifecycle(
453 &mut self,
454 checkpoint: EphemeralDriverRollbackSnapshot,
455 ) -> Result<(), RuntimeDriverError> {
456 self.commit_lifecycle_with_rollback(
457 checkpoint,
458 self.runtime_state_for_persistence(),
459 "destroy",
460 )
461 .await
462 }
463
464 pub(crate) fn rollback_prepared_destroy_lifecycle(
465 &mut self,
466 checkpoint: EphemeralDriverRollbackSnapshot,
467 ) {
468 self.inner.restore_rollback_snapshot(checkpoint);
469 }
470
471 pub(crate) async fn finalize_runtime_executor_exit(
472 &mut self,
473 ) -> Result<(), RuntimeDriverError> {
474 let checkpoint = self.inner.rollback_snapshot();
475 if let Err(err) = self.inner.apply_runtime_executor_exited_authority() {
476 self.inner.restore_rollback_snapshot(checkpoint);
477 return Err(err);
478 }
479 self.inner.stop_runtime_cleanup();
480 self.commit_lifecycle_with_rollback(checkpoint, RuntimeState::Stopped, "stop")
481 .await?;
482 self.inner.sync_control_projection_from_dsl_authority();
483 Ok(())
484 }
485
486 pub(crate) fn machine_realize_boundary_applied_in_memory(
487 &mut self,
488 run_id: &RunId,
489 receipt: &RunBoundaryReceipt,
490 ) -> Result<(), RuntimeDriverError> {
491 self.inner.machine_realize_boundary_applied(run_id, receipt)
492 }
493
494 pub(crate) fn machine_realize_run_completed_in_memory(
495 &mut self,
496 run_id: &RunId,
497 consumed_input_ids: &[InputId],
498 ) -> Result<(), RuntimeDriverError> {
499 self.inner
500 .machine_realize_run_completed(run_id, consumed_input_ids)
501 }
502
503 pub(crate) fn next_live_boundary_context_sequence(&self, run_id: &RunId) -> u64 {
504 self.inner.next_live_boundary_context_sequence(run_id)
505 }
506
507 pub(crate) async fn machine_realize_live_boundary_context_injected(
508 &mut self,
509 run_id: &RunId,
510 input_ids: &[InputId],
511 receipt: &RunBoundaryReceipt,
512 session_snapshot: Option<Vec<u8>>,
513 ) -> Result<(), RuntimeDriverError> {
514 let checkpoint = self.inner.rollback_snapshot();
515 if let Err(err) = self
516 .inner
517 .machine_realize_live_boundary_context_injected(run_id, input_ids, receipt)
518 {
519 self.inner.restore_rollback_snapshot(checkpoint);
520 return Err(err);
521 }
522 let input_updates = self.inner.stored_input_states_snapshot();
523 if let Err(err) = self
524 .store
525 .atomic_apply(
526 &self.runtime_id,
527 session_snapshot
528 .as_ref()
529 .map(|session_snapshot| crate::store::SessionDelta {
530 session_snapshot: session_snapshot.clone(),
531 }),
532 receipt.clone(),
533 input_updates,
534 session_snapshot
535 .as_deref()
536 .and_then(|snapshot| {
537 serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
538 })
539 .map(|session| session.id().clone()),
540 )
541 .await
542 {
543 self.inner.restore_rollback_snapshot(checkpoint);
544 return Err(RuntimeDriverError::Internal(format!(
545 "runtime live-boundary context commit failed: {err}"
546 )));
547 }
548 Ok(())
549 }
550
551 pub(crate) async fn machine_commit_completed_boundary_snapshot(
552 &mut self,
553 receipt: &RunBoundaryReceipt,
554 session_snapshot: Option<&Vec<u8>>,
555 ) -> Result<(), RuntimeDriverError> {
556 let input_updates = self.inner.stored_input_states_snapshot();
557 self.store
558 .atomic_apply(
559 &self.runtime_id,
560 session_snapshot.map(|session_snapshot| crate::store::SessionDelta {
561 session_snapshot: session_snapshot.clone(),
562 }),
563 receipt.clone(),
564 input_updates,
565 session_snapshot
566 .and_then(|snapshot| {
567 serde_json::from_slice::<meerkat_core::Session>(snapshot).ok()
568 })
569 .map(|session| session.id().clone()),
570 )
571 .await
572 .map_err(|e| {
573 RuntimeDriverError::Internal(format!(
574 "runtime completed-boundary commit failed: {e}"
575 ))
576 })
577 }
578
579 pub(crate) async fn machine_realize_run_failed(
580 &mut self,
581 run_id: &RunId,
582 contributing_input_ids: &[InputId],
583 replay_plan: &super::ephemeral::ReplayQueuedContributorsPlan,
584 terminal_error: &str,
585 runtime_apply_failure: Option<&meerkat_core::lifecycle::CoreApplyFailureCause>,
586 recoverable: bool,
587 ) -> Result<(), RuntimeDriverError> {
588 let checkpoint = self.inner.rollback_snapshot();
589 self.inner
590 .machine_realize_run_failed(run_id, contributing_input_ids, replay_plan)?;
591 let failure_cause = runtime_apply_failure.map(|failure| failure.kind);
592 tracing::debug!(
593 run_id = ?run_id,
594 recoverable,
595 error = terminal_error,
596 failure_cause = ?failure_cause,
597 "persistent driver realized machine-owned failed-run replay"
598 );
599 let input_states = self.inner.stored_input_states_snapshot();
600 if let Err(err) = self
601 .store
602 .commit_machine_lifecycle(
603 &self.runtime_id,
604 MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
605 &input_states,
606 )
607 .await
608 {
609 self.inner.restore_rollback_snapshot(checkpoint);
610 return Err(RuntimeDriverError::Internal(format!(
611 "terminal event persist failed: {err}"
612 )));
613 }
614 Ok(())
615 }
616
617 pub(crate) async fn machine_realize_run_cancelled(
618 &mut self,
619 run_id: &RunId,
620 contributing_input_ids: &[InputId],
621 ) -> Result<(), RuntimeDriverError> {
622 let checkpoint = self.inner.rollback_snapshot();
623 self.inner
624 .machine_realize_run_cancelled(run_id, contributing_input_ids)?;
625 tracing::debug!(
626 run_id = ?run_id,
627 contributors = contributing_input_ids.len(),
628 "persistent driver realized machine-owned cancelled run"
629 );
630 let input_states = self.inner.stored_input_states_snapshot();
631 if let Err(err) = self
632 .store
633 .commit_machine_lifecycle(
634 &self.runtime_id,
635 MachineLifecycleCommit::new(self.runtime_state_for_persistence()),
636 &input_states,
637 )
638 .await
639 {
640 self.inner.restore_rollback_snapshot(checkpoint);
641 return Err(RuntimeDriverError::Internal(format!(
642 "terminal cancellation persist failed: {err}"
643 )));
644 }
645 Ok(())
646 }
647}
648
649#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
650#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
651impl RuntimeDriver for PersistentRuntimeDriver {
652 async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
653 let resolved = self.resolve_admission(&input);
654 self.accept_resolved_input(input, resolved).await
655 }
656
657 async fn on_runtime_event(
658 &mut self,
659 event: RuntimeEventEnvelope,
660 ) -> Result<(), RuntimeDriverError> {
661 self.inner.on_runtime_event(event).await
662 }
663
664 async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
665 let checkpoint = self.inner.rollback_snapshot();
666 let report = match crate::meerkat_machine::machine_recover_persistent_driver(
667 self.store.as_ref(),
668 &self.runtime_id,
669 &mut self.inner,
670 )
671 .await
672 {
673 Ok(report) => report,
674 Err(err) => {
675 self.inner.restore_rollback_snapshot(checkpoint.clone());
676 return Err(err);
677 }
678 };
679
680 let runtime_state_for_persistence = self.runtime_state_for_persistence();
682 self.commit_lifecycle_with_rollback(checkpoint, runtime_state_for_persistence, "recovery")
683 .await?;
684 Ok(report)
685 }
686
687 fn runtime_state(&self) -> RuntimeState {
688 self.inner.runtime_state()
689 }
690
691 fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
692 self.inner.input_state(input_id)
693 }
694
695 fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
696 self.inner.input_phase(input_id)
697 }
698
699 fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
700 self.inner.input_last_run_id(input_id)
701 }
702
703 fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
704 self.inner.input_last_boundary_sequence(input_id)
705 }
706
707 fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
708 self.inner.stored_input_state(input_id)
709 }
710
711 fn active_input_ids(&self) -> Vec<InputId> {
712 self.inner.active_input_ids()
713 }
714}