Skip to main content

bones_core/crdt/
lww.rs

1//! Last-Writer-Wins (LWW) Register CRDT.
2//!
3//! LWW Register is the CRDT for scalar fields: title, description, kind,
4//! size, urgency, parent. The merge uses a deterministic 4-step tie-breaking
5//! chain that guarantees bit-identical convergence across all replicas.
6//!
7//! # Tie-Breaking Chain
8//!
9//! Given two `LwwRegister<T>` values `a` and `b`:
10//!
11//! 1. **ITC causal dominance**: If `a.stamp.leq(&b.stamp)` and they are
12//!    not concurrent, the causally later one wins.
13//! 2. **Wall-clock timestamp**: If concurrent, higher `wall_ts` wins.
14//! 3. **Agent ID**: If wall clocks are equal, lexicographically greater
15//!    `agent_id` wins.
16//! 4. **Event hash**: If agent IDs are equal (same agent, concurrent writes),
17//!    lexicographically greater `event_hash` wins. This step guarantees
18//!    uniqueness — no ties are possible.
19
20use serde::{Deserialize, Serialize};
21use std::fmt;
22
23use crate::clock::itc::Stamp;
24use crate::crdt::trace::{MergeTrace, TieBreakStep, merge_tracing_enabled};
25use tracing::debug;
26
27// ---------------------------------------------------------------------------
28// LwwRegister
29// ---------------------------------------------------------------------------
30
31/// A Last-Writer-Wins register holding a value of type `T`.
32///
33/// Each write records the value along with metadata used for deterministic
34/// merge: an ITC stamp for causal ordering, a wall-clock timestamp, the
35/// writing agent's ID, and the event hash.
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct LwwRegister<T> {
38    /// The current value of the register.
39    pub value: T,
40    /// ITC stamp for causal ordering.
41    pub stamp: Stamp,
42    /// Wall-clock timestamp in microseconds since Unix epoch.
43    pub wall_ts: u64,
44    /// Agent identifier (e.g., "alice", "bot-1").
45    pub agent_id: String,
46    /// BLAKE3 hash of the event that wrote this value.
47    pub event_hash: String,
48}
49
50impl<T> LwwRegister<T> {
51    /// Create a new LWW register with the given value and metadata.
52    pub const fn new(
53        value: T,
54        stamp: Stamp,
55        wall_ts: u64,
56        agent_id: String,
57        event_hash: String,
58    ) -> Self {
59        Self {
60            value,
61            stamp,
62            wall_ts,
63            agent_id,
64            event_hash,
65        }
66    }
67}
68
69impl<T: Clone> LwwRegister<T> {
70    /// Merge another register into this one, keeping the "winning" value.
71    ///
72    /// The 4-step tie-breaking chain:
73    /// 1. ITC causal dominance (non-concurrent: later wins)
74    /// 2. Wall-clock timestamp (concurrent: higher wins)
75    /// 3. Agent ID (lexicographic: greater wins)
76    /// 4. Event hash (lexicographic: greater wins — guaranteed unique)
77    ///
78    /// After merge, `self` contains the winning value.
79    pub fn merge(&mut self, other: &Self) {
80        if self.wins_over(other) {
81            // Keep self
82        } else {
83            self.value = other.value.clone();
84            self.stamp = other.stamp.clone();
85            self.wall_ts = other.wall_ts;
86            self.agent_id.clone_from(&other.agent_id);
87            self.event_hash.clone_from(&other.event_hash);
88        }
89    }
90
91    /// Merge and optionally emit structured decision trace.
92    ///
93    /// If tracing is disabled via environment toggles, returns a no-op trace
94    /// payload and preserves the normal low-overhead merge path.
95    pub fn merge_with_trace(&mut self, other: &Self, field: &str) -> MergeTrace
96    where
97        T: fmt::Display,
98    {
99        let (self_wins, step) = self.compare(other);
100
101        let trace = if merge_tracing_enabled() {
102            let winner = if self_wins {
103                self.value.to_string()
104            } else {
105                other.value.to_string()
106            };
107
108            let trace = MergeTrace {
109                field: field.to_string(),
110                values: (self.value.to_string(), other.value.to_string()),
111                winner,
112                step,
113                correlation_id: format!("{}..{}", self.event_hash, other.event_hash),
114                enabled: true,
115            };
116
117            debug!(
118                target: "bones_core::crdt::merge_trace",
119                field = trace.field,
120                winner = trace.winner,
121                step = ?trace.step,
122                correlation_id = trace.correlation_id,
123                "LWW merge decision"
124            );
125
126            trace
127        } else {
128            MergeTrace::disabled()
129        };
130
131        if !self_wins {
132            self.value = other.value.clone();
133            self.stamp = other.stamp.clone();
134            self.wall_ts = other.wall_ts;
135            self.agent_id.clone_from(&other.agent_id);
136            self.event_hash.clone_from(&other.event_hash);
137        }
138
139        trace
140    }
141
142    /// Returns `true` if `self` wins over `other` in the tie-breaking chain.
143    fn wins_over(&self, other: &Self) -> bool {
144        self.compare(other).0
145    }
146
147    fn compare(&self, other: &Self) -> (bool, TieBreakStep) {
148        // Step 1: ITC causal dominance
149        let self_leq_other = self.stamp.leq(&other.stamp);
150        let other_leq_self = other.stamp.leq(&self.stamp);
151
152        match (self_leq_other, other_leq_self) {
153            (true, false) => {
154                // other causally dominates self → other wins
155                return (false, TieBreakStep::ItcCausal);
156            }
157            (false, true) => {
158                // self causally dominates other → self wins
159                return (true, TieBreakStep::ItcCausal);
160            }
161            (true, true) | (false, false) => {
162                // Either equal (both leq each other) or concurrent.
163                // Fall through to tie-breaking to ensure convergence.
164            }
165        }
166
167        // Step 2: Wall-clock timestamp (higher wins)
168        match self.wall_ts.cmp(&other.wall_ts) {
169            std::cmp::Ordering::Greater => return (true, TieBreakStep::WallTimestamp),
170            std::cmp::Ordering::Less => return (false, TieBreakStep::WallTimestamp),
171            std::cmp::Ordering::Equal => {}
172        }
173
174        // Step 3: Agent ID (lexicographically greater wins)
175        match self.agent_id.cmp(&other.agent_id) {
176            std::cmp::Ordering::Greater => return (true, TieBreakStep::AgentId),
177            std::cmp::Ordering::Less => return (false, TieBreakStep::AgentId),
178            std::cmp::Ordering::Equal => {}
179        }
180
181        // Step 4: Event hash (lexicographically greater wins — guaranteed unique)
182        (self.event_hash >= other.event_hash, TieBreakStep::EventHash)
183    }
184}
185
186impl<T: fmt::Display> fmt::Display for LwwRegister<T> {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        write!(f, "{}", self.value)
189    }
190}
191
192// ---------------------------------------------------------------------------
193// Tests
194// ---------------------------------------------------------------------------
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use crate::clock::itc::Stamp;
200
201    /// Helper: create a stamp with a specific event counter (seed identity).
202    fn make_stamp(counter: u64) -> Stamp {
203        let mut s = Stamp::seed();
204        for _ in 0..counter {
205            s.event();
206        }
207        s
208    }
209
210    /// Helper: create a stamp from a fork (anonymous identity, specific event).
211    fn make_forked_stamps(counter_a: u64, counter_b: u64) -> (Stamp, Stamp) {
212        let seed = Stamp::seed();
213        let (mut a, mut b) = seed.fork();
214        for _ in 0..counter_a {
215            a.event();
216        }
217        for _ in 0..counter_b {
218            b.event();
219        }
220        (a, b)
221    }
222
223    fn reg(
224        value: &str,
225        stamp: Stamp,
226        wall_ts: u64,
227        agent: &str,
228        hash: &str,
229    ) -> LwwRegister<String> {
230        LwwRegister::new(
231            value.to_string(),
232            stamp,
233            wall_ts,
234            agent.to_string(),
235            hash.to_string(),
236        )
237    }
238
239    // === Step 1: ITC causal dominance ===
240
241    #[test]
242    fn causal_later_wins() {
243        let s1 = make_stamp(1);
244        let s2 = make_stamp(2);
245        // s1 is causally before s2 (same lineage, s2 has more events)
246        assert!(s1.leq(&s2));
247        assert!(!s2.leq(&s1));
248
249        let mut a = reg("old", s1, 100, "alice", "aaa");
250        let b = reg("new", s2, 100, "alice", "aaa");
251        a.merge(&b);
252        assert_eq!(a.value, "new");
253    }
254
255    #[test]
256    fn causal_earlier_loses() {
257        let s1 = make_stamp(1);
258        let s2 = make_stamp(2);
259
260        let mut a = reg("new", s2, 100, "alice", "aaa");
261        let b = reg("old", s1, 100, "alice", "aaa");
262        a.merge(&b);
263        assert_eq!(a.value, "new"); // a (later) wins
264    }
265
266    // === Step 2: Concurrent, wall_ts tie-break ===
267
268    #[test]
269    fn concurrent_higher_wall_ts_wins() {
270        let (sa, sb) = make_forked_stamps(1, 1);
271        // sa and sb are concurrent (forked, both have events)
272        assert!(sa.concurrent(&sb));
273
274        let mut a = reg("alice-val", sa, 200, "alice", "aaa");
275        let b = reg("bob-val", sb, 300, "bob", "bbb");
276        a.merge(&b);
277        assert_eq!(a.value, "bob-val"); // higher wall_ts wins
278    }
279
280    #[test]
281    fn concurrent_lower_wall_ts_loses() {
282        let (sa, sb) = make_forked_stamps(1, 1);
283
284        let mut a = reg("alice-val", sa, 300, "alice", "aaa");
285        let b = reg("bob-val", sb, 200, "bob", "bbb");
286        a.merge(&b);
287        assert_eq!(a.value, "alice-val"); // a has higher wall_ts
288    }
289
290    // === Step 3: Concurrent, same wall_ts, agent_id tie-break ===
291
292    #[test]
293    fn concurrent_same_ts_higher_agent_wins() {
294        let (sa, sb) = make_forked_stamps(1, 1);
295
296        let mut a = reg("alice-val", sa, 100, "alice", "aaa");
297        let b = reg("bob-val", sb, 100, "bob", "bbb");
298        a.merge(&b);
299        assert_eq!(a.value, "bob-val"); // "bob" > "alice" lexicographically
300    }
301
302    #[test]
303    fn concurrent_same_ts_lower_agent_loses() {
304        let (sa, sb) = make_forked_stamps(1, 1);
305
306        let mut a = reg("bob-val", sa, 100, "bob", "bbb");
307        let b = reg("alice-val", sb, 100, "alice", "aaa");
308        a.merge(&b);
309        assert_eq!(a.value, "bob-val"); // "bob" > "alice"
310    }
311
312    // === Step 4: Concurrent, same ts, same agent, event_hash tie-break ===
313
314    #[test]
315    fn concurrent_same_agent_higher_hash_wins() {
316        let (sa, sb) = make_forked_stamps(1, 1);
317
318        let mut a = reg("val-a", sa, 100, "alice", "hash-aaa");
319        let b = reg("val-b", sb, 100, "alice", "hash-zzz");
320        a.merge(&b);
321        assert_eq!(a.value, "val-b"); // "hash-zzz" > "hash-aaa"
322    }
323
324    #[test]
325    fn concurrent_same_agent_lower_hash_loses() {
326        let (sa, sb) = make_forked_stamps(1, 1);
327
328        let mut a = reg("val-a", sa, 100, "alice", "hash-zzz");
329        let b = reg("val-b", sb, 100, "alice", "hash-aaa");
330        a.merge(&b);
331        assert_eq!(a.value, "val-a"); // "hash-zzz" > "hash-aaa"
332    }
333
334    // === Semilattice properties ===
335
336    #[test]
337    fn semilattice_commutative() {
338        let (sa, sb) = make_forked_stamps(1, 1);
339
340        let a = reg("val-a", sa.clone(), 100, "alice", "hash-a");
341        let b = reg("val-b", sb.clone(), 200, "bob", "hash-b");
342
343        let mut ab = a.clone();
344        ab.merge(&b);
345
346        let mut ba = b.clone();
347        ba.merge(&a);
348
349        assert_eq!(ab, ba);
350    }
351
352    #[test]
353    fn semilattice_associative() {
354        let seed = Stamp::seed();
355        let (left, right) = seed.fork();
356        let (mut sa, sb) = left.fork();
357        let (mut sc, _) = right.fork();
358        sa.event();
359        // sb stays as is (concurrent with sa)
360        sc.event();
361
362        let a = reg("val-a", sa, 100, "alice", "hash-a");
363        let b = reg("val-b", sb, 200, "bob", "hash-b");
364        let c = reg("val-c", sc, 150, "carol", "hash-c");
365
366        // (a merge b) merge c
367        let mut left_merge = a.clone();
368        left_merge.merge(&b);
369        left_merge.merge(&c);
370
371        // a merge (b merge c)
372        let mut bc = b.clone();
373        bc.merge(&c);
374        let mut right_merge = a.clone();
375        right_merge.merge(&bc);
376
377        assert_eq!(left_merge, right_merge);
378    }
379
380    #[test]
381    fn semilattice_idempotent_self_merge() {
382        let s = make_stamp(3);
383        let a = reg("value", s, 500, "agent", "hash-123");
384        let mut m = a.clone();
385        m.merge(&a);
386        assert_eq!(m, a);
387    }
388
389    // === Edge cases ===
390
391    #[test]
392    fn equal_stamps_are_idempotent() {
393        // Two registers with identical stamps (both leq each other)
394        let s = make_stamp(2);
395        let a = reg("same", s.clone(), 100, "agent", "hash");
396        let mut m = a.clone();
397        m.merge(&a);
398        assert_eq!(m, a);
399    }
400
401    #[test]
402    fn identical_timestamps_different_agents() {
403        let (sa, sb) = make_forked_stamps(1, 1);
404
405        let a = reg("alice-val", sa.clone(), 999, "alice", "hash-same");
406        let b = reg("bob-val", sb.clone(), 999, "bob", "hash-same");
407
408        let mut ab = a.clone();
409        ab.merge(&b);
410        assert_eq!(ab.value, "bob-val"); // "bob" > "alice"
411
412        let mut ba = b.clone();
413        ba.merge(&a);
414        assert_eq!(ba.value, "bob-val");
415
416        assert_eq!(ab, ba); // commutative
417    }
418
419    #[test]
420    fn same_agent_concurrent_writes() {
421        // Same agent can have concurrent writes if forked
422        let (sa, sb) = make_forked_stamps(1, 1);
423
424        let a = reg("write-1", sa, 100, "alice", "hash-111");
425        let b = reg("write-2", sb, 100, "alice", "hash-222");
426
427        let mut ab = a.clone();
428        ab.merge(&b);
429
430        let mut ba = b.clone();
431        ba.merge(&a);
432
433        assert_eq!(ab, ba); // commutative
434        assert_eq!(ab.value, "write-2"); // "hash-222" > "hash-111"
435    }
436
437    #[test]
438    fn display_shows_value() {
439        let s = make_stamp(1);
440        let r = reg("Hello, World!", s, 0, "agent", "hash");
441        assert_eq!(r.to_string(), "Hello, World!");
442    }
443
444    #[test]
445    fn serde_roundtrip() {
446        let s = make_stamp(2);
447        let r = reg("test-value", s, 42, "agent-1", "blake3:abc");
448        let json = serde_json::to_string(&r).unwrap();
449        let deserialized: LwwRegister<String> = serde_json::from_str(&json).unwrap();
450        assert_eq!(r, deserialized);
451    }
452
453    #[test]
454    fn numeric_value_type() {
455        let s = make_stamp(1);
456        let mut a = LwwRegister::new(42u64, s.clone(), 100, "alice".to_string(), "h1".to_string());
457        let s2 = make_stamp(2);
458        let b = LwwRegister::new(99u64, s2, 200, "bob".to_string(), "h2".to_string());
459        a.merge(&b);
460        assert_eq!(a.value, 99);
461    }
462
463    #[test]
464    fn merge_with_trace_disabled_by_default_has_no_payload() {
465        let s1 = make_stamp(1);
466        let s2 = make_stamp(2);
467
468        let mut a = reg("old", s1, 100, "alice", "aaa");
469        let b = reg("new", s2, 100, "alice", "bbb");
470
471        let trace = a.merge_with_trace(&b, "title");
472        assert_eq!(a.value, "new");
473        assert!(!trace.enabled);
474        assert_eq!(trace.step, TieBreakStep::Equal);
475        assert!(trace.field.is_empty());
476    }
477
478    #[test]
479    fn merge_with_trace_reports_decisive_step_when_enabled() {
480        if !merge_tracing_enabled() {
481            return;
482        }
483
484        let (sa, sb) = make_forked_stamps(1, 1);
485        let mut a = reg("alice-val", sa, 100, "alice", "aaa");
486        let b = reg("bob-val", sb, 200, "bob", "bbb");
487
488        let trace = a.merge_with_trace(&b, "title");
489        assert!(trace.enabled);
490        assert_eq!(trace.field, "title");
491        assert_eq!(trace.winner, "bob-val");
492        assert_eq!(trace.step, TieBreakStep::WallTimestamp);
493        assert!(!trace.correlation_id.is_empty());
494    }
495
496    #[test]
497    fn merge_chain_converges() {
498        // Multiple agents writing concurrently, all merge in different orders
499        let seed = Stamp::seed();
500        let (left, right) = seed.fork();
501        let (mut s1, mut s2) = left.fork();
502        let (mut s3, _) = right.fork();
503        s1.event();
504        s2.event();
505        s3.event();
506
507        let r1 = reg("v1", s1, 100, "alice", "h1");
508        let r2 = reg("v2", s2, 200, "bob", "h2");
509        let r3 = reg("v3", s3, 200, "carol", "h3");
510
511        // Order 1: r1, r2, r3
512        let mut m1 = r1.clone();
513        m1.merge(&r2);
514        m1.merge(&r3);
515
516        // Order 2: r3, r1, r2
517        let mut m2 = r3.clone();
518        m2.merge(&r1);
519        m2.merge(&r2);
520
521        // Order 3: r2, r3, r1
522        let mut m3 = r2.clone();
523        m3.merge(&r3);
524        m3.merge(&r1);
525
526        assert_eq!(m1, m2);
527        assert_eq!(m2, m3);
528    }
529}