Skip to main content

telltale_machine/
durable.rs

1//! Typed durability artifacts for agreement WALs, evidence outcome caches,
2//! and recovery metadata.
3
4use std::collections::BTreeMap;
5use std::io::Cursor;
6use std::path::{Path, PathBuf};
7
8use parking_lot::Mutex;
9use serde::{Deserialize, Serialize};
10
11use crate::effect::{EffectFailure, EffectHandler, EffectOutcome, EffectRequest, EffectResult};
12use crate::semantic_objects::{
13    AgreementEvidence, AgreementLevel, AgreementState, FinalizationOutcome,
14};
15
/// Stable schema version string for persisted durability artifacts.
pub const PERSISTED_DURABILITY_SCHEMA_VERSION: &str = "telltale.machine.durability.v1";

/// Largest encoded size, in bytes, accepted for a persisted durability
/// artifact (64 MiB).
pub const MAX_PERSISTED_DURABILITY_BYTES: usize = 64 << 20;
21
22fn decode_cbor<T>(bytes: &[u8], context: &str) -> Result<T, String>
23where
24    T: for<'de> Deserialize<'de>,
25{
26    if bytes.len() > MAX_PERSISTED_DURABILITY_BYTES {
27        return Err(format!(
28            "{context}: input is {} bytes, max is {MAX_PERSISTED_DURABILITY_BYTES}",
29            bytes.len()
30        ));
31    }
32    ciborium::from_reader(Cursor::new(bytes)).map_err(|err| format!("{context}: {err}"))
33}
34
35fn encode_cbor<T>(value: &T, context: &str) -> Result<Vec<u8>, String>
36where
37    T: Serialize,
38{
39    let mut bytes = Vec::new();
40    ciborium::into_writer(value, &mut bytes).map_err(|err| format!("{context}: {err}"))?;
41    Ok(bytes)
42}
43
44/// One append-only agreement WAL entry.
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
46#[serde(rename_all = "snake_case", tag = "kind")]
47pub enum AgreementWalEntry {
48    /// Agreement level advanced for one operation.
49    Escalation {
50        /// Operation whose agreement level changed.
51        operation_id: String,
52        /// Previous agreement level before the transition.
53        previous_level: AgreementLevel,
54        /// New agreement level after the transition.
55        new_level: AgreementLevel,
56        /// Evidence attached to the transition when one exists.
57        #[serde(default)]
58        evidence_id: Option<String>,
59        /// Tick at which the transition was observed.
60        tick: u64,
61    },
62    /// Agreement evidence became durable.
63    EvidenceProduced {
64        /// Full typed evidence object.
65        evidence: AgreementEvidence,
66        /// Tick at which the evidence was observed.
67        tick: u64,
68    },
69    /// Finalization outcome became durable.
70    Finalization {
71        /// Operation that finalized or reached a terminal resolution.
72        operation_id: String,
73        /// Finalization or terminal outcome.
74        outcome: FinalizationOutcome,
75        /// Materialization proof id when one exists.
76        #[serde(default)]
77        materialization_proof_id: Option<String>,
78        /// Canonical handle id when one exists.
79        #[serde(default)]
80        canonical_handle_id: Option<String>,
81        /// Tick at which the outcome was observed.
82        tick: u64,
83    },
84    /// One visibility gate was crossed after durable confirmation.
85    VisibilityGateCrossing {
86        /// Operation whose visibility gate was crossed.
87        operation_id: String,
88        /// Downstream coroutine released by the gate.
89        downstream_coroutine_id: String,
90        /// Required agreement level for the gate.
91        gate_level: AgreementLevel,
92        /// Tick at which the gate crossing was observed.
93        tick: u64,
94    },
95}
96
97/// Typed agreement-WAL artifact.
98#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
99pub struct AgreementWalArtifact {
100    /// Append-only WAL entries in canonical order.
101    pub entries: Vec<AgreementWalEntry>,
102}
103
104/// Typed request payload for the internal `wal_sync` effect.
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
106pub struct WalSyncRequest {
107    /// Operation whose durable state is being synchronized.
108    pub operation_id: String,
109    /// Coroutine released by the visibility gate on success.
110    pub downstream_coroutine_id: String,
111    /// Required agreement level for the gated step.
112    pub gate_level: AgreementLevel,
113    /// Current agreement state snapshot for the operation when available.
114    #[serde(default)]
115    pub agreement_state: Option<AgreementState>,
116    /// Agreement evidence attached to the operation when available.
117    #[serde(default)]
118    pub agreement_evidence: Vec<AgreementEvidence>,
119    /// Tick at which the gate is attempting to cross.
120    pub tick: u64,
121}
122
123impl AgreementWalEntry {
124    /// Tick at which this WAL entry was observed.
125    #[must_use]
126    pub const fn tick(&self) -> u64 {
127        match self {
128            Self::Escalation { tick, .. }
129            | Self::EvidenceProduced { tick, .. }
130            | Self::Finalization { tick, .. }
131            | Self::VisibilityGateCrossing { tick, .. } => *tick,
132        }
133    }
134
135    /// Stable operation id associated with this WAL entry.
136    #[must_use]
137    pub fn operation_id(&self) -> &str {
138        match self {
139            Self::Escalation { operation_id, .. }
140            | Self::Finalization { operation_id, .. }
141            | Self::VisibilityGateCrossing { operation_id, .. } => operation_id,
142            Self::EvidenceProduced { evidence, .. } => &evidence.operation_id,
143        }
144    }
145
146    /// Deterministic identity string for testing and replay-oriented analysis.
147    #[must_use]
148    pub fn stable_identity(&self) -> String {
149        match self {
150            Self::Escalation {
151                operation_id,
152                previous_level,
153                new_level,
154                evidence_id,
155                tick,
156            } => format!(
157                "escalation:{operation_id}:{previous_level:?}:{new_level:?}:{}:{tick}",
158                evidence_id.as_deref().unwrap_or("-")
159            ),
160            Self::EvidenceProduced { evidence, tick } => format!(
161                "evidence:{}:{}:{:?}:{tick}",
162                evidence.operation_id, evidence.evidence_id, evidence.level
163            ),
164            Self::Finalization {
165                operation_id,
166                outcome,
167                materialization_proof_id,
168                canonical_handle_id,
169                tick,
170            } => format!(
171                "finalization:{operation_id}:{outcome:?}:{}:{}:{tick}",
172                materialization_proof_id.as_deref().unwrap_or("-"),
173                canonical_handle_id.as_deref().unwrap_or("-")
174            ),
175            Self::VisibilityGateCrossing {
176                operation_id,
177                downstream_coroutine_id,
178                gate_level,
179                tick,
180            } => format!("gate:{operation_id}:{downstream_coroutine_id}:{gate_level:?}:{tick}"),
181        }
182    }
183}
184
185impl AgreementWalArtifact {
186    /// Return the WAL suffix strictly after `tick`.
187    #[must_use]
188    pub fn read_since(&self, tick: u64) -> Vec<AgreementWalEntry> {
189        self.entries
190            .iter()
191            .filter(|entry| entry.tick() > tick)
192            .cloned()
193            .collect()
194    }
195
196    /// Validate monotonic escalation ordering for each operation.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if an escalation entry regresses agreement level or
201    /// appears out of order for one operation.
202    pub fn validate_monotonic_escalations(&self) -> Result<(), String> {
203        let mut last_levels = BTreeMap::<String, AgreementLevel>::new();
204        for entry in &self.entries {
205            let AgreementWalEntry::Escalation {
206                operation_id,
207                previous_level,
208                new_level,
209                ..
210            } = entry
211            else {
212                continue;
213            };
214            if new_level.rank() < previous_level.rank() {
215                return Err(format!(
216                    "agreement WAL regression for `{operation_id}`: {previous_level:?} -> {new_level:?}"
217                ));
218            }
219            if let Some(last) = last_levels.get(operation_id) {
220                if previous_level.rank() < last.rank() || new_level.rank() < last.rank() {
221                    return Err(format!(
222                        "agreement WAL reordered or regressed for `{operation_id}`: last={last:?}, entry={previous_level:?}->{new_level:?}"
223                    ));
224                }
225            }
226            last_levels.insert(operation_id.clone(), *new_level);
227        }
228        Ok(())
229    }
230}
231
232/// Narrow append/query contract for durable agreement WALs.
233pub trait AgreementWal {
234    /// Append one WAL entry.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the backend cannot persist the entry or if the
239    /// resulting WAL violates monotonic escalation ordering.
240    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String>;
241
242    /// Read entries strictly after `tick`.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the backend cannot load the WAL.
247    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String>;
248
249    /// Load the full WAL artifact.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the backend cannot load the WAL.
254    fn load(&self) -> Result<AgreementWalArtifact, String>;
255}
256
257/// In-memory agreement WAL backend useful for focused tests and
258/// deterministic in-process integrations.
259#[derive(Debug, Clone, Default)]
260pub struct InMemoryAgreementWal {
261    artifact: AgreementWalArtifact,
262}
263
264impl InMemoryAgreementWal {
265    /// Create one empty in-memory agreement WAL.
266    #[must_use]
267    pub fn new() -> Self {
268        Self::default()
269    }
270}
271
272impl AgreementWal for InMemoryAgreementWal {
273    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
274        self.artifact.entries.push(entry);
275        self.artifact.validate_monotonic_escalations()
276    }
277
278    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
279        Ok(self.artifact.read_since(tick))
280    }
281
282    fn load(&self) -> Result<AgreementWalArtifact, String> {
283        Ok(self.artifact.clone())
284    }
285}
286
287/// File-backed agreement WAL backend for the initial local durability
288/// rollout.
289#[derive(Debug, Clone)]
290pub struct FileAgreementWal {
291    path: PathBuf,
292}
293
294impl FileAgreementWal {
295    /// Create one file-backed WAL rooted at `path`.
296    #[must_use]
297    pub fn new(path: impl Into<PathBuf>) -> Self {
298        Self { path: path.into() }
299    }
300
301    fn load_artifact(&self) -> Result<AgreementWalArtifact, String> {
302        if !self.path.exists() {
303            return Ok(AgreementWalArtifact::default());
304        }
305        PersistedDurabilityArtifact::from_path(&self.path)?.into_agreement_wal()
306    }
307
308    fn store_artifact(&self, artifact: &AgreementWalArtifact) -> Result<(), String> {
309        artifact.validate_monotonic_escalations()?;
310        PersistedDurabilityArtifact::agreement_wal(artifact.clone()).write_to_path(&self.path)
311    }
312}
313
314impl AgreementWal for FileAgreementWal {
315    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
316        let mut artifact = self.load_artifact()?;
317        artifact.entries.push(entry);
318        self.store_artifact(&artifact)
319    }
320
321    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
322        Ok(self.load_artifact()?.read_since(tick))
323    }
324
325    fn load(&self) -> Result<AgreementWalArtifact, String> {
326        self.load_artifact()
327    }
328}
329
330/// One persisted effect outcome keyed by semantic evidence id.
331#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
332pub struct EvidenceOutcomeCacheEntry {
333    /// Semantic evidence id used as the idempotency key.
334    pub evidence_id: String,
335    /// Effect interface name associated with the cached outcome.
336    pub interface_name: String,
337    /// Effect operation name associated with the cached outcome.
338    pub operation_name: String,
339    /// Cached typed effect outcome.
340    pub outcome: EffectOutcome,
341}
342
343/// Typed evidence outcome cache artifact.
344#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
345pub struct EvidenceOutcomeCacheArtifact {
346    /// Persisted evidence-scoped effect outcomes.
347    pub entries: Vec<EvidenceOutcomeCacheEntry>,
348}
349
350impl EvidenceOutcomeCacheArtifact {
351    /// Return one cached effect outcome by evidence id.
352    #[must_use]
353    pub fn get(&self, evidence_id: &str) -> Option<&EvidenceOutcomeCacheEntry> {
354        self.entries
355            .iter()
356            .find(|entry| entry.evidence_id == evidence_id)
357    }
358}
359
360/// Narrow append/query contract for persisted evidence outcome caches.
361pub trait EvidenceOutcomeCache {
362    /// Load one cached outcome by evidence id.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the backend cannot load the cache.
367    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String>;
368
369    /// Persist one cached outcome.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the backend cannot persist the cache entry.
374    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String>;
375
376    /// Load the full cache artifact.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the backend cannot load the cache.
381    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String>;
382}
383
384/// In-memory evidence outcome cache backend.
385#[derive(Debug, Clone, Default)]
386pub struct InMemoryEvidenceOutcomeCache {
387    artifact: EvidenceOutcomeCacheArtifact,
388}
389
390impl InMemoryEvidenceOutcomeCache {
391    /// Create one empty in-memory cache.
392    #[must_use]
393    pub fn new() -> Self {
394        Self::default()
395    }
396}
397
398impl EvidenceOutcomeCache for InMemoryEvidenceOutcomeCache {
399    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
400        Ok(self.artifact.get(evidence_id).cloned())
401    }
402
403    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
404        self.artifact
405            .entries
406            .retain(|candidate| candidate.evidence_id != entry.evidence_id);
407        self.artifact.entries.push(entry);
408        Ok(())
409    }
410
411    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
412        Ok(self.artifact.clone())
413    }
414}
415
416/// File-backed evidence outcome cache backend for local durable execution.
417#[derive(Debug, Clone)]
418pub struct FileEvidenceOutcomeCache {
419    path: PathBuf,
420}
421
422impl FileEvidenceOutcomeCache {
423    /// Create one file-backed cache rooted at `path`.
424    #[must_use]
425    pub fn new(path: impl Into<PathBuf>) -> Self {
426        Self { path: path.into() }
427    }
428
429    fn load_artifact(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
430        if !self.path.exists() {
431            return Ok(EvidenceOutcomeCacheArtifact::default());
432        }
433        PersistedDurabilityArtifact::from_path(&self.path)?.into_evidence_outcome_cache()
434    }
435
436    fn store_artifact(&self, artifact: &EvidenceOutcomeCacheArtifact) -> Result<(), String> {
437        PersistedDurabilityArtifact::evidence_outcome_cache(artifact.clone())
438            .write_to_path(&self.path)
439    }
440}
441
442impl EvidenceOutcomeCache for FileEvidenceOutcomeCache {
443    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
444        Ok(self.load_artifact()?.get(evidence_id).cloned())
445    }
446
447    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
448        let mut artifact = self.load_artifact()?;
449        artifact
450            .entries
451            .retain(|candidate| candidate.evidence_id != entry.evidence_id);
452        artifact.entries.push(entry);
453        self.store_artifact(&artifact)
454    }
455
456    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
457        self.load_artifact()
458    }
459}
460
461/// Request-to-evidence-id resolver used by evidence-scoped persistence.
462pub trait EvidenceIdResolver: Send + Sync {
463    /// Resolve the evidence id for one request when the request should be
464    /// cached durably.
465    fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String>;
466}
467
468impl<F> EvidenceIdResolver for F
469where
470    F: Fn(&EffectRequest) -> Option<String> + Send + Sync,
471{
472    fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String> {
473        self(request)
474    }
475}
476
477/// Effect-handler wrapper that persists agreement-relevant outcomes keyed by
478/// semantic evidence id.
479pub struct EvidencePersistenceHandler<'a, C, R>
480where
481    C: EvidenceOutcomeCache,
482    R: EvidenceIdResolver,
483{
484    inner: &'a dyn EffectHandler,
485    cache: Mutex<C>,
486    resolver: R,
487}
488
489impl<'a, C, R> EvidencePersistenceHandler<'a, C, R>
490where
491    C: EvidenceOutcomeCache,
492    R: EvidenceIdResolver,
493{
494    /// Create one evidence-persistence wrapper around an inner handler.
495    #[must_use]
496    pub fn new(inner: &'a dyn EffectHandler, cache: C, resolver: R) -> Self {
497        Self {
498            inner,
499            cache: Mutex::new(cache),
500            resolver,
501        }
502    }
503
504    /// Load one cached outcome by evidence id.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the cache cannot be loaded.
509    pub fn cached_outcome(&self, evidence_id: &str) -> Result<Option<EffectOutcome>, String> {
510        let cache = self.cache.lock();
511        Ok(cache.get(evidence_id)?.map(|entry| entry.outcome))
512    }
513
514    /// Load the full typed cache artifact.
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if the cache cannot be loaded.
519    pub fn cache_snapshot(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
520        let cache = self.cache.lock();
521        cache.load()
522    }
523}
524
525impl<C, R> EffectHandler for EvidencePersistenceHandler<'_, C, R>
526where
527    C: EvidenceOutcomeCache + Send,
528    R: EvidenceIdResolver,
529{
530    fn handler_identity(&self) -> String {
531        format!("evidence_persistence<{}>", self.inner.handler_identity())
532    }
533
534    fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
535        let evidence_id = self.resolver.evidence_id_for_request(&request);
536        let interface_name = request.metadata.interface_name.clone();
537        let operation_name = request.metadata.operation_name.clone();
538
539        if let Some(evidence_id) = evidence_id.clone() {
540            let cache = self.cache.lock();
541            match cache.get(&evidence_id) {
542                Ok(Some(entry)) => return entry.outcome,
543                Ok(None) => {}
544                Err(err) => {
545                    return EffectOutcome::failure(EffectFailure::unavailable(format!(
546                        "load evidence outcome cache `{evidence_id}`: {err}"
547                    )));
548                }
549            }
550        }
551
552        let outcome = self.inner.handle_effect(request);
553        let Some(evidence_id) = evidence_id else {
554            return outcome;
555        };
556
557        let entry = EvidenceOutcomeCacheEntry {
558            evidence_id: evidence_id.clone(),
559            interface_name,
560            operation_name,
561            outcome: outcome.clone(),
562        };
563        let mut cache = self.cache.lock();
564        if let Err(err) = cache.put(entry) {
565            return EffectOutcome::failure(EffectFailure::unavailable(format!(
566                "persist evidence outcome `{evidence_id}`: {err}"
567            )));
568        }
569        outcome
570    }
571
572    fn handle_send(
573        &self,
574        role: &str,
575        partner: &str,
576        label: &str,
577        state: &[crate::coroutine::Value],
578    ) -> crate::effect::EffectResult<crate::coroutine::Value> {
579        self.inner.handle_send(role, partner, label, state)
580    }
581
582    fn send_decision(
583        &self,
584        input: crate::effect::SendDecisionInput<'_>,
585    ) -> crate::effect::EffectResult<crate::effect::SendDecision> {
586        self.inner.send_decision(input)
587    }
588
589    fn handle_recv(
590        &self,
591        role: &str,
592        partner: &str,
593        label: &str,
594        state: &mut Vec<crate::coroutine::Value>,
595        payload: &crate::coroutine::Value,
596    ) -> crate::effect::EffectResult<()> {
597        self.inner.handle_recv(role, partner, label, state, payload)
598    }
599
600    fn handle_choose(
601        &self,
602        role: &str,
603        partner: &str,
604        labels: &[String],
605        state: &[crate::coroutine::Value],
606    ) -> crate::effect::EffectResult<String> {
607        self.inner.handle_choose(role, partner, labels, state)
608    }
609
610    fn step(
611        &self,
612        role: &str,
613        state: &mut Vec<crate::coroutine::Value>,
614    ) -> crate::effect::EffectResult<()> {
615        self.inner.step(role, state)
616    }
617}
618
/// Selects how the internal `wal_sync` effect behaves, for tests and
/// integrations.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum WalSyncMode {
    /// Persist the entries and report success right away. This is the
    /// default mode.
    #[default]
    Immediate,
    /// Report the visibility gate as blocked; nothing is persisted.
    Blocked,
    /// Fail the visibility gate; nothing is persisted.
    Failure {
        /// Stable error message surfaced by the internal effect.
        message: String,
    },
}
633
634/// Effect-handler wrapper that owns one durable agreement WAL and services the
635/// internal `wal_sync` effect.
636pub struct AgreementWalHandler<'a, W>
637where
638    W: AgreementWal,
639{
640    inner: &'a dyn EffectHandler,
641    wal: Mutex<W>,
642    sync_mode: WalSyncMode,
643}
644
645impl<'a, W> AgreementWalHandler<'a, W>
646where
647    W: AgreementWal,
648{
649    /// Create one agreement-WAL wrapper around an inner handler.
650    #[must_use]
651    pub fn new(inner: &'a dyn EffectHandler, wal: W) -> Self {
652        Self {
653            inner,
654            wal: Mutex::new(wal),
655            sync_mode: WalSyncMode::Immediate,
656        }
657    }
658
659    /// Create one agreement-WAL wrapper with an explicit sync mode.
660    #[must_use]
661    pub fn with_sync_mode(inner: &'a dyn EffectHandler, wal: W, sync_mode: WalSyncMode) -> Self {
662        Self {
663            inner,
664            wal: Mutex::new(wal),
665            sync_mode,
666        }
667    }
668
669    /// Load the current typed WAL snapshot.
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if the WAL backend cannot be loaded.
674    pub fn wal_snapshot(&self) -> Result<AgreementWalArtifact, String> {
675        let wal = self.wal.lock();
676        wal.load()
677    }
678
679    fn build_entries(
680        &self,
681        wal: &W,
682        sync: &WalSyncRequest,
683    ) -> Result<Vec<AgreementWalEntry>, String> {
684        let existing = wal.load()?;
685        let existing_ids: std::collections::BTreeSet<_> = existing
686            .entries
687            .iter()
688            .map(AgreementWalEntry::stable_identity)
689            .collect();
690        let mut entries = Vec::new();
691
692        for evidence in sync
693            .agreement_evidence
694            .iter()
695            .filter(|evidence| evidence.operation_id == sync.operation_id)
696        {
697            let entry = AgreementWalEntry::EvidenceProduced {
698                evidence: evidence.clone(),
699                tick: sync.tick,
700            };
701            if !existing_ids.contains(&entry.stable_identity()) {
702                entries.push(entry);
703            }
704        }
705
706        if let Some(state) = &sync.agreement_state {
707            let previous_level = existing
708                .entries
709                .iter()
710                .filter_map(|entry| match entry {
711                    AgreementWalEntry::Escalation {
712                        operation_id,
713                        new_level,
714                        ..
715                    } if operation_id == &sync.operation_id => Some(*new_level),
716                    _ => None,
717                })
718                .max_by_key(|level| level.rank())
719                .unwrap_or(AgreementLevel::None);
720            if state.level.rank() > previous_level.rank() {
721                let entry = AgreementWalEntry::Escalation {
722                    operation_id: sync.operation_id.clone(),
723                    previous_level,
724                    new_level: state.level,
725                    evidence_id: state.evidence_ids.last().cloned(),
726                    tick: sync.tick,
727                };
728                if !existing_ids.contains(&entry.stable_identity()) {
729                    entries.push(entry);
730                }
731            }
732            if let Some(outcome) = state.finalization {
733                let entry = AgreementWalEntry::Finalization {
734                    operation_id: sync.operation_id.clone(),
735                    outcome,
736                    materialization_proof_id: state
737                        .evidence_ids
738                        .iter()
739                        .find(|evidence_id| evidence_id.contains("proof"))
740                        .cloned(),
741                    canonical_handle_id: None,
742                    tick: sync.tick,
743                };
744                if !existing_ids.contains(&entry.stable_identity()) {
745                    entries.push(entry);
746                }
747            }
748        }
749
750        let gate = AgreementWalEntry::VisibilityGateCrossing {
751            operation_id: sync.operation_id.clone(),
752            downstream_coroutine_id: sync.downstream_coroutine_id.clone(),
753            gate_level: sync.gate_level,
754            tick: sync.tick,
755        };
756        if !existing_ids.contains(&gate.stable_identity()) {
757            entries.push(gate);
758        }
759
760        Ok(entries)
761    }
762}
763
764impl<W> EffectHandler for AgreementWalHandler<'_, W>
765where
766    W: AgreementWal + Send,
767{
768    fn handler_identity(&self) -> String {
769        format!("agreement_wal<{}>", self.inner.handler_identity())
770    }
771
772    fn supports_wal_sync(&self) -> bool {
773        true
774    }
775
776    fn wal_sync(&self, sync: &WalSyncRequest) -> EffectResult<()> {
777        match &self.sync_mode {
778            WalSyncMode::Immediate => {
779                let mut wal = self.wal.lock();
780                let entries = match self.build_entries(&*wal, sync) {
781                    Ok(entries) => entries,
782                    Err(err) => {
783                        return EffectResult::failure(EffectFailure::unavailable(format!(
784                            "load agreement WAL for `{}`: {err}",
785                            sync.operation_id
786                        )));
787                    }
788                };
789                for entry in entries {
790                    if let Err(err) = wal.append(entry) {
791                        return EffectResult::failure(EffectFailure::unavailable(format!(
792                            "persist agreement WAL for `{}`: {err}",
793                            sync.operation_id
794                        )));
795                    }
796                }
797                EffectResult::success(())
798            }
799            WalSyncMode::Blocked => EffectResult::Blocked,
800            WalSyncMode::Failure { message } => {
801                EffectResult::failure(EffectFailure::unavailable(message.clone()))
802            }
803        }
804    }
805
806    fn handle_send(
807        &self,
808        role: &str,
809        partner: &str,
810        label: &str,
811        state: &[crate::coroutine::Value],
812    ) -> crate::effect::EffectResult<crate::coroutine::Value> {
813        self.inner.handle_send(role, partner, label, state)
814    }
815
816    fn send_decision(
817        &self,
818        input: crate::effect::SendDecisionInput<'_>,
819    ) -> crate::effect::EffectResult<crate::effect::SendDecision> {
820        self.inner.send_decision(input)
821    }
822
823    fn handle_recv(
824        &self,
825        role: &str,
826        partner: &str,
827        label: &str,
828        state: &mut Vec<crate::coroutine::Value>,
829        payload: &crate::coroutine::Value,
830    ) -> crate::effect::EffectResult<()> {
831        self.inner.handle_recv(role, partner, label, state, payload)
832    }
833
834    fn handle_choose(
835        &self,
836        role: &str,
837        partner: &str,
838        labels: &[String],
839        state: &[crate::coroutine::Value],
840    ) -> crate::effect::EffectResult<String> {
841        self.inner.handle_choose(role, partner, labels, state)
842    }
843
844    fn step(
845        &self,
846        role: &str,
847        state: &mut Vec<crate::coroutine::Value>,
848    ) -> crate::effect::EffectResult<()> {
849        self.inner.step(role, state)
850    }
851}
852
853/// Typed recovery metadata derived from one checkpoint plus durable WAL
854/// suffix.
855#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
856pub struct DurableRecoveryMetadata {
857    /// Checkpoint tick used as the recovery base.
858    pub checkpoint_tick: u64,
859    /// First tick in the WAL suffix applied on top of the checkpoint.
860    #[serde(default)]
861    pub wal_tail_start_tick: Option<u64>,
862    /// Highest tick observed in the durable suffix.
863    #[serde(default)]
864    pub highest_recovered_tick: Option<u64>,
865    /// Operation ids whose durable state was replayed during recovery.
866    #[serde(default)]
867    pub resumed_operation_ids: Vec<String>,
868    /// Operation ids that were already terminal at recovery time.
869    #[serde(default)]
870    pub terminal_operation_ids: Vec<String>,
871    /// Cached evidence ids reused during recovery.
872    #[serde(default)]
873    pub cached_evidence_ids: Vec<String>,
874}
875
876/// Resume action chosen for one operation during durable recovery.
877#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
878#[serde(rename_all = "snake_case")]
879pub enum DurableRecoveryAction {
880    /// No durable transition exists beyond the checkpoint.
881    ReexecuteFromScratch,
882    /// Replay from the last persisted evidence boundary.
883    ResumeFromEvidenceBoundary,
884    /// Reuse the finalized durable result without re-executing.
885    ReuseFinalized,
886    /// Preserve the terminal rejected/aborted/timed-out outcome.
887    PreserveTerminal,
888}
889
890/// Typed recovery decision for one durable operation.
891#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
892pub struct DurableRecoveryDecision {
893    /// Operation being classified for recovery.
894    pub operation_id: String,
895    /// Highest agreement level observed in the WAL suffix.
896    pub level: AgreementLevel,
897    /// Terminal finalization outcome when one exists.
898    #[serde(default)]
899    pub finalization: Option<FinalizationOutcome>,
900    /// Recovery action derived from the durable suffix.
901    pub action: DurableRecoveryAction,
902    /// Evidence ids available in the persisted evidence cache.
903    #[serde(default)]
904    pub cached_evidence_ids: Vec<String>,
905    /// Whether the WAL suffix proves a visibility-gate crossing.
906    #[serde(default)]
907    pub gate_crossed: bool,
908}
909
910/// Typed durable recovery state derived from one checkpoint plus WAL suffix.
911#[derive(Debug, Serialize, Deserialize)]
912pub struct DurableRecoveryPlan {
913    /// Decoded machine state at the checkpoint boundary.
914    pub machine: crate::ProtocolMachine,
915    /// Typed summary metadata for the durable suffix.
916    pub metadata: DurableRecoveryMetadata,
917    /// WAL entries strictly after the checkpoint tick.
918    pub wal_suffix: Vec<AgreementWalEntry>,
919    /// Evidence cache snapshot available during recovery.
920    pub evidence_cache: EvidenceOutcomeCacheArtifact,
921    /// Per-operation recovery decisions derived from the suffix.
922    pub decisions: Vec<DurableRecoveryDecision>,
923}
924
925/// Kind-tagged persisted durability payload family.
926#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
927#[serde(rename_all = "snake_case", tag = "kind", content = "payload")]
928pub enum PersistedDurabilityPayload {
929    /// Agreement WAL payload.
930    AgreementWal(AgreementWalArtifact),
931    /// Evidence outcome cache payload.
932    EvidenceOutcomeCache(EvidenceOutcomeCacheArtifact),
933    /// Recovery metadata payload.
934    RecoveryMetadata(DurableRecoveryMetadata),
935}
936
937/// Typed persisted durability wrapper for on-disk durable execution artifacts.
938#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
939pub struct PersistedDurabilityArtifact {
940    /// Stable schema version for this persisted artifact family.
941    pub schema_version: String,
942    /// Concrete durable payload.
943    pub payload: PersistedDurabilityPayload,
944}
945
946impl PersistedDurabilityArtifact {
947    /// Wrap one agreement-WAL artifact for persistence.
948    #[must_use]
949    pub fn agreement_wal(wal: AgreementWalArtifact) -> Self {
950        Self {
951            schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
952            payload: PersistedDurabilityPayload::AgreementWal(wal),
953        }
954    }
955
956    /// Wrap one evidence outcome cache artifact for persistence.
957    #[must_use]
958    pub fn evidence_outcome_cache(cache: EvidenceOutcomeCacheArtifact) -> Self {
959        Self {
960            schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
961            payload: PersistedDurabilityPayload::EvidenceOutcomeCache(cache),
962        }
963    }
964
965    /// Wrap one recovery metadata artifact for persistence.
966    #[must_use]
967    pub fn recovery_metadata(metadata: DurableRecoveryMetadata) -> Self {
968        Self {
969            schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
970            payload: PersistedDurabilityPayload::RecoveryMetadata(metadata),
971        }
972    }
973
974    /// Decode one persisted durability artifact from CBOR bytes.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the bytes are invalid CBOR, the schema version is
979    /// unsupported, or the payload does not decode.
980    pub fn from_slice(bytes: &[u8]) -> Result<Self, String> {
981        let artifact: Self = decode_cbor(bytes, "decode persisted durability artifact")?;
982        if artifact.schema_version != PERSISTED_DURABILITY_SCHEMA_VERSION {
983            return Err(format!(
984                "unsupported persisted durability schema version `{}`",
985                artifact.schema_version
986            ));
987        }
988        Ok(artifact)
989    }
990
991    /// Load one persisted durability artifact from disk.
992    ///
993    /// # Errors
994    ///
995    /// Returns an error if the file cannot be read or the artifact cannot be
996    /// decoded.
997    pub fn from_path(path: impl AsRef<Path>) -> Result<Self, String> {
998        let path = path.as_ref();
999        let bytes = std::fs::read(path).map_err(|err| {
1000            format!(
1001                "read persisted durability artifact {}: {err}",
1002                path.display()
1003            )
1004        })?;
1005        Self::from_slice(&bytes)
1006    }
1007
1008    /// Encode the artifact as CBOR bytes.
1009    ///
1010    /// # Errors
1011    ///
1012    /// Returns an error when serialization fails.
1013    pub fn to_cbor(&self) -> Result<Vec<u8>, String> {
1014        encode_cbor(self, "encode persisted durability artifact")
1015    }
1016
1017    /// Persist the artifact to disk as CBOR.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if serialization or file writing fails.
1022    pub fn write_to_path(&self, path: impl AsRef<Path>) -> Result<(), String> {
1023        let path = path.as_ref();
1024        let bytes = self.to_cbor()?;
1025        std::fs::write(path, bytes).map_err(|err| {
1026            format!(
1027                "write persisted durability artifact {}: {err}",
1028                path.display()
1029            )
1030        })
1031    }
1032
1033    /// Borrow the agreement-WAL payload when this artifact wraps one.
1034    #[must_use]
1035    pub fn agreement_wal_artifact(&self) -> Option<&AgreementWalArtifact> {
1036        match &self.payload {
1037            PersistedDurabilityPayload::AgreementWal(wal) => Some(wal),
1038            PersistedDurabilityPayload::EvidenceOutcomeCache(_)
1039            | PersistedDurabilityPayload::RecoveryMetadata(_) => None,
1040        }
1041    }
1042
1043    /// Consume the wrapper into one agreement-WAL artifact.
1044    ///
1045    /// # Errors
1046    ///
1047    /// Returns an error if this persisted artifact is not an agreement-WAL
1048    /// payload.
1049    pub fn into_agreement_wal(self) -> Result<AgreementWalArtifact, String> {
1050        match self.payload {
1051            PersistedDurabilityPayload::AgreementWal(wal) => Ok(wal),
1052            PersistedDurabilityPayload::EvidenceOutcomeCache(_) => Err(
1053                "persisted durability artifact contains an evidence outcome cache payload, not an agreement WAL"
1054                    .to_string(),
1055            ),
1056            PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
1057                "persisted durability artifact contains recovery metadata, not an agreement WAL"
1058                    .to_string(),
1059            ),
1060        }
1061    }
1062
1063    /// Borrow the evidence-outcome-cache payload when this artifact wraps one.
1064    #[must_use]
1065    pub fn evidence_outcome_cache_artifact(&self) -> Option<&EvidenceOutcomeCacheArtifact> {
1066        match &self.payload {
1067            PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Some(cache),
1068            PersistedDurabilityPayload::AgreementWal(_)
1069            | PersistedDurabilityPayload::RecoveryMetadata(_) => None,
1070        }
1071    }
1072
1073    /// Consume the wrapper into one evidence outcome cache artifact.
1074    ///
1075    /// # Errors
1076    ///
1077    /// Returns an error if this persisted artifact is not an evidence outcome
1078    /// cache payload.
1079    pub fn into_evidence_outcome_cache(self) -> Result<EvidenceOutcomeCacheArtifact, String> {
1080        match self.payload {
1081            PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Ok(cache),
1082            PersistedDurabilityPayload::AgreementWal(_) => Err(
1083                "persisted durability artifact contains an agreement WAL payload, not an evidence outcome cache"
1084                    .to_string(),
1085            ),
1086            PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
1087                "persisted durability artifact contains recovery metadata, not an evidence outcome cache"
1088                    .to_string(),
1089            ),
1090        }
1091    }
1092}
1093
1094impl DurableRecoveryPlan {
1095    /// Build a typed recovery plan from one checkpointed machine and durable suffix.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if the WAL suffix violates monotonic escalation ordering.
1100    pub fn from_checkpoint(
1101        checkpoint_tick: u64,
1102        machine: crate::ProtocolMachine,
1103        wal: &AgreementWalArtifact,
1104        evidence_cache: EvidenceOutcomeCacheArtifact,
1105    ) -> Result<Self, String> {
1106        wal.validate_monotonic_escalations()?;
1107        let wal_suffix = wal.read_since(checkpoint_tick);
1108        let metadata = DurableRecoveryMetadata {
1109            checkpoint_tick,
1110            wal_tail_start_tick: wal_suffix.first().map(AgreementWalEntry::tick),
1111            highest_recovered_tick: wal_suffix.last().map(AgreementWalEntry::tick),
1112            resumed_operation_ids: Vec::new(),
1113            terminal_operation_ids: Vec::new(),
1114            cached_evidence_ids: evidence_cache
1115                .entries
1116                .iter()
1117                .map(|entry| entry.evidence_id.clone())
1118                .collect(),
1119        };
1120        let mut plan = Self {
1121            machine,
1122            metadata,
1123            wal_suffix,
1124            evidence_cache,
1125            decisions: Vec::new(),
1126        };
1127        plan.decisions = plan.build_decisions();
1128        plan.metadata.resumed_operation_ids = plan
1129            .decisions
1130            .iter()
1131            .filter(|decision| {
1132                matches!(
1133                    decision.action,
1134                    DurableRecoveryAction::ReexecuteFromScratch
1135                        | DurableRecoveryAction::ResumeFromEvidenceBoundary
1136                )
1137            })
1138            .map(|decision| decision.operation_id.clone())
1139            .collect();
1140        plan.metadata.terminal_operation_ids = plan
1141            .decisions
1142            .iter()
1143            .filter(|decision| {
1144                matches!(
1145                    decision.action,
1146                    DurableRecoveryAction::ReuseFinalized | DurableRecoveryAction::PreserveTerminal
1147                )
1148            })
1149            .map(|decision| decision.operation_id.clone())
1150            .collect();
1151        Ok(plan)
1152    }
1153
1154    fn build_decisions(&self) -> Vec<DurableRecoveryDecision> {
1155        let mut operation_ids = std::collections::BTreeSet::new();
1156        for entry in &self.wal_suffix {
1157            operation_ids.insert(entry.operation_id().to_string());
1158        }
1159
1160        operation_ids
1161            .into_iter()
1162            .map(|operation_id| {
1163                let mut level = AgreementLevel::None;
1164                let mut finalization = None;
1165                let mut gate_crossed = false;
1166                let mut evidence_ids = Vec::new();
1167
1168                for entry in self
1169                    .wal_suffix
1170                    .iter()
1171                    .filter(|entry| entry.operation_id() == operation_id)
1172                {
1173                    match entry {
1174                        AgreementWalEntry::Escalation { new_level, .. } => {
1175                            if new_level.rank() > level.rank() {
1176                                level = *new_level;
1177                            }
1178                        }
1179                        AgreementWalEntry::EvidenceProduced { evidence, .. } => {
1180                            evidence_ids.push(evidence.evidence_id.clone());
1181                            if evidence.level.rank() > level.rank() {
1182                                level = evidence.level;
1183                            }
1184                        }
1185                        AgreementWalEntry::Finalization { outcome, .. } => {
1186                            finalization = Some(*outcome);
1187                            if matches!(outcome, FinalizationOutcome::Finalized) {
1188                                level = AgreementLevel::Finalized;
1189                            }
1190                        }
1191                        AgreementWalEntry::VisibilityGateCrossing { .. } => {
1192                            gate_crossed = true;
1193                        }
1194                    }
1195                }
1196
1197                let cached_evidence_ids = self
1198                    .evidence_cache
1199                    .entries
1200                    .iter()
1201                    .filter(|entry| {
1202                        evidence_ids
1203                            .iter()
1204                            .any(|evidence_id| evidence_id == &entry.evidence_id)
1205                    })
1206                    .map(|entry| entry.evidence_id.clone())
1207                    .collect::<Vec<_>>();
1208                let action = match finalization {
1209                    Some(FinalizationOutcome::Finalized) => DurableRecoveryAction::ReuseFinalized,
1210                    Some(
1211                        FinalizationOutcome::Aborted
1212                        | FinalizationOutcome::Rejected
1213                        | FinalizationOutcome::TimedOut,
1214                    ) => DurableRecoveryAction::PreserveTerminal,
1215                    None if level.at_least(AgreementLevel::SoftSafe) => {
1216                        DurableRecoveryAction::ResumeFromEvidenceBoundary
1217                    }
1218                    None => DurableRecoveryAction::ReexecuteFromScratch,
1219                };
1220
1221                DurableRecoveryDecision {
1222                    operation_id,
1223                    level,
1224                    finalization,
1225                    action,
1226                    cached_evidence_ids,
1227                    gate_crossed,
1228                }
1229            })
1230            .collect()
1231    }
1232}