use std::collections::BTreeMap;
use std::io::Cursor;
use std::path::{Path, PathBuf};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::effect::{EffectFailure, EffectHandler, EffectOutcome, EffectRequest, EffectResult};
use crate::semantic_objects::{
AgreementEvidence, AgreementLevel, AgreementState, FinalizationOutcome,
};
/// Schema version string stamped into every `PersistedDurabilityArtifact` built by this module
/// and required by `PersistedDurabilityArtifact::from_slice` when decoding.
pub const PERSISTED_DURABILITY_SCHEMA_VERSION: &str = "telltale.machine.durability.v1";
/// Largest encoded artifact, in bytes (64 MiB), that `decode_cbor` will attempt to decode.
pub const MAX_PERSISTED_DURABILITY_BYTES: usize = 64 * 1024 * 1024;
/// Decodes a CBOR-encoded value of type `T` from `bytes`.
///
/// `context` is prefixed to every error message so callers can tell which artifact failed.
///
/// # Errors
///
/// Returns an error when `bytes` is longer than [`MAX_PERSISTED_DURABILITY_BYTES`] or when the
/// CBOR decoder rejects the input.
fn decode_cbor<T>(bytes: &[u8], context: &str) -> Result<T, String>
where
    T: for<'de> Deserialize<'de>,
{
    let input_len = bytes.len();
    if input_len <= MAX_PERSISTED_DURABILITY_BYTES {
        let decoded: Result<T, _> = ciborium::from_reader(Cursor::new(bytes));
        decoded.map_err(|err| format!("{context}: {err}"))
    } else {
        // Refuse oversized input before handing it to the decoder.
        Err(format!(
            "{context}: input is {} bytes, max is {MAX_PERSISTED_DURABILITY_BYTES}",
            input_len
        ))
    }
}
/// Encodes `value` as CBOR into a freshly allocated byte vector.
///
/// `context` is prefixed to the error message when encoding fails.
///
/// # Errors
///
/// Returns an error when the CBOR encoder reports a failure.
fn encode_cbor<T>(value: &T, context: &str) -> Result<Vec<u8>, String>
where
    T: Serialize,
{
    let mut encoded = Vec::new();
    let written = ciborium::into_writer(value, &mut encoded);
    match written {
        Ok(()) => Ok(encoded),
        Err(err) => Err(format!("{context}: {err}")),
    }
}
/// One record in the agreement write-ahead log.
///
/// Serialized as an internally tagged map: the variant name, in `snake_case`, is stored under
/// the `kind` key next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum AgreementWalEntry {
/// The agreement level of an operation moved from `previous_level` to `new_level`.
Escalation {
/// Operation whose level changed.
operation_id: String,
/// Level recorded before the change.
previous_level: AgreementLevel,
/// Level recorded after the change.
new_level: AgreementLevel,
/// Evidence identifier associated with the change, if any; `None` when absent from the
/// encoded form.
#[serde(default)]
evidence_id: Option<String>,
/// Tick at which the entry was recorded.
tick: u64,
},
/// A piece of agreement evidence was produced; the operation id is taken from the evidence.
EvidenceProduced {
/// The evidence itself.
evidence: AgreementEvidence,
/// Tick at which the entry was recorded.
tick: u64,
},
/// An operation reached a finalization outcome.
Finalization {
/// Operation that was finalized.
operation_id: String,
/// The recorded outcome.
outcome: FinalizationOutcome,
/// Optional materialization proof identifier; `None` when absent from the encoded form.
#[serde(default)]
materialization_proof_id: Option<String>,
/// Optional canonical handle identifier; `None` when absent from the encoded form.
#[serde(default)]
canonical_handle_id: Option<String>,
/// Tick at which the entry was recorded.
tick: u64,
},
/// A visibility gate was crossed for an operation towards a downstream coroutine.
VisibilityGateCrossing {
/// Operation the gate belongs to.
operation_id: String,
/// Identifier of the downstream coroutine named in the sync request.
downstream_coroutine_id: String,
/// Agreement level of the gate.
gate_level: AgreementLevel,
/// Tick at which the entry was recorded.
tick: u64,
},
}
/// Serializable snapshot of an agreement WAL: the list of entries in append order.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgreementWalArtifact {
/// Entries in the order they were appended.
pub entries: Vec<AgreementWalEntry>,
}
/// Request handed to `wal_sync`: identifies a visibility gate crossing and optionally carries
/// the agreement state and evidence from which `AgreementWalHandler` derives WAL entries.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WalSyncRequest {
/// Operation the sync is about.
pub operation_id: String,
/// Downstream coroutine recorded in the gate-crossing entry.
pub downstream_coroutine_id: String,
/// Agreement level recorded in the gate-crossing entry.
pub gate_level: AgreementLevel,
/// Current agreement state, if known; `None` when absent from the encoded form.
#[serde(default)]
pub agreement_state: Option<AgreementState>,
/// Evidence to log; only items whose `operation_id` matches this request are written.
#[serde(default)]
pub agreement_evidence: Vec<AgreementEvidence>,
/// Tick stamped on every entry derived from this request.
pub tick: u64,
}
impl AgreementWalEntry {
    /// Returns the tick stored in the entry, whichever variant it is.
    #[must_use]
    pub const fn tick(&self) -> u64 {
        match self {
            Self::VisibilityGateCrossing { tick, .. }
            | Self::Finalization { tick, .. }
            | Self::EvidenceProduced { tick, .. }
            | Self::Escalation { tick, .. } => *tick,
        }
    }

