use std::collections::BTreeMap;
pub const MAX_VCLOCK_ENTRIES: u16 = 4096;
pub const VCLOCK_ENTRY_SIZE: usize = 16 + 8;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VectorClock {
entries: BTreeMap<u128, u64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VClockOrdering {
Equal,
LessThan,
GreaterThan,
Concurrent,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum VClockError {
#[error("truncated vector clock: need {needed} bytes, have {have}")]
Truncated {
needed: usize,
have: usize,
},
#[error("vector clock has too many entries: {0} (max 4096)")]
TooManyEntries(u16),
}
impl VectorClock {
pub fn new() -> Self {
Self::default()
}
pub fn singleton(node_id: u128, counter: u64) -> Self {
let mut entries = BTreeMap::new();
entries.insert(node_id, counter);
Self { entries }
}
pub fn get(&self, node_id: u128) -> u64 {
self.entries.get(&node_id).copied().unwrap_or(0)
}
pub fn set(&mut self, node_id: u128, counter: u64) {
self.entries.insert(node_id, counter);
}
pub fn bump(&mut self, node_id: u128) -> u64 {
let entry = self.entries.entry(node_id).or_insert(0);
*entry = entry.saturating_add(1);
*entry
}
pub fn merge(&mut self, other: &VectorClock) {
for (&node, &counter) in &other.entries {
let cur = self.entries.entry(node).or_insert(0);
if counter > *cur {
*cur = counter;
}
}
}
pub fn compare(&self, other: &VectorClock) -> VClockOrdering {
let mut le = true;
let mut ge = true;
let all_keys: std::collections::BTreeSet<u128> = self
.entries
.keys()
.chain(other.entries.keys())
.copied()
.collect();
for k in all_keys {
let a = self.get(k);
let b = other.get(k);
if a > b {
le = false;
}
if a < b {
ge = false;
}
}
match (le, ge) {
(true, true) => VClockOrdering::Equal,
(true, false) => VClockOrdering::LessThan,
(false, true) => VClockOrdering::GreaterThan,
(false, false) => VClockOrdering::Concurrent,
}
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = (u128, u64)> + '_ {
self.entries.iter().map(|(&k, &v)| (k, v))
}
pub fn encoded_len(&self) -> usize {
2 + self.entries.len() * VCLOCK_ENTRY_SIZE
}
pub fn encode_to(&self, buf: &mut Vec<u8>) {
let n = self.entries.len() as u16;
buf.extend_from_slice(&n.to_le_bytes());
for (&node, &counter) in &self.entries {
buf.extend_from_slice(&node.to_le_bytes());
buf.extend_from_slice(&counter.to_le_bytes());
}
}
pub fn decode(buf: &[u8]) -> Result<(Self, usize), VClockError> {
if buf.len() < 2 {
return Err(VClockError::Truncated {
needed: 2,
have: buf.len(),
});
}
let n = u16::from_le_bytes([buf[0], buf[1]]);
if n > MAX_VCLOCK_ENTRIES {
return Err(VClockError::TooManyEntries(n));
}
let total = 2 + (n as usize) * VCLOCK_ENTRY_SIZE;
if buf.len() < total {
return Err(VClockError::Truncated {
needed: total,
have: buf.len(),
});
}
let mut entries = BTreeMap::new();
let mut cur = 2usize;
for _ in 0..n {
let mut nid = [0u8; 16];
nid.copy_from_slice(&buf[cur..cur + 16]);
let node = u128::from_le_bytes(nid);
cur += 16;
let mut cb = [0u8; 8];
cb.copy_from_slice(&buf[cur..cur + 8]);
let counter = u64::from_le_bytes(cb);
cur += 8;
entries.insert(node, counter);
}
Ok((Self { entries }, total))
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::Rng;
#[test]
fn test_vclock_compare_equal() {
let a = VectorClock::singleton(1, 5);
let b = VectorClock::singleton(1, 5);
assert_eq!(a.compare(&b), VClockOrdering::Equal);
}
#[test]
fn test_vclock_compare_less_than() {
let a = VectorClock::singleton(1, 4);
let b = VectorClock::singleton(1, 5);
assert_eq!(a.compare(&b), VClockOrdering::LessThan);
let mut a2 = VectorClock::new();
a2.set(1, 4);
a2.set(2, 7);
let mut b2 = VectorClock::new();
b2.set(1, 4);
b2.set(2, 8);
assert_eq!(a2.compare(&b2), VClockOrdering::LessThan);
}
#[test]
fn test_vclock_compare_greater_than() {
let a = VectorClock::singleton(1, 5);
let b = VectorClock::singleton(1, 4);
assert_eq!(a.compare(&b), VClockOrdering::GreaterThan);
}
#[test]
fn test_vclock_compare_concurrent() {
let a = VectorClock::singleton(1, 1);
let b = VectorClock::singleton(2, 1);
assert_eq!(a.compare(&b), VClockOrdering::Concurrent);
let mut a2 = VectorClock::new();
a2.set(1, 2);
a2.set(2, 1);
let mut b2 = VectorClock::new();
b2.set(1, 1);
b2.set(2, 2);
assert_eq!(a2.compare(&b2), VClockOrdering::Concurrent);
}
#[test]
fn test_vclock_merge_pointwise_max() {
let mut a = VectorClock::new();
a.set(1, 3);
a.set(2, 5);
let mut b = VectorClock::new();
b.set(1, 7);
b.set(3, 9);
a.merge(&b);
assert_eq!(a.get(1), 7);
assert_eq!(a.get(2), 5);
assert_eq!(a.get(3), 9);
}
#[test]
fn test_vclock_bump_increments() {
let mut a = VectorClock::new();
assert_eq!(a.bump(42), 1);
assert_eq!(a.bump(42), 2);
assert_eq!(a.bump(7), 1);
assert_eq!(a.get(42), 2);
assert_eq!(a.get(7), 1);
}
#[test]
fn test_vclock_encode_decode_round_trip() {
let mut a = VectorClock::new();
a.set(0xdeadbeef, 42);
a.set(0xbadc0ffee, 7);
a.set(123456789, 0);
let mut buf = Vec::new();
a.encode_to(&mut buf);
assert_eq!(buf.len(), a.encoded_len());
let (b, n) = VectorClock::decode(&buf).unwrap();
assert_eq!(n, buf.len());
assert_eq!(a, b);
}
#[test]
fn test_vclock_empty_round_trip() {
let a = VectorClock::new();
let mut buf = Vec::new();
a.encode_to(&mut buf);
assert_eq!(buf, &[0u8, 0u8]);
let (b, n) = VectorClock::decode(&buf).unwrap();
assert_eq!(n, 2);
assert!(b.is_empty());
}
#[test]
fn test_vclock_decode_truncated_returns_error() {
let mut buf = Vec::new();
buf.extend_from_slice(&3u16.to_le_bytes());
buf.extend_from_slice(&1u128.to_le_bytes());
buf.extend_from_slice(&7u64.to_le_bytes());
let err = VectorClock::decode(&buf).unwrap_err();
assert!(matches!(err, VClockError::Truncated { .. }));
}
#[test]
fn test_vclock_decode_too_many_entries() {
let mut buf = Vec::new();
buf.extend_from_slice(&(MAX_VCLOCK_ENTRIES + 1).to_le_bytes());
let err = VectorClock::decode(&buf).unwrap_err();
assert!(matches!(err, VClockError::TooManyEntries(_)));
}
#[test]
fn vclock_encode_decode_proptest() {
let mut rng = rand::thread_rng();
for _ in 0..200 {
let n: u16 = rng.gen_range(0..64);
let mut a = VectorClock::new();
for _ in 0..n {
let node: u128 = rng.r#gen();
let counter: u64 = rng.r#gen();
a.set(node, counter);
}
let mut buf = Vec::new();
a.encode_to(&mut buf);
assert_eq!(buf.len(), a.encoded_len());
let (b, consumed) = VectorClock::decode(&buf).unwrap();
assert_eq!(consumed, buf.len());
assert_eq!(a, b, "round-trip mismatch on n={n}");
let mut buf2 = Vec::new();
b.encode_to(&mut buf2);
assert_eq!(buf, buf2, "encoding is not byte-stable");
}
}
}