use serde::{Deserialize, Serialize};
use std::fmt;
use crate::clock::itc::Stamp;
use crate::crdt::trace::{MergeTrace, TieBreakStep, merge_tracing_enabled};
use tracing::debug;
/// LWW register: a value bundled with the metadata that `merge` compares to
/// decide which of two writes survives.
///
/// The comparison order used by this file's merge logic is: `stamp`, then
/// `wall_ts`, then `agent_id`, then `event_hash`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LwwRegister<T> {
/// The payload currently held by the register.
pub value: T,
/// Stamp of the write; compared with `Stamp::leq` as the first merge criterion.
pub stamp: Stamp,
/// Timestamp of the write, compared numerically (larger wins) when the stamps
/// do not order the two writes. NOTE(review): unit and epoch are not fixed by
/// this file — confirm with the producer.
pub wall_ts: u64,
/// Identifier of the writing agent; compared with `String` ordering (greater
/// wins) when `wall_ts` is equal.
pub agent_id: String,
/// Hash of the event that produced this write; final tie-breaker (greater or
/// equal keeps `self`) and also used to build the merge-trace correlation id.
pub event_hash: String,
}
impl<T> LwwRegister<T> {
    /// Builds a register from a value and the metadata of the write that
    /// produced it.
    ///
    /// Each argument is stored, unvalidated, in the field of the same name.
    pub const fn new(
        value: T,
        stamp: Stamp,
        wall_ts: u64,
        agent_id: String,
        event_hash: String,
    ) -> Self {
        Self {
            value,
            stamp,
            wall_ts,
            agent_id,
            event_hash,
        }
    }
}
impl<T: Clone> LwwRegister<T> {
    /// Merges `other` into `self`, keeping whichever write wins the tie-break
    /// chain implemented by `compare`.
    ///
    /// When `other` wins, the value and all metadata of `self` are replaced by
    /// copies of `other`'s; when `self` wins, nothing changes.
    pub fn merge(&mut self, other: &Self) {
        if !self.wins_over(other) {
            self.overwrite_from(other);
        }
    }

    /// Performs the same merge as [`merge`](Self::merge) and reports how the
    /// winner was chosen.
    ///
    /// `field` is copied into the returned trace as a label for the register
    /// being merged.
    ///
    /// When `merge_tracing_enabled()` returns `true`, the returned
    /// [`MergeTrace`] carries both values rendered through `Display`, the
    /// rendered winner, the decisive [`TieBreakStep`] and a correlation id of
    /// the form `"<self.event_hash>..<other.event_hash>"`; the same data is
    /// emitted as a `debug!` event. Otherwise `MergeTrace::disabled()` is
    /// returned. The register itself is updated identically in both cases.
    pub fn merge_with_trace(&mut self, other: &Self, field: &str) -> MergeTrace
    where
        T: fmt::Display,
    {
        let (self_wins, step) = self.compare(other);

        // The trace must be built before `self` is (possibly) overwritten so
        // that it records the pre-merge value and event hash.
        let trace = if merge_tracing_enabled() {
            // Render each side exactly once and reuse the winner's string.
            let values = (self.value.to_string(), other.value.to_string());
            let winner = if self_wins {
                values.0.clone()
            } else {
                values.1.clone()
            };
            let trace = MergeTrace {
                field: field.to_string(),
                values,
                winner,
                step,
                correlation_id: format!("{}..{}", self.event_hash, other.event_hash),
                enabled: true,
            };
            debug!(
                target: "bones_core::crdt::merge_trace",
                field = trace.field,
                winner = trace.winner,
                step = ?trace.step,
                correlation_id = trace.correlation_id,
                "LWW merge decision"
            );
            trace
        } else {
            MergeTrace::disabled()
        };

        if !self_wins {
            self.overwrite_from(other);
        }
        trace
    }

    /// Replaces every field of `self` with a copy of the corresponding field
    /// of `other`.
    fn overwrite_from(&mut self, other: &Self) {
        self.value = other.value.clone();
        self.stamp = other.stamp.clone();
        self.wall_ts = other.wall_ts;
        // `clone_from` lets the existing string buffers be reused.
        self.agent_id.clone_from(&other.agent_id);
        self.event_hash.clone_from(&other.event_hash);
    }