    /// Returns the operation the entry belongs to.
    ///
    /// For `EvidenceProduced` the id is read from the embedded evidence.
    #[must_use]
    pub fn operation_id(&self) -> &str {
        match self {
            Self::EvidenceProduced { evidence, .. } => &evidence.operation_id,
            Self::VisibilityGateCrossing { operation_id, .. }
            | Self::Finalization { operation_id, .. }
            | Self::Escalation { operation_id, .. } => operation_id,
        }
    }

    /// Builds a colon-separated string from the entry's fields (including the tick).
    ///
    /// Missing optional identifiers are rendered as `-`; levels and outcomes use their `Debug`
    /// representation.
    #[must_use]
    pub fn stable_identity(&self) -> String {
        match self {
            Self::VisibilityGateCrossing {
                operation_id,
                downstream_coroutine_id,
                gate_level,
                tick,
            } => {
                format!("gate:{operation_id}:{downstream_coroutine_id}:{gate_level:?}:{tick}")
            }
            Self::Finalization {
                operation_id,
                outcome,
                materialization_proof_id,
                canonical_handle_id,
                tick,
            } => {
                format!(
                    "finalization:{operation_id}:{outcome:?}:{}:{}:{tick}",
                    materialization_proof_id
                        .as_ref()
                        .map_or("-", String::as_str),
                    canonical_handle_id.as_ref().map_or("-", String::as_str)
                )
            }
            Self::EvidenceProduced { evidence, tick } => {
                format!(
                    "evidence:{}:{}:{:?}:{tick}",
                    evidence.operation_id, evidence.evidence_id, evidence.level
                )
            }
            Self::Escalation {
                operation_id,
                previous_level,
                new_level,
                evidence_id,
                tick,
            } => {
                format!(
                    "escalation:{operation_id}:{previous_level:?}:{new_level:?}:{}:{tick}",
                    evidence_id.as_ref().map_or("-", String::as_str)
                )
            }
        }
    }
}
impl AgreementWalArtifact {
    /// Returns clones of every entry whose tick is strictly greater than `tick`, in log order.
    #[must_use]
    pub fn read_since(&self, tick: u64) -> Vec<AgreementWalEntry> {
        self.entries
            .iter()
            .filter_map(|candidate| (candidate.tick() > tick).then(|| candidate.clone()))
            .collect()
    }

    /// Checks that escalation entries never lower an operation's agreement level.
    ///
    /// Non-escalation entries are ignored. For each escalation, `new_level` must not rank below
    /// `previous_level`, and neither may rank below the level reached by the previous
    /// escalation of the same operation.
    ///
    /// # Errors
    ///
    /// Returns a description of the first offending entry.
    pub fn validate_monotonic_escalations(&self) -> Result<(), String> {
        let mut reached: BTreeMap<String, AgreementLevel> = BTreeMap::new();
        for entry in &self.entries {
            if let AgreementWalEntry::Escalation {
                operation_id,
                previous_level,
                new_level,
                ..
            } = entry
            {
                // The entry itself must not step downwards.
                if new_level.rank() < previous_level.rank() {
                    return Err(format!(
                        "agreement WAL regression for `{operation_id}`: {previous_level:?} -> {new_level:?}"
                    ));
                }
                // Nor may it start or end below what the operation already reached.
                if let Some(last) = reached.get(operation_id) {
                    let starts_below = previous_level.rank() < last.rank();
                    let ends_below = new_level.rank() < last.rank();
                    if starts_below || ends_below {
                        return Err(format!(
                            "agreement WAL reordered or regressed for `{operation_id}`: last={last:?}, entry={previous_level:?}->{new_level:?}"
                        ));
                    }
                }
                reached.insert(operation_id.clone(), *new_level);
            }
        }
        Ok(())
    }
}
/// Storage backend for the agreement write-ahead log.
pub trait AgreementWal {
/// Appends `entry` to the log, returning an error message when it cannot be recorded.
fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String>;
/// Returns the entries recorded after `tick` (the implementations in this file select
/// entries whose tick is strictly greater).
fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String>;
/// Returns the whole log as an artifact.
fn load(&self) -> Result<AgreementWalArtifact, String>;
}
/// Agreement WAL that keeps its entries in an in-process [`AgreementWalArtifact`].
#[derive(Debug, Clone, Default)]
pub struct InMemoryAgreementWal {
artifact: AgreementWalArtifact,
}
impl InMemoryAgreementWal {
/// Creates an empty log (same as `Default`).
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl AgreementWal for InMemoryAgreementWal {
    /// Appends `entry`, keeping it only if the log still satisfies
    /// [`AgreementWalArtifact::validate_monotonic_escalations`].
    ///
    /// # Errors
    ///
    /// Returns the validation error when `entry` would make the log non-monotonic. In that
    /// case the entry is removed again, so a rejected append leaves the log unchanged instead
    /// of poisoning every later append.
    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
        self.artifact.entries.push(entry);
        let validation = self.artifact.validate_monotonic_escalations();
        if validation.is_err() {
            // Roll back: the entry just pushed is the one that broke the invariant.
            self.artifact.entries.pop();
        }
        validation
    }

