1use std::sync::Arc;
8
9use chrono::Utc;
10use meerkat_core::BlobStore;
11use meerkat_core::lifecycle::{InputId, RunEvent};
12
13use crate::accept::AcceptOutcome;
14use crate::driver::ephemeral::handling_mode_from_policy;
15use crate::identifiers::LogicalRuntimeId;
16use crate::input::{Input, externalize_input_images};
17use crate::input_state::{
18 InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry,
19 InputTerminalOutcome,
20};
21use crate::runtime_event::RuntimeEventEnvelope;
22use crate::runtime_ingress_authority::ContentShape;
23use crate::runtime_state::RuntimeState;
24use crate::store::RuntimeStore;
25use crate::traits::{
26 DestroyReport, RecoveryReport, RuntimeControlCommand, RuntimeDriver, RuntimeDriverError,
27};
28use meerkat_core::types::HandlingMode;
29
30use super::ephemeral::EphemeralRuntimeDriver;
31
/// Runtime driver that wraps an [`EphemeralRuntimeDriver`] and mirrors its
/// lifecycle changes into a [`RuntimeStore`].
///
/// All state transitions are performed on the in-memory `inner` driver first;
/// the persisting entry points then commit the resulting state to `store` and
/// restore a pre-call snapshot of `inner` when that commit fails.
pub struct PersistentRuntimeDriver {
    /// In-memory driver that every state transition is delegated to.
    inner: EphemeralRuntimeDriver,
    /// Durable store for input states, boundary receipts, and runtime state.
    store: Arc<dyn RuntimeStore>,
    /// Store that input images are externalized to before an input is persisted.
    blob_store: Arc<dyn BlobStore>,
    /// Identifier under which this runtime's state is read from and written to `store`.
    runtime_id: LogicalRuntimeId,
}
43
44impl PersistentRuntimeDriver {
45 pub fn new(
47 runtime_id: LogicalRuntimeId,
48 store: Arc<dyn RuntimeStore>,
49 blob_store: Arc<dyn BlobStore>,
50 ) -> Self {
51 Self {
52 inner: EphemeralRuntimeDriver::new(runtime_id.clone()),
53 store,
54 blob_store,
55 runtime_id,
56 }
57 }
58
59 pub fn inner_ref(&self) -> &EphemeralRuntimeDriver {
61 &self.inner
62 }
63
64 pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
66 self.inner.set_silent_comms_intents(intents);
67 }
68
69 pub fn is_idle(&self) -> bool {
71 self.inner.is_idle()
72 }
73
74 pub fn is_idle_or_attached(&self) -> bool {
76 self.inner.is_idle_or_attached()
77 }
78
79 pub fn attach(&mut self) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
81 self.inner.attach()
82 }
83
84 pub fn detach(
86 &mut self,
87 ) -> Result<Option<RuntimeState>, crate::runtime_state::RuntimeStateTransitionError> {
88 self.inner.detach()
89 }
90
91 fn runtime_state_for_persistence(&self) -> RuntimeState {
96 match self.inner.runtime_state() {
97 RuntimeState::Attached => RuntimeState::Idle,
98 other => other,
99 }
100 }
101
102 pub fn start_run(
104 &mut self,
105 run_id: meerkat_core::lifecycle::RunId,
106 ) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
107 self.inner.start_run(run_id)
108 }
109
110 pub fn complete_run(
112 &mut self,
113 ) -> Result<meerkat_core::lifecycle::RunId, crate::runtime_state::RuntimeStateTransitionError>
114 {
115 self.inner.complete_run()
116 }
117
118 pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
120 self.inner.drain_events()
121 }
122
123 pub fn take_post_admission_signal(&mut self) -> crate::driver::ephemeral::PostAdmissionSignal {
125 self.inner.take_post_admission_signal()
126 }
127
128 pub fn take_wake_requested(&mut self) -> bool {
130 self.inner.take_wake_requested()
131 }
132
133 pub fn take_process_requested(&mut self) -> bool {
135 self.inner.take_process_requested()
136 }
137
138 pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
140 self.inner.dequeue_next()
141 }
142
143 pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
145 self.inner.dequeue_by_id(input_id)
146 }
147
148 pub fn persisted_input(&self, input_id: &InputId) -> Option<&Input> {
150 self.inner.persisted_input(input_id)
151 }
152
153 pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
154 self.inner.has_queued_input_outside(excluded)
155 }
156
157 pub fn stage_input(
159 &mut self,
160 input_id: &InputId,
161 run_id: &meerkat_core::lifecycle::RunId,
162 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
163 self.inner.stage_input(input_id, run_id)
164 }
165
166 pub fn stage_batch(
168 &mut self,
169 input_ids: &[InputId],
170 run_id: &meerkat_core::lifecycle::RunId,
171 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
172 self.inner.stage_batch(input_ids, run_id)
173 }
174
175 pub fn apply_input(
177 &mut self,
178 input_id: &InputId,
179 run_id: &meerkat_core::lifecycle::RunId,
180 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
181 self.inner.apply_input(input_id, run_id)
182 }
183
184 pub fn rollback_staged(
186 &mut self,
187 input_ids: &[InputId],
188 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
189 self.inner.rollback_staged(input_ids)
190 }
191
192 pub fn consume_inputs(
194 &mut self,
195 input_ids: &[InputId],
196 run_id: &meerkat_core::lifecycle::RunId,
197 ) -> Result<(), crate::input_lifecycle_authority::InputLifecycleError> {
198 self.inner.consume_inputs(input_ids, run_id)
199 }
200
201 async fn persist_state(&self, state: &InputState) -> Result<(), RuntimeDriverError> {
202 self.store
203 .persist_input_state(&self.runtime_id, state)
204 .await
205 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))
206 }
207
208 pub async fn abandon_pending_inputs(
209 &mut self,
210 reason: InputAbandonReason,
211 ) -> Result<usize, RuntimeDriverError> {
212 let checkpoint = self.inner.clone();
213 let abandoned = self.inner.abandon_pending_inputs(reason);
214 let input_states = self.inner.input_states_snapshot();
215 if let Err(err) = self
216 .store
217 .atomic_lifecycle_commit(
218 &self.runtime_id,
219 self.runtime_state_for_persistence(),
220 &input_states,
221 )
222 .await
223 {
224 self.inner = checkpoint;
225 return Err(RuntimeDriverError::Internal(format!(
226 "pending input abandon persist failed: {err}"
227 )));
228 }
229 Ok(abandoned)
230 }
231
232 pub async fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
237 let silent_intents = self.inner.silent_comms_intents();
238 self.inner = EphemeralRuntimeDriver::new(self.runtime_id.clone());
239 self.inner.set_silent_comms_intents(silent_intents);
240 let _ = RuntimeDriver::recover(self).await?;
241 Ok(self.inner.active_input_ids().len())
242 }
243}
244
245#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
246#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
247impl RuntimeDriver for PersistentRuntimeDriver {
248 async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
249 let checkpoint = self.inner.clone();
250 let input_for_recovery = input.clone();
251
252 let mut outcome = self.inner.accept_input(input).await?;
254
255 if let AcceptOutcome::Accepted {
257 ref input_id,
258 ref mut state,
259 ..
260 } = outcome
261 && let Some(inner_state) = self.inner.input_state(input_id).cloned()
262 {
263 let mut input_for_recovery = input_for_recovery.clone();
264 if let Err(err) =
265 externalize_input_images(self.blob_store.as_ref(), &mut input_for_recovery).await
266 {
267 self.inner = checkpoint;
268 return Err(RuntimeDriverError::Internal(format!(
269 "failed to externalize runtime input images: {err}"
270 )));
271 }
272 let mut persisted = inner_state;
273 persisted.persisted_input = Some(input_for_recovery);
274 self.inner.ledger_mut().accept(persisted.clone());
275 if let Err(err) = self.persist_state(&persisted).await {
276 self.inner = checkpoint;
277 return Err(err);
278 }
279 *state = persisted;
280 }
281
282 Ok(outcome)
283 }
284
285 async fn on_runtime_event(
286 &mut self,
287 event: RuntimeEventEnvelope,
288 ) -> Result<(), RuntimeDriverError> {
289 self.inner.on_runtime_event(event).await
290 }
291
292 async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
293 match event {
294 RunEvent::BoundaryApplied {
296 ref receipt,
297 ref session_snapshot,
298 ..
299 } => {
300 let checkpoint = self.inner.clone();
301 self.inner.on_run_event(event.clone()).await?;
302 if self
303 .store
304 .load_boundary_receipt(&self.runtime_id, &receipt.run_id, receipt.sequence)
305 .await
306 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
307 .is_some()
308 {
309 return Ok(());
310 }
311 let input_updates: Vec<InputState> = receipt
312 .contributing_input_ids
313 .iter()
314 .filter_map(|id| self.inner.input_state(id).cloned())
315 .collect();
316
317 self.store
318 .atomic_apply(
319 &self.runtime_id,
320 session_snapshot.clone().map(|session_snapshot| {
321 crate::store::SessionDelta { session_snapshot }
322 }),
323 receipt.clone(),
324 input_updates,
325 None, )
327 .await
328 .map_err(|e| {
329 self.inner = checkpoint;
330 RuntimeDriverError::Internal(format!("runtime boundary commit failed: {e}"))
331 })?;
332 }
333 RunEvent::RunCompleted { .. }
334 | RunEvent::RunFailed { .. }
335 | RunEvent::RunCancelled { .. } => {
336 let checkpoint = self.inner.clone();
337 self.inner.on_run_event(event).await?;
338 let input_states = self.inner.input_states_snapshot();
339 if let Err(err) = self
340 .store
341 .atomic_lifecycle_commit(
342 &self.runtime_id,
343 self.runtime_state_for_persistence(),
344 &input_states,
345 )
346 .await
347 {
348 self.inner = checkpoint;
349 return Err(RuntimeDriverError::Internal(format!(
350 "terminal event persist failed: {err}"
351 )));
352 }
353 }
354 _ => {
355 self.inner.on_run_event(event).await?;
356 }
357 }
358
359 Ok(())
360 }
361
362 async fn on_runtime_control(
363 &mut self,
364 command: RuntimeControlCommand,
365 ) -> Result<(), RuntimeDriverError> {
366 let checkpoint = self.inner.clone();
367 self.inner.on_runtime_control(command).await?;
368 let input_states = self.inner.input_states_snapshot();
369 if let Err(err) = self
370 .store
371 .atomic_lifecycle_commit(
372 &self.runtime_id,
373 self.runtime_state_for_persistence(),
374 &input_states,
375 )
376 .await
377 {
378 self.inner = checkpoint;
379 return Err(RuntimeDriverError::Internal(format!(
380 "control op persist failed: {err}"
381 )));
382 }
383 Ok(())
384 }
385
386 async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
387 let stored_states = self
389 .store
390 .load_input_states(&self.runtime_id)
391 .await
392 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
393
394 let mut recovered_payloads = Vec::new();
395
396 for mut state in stored_states {
400 if matches!(
401 state.current_state(),
402 InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption
403 ) {
404 let has_receipt =
405 match (state.last_run_id().cloned(), state.last_boundary_sequence()) {
406 (Some(run_id), Some(sequence)) => self
407 .store
408 .load_boundary_receipt(&self.runtime_id, &run_id, sequence)
409 .await
410 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
411 .is_some(),
412 _ => false,
413 };
414 let now = Utc::now();
415 let from = state.current_state();
416 if has_receipt {
417 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
418 InputLifecycleState::Consumed,
419 Some(InputTerminalOutcome::Consumed),
420 state.last_run_id().cloned(),
421 state.last_boundary_sequence(),
422 state.attempt_count(),
423 {
424 let mut h = state.history().to_vec();
425 h.push(InputStateHistoryEntry {
426 timestamp: now,
427 from,
428 to: InputLifecycleState::Consumed,
429 reason: Some("recovery: boundary receipt already committed".into()),
430 });
431 h
432 },
433 now,
434 );
435 *state.authority_mut() = auth;
436 } else {
437 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
438 InputLifecycleState::Queued,
439 None,
440 state.last_run_id().cloned(),
441 state.last_boundary_sequence(),
442 state.attempt_count(),
443 {
444 let mut h = state.history().to_vec();
445 h.push(InputStateHistoryEntry {
446 timestamp: now,
447 from,
448 to: InputLifecycleState::Queued,
449 reason: Some("recovery: missing boundary receipt".into()),
450 });
451 h
452 },
453 now,
454 );
455 *state.authority_mut() = auth;
456 }
457 }
458 if matches!(
459 state.current_state(),
460 InputLifecycleState::Accepted | InputLifecycleState::Staged
461 ) {
462 let now = Utc::now();
466 let from = state.current_state();
467 let auth = crate::input_lifecycle_authority::InputLifecycleAuthority::restore(
468 InputLifecycleState::Queued,
469 None,
470 state.last_run_id().cloned(),
471 state.last_boundary_sequence(),
472 state.attempt_count(),
473 {
474 let mut h = state.history().to_vec();
475 h.push(InputStateHistoryEntry {
476 timestamp: now,
477 from,
478 to: InputLifecycleState::Queued,
479 reason: Some("recovery: pre-run state normalized to queued".into()),
480 });
481 h
482 },
483 now,
484 );
485 *state.authority_mut() = auth;
486 }
487
488 if self.inner.input_state(&state.input_id).is_none() {
490 let handling_mode = state
491 .policy
492 .as_ref()
493 .map(|p| handling_mode_from_policy(&p.decision))
494 .unwrap_or(HandlingMode::Queue);
495 let content_shape = state
496 .persisted_input
497 .as_ref()
498 .map(|i| ContentShape(i.kind_id().to_string()))
499 .unwrap_or_else(|| ContentShape("unknown".into()));
500 let policy = match state.policy.as_ref() {
501 Some(p) => p.decision.clone(),
502 None => match state.persisted_input.as_ref() {
503 Some(input) => {
504 crate::policy_table::DefaultPolicyTable::resolve(input, true)
505 }
506 None => {
507 self.inner.ledger_mut().recover(state);
510 continue;
511 }
512 },
513 };
514 let request_id = None;
515 let reservation_key = None;
516
517 let inserted = self.inner.ledger_mut().recover(state.clone());
518 if !inserted {
519 continue;
523 }
524
525 if let Some(input) = state.persisted_input.clone() {
526 recovered_payloads.push((state.input_id.clone(), input));
527 }
528
529 let lifecycle_state = state.current_state();
530
531 if let Err(err) = self.inner.admit_recovered_to_ingress(
532 state.input_id.clone(),
533 content_shape,
534 handling_mode,
535 lifecycle_state,
536 policy,
537 request_id,
538 reservation_key,
539 ) {
540 return Err(RuntimeDriverError::Internal(format!(
541 "failed to admit recovered input '{}' to ingress authority: {err}",
542 state.input_id
543 )));
544 }
545 }
546 }
547
548 let report = self.inner.recover().await?;
552
553 for (input_id, _input) in recovered_payloads {
554 let should_requeue = self.inner.input_state(&input_id).is_some_and(|state| {
555 state.current_state() == crate::input_state::InputLifecycleState::Queued
556 });
557 if should_requeue && !self.inner.has_queued_input(&input_id) {
558 return Err(RuntimeDriverError::Internal(format!(
559 "persistent recover left queued input '{input_id}' out of the runtime queue projection"
560 )));
561 }
562 }
563
564 if let Some(runtime_state) = self
565 .store
566 .load_runtime_state(&self.runtime_id)
567 .await
568 .map_err(|e| RuntimeDriverError::Internal(e.to_string()))?
569 {
570 match runtime_state {
571 RuntimeState::Retired if self.inner.runtime_state() != RuntimeState::Retired => {
572 EphemeralRuntimeDriver::retire(&mut self.inner)?;
573 }
574 RuntimeState::Stopped
575 if self.inner.runtime_state() != RuntimeState::Stopped
576 && self.inner.runtime_state() != RuntimeState::Destroyed =>
577 {
578 self.inner
580 .on_runtime_control(RuntimeControlCommand::Stop)
581 .await?;
582 }
583 RuntimeState::Destroyed
584 if self.inner.runtime_state() != RuntimeState::Destroyed =>
585 {
586 self.inner.destroy()?;
587 }
588 _ => {}
589 }
590
591 if runtime_state.is_terminal() {
595 let active = self.inner.active_input_ids();
596 if !active.is_empty() {
597 return Err(RuntimeDriverError::Internal(format!(
598 "store corruption: terminal runtime '{}' has {} active inputs",
599 runtime_state,
600 active.len()
601 )));
602 }
603 }
604 }
605
606 let input_states = self.inner.input_states_snapshot();
608 self.store
609 .atomic_lifecycle_commit(
610 &self.runtime_id,
611 self.runtime_state_for_persistence(),
612 &input_states,
613 )
614 .await
615 .map_err(|e| RuntimeDriverError::Internal(format!("recovery persist failed: {e}")))?;
616 Ok(report)
617 }
618
619 async fn retire(&mut self) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
620 let checkpoint = self.inner.clone();
621 let report = EphemeralRuntimeDriver::retire(&mut self.inner)?;
622 let input_states = self.inner.input_states_snapshot();
623 if let Err(err) = self
624 .store
625 .atomic_lifecycle_commit(
626 &self.runtime_id,
627 self.runtime_state_for_persistence(),
628 &input_states,
629 )
630 .await
631 {
632 self.inner = checkpoint;
633 return Err(RuntimeDriverError::Internal(format!(
634 "retire persist failed: {err}"
635 )));
636 }
637 Ok(report)
638 }
639
640 async fn reset(&mut self) -> Result<crate::traits::ResetReport, RuntimeDriverError> {
641 let checkpoint = self.inner.clone();
642 let report = EphemeralRuntimeDriver::reset(&mut self.inner)?;
643 let input_states = self.inner.input_states_snapshot();
644 if let Err(err) = self
645 .store
646 .atomic_lifecycle_commit(
647 &self.runtime_id,
648 self.runtime_state_for_persistence(),
649 &input_states,
650 )
651 .await
652 {
653 self.inner = checkpoint;
654 return Err(RuntimeDriverError::Internal(format!(
655 "reset persist failed: {err}"
656 )));
657 }
658 Ok(report)
659 }
660
661 async fn destroy(&mut self) -> Result<DestroyReport, RuntimeDriverError> {
662 let abandoned = self.inner.destroy()?;
663 let input_states = self.inner.input_states_snapshot();
664 if let Err(err) = self
665 .store
666 .atomic_lifecycle_commit(
667 &self.runtime_id,
668 self.runtime_state_for_persistence(),
669 &input_states,
670 )
671 .await
672 {
673 return Err(RuntimeDriverError::Internal(format!(
674 "destroy persist failed: {err}"
675 )));
676 }
677 Ok(DestroyReport {
678 inputs_abandoned: abandoned,
679 })
680 }
681
682 fn runtime_state(&self) -> RuntimeState {
683 self.inner.runtime_state()
684 }
685
686 fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
687 self.inner.input_state(input_id)
688 }
689
690 fn active_input_ids(&self) -> Vec<InputId> {
691 self.inner.active_input_ids()
692 }
693}