1use serde::{Deserialize, Serialize};
21use std::fmt;
22
23use crate::clock::itc::Stamp;
24use crate::crdt::trace::{MergeTrace, TieBreakStep, merge_tracing_enabled};
25use tracing::debug;
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct LwwRegister<T> {
38 pub value: T,
40 pub stamp: Stamp,
42 pub wall_ts: u64,
44 pub agent_id: String,
46 pub event_hash: String,
48}
49
50impl<T> LwwRegister<T> {
51 pub const fn new(
53 value: T,
54 stamp: Stamp,
55 wall_ts: u64,
56 agent_id: String,
57 event_hash: String,
58 ) -> Self {
59 Self {
60 value,
61 stamp,
62 wall_ts,
63 agent_id,
64 event_hash,
65 }
66 }
67}
68
69impl<T: Clone> LwwRegister<T> {
70 pub fn merge(&mut self, other: &Self) {
80 if self.wins_over(other) {
81 } else {
83 self.value = other.value.clone();
84 self.stamp = other.stamp.clone();
85 self.wall_ts = other.wall_ts;
86 self.agent_id.clone_from(&other.agent_id);
87 self.event_hash.clone_from(&other.event_hash);
88 }
89 }
90
91 pub fn merge_with_trace(&mut self, other: &Self, field: &str) -> MergeTrace
96 where
97 T: fmt::Display,
98 {
99 let (self_wins, step) = self.compare(other);
100
101 let trace = if merge_tracing_enabled() {
102 let winner = if self_wins {
103 self.value.to_string()
104 } else {
105 other.value.to_string()
106 };
107
108 let trace = MergeTrace {
109 field: field.to_string(),
110 values: (self.value.to_string(), other.value.to_string()),
111 winner,
112 step,
113 correlation_id: format!("{}..{}", self.event_hash, other.event_hash),
114 enabled: true,
115 };
116
117 debug!(
118 target: "bones_core::crdt::merge_trace",
119 field = trace.field,
120 winner = trace.winner,
121 step = ?trace.step,
122 correlation_id = trace.correlation_id,
123 "LWW merge decision"
124 );
125
126 trace
127 } else {
128 MergeTrace::disabled()
129 };
130
131 if !self_wins {
132 self.value = other.value.clone();
133 self.stamp = other.stamp.clone();
134 self.wall_ts = other.wall_ts;
135 self.agent_id.clone_from(&other.agent_id);
136 self.event_hash.clone_from(&other.event_hash);
137 }
138
139 trace
140 }
141
142 fn wins_over(&self, other: &Self) -> bool {
144 self.compare(other).0
145 }
146
147 fn compare(&self, other: &Self) -> (bool, TieBreakStep) {
148 let self_leq_other = self.stamp.leq(&other.stamp);
150 let other_leq_self = other.stamp.leq(&self.stamp);
151
152 match (self_leq_other, other_leq_self) {
153 (true, false) => {
154 return (false, TieBreakStep::ItcCausal);
156 }
157 (false, true) => {
158 return (true, TieBreakStep::ItcCausal);
160 }
161 (true, true) | (false, false) => {
162 }
165 }
166
167 match self.wall_ts.cmp(&other.wall_ts) {
169 std::cmp::Ordering::Greater => return (true, TieBreakStep::WallTimestamp),
170 std::cmp::Ordering::Less => return (false, TieBreakStep::WallTimestamp),
171 std::cmp::Ordering::Equal => {}
172 }
173
174 match self.agent_id.cmp(&other.agent_id) {
176 std::cmp::Ordering::Greater => return (true, TieBreakStep::AgentId),
177 std::cmp::Ordering::Less => return (false, TieBreakStep::AgentId),
178 std::cmp::Ordering::Equal => {}
179 }
180
181 (self.event_hash >= other.event_hash, TieBreakStep::EventHash)
183 }
184}
185
186impl<T: fmt::Display> fmt::Display for LwwRegister<T> {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 write!(f, "{}", self.value)
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::itc::Stamp;

    /// Applies `events` successive events to `stamp`.
    fn advance(stamp: &mut Stamp, events: u64) {
        for _ in 0..events {
            stamp.event();
        }
    }

    /// Seed stamp advanced by `counter` events.
    fn make_stamp(counter: u64) -> Stamp {
        let mut stamp = Stamp::seed();
        advance(&mut stamp, counter);
        stamp
    }

    /// Forks the seed stamp and advances the two halves independently.
    fn make_forked_stamps(counter_a: u64, counter_b: u64) -> (Stamp, Stamp) {
        let (mut first, mut second) = Stamp::seed().fork();
        advance(&mut first, counter_a);
        advance(&mut second, counter_b);
        (first, second)
    }

    /// Shorthand for a `LwwRegister<String>` built from string slices.
    fn reg(
        value: &str,
        stamp: Stamp,
        wall_ts: u64,
        agent: &str,
        hash: &str,
    ) -> LwwRegister<String> {
        LwwRegister::new(
            value.to_owned(),
            stamp,
            wall_ts,
            agent.to_owned(),
            hash.to_owned(),
        )
    }

    // --- Step 1: stamp ordering -------------------------------------------

    #[test]
    fn causal_later_wins() {
        let earlier = make_stamp(1);
        let later = make_stamp(2);
        // Fixture sanity check: `earlier` is strictly behind `later`.
        assert!(earlier.leq(&later));
        assert!(!later.leq(&earlier));

        let mut local = reg("old", earlier, 100, "alice", "aaa");
        let remote = reg("new", later, 100, "alice", "aaa");
        local.merge(&remote);
        assert_eq!(local.value, "new");
    }

    #[test]
    fn causal_earlier_loses() {
        let earlier = make_stamp(1);
        let later = make_stamp(2);

        let mut local = reg("new", later, 100, "alice", "aaa");
        let remote = reg("old", earlier, 100, "alice", "aaa");
        local.merge(&remote);
        // The local register already holds the later stamp and is kept.
        assert_eq!(local.value, "new");
    }

    // --- Step 2: wall timestamp -------------------------------------------

    #[test]
    fn concurrent_higher_wall_ts_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        assert!(stamp_a.concurrent(&stamp_b));

        let mut local = reg("alice-val", stamp_a, 200, "alice", "aaa");
        let remote = reg("bob-val", stamp_b, 300, "bob", "bbb");
        local.merge(&remote);
        // 300 > 200, so the remote write wins.
        assert_eq!(local.value, "bob-val");
    }

    #[test]
    fn concurrent_lower_wall_ts_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("alice-val", stamp_a, 300, "alice", "aaa");
        let remote = reg("bob-val", stamp_b, 200, "bob", "bbb");
        local.merge(&remote);
        // 300 > 200, so the local write is kept.
        assert_eq!(local.value, "alice-val");
    }

    // --- Step 3: agent id -------------------------------------------------

    #[test]
    fn concurrent_same_ts_higher_agent_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("alice-val", stamp_a, 100, "alice", "aaa");
        let remote = reg("bob-val", stamp_b, 100, "bob", "bbb");
        local.merge(&remote);
        // "bob" > "alice"
        assert_eq!(local.value, "bob-val");
    }

    #[test]
    fn concurrent_same_ts_lower_agent_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("bob-val", stamp_a, 100, "bob", "bbb");
        let remote = reg("alice-val", stamp_b, 100, "alice", "aaa");
        local.merge(&remote);
        // "bob" > "alice", so the local write is kept.
        assert_eq!(local.value, "bob-val");
    }

    // --- Step 4: event hash -----------------------------------------------

    #[test]
    fn concurrent_same_agent_higher_hash_wins() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("val-a", stamp_a, 100, "alice", "hash-aaa");
        let remote = reg("val-b", stamp_b, 100, "alice", "hash-zzz");
        local.merge(&remote);
        // "hash-zzz" > "hash-aaa"
        assert_eq!(local.value, "val-b");
    }

    #[test]
    fn concurrent_same_agent_lower_hash_loses() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let mut local = reg("val-a", stamp_a, 100, "alice", "hash-zzz");
        let remote = reg("val-b", stamp_b, 100, "alice", "hash-aaa");
        local.merge(&remote);
        // "hash-zzz" > "hash-aaa", so the local write is kept.
        assert_eq!(local.value, "val-a");
    }

    // --- Merge laws -------------------------------------------------------

    #[test]
    fn semilattice_commutative() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let a = reg("val-a", stamp_a, 100, "alice", "hash-a");
        let b = reg("val-b", stamp_b, 200, "bob", "hash-b");

        let mut a_then_b = a.clone();
        a_then_b.merge(&b);

        let mut b_then_a = b.clone();
        b_then_a.merge(&a);

        assert_eq!(a_then_b, b_then_a);
    }

    #[test]
    fn semilattice_associative() {
        // Three stamps derived from one seed through separate forks.
        let seed = Stamp::seed();
        let (left, right) = seed.fork();
        let (mut stamp_a, stamp_b) = left.fork();
        let (mut stamp_c, _) = right.fork();
        stamp_a.event();
        stamp_c.event();

        let a = reg("val-a", stamp_a, 100, "alice", "hash-a");
        let b = reg("val-b", stamp_b, 200, "bob", "hash-b");
        let c = reg("val-c", stamp_c, 150, "carol", "hash-c");

        // (a merge b) merge c
        let mut left_merge = a.clone();
        left_merge.merge(&b);
        left_merge.merge(&c);

        // a merge (b merge c)
        let mut bc = b.clone();
        bc.merge(&c);
        let mut right_merge = a.clone();
        right_merge.merge(&bc);

        assert_eq!(left_merge, right_merge);
    }

    #[test]
    fn semilattice_idempotent_self_merge() {
        let original = reg("value", make_stamp(3), 500, "agent", "hash-123");
        let mut merged = original.clone();
        merged.merge(&original);
        assert_eq!(merged, original);
    }

    // --- Edge cases -------------------------------------------------------

    #[test]
    fn equal_stamps_are_idempotent() {
        let original = reg("same", make_stamp(2), 100, "agent", "hash");
        let mut merged = original.clone();
        merged.merge(&original);
        assert_eq!(merged, original);
    }

    #[test]
    fn identical_timestamps_different_agents() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let a = reg("alice-val", stamp_a, 999, "alice", "hash-same");
        let b = reg("bob-val", stamp_b, 999, "bob", "hash-same");

        let mut a_then_b = a.clone();
        a_then_b.merge(&b);
        // "bob" > "alice"
        assert_eq!(a_then_b.value, "bob-val");

        let mut b_then_a = b.clone();
        b_then_a.merge(&a);
        assert_eq!(b_then_a.value, "bob-val");

        // Both merge orders end in the same register.
        assert_eq!(a_then_b, b_then_a);
    }

    #[test]
    fn same_agent_concurrent_writes() {
        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);

        let first = reg("write-1", stamp_a, 100, "alice", "hash-111");
        let second = reg("write-2", stamp_b, 100, "alice", "hash-222");

        let mut first_then_second = first.clone();
        first_then_second.merge(&second);

        let mut second_then_first = second.clone();
        second_then_first.merge(&first);

        assert_eq!(first_then_second, second_then_first);
        // "hash-222" > "hash-111"
        assert_eq!(first_then_second.value, "write-2");
    }

    // --- Trait impls and value types --------------------------------------

    #[test]
    fn display_shows_value() {
        let register = reg("Hello, World!", make_stamp(1), 0, "agent", "hash");
        assert_eq!(register.to_string(), "Hello, World!");
    }

    #[test]
    fn serde_roundtrip() {
        let register = reg("test-value", make_stamp(2), 42, "agent-1", "blake3:abc");
        let json = serde_json::to_string(&register).unwrap();
        let decoded: LwwRegister<String> = serde_json::from_str(&json).unwrap();
        assert_eq!(register, decoded);
    }

    #[test]
    fn numeric_value_type() {
        let mut local = LwwRegister::new(
            42u64,
            make_stamp(1),
            100,
            "alice".to_string(),
            "h1".to_string(),
        );
        let remote = LwwRegister::new(
            99u64,
            make_stamp(2),
            200,
            "bob".to_string(),
            "h2".to_string(),
        );
        local.merge(&remote);
        assert_eq!(local.value, 99);
    }

    // --- Tracing ----------------------------------------------------------

    #[test]
    fn merge_with_trace_disabled_by_default_has_no_payload() {
        let mut local = reg("old", make_stamp(1), 100, "alice", "aaa");
        let remote = reg("new", make_stamp(2), 100, "alice", "bbb");

        let trace = local.merge_with_trace(&remote, "title");
        // The merge still happens; only the trace payload is absent.
        assert_eq!(local.value, "new");
        assert!(!trace.enabled);
        assert_eq!(trace.step, TieBreakStep::Equal);
        assert!(trace.field.is_empty());
    }

    #[test]
    fn merge_with_trace_reports_decisive_step_when_enabled() {
        // Only meaningful when tracing has been switched on for this run.
        if !merge_tracing_enabled() {
            return;
        }

        let (stamp_a, stamp_b) = make_forked_stamps(1, 1);
        let mut local = reg("alice-val", stamp_a, 100, "alice", "aaa");
        let remote = reg("bob-val", stamp_b, 200, "bob", "bbb");

        let trace = local.merge_with_trace(&remote, "title");
        assert!(trace.enabled);
        assert_eq!(trace.field, "title");
        assert_eq!(trace.winner, "bob-val");
        assert_eq!(trace.step, TieBreakStep::WallTimestamp);
        assert!(!trace.correlation_id.is_empty());
    }

    // --- Convergence ------------------------------------------------------

    #[test]
    fn merge_chain_converges() {
        // Three stamps derived from one seed through separate forks.
        let seed = Stamp::seed();
        let (left, right) = seed.fork();
        let (mut s1, mut s2) = left.fork();
        let (mut s3, _) = right.fork();
        s1.event();
        s2.event();
        s3.event();

        let r1 = reg("v1", s1, 100, "alice", "h1");
        let r2 = reg("v2", s2, 200, "bob", "h2");
        let r3 = reg("v3", s3, 200, "carol", "h3");

        // r1 <- r2 <- r3
        let mut m1 = r1.clone();
        m1.merge(&r2);
        m1.merge(&r3);

        // r3 <- r1 <- r2
        let mut m2 = r3.clone();
        m2.merge(&r1);
        m2.merge(&r2);

        // r2 <- r3 <- r1
        let mut m3 = r2.clone();
        m3.merge(&r3);
        m3.merge(&r1);

        assert_eq!(m1, m2);
        assert_eq!(m2, m3);
    }
}