Skip to main content

logicaffeine_data/crdt/
pncounter.rs

1//! PN-Counter (Positive-Negative Counter) CRDT.
2//!
3//! A counter that supports both increment and decrement.
4//! Implemented as two G-Counters: one for increments, one for decrements.
5
6use super::causal::VClock;
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
/// Delta for PNCounter synchronization.
///
/// Produced by `PNCounter::delta_since` and consumed by `PNCounter::apply_delta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PNCounterDelta {
    /// Increment count per replica ID carried by this delta.
    pub increments: HashMap<ReplicaId, u64>,
    /// Decrement count per replica ID carried by this delta.
    pub decrements: HashMap<ReplicaId, u64>,
    /// Version clock of the counter that produced this delta.
    pub version: VClock,
}
20
/// A counter that can be incremented and decremented.
///
/// The value is the difference between total increments and total decrements.
/// Each replica maintains its own increment and decrement counts.
///
/// The derived `PartialEq` compares every field, so two counters are equal only
/// when their per-replica counts, replica ID and version clock all match — not
/// merely when they report the same value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PNCounter {
    /// Map from replica ID to increment count
    increments: HashMap<ReplicaId, u64>,
    /// Map from replica ID to decrement count
    decrements: HashMap<ReplicaId, u64>,
    /// This replica's ID; local `increment`/`decrement` calls are recorded under this key
    replica_id: ReplicaId,
    /// Version clock tracking operations
    // `serde(default)` lets input that lacks a `version` field deserialize,
    // filling the field with `VClock`'s `Default` value.
    #[serde(default)]
    version: VClock,
}
37
38impl PNCounter {
39    /// Create a new counter with a random replica ID.
40    pub fn new() -> Self {
41        Self {
42            increments: HashMap::new(),
43            decrements: HashMap::new(),
44            replica_id: generate_replica_id(),
45            version: VClock::new(),
46        }
47    }
48
49    /// Create a counter with a specific replica ID.
50    pub fn with_replica_id(id: ReplicaId) -> Self {
51        Self {
52            increments: HashMap::new(),
53            decrements: HashMap::new(),
54            replica_id: id,
55            version: VClock::new(),
56        }
57    }
58
59    /// Increment the counter by the given amount.
60    pub fn increment(&mut self, amount: u64) {
61        *self.increments.entry(self.replica_id).or_insert(0) += amount;
62        self.version.increment(self.replica_id);
63    }
64
65    /// Decrement the counter by the given amount.
66    pub fn decrement(&mut self, amount: u64) {
67        *self.decrements.entry(self.replica_id).or_insert(0) += amount;
68        self.version.increment(self.replica_id);
69    }
70
71    /// Get the current value (increments - decrements).
72    pub fn value(&self) -> i64 {
73        let inc: u64 = self.increments.values().sum();
74        let dec: u64 = self.decrements.values().sum();
75        inc as i64 - dec as i64
76    }
77
78    /// Get the replica ID for this counter.
79    pub fn replica_id(&self) -> ReplicaId {
80        self.replica_id
81    }
82}
83
84impl Merge for PNCounter {
85    /// Merge another counter into this one.
86    ///
87    /// For each replica ID, takes the maximum increment and decrement counts.
88    fn merge(&mut self, other: &Self) {
89        for (&replica, &count) in &other.increments {
90            let entry = self.increments.entry(replica).or_insert(0);
91            *entry = (*entry).max(count);
92        }
93        for (&replica, &count) in &other.decrements {
94            let entry = self.decrements.entry(replica).or_insert(0);
95            *entry = (*entry).max(count);
96        }
97        self.version.merge_vclock(&other.version);
98    }
99}
100
101impl DeltaCrdt for PNCounter {
102    type Delta = PNCounterDelta;
103
104    fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
105        // If we're at or behind the given version, no delta needed
106        if since.dominates(&self.version) {
107            return None;
108        }
109
110        // For simplicity, return full state as delta
111        // A more efficient implementation would track only changes
112        Some(PNCounterDelta {
113            increments: self.increments.clone(),
114            decrements: self.decrements.clone(),
115            version: self.version.clone(),
116        })
117    }
118
119    fn apply_delta(&mut self, delta: &Self::Delta) {
120        // Merge the delta's state
121        for (&replica, &count) in &delta.increments {
122            let entry = self.increments.entry(replica).or_insert(0);
123            *entry = (*entry).max(count);
124        }
125        for (&replica, &count) in &delta.decrements {
126            let entry = self.decrements.entry(replica).or_insert(0);
127            *entry = (*entry).max(count);
128        }
129        self.version.merge_vclock(&delta.version);
130    }
131
132    fn version(&self) -> VClock {
133        self.version.clone()
134    }
135}
136
137// NOTE: Showable impl is in logicaffeine_system (io module)
138
139/// Allow comparing PNCounter to integers for ergonomic conditionals
140impl PartialEq<i64> for PNCounter {
141    fn eq(&self, other: &i64) -> bool {
142        self.value() == *other
143    }
144}
145
146impl PartialEq<i32> for PNCounter {
147    fn eq(&self, other: &i32) -> bool {
148        self.value() == (*other as i64)
149    }
150}
151
152impl Default for PNCounter {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created counter reads zero.
    #[test]
    fn test_pncounter_new() {
        assert_eq!(PNCounter::new().value(), 0);
    }

    /// Increments and decrements on one replica net out.
    #[test]
    fn test_pncounter_increment_decrement() {
        let mut counter = PNCounter::with_replica_id(1);
        counter.increment(10);
        counter.decrement(3);

        let net = counter.value();
        assert_eq!(net, 7);
    }

    /// Decrementing an empty counter yields a negative value.
    #[test]
    fn test_pncounter_negative() {
        let mut counter = PNCounter::with_replica_id(1);
        counter.decrement(5);

        let net = counter.value();
        assert_eq!(net, -5);
    }

    /// Merging combines operations recorded on two different replicas.
    #[test]
    fn test_pncounter_merge() {
        let mut left = PNCounter::with_replica_id(1);
        left.increment(10);

        let mut right = PNCounter::with_replica_id(2);
        right.decrement(3);

        left.merge(&right);
        assert_eq!(left.value(), 7);
    }

    /// Merging in either direction produces the same value.
    #[test]
    fn test_pncounter_merge_commutative() {
        let mut left = PNCounter::with_replica_id(1);
        left.increment(10);

        let mut right = PNCounter::with_replica_id(2);
        right.decrement(5);

        let mut left_then_right = left.clone();
        left_then_right.merge(&right);

        let mut right_then_left = right.clone();
        right_then_left.merge(&left);

        assert_eq!(left_then_right.value(), right_then_left.value());
    }
}