use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use tsoracle_consensus::AdvancePayload;
/// Command stored as an entry in the replicated log (this type implements
/// `omnipaxos::storage::Entry` below).
///
/// `HighWaterSnapshot::create` folds a slice of these commands into a
/// snapshot.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HighWaterCommand {
/// Carries an `AdvancePayload`. When a snapshot is created, the payload is
/// combined with the running value through `AdvancePayload::merge`.
Advance(AdvancePayload),
/// Barrier identified by a `node` and a `seq`. It never changes the
/// snapshot's `value`; a snapshot only remembers the largest `seq` seen per
/// `node`.
// NOTE(review): `node` is presumably the id of the node that issued the
// barrier and `seq` its per-node sequence number — confirm with the caller.
Barrier { node: u64, seq: u64 },
}
/// Compacted form of a run of `HighWaterCommand`s.
///
/// The `Default` value has `value == 0` and an empty barrier ledger, which is
/// also what `create` yields for an empty slice of commands.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct HighWaterSnapshot {
/// Running value: starts at 0 in `create` and is updated through
/// `AdvancePayload::merge` for every `Advance` command; `merge` keeps the
/// larger of the two snapshots' values.
pub value: u64,
/// Barrier ledger: for each `node`, the largest `seq` seen in a `Barrier`
/// command for that node.
pub applied_barriers: HashMap<u64, u64>,
}
/// Registers `HighWaterCommand` as an OmniPaxos log entry type.
impl omnipaxos::storage::Entry for HighWaterCommand {
/// Snapshots of these entries are represented by `HighWaterSnapshot`.
type Snapshot = HighWaterSnapshot;
}
/// Snapshot behaviour for the high-water log: a snapshot is the fold of the
/// commands it replaces.
impl omnipaxos::storage::Snapshot<HighWaterCommand> for HighWaterSnapshot {
    /// Builds a snapshot by folding `entries` in order.
    ///
    /// The fold starts from a value of zero and an empty barrier ledger.
    /// Each `Advance` payload is combined with the running value through
    /// `AdvancePayload::merge`; each `Barrier` raises the ledger entry for its
    /// `node` to `seq` when `seq` is larger than what is already recorded.
    fn create(entries: &[HighWaterCommand]) -> Self {
        let mut snapshot = Self {
            value: 0,
            applied_barriers: HashMap::new(),
        };

        for entry in entries {
            match entry {
                HighWaterCommand::Advance(payload) => {
                    snapshot.value = payload.merge(snapshot.value);
                }
                HighWaterCommand::Barrier { node, seq } => {
                    // A node seen for the first time is recorded with this
                    // `seq`; otherwise only a larger `seq` replaces the entry.
                    let recorded = snapshot.applied_barriers.entry(*node).or_insert(*seq);
                    *recorded = (*recorded).max(*seq);
                }
            }
        }

        snapshot
    }

    /// Folds `other` into `self`, keeping the larger `value` and, for every
    /// node present in either ledger, the larger barrier `seq`.
    fn merge(&mut self, other: Self) {
        let Self {
            value: incoming_value,
            applied_barriers: incoming_barriers,
        } = other;

        if incoming_value > self.value {
            self.value = incoming_value;
        }

        for (node, seq) in incoming_barriers {
            let recorded = self.applied_barriers.entry(node).or_insert(seq);
            *recorded = (*recorded).max(seq);
        }
    }

    /// Always returns `true`.
    // NOTE(review): presumably this is the switch OmniPaxos consults to enable
    // snapshotting for this entry type — confirm against the OmniPaxos docs.
    fn use_snapshots() -> bool {
        true
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for `HighWaterSnapshot::create` and `HighWaterSnapshot::merge`.

    use super::*;
    use omnipaxos::storage::Snapshot;

    /// Shorthand for an `Advance` command carrying `at_least`.
    fn advance(at_least: u64) -> HighWaterCommand {
        HighWaterCommand::Advance(AdvancePayload { at_least })
    }

    /// Shorthand for a `Barrier` command.
    fn barrier(node: u64, seq: u64) -> HighWaterCommand {
        HighWaterCommand::Barrier { node, seq }
    }

    /// Snapshot with the given `value` and an empty barrier ledger.
    fn value_only(value: u64) -> HighWaterSnapshot {
        HighWaterSnapshot {
            value,
            applied_barriers: HashMap::new(),
        }
    }

    /// Snapshot with a zero `value` and the given `(node, seq)` ledger.
    fn barriers_only(pairs: &[(u64, u64)]) -> HighWaterSnapshot {
        HighWaterSnapshot {
            value: 0,
            applied_barriers: pairs.iter().copied().collect(),
        }
    }

    #[test]
    fn snapshot_create_picks_max_advance() {
        let log = [advance(10), barrier(1, 1), advance(30), advance(20)];

        let snapshot = HighWaterSnapshot::create(&log);

        assert_eq!(snapshot.value, 30);
    }

    #[test]
    fn snapshot_create_on_empty_yields_zero() {
        let snapshot = HighWaterSnapshot::create(&[]);

        assert_eq!(snapshot.value, 0);
    }

    #[test]
    fn snapshot_merge_picks_higher_value() {
        let mut target = value_only(5);

        target.merge(value_only(12));

        assert_eq!(target.value, 12);
    }

    #[test]
    fn snapshot_merge_keeps_higher_value() {
        let mut target = value_only(50);

        target.merge(value_only(12));

        assert_eq!(target.value, 50);
    }

    #[test]
    fn barrier_does_not_affect_snapshot_value() {
        let snapshot = HighWaterSnapshot::create(&[barrier(42, 7)]);

        assert_eq!(snapshot.value, 0);
    }

    #[test]
    fn snapshot_create_folds_barriers_into_ledger() {
        let log = [barrier(1, 3), advance(100), barrier(1, 5), barrier(2, 9)];

        let snapshot = HighWaterSnapshot::create(&log);

        assert_eq!(snapshot.value, 100);
        assert_eq!(snapshot.applied_barriers.get(&1), Some(&5));
        assert_eq!(snapshot.applied_barriers.get(&2), Some(&9));
    }

    #[test]
    fn snapshot_merge_takes_max_barrier_seq_per_node() {
        let mut target = barriers_only(&[(1, 5), (2, 3)]);

        target.merge(barriers_only(&[(1, 7), (3, 2)]));

        assert_eq!(target.applied_barriers.get(&1), Some(&7));
        assert_eq!(target.applied_barriers.get(&2), Some(&3));
        assert_eq!(target.applied_barriers.get(&3), Some(&2));
    }
}