Skip to main content

citadel_sync/
crdt.rs

1use crate::hlc::HlcTimestamp;
2use crate::node_id::NodeId;
3
4/// Per-entry CRDT metadata for LWW (Last-Writer-Wins) conflict resolution.
5///
6/// 20 bytes on wire: entry_kind (1B) + padding (3B) + HLC timestamp (12B) + NodeId (8B).
7///
8/// Conflict resolution: higher timestamp wins, NodeId tiebreaker.
9/// This forms a join-semilattice with a total order, guaranteeing:
10/// - Commutativity: merge(a, b) == merge(b, a)
11/// - Associativity: merge(merge(a, b), c) == merge(a, merge(b, c))
12/// - Idempotency: merge(a, a) == a
13#[derive(Clone, Copy, PartialEq, Eq, Hash)]
14pub struct CrdtMeta {
15    pub timestamp: HlcTimestamp,
16    pub node_id: NodeId,
17}
18
/// Wire size of CrdtMeta: HLC (12B) + NodeId (8B) = 20 bytes.
pub const CRDT_META_SIZE: usize = 12 + 8;

/// Wire size of a full CRDT-encoded value header: kind (1B) + padding (3B) + meta (20B) = 24 bytes.
///
/// Derived from [`CRDT_META_SIZE`] so the two constants cannot drift apart.
pub const CRDT_HEADER_SIZE: usize = 1 + 3 + CRDT_META_SIZE;
24
/// Type of CRDT entry.
///
/// The `u8` discriminant is the value written as the first byte of an
/// encoded CRDT value, so the numeric values are part of the wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum EntryKind {
    /// Key-value write (user value follows the header).
    Put = 0,
    /// Logical delete (tombstone). No user value follows.
    Tombstone = 1,
}

