use std::collections::BTreeMap;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KvOp {
pub key: String,
pub value: Option<Vec<u8>>,
pub lamport: u64,
pub writer: [u8; 20],
pub ts: u64,
}
fn op_wins(candidate: &KvOp, current: &KvOp) -> bool {
(candidate.lamport, candidate.writer, &candidate.value)
> (current.lamport, current.writer, ¤t.value)
}
pub fn reduce(ops: &[KvOp], now: u64, ttl_secs: u64) -> BTreeMap<String, Vec<u8>> {
let mut winners: BTreeMap<&str, &KvOp> = BTreeMap::new();
for op in ops {
if ttl_secs != 0 && op.ts.saturating_add(ttl_secs) <= now {
continue;
}
match winners.get(op.key.as_str()) {
Some(current) if !op_wins(op, current) => {}
_ => {
winners.insert(op.key.as_str(), op);
}
}
}
winners
.into_iter()
.filter_map(|(k, op)| op.value.as_ref().map(|v| (k.to_string(), v.clone())))
.collect()
}
pub fn next_lamport(ops: &[KvOp]) -> u64 {
ops.iter().map(|o| o.lamport).max().map_or(0, |m| m + 1)
}
#[cfg(test)]
mod tests {
use super::*;
fn op(key: &str, val: Option<&[u8]>, lamport: u64, writer: u8, ts: u64) -> KvOp {
KvOp {
key: key.to_string(),
value: val.map(|v| v.to_vec()),
lamport,
writer: [writer; 20],
ts,
}
}
fn map_of(pairs: &[(&str, &[u8])]) -> BTreeMap<String, Vec<u8>> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_vec()))
.collect()
}
#[test]
fn empty_log_is_empty_map() {
assert!(reduce(&[], 0, 0).is_empty());
assert_eq!(next_lamport(&[]), 0);
}
#[test]
fn higher_lamport_wins() {
let ops = [
op("a", Some(b"old"), 1, 1, 100),
op("a", Some(b"new"), 2, 1, 100),
];
assert_eq!(reduce(&ops, 0, 0), map_of(&[("a", b"new")]));
}
#[test]
fn writer_tiebreak_on_equal_lamport() {
let lo = op("a", Some(b"lo"), 5, 1, 100);
let hi = op("a", Some(b"hi"), 5, 9, 100);
assert_eq!(reduce(&[lo.clone(), hi.clone()], 0, 0), map_of(&[("a", b"hi")]));
assert_eq!(reduce(&[hi, lo], 0, 0), map_of(&[("a", b"hi")]));
}
#[test]
fn tombstone_wins_then_loses_by_clock() {
let del = [
op("a", Some(b"v"), 1, 1, 100),
op("a", None, 2, 1, 100),
];
assert!(reduce(&del, 0, 0).is_empty());
let revive = [
op("a", None, 2, 1, 100),
op("a", Some(b"v2"), 3, 1, 100),
];
assert_eq!(reduce(&revive, 0, 0), map_of(&[("a", b"v2")]));
}
#[test]
fn value_tiebreak_on_equal_lamport_and_writer() {
let a = op("k", Some(b"AAA"), 5, 1, 100);
let b = op("k", Some(b"BBB"), 5, 1, 100);
assert_eq!(reduce(&[a.clone(), b.clone()], 0, 0), map_of(&[("k", b"BBB")]));
assert_eq!(reduce(&[b, a], 0, 0), map_of(&[("k", b"BBB")]));
}
#[test]
fn write_beats_tombstone_at_equal_clock_deterministically() {
let write = op("k", Some(b"v"), 9, 7, 100);
let tomb = op("k", None, 9, 7, 100);
assert_eq!(reduce(&[write.clone(), tomb.clone()], 0, 0), map_of(&[("k", b"v")]));
assert_eq!(reduce(&[tomb, write], 0, 0), map_of(&[("k", b"v")]));
}
#[test]
fn converges_under_any_permutation() {
let base = [
op("x", Some(b"1"), 1, 1, 10),
op("y", Some(b"2"), 1, 2, 10),
op("x", Some(b"3"), 3, 2, 20),
op("y", None, 2, 1, 30),
op("z", Some(b"9"), 7, 3, 40),
op("x", Some(b"3"), 3, 2, 20), ];
let expected = reduce(&base, 0, 0);
for shift in 0..base.len() {
let mut perm = base.to_vec();
perm.rotate_left(shift);
assert_eq!(reduce(&perm, 0, 0), expected, "rotation {shift} diverged");
}
assert_eq!(expected, map_of(&[("x", b"3"), ("z", b"9")]));
}
#[test]
fn idempotent_apply_twice_equals_once() {
let ops = [
op("a", Some(b"1"), 1, 1, 10),
op("b", Some(b"2"), 2, 2, 10),
];
let once = reduce(&ops, 0, 0);
let doubled: Vec<KvOp> = ops.iter().chain(ops.iter()).cloned().collect();
assert_eq!(reduce(&doubled, 0, 0), once);
}
#[test]
fn ttl_filters_expired_ops() {
let ops = [
op("a", Some(b"stale"), 1, 1, 100),
op("b", Some(b"fresh"), 1, 1, 900),
];
assert_eq!(reduce(&ops, 1000, 300), map_of(&[("b", b"fresh")]));
assert_eq!(
reduce(&ops, 1000, 0),
map_of(&[("a", b"stale"), ("b", b"fresh")])
);
}
#[test]
fn ttl_expired_winner_does_not_mask_live_older_write() {
let ops = [
op("a", Some(b"live"), 1, 1, 950),
op("a", Some(b"expired"), 5, 1, 100),
];
assert_eq!(reduce(&ops, 1000, 300), map_of(&[("a", b"live")]));
}
#[test]
fn next_lamport_is_max_plus_one() {
let ops = [op("a", Some(b"v"), 4, 1, 0), op("b", Some(b"v"), 9, 1, 0)];
assert_eq!(next_lamport(&ops), 10);
}
}