1use std::collections::BTreeMap;
5use std::io::Cursor;
6use std::path::{Path, PathBuf};
7
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10
11use crate::effect::{EffectFailure, EffectHandler, EffectOutcome, EffectRequest, EffectResult};
12use crate::semantic_objects::{
13 AgreementEvidence, AgreementLevel, AgreementState, FinalizationOutcome,
14};
15
/// Schema version tag stamped on every persisted durability artifact built in this file;
/// decoding rejects artifacts carrying any other value.
pub const PERSISTED_DURABILITY_SCHEMA_VERSION: &str = "telltale.machine.durability.v1";

/// Largest encoded input, in bytes, that the CBOR decoder in this file will accept (64 MiB).
pub const MAX_PERSISTED_DURABILITY_BYTES: usize = 64 * 1024 * 1024;
21
22fn decode_cbor<T>(bytes: &[u8], context: &str) -> Result<T, String>
23where
24 T: for<'de> Deserialize<'de>,
25{
26 if bytes.len() > MAX_PERSISTED_DURABILITY_BYTES {
27 return Err(format!(
28 "{context}: input is {} bytes, max is {MAX_PERSISTED_DURABILITY_BYTES}",
29 bytes.len()
30 ));
31 }
32 ciborium::from_reader(Cursor::new(bytes)).map_err(|err| format!("{context}: {err}"))
33}
34
35fn encode_cbor<T>(value: &T, context: &str) -> Result<Vec<u8>, String>
36where
37 T: Serialize,
38{
39 let mut bytes = Vec::new();
40 ciborium::into_writer(value, &mut bytes).map_err(|err| format!("{context}: {err}"))?;
41 Ok(bytes)
42}
43
/// One record in the agreement write-ahead log.
///
/// Serialized as an internally tagged value: the variant name is written in `snake_case`
/// under the `kind` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum AgreementWalEntry {
    /// An operation's agreement level moved from `previous_level` to `new_level`.
    Escalation {
        /// Operation the escalation belongs to.
        operation_id: String,
        /// Level recorded as the starting point of the transition.
        previous_level: AgreementLevel,
        /// Level recorded as the result of the transition.
        new_level: AgreementLevel,
        /// Evidence associated with the escalation, if any; a missing field decodes as `None`.
        #[serde(default)]
        evidence_id: Option<String>,
        /// Tick carried by this entry.
        tick: u64,
    },
    /// A piece of agreement evidence was recorded.
    EvidenceProduced {
        /// The evidence itself; its `operation_id` identifies the owning operation.
        evidence: AgreementEvidence,
        /// Tick carried by this entry.
        tick: u64,
    },
    /// An operation reached a finalization outcome.
    Finalization {
        /// Operation that was finalized.
        operation_id: String,
        /// The recorded outcome.
        outcome: FinalizationOutcome,
        /// Optional materialization proof identifier; a missing field decodes as `None`.
        #[serde(default)]
        materialization_proof_id: Option<String>,
        /// Optional canonical handle identifier; a missing field decodes as `None`.
        #[serde(default)]
        canonical_handle_id: Option<String>,
        /// Tick carried by this entry.
        tick: u64,
    },
    /// A visibility gate was crossed towards a downstream coroutine.
    VisibilityGateCrossing {
        /// Operation whose gate was crossed.
        operation_id: String,
        /// Identifier of the downstream coroutine named in the sync request.
        downstream_coroutine_id: String,
        /// Agreement level of the gate.
        gate_level: AgreementLevel,
        /// Tick carried by this entry.
        tick: u64,
    },
}
96
/// The full contents of an agreement WAL: its entries in append order.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgreementWalArtifact {
    /// Entries in the order they were appended.
    pub entries: Vec<AgreementWalEntry>,
}
103
/// Input to a WAL sync: describes a gate crossing plus the agreement state and evidence
/// from which WAL entries are derived.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WalSyncRequest {
    /// Operation the sync concerns.
    pub operation_id: String,
    /// Downstream coroutine recorded on the resulting gate-crossing entry.
    pub downstream_coroutine_id: String,
    /// Gate level recorded on the resulting gate-crossing entry.
    pub gate_level: AgreementLevel,
    /// Current agreement state, if known; a missing field decodes as `None`.
    #[serde(default)]
    pub agreement_state: Option<AgreementState>,
    /// Evidence accompanying the request; a missing field decodes as an empty list.
    #[serde(default)]
    pub agreement_evidence: Vec<AgreementEvidence>,
    /// Tick stamped on every entry derived from this request.
    pub tick: u64,
}
122
123impl AgreementWalEntry {
124 #[must_use]
126 pub const fn tick(&self) -> u64 {
127 match self {
128 Self::Escalation { tick, .. }
129 | Self::EvidenceProduced { tick, .. }
130 | Self::Finalization { tick, .. }
131 | Self::VisibilityGateCrossing { tick, .. } => *tick,
132 }
133 }
134
135 #[must_use]
137 pub fn operation_id(&self) -> &str {
138 match self {
139 Self::Escalation { operation_id, .. }
140 | Self::Finalization { operation_id, .. }
141 | Self::VisibilityGateCrossing { operation_id, .. } => operation_id,
142 Self::EvidenceProduced { evidence, .. } => &evidence.operation_id,
143 }
144 }
145
146 #[must_use]
148 pub fn stable_identity(&self) -> String {
149 match self {
150 Self::Escalation {
151 operation_id,
152 previous_level,
153 new_level,
154 evidence_id,
155 tick,
156 } => format!(
157 "escalation:{operation_id}:{previous_level:?}:{new_level:?}:{}:{tick}",
158 evidence_id.as_deref().unwrap_or("-")
159 ),
160 Self::EvidenceProduced { evidence, tick } => format!(
161 "evidence:{}:{}:{:?}:{tick}",
162 evidence.operation_id, evidence.evidence_id, evidence.level
163 ),
164 Self::Finalization {
165 operation_id,
166 outcome,
167 materialization_proof_id,
168 canonical_handle_id,
169 tick,
170 } => format!(
171 "finalization:{operation_id}:{outcome:?}:{}:{}:{tick}",
172 materialization_proof_id.as_deref().unwrap_or("-"),
173 canonical_handle_id.as_deref().unwrap_or("-")
174 ),
175 Self::VisibilityGateCrossing {
176 operation_id,
177 downstream_coroutine_id,
178 gate_level,
179 tick,
180 } => format!("gate:{operation_id}:{downstream_coroutine_id}:{gate_level:?}:{tick}"),
181 }
182 }
183}
184
185impl AgreementWalArtifact {
186 #[must_use]
188 pub fn read_since(&self, tick: u64) -> Vec<AgreementWalEntry> {
189 self.entries
190 .iter()
191 .filter(|entry| entry.tick() > tick)
192 .cloned()
193 .collect()
194 }
195
196 pub fn validate_monotonic_escalations(&self) -> Result<(), String> {
203 let mut last_levels = BTreeMap::<String, AgreementLevel>::new();
204 for entry in &self.entries {
205 let AgreementWalEntry::Escalation {
206 operation_id,
207 previous_level,
208 new_level,
209 ..
210 } = entry
211 else {
212 continue;
213 };
214 if new_level.rank() < previous_level.rank() {
215 return Err(format!(
216 "agreement WAL regression for `{operation_id}`: {previous_level:?} -> {new_level:?}"
217 ));
218 }
219 if let Some(last) = last_levels.get(operation_id) {
220 if previous_level.rank() < last.rank() || new_level.rank() < last.rank() {
221 return Err(format!(
222 "agreement WAL reordered or regressed for `{operation_id}`: last={last:?}, entry={previous_level:?}->{new_level:?}"
223 ));
224 }
225 }
226 last_levels.insert(operation_id.clone(), *new_level);
227 }
228 Ok(())
229 }
230}
231
/// Storage backend for the agreement write-ahead log.
///
/// All methods report failures as human-readable strings.
pub trait AgreementWal {
    /// Appends `entry` to the log.
    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String>;