    /// Returns clones of the entries whose tick is strictly greater than `tick`.
    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
        Ok(self.artifact.read_since(tick))
    }

    /// Returns a clone of the whole in-memory log.
    fn load(&self) -> Result<AgreementWalArtifact, String> {
        Ok(self.artifact.clone())
    }
}
/// Agreement WAL persisted as a single artifact file at `path`.
///
/// The file is re-read on every operation and rewritten in full on every append.
#[derive(Debug, Clone)]
pub struct FileAgreementWal {
    path: PathBuf,
}

impl FileAgreementWal {
    /// Creates a WAL backed by the file at `path`; the file is not touched until first use.
    #[must_use]
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self { path: path.into() }
    }

    /// Loads the log from disk, treating a missing file as an empty log.
    ///
    /// # Errors
    ///
    /// Returns an error when the existence of the file cannot be determined (for example
    /// because of a permission error), when the file cannot be read or decoded, or when it
    /// holds a payload other than an agreement WAL.
    fn load_artifact(&self) -> Result<AgreementWalArtifact, String> {
        // `Path::exists` reports `false` on metadata errors, which would make an unreadable
        // WAL look empty; `try_exists` surfaces those errors instead.
        let present = self
            .path
            .try_exists()
            .map_err(|err| format!("probe agreement WAL {}: {err}", self.path.display()))?;
        if !present {
            return Ok(AgreementWalArtifact::default());
        }
        PersistedDurabilityArtifact::from_path(&self.path)?.into_agreement_wal()
    }

    /// Validates `artifact` and writes it to disk, replacing the previous contents.
    ///
    /// # Errors
    ///
    /// Returns an error when the escalations are not monotonic or when writing fails.
    fn store_artifact(&self, artifact: &AgreementWalArtifact) -> Result<(), String> {
        artifact.validate_monotonic_escalations()?;
        PersistedDurabilityArtifact::agreement_wal(artifact.clone()).write_to_path(&self.path)
    }
}
impl AgreementWal for FileAgreementWal {
    /// Reads the current log from disk, adds `entry` at the end and writes the log back.
    fn append(&mut self, entry: AgreementWalEntry) -> Result<(), String> {
        let mut current = self.load_artifact()?;
        current.entries.push(entry);
        let updated = current;
        self.store_artifact(&updated)
    }

    /// Reads the log from disk and returns the entries whose tick is strictly greater than
    /// `tick`.
    fn read_since(&self, tick: u64) -> Result<Vec<AgreementWalEntry>, String> {
        self.load_artifact()
            .map(|artifact| artifact.read_since(tick))
    }

    /// Reads the whole log from disk.
    fn load(&self) -> Result<AgreementWalArtifact, String> {
        self.load_artifact()
    }
}
/// A cached effect outcome, keyed by the evidence identifier it was produced for.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EvidenceOutcomeCacheEntry {
/// Key used for cache lookups.
pub evidence_id: String,
/// Interface name copied from the originating request's metadata.
pub interface_name: String,
/// Operation name copied from the originating request's metadata.
pub operation_name: String,
/// The outcome to replay on a cache hit.
pub outcome: EffectOutcome,
}
/// Serializable snapshot of the evidence outcome cache.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct EvidenceOutcomeCacheArtifact {
/// Cached entries; lookups scan this list front to back.
pub entries: Vec<EvidenceOutcomeCacheEntry>,
}
impl EvidenceOutcomeCacheArtifact {
    /// Looks up the first entry stored under `evidence_id`, or `None` when there is none.
    #[must_use]
    pub fn get(&self, evidence_id: &str) -> Option<&EvidenceOutcomeCacheEntry> {
        let wanted = evidence_id;
        self.entries
            .iter()
            .find(|cached| cached.evidence_id.as_str() == wanted)
    }
}
/// Storage backend for cached effect outcomes keyed by evidence identifier.
pub trait EvidenceOutcomeCache {
/// Returns the entry stored under `evidence_id`, or `None` when nothing is cached for it.
fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String>;
/// Stores `entry` (the implementations in this file replace any entry with the same
/// `evidence_id`).
fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String>;
/// Returns the whole cache as an artifact.
fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String>;
}
/// Evidence outcome cache that keeps its entries in an in-process artifact.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEvidenceOutcomeCache {
artifact: EvidenceOutcomeCacheArtifact,
}
impl InMemoryEvidenceOutcomeCache {
/// Creates an empty cache (same as `Default`).
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl EvidenceOutcomeCache for InMemoryEvidenceOutcomeCache {
    /// Returns a clone of the entry stored under `evidence_id`, if any.
    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
        let found = self.artifact.get(evidence_id);
        Ok(found.cloned())
    }

    /// Replaces any entry with the same `evidence_id` and appends `entry` at the end.
    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
        let entries = &mut self.artifact.entries;
        entries.retain(|existing| existing.evidence_id != entry.evidence_id);
        entries.push(entry);
        Ok(())
    }

    /// Returns a clone of the whole in-memory cache.
    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
        Ok(self.artifact.clone())
    }
}
/// Evidence outcome cache persisted as a single artifact file at `path`.
///
/// The file is re-read on every operation and rewritten in full on every `put`.
#[derive(Debug, Clone)]
pub struct FileEvidenceOutcomeCache {
    path: PathBuf,
}

