logicaffeine_data/crdt/
pncounter.rs1use super::causal::VClock;
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PNCounterDelta {
16 pub increments: HashMap<ReplicaId, u64>,
17 pub decrements: HashMap<ReplicaId, u64>,
18 pub version: VClock,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26pub struct PNCounter {
27 increments: HashMap<ReplicaId, u64>,
29 decrements: HashMap<ReplicaId, u64>,
31 replica_id: ReplicaId,
33 #[serde(default)]
35 version: VClock,
36}
37
38impl PNCounter {
39 pub fn new() -> Self {
41 Self {
42 increments: HashMap::new(),
43 decrements: HashMap::new(),
44 replica_id: generate_replica_id(),
45 version: VClock::new(),
46 }
47 }
48
49 pub fn with_replica_id(id: ReplicaId) -> Self {
51 Self {
52 increments: HashMap::new(),
53 decrements: HashMap::new(),
54 replica_id: id,
55 version: VClock::new(),
56 }
57 }
58
59 pub fn increment(&mut self, amount: u64) {
61 *self.increments.entry(self.replica_id).or_insert(0) += amount;
62 self.version.increment(self.replica_id);
63 }
64
65 pub fn decrement(&mut self, amount: u64) {
67 *self.decrements.entry(self.replica_id).or_insert(0) += amount;
68 self.version.increment(self.replica_id);
69 }
70
71 pub fn value(&self) -> i64 {
73 let inc: u64 = self.increments.values().sum();
74 let dec: u64 = self.decrements.values().sum();
75 inc as i64 - dec as i64
76 }
77
78 pub fn replica_id(&self) -> ReplicaId {
80 self.replica_id
81 }
82}
83
84impl Merge for PNCounter {
85 fn merge(&mut self, other: &Self) {
89 for (&replica, &count) in &other.increments {
90 let entry = self.increments.entry(replica).or_insert(0);
91 *entry = (*entry).max(count);
92 }
93 for (&replica, &count) in &other.decrements {
94 let entry = self.decrements.entry(replica).or_insert(0);
95 *entry = (*entry).max(count);
96 }
97 self.version.merge_vclock(&other.version);
98 }
99}
100
101impl DeltaCrdt for PNCounter {
102 type Delta = PNCounterDelta;
103
104 fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
105 if since.dominates(&self.version) {
107 return None;
108 }
109
110 Some(PNCounterDelta {
113 increments: self.increments.clone(),
114 decrements: self.decrements.clone(),
115 version: self.version.clone(),
116 })
117 }
118
119 fn apply_delta(&mut self, delta: &Self::Delta) {
120 for (&replica, &count) in &delta.increments {
122 let entry = self.increments.entry(replica).or_insert(0);
123 *entry = (*entry).max(count);
124 }
125 for (&replica, &count) in &delta.decrements {
126 let entry = self.decrements.entry(replica).or_insert(0);
127 *entry = (*entry).max(count);
128 }
129 self.version.merge_vclock(&delta.version);
130 }
131
132 fn version(&self) -> VClock {
133 self.version.clone()
134 }
135}
136
137impl PartialEq<i64> for PNCounter {
141 fn eq(&self, other: &i64) -> bool {
142 self.value() == *other
143 }
144}
145
146impl PartialEq<i32> for PNCounter {
147 fn eq(&self, other: &i32) -> bool {
148 self.value() == (*other as i64)
149 }
150}
151
152impl Default for PNCounter {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161
162 #[test]
163 fn test_pncounter_new() {
164 let c = PNCounter::new();
165 assert_eq!(c.value(), 0);
166 }
167
168 #[test]
169 fn test_pncounter_increment_decrement() {
170 let mut c = PNCounter::with_replica_id(1);
171 c.increment(10);
172 c.decrement(3);
173 assert_eq!(c.value(), 7);
174 }
175
176 #[test]
177 fn test_pncounter_negative() {
178 let mut c = PNCounter::with_replica_id(1);
179 c.decrement(5);
180 assert_eq!(c.value(), -5);
181 }
182
183 #[test]
184 fn test_pncounter_merge() {
185 let mut a = PNCounter::with_replica_id(1);
186 let mut b = PNCounter::with_replica_id(2);
187
188 a.increment(10);
189 b.decrement(3);
190
191 a.merge(&b);
192 assert_eq!(a.value(), 7);
193 }
194
195 #[test]
196 fn test_pncounter_merge_commutative() {
197 let mut a = PNCounter::with_replica_id(1);
198 let mut b = PNCounter::with_replica_id(2);
199
200 a.increment(10);
201 b.decrement(5);
202
203 let mut a1 = a.clone();
204 let mut b1 = b.clone();
205 a1.merge(&b);
206 b1.merge(&a);
207
208 assert_eq!(a1.value(), b1.value());
209 }
210}