    /// Returns the entries newer than `tick`. The implementations in this file return
    /// entries whose tick is strictly greater than `tick`.
    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String>;

    /// Returns the complete log contents.
    fn load(&self) -> Result<AgreementWalArtifact, String>;
}
256
/// Agreement WAL that keeps its entries in memory only.
#[derive(Debug, Clone, Default)]
pub struct InMemoryAgreementWal {
    // The entries held by this WAL.
    artifact: AgreementWalArtifact,
}
263
264impl InMemoryAgreementWal {
265 #[must_use]
267 pub fn new() -> Self {
268 Self::default()
269 }
270}
271
272impl AgreementWal for InMemoryAgreementWal {
273 fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
274 self.artifact.entries.push(entry);
275 self.artifact.validate_monotonic_escalations()
276 }
277
278 fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
279 Ok(self.artifact.read_since(tick))
280 }
281
282 fn load(&self) -> Result<AgreementWalArtifact, String> {
283 Ok(self.artifact.clone())
284 }
285}
286
/// Agreement WAL stored as a single persisted durability artifact at `path`.
#[derive(Debug, Clone)]
pub struct FileAgreementWal {
    // Location of the artifact file.
    path: PathBuf,
}
293
294impl FileAgreementWal {
295 #[must_use]
297 pub fn new(path: impl Into<PathBuf>) -> Self {
298 Self { path: path.into() }
299 }
300
301 fn load_artifact(&self) -> Result<AgreementWalArtifact, String> {
302 if !self.path.exists() {
303 return Ok(AgreementWalArtifact::default());
304 }
305 PersistedDurabilityArtifact::from_path(&self.path)?.into_agreement_wal()
306 }
307
308 fn store_artifact(&self, artifact: &AgreementWalArtifact) -> Result<(), String> {
309 artifact.validate_monotonic_escalations()?;
310 PersistedDurabilityArtifact::agreement_wal(artifact.clone()).write_to_path(&self.path)
311 }
312}
313
314impl AgreementWal for FileAgreementWal {
315 fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
316 let mut artifact = self.load_artifact()?;
317 artifact.entries.push(entry);
318 self.store_artifact(&artifact)
319 }
320
321 fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
322 Ok(self.load_artifact()?.read_since(tick))
323 }
324
325 fn load(&self) -> Result<AgreementWalArtifact, String> {
326 self.load_artifact()
327 }
328}
329
/// A stored effect outcome, keyed by the evidence id it was produced under.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EvidenceOutcomeCacheEntry {
    /// Lookup key for this entry.
    pub evidence_id: String,
    /// Interface name copied from the originating request's metadata.
    pub interface_name: String,
    /// Operation name copied from the originating request's metadata.
    pub operation_name: String,
    /// The outcome that was recorded.
    pub outcome: EffectOutcome,
}
342
/// The full contents of an evidence outcome cache.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct EvidenceOutcomeCacheArtifact {
    /// Cached entries, in insertion order.
    pub entries: Vec<EvidenceOutcomeCacheEntry>,
}
349
350impl EvidenceOutcomeCacheArtifact {
351 #[must_use]
353 pub fn get(&self, evidence_id: &str) -> Option<&EvidenceOutcomeCacheEntry> {
354 self.entries
355 .iter()
356 .find(|entry| entry.evidence_id == evidence_id)
357 }
358}
359
/// Storage backend for cached effect outcomes keyed by evidence id.
///
/// All methods report failures as human-readable strings.
pub trait EvidenceOutcomeCache {
    /// Looks up the entry stored under `evidence_id`, if any.
    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String>;