    /// Returns `true` when `self` should be kept in a merge against `other`.
    fn wins_over(&self, other: &Self) -> bool {
        self.compare(other).0
    }

    /// Decides the merge winner and reports which criterion decided it.
    ///
    /// Returns `(self_wins, step)`. Criteria are tried in order and the first
    /// one that separates the two writes decides:
    ///
    /// 1. `stamp`: if exactly one stamp is `leq` the other, the greater one wins.
    /// 2. `wall_ts`: larger wins.
    /// 3. `agent_id`: greater string wins.
    /// 4. `event_hash`: greater string wins; on a complete tie `self` wins.
    fn compare(&self, other: &Self) -> (bool, TieBreakStep) {
        let self_leq_other = self.stamp.leq(&other.stamp);
        let other_leq_self = other.stamp.leq(&self.stamp);
        // Stamps that are mutually `leq`, or unordered in both directions,
        // do not pick a winner and fall through to the remaining criteria.
        if self_leq_other != other_leq_self {
            return (other_leq_self, TieBreakStep::ItcCausal);
        }

        match self.wall_ts.cmp(&other.wall_ts) {
            std::cmp::Ordering::Equal => {}
            order => {
                return (
                    order == std::cmp::Ordering::Greater,
                    TieBreakStep::WallTimestamp,
                );
            }
        }

        match self.agent_id.cmp(&other.agent_id) {
            std::cmp::Ordering::Equal => {}
            order => return (order == std::cmp::Ordering::Greater, TieBreakStep::AgentId),
        }

        // `>=` keeps `self` on a complete tie, so merging a register with an
        // identical copy leaves it unchanged.
        (self.event_hash >= other.event_hash, TieBreakStep::EventHash)
    }
}
impl<T: fmt::Display> fmt::Display for LwwRegister<T> {
    /// Formats the register as its current value; the metadata is not shown.
    ///
    /// Formatting is delegated to the value's own `Display` implementation so
    /// that width, fill, alignment and precision requested by the caller
    /// (e.g. `{:>10}`) are honoured instead of being silently dropped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.value, f)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::itc::Stamp;

    /// Returns a seed stamp advanced by `counter` events.
    fn make_stamp(counter: u64) -> Stamp {
        let mut stamp = Stamp::seed();
        for _ in 0..counter {
            stamp.event();
        }
        stamp
    }

    /// Forks a seed stamp and advances the two halves by `counter_a` and
    /// `counter_b` events respectively.
    fn make_forked_stamps(counter_a: u64, counter_b: u64) -> (Stamp, Stamp) {
        let (mut first, mut second) = Stamp::seed().fork();
        for _ in 0..counter_a {
            first.event();
        }
        for _ in 0..counter_b {
            second.event();
        }
        (first, second)
    }

    /// Shorthand for building a `LwwRegister<String>` from string slices.
    fn reg(
        value: &str,
        stamp: Stamp,
        wall_ts: u64,
        agent: &str,
        hash: &str,
    ) -> LwwRegister<String> {
        LwwRegister::new(
            value.to_string(),
            stamp,
            wall_ts,
            agent.to_string(),
            hash.to_string(),
        )
    }

    /// Returns a copy of `base` with `incoming` merged into it.
    fn merged<T: Clone>(base: &LwwRegister<T>, incoming: &LwwRegister<T>) -> LwwRegister<T> {
        let mut result = base.clone();
        result.merge(incoming);
        result
    }

    /// A causally later write replaces an earlier one.
    #[test]
    fn causal_later_wins() {
        let earlier = make_stamp(1);
        let later = make_stamp(2);
        assert!(earlier.leq(&later));
        assert!(!later.leq(&earlier));

        let mut local = reg("old", earlier, 100, "alice", "aaa");
        let incoming = reg("new", later, 100, "alice", "aaa");
        local.merge(&incoming);
        assert_eq!(local.value, "new");
    }