impl FileEvidenceOutcomeCache {
    /// Creates a cache backed by the file at `path`; the file is not touched until first use.
    #[must_use]
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self { path: path.into() }
    }

    /// Loads the cache from disk, treating a missing file as an empty cache.
    ///
    /// # Errors
    ///
    /// Returns an error when the existence of the file cannot be determined (for example
    /// because of a permission error), when the file cannot be read or decoded, or when it
    /// holds a payload other than an evidence outcome cache.
    fn load_artifact(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
        // `Path::exists` reports `false` on metadata errors, which would make an unreadable
        // cache look empty; `try_exists` surfaces those errors instead.
        let present = self.path.try_exists().map_err(|err| {
            format!(
                "probe evidence outcome cache {}: {err}",
                self.path.display()
            )
        })?;
        if !present {
            return Ok(EvidenceOutcomeCacheArtifact::default());
        }
        PersistedDurabilityArtifact::from_path(&self.path)?.into_evidence_outcome_cache()
    }

    /// Writes `artifact` to disk, replacing the previous contents.
    ///
    /// # Errors
    ///
    /// Returns an error when encoding or writing fails.
    fn store_artifact(&self, artifact: &EvidenceOutcomeCacheArtifact) -> Result<(), String> {
        PersistedDurabilityArtifact::evidence_outcome_cache(artifact.clone())
            .write_to_path(&self.path)
    }
}
impl EvidenceOutcomeCache for FileEvidenceOutcomeCache {
    /// Reads the cache from disk and returns a clone of the entry for `evidence_id`, if any.
    fn get(&self, evidence_id: &str) -> Result<Option<EvidenceOutcomeCacheEntry>, String> {
        let artifact = self.load_artifact()?;
        let found = artifact.get(evidence_id).cloned();
        Ok(found)
    }

    /// Reads the cache from disk, replaces any entry with the same `evidence_id`, appends
    /// `entry` and writes the cache back.
    fn put(&mut self, entry: EvidenceOutcomeCacheEntry) -> Result<(), String> {
        let mut current = self.load_artifact()?;
        let entries = &mut current.entries;
        entries.retain(|existing| existing.evidence_id != entry.evidence_id);
        entries.push(entry);
        self.store_artifact(&current)
    }