    /// Stores `entry`. The implementations in this file replace any existing entry with
    /// the same evidence id.
    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String>;

    /// Returns the complete cache contents.
    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String>;
}
383
/// Evidence outcome cache that keeps its entries in memory only.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEvidenceOutcomeCache {
    // The entries held by this cache.
    artifact: EvidenceOutcomeCacheArtifact,
}
389
390impl InMemoryEvidenceOutcomeCache {
391 #[must_use]
393 pub fn new() -> Self {
394 Self::default()
395 }
396}
397
398impl EvidenceOutcomeCache for InMemoryEvidenceOutcomeCache {
399 fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
400 Ok(self.artifact.get(evidence_id).cloned())
401 }
402
403 fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
404 self.artifact
405 .entries
406 .retain(|candidate| candidate.evidence_id != entry.evidence_id);
407 self.artifact.entries.push(entry);
408 Ok(())
409 }
410
411 fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
412 Ok(self.artifact.clone())
413 }
414}
415
/// Evidence outcome cache stored as a single persisted durability artifact at `path`.
#[derive(Debug, Clone)]
pub struct FileEvidenceOutcomeCache {
    // Location of the artifact file.
    path: PathBuf,
}
421
422impl FileEvidenceOutcomeCache {
423 #[must_use]
425 pub fn new(path: impl Into<PathBuf>) -> Self {
426 Self { path: path.into() }
427 }
428
429 fn load_artifact(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
430 if !self.path.exists() {
431 return Ok(EvidenceOutcomeCacheArtifact::default());
432 }
433 PersistedDurabilityArtifact::from_path(&self.path)?.into_evidence_outcome_cache()
434 }
435
436 fn store_artifact(&self, artifact: &EvidenceOutcomeCacheArtifact) -> Result<(), String> {
437 PersistedDurabilityArtifact::evidence_outcome_cache(artifact.clone())
438 .write_to_path(&self.path)
439 }
440}
441
442impl EvidenceOutcomeCache for FileEvidenceOutcomeCache {
443 fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
444 Ok(self.load_artifact()?.get(evidence_id).cloned())
445 }
446
447 fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
448 let mut artifact = self.load_artifact()?;
449 artifact
450 .entries
451 .retain(|candidate| candidate.evidence_id != entry.evidence_id);
452 artifact.entries.push(entry);
453 self.store_artifact(&artifact)
454 }
455
456 fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
457 self.load_artifact()
458 }
459}
460
/// Maps an effect request to the evidence id its outcome should be cached under.
pub trait EvidenceIdResolver: Send + Sync {
    /// Returns the evidence id for `request`, or `None` when the request has none.
    fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String>;
}
467
468impl<F> EvidenceIdResolver for F
469where
470 F: Fn(&EffectRequest) -> Option<String> + Send + Sync,
471{
472 fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String> {
473 self(request)
474 }
475}
476
/// Effect handler wrapper that stores the outcomes of effects carrying an evidence id in
/// a cache and replays them on later requests with the same id.
pub struct EvidencePersistenceHandler<'a, C, R>
where
    C: EvidenceOutcomeCache,
    R: EvidenceIdResolver,
{
    // Handler that actually performs effects on a cache miss.
    inner: &'a dyn EffectHandler,
    // Outcome store; wrapped in a mutex because handler methods take `&self`.
    cache: Mutex<C>,
    // Derives the cache key for each request.
    resolver: R,
}
488
489impl<'a, C, R> EvidencePersistenceHandler<'a, C, R>
490where
491 C: EvidenceOutcomeCache,
492 R: EvidenceIdResolver,
493{
494 #[must_use]
496 pub fn new(inner: &'a dyn EffectHandler, cache: C, resolver: R) -> Self {
497 Self {
498 inner,
499 cache: Mutex::new(cache),
500 resolver,
501 }
502 }
503
504 pub fn cached_outcome(&self, evidence_id: &str) -> Result<Option<EffectOutcome>, String> {
510 let cache = self.cache.lock();
511 Ok(cache.get(evidence_id)?.map(|entry| entry.outcome))
512 }
513
514 pub fn cache_snapshot(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
520 let cache = self.cache.lock();
521 cache.load()
522 }
523}
524
525impl<C, R> EffectHandler for EvidencePersistenceHandler<'_, C, R>
526where
527 C: EvidenceOutcomeCache + Send,
528 R: EvidenceIdResolver,
529{
530 fn handler_identity(&self) -> String {
531 format!("evidence_persistence<{}>", self.inner.handler_identity())
532 }
533
534 fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
535 let evidence_id = self.resolver.evidence_id_for_request(&request);
536 let interface_name = request.metadata.interface_name.clone();
537 let operation_name = request.metadata.operation_name.clone();
538
539 if let Some(evidence_id) = evidence_id.clone() {
540 let cache = self.cache.lock();
541 match cache.get(&evidence_id) {
542 Ok(Some(entry)) => return entry.outcome,
543 Ok(None) => {}
544 Err(err) => {
545 return EffectOutcome::failure(EffectFailure::unavailable(format!(
546 "load evidence outcome cache `{evidence_id}`: {err}"
547 )));
548 }
549 }
550 }
551
552 let outcome = self.inner.handle_effect(request);
553 let Some(evidence_id) = evidence_id else {
554 return outcome;
555 };
556
557 let entry = EvidenceOutcomeCacheEntry {
558 evidence_id: evidence_id.clone(),
559 interface_name,
560 operation_name,
561 outcome: outcome.clone(),
562 };
563 let mut cache = self.cache.lock();
564 if let Err(err) = cache.put(entry) {
565 return EffectOutcome::failure(EffectFailure::unavailable(format!(
566 "persist evidence outcome `{evidence_id}`: {err}"
567 )));
568 }
569 outcome
570 }
571
572 fn handle_send(
573 &self,
574 role: &str,
575 partner: &str,
576 label: &str,
577 state: &[crate::coroutine::Value],
578 ) -> crate::effect::EffectResult<crate::coroutine::Value> {
579 self.inner.handle_send(role, partner, label, state)
580 }
581
582 fn send_decision(
583 &self,
584 input: crate::effect::SendDecisionInput<'_>,
585 ) -> crate::effect::EffectResult<crate::effect::SendDecision> {
586 self.inner.send_decision(input)
587 }
588
589 fn handle_recv(
590 &self,
591 role: &str,
592 partner: &str,
593 label: &str,
594 state: &mut Vec<crate::coroutine::Value>,
595 payload: &crate::coroutine::Value,
596 ) -> crate::effect::EffectResult<()> {
597 self.inner.handle_recv(role, partner, label, state, payload)
598 }
599
600 fn handle_choose(
601 &self,
602 role: &str,
603 partner: &str,
604 labels: &[String],
605 state: &[crate::coroutine::Value],
606 ) -> crate::effect::EffectResult<String> {
607 self.inner.handle_choose(role, partner, labels, state)
608 }
609
610 fn step(
611 &self,
612 role: &str,
613 state: &mut Vec<crate::coroutine::Value>,
614 ) -> crate::effect::EffectResult<()> {
615 self.inner.step(role, state)
616 }
617}
618
/// Selects how `AgreementWalHandler` answers a WAL sync request.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum WalSyncMode {
    /// Build the entries and append them to the WAL during the call. This is the default.
    #[default]
    Immediate,
    /// Report the sync as blocked without touching the WAL.
    Blocked,
    /// Report an `unavailable` failure without touching the WAL.
    Failure {
        /// Message carried by the reported failure.
        message: String,
    },
}
633
/// Effect handler wrapper that records agreement activity in a WAL when asked to sync.
pub struct AgreementWalHandler<'a, W>
where
    W: AgreementWal,
{
    // Handler that the send/recv/choose/step calls are forwarded to.
    inner: &'a dyn EffectHandler,
    // WAL backend; wrapped in a mutex because handler methods take `&self`.
    wal: Mutex<W>,
    // How `wal_sync` responds.
    sync_mode: WalSyncMode,
}
644
645impl<'a, W> AgreementWalHandler<'a, W>
646where
647 W: AgreementWal,
648{
649 #[must_use]
651 pub fn new(inner: &'a dyn EffectHandler, wal: W) -> Self {
652 Self {
653 inner,
654 wal: Mutex::new(wal),
655 sync_mode: WalSyncMode::Immediate,
656 }
657 }
658
659 #[must_use]
661 pub fn with_sync_mode(inner: &'a dyn EffectHandler, wal: W, sync_mode: WalSyncMode) -> Self {
662 Self {
663 inner,
664 wal: Mutex::new(wal),
665 sync_mode,
666 }
667 }
668
669 pub fn wal_snapshot(&self) -> Result<AgreementWalArtifact, String> {
675 let wal = self.wal.lock();
676 wal.load()
677 }
678
679 fn build_entries(
680 &self,
681 wal: &W,
682 sync: &WalSyncRequest,
683 ) -> Result<Vec<AgreementWalEntry>, String> {
684 let existing = wal.load()?;
685 let existing_ids: std::collections::BTreeSet<_> = existing
686 .entries
687 .iter()
688 .map(AgreementWalEntry::stable_identity)
689 .collect();
690 let mut entries = Vec::new();
691
692 for evidence in sync
693 .agreement_evidence
694 .iter()
695 .filter(|evidence| evidence.operation_id == sync.operation_id)
696 {
697 let entry = AgreementWalEntry::EvidenceProduced {
698 evidence: evidence.clone(),
699 tick: sync.tick,
700 };
701 if !existing_ids.contains(&entry.stable_identity()) {
702 entries.push(entry);
703 }
704 }
705
706 if let Some(state) = &sync.agreement_state {
707 let previous_level = existing
708 .entries
709 .iter()
710 .filter_map(|entry| match entry {
711 AgreementWalEntry::Escalation {
712 operation_id,
713 new_level,
714 ..
715 } if operation_id == &sync.operation_id => Some(*new_level),
716 _ => None,
717 })
718 .max_by_key(|level| level.rank())
719 .unwrap_or(AgreementLevel::None);
720 if state.level.rank() > previous_level.rank() {
721 let entry = AgreementWalEntry::Escalation {
722 operation_id: sync.operation_id.clone(),
723 previous_level,
724 new_level: state.level,
725 evidence_id: state.evidence_ids.last().cloned(),
726 tick: sync.tick,
727 };
728 if !existing_ids.contains(&entry.stable_identity()) {
729 entries.push(entry);
730 }
731 }
732 if let Some(outcome) = state.finalization {
733 let entry = AgreementWalEntry::Finalization {
734 operation_id: sync.operation_id.clone(),
735 outcome,
736 materialization_proof_id: state
737 .evidence_ids
738 .iter()
739 .find(|evidence_id| evidence_id.contains("proof"))
740 .cloned(),
741 canonical_handle_id: None,
742 tick: sync.tick,
743 };
744 if !existing_ids.contains(&entry.stable_identity()) {
745 entries.push(entry);
746 }
747 }
748 }
749
750 let gate = AgreementWalEntry::VisibilityGateCrossing {
751 operation_id: sync.operation_id.clone(),
752 downstream_coroutine_id: sync.downstream_coroutine_id.clone(),
753 gate_level: sync.gate_level,
754 tick: sync.tick,
755 };
756 if !existing_ids.contains(&gate.stable_identity()) {
757 entries.push(gate);
758 }
759
760 Ok(entries)
761 }
762}
763
764impl<W> EffectHandler for AgreementWalHandler<'_, W>
765where
766 W: AgreementWal + Send,
767{
768 fn handler_identity(&self) -> String {
769 format!("agreement_wal<{}>", self.inner.handler_identity())
770 }
771
772 fn supports_wal_sync(&self) -> bool {
773 true
774 }
775
776 fn wal_sync(&self, sync: &WalSyncRequest) -> EffectResult<()> {
777 match &self.sync_mode {
778 WalSyncMode::Immediate => {
779 let mut wal = self.wal.lock();
780 let entries = match self.build_entries(&*wal, sync) {
781 Ok(entries) => entries,
782 Err(err) => {
783 return EffectResult::failure(EffectFailure::unavailable(format!(
784 "load agreement WAL for `{}`: {err}",
785 sync.operation_id
786 )));
787 }
788 };
789 for entry in entries {
790 if let Err(err) = wal.append(entry) {
791 return EffectResult::failure(EffectFailure::unavailable(format!(
792 "persist agreement WAL for `{}`: {err}",
793 sync.operation_id
794 )));
795 }
796 }
797 EffectResult::success(())
798 }
799 WalSyncMode::Blocked => EffectResult::Blocked,
800 WalSyncMode::Failure { message } => {
801 EffectResult::failure(EffectFailure::unavailable(message.clone()))
802 }
803 }
804 }
805
806 fn handle_send(
807 &self,
808 role: &str,
809 partner: &str,
810 label: &str,
811 state: &[crate::coroutine::Value],
812 ) -> crate::effect::EffectResult<crate::coroutine::Value> {
813 self.inner.handle_send(role, partner, label, state)
814 }
815
816 fn send_decision(
817 &self,
818 input: crate::effect::SendDecisionInput<'_>,
819 ) -> crate::effect::EffectResult<crate::effect::SendDecision> {
820 self.inner.send_decision(input)
821 }
822
823 fn handle_recv(
824 &self,
825 role: &str,
826 partner: &str,
827 label: &str,
828 state: &mut Vec<crate::coroutine::Value>,
829 payload: &crate::coroutine::Value,
830 ) -> crate::effect::EffectResult<()> {
831 self.inner.handle_recv(role, partner, label, state, payload)
832 }
833
834 fn handle_choose(
835 &self,
836 role: &str,
837 partner: &str,
838 labels: &[String],
839 state: &[crate::coroutine::Value],
840 ) -> crate::effect::EffectResult<String> {
841 self.inner.handle_choose(role, partner, labels, state)
842 }
843
844 fn step(
845 &self,
846 role: &str,
847 state: &mut Vec<crate::coroutine::Value>,
848 ) -> crate::effect::EffectResult<()> {
849 self.inner.step(role, state)
850 }
851}
852
/// Summary data describing a recovery from a checkpoint plus WAL suffix.
///
/// Every field except `checkpoint_tick` falls back to its default when missing from
/// serialized input.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DurableRecoveryMetadata {
    /// Tick of the checkpoint recovery started from.
    pub checkpoint_tick: u64,
    /// Tick of the first WAL entry after the checkpoint, if there is one.
    #[serde(default)]
    pub wal_tail_start_tick: Option<u64>,
    /// Tick reported for the end of the recovered WAL suffix, if there is one.
    #[serde(default)]
    pub highest_recovered_tick: Option<u64>,
    /// Operations whose recovery action is to re-execute or resume.
    #[serde(default)]
    pub resumed_operation_ids: Vec<String>,
    /// Operations whose recovery action is to reuse or preserve a finalization outcome.
    #[serde(default)]
    pub terminal_operation_ids: Vec<String>,
    /// Evidence ids present in the evidence outcome cache.
    #[serde(default)]
    pub cached_evidence_ids: Vec<String>,
}
875
/// What recovery should do with an operation found in the WAL suffix.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DurableRecoveryAction {
    /// No finalization was logged and the recovered level is below `SoftSafe`.
    ReexecuteFromScratch,
    /// No finalization was logged and the recovered level is at least `SoftSafe`.
    ResumeFromEvidenceBoundary,
    /// The operation was logged as finalized.
    ReuseFinalized,
    /// The operation was logged as aborted, rejected, or timed out.
    PreserveTerminal,
}
889
/// The recovery verdict for a single operation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DurableRecoveryDecision {
    /// Operation the decision applies to.
    pub operation_id: String,
    /// Highest agreement level recovered for the operation from the WAL suffix.
    pub level: AgreementLevel,
    /// Finalization outcome recovered from the WAL suffix, if any.
    #[serde(default)]
    pub finalization: Option<FinalizationOutcome>,
    /// Action chosen for the operation.
    pub action: DurableRecoveryAction,
    /// Evidence ids of this operation that also appear in the evidence outcome cache.
    #[serde(default)]
    pub cached_evidence_ids: Vec<String>,
    /// Whether the WAL suffix contains a gate-crossing entry for the operation.
    #[serde(default)]
    pub gate_crossed: bool,
}
909
/// Everything assembled for resuming from a checkpoint: the machine, the WAL entries
/// recorded after the checkpoint, the evidence cache, and the per-operation decisions.
#[derive(Debug, Serialize, Deserialize)]
pub struct DurableRecoveryPlan {
    /// The protocol machine supplied alongside the checkpoint.
    pub machine: crate::ProtocolMachine,
    /// Summary of the recovery.
    pub metadata: DurableRecoveryMetadata,
    /// WAL entries with a tick greater than the checkpoint tick.
    pub wal_suffix: Vec<AgreementWalEntry>,
    /// Evidence outcome cache supplied for the recovery.
    pub evidence_cache: EvidenceOutcomeCacheArtifact,
    /// One decision per operation appearing in `wal_suffix`.
    pub decisions: Vec<DurableRecoveryDecision>,
}
924
/// The body of a persisted durability artifact.
///
/// Serialized adjacently tagged: the `snake_case` variant name goes under `kind` and the
/// contents under `payload`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case", tag = "kind", content = "payload")]
pub enum PersistedDurabilityPayload {
    /// An agreement WAL.
    AgreementWal(AgreementWalArtifact),
    /// An evidence outcome cache.
    EvidenceOutcomeCache(EvidenceOutcomeCacheArtifact),
    /// Recovery metadata.
    RecoveryMetadata(DurableRecoveryMetadata),
}
936
/// Versioned envelope written to disk for every durability payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedDurabilityArtifact {
    /// Schema version the artifact was written with.
    pub schema_version: String,
    /// The wrapped payload.
    pub payload: PersistedDurabilityPayload,
}
945
946impl PersistedDurabilityArtifact {
947 #[must_use]
949 pub fn agreement_wal(wal: AgreementWalArtifact) -> Self {
950 Self {
951 schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
952 payload: PersistedDurabilityPayload::AgreementWal(wal),
953 }
954 }
955
956 #[must_use]
958 pub fn evidence_outcome_cache(cache: EvidenceOutcomeCacheArtifact) -> Self {
959 Self {
960 schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
961 payload: PersistedDurabilityPayload::EvidenceOutcomeCache(cache),
962 }
963 }
964
965 #[must_use]
967 pub fn recovery_metadata(metadata: DurableRecoveryMetadata) -> Self {
968 Self {
969 schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
970 payload: PersistedDurabilityPayload::RecoveryMetadata(metadata),
971 }
972 }
973
974 pub fn from_slice(bytes: &[u8]) -> Result<Self, String> {
981 let artifact: Self = decode_cbor(bytes, "decode persisted durability artifact")?;
982 if artifact.schema_version != PERSISTED_DURABILITY_SCHEMA_VERSION {
983 return Err(format!(
984 "unsupported persisted durability schema version `{}`",
985 artifact.schema_version
986 ));
987 }
988 Ok(artifact)
989 }
990
991 pub fn from_path(path: impl AsRef<Path>) -> Result<Self, String> {
998 let path = path.as_ref();
999 let bytes = std::fs::read(path).map_err(|err| {
1000 format!(
1001 "read persisted durability artifact {}: {err}",
1002 path.display()
1003 )
1004 })?;
1005 Self::from_slice(&bytes)
1006 }
1007
1008 pub fn to_cbor(&self) -> Result<Vec<u8>, String> {
1014 encode_cbor(self, "encode persisted durability artifact")
1015 }
1016
1017 pub fn write_to_path(&self, path: impl AsRef<Path>) -> Result<(), String> {
1023 let path = path.as_ref();
1024 let bytes = self.to_cbor()?;
1025 std::fs::write(path, bytes).map_err(|err| {
1026 format!(
1027 "write persisted durability artifact {}: {err}",
1028 path.display()
1029 )
1030 })
1031 }
1032
1033 #[must_use]
1035 pub fn agreement_wal_artifact(&self) -> Option<&AgreementWalArtifact> {
1036 match &self.payload {
1037 PersistedDurabilityPayload::AgreementWal(wal) => Some(wal),
1038 PersistedDurabilityPayload::EvidenceOutcomeCache(_)
1039 | PersistedDurabilityPayload::RecoveryMetadata(_) => None,
1040 }
1041 }
1042
1043 pub fn into_agreement_wal(self) -> Result<AgreementWalArtifact, String> {
1050 match self.payload {
1051 PersistedDurabilityPayload::AgreementWal(wal) => Ok(wal),
1052 PersistedDurabilityPayload::EvidenceOutcomeCache(_) => Err(
1053 "persisted durability artifact contains an evidence outcome cache payload, not an agreement WAL"
1054 .to_string(),
1055 ),
1056 PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
1057 "persisted durability artifact contains recovery metadata, not an agreement WAL"
1058 .to_string(),
1059 ),
1060 }
1061 }
1062
1063 #[must_use]
1065 pub fn evidence_outcome_cache_artifact(&self) -> Option<&EvidenceOutcomeCacheArtifact> {
1066 match &self.payload {
1067 PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Some(cache),
1068 PersistedDurabilityPayload::AgreementWal(_)
1069 | PersistedDurabilityPayload::RecoveryMetadata(_) => None,
1070 }
1071 }
1072
1073 pub fn into_evidence_outcome_cache(self) -> Result<EvidenceOutcomeCacheArtifact, String> {
1080 match self.payload {
1081 PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Ok(cache),
1082 PersistedDurabilityPayload::AgreementWal(_) => Err(
1083 "persisted durability artifact contains an agreement WAL payload, not an evidence outcome cache"
1084 .to_string(),
1085 ),
1086 PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
1087 "persisted durability artifact contains recovery metadata, not an evidence outcome cache"
1088 .to_string(),
1089 ),
1090 }
1091 }
1092}
1093
1094impl DurableRecoveryPlan {
1095 pub fn from_checkpoint(
1101 checkpoint_tick: u64,
1102 machine: crate::ProtocolMachine,
1103 wal: &AgreementWalArtifact,
1104 evidence_cache: EvidenceOutcomeCacheArtifact,
1105 ) -> Result<Self, String> {
1106 wal.validate_monotonic_escalations()?;
1107 let wal_suffix = wal.read_since(checkpoint_tick);
1108 let metadata = DurableRecoveryMetadata {
1109 checkpoint_tick,
1110 wal_tail_start_tick: wal_suffix.first().map(AgreementWalEntry::tick),
1111 highest_recovered_tick: wal_suffix.last().map(AgreementWalEntry::tick),
1112 resumed_operation_ids: Vec::new(),
1113 terminal_operation_ids: Vec::new(),
1114 cached_evidence_ids: evidence_cache
1115 .entries
1116 .iter()
1117 .map(|entry| entry.evidence_id.clone())
1118 .collect(),
1119 };
1120 let mut plan = Self {
1121 machine,
1122 metadata,
1123 wal_suffix,
1124 evidence_cache,
1125 decisions: Vec::new(),
1126 };
1127 plan.decisions = plan.build_decisions();
1128 plan.metadata.resumed_operation_ids = plan
1129 .decisions
1130 .iter()
1131 .filter(|decision| {
1132 matches!(
1133 decision.action,
1134 DurableRecoveryAction::ReexecuteFromScratch
1135 | DurableRecoveryAction::ResumeFromEvidenceBoundary
1136 )
1137 })
1138 .map(|decision| decision.operation_id.clone())
1139 .collect();
1140 plan.metadata.terminal_operation_ids = plan
1141 .decisions
1142 .iter()
1143 .filter(|decision| {
1144 matches!(
1145 decision.action,
1146 DurableRecoveryAction::ReuseFinalized | DurableRecoveryAction::PreserveTerminal
1147 )
1148 })
1149 .map(|decision| decision.operation_id.clone())
1150 .collect();
1151 Ok(plan)
1152 }
1153
1154 fn build_decisions(&self) -> Vec<DurableRecoveryDecision> {
1155 let mut operation_ids = std::collections::BTreeSet::new();
1156 for entry in &self.wal_suffix {
1157 operation_ids.insert(entry.operation_id().to_string());
1158 }
1159
1160 operation_ids
1161 .into_iter()
1162 .map(|operation_id| {
1163 let mut level = AgreementLevel::None;
1164 let mut finalization = None;
1165 let mut gate_crossed = false;
1166 let mut evidence_ids = Vec::new();
1167
1168 for entry in self
1169 .wal_suffix
1170 .iter()
1171 .filter(|entry| entry.operation_id() == operation_id)
1172 {
1173 match entry {
1174 AgreementWalEntry::Escalation { new_level, .. } => {
1175 if new_level.rank() > level.rank() {
1176 level = *new_level;
1177 }
1178 }
1179 AgreementWalEntry::EvidenceProduced { evidence, .. } => {
1180 evidence_ids.push(evidence.evidence_id.clone());
1181 if evidence.level.rank() > level.rank() {
1182 level = evidence.level;
1183 }
1184 }
1185 AgreementWalEntry::Finalization { outcome, .. } => {
1186 finalization = Some(*outcome);
1187 if matches!(outcome, FinalizationOutcome::Finalized) {
1188 level = AgreementLevel::Finalized;
1189 }
1190 }
1191 AgreementWalEntry::VisibilityGateCrossing { .. } => {
1192 gate_crossed = true;
1193 }
1194 }
1195 }
1196
1197 let cached_evidence_ids = self
1198 .evidence_cache
1199 .entries
1200 .iter()
1201 .filter(|entry| {
1202 evidence_ids
1203 .iter()
1204 .any(|evidence_id| evidence_id == &entry.evidence_id)
1205 })
1206 .map(|entry| entry.evidence_id.clone())
1207 .collect::<Vec<_>>();
1208 let action = match finalization {
1209 Some(FinalizationOutcome::Finalized) => DurableRecoveryAction::ReuseFinalized,
1210 Some(
1211 FinalizationOutcome::Aborted
1212 | FinalizationOutcome::Rejected
1213 | FinalizationOutcome::TimedOut,
1214 ) => DurableRecoveryAction::PreserveTerminal,
1215 None if level.at_least(AgreementLevel::SoftSafe) => {
1216 DurableRecoveryAction::ResumeFromEvidenceBoundary
1217 }
1218 None => DurableRecoveryAction::ReexecuteFromScratch,
1219 };
1220
1221 DurableRecoveryDecision {
1222 operation_id,
1223 level,
1224 finalization,
1225 action,
1226 cached_evidence_ids,
1227 gate_crossed,
1228 }
1229 })
1230 .collect()
1231 }
1232}