    /// A causally earlier write does not replace a later one.
    #[test]
    fn causal_earlier_loses() {
        let earlier = make_stamp(1);
        let later = make_stamp(2);

        let mut local = reg("new", later, 100, "alice", "aaa");
        let incoming = reg("old", earlier, 100, "alice", "aaa");
        local.merge(&incoming);
        assert_eq!(local.value, "new");
    }

    /// For concurrent stamps the larger wall timestamp wins.
    #[test]
    fn concurrent_higher_wall_ts_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        assert!(stamp_a.concurrent(&stamp_b));

        let mut local = reg("alice-val", stamp_a, 200, "alice", "aaa");
        let incoming = reg("bob-val", stamp_b, 300, "bob", "bbb");
        local.merge(&incoming);
        assert_eq!(local.value, "bob-val");
    }

    /// For concurrent stamps the smaller wall timestamp loses.
    #[test]
    fn concurrent_lower_wall_ts_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("alice-val", stamp_a, 300, "alice", "aaa");
        let incoming = reg("bob-val", stamp_b, 200, "bob", "bbb");
        local.merge(&incoming);
        assert_eq!(local.value, "alice-val");
    }

    /// With equal wall timestamps the greater agent id wins.
    #[test]
    fn concurrent_same_ts_higher_agent_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("alice-val", stamp_a, 100, "alice", "aaa");
        let incoming = reg("bob-val", stamp_b, 100, "bob", "bbb");
        local.merge(&incoming);
        assert_eq!(local.value, "bob-val");
    }

    /// With equal wall timestamps the smaller agent id loses.
    #[test]
    fn concurrent_same_ts_lower_agent_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("bob-val", stamp_a, 100, "bob", "bbb");
        let incoming = reg("alice-val", stamp_b, 100, "alice", "aaa");
        local.merge(&incoming);
        assert_eq!(local.value, "bob-val");
    }

    /// With equal timestamp and agent the greater event hash wins.
    #[test]
    fn concurrent_same_agent_higher_hash_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("val-a", stamp_a, 100, "alice", "hash-aaa");
        let incoming = reg("val-b", stamp_b, 100, "alice", "hash-zzz");
        local.merge(&incoming);
        assert_eq!(local.value, "val-b");
    }

    /// With equal timestamp and agent the smaller event hash loses.
    #[test]
    fn concurrent_same_agent_lower_hash_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("val-a", stamp_a, 100, "alice", "hash-zzz");
        let incoming = reg("val-b", stamp_b, 100, "alice", "hash-aaa");
        local.merge(&incoming);
        assert_eq!(local.value, "val-a");
    }

    /// Merging in either direction yields the same register.
    #[test]
    fn semilattice_commutative() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        let a = reg("val-a", stamp_a, 100, "alice", "hash-a");
        let b = reg("val-b", stamp_b, 200, "bob", "hash-b");

        assert_eq!(merged(&a, &b), merged(&b, &a));
    }

    /// `(a ⊔ b) ⊔ c` equals `a ⊔ (b ⊔ c)` for three forked stamps.
    #[test]
    fn semilattice_associative() {
        let (left, right) = Stamp::seed().fork();
        let (mut stamp_a, stamp_b) = left.fork();
        let (mut stamp_c, _) = right.fork();
        stamp_a.event();
        stamp_c.event();

        let a = reg("val-a", stamp_a, 100, "alice", "hash-a");
        let b = reg("val-b", stamp_b, 200, "bob", "hash-b");
        let c = reg("val-c", stamp_c, 150, "carol", "hash-c");

        let left_grouped = merged(&merged(&a, &b), &c);
        let right_grouped = merged(&a, &merged(&b, &c));
        assert_eq!(left_grouped, right_grouped);
    }

    /// Merging a register with itself leaves it unchanged.
    #[test]
    fn semilattice_idempotent_self_merge() {
        let original = reg("value", make_stamp(3), 500, "agent", "hash-123");
        assert_eq!(merged(&original, &original), original);
    }

    /// Merging two registers with equal stamps and metadata is a no-op.
    #[test]
    fn equal_stamps_are_idempotent() {
        let original = reg("same", make_stamp(2), 100, "agent", "hash");
        assert_eq!(merged(&original, &original), original);
    }

    /// Equal wall timestamps and hashes: the agent id decides in both directions.
    #[test]
    fn identical_timestamps_different_agents() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        let a = reg("alice-val", stamp_a, 999, "alice", "hash-same");
        let b = reg("bob-val", stamp_b, 999, "bob", "hash-same");

        let ab = merged(&a, &b);
        assert_eq!(ab.value, "bob-val");

        let ba = merged(&b, &a);
        assert_eq!(ba.value, "bob-val");

        assert_eq!(ab, ba);
    }

    /// Concurrent writes by the same agent are resolved by the event hash.
    #[test]
    fn same_agent_concurrent_writes() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        let a = reg("write-1", stamp_a, 100, "alice", "hash-111");
        let b = reg("write-2", stamp_b, 100, "alice", "hash-222");

        let ab = merged(&a, &b);
        let ba = merged(&b, &a);
        assert_eq!(ab, ba);
        assert_eq!(ab.value, "write-2");
    }

    /// `Display` prints only the value.
    #[test]
    fn display_shows_value() {
        let register = reg("Hello, World!", make_stamp(1), 0, "agent", "hash");
        assert_eq!(register.to_string(), "Hello, World!");
    }

    /// A register survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn serde_roundtrip() {
        let register = reg("test-value", make_stamp(2), 42, "agent-1", "blake3:abc");
        let json = serde_json::to_string(&register).unwrap();
        let decoded: LwwRegister<String> = serde_json::from_str(&json).unwrap();
        assert_eq!(register, decoded);
    }

    /// The register works with a non-string value type.
    #[test]
    fn numeric_value_type() {
        let mut local = LwwRegister::new(
            42u64,
            make_stamp(1),
            100,
            "alice".to_string(),
            "h1".to_string(),
        );
        let incoming = LwwRegister::new(
            99u64,
            make_stamp(2),
            200,
            "bob".to_string(),
            "h2".to_string(),
        );
        local.merge(&incoming);
        assert_eq!(local.value, 99);
    }

    /// With tracing off, the merge still happens and the trace carries no payload.
    #[test]
    fn merge_with_trace_disabled_by_default_has_no_payload() {
        let mut local = reg("old", make_stamp(1), 100, "alice", "aaa");
        let incoming = reg("new", make_stamp(2), 100, "alice", "bbb");

        let trace = local.merge_with_trace(&incoming, "title");

        assert_eq!(local.value, "new");
        assert!(!trace.enabled);
        assert_eq!(trace.step, TieBreakStep::Equal);
        assert!(trace.field.is_empty());
    }

    /// With tracing on, the trace names the field, winner and decisive step.
    /// The test returns early without asserting when tracing is disabled.
    #[test]
    fn merge_with_trace_reports_decisive_step_when_enabled() {
        if !merge_tracing_enabled() {
            return;
        }

        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        let mut local = reg("alice-val", stamp_a, 100, "alice", "aaa");
        let incoming = reg("bob-val", stamp_b, 200, "bob", "bbb");

        let trace = local.merge_with_trace(&incoming, "title");

        assert!(trace.enabled);
        assert_eq!(trace.field, "title");
        assert_eq!(trace.winner, "bob-val");
        assert_eq!(trace.step, TieBreakStep::WallTimestamp);
        assert!(!trace.correlation_id.is_empty());
    }

    /// Three replicas merging the same writes in different orders agree.
    #[test]
    fn merge_chain_converges() {
        let (left, right) = Stamp::seed().fork();
        let (mut stamp_1, mut stamp_2) = left.fork();
        let (mut stamp_3, _) = right.fork();
        stamp_1.event();
        stamp_2.event();
        stamp_3.event();

        let r1 = reg("v1", stamp_1, 100, "alice", "h1");
        let r2 = reg("v2", stamp_2, 200, "bob", "h2");
        let r3 = reg("v3", stamp_3, 200, "carol", "h3");

        let order_123 = merged(&merged(&r1, &r2), &r3);
        let order_312 = merged(&merged(&r3, &r1), &r2);
        let order_231 = merged(&merged(&r2, &r3), &r1);

        assert_eq!(order_123, order_312);
        assert_eq!(order_312, order_231);
    }
}