    /// Reads the whole cache from disk.
    fn load(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
        self.load_artifact()
    }
}
/// Maps an effect request to the evidence identifier under which its outcome is cached.
pub trait EvidenceIdResolver: Send + Sync {
/// Returns the evidence identifier for `request`, or `None` when the request has none
/// (in which case `EvidencePersistenceHandler` neither reads nor writes its cache).
fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String>;
}
/// Lets any `Send + Sync` closure or function of type `Fn(&EffectRequest) -> Option<String>`
/// be used as an [`EvidenceIdResolver`].
impl<F> EvidenceIdResolver for F
where
F: Fn(&EffectRequest) -> Option<String> + Send + Sync,
{
/// Calls the wrapped function with `request`.
fn evidence_id_for_request(&self, request: &EffectRequest) -> Option<String> {
self(request)
}
}
/// Effect handler wrapper that stores effect outcomes in an [`EvidenceOutcomeCache`] keyed by
/// evidence identifier and replays a stored outcome instead of calling the inner handler again.
pub struct EvidencePersistenceHandler<'a, C, R>
where
C: EvidenceOutcomeCache,
R: EvidenceIdResolver,
{
// Handler that actually executes effects on a cache miss.
inner: &'a dyn EffectHandler,
// Cache behind a mutex so it can be updated through `&self`.
cache: Mutex<C>,
// Derives the cache key for each request.
resolver: R,
}
impl<'a, C, R> EvidencePersistenceHandler<'a, C, R>
where
    C: EvidenceOutcomeCache,
    R: EvidenceIdResolver,
{
    /// Wraps `inner`, storing outcomes in `cache` under the identifiers produced by `resolver`.
    #[must_use]
    pub fn new(inner: &'a dyn EffectHandler, cache: C, resolver: R) -> Self {
        let cache = Mutex::new(cache);
        Self {
            inner,
            cache,
            resolver,
        }
    }

    /// Returns the outcome cached under `evidence_id`, if any.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by the cache backend.
    pub fn cached_outcome(&self, evidence_id: &str) -> Result<Option<EffectOutcome>, String> {
        let guard = self.cache.lock();
        let cached = guard.get(evidence_id)?;
        Ok(cached.map(|hit| hit.outcome))
    }

    /// Returns the whole cache as an artifact.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by the cache backend.
    pub fn cache_snapshot(&self) -> Result<EvidenceOutcomeCacheArtifact, String> {
        let guard = self.cache.lock();
        guard.load()
    }
}
impl<C, R> EffectHandler for EvidencePersistenceHandler<'_, C, R>
where
C: EvidenceOutcomeCache + Send,
R: EvidenceIdResolver,
{
fn handler_identity(&self) -> String {
format!("evidence_persistence<{}>", self.inner.handler_identity())
}
fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
let evidence_id = self.resolver.evidence_id_for_request(&request);
let interface_name = request.metadata.interface_name.clone();
let operation_name = request.metadata.operation_name.clone();
if let Some(evidence_id) = evidence_id.clone() {
let cache = self.cache.lock();
match cache.get(&evidence_id) {
Ok(Some(entry)) => return entry.outcome,
Ok(None) => {}
Err(err) => {
return EffectOutcome::failure(EffectFailure::unavailable(format!(
"load evidence outcome cache `{evidence_id}`: {err}"
)));
}
}
}
let outcome = self.inner.handle_effect(request);
let Some(evidence_id) = evidence_id else {
return outcome;
};
let entry = EvidenceOutcomeCacheEntry {
evidence_id: evidence_id.clone(),
interface_name,
operation_name,
outcome: outcome.clone(),
};
let mut cache = self.cache.lock();
if let Err(err) = cache.put(entry) {
return EffectOutcome::failure(EffectFailure::unavailable(format!(
"persist evidence outcome `{evidence_id}`: {err}"
)));
}
outcome
}
fn handle_send(
&self,
role: &str,
partner: &str,
label: &str,
state: &[crate::coroutine::Value],
) -> crate::effect::EffectResult<crate::coroutine::Value> {
self.inner.handle_send(role, partner, label, state)
}
fn send_decision(
&self,
input: crate::effect::SendDecisionInput<'_>,
) -> crate::effect::EffectResult<crate::effect::SendDecision> {
self.inner.send_decision(input)
}
fn handle_recv(
&self,
role: &str,
partner: &str,
label: &str,
state: &mut Vec<crate::coroutine::Value>,
payload: &crate::coroutine::Value,
) -> crate::effect::EffectResult<()> {
self.inner.handle_recv(role, partner, label, state, payload)
}
fn handle_choose(
&self,
role: &str,
partner: &str,
labels: &[String],
state: &[crate::coroutine::Value],
) -> crate::effect::EffectResult<String> {
self.inner.handle_choose(role, partner, labels, state)
}
fn step(
&self,
role: &str,
state: &mut Vec<crate::coroutine::Value>,
) -> crate::effect::EffectResult<()> {
self.inner.step(role, state)
}
}
/// Selects how `AgreementWalHandler::wal_sync` responds to a sync request.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum WalSyncMode {
/// Build the WAL entries for the request and append them before returning success.
#[default]
Immediate,
/// Write nothing and report `EffectResult::Blocked`.
Blocked,
/// Write nothing and report an `unavailable` failure carrying `message`.
Failure {
/// Text of the reported failure.
message: String,
},
}
/// Effect handler wrapper that records agreement progress in an [`AgreementWal`] when
/// `wal_sync` is called and forwards the other handler calls it implements to `inner`.
pub struct AgreementWalHandler<'a, W>
where
W: AgreementWal,
{
// Handler the forwarded calls go to.
inner: &'a dyn EffectHandler,
// WAL behind a mutex so it can be appended to through `&self`.
wal: Mutex<W>,
// How `wal_sync` responds to requests.
sync_mode: WalSyncMode,
}
impl<'a, W> AgreementWalHandler<'a, W>
where
    W: AgreementWal,
{
    /// Wraps `inner` with a handler that writes to `wal` in [`WalSyncMode::Immediate`] mode.
    #[must_use]
    pub fn new(inner: &'a dyn EffectHandler, wal: W) -> Self {
        Self::with_sync_mode(inner, wal, WalSyncMode::Immediate)
    }

    /// Wraps `inner` with a handler that writes to `wal` using the given `sync_mode`.
    #[must_use]
    pub fn with_sync_mode(inner: &'a dyn EffectHandler, wal: W, sync_mode: WalSyncMode) -> Self {
        let wal = Mutex::new(wal);
        Self {
            inner,
            wal,
            sync_mode,
        }
    }

    /// Returns the whole WAL as an artifact.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by the WAL backend.
    pub fn wal_snapshot(&self) -> Result<AgreementWalArtifact, String> {
        let guard = self.wal.lock();
        guard.load()
    }

    /// Derives the WAL entries that `sync` adds on top of what `wal` already holds.
    ///
    /// Candidates are produced in this order: one `EvidenceProduced` per evidence item of the
    /// same operation, an `Escalation` when the state's level outranks the highest escalation
    /// already logged for the operation, a `Finalization` when the state carries one, and
    /// finally the `VisibilityGateCrossing`. A candidate is dropped when an entry with the same
    /// [`AgreementWalEntry::stable_identity`] is already in the WAL.
    ///
    /// # Errors
    ///
    /// Propagates the error reported when loading the WAL.
    fn build_entries(
        &self,
        wal: &W,
        sync: &WalSyncRequest,
    ) -> Result<Vec<AgreementWalEntry>, String> {
        let existing = wal.load()?;
        let known_identities: std::collections::BTreeSet<_> = existing
            .entries
            .iter()
            .map(AgreementWalEntry::stable_identity)
            .collect();

        let mut fresh = Vec::new();
        // Keeps a candidate only when the WAL does not already contain an identical entry.
        let mut keep_if_new = |candidate: AgreementWalEntry| {
            if !known_identities.contains(&candidate.stable_identity()) {
                fresh.push(candidate);
            }
        };

        for evidence in &sync.agreement_evidence {
            if evidence.operation_id == sync.operation_id {
                keep_if_new(AgreementWalEntry::EvidenceProduced {
                    evidence: evidence.clone(),
                    tick: sync.tick,
                });
            }
        }

        if let Some(state) = &sync.agreement_state {
            // Highest level any logged escalation of this operation has reached so far.
            let previous_level = existing
                .entries
                .iter()
                .filter_map(|logged| {
                    if let AgreementWalEntry::Escalation {
                        operation_id,
                        new_level,
                        ..
                    } = logged
                    {
                        (operation_id == &sync.operation_id).then_some(*new_level)
                    } else {
                        None
                    }
                })
                .max_by_key(|level| level.rank())
                .unwrap_or(AgreementLevel::None);

            if state.level.rank() > previous_level.rank() {
                keep_if_new(AgreementWalEntry::Escalation {
                    operation_id: sync.operation_id.clone(),
                    previous_level,
                    new_level: state.level,
                    evidence_id: state.evidence_ids.last().cloned(),
                    tick: sync.tick,
                });
            }

            if let Some(outcome) = state.finalization {
                // The proof id is the first evidence id whose text contains "proof".
                let materialization_proof_id = state
                    .evidence_ids
                    .iter()
                    .find(|evidence_id| evidence_id.contains("proof"))
                    .cloned();
                keep_if_new(AgreementWalEntry::Finalization {
                    operation_id: sync.operation_id.clone(),
                    outcome,
                    materialization_proof_id,
                    canonical_handle_id: None,
                    tick: sync.tick,
                });
            }
        }

        keep_if_new(AgreementWalEntry::VisibilityGateCrossing {
            operation_id: sync.operation_id.clone(),
            downstream_coroutine_id: sync.downstream_coroutine_id.clone(),
            gate_level: sync.gate_level,
            tick: sync.tick,
        });

        Ok(fresh)
    }
}
impl<W> EffectHandler for AgreementWalHandler<'_, W>
where
    W: AgreementWal + Send,
{
    /// Identity of this wrapper, embedding the inner handler's identity.
    fn handler_identity(&self) -> String {
        format!("agreement_wal<{}>", self.inner.handler_identity())
    }

    /// This handler always supports WAL sync.
    fn supports_wal_sync(&self) -> bool {
        true
    }

    /// Responds to `sync` according to the configured [`WalSyncMode`].
    ///
    /// In `Immediate` mode the WAL lock is held while the entries are derived and appended one
    /// by one; the first load or append error is reported as an `unavailable` failure. `Blocked`
    /// and `Failure` modes write nothing.
    fn wal_sync(&self, sync: &WalSyncRequest) -> EffectResult<()> {
        match &self.sync_mode {
            WalSyncMode::Blocked => return EffectResult::Blocked,
            WalSyncMode::Failure { message } => {
                return EffectResult::failure(EffectFailure::unavailable(message.clone()));
            }
            WalSyncMode::Immediate => {}
        }

        let mut wal = self.wal.lock();
        let pending = match self.build_entries(&*wal, sync) {
            Ok(pending) => pending,
            Err(err) => {
                return EffectResult::failure(EffectFailure::unavailable(format!(
                    "load agreement WAL for `{}`: {err}",
                    sync.operation_id
                )));
            }
        };
        for entry in pending {
            let appended = wal.append(entry);
            if let Err(err) = appended {
                return EffectResult::failure(EffectFailure::unavailable(format!(
                    "persist agreement WAL for `{}`: {err}",
                    sync.operation_id
                )));
            }
        }
        EffectResult::success(())
    }

    /// Delegates to the inner handler.
    fn handle_send(
        &self,
        role: &str,
        partner: &str,
        label: &str,
        state: &[crate::coroutine::Value],
    ) -> EffectResult<crate::coroutine::Value> {
        self.inner.handle_send(role, partner, label, state)
    }

    /// Delegates to the inner handler.
    fn send_decision(
        &self,
        input: crate::effect::SendDecisionInput<'_>,
    ) -> EffectResult<crate::effect::SendDecision> {
        self.inner.send_decision(input)
    }

    /// Delegates to the inner handler.
    fn handle_recv(
        &self,
        role: &str,
        partner: &str,
        label: &str,
        state: &mut Vec<crate::coroutine::Value>,
        payload: &crate::coroutine::Value,
    ) -> EffectResult<()> {
        self.inner.handle_recv(role, partner, label, state, payload)
    }

    /// Delegates to the inner handler.
    fn handle_choose(
        &self,
        role: &str,
        partner: &str,
        labels: &[String],
        state: &[crate::coroutine::Value],
    ) -> EffectResult<String> {
        self.inner.handle_choose(role, partner, labels, state)
    }

    /// Delegates to the inner handler.
    fn step(&self, role: &str, state: &mut Vec<crate::coroutine::Value>) -> EffectResult<()> {
        self.inner.step(role, state)
    }
}
/// Summary information attached to a [`DurableRecoveryPlan`].
///
/// Every field except `checkpoint_tick` falls back to its default when absent from the encoded
/// form.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DurableRecoveryMetadata {
/// Tick of the checkpoint the plan was built from.
pub checkpoint_tick: u64,
/// Tick at which the recovered WAL tail begins; `None` when no entry follows the checkpoint.
#[serde(default)]
pub wal_tail_start_tick: Option<u64>,
/// Tick at which the recovered WAL tail ends; `None` when no entry follows the checkpoint.
#[serde(default)]
pub highest_recovered_tick: Option<u64>,
/// Operations whose decision is `ReexecuteFromScratch` or `ResumeFromEvidenceBoundary`.
#[serde(default)]
pub resumed_operation_ids: Vec<String>,
/// Operations whose decision is `ReuseFinalized` or `PreserveTerminal`.
#[serde(default)]
pub terminal_operation_ids: Vec<String>,
/// Evidence identifiers of all entries in the evidence outcome cache given to the plan.
#[serde(default)]
pub cached_evidence_ids: Vec<String>,
}
/// What recovery does with an operation found in the WAL tail, as chosen by
/// `DurableRecoveryPlan::build_decisions`. Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DurableRecoveryAction {
/// Chosen when there is no finalization and the level is below `SoftSafe`.
ReexecuteFromScratch,
/// Chosen when there is no finalization and the level is at least `SoftSafe`.
ResumeFromEvidenceBoundary,
/// Chosen when the operation's finalization outcome is `Finalized`.
ReuseFinalized,
/// Chosen when the finalization outcome is `Aborted`, `Rejected` or `TimedOut`.
PreserveTerminal,
}
/// Recovery decision for a single operation found in the WAL tail.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DurableRecoveryDecision {
/// Operation the decision applies to.
pub operation_id: String,
/// Agreement level derived from the operation's WAL tail entries.
pub level: AgreementLevel,
/// Outcome of the last finalization entry seen for the operation, if any.
#[serde(default)]
pub finalization: Option<FinalizationOutcome>,
/// Action selected for the operation.
pub action: DurableRecoveryAction,
/// Evidence identifiers of the operation that are also present in the outcome cache.
#[serde(default)]
pub cached_evidence_ids: Vec<String>,
/// Whether a visibility gate crossing entry was seen for the operation.
#[serde(default)]
pub gate_crossed: bool,
}
/// Everything needed to resume from a checkpoint: the machine, the WAL entries recorded after
/// the checkpoint, the evidence outcome cache, and one decision per operation in the WAL tail.
#[derive(Debug, Serialize, Deserialize)]
pub struct DurableRecoveryPlan {
/// Machine passed to `from_checkpoint`.
pub machine: crate::ProtocolMachine,
/// Summary of the plan.
pub metadata: DurableRecoveryMetadata,
/// WAL entries whose tick is greater than the checkpoint tick.
pub wal_suffix: Vec<AgreementWalEntry>,
/// Evidence outcome cache passed to `from_checkpoint`.
pub evidence_cache: EvidenceOutcomeCacheArtifact,
/// Per-operation recovery decisions.
pub decisions: Vec<DurableRecoveryDecision>,
}
/// The kinds of data a [`PersistedDurabilityArtifact`] can carry.
///
/// Serialized adjacently tagged: the variant name, in `snake_case`, under `kind` and the
/// variant's content under `payload`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case", tag = "kind", content = "payload")]
pub enum PersistedDurabilityPayload {
/// An agreement write-ahead log.
AgreementWal(AgreementWalArtifact),
/// An evidence outcome cache.
EvidenceOutcomeCache(EvidenceOutcomeCacheArtifact),
/// Recovery metadata.
RecoveryMetadata(DurableRecoveryMetadata),
}
/// Versioned envelope in which durability data is encoded and written to disk.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedDurabilityArtifact {
/// Schema version; decoding accepts only `PERSISTED_DURABILITY_SCHEMA_VERSION`.
pub schema_version: String,
/// The carried data.
pub payload: PersistedDurabilityPayload,
}
impl PersistedDurabilityArtifact {
#[must_use]
pub fn agreement_wal(wal: AgreementWalArtifact) -> Self {
Self {
schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
payload: PersistedDurabilityPayload::AgreementWal(wal),
}
}
#[must_use]
pub fn evidence_outcome_cache(cache: EvidenceOutcomeCacheArtifact) -> Self {
Self {
schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
payload: PersistedDurabilityPayload::EvidenceOutcomeCache(cache),
}
}
#[must_use]
pub fn recovery_metadata(metadata: DurableRecoveryMetadata) -> Self {
Self {
schema_version: PERSISTED_DURABILITY_SCHEMA_VERSION.to_string(),
payload: PersistedDurabilityPayload::RecoveryMetadata(metadata),
}
}
pub fn from_slice(bytes: &[u8]) -> Result<Self, String> {
let artifact: Self = decode_cbor(bytes, "decode persisted durability artifact")?;
if artifact.schema_version != PERSISTED_DURABILITY_SCHEMA_VERSION {
return Err(format!(
"unsupported persisted durability schema version `{}`",
artifact.schema_version
));
}
Ok(artifact)
}
pub fn from_path(path: impl AsRef<Path>) -> Result<Self, String> {
let path = path.as_ref();
let bytes = std::fs::read(path).map_err(|err| {
format!(
"read persisted durability artifact {}: {err}",
path.display()
)
})?;
Self::from_slice(&bytes)
}
pub fn to_cbor(&self) -> Result<Vec<u8>, String> {
encode_cbor(self, "encode persisted durability artifact")
}
pub fn write_to_path(&self, path: impl AsRef<Path>) -> Result<(), String> {
let path = path.as_ref();
let bytes = self.to_cbor()?;
std::fs::write(path, bytes).map_err(|err| {
format!(
"write persisted durability artifact {}: {err}",
path.display()
)
})
}
#[must_use]
pub fn agreement_wal_artifact(&self) -> Option<&AgreementWalArtifact> {
match &self.payload {
PersistedDurabilityPayload::AgreementWal(wal) => Some(wal),
PersistedDurabilityPayload::EvidenceOutcomeCache(_)
| PersistedDurabilityPayload::RecoveryMetadata(_) => None,
}
}
pub fn into_agreement_wal(self) -> Result<AgreementWalArtifact, String> {
match self.payload {
PersistedDurabilityPayload::AgreementWal(wal) => Ok(wal),
PersistedDurabilityPayload::EvidenceOutcomeCache(_) => Err(
"persisted durability artifact contains an evidence outcome cache payload, not an agreement WAL"
.to_string(),
),
PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
"persisted durability artifact contains recovery metadata, not an agreement WAL"
.to_string(),
),
}
}
#[must_use]
pub fn evidence_outcome_cache_artifact(&self) -> Option<&EvidenceOutcomeCacheArtifact> {
match &self.payload {
PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Some(cache),
PersistedDurabilityPayload::AgreementWal(_)
| PersistedDurabilityPayload::RecoveryMetadata(_) => None,
}
}
pub fn into_evidence_outcome_cache(self) -> Result<EvidenceOutcomeCacheArtifact, String> {
match self.payload {
PersistedDurabilityPayload::EvidenceOutcomeCache(cache) => Ok(cache),
PersistedDurabilityPayload::AgreementWal(_) => Err(
"persisted durability artifact contains an agreement WAL payload, not an evidence outcome cache"
.to_string(),
),
PersistedDurabilityPayload::RecoveryMetadata(_) => Err(
"persisted durability artifact contains recovery metadata, not an evidence outcome cache"
.to_string(),
),
}
}
}
impl DurableRecoveryPlan {
pub fn from_checkpoint(
checkpoint_tick: u64,
machine: crate::ProtocolMachine,
wal: &AgreementWalArtifact,
evidence_cache: EvidenceOutcomeCacheArtifact,
) -> Result<Self, String> {
wal.validate_monotonic_escalations()?;
let wal_suffix = wal.read_since(checkpoint_tick);
let metadata = DurableRecoveryMetadata {
checkpoint_tick,
wal_tail_start_tick: wal_suffix.first().map(AgreementWalEntry::tick),
highest_recovered_tick: wal_suffix.last().map(AgreementWalEntry::tick),
resumed_operation_ids: Vec::new(),
terminal_operation_ids: Vec::new(),
cached_evidence_ids: evidence_cache
.entries
.iter()
.map(|entry| entry.evidence_id.clone())
.collect(),
};
let mut plan = Self {
machine,
metadata,
wal_suffix,
evidence_cache,
decisions: Vec::new(),
};
plan.decisions = plan.build_decisions();
plan.metadata.resumed_operation_ids = plan
.decisions
.iter()
.filter(|decision| {
matches!(
decision.action,
DurableRecoveryAction::ReexecuteFromScratch
| DurableRecoveryAction::ResumeFromEvidenceBoundary
)
})
.map(|decision| decision.operation_id.clone())
.collect();
plan.metadata.terminal_operation_ids = plan
.decisions
.iter()
.filter(|decision| {
matches!(
decision.action,
DurableRecoveryAction::ReuseFinalized | DurableRecoveryAction::PreserveTerminal
)
})
.map(|decision| decision.operation_id.clone())
.collect();
Ok(plan)
}
fn build_decisions(&self) -> Vec<DurableRecoveryDecision> {
let mut operation_ids = std::collections::BTreeSet::new();
for entry in &self.wal_suffix {
operation_ids.insert(entry.operation_id().to_string());
}
operation_ids
.into_iter()
.map(|operation_id| {
let mut level = AgreementLevel::None;
let mut finalization = None;
let mut gate_crossed = false;
let mut evidence_ids = Vec::new();
for entry in self
.wal_suffix
.iter()
.filter(|entry| entry.operation_id() == operation_id)
{
match entry {
AgreementWalEntry::Escalation { new_level, .. } => {
if new_level.rank() > level.rank() {
level = *new_level;
}
}
AgreementWalEntry::EvidenceProduced { evidence, .. } => {
evidence_ids.push(evidence.evidence_id.clone());
if evidence.level.rank() > level.rank() {
level = evidence.level;
}
}
AgreementWalEntry::Finalization { outcome, .. } => {
finalization = Some(*outcome);
if matches!(outcome, FinalizationOutcome::Finalized) {
level = AgreementLevel::Finalized;
}
}
AgreementWalEntry::VisibilityGateCrossing { .. } => {
gate_crossed = true;
}
}
}
let cached_evidence_ids = self
.evidence_cache
.entries
.iter()
.filter(|entry| {
evidence_ids
.iter()
.any(|evidence_id| evidence_id == &entry.evidence_id)
})
.map(|entry| entry.evidence_id.clone())
.collect::<Vec<_>>();
let action = match finalization {
Some(FinalizationOutcome::Finalized) => DurableRecoveryAction::ReuseFinalized,
Some(
FinalizationOutcome::Aborted
| FinalizationOutcome::Rejected
| FinalizationOutcome::TimedOut,
) => DurableRecoveryAction::PreserveTerminal,
None if level.at_least(AgreementLevel::SoftSafe) => {
DurableRecoveryAction::ResumeFromEvidenceBoundary
}
None => DurableRecoveryAction::ReexecuteFromScratch,
};
DurableRecoveryDecision {
operation_id,
level,
finalization,
action,
cached_evidence_ids,
gate_crossed,
}
})
.collect()
}
}