1use std::collections::{BTreeSet, HashMap};
17
18use serde::{Deserialize, Serialize};
19
20use crate::message::PeerId;
21
22#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
24pub struct GSet<T: Ord> {
25 pub items: BTreeSet<T>,
27}
28
29impl<T: Ord + Clone> GSet<T> {
30 pub fn new() -> Self {
32 Self {
33 items: BTreeSet::new(),
34 }
35 }
36
37 pub fn add(&mut self, item: T) {
39 self.items.insert(item);
40 }
41
42 pub fn merge(&mut self, other: &Self) {
44 for v in &other.items {
45 self.items.insert(v.clone());
46 }
47 }
48
49 pub fn len(&self) -> usize {
51 self.items.len()
52 }
53
54 pub fn is_empty(&self) -> bool {
56 self.items.is_empty()
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct LwwRegister<V> {
63 pub value: V,
65 pub timestamp: u64,
67 pub author: PeerId,
69}
70
71impl<V: Clone> LwwRegister<V> {
72 pub fn new(value: V, author: impl Into<PeerId>, timestamp: u64) -> Self {
75 Self {
76 value,
77 timestamp,
78 author: author.into(),
79 }
80 }
81
82 pub fn write(&mut self, value: V, author: impl Into<PeerId>, timestamp: u64) {
84 let author = author.into();
85 if (timestamp, author.as_str()) > (self.timestamp, self.author.as_str()) {
86 self.value = value;
87 self.timestamp = timestamp;
88 self.author = author;
89 }
90 }
91
92 pub fn merge(&mut self, other: &Self) {
95 if (other.timestamp, other.author.as_str()) > (self.timestamp, self.author.as_str()) {
96 self.value = other.value.clone();
97 self.timestamp = other.timestamp;
98 self.author = other.author.clone();
99 }
100 }
101}
102
103#[derive(Debug, Clone, Default, Serialize, Deserialize)]
105pub struct PnCounter {
106 pub p: HashMap<PeerId, u64>,
108 pub n: HashMap<PeerId, u64>,
110}
111
112impl PnCounter {
113 pub fn new() -> Self {
115 Self::default()
116 }
117
118 pub fn change(&mut self, peer: impl Into<PeerId>, amount: i64) {
121 let peer = peer.into();
122 if amount >= 0 {
123 *self.p.entry(peer).or_default() += amount as u64;
124 } else {
125 *self.n.entry(peer).or_default() += amount.unsigned_abs();
126 }
127 }
128
129 pub fn value(&self) -> i64 {
131 let pos: u64 = self.p.values().sum();
132 let neg: u64 = self.n.values().sum();
133 pos as i64 - neg as i64
134 }
135
136 pub fn merge(&mut self, other: &Self) {
139 for (k, v) in &other.p {
140 let slot = self.p.entry(k.clone()).or_default();
141 if *v > *slot {
142 *slot = *v;
143 }
144 }
145 for (k, v) in &other.n {
146 let slot = self.n.entry(k.clone()).or_default();
147 if *v > *slot {
148 *slot = *v;
149 }
150 }
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two overlapping sets yields the same union in either order,
    /// and merging a set with a copy of itself changes nothing.
    #[test]
    fn gset_merge_is_commutative_and_idempotent() {
        let mut a: GSet<i32> = GSet::new();
        let mut b: GSet<i32> = GSet::new();
        a.add(1);
        a.add(2);
        b.add(2);
        b.add(3);

        let mut ab = a.clone();
        ab.merge(&b);
        let mut ba = b.clone();
        ba.merge(&a);
        assert_eq!(ab.items, ba.items);
        assert_eq!(ab.items, [1, 2, 3].into_iter().collect());

        let before = ab.clone();
        ab.merge(&before);
        assert_eq!(ab.items, before.items);
    }

    /// The higher timestamp wins; on equal timestamps the greater author wins.
    #[test]
    fn lww_register_resolves_by_timestamp_then_author() {
        let mut reg = LwwRegister::new("v1".to_string(), "a", 10);
        reg.write("v2".to_string(), "b", 5);
        assert_eq!(reg.value, "v1", "older write must lose");
        reg.write("v3".to_string(), "c", 10);
        assert_eq!(reg.value, "v3", "tie broken by author > a");
        reg.write("v4".to_string(), "z", 20);
        assert_eq!(reg.value, "v4");

        let mut r1: LwwRegister<String> = LwwRegister::new("x".to_string(), "a", 1u64);
        let mut r2: LwwRegister<String> = LwwRegister::new("y".to_string(), "b", 2u64);
        let r2_copy = r2.clone();
        r1.merge(&r2);
        r2.merge(&r1);
        assert_eq!(r1.value, r2_copy.value);
        assert_eq!(r1.value, "y");

        // Merging back must leave the winner untouched, so both replicas converge.
        assert_eq!(r2.value, "y");
        assert_eq!(r2.timestamp, 2);
        assert_eq!(r1.timestamp, r2.timestamp);
        assert!(r1.author.as_str() == r2.author.as_str());
    }

    /// Merging takes the per-peer maximum and gives the same state in either
    /// merge order; re-merging the same state is a no-op.
    #[test]
    fn pn_counter_commutes_and_merges_via_max() {
        let mut c = PnCounter::new();
        c.change("a", 5);
        c.change("b", 3);
        c.change("a", -2);
        assert_eq!(c.value(), 6);

        let mut other = PnCounter::new();
        other.change("a", 10);

        // a: max(5, 10) = 10, b: 3, minus a's decrement of 2 => 11.
        let mut forward = c.clone();
        forward.merge(&other);
        assert_eq!(forward.value(), 11);

        // Same inputs merged in the opposite order must agree.
        let mut backward = other.clone();
        backward.merge(&c);
        assert_eq!(backward.value(), 11);
        assert_eq!(forward.p, backward.p);
        assert_eq!(forward.n, backward.n);

        // Idempotence: merging a state into itself changes nothing.
        let snapshot = forward.clone();
        forward.merge(&snapshot);
        assert_eq!(forward.p, snapshot.p);
        assert_eq!(forward.n, snapshot.n);
        assert_eq!(forward.value(), 11);
    }
}