1use crate::hlc::HlcTimestamp;
2use crate::node_id::NodeId;
3
4#[derive(Clone, Copy, PartialEq, Eq, Hash)]
14pub struct CrdtMeta {
15 pub timestamp: HlcTimestamp,
16 pub node_id: NodeId,
17}
18
19pub const CRDT_META_SIZE: usize = 20;
21
22pub const CRDT_HEADER_SIZE: usize = 24;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27#[repr(u8)]
28pub enum EntryKind {
29 Put = 0,
31 Tombstone = 1,
33}
34
35impl EntryKind {
36 pub fn from_u8(v: u8) -> Option<Self> {
37 match v {
38 0 => Some(Self::Put),
39 1 => Some(Self::Tombstone),
40 _ => None,
41 }
42 }
43}
44
45impl CrdtMeta {
46 #[inline]
48 pub fn new(timestamp: HlcTimestamp, node_id: NodeId) -> Self {
49 Self { timestamp, node_id }
50 }
51
52 pub fn to_bytes(&self) -> [u8; CRDT_META_SIZE] {
54 let mut buf = [0u8; CRDT_META_SIZE];
55 let ts_bytes = self.timestamp.to_bytes();
56 let nid_bytes = self.node_id.to_bytes();
57 buf[0..12].copy_from_slice(&ts_bytes);
58 buf[12..20].copy_from_slice(&nid_bytes);
59 buf
60 }
61
62 pub fn from_bytes(b: &[u8; CRDT_META_SIZE]) -> Self {
64 let ts = HlcTimestamp::from_bytes(b[0..12].try_into().unwrap());
65 let nid = NodeId::from_bytes(b[12..20].try_into().unwrap());
66 Self {
67 timestamp: ts,
68 node_id: nid,
69 }
70 }
71
72 #[inline]
77 pub fn lww_cmp(&self, other: &Self) -> std::cmp::Ordering {
78 self.timestamp
79 .cmp(&other.timestamp)
80 .then(self.node_id.cmp(&other.node_id))
81 }
82
83 #[inline]
85 pub fn wins_over(&self, other: &Self) -> bool {
86 self.lww_cmp(other) == std::cmp::Ordering::Greater
87 }
88}
89
90impl std::fmt::Debug for CrdtMeta {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 write!(f, "CrdtMeta({:?}, {:?})", self.timestamp, self.node_id)
93 }
94}
95
96pub fn encode_lww_value(meta: &CrdtMeta, kind: EntryKind, user_value: &[u8]) -> Vec<u8> {
102 let user_len = if kind == EntryKind::Tombstone {
103 0
104 } else {
105 user_value.len()
106 };
107 let mut buf = Vec::with_capacity(CRDT_HEADER_SIZE + user_len);
108 buf.push(kind as u8);
109 buf.extend_from_slice(&[0u8; 3]); buf.extend_from_slice(&meta.to_bytes());
111 if kind == EntryKind::Put {
112 buf.extend_from_slice(user_value);
113 }
114 buf
115}
116
117#[derive(Debug)]
119pub struct DecodedValue<'a> {
120 pub meta: CrdtMeta,
121 pub kind: EntryKind,
122 pub user_value: &'a [u8],
123}
124
125pub fn decode_lww_value(data: &[u8]) -> Result<DecodedValue<'_>, DecodeError> {
130 if data.len() < CRDT_HEADER_SIZE {
131 return Err(DecodeError::TooShort {
132 expected: CRDT_HEADER_SIZE,
133 actual: data.len(),
134 });
135 }
136
137 let kind = EntryKind::from_u8(data[0]).ok_or(DecodeError::InvalidEntryKind(data[0]))?;
138 let meta_bytes: &[u8; CRDT_META_SIZE] = data[4..24].try_into().unwrap();
140 let meta = CrdtMeta::from_bytes(meta_bytes);
141
142 let user_value = if kind == EntryKind::Tombstone {
143 &data[CRDT_HEADER_SIZE..CRDT_HEADER_SIZE] } else {
145 &data[CRDT_HEADER_SIZE..]
146 };
147
148 Ok(DecodedValue {
149 meta,
150 kind,
151 user_value,
152 })
153}
154
155#[derive(Debug, thiserror::Error)]
157pub enum DecodeError {
158 #[error("CRDT value too short: expected at least {expected} bytes, got {actual}")]
159 TooShort { expected: usize, actual: usize },
160
161 #[error("invalid CRDT entry kind: {0}")]
162 InvalidEntryKind(u8),
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum MergeResult {
173 Local,
175 Remote,
177 Equal,
179}
180
181pub fn lww_merge(local: &CrdtMeta, remote: &CrdtMeta) -> MergeResult {
186 match local.lww_cmp(remote) {
187 std::cmp::Ordering::Greater => MergeResult::Local,
188 std::cmp::Ordering::Less => MergeResult::Remote,
189 std::cmp::Ordering::Equal => MergeResult::Equal,
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::*;
196 use crate::hlc::HlcTimestamp;
197 use crate::node_id::NodeId;
198
199 const SECOND: i64 = 1_000_000_000;
200
201 fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
202 CrdtMeta::new(HlcTimestamp::new(wall_ns, logical), NodeId::from_u64(node))
203 }
204
205 #[test]
208 fn meta_new_and_accessors() {
209 let ts = HlcTimestamp::new(1000 * SECOND, 5);
210 let nid = NodeId::from_u64(42);
211 let m = CrdtMeta::new(ts, nid);
212 assert_eq!(m.timestamp, ts);
213 assert_eq!(m.node_id, nid);
214 }
215
216 #[test]
217 fn meta_bytes_roundtrip() {
218 let m = meta(1000 * SECOND, 42, 0xDEADBEEF);
219 let bytes = m.to_bytes();
220 assert_eq!(bytes.len(), CRDT_META_SIZE);
221 let m2 = CrdtMeta::from_bytes(&bytes);
222 assert_eq!(m, m2);
223 }
224
225 #[test]
226 fn meta_bytes_roundtrip_zero() {
227 let m = meta(0, 0, 0);
228 let bytes = m.to_bytes();
229 let m2 = CrdtMeta::from_bytes(&bytes);
230 assert_eq!(m, m2);
231 }
232
233 #[test]
234 fn meta_bytes_roundtrip_max() {
235 let m = meta(i64::MAX, i32::MAX, u64::MAX);
236 let bytes = m.to_bytes();
237 let m2 = CrdtMeta::from_bytes(&bytes);
238 assert_eq!(m, m2);
239 }
240
241 #[test]
242 fn meta_debug_format() {
243 let m = meta(1_000_000_000, 5, 255);
244 let s = format!("{m:?}");
245 assert!(s.contains("CrdtMeta"));
246 assert!(s.contains("HLC"));
247 assert!(s.contains("NodeId"));
248 }
249
250 #[test]
253 fn lww_higher_timestamp_wins() {
254 let a = meta(1000 * SECOND, 0, 1);
255 let b = meta(1001 * SECOND, 0, 1);
256 assert!(b.wins_over(&a));
257 assert!(!a.wins_over(&b));
258 }
259
260 #[test]
261 fn lww_higher_logical_wins() {
262 let a = meta(1000 * SECOND, 5, 1);
263 let b = meta(1000 * SECOND, 6, 1);
264 assert!(b.wins_over(&a));
265 assert!(!a.wins_over(&b));
266 }
267
268 #[test]
269 fn lww_node_id_tiebreaker() {
270 let a = meta(1000 * SECOND, 5, 100);
271 let b = meta(1000 * SECOND, 5, 200);
272 assert!(b.wins_over(&a));
273 assert!(!a.wins_over(&b));
274 }
275
276 #[test]
277 fn lww_equal_entries() {
278 let a = meta(1000 * SECOND, 5, 100);
279 let b = meta(1000 * SECOND, 5, 100);
280 assert!(!a.wins_over(&b));
281 assert!(!b.wins_over(&a));
282 assert_eq!(a.lww_cmp(&b), std::cmp::Ordering::Equal);
283 }
284
285 #[test]
286 fn lww_timestamp_dominates_node_id() {
287 let a = meta(1001 * SECOND, 0, 1);
289 let b = meta(1000 * SECOND, 0, u64::MAX);
290 assert!(a.wins_over(&b));
291 }
292
293 #[test]
296 fn merge_local_wins() {
297 let local = meta(1001 * SECOND, 0, 1);
298 let remote = meta(1000 * SECOND, 0, 1);
299 assert_eq!(lww_merge(&local, &remote), MergeResult::Local);
300 }
301
302 #[test]
303 fn merge_remote_wins() {
304 let local = meta(1000 * SECOND, 0, 1);
305 let remote = meta(1001 * SECOND, 0, 1);
306 assert_eq!(lww_merge(&local, &remote), MergeResult::Remote);
307 }
308
309 #[test]
310 fn merge_equal() {
311 let local = meta(1000 * SECOND, 5, 100);
312 let remote = meta(1000 * SECOND, 5, 100);
313 assert_eq!(lww_merge(&local, &remote), MergeResult::Equal);
314 }
315
316 #[test]
319 fn merge_commutativity() {
320 let entries = [
321 meta(1000 * SECOND, 0, 1),
322 meta(1000 * SECOND, 0, 2),
323 meta(1001 * SECOND, 0, 1),
324 meta(1000 * SECOND, 1, 1),
325 ];
326
327 for a in &entries {
328 for b in &entries {
329 let ab = lww_merge(a, b);
330 let ba = lww_merge(b, a);
331 match (ab, ba) {
333 (MergeResult::Local, MergeResult::Remote) => {}
334 (MergeResult::Remote, MergeResult::Local) => {}
335 (MergeResult::Equal, MergeResult::Equal) => {}
336 _ => panic!("commutativity violated for {a:?} vs {b:?}: {ab:?} vs {ba:?}"),
337 }
338 }
339 }
340 }
341
342 #[test]
343 fn merge_associativity() {
344 let a = meta(1000 * SECOND, 0, 1);
346 let b = meta(1001 * SECOND, 5, 2);
347 let c = meta(1001 * SECOND, 5, 3);
348
349 fn winner(local: &CrdtMeta, remote: &CrdtMeta) -> CrdtMeta {
353 match lww_merge(local, remote) {
354 MergeResult::Local | MergeResult::Equal => *local,
355 MergeResult::Remote => *remote,
356 }
357 }
358
359 let ab = winner(&a, &b);
360 let ab_c = winner(&ab, &c);
361
362 let bc = winner(&b, &c);
363 let a_bc = winner(&a, &bc);
364
365 assert_eq!(ab_c, a_bc, "associativity violated");
366 }
367
368 #[test]
369 fn merge_idempotency() {
370 let a = meta(1000 * SECOND, 5, 42);
371 assert_eq!(lww_merge(&a, &a), MergeResult::Equal);
372 }
373
374 #[test]
377 fn entry_kind_roundtrip() {
378 assert_eq!(EntryKind::from_u8(0), Some(EntryKind::Put));
379 assert_eq!(EntryKind::from_u8(1), Some(EntryKind::Tombstone));
380 assert_eq!(EntryKind::from_u8(2), None);
381 assert_eq!(EntryKind::from_u8(255), None);
382 }
383
384 #[test]
387 fn encode_decode_put_roundtrip() {
388 let m = meta(1000 * SECOND, 5, 42);
389 let user_val = b"hello world";
390 let encoded = encode_lww_value(&m, EntryKind::Put, user_val);
391
392 assert_eq!(encoded.len(), CRDT_HEADER_SIZE + user_val.len());
393
394 let decoded = decode_lww_value(&encoded).unwrap();
395 assert_eq!(decoded.meta, m);
396 assert_eq!(decoded.kind, EntryKind::Put);
397 assert_eq!(decoded.user_value, user_val);
398 }
399
400 #[test]
401 fn encode_decode_tombstone_roundtrip() {
402 let m = meta(1000 * SECOND, 5, 42);
403 let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"");
404
405 assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
406
407 let decoded = decode_lww_value(&encoded).unwrap();
408 assert_eq!(decoded.meta, m);
409 assert_eq!(decoded.kind, EntryKind::Tombstone);
410 assert_eq!(decoded.user_value.len(), 0);
411 }
412
413 #[test]
414 fn encode_tombstone_ignores_user_value() {
415 let m = meta(1000 * SECOND, 5, 42);
416 let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"should be ignored");
418 assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
419 }
420
421 #[test]
422 fn encode_decode_empty_value() {
423 let m = meta(1000 * SECOND, 0, 1);
424 let encoded = encode_lww_value(&m, EntryKind::Put, b"");
425
426 assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
427
428 let decoded = decode_lww_value(&encoded).unwrap();
429 assert_eq!(decoded.kind, EntryKind::Put);
430 assert_eq!(decoded.user_value.len(), 0);
431 }
432
433 #[test]
434 fn encode_decode_large_value() {
435 let m = meta(1000 * SECOND, 0, 1);
436 let user_val = vec![0xAB; 4096];
437 let encoded = encode_lww_value(&m, EntryKind::Put, &user_val);
438
439 assert_eq!(encoded.len(), CRDT_HEADER_SIZE + 4096);
440
441 let decoded = decode_lww_value(&encoded).unwrap();
442 assert_eq!(decoded.user_value, &user_val[..]);
443 }
444
445 #[test]
446 fn decode_too_short() {
447 let err = decode_lww_value(&[0u8; 10]).unwrap_err();
448 assert!(matches!(err, DecodeError::TooShort { .. }));
449 }
450
451 #[test]
452 fn decode_invalid_entry_kind() {
453 let mut data = [0u8; CRDT_HEADER_SIZE];
454 data[0] = 255; let err = decode_lww_value(&data).unwrap_err();
456 assert!(matches!(err, DecodeError::InvalidEntryKind(255)));
457 }
458
459 #[test]
460 fn header_size_constant() {
461 assert_eq!(CRDT_HEADER_SIZE, 24);
462 assert_eq!(CRDT_META_SIZE, 20);
463 assert_eq!(1 + 3 + 12 + 8, CRDT_HEADER_SIZE);
465 }
466
467 #[test]
470 fn merge_encoded_values() {
471 let local_meta = meta(1000 * SECOND, 0, 1);
472 let remote_meta = meta(1001 * SECOND, 0, 2);
473
474 let local_encoded = encode_lww_value(&local_meta, EntryKind::Put, b"local");
475 let remote_encoded = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
476
477 let local_decoded = decode_lww_value(&local_encoded).unwrap();
478 let remote_decoded = decode_lww_value(&remote_encoded).unwrap();
479
480 let result = lww_merge(&local_decoded.meta, &remote_decoded.meta);
481 assert_eq!(result, MergeResult::Remote);
482 }
483
484 #[test]
485 fn tombstone_wins_over_put_with_lower_timestamp() {
486 let put_meta = meta(1000 * SECOND, 0, 1);
487 let del_meta = meta(1001 * SECOND, 0, 1);
488
489 let put_encoded = encode_lww_value(&put_meta, EntryKind::Put, b"value");
490 let del_encoded = encode_lww_value(&del_meta, EntryKind::Tombstone, b"");
491
492 let put_decoded = decode_lww_value(&put_encoded).unwrap();
493 let del_decoded = decode_lww_value(&del_encoded).unwrap();
494
495 let result = lww_merge(&put_decoded.meta, &del_decoded.meta);
497 assert_eq!(result, MergeResult::Remote);
498 assert_eq!(del_decoded.kind, EntryKind::Tombstone);
499 }
500
501 #[test]
502 fn put_wins_over_tombstone_with_lower_timestamp() {
503 let del_meta = meta(1000 * SECOND, 0, 1);
504 let put_meta = meta(1001 * SECOND, 0, 1);
505
506 let del_encoded = encode_lww_value(&del_meta, EntryKind::Tombstone, b"");
507 let put_encoded = encode_lww_value(&put_meta, EntryKind::Put, b"value");
508
509 let del_decoded = decode_lww_value(&del_encoded).unwrap();
510 let put_decoded = decode_lww_value(&put_encoded).unwrap();
511
512 let result = lww_merge(&del_decoded.meta, &put_decoded.meta);
514 assert_eq!(result, MergeResult::Remote);
515 assert_eq!(put_decoded.kind, EntryKind::Put);
516 }
517
518 #[test]
521 fn encoded_format_put() {
522 let m = CrdtMeta::new(
523 HlcTimestamp::new(0x0102_0304_0506_0708, 0x090A0B0C),
524 NodeId::from_u64(0x1112_1314_1516_1718),
525 );
526 let encoded = encode_lww_value(&m, EntryKind::Put, b"\xAA\xBB");
527
528 assert_eq!(encoded[0], 0x00); assert_eq!(&encoded[1..4], &[0, 0, 0]); assert_eq!(
533 &encoded[4..12],
534 &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
535 );
536 assert_eq!(&encoded[12..16], &[0x09, 0x0A, 0x0B, 0x0C]);
538 assert_eq!(
540 &encoded[16..24],
541 &[0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18]
542 );
543 assert_eq!(&encoded[24..26], &[0xAA, 0xBB]);
545 }
546
547 #[test]
548 fn encoded_format_tombstone() {
549 let m = meta(1000 * SECOND, 0, 1);
550 let encoded = encode_lww_value(&m, EntryKind::Tombstone, b"");
551 assert_eq!(encoded[0], 0x01); assert_eq!(encoded.len(), CRDT_HEADER_SIZE);
553 }
554
555 #[test]
558 fn merge_many_entries_finds_latest() {
559 let entries: Vec<CrdtMeta> = (0..100)
560 .map(|i| meta(1000 * SECOND + i as i64, 0, i as u64))
561 .collect();
562
563 let mut winner = entries[0];
564 for e in &entries[1..] {
565 if lww_merge(&winner, e) == MergeResult::Remote {
566 winner = *e;
567 }
568 }
569
570 assert_eq!(winner.timestamp.wall_time(), 1000 * SECOND + 99);
572 assert_eq!(winner.node_id.as_u64(), 99);
573 }
574
575 #[test]
576 fn merge_reverse_order_same_result() {
577 let entries: Vec<CrdtMeta> = (0..100)
578 .map(|i| meta(1000 * SECOND + i as i64, 0, i as u64))
579 .collect();
580
581 let mut fwd_winner = entries[0];
583 for e in &entries[1..] {
584 if lww_merge(&fwd_winner, e) == MergeResult::Remote {
585 fwd_winner = *e;
586 }
587 }
588
589 let mut rev_winner = entries[99];
591 for e in entries[..99].iter().rev() {
592 if lww_merge(&rev_winner, e) == MergeResult::Remote {
593 rev_winner = *e;
594 }
595 }
596
597 assert_eq!(fwd_winner, rev_winner);
598 }
599
600 #[test]
601 fn merge_shuffled_order_same_result() {
602 use std::collections::BTreeSet;
603
604 let entries: Vec<CrdtMeta> = (0..50)
606 .map(|i| meta(1000 * SECOND + (i * 7 % 50) as i64, 0, i as u64))
607 .collect();
608
609 let expected = entries.iter().max_by(|a, b| a.lww_cmp(b)).unwrap();
611
612 let mut winner = entries[0];
614 for e in &entries[1..] {
615 if lww_merge(&winner, e) == MergeResult::Remote {
616 winner = *e;
617 }
618 }
619
620 assert_eq!(winner, *expected);
621
622 let sorted: BTreeSet<u64> = entries
624 .iter()
625 .map(|e| e.timestamp.wall_time() as u64)
626 .collect();
627 assert!(sorted.len() <= entries.len()); }
629}