Skip to main content

peat_lite/
counter.rs

1// Copyright (c) 2025-2026 Defense Unicorns, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Grow-only Counter (G-Counter) CRDT.
5//!
6//! A distributed counter that can only be incremented. Each node maintains
7//! its own count, and the total is the sum of all node counts.
8
9use crate::node_id::NodeId;
10use heapless::FnvIndexMap;
11
12/// A Grow-only Counter (G-Counter).
13///
14/// Each node can only increment its own count. The counter value is
15/// the sum of all node counts. Merge takes the maximum of each node's count.
16///
17/// Memory usage: approximately `4 + (MAX_NODES * 8)` bytes.
18/// Default capacity of 32 nodes ≈ 260 bytes.
19///
20/// # Example
21///
22/// ```rust
23/// use peat_lite::{GCounter, NodeId};
24///
25/// let node1 = NodeId::new(1);
26/// let node2 = NodeId::new(2);
27///
28/// let mut counter1 = GCounter::<8>::new();
29/// counter1.increment(node1, 5);
30///
31/// let mut counter2 = GCounter::<8>::new();
32/// counter2.increment(node2, 3);
33///
34/// counter1.merge(&counter2);
35/// assert_eq!(counter1.value(), 8);
36/// ```
37#[derive(Debug, Clone)]
38pub struct GCounter<const MAX_NODES: usize = 32> {
39    counts: FnvIndexMap<NodeId, u32, MAX_NODES>,
40}
41
42impl<const MAX_NODES: usize> Default for GCounter<MAX_NODES> {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl<const MAX_NODES: usize> GCounter<MAX_NODES> {
49    /// Create a new empty counter.
50    pub const fn new() -> Self {
51        Self {
52            counts: FnvIndexMap::new(),
53        }
54    }
55
56    /// Get the total counter value (sum of all node counts).
57    pub fn value(&self) -> u64 {
58        self.counts.values().map(|&v| v as u64).sum()
59    }
60
61    /// Get a specific node's count.
62    pub fn node_count(&self, node: NodeId) -> u32 {
63        self.counts.get(&node).copied().unwrap_or(0)
64    }
65
66    /// Increment the counter for a specific node.
67    ///
68    /// Returns the new count for that node, or `None` if the counter
69    /// is full and can't add a new node.
70    pub fn increment(&mut self, node: NodeId, delta: u32) -> Option<u32> {
71        match self.counts.get_mut(&node) {
72            Some(count) => {
73                *count = count.saturating_add(delta);
74                Some(*count)
75            }
76            None => {
77                let new_count = delta;
78                self.counts.insert(node, new_count).ok()?;
79                Some(new_count)
80            }
81        }
82    }
83
84    /// Increment by 1.
85    pub fn inc(&mut self, node: NodeId) -> Option<u32> {
86        self.increment(node, 1)
87    }
88
89    /// Merge with another counter.
90    ///
91    /// Takes the maximum of each node's count.
92    pub fn merge(&mut self, other: &Self) {
93        for (&node, &other_count) in other.counts.iter() {
94            match self.counts.get_mut(&node) {
95                Some(count) => {
96                    *count = (*count).max(other_count);
97                }
98                None => {
99                    // Try to insert; ignore if full
100                    let _ = self.counts.insert(node, other_count);
101                }
102            }
103        }
104    }
105
106    /// Get the number of nodes that have contributed to this counter.
107    pub fn node_count_total(&self) -> usize {
108        self.counts.len()
109    }
110
111    /// Check if this counter is empty (all counts are 0 or no nodes).
112    pub fn is_empty(&self) -> bool {
113        self.counts.is_empty() || self.value() == 0
114    }
115
116    /// Clear all counts.
117    pub fn clear(&mut self) {
118        self.counts.clear();
119    }
120
121    /// Iterate over (node_id, count) pairs.
122    pub fn iter(&self) -> impl Iterator<Item = (NodeId, u32)> + '_ {
123        self.counts.iter().map(|(&node, &count)| (node, count))
124    }
125
126    /// Encode to bytes for transmission.
127    ///
128    /// Format: `[num_entries: u16][entry1][entry2]...`
129    /// Each entry: `[node_id: 4B][count: 4B]` = 8 bytes
130    pub fn encode(&self) -> heapless::Vec<u8, 258> {
131        // 2 + 32*8 = 258 max
132        let mut buf = heapless::Vec::new();
133
134        let count = self.counts.len() as u16;
135        let _ = buf.extend_from_slice(&count.to_le_bytes());
136
137        for (&node, &value) in self.counts.iter() {
138            let _ = buf.extend_from_slice(&node.to_le_bytes());
139            let _ = buf.extend_from_slice(&value.to_le_bytes());
140        }
141
142        buf
143    }
144
145    /// Decode from bytes.
146    pub fn decode(data: &[u8]) -> Option<Self> {
147        if data.len() < 2 {
148            return None;
149        }
150
151        let count = u16::from_le_bytes([data[0], data[1]]) as usize;
152
153        if data.len() < 2 + count * 8 {
154            return None;
155        }
156
157        let mut counter = Self::new();
158        let mut offset = 2;
159
160        for _ in 0..count {
161            let node = NodeId::from_le_bytes([
162                data[offset],
163                data[offset + 1],
164                data[offset + 2],
165                data[offset + 3],
166            ]);
167            let value = u32::from_le_bytes([
168                data[offset + 4],
169                data[offset + 5],
170                data[offset + 6],
171                data[offset + 7],
172            ]);
173            offset += 8;
174
175            let _ = counter.counts.insert(node, value);
176        }
177
178        Some(counter)
179    }
180}
181
182impl<const MAX_NODES: usize> PartialEq for GCounter<MAX_NODES> {
183    fn eq(&self, other: &Self) -> bool {
184        if self.counts.len() != other.counts.len() {
185            return false;
186        }
187        for (&node, &count) in self.counts.iter() {
188            if other.node_count(node) != count {
189                return false;
190            }
191        }
192        true
193    }
194}
195
196impl<const MAX_NODES: usize> Eq for GCounter<MAX_NODES> {}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// A single node's increments show up in both its own slot and the total.
    #[test]
    fn test_gcounter_basic() {
        let id = NodeId::new(1);
        let mut gc: GCounter<8> = GCounter::new();

        assert_eq!(gc.value(), 0);

        gc.inc(id);
        assert_eq!(gc.value(), 1);
        assert_eq!(gc.node_count(id), 1);

        gc.increment(id, 5);
        assert_eq!(gc.value(), 6);
        assert_eq!(gc.node_count(id), 6);
    }

    /// The total is the sum of independent per-node slots.
    #[test]
    fn test_gcounter_multiple_nodes() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);
        let mut gc: GCounter<8> = GCounter::new();

        gc.increment(first, 10);
        gc.increment(second, 20);

        assert_eq!(gc.value(), 30);
        assert_eq!(gc.node_count(first), 10);
        assert_eq!(gc.node_count(second), 20);
    }

    /// Merging keeps the larger count for every node.
    #[test]
    fn test_gcounter_merge() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        let mut local: GCounter<8> = GCounter::new();
        local.increment(first, 10);
        local.increment(second, 5);

        let mut remote: GCounter<8> = GCounter::new();
        remote.increment(first, 8); // below the local count
        remote.increment(second, 15); // above the local count

        local.merge(&remote);

        assert_eq!(local.node_count(first), 10); // max(10, 8)
        assert_eq!(local.node_count(second), 15); // max(5, 15)
        assert_eq!(local.value(), 25);
    }

    /// Encoding followed by decoding reproduces the same counter.
    #[test]
    fn test_gcounter_encode_decode() {
        let mut original: GCounter<8> = GCounter::new();
        original.increment(NodeId::new(1), 100);
        original.increment(NodeId::new(2), 200);

        let wire = original.encode();
        let restored = GCounter::<8>::decode(&wire).unwrap();

        assert_eq!(original, restored);
    }

    /// A slot pins at `u32::MAX` instead of wrapping around.
    #[test]
    fn test_gcounter_saturating() {
        let id = NodeId::new(1);
        let mut gc: GCounter<8> = GCounter::new();

        gc.increment(id, u32::MAX - 10);
        gc.increment(id, 100); // would overflow without saturation

        assert_eq!(gc.node_count(id), u32::MAX);
    }
}