impl EntryKind {
    /// Map a wire discriminant back to an [`EntryKind`].
    ///
    /// Returns `None` for any byte that is not a known discriminant
    /// (anything other than `0` or `1`).
    pub const fn from_u8(v: u8) -> Option<Self> {
        match v {
            0 => Some(Self::Put),
            1 => Some(Self::Tombstone),
            _ => None,
        }
    }
}
44
45impl CrdtMeta {
46    /// Create new CRDT metadata.
47    #[inline]
48    pub fn new(timestamp: HlcTimestamp, node_id: NodeId) -> Self {
49        Self { timestamp, node_id }
50    }
51
52    /// Serialize to 20 bytes: HLC (12B big-endian) + NodeId (8B big-endian).
53    pub fn to_bytes(&self) -> [u8; CRDT_META_SIZE] {
54        let mut buf = [0u8; CRDT_META_SIZE];
55        let ts_bytes = self.timestamp.to_bytes();
56        let nid_bytes = self.node_id.to_bytes();
57        buf[0..12].copy_from_slice(&ts_bytes);
58        buf[12..20].copy_from_slice(&nid_bytes);
59        buf
60    }
61
62    /// Deserialize from 20 bytes.
63    pub fn from_bytes(b: &[u8; CRDT_META_SIZE]) -> Self {
64        let ts = HlcTimestamp::from_bytes(b[0..12].try_into().unwrap());
65        let nid = NodeId::from_bytes(b[12..20].try_into().unwrap());
66        Self {
67            timestamp: ts,
68            node_id: nid,
69        }
70    }
71
72    /// LWW comparison: higher timestamp wins, NodeId tiebreaker.
73    ///
74    /// This total order is the foundation of LWW conflict resolution.
75    /// If `self > other`, self is the winner.
76    #[inline]
77    pub fn lww_cmp(&self, other: &Self) -> std::cmp::Ordering {
78        self.timestamp
79            .cmp(&other.timestamp)
80            .then(self.node_id.cmp(&other.node_id))
81    }
82
83    /// Returns true if `self` wins over `other` in LWW resolution.
84    #[inline]
85    pub fn wins_over(&self, other: &Self) -> bool {
86        self.lww_cmp(other) == std::cmp::Ordering::Greater
87    }
88}
89
90impl std::fmt::Debug for CrdtMeta {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        write!(f, "CrdtMeta({:?}, {:?})", self.timestamp, self.node_id)
93    }
94}
95
96/// Encode a user value with CRDT header.
97///
98/// Format: `[entry_kind: u8][_pad: 3B][HLC: 12B][NodeId: 8B][user_value...]`
99///
100/// Total header: 24 bytes. User value follows immediately after.
101pub fn encode_lww_value(meta: &CrdtMeta, kind: EntryKind, user_value: &[u8]) -> Vec<u8> {
102    let user_len = if kind == EntryKind::Tombstone {
103        0
104    } else {
105        user_value.len()
106    };
107    let mut buf = Vec::with_capacity(CRDT_HEADER_SIZE + user_len);
108    buf.push(kind as u8);
109    buf.extend_from_slice(&[0u8; 3]); // padding
110    buf.extend_from_slice(&meta.to_bytes());
111    if kind == EntryKind::Put {
112        buf.extend_from_slice(user_value);
113    }
114    buf
115}
116
117/// Decoded CRDT value.
118#[derive(Debug)]
119pub struct DecodedValue<'a> {
120    pub meta: CrdtMeta,
121    pub kind: EntryKind,
122    pub user_value: &'a [u8],
123}
124
125/// Decode a CRDT-encoded value.
126///
127/// Returns the metadata, entry kind, and a slice to the user value.
128/// For tombstones, `user_value` is an empty slice.
129pub fn decode_lww_value(data: &[u8]) -> Result<DecodedValue<'_>, DecodeError> {
130    if data.len() < CRDT_HEADER_SIZE {
131        return Err(DecodeError::TooShort {
132            expected: CRDT_HEADER_SIZE,
133            actual: data.len(),
134        });
135    }
136
137    let kind = EntryKind::from_u8(data[0]).ok_or(DecodeError::InvalidEntryKind(data[0]))?;
138    // bytes 1..4 are padding (ignored on read)
139    let meta_bytes: &[u8; CRDT_META_SIZE] = data[4..24].try_into().unwrap();
140    let meta = CrdtMeta::from_bytes(meta_bytes);
141
142    let user_value = if kind == EntryKind::Tombstone {
143        &data[CRDT_HEADER_SIZE..CRDT_HEADER_SIZE] // empty slice
144    } else {
145        &data[CRDT_HEADER_SIZE..]
146    };
147
148    Ok(DecodedValue {
149        meta,
150        kind,
151        user_value,
152    })
153}
154
155/// Errors from CRDT value decoding.
156#[derive(Debug, thiserror::Error)]
157pub enum DecodeError {
158    #[error("CRDT value too short: expected at least {expected} bytes, got {actual}")]
159    TooShort { expected: usize, actual: usize },
160
161    #[error("invalid CRDT entry kind: {0}")]
162    InvalidEntryKind(u8),
163}
164
/// Outcome of merging two CRDT entries for the same key.
///
/// Says which side wins under LWW resolution:
/// higher timestamp wins, NodeId tiebreaker.
/// The entry kind (Put vs Tombstone) does NOT affect the merge —
/// a tombstone with a higher timestamp defeats a put with a lower one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MergeResult {
    /// Keep the local entry.
    Local,
    /// Take the remote entry.
    Remote,
    /// Both entries carry identical metadata (same timestamp and node id).
    Equal,
}
180
181/// Resolve a conflict between local and remote entries for the same key.
182///
183/// The merge function operates only on metadata — it doesn't need
184/// to know the actual values or entry kinds.
185pub fn lww_merge(local: &CrdtMeta, remote: &CrdtMeta) -> MergeResult {
186    match local.lww_cmp(remote) {
187        std::cmp::Ordering::Greater => MergeResult::Local,
188        std::cmp::Ordering::Less => MergeResult::Remote,
189        std::cmp::Ordering::Equal => MergeResult::Equal,
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hlc::HlcTimestamp;
    use crate::node_id::NodeId;

    const SECOND: i64 = 1_000_000_000;

    /// Build metadata from a wall time, a logical counter and a node number.
    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
        CrdtMeta::new(HlcTimestamp::new(wall_ns, logical), NodeId::from_u64(node))
    }

    /// Fold a non-empty sequence with `lww_merge`, replacing the running
    /// winner whenever the next entry wins as the remote side.
    fn fold_winner<'a>(entries: impl IntoIterator<Item = &'a CrdtMeta>) -> CrdtMeta {
        let mut iter = entries.into_iter();
        let mut winner = *iter.next().expect("fold_winner needs at least one entry");
        for e in iter {
            if lww_merge(&winner, e) == MergeResult::Remote {
                winner = *e;
            }
        }
        winner
    }

    // ── CrdtMeta basics ──────────────────────────────────────────────

    #[test]
    fn meta_new_and_accessors() {
        let ts = HlcTimestamp::new(1000 * SECOND, 5);
        let nid = NodeId::from_u64(42);
        let m = CrdtMeta::new(ts, nid);
        assert_eq!(m.timestamp, ts);
        assert_eq!(m.node_id, nid);
    }

    #[test]
    fn meta_bytes_roundtrip() {
        let m = meta(1000 * SECOND, 42, 0xDEADBEEF);
        let bytes = m.to_bytes();
        assert_eq!(bytes.len(), CRDT_META_SIZE);
        let m2 = CrdtMeta::from_bytes(&bytes);
        assert_eq!(m, m2);
    }

    #[test]
    fn meta_bytes_roundtrip_zero() {
        let m = meta(0, 0, 0);
        let bytes = m.to_bytes();
        let m2 = CrdtMeta::from_bytes(&bytes);
        assert_eq!(m, m2);
    }

    #[test]
    fn meta_bytes_roundtrip_max() {
        let m = meta(i64::MAX, i32::MAX, u64::MAX);
        let bytes = m.to_bytes();
        let m2 = CrdtMeta::from_bytes(&bytes);
        assert_eq!(m, m2);
    }

    #[test]
    fn meta_debug_format() {
        let m = meta(1_000_000_000, 5, 255);
        let s = format!("{m:?}");
        assert!(s.contains("CrdtMeta"));
        assert!(s.contains("HLC"));
        assert!(s.contains("NodeId"));
    }

    // ── LWW comparison ───────────────────────────────────────────────

    #[test]
    fn lww_higher_timestamp_wins() {
        let a = meta(1000 * SECOND, 0, 1);
        let b = meta(1001 * SECOND, 0, 1);
        assert!(b.wins_over(&a));
        assert!(!a.wins_over(&b));
    }

    #[test]
    fn lww_higher_logical_wins() {
        let a = meta(1000 * SECOND, 5, 1);
        let b = meta(1000 * SECOND, 6, 1);
        assert!(b.wins_over(&a));
        assert!(!a.wins_over(&b));
    }

    #[test]
    fn lww_node_id_tiebreaker() {
        let a = meta(1000 * SECOND, 5, 100);
        let b = meta(1000 * SECOND, 5, 200);
        assert!(b.wins_over(&a));
        assert!(!a.wins_over(&b));
    }

    #[test]
    fn lww_equal_entries() {
        let a = meta(1000 * SECOND, 5, 100);
        let b = meta(1000 * SECOND, 5, 100);
        assert!(!a.wins_over(&b));
        assert!(!b.wins_over(&a));
        assert_eq!(a.lww_cmp(&b), std::cmp::Ordering::Equal);
    }

    #[test]
    fn lww_timestamp_dominates_node_id() {
        // Even with lower node_id, higher timestamp wins
        let a = meta(1001 * SECOND, 0, 1);
        let b = meta(1000 * SECOND, 0, u64::MAX);
        assert!(a.wins_over(&b));
    }

    // ── LWW merge function ───────────────────────────────────────────

    #[test]
    fn merge_local_wins() {
        let local = meta(1001 * SECOND, 0, 1);
        let remote = meta(1000 * SECOND, 0, 1);
        assert_eq!(lww_merge(&local, &remote), MergeResult::Local);
    }

    #[test]
    fn merge_remote_wins() {
        let local = meta(1000 * SECOND, 0, 1);
        let remote = meta(1001 * SECOND, 0, 1);
        assert_eq!(lww_merge(&local, &remote), MergeResult::Remote);
    }

    #[test]
    fn merge_equal() {
        let local = meta(1000 * SECOND, 5, 100);
        let remote = meta(1000 * SECOND, 5, 100);
        assert_eq!(lww_merge(&local, &remote), MergeResult::Equal);
    }

    // ── CRDT properties ──────────────────────────────────────────────

    #[test]
    fn merge_commutativity() {
        let entries = [
            meta(1000 * SECOND, 0, 1),
            meta(1000 * SECOND, 0, 2),
            meta(1001 * SECOND, 0, 1),
            meta(1000 * SECOND, 1, 1),
        ];

        for a in &entries {
            for b in &entries {
                let ab = lww_merge(a, b);
                let ba = lww_merge(b, a);
                // Commutativity: swapping the arguments must swap Local/Remote
                // and leave Equal unchanged, i.e. the same entry wins.
                match (ab, ba) {
                    (MergeResult::Local, MergeResult::Remote) => {}
                    (MergeResult::Remote, MergeResult::Local) => {}
                    (MergeResult::Equal, MergeResult::Equal) => {}
                    _ => panic!("commutativity violated for {a:?} vs {b:?}: {ab:?} vs {ba:?}"),
                }
            }
        }
    }

    #[test]
    fn merge_associativity() {
        // For three entries, the winner should be the same regardless of merge order.
        let a = meta(1000 * SECOND, 0, 1);
        let b = meta(1001 * SECOND, 5, 2);
        let c = meta(1001 * SECOND, 5, 3);

        // Winner is c (same timestamp as b, higher node_id)
        // merge(merge(a, b), c) should pick the same winner as merge(a, merge(b, c))

        fn winner(local: &CrdtMeta, remote: &CrdtMeta) -> CrdtMeta {
            match lww_merge(local, remote) {
                MergeResult::Local | MergeResult::Equal => *local,
                MergeResult::Remote => *remote,
            }
        }

        let ab = winner(&a, &b);
        let ab_c = winner(&ab, &c);

        let bc = winner(&b, &c);
        let a_bc = winner(&a, &bc);

        assert_eq!(ab_c, a_bc, "associativity violated");
    }

    #[test]
    fn merge_idempotency() {
        let a = meta(1000 * SECOND, 5, 42);
        assert_eq!(lww_merge(&a, &a), MergeResult::Equal);
    }

    // ── EntryKind ────────────────────────────────────────────────────

    #[test]
    fn entry_kind_roundtrip() {
        assert_eq!(EntryKind::from_u8(0), Some(EntryKind::Put));
        assert_eq!(EntryKind::from_u8(1), Some(EntryKind::Tombstone));
        assert_eq!(EntryKind::from_u8(2), None);
        assert_eq!(EntryKind::from_u8(255), None);
    }

    // ── Value encoding ───────────────────────────────────────────────

    #[test]
    fn encode_decode_put_roundtrip() {
        let m = meta(1000 * SECOND, 5, 42);
        let user_val = b"hello world";
        let encoded = encode_lww_value(&m, EntryKind::Put, user_val);

        assert_eq!(encoded.len(), CRDT_HEADER_SIZE + user_val.len());

        let decoded = decode_lww_value(&encoded).unwrap();
        assert_eq!(decoded.meta, m);
        assert_eq!(decoded.kind, EntryKind::Put);
        assert_eq!(decoded.user_value, user_val);
    }

    #[test]
    fn encode_decode_tombstone_roundtrip() {
        let m = meta(1000 * SECOND, 5, 42);
        let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"");

        assert_eq!(encoded.len(), CRDT_HEADER_SIZE);

        let decoded = decode_lww_value(&encoded).unwrap();
        assert_eq!(decoded.meta, m);
        assert_eq!(decoded.kind, EntryKind::Tombstone);
        assert_eq!(decoded.user_value.len(), 0);
    }

    #[test]
    fn encode_tombstone_ignores_user_value() {
        let m = meta(1000 * SECOND, 5, 42);
        // Even if user_value is non-empty, tombstone encoding ignores it
        let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"should be ignored");
        assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
    }

    #[test]
    fn encode_decode_empty_value() {
        let m = meta(1000 * SECOND, 0, 1);
        let encoded = encode_lww_value(&m, EntryKind::Put, b"");

        assert_eq!(encoded.len(), CRDT_HEADER_SIZE);

        let decoded = decode_lww_value(&encoded).unwrap();
        assert_eq!(decoded.kind, EntryKind::Put);
        assert_eq!(decoded.user_value.len(), 0);
    }

    #[test]
    fn encode_decode_large_value() {
        let m = meta(1000 * SECOND, 0, 1);
        let user_val = vec![0xAB; 4096];
        let encoded = encode_lww_value(&m, EntryKind::Put, &user_val);

        assert_eq!(encoded.len(), CRDT_HEADER_SIZE + 4096);

        let decoded = decode_lww_value(&encoded).unwrap();
        assert_eq!(decoded.user_value, &user_val[..]);
    }

    #[test]
    fn decode_too_short() {
        let err = decode_lww_value(&[0u8; 10]).unwrap_err();
        assert!(matches!(err, DecodeError::TooShort { .. }));
    }

    #[test]
    fn decode_invalid_entry_kind() {
        let mut data = [0u8; CRDT_HEADER_SIZE];
        data[0] = 255; // invalid
        let err = decode_lww_value(&data).unwrap_err();
        assert!(matches!(err, DecodeError::InvalidEntryKind(255)));
    }

    #[test]
    fn header_size_constant() {
        assert_eq!(CRDT_HEADER_SIZE, 24);
        assert_eq!(CRDT_META_SIZE, 20);
        // 1 (kind) + 3 (pad) + 12 (HLC) + 8 (NodeId) = 24
        assert_eq!(1 + 3 + 12 + 8, CRDT_HEADER_SIZE);
    }

    // ── Encoding preserves metadata across merge ─────────────────────

    #[test]
    fn merge_encoded_values() {
        let local_meta = meta(1000 * SECOND, 0, 1);
        let remote_meta = meta(1001 * SECOND, 0, 2);

        let local_encoded = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        let remote_encoded = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");

        let local_decoded = decode_lww_value(&local_encoded).unwrap();
        let remote_decoded = decode_lww_value(&remote_encoded).unwrap();

        let result = lww_merge(&local_decoded.meta, &remote_decoded.meta);
        assert_eq!(result, MergeResult::Remote);
    }

    #[test]
    fn tombstone_wins_over_put_with_lower_timestamp() {
        let put_meta = meta(1000 * SECOND, 0, 1);
        let del_meta = meta(1001 * SECOND, 0, 1);

        let put_encoded = encode_lww_value(&put_meta, EntryKind::Put, b"value");
        let del_encoded = encode_lww_value(&del_meta, EntryKind::Tombstone, b"");

        let put_decoded = decode_lww_value(&put_encoded).unwrap();
        let del_decoded = decode_lww_value(&del_encoded).unwrap();

        // Tombstone has higher timestamp — it wins
        let result = lww_merge(&put_decoded.meta, &del_decoded.meta);
        assert_eq!(result, MergeResult::Remote);
        assert_eq!(del_decoded.kind, EntryKind::Tombstone);
    }

    #[test]
    fn put_wins_over_tombstone_with_lower_timestamp() {
        let del_meta = meta(1000 * SECOND, 0, 1);
        let put_meta = meta(1001 * SECOND, 0, 1);

        let del_encoded = encode_lww_value(&del_meta, EntryKind::Tombstone, b"");
        let put_encoded = encode_lww_value(&put_meta, EntryKind::Put, b"value");

        let del_decoded = decode_lww_value(&del_encoded).unwrap();
        let put_decoded = decode_lww_value(&put_encoded).unwrap();

        // Put has higher timestamp — it wins over the tombstone
        let result = lww_merge(&del_decoded.meta, &put_decoded.meta);
        assert_eq!(result, MergeResult::Remote);
        assert_eq!(put_decoded.kind, EntryKind::Put);
    }

    // ── Binary format verification ───────────────────────────────────

    #[test]
    fn encoded_format_put() {
        let m = CrdtMeta::new(
            HlcTimestamp::new(0x0102_0304_0506_0708, 0x090A0B0C),
            NodeId::from_u64(0x1112_1314_1516_1718),
        );
        let encoded = encode_lww_value(&m, EntryKind::Put, b"\xAA\xBB");

        // kind=0, pad=[0,0,0], HLC=8B+4B, NodeId=8B, value=2B
        assert_eq!(encoded[0], 0x00); // Put
        assert_eq!(&encoded[1..4], &[0, 0, 0]); // padding

        // HLC wall_time big-endian
        assert_eq!(
            &encoded[4..12],
            &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
        );
        // HLC logical big-endian
        assert_eq!(&encoded[12..16], &[0x09, 0x0A, 0x0B, 0x0C]);
        // NodeId big-endian
        assert_eq!(
            &encoded[16..24],
            &[0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18]
        );
        // User value
        assert_eq!(&encoded[24..26], &[0xAA, 0xBB]);
    }

    #[test]
    fn encoded_format_tombstone() {
        let m = meta(1000 * SECOND, 0, 1);
        let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"");
        assert_eq!(encoded[0], 0x01); // Tombstone
        assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
    }

    // ── Stress: many merges ──────────────────────────────────────────

    #[test]
    fn merge_many_entries_finds_latest() {
        let entries: Vec<CrdtMeta> = (0..100)
            .map(|i| meta(1000 * SECOND + i as i64, 0, i as u64))
            .collect();

        let winner = fold_winner(&entries);

        // Last entry should win (highest timestamp and node_id)
        assert_eq!(winner.timestamp.wall_time(), 1000 * SECOND + 99);
        assert_eq!(winner.node_id.as_u64(), 99);
    }

    #[test]
    fn merge_reverse_order_same_result() {
        let entries: Vec<CrdtMeta> = (0..100)
            .map(|i| meta(1000 * SECOND + i as i64, 0, i as u64))
            .collect();

        let fwd_winner = fold_winner(&entries);
        let rev_winner = fold_winner(entries.iter().rev());

        assert_eq!(fwd_winner, rev_winner);
    }

    #[test]
    fn merge_shuffled_order_same_result() {
        use std::collections::BTreeSet;

        // Wall-time offsets are `i * 7 % 50`; 7 is coprime with 50, so they are
        // a permutation of 0..50 and every entry has a distinct timestamp.
        let entries: Vec<CrdtMeta> = (0..50)
            .map(|i| meta(1000 * SECOND + (i * 7 % 50) as i64, 0, i as u64))
            .collect();

        let distinct: BTreeSet<i64> = entries.iter().map(|e| e.timestamp.wall_time()).collect();
        assert_eq!(distinct.len(), entries.len());

        // Find absolute winner (max by lww_cmp)
        let expected = *entries.iter().max_by(|a, b| a.lww_cmp(b)).unwrap();

        // Merge in original order
        assert_eq!(fold_winner(&entries), expected);

        // Merge in several deterministic shuffles. Each stride is coprime with
        // 50, so `(i * stride + offset) % 50` visits every index exactly once.
        for (stride, offset) in [(3usize, 0usize), (11, 17), (49, 5)] {
            let shuffled: Vec<&CrdtMeta> = (0..entries.len())
                .map(|i| &entries[(i * stride + offset) % entries.len()])
                .collect();
            assert_eq!(shuffled.len(), entries.len());
            assert_eq!(
                fold_winner(shuffled),
                expected,
                "winner changed for stride {stride}, offset {offset}"
            );
        }
    }
}
629}