1use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunEvent};
12
13use crate::accept::AcceptOutcome;
14use crate::driver::ephemeral::handling_mode_from_policy;
15use crate::identifiers::LogicalRuntimeId;
16use crate::input::{Input, externalize_input_images};
17use crate::input_state::{
18 InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
19 InputTerminalOutcome,
20};
21use crate::runtime_event::RuntimeEventEnvelope;
22use crate::runtime_ingress_authority::ContentShape;
23use crate::runtime_state::RuntimeState;
24use crate::store::RuntimeStore;
25use crate::traits::{
26 DestroyReport, RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError,
27};
28use meerkat_core::types::HandlingMode;
29
30use super::ephemeral::EphemeralRuntimeDriver;
31
/// Runtime driver that wraps an in-memory [`EphemeralRuntimeDriver`] and
/// writes lifecycle changes through to a [`RuntimeStore`].
pub struct PersistentRuntimeDriver {
    /// In-memory driver that every operation is delegated to.
    inner: EphemeralRuntimeDriver,
    /// Store used for input states, runtime state and boundary receipts.
    store: Arc<dyn RuntimeStore>,
    /// Blob store handed to `externalize_input_images` when an input is accepted.
    blob_store: Arc<dyn BlobStore>,
    /// Identifier passed to every store call made by this driver.
    runtime_id: LogicalRuntimeId,
}
43
44impl PersistentRuntimeDriver {
45 pub fn new(
47 runtime_id: LogicalRuntimeId,
48 store: Arc<dyn RuntimeStore>,
49 blob_store: Arc<dyn BlobStore>,
50 ) -> Self {
51 Self {
52 inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
53 store,
54 blob_store,
55 runtime_id,
56 }
57 }
58
59 pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
61 &self.inner
62 }
63
64 pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
66 self.inner.set_silent_comms_intents(intents);
67 }
68
69 pub fn is_idle(&self) -> bool {
71 self.inner.is_idle()
72 }
73
74 pub fn is_idle_or_attached(&self) -> bool {
76 self.inner.is_idle_or_attached()
77 }
78
79 pub fn attach(&mut self) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
81 self.inner.attach()
82 }
83
84 pub fn detach(
86 &mut self,
87 ) -> Result<Option<RuntimeState>, crate::runtime_state::RuntimeStateTransitionError> {
88 self.inner.detach()
89 }
90
91 fn runtime_state_for_persistence(&self) -> RuntimeState {
96 match self.inner.runtime_state() {
97 RuntimeState::Attached => RuntimeState::Idle,
98 other => other,
99 }
100 }
101
102 pub fn start_run(
104 &mut self,
105 run_id: meerkat_core::lifecycle::RunId,
106 ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
107 self.inner.start_run(run_id)
108 }
109
110 pub fn complete_run(
112 &mut self,
113 ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
114 {
115 self.inner.complete_run()
116 }
117
118 pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
120 self.inner.drain_events()
121 }
122
123 pub fn take_wake_requested(&mut self) -> bool {
125 self.inner.take_wake_requested()
126 }
127
128 pub fn take_process_requested(&mut self) -> bool {
130 self.inner.take_process_requested()
131 }
132
133 pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
135 self.inner.dequeue_next()
136 }
137
138 pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
140 self.inner.dequeue_by_id(input_id)
141 }
142
143 pub fn persisted_input(&self, input_id: &InputId) -> Option<&Input> {
145 self.inner.persisted_input(input_id)
146 }
147
148 pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
149 self.inner.has_queued_input_outside(excluded)
150 }
151
152 pub fn stage_input(
154 &mut self,
155 input_id: &InputId,
156 run_id: &meerkat_core::lifecycle::RunId,
157 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
158 self.inner.stage_input(input_id, run_id)
159 }
160
161 pub fn stage_batch(
163 &mut self,
164 input_ids: &[InputId],
165 run_id: &meerkat_core::lifecycle::RunId,
166 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
167 self.inner.stage_batch(input_ids, run_id)
168 }
169
170 pub fn apply_input(
172 &mut self,
173 input_id: &InputId,
174 run_id: &meerkat_core::lifecycle::RunId,
175 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
176 self.inner.apply_input(input_id, run_id)
177 }
178
179 pub fn rollback_staged(
181 &mut self,
182 input_ids: &[InputId],
183 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
184 self.inner.rollback_staged(input_ids)
185 }
186
187 pub fn consume_inputs(
189 &mut self,
190 input_ids: &[InputId],
191 run_id: &meerkat_core::lifecycle::RunId,
192 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
193 self.inner.consume_inputs(input_ids, run_id)
194 }
195
196 async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
197 self.store
198 .persist_input_state(&self.runtime_id, state)
199 .await
200 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
201 }
202
203 pub async fn abandon_pending_inputs(
204 &mut self,
205 reason: InputAbandonReason,
206 ) -> Result<usize, RuntimeDriverError> {
207 let checkpoint = self.inner.clone();
208 let abandoned = self.inner.abandon_pending_inputs(reason);
209 let input_states = self.inner.input_states_snapshot();
210 if let Err(err) = self
211 .store
212 .atomic_lifecycle_commit(
213 &self.runtime_id,
214 self.runtime_state_for_persistence(),
215 &input_states,
216 )
217 .await
218 {
219 self.inner = checkpoint;
220 return Err(RuntimeDriverError::Internal(format!(
221 "pending input abandon persist failed: {err}"
222 )));
223 }
224 Ok(abandoned)
225 }
226
227 pub async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
232 let silent_intents = self.inner.silent_comms_intents();
233 self.inner = EphemeralRuntimeDriver::new(self.runtime_id.clone());
234 self.inner.set_silent_comms_intents(silent_intents);
235 let _ = RuntimeDriver::recover(self).await?;
236 Ok(self.inner.active_input_ids().len())
237 }
238}
239
240#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
241#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
242impl RuntimeDriver for PersistentRuntimeDriver {
243 async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
244 let checkpoint = self.inner.clone();
245 let input_for_recovery = input.clone();
246
247 let mut outcome = self.inner.accept_input(input).await?;
249
250 if let AcceptOutcome::Accepted {
252 ref input_id,
253 ref mut state,
254 ..
255 } = outcome
256 && let Some(inner_state) = self.inner.input_state(input_id).cloned()
257 {
258 let mut input_for_recovery = input_for_recovery.clone();
259 if let Err(err) =
260 externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
261 {
262 self.inner = checkpoint;
263 return Err(RuntimeDriverError::Internal(format!(
264 "failed to externalize runtime input images: {err}"
265 )));
266 }
267 let mut persisted = inner_state;
268 persisted.persisted_input = Some(input_for_recovery);
269 self.inner.ledger_mut().accept(persisted.clone());
270 if let Err(err) = self.persist_state(&persisted).await {
271 self.inner = checkpoint;
272 return Err(err);
273 }
274 *state = persisted;
275 }
276
277 Ok(outcome)
278 }
279
280 async fn on_runtime_event(
281 &mut self,
282 event: RuntimeEventEnvelope,
283 ) -> Result<(), RuntimeDriverError> {
284 self.inner.on_runtime_event(event).await
285 }
286
287 async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
288 match event {
289 RunEvent::BoundaryApplied {
291 ref receipt,
292 ref session_snapshot,
293 ..
294 } => {
295 let checkpoint = self.inner.clone();
296 self.inner.on_run_event(event.clone()).await?;
297 if self
298 .store
299 .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
300 .await
301 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
302 .is_some()
303 {
304 return Ok(());
305 }
306 let input_updates: Vec<InputState> = receipt
307 .contributing_input_ids
308 .iter()
309 .filter_map(|id| self.inner.input_state(id).cloned())
310 .collect();
311
312 self.store
313 .atomic_apply(
314 &self.runtime_id,
315 session_snapshot.clone().map(|session_snapshot| {
316 crate::store::SessionDelta { session_snapshot }
317 }),
318 receipt.clone(),
319 input_updates,
320 None, )
322 .await
323 .map_err(|e| {
324 self.inner = checkpoint;
325 RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
326 })?;
327 }
328 RunEvent::RunCompleted { .. }
329 | RunEvent::RunFailed { .. }
330 | RunEvent::RunCancelled { .. } => {
331 let checkpoint = self.inner.clone();
332 self.inner.on_run_event(event).await?;
333 let input_states = self.inner.input_states_snapshot();
334 if let Err(err) = self
335 .store
336 .atomic_lifecycle_commit(
337 &self.runtime_id,
338 self.runtime_state_for_persistence(),
339 &input_states,
340 )
341 .await
342 {
343 self.inner = checkpoint;
344 return Err(RuntimeDriverError::Internal(format!(
345 "terminal event persist failed: {err}"
346 )));
347 }
348 }
349 _ => {
350 self.inner.on_run_event(event).await?;
351 }
352 }
353
354 Ok(())
355 }
356
357 async fn on_runtime_control(
358 &mut self,
359 command: RuntimeControlCommand,
360 ) -> Result<(), RuntimeDriverError> {
361 let checkpoint = self.inner.clone();
362 self.inner.on_runtime_control(command).await?;
363 let input_states = self.inner.input_states_snapshot();
364 if let Err(err) = self
365 .store
366 .atomic_lifecycle_commit(
367 &self.runtime_id,
368 self.runtime_state_for_persistence(),
369 &input_states,
370 )
371 .await
372 {
373 self.inner = checkpoint;
374 return Err(RuntimeDriverError::Internal(format!(
375 "control op persist failed: {err}"
376 )));
377 }
378 Ok(())
379 }
380
381 async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
382 let stored_states = self
384 .store
385 .load_input_states(&self.runtime_id)
386 .await
387 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
388
389 let mut recovered_payloads = Vec::new();
390
391 for mut state in stored_states {
395 if matches!(
396 state.current_state(),
397 InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
398 ) {
399 let has_receipt =
400 match (state.last_run_id().cloned(), state.last_boundary_sequence()) {
401 (Some(run_id), Some(sequence)) => self
402 .store
403 .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
404 .await
405 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
406 .is_some(),
407 _ => false,
408 };
409 let now = Utc::now();
410 let from = state.current_state();
411 if has_receipt {
412 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
413 InputLifecycleState::Consumed,
414 Some(InputTerminalOutcome::Consumed),
415 state.last_run_id().cloned(),
416 state.last_boundary_sequence(),
417 {
418 let mut h = state.history().to_vec();
419 h.push(InputStateHistoryEntry {
420 timestamp: now,
421 from,
422 to: InputLifecycleState::Consumed,
423 reason: Some("recovery: boundary receipt already committed".into()),
424 });
425 h
426 },
427 now,
428 );
429 *state.authority_mut() = auth;
430 } else {
431 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
432 InputLifecycleState::Queued,
433 None,
434 state.last_run_id().cloned(),
435 state.last_boundary_sequence(),
436 {
437 let mut h = state.history().to_vec();
438 h.push(InputStateHistoryEntry {
439 timestamp: now,
440 from,
441 to: InputLifecycleState::Queued,
442 reason: Some("recovery: missing boundary receipt".into()),
443 });
444 h
445 },
446 now,
447 );
448 *state.authority_mut() = auth;
449 }
450 }
451 if matches!(
452 state.current_state(),
453 InputLifecycleState::Accepted | InputLifecycleState::Staged
454 ) {
455 let now = Utc::now();
459 let from = state.current_state();
460 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
461 InputLifecycleState::Queued,
462 None,
463 state.last_run_id().cloned(),
464 state.last_boundary_sequence(),
465 {
466 let mut h = state.history().to_vec();
467 h.push(InputStateHistoryEntry {
468 timestamp: now,
469 from,
470 to: InputLifecycleState::Queued,
471 reason: Some("recovery: pre-run state normalized to queued".into()),
472 });
473 h
474 },
475 now,
476 );
477 *state.authority_mut() = auth;
478 }
479
480 if self.inner.input_state(&state.input_id).is_none() {
482 let handling_mode = state
483 .policy
484 .as_ref()
485 .map(|p| handling_mode_from_policy(&p.decision))
486 .unwrap_or(HandlingMode::Queue);
487 let content_shape = state
488 .persisted_input
489 .as_ref()
490 .map(|i| ContentShape(i.kind_id().to_string()))
491 .unwrap_or_else(|| ContentShape("unknown".into()));
492 let policy = match state.policy.as_ref() {
493 Some(p) => p.decision.clone(),
494 None => match state.persisted_input.as_ref() {
495 Some(input) => {
496 crate::policy_table::DefaultPolicyTable::resolve(input, true)
497 }
498 None => {
499 self.inner.ledger_mut().recover(state);
502 continue;
503 }
504 },
505 };
506 let request_id = None;
507 let reservation_key = None;
508
509 let inserted = self.inner.ledger_mut().recover(state.clone());
510 if !inserted {
511 continue;
515 }
516
517 if let Some(input) = state.persisted_input.clone() {
518 recovered_payloads.push((state.input_id.clone(), input));
519 }
520
521 let lifecycle_state = state.current_state();
522
523 if let Err(err) = self.inner.admit_recovered_to_ingress(
524 state.input_id.clone(),
525 content_shape,
526 handling_mode,
527 lifecycle_state,
528 policy,
529 request_id,
530 reservation_key,
531 ) {
532 return Err(RuntimeDriverError::Internal(format!(
533 "failed to admit recovered input '{}' to ingress authority: {err}",
534 state.input_id
535 )));
536 }
537 }
538 }
539
540 let report = self.inner.recover().await?;
544
545 for (input_id, _input) in recovered_payloads {
546 let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
547 state.current_state() == crate::input_state::InputLifecycleState::Queued
548 });
549 if should_requeue && !self.inner.has_queued_input(&input_id) {
550 return Err(RuntimeDriverError::Internal(format!(
551 "persistent recover left queued input '{input_id}' out of the runtime queue projection"
552 )));
553 }
554 }
555
556 if let Some(runtime_state) = self
557 .store
558 .load_runtime_state(&self.runtime_id)
559 .await
560 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
561 {
562 match runtime_state {
563 RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
564 EphemeralRuntimeDriver::retire(&mut self.inner)?;
565 }
566 RuntimeState::Stopped
567 if self.inner.runtime_state() != RuntimeState::Stopped
568 && self.inner.runtime_state() != RuntimeState::Destroyed =>
569 {
570 self.inner
572 .on_runtime_control(RuntimeControlCommand::Stop)
573 .await?;
574 }
575 RuntimeState::Destroyed
576 if self.inner.runtime_state() != RuntimeState::Destroyed =>
577 {
578 self.inner.destroy()?;
579 }
580 _ => {}
581 }
582
583 if runtime_state.is_terminal() {
587 let active = self.inner.active_input_ids();
588 if !active.is_empty() {
589 return Err(RuntimeDriverError::Internal(format!(
590 "store corruption: terminal runtime '{}' has {} active inputs",
591 runtime_state,
592 active.len()
593 )));
594 }
595 }
596 }
597
598 let input_states = self.inner.input_states_snapshot();
600 self.store
601 .atomic_lifecycle_commit(
602 &self.runtime_id,
603 self.runtime_state_for_persistence(),
604 &input_states,
605 )
606 .await
607 .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
608 Ok(report)
609 }
610
611 async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
612 let checkpoint = self.inner.clone();
613 let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
614 let input_states = self.inner.input_states_snapshot();
615 if let Err(err) = self
616 .store
617 .atomic_lifecycle_commit(
618 &self.runtime_id,
619 self.runtime_state_for_persistence(),
620 &input_states,
621 )
622 .await
623 {
624 self.inner = checkpoint;
625 return Err(RuntimeDriverError::Internal(format!(
626 "retire persist failed: {err}"
627 )));
628 }
629 Ok(report)
630 }
631
632 async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
633 let checkpoint = self.inner.clone();
634 let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
635 let input_states = self.inner.input_states_snapshot();
636 if let Err(err) = self
637 .store
638 .atomic_lifecycle_commit(
639 &self.runtime_id,
640 self.runtime_state_for_persistence(),
641 &input_states,
642 )
643 .await
644 {
645 self.inner = checkpoint;
646 return Err(RuntimeDriverError::Internal(format!(
647 "reset persist failed: {err}"
648 )));
649 }
650 Ok(report)
651 }
652
653 async fn destroy(&mut self) -> Result<DestroyReport, RuntimeDriverError> {
654 let abandoned = self.inner.destroy()?;
655 let input_states = self.inner.input_states_snapshot();
656 if let Err(err) = self
657 .store
658 .atomic_lifecycle_commit(
659 &self.runtime_id,
660 self.runtime_state_for_persistence(),
661 &input_states,
662 )
663 .await
664 {
665 return Err(RuntimeDriverError::Internal(format!(
666 "destroy persist failed: {err}"
667 )));
668 }
669 Ok(DestroyReport {
670 inputs_abandoned: abandoned,
671 })
672 }
673
674 fn runtime_state(&self) -> RuntimeState {
675 self.inner.runtime_state()
676 }
677
678 fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
679 self.inner.input_state(input_id)
680 }
681
682 fn active_input_ids(&self) -> Vec<InputId> {
683 self.inner.active_input_ids()
684 }
685}