use serde::{Deserialize, Serialize};
const COMMIT_MARKER: u64 = 0x4943_4D45_4D43_4F4D;
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
pub trait ProtectedGenerationSlot: Eq {
fn generation(&self) -> u64;
fn validates(&self) -> bool;
}
pub trait DualProtectedCommitStore {
type Slot: ProtectedGenerationSlot;
fn slot0(&self) -> Option<&Self::Slot>;
fn slot1(&self) -> Option<&Self::Slot>;
fn is_uninitialized(&self) -> bool {
self.slot0().is_none() && self.slot1().is_none()
}
fn authoritative_slot(&self) -> Result<AuthoritativeSlot<'_, Self::Slot>, CommitRecoveryError> {
select_authoritative_slot(self.slot0(), self.slot1())
}
fn inactive_slot_index(&self) -> CommitSlotIndex {
match self.authoritative_slot() {
Ok(authoritative) => authoritative.index.opposite(),
Err(_) => CommitSlotIndex::Slot0,
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum CommitSlotIndex {
Slot0,
Slot1,
}
impl CommitSlotIndex {
#[must_use]
pub const fn opposite(self) -> Self {
match self {
Self::Slot0 => Self::Slot1,
Self::Slot1 => Self::Slot0,
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AuthoritativeSlot<'slot, T> {
pub index: CommitSlotIndex,
pub record: &'slot T,
}
pub fn select_authoritative_slot<'slot, T: ProtectedGenerationSlot>(
slot0: Option<&'slot T>,
slot1: Option<&'slot T>,
) -> Result<AuthoritativeSlot<'slot, T>, CommitRecoveryError> {
let slot0 = slot0
.filter(|slot| slot.validates())
.map(|record| AuthoritativeSlot {
index: CommitSlotIndex::Slot0,
record,
});
let slot1 = slot1
.filter(|slot| slot.validates())
.map(|record| AuthoritativeSlot {
index: CommitSlotIndex::Slot1,
record,
});
match (slot0, slot1) {
(Some(left), Some(right))
if left.record.generation() == right.record.generation()
&& left.record != right.record =>
{
Err(CommitRecoveryError::AmbiguousGeneration {
generation: left.record.generation(),
})
}
(Some(left), Some(right)) if right.record.generation() > left.record.generation() => {
Ok(right)
}
(Some(left), Some(_) | None) => Ok(left),
(None, Some(right)) => Ok(right),
(None, None) => Err(CommitRecoveryError::NoValidGeneration),
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CommittedGenerationBytes {
pub(crate) generation: u64,
pub(crate) commit_marker: u64,
pub(crate) checksum: u64,
pub(crate) payload: Vec<u8>,
}
impl CommittedGenerationBytes {
#[must_use]
pub fn new(generation: u64, payload: Vec<u8>) -> Self {
let mut record = Self {
generation,
commit_marker: COMMIT_MARKER,
checksum: 0,
payload,
};
record.checksum = generation_checksum(&record);
record
}
#[must_use]
pub const fn generation(&self) -> u64 {
self.generation
}
#[must_use]
pub const fn commit_marker(&self) -> u64 {
self.commit_marker
}
#[must_use]
pub const fn checksum(&self) -> u64 {
self.checksum
}
#[must_use]
pub fn payload(&self) -> &[u8] {
&self.payload
}
#[must_use]
pub fn validates(&self) -> bool {
self.commit_marker == COMMIT_MARKER && self.checksum == generation_checksum(self)
}
}
impl ProtectedGenerationSlot for CommittedGenerationBytes {
fn generation(&self) -> u64 {
self.generation
}
fn validates(&self) -> bool {
self.validates()
}
}
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DualCommitStore {
pub(crate) slot0: Option<CommittedGenerationBytes>,
pub(crate) slot1: Option<CommittedGenerationBytes>,
}
impl DualCommitStore {
#[must_use]
pub const fn is_uninitialized(&self) -> bool {
self.slot0.is_none() && self.slot1.is_none()
}
#[must_use]
pub const fn slot0(&self) -> Option<&CommittedGenerationBytes> {
self.slot0.as_ref()
}
#[must_use]
pub const fn slot1(&self) -> Option<&CommittedGenerationBytes> {
self.slot1.as_ref()
}
pub fn authoritative(&self) -> Result<&CommittedGenerationBytes, CommitRecoveryError> {
self.authoritative_slot()
.map(|authoritative| authoritative.record)
}
#[must_use]
pub fn diagnostic(&self) -> CommitStoreDiagnostic {
CommitStoreDiagnostic::from_store(self)
}
pub fn commit_payload(
&mut self,
payload: Vec<u8>,
) -> Result<&CommittedGenerationBytes, CommitRecoveryError> {
let next_generation =
match self.authoritative() {
Ok(record) => record.generation.checked_add(1).ok_or(
CommitRecoveryError::GenerationOverflow {
generation: record.generation,
},
)?,
Err(CommitRecoveryError::NoValidGeneration) if self.is_uninitialized() => 0,
Err(err) => return Err(err),
};
self.commit_payload_at_generation(next_generation, payload)
}
pub fn commit_payload_at_generation(
&mut self,
generation: u64,
payload: Vec<u8>,
) -> Result<&CommittedGenerationBytes, CommitRecoveryError> {
match self.authoritative() {
Ok(record) => {
let expected = record.generation.checked_add(1).ok_or(
CommitRecoveryError::GenerationOverflow {
generation: record.generation,
},
)?;
if generation != expected {
return Err(CommitRecoveryError::UnexpectedGeneration {
expected,
actual: generation,
});
}
}
Err(CommitRecoveryError::NoValidGeneration) if self.is_uninitialized() => {}
Err(err) => return Err(err),
}
let next = CommittedGenerationBytes::new(generation, payload);
if self.inactive_slot_index() == CommitSlotIndex::Slot0 {
self.slot0 = Some(next);
} else {
self.slot1 = Some(next);
}
self.authoritative()
}
#[cfg(test)]
pub fn write_corrupt_inactive_slot(&mut self, generation: u64, payload: Vec<u8>) {
let mut corrupt = CommittedGenerationBytes::new(generation, payload);
corrupt.checksum = corrupt.checksum.wrapping_add(1);
if self.inactive_slot_index() == CommitSlotIndex::Slot0 {
self.slot0 = Some(corrupt);
} else {
self.slot1 = Some(corrupt);
}
}
}
impl DualProtectedCommitStore for DualCommitStore {
type Slot = CommittedGenerationBytes;
fn slot0(&self) -> Option<&Self::Slot> {
self.slot0.as_ref()
}
fn slot1(&self) -> Option<&Self::Slot> {
self.slot1.as_ref()
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CommitStoreDiagnostic {
pub slot0: CommitSlotDiagnostic,
pub slot1: CommitSlotDiagnostic,
pub authoritative_generation: Option<u64>,
pub recovery_error: Option<CommitRecoveryError>,
}
impl CommitStoreDiagnostic {
#[must_use]
pub fn from_store<S: DualProtectedCommitStore>(store: &S) -> Self {
let (authoritative_generation, recovery_error) = match store.authoritative_slot() {
Ok(slot) => (Some(slot.record.generation()), None),
Err(err) => (None, Some(err)),
};
Self {
slot0: CommitSlotDiagnostic::from_slot(store.slot0()),
slot1: CommitSlotDiagnostic::from_slot(store.slot1()),
authoritative_generation,
recovery_error,
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct CommitSlotDiagnostic {
pub present: bool,
pub generation: Option<u64>,
pub valid: bool,
}
impl CommitSlotDiagnostic {
fn from_slot<T: ProtectedGenerationSlot>(slot: Option<&T>) -> Self {
match slot {
Some(record) => Self {
present: true,
generation: Some(record.generation()),
valid: record.validates(),
},
None => Self {
present: false,
generation: None,
valid: false,
},
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, thiserror::Error, PartialEq, Serialize)]
pub enum CommitRecoveryError {
#[error("no valid committed ledger generation")]
NoValidGeneration,
#[error("ambiguous committed ledger generation {generation}")]
AmbiguousGeneration {
generation: u64,
},
#[error("committed ledger generation {generation} cannot be advanced without overflow")]
GenerationOverflow {
generation: u64,
},
#[error("expected committed ledger generation {expected}, got {actual}")]
UnexpectedGeneration {
expected: u64,
actual: u64,
},
}
fn generation_checksum(generation: &CommittedGenerationBytes) -> u64 {
let mut hash = FNV_OFFSET;
hash = hash_u64(hash, generation.generation);
hash = hash_u64(hash, generation.commit_marker);
hash = hash_usize(hash, generation.payload.len());
for byte in &generation.payload {
hash = hash_byte(hash, *byte);
}
hash
}
fn hash_usize(hash: u64, value: usize) -> u64 {
hash_u64(hash, value as u64)
}
fn hash_u64(mut hash: u64, value: u64) -> u64 {
for byte in value.to_le_bytes() {
hash = hash_byte(hash, byte);
}
hash
}
const fn hash_byte(hash: u64, byte: u8) -> u64 {
(hash ^ byte as u64).wrapping_mul(FNV_PRIME)
}
#[cfg(test)]
mod tests {
use super::*;
fn payload(value: u8) -> Vec<u8> {
vec![value; 4]
}
#[test]
fn committed_generation_validates_marker_and_checksum() {
let mut generation = CommittedGenerationBytes::new(7, payload(1));
assert!(generation.validates());
generation.checksum = generation.checksum.wrapping_add(1);
assert!(!generation.validates());
}
#[test]
fn physical_commit_accessors_expose_read_only_state() {
let mut store = DualCommitStore::default();
store.commit_payload(payload(1)).expect("first commit");
let slot = store.slot0().expect("first slot");
assert_eq!(slot.generation(), 0);
assert_eq!(slot.payload(), payload(1).as_slice());
assert_eq!(slot.commit_marker(), COMMIT_MARKER);
assert_eq!(slot.checksum(), generation_checksum(slot));
assert!(store.slot1().is_none());
}
#[test]
fn authoritative_selects_highest_valid_generation() {
let mut store = DualCommitStore::default();
store.commit_payload(payload(1)).expect("first commit");
store.commit_payload(payload(2)).expect("second commit");
let authoritative = store.authoritative().expect("authoritative");
let authoritative_slot =
select_authoritative_slot(store.slot0.as_ref(), store.slot1.as_ref())
.expect("authoritative slot");
assert_eq!(authoritative.generation, 1);
assert_eq!(authoritative.payload, payload(2));
assert_eq!(authoritative_slot.index, CommitSlotIndex::Slot1);
assert_eq!(authoritative_slot.record.payload, payload(2));
}
#[test]
fn corrupt_newer_slot_leaves_prior_generation_authoritative() {
let mut store = DualCommitStore::default();
store.commit_payload(payload(1)).expect("first commit");
store.write_corrupt_inactive_slot(1, payload(2));
let authoritative = store.authoritative().expect("authoritative");
assert_eq!(authoritative.generation, 0);
assert_eq!(authoritative.payload, payload(1));
}
#[test]
fn no_valid_generation_fails_closed() {
let mut store = DualCommitStore::default();
store.write_corrupt_inactive_slot(0, payload(1));
store.write_corrupt_inactive_slot(1, payload(2));
let err = store.authoritative().expect_err("no valid slot");
assert_eq!(err, CommitRecoveryError::NoValidGeneration);
}
#[test]
fn same_generation_identical_slots_recover_deterministically() {
let committed = CommittedGenerationBytes::new(7, payload(1));
let store = DualCommitStore {
slot0: Some(committed.clone()),
slot1: Some(committed),
};
let authoritative = store.authoritative_slot().expect("authoritative");
assert_eq!(authoritative.index, CommitSlotIndex::Slot0);
assert_eq!(authoritative.record.generation, 7);
}
#[test]
fn same_generation_divergent_slots_fail_closed() {
let store = DualCommitStore {
slot0: Some(CommittedGenerationBytes::new(7, payload(1))),
slot1: Some(CommittedGenerationBytes::new(7, payload(2))),
};
let err = store.authoritative().expect_err("ambiguous generation");
assert_eq!(
err,
CommitRecoveryError::AmbiguousGeneration { generation: 7 }
);
}
#[test]
fn physical_generation_overflow_fails_closed() {
let mut store = DualCommitStore {
slot0: Some(CommittedGenerationBytes::new(u64::MAX, payload(1))),
slot1: None,
};
let err = store
.commit_payload(payload(2))
.expect_err("overflow must fail");
assert_eq!(
err,
CommitRecoveryError::GenerationOverflow {
generation: u64::MAX
}
);
}
#[test]
fn diagnostic_reports_authoritative_generation_and_corrupt_slots() {
let mut store = DualCommitStore::default();
store.commit_payload(payload(1)).expect("first commit");
store.write_corrupt_inactive_slot(1, payload(2));
let diagnostic = store.diagnostic();
assert_eq!(diagnostic.authoritative_generation, Some(0));
assert_eq!(diagnostic.recovery_error, None);
assert_eq!(diagnostic.slot0.generation, Some(0));
assert!(diagnostic.slot0.valid);
assert_eq!(diagnostic.slot1.generation, Some(1));
assert!(!diagnostic.slot1.valid);
}
#[test]
fn diagnostic_reports_no_valid_generation_for_empty_store() {
let diagnostic = DualCommitStore::default().diagnostic();
assert_eq!(diagnostic.authoritative_generation, None);
assert_eq!(
diagnostic.recovery_error,
Some(CommitRecoveryError::NoValidGeneration)
);
assert!(!diagnostic.slot0.present);
assert!(!diagnostic.slot1.present);
}
#[test]
fn diagnostic_builds_from_any_dual_protected_store() {
#[derive(Eq, PartialEq)]
struct TestSlot {
generation: u64,
valid: bool,
}
impl ProtectedGenerationSlot for TestSlot {
fn generation(&self) -> u64 {
self.generation
}
fn validates(&self) -> bool {
self.valid
}
}
struct TestStore {
slot0: Option<TestSlot>,
slot1: Option<TestSlot>,
}
impl DualProtectedCommitStore for TestStore {
type Slot = TestSlot;
fn slot0(&self) -> Option<&Self::Slot> {
self.slot0.as_ref()
}
fn slot1(&self) -> Option<&Self::Slot> {
self.slot1.as_ref()
}
}
let diagnostic = CommitStoreDiagnostic::from_store(&TestStore {
slot0: Some(TestSlot {
generation: 8,
valid: true,
}),
slot1: Some(TestSlot {
generation: 9,
valid: false,
}),
});
assert_eq!(diagnostic.authoritative_generation, Some(8));
assert!(diagnostic.slot0.valid);
assert_eq!(diagnostic.slot1.generation, Some(9));
assert!(!diagnostic.slot1.valid);
}
#[test]
fn uninitialized_distinguishes_empty_from_corrupt() {
let mut store = DualCommitStore::default();
assert!(store.is_uninitialized());
store.write_corrupt_inactive_slot(0, payload(1));
assert!(!store.is_uninitialized());
}
#[test]
fn commit_after_corrupt_slot_advances_from_prior_valid_generation() {
let mut store = DualCommitStore::default();
store.commit_payload(payload(1)).expect("first commit");
store.write_corrupt_inactive_slot(1, payload(2));
store.commit_payload(payload(3)).expect("third commit");
let authoritative = store.authoritative().expect("authoritative");
assert_eq!(authoritative.generation, 1);
assert_eq!(authoritative.payload, payload(3));
}
}