Skip to main content

automorph/crdt/
counter.rs

1//! Counter type for concurrent increment/decrement operations.
2//!
3//! Unlike regular integers which use last-writer-wins semantics, Counter values
4//! merge all concurrent increments/decrements. This makes them ideal for:
5//! - View counts
6//! - Like/vote counts
7//! - Inventory quantities
8//! - Any numeric value that multiple users might update concurrently
9//!
10//! # Example
11//!
12//! ```rust,ignore
13//! use automorph::{Automorph, crdt::Counter};
14//!
15//! #[derive(Automorph)]
16//! struct Post {
17//!     title: String,
18//!     likes: Counter,  // Concurrent likes merge correctly
19//! }
20//!
21//! // User A likes the post
22//! post.likes.increment(1);
23//! post.save(&mut doc_a, &ROOT, "post")?;
24//!
25//! // User B also likes the post (concurrently)
26//! post.likes.increment(1);
27//! post.save(&mut doc_b, &ROOT, "post")?;
28//!
29//! // After merge: likes = 2 (both increments preserved)
30//! ```
31//!
32//! # CRDT Semantics
33//!
34//! Counter uses Automerge's native counter type, which is a **G-Counter** (grow-only counter)
35//! extended to support decrements. Key properties:
36//!
37//! - **Commutative**: Order of operations doesn't matter
38//! - **Associative**: Grouping of operations doesn't matter
39//! - **Idempotent**: Duplicate operations are handled correctly
40//!
41//! This means concurrent increments ALWAYS result in all increments being applied,
42//! unlike regular integers where one update would overwrite another.
43
44use std::fmt;
45use std::ops::{Add, AddAssign, Sub, SubAssign};
46
47use automerge::{ChangeHash, ObjId, Prop, ReadDoc, ScalarValue, Value, transaction::Transactable};
48
49use crate::{Automorph, Error, PrimitiveChanged, Result, ScalarCursor};
50
51/// A CRDT counter that supports concurrent increment/decrement.
52///
53/// This wraps Automerge's native counter type, ensuring that concurrent
54/// modifications merge correctly instead of using last-writer-wins.
55///
56/// # Important
57///
58/// To get the CRDT benefits, you must use [`increment()`](Self::increment) and
59/// [`decrement()`](Self::decrement) methods. Setting the value directly with
60/// [`set()`](Self::set) uses last-writer-wins semantics.
61#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
62pub struct Counter {
63    value: i64,
64    /// Pending increment to apply on next save.
65    /// This is how we track increments between load and save.
66    pending_delta: i64,
67}
68
69impl Counter {
70    /// Creates a new counter with the given initial value.
71    #[must_use]
72    pub fn new(value: i64) -> Self {
73        Self {
74            value,
75            pending_delta: 0,
76        }
77    }
78
79    /// Returns the current value of the counter.
80    #[must_use]
81    pub fn value(&self) -> i64 {
82        self.value + self.pending_delta
83    }
84
85    /// Increments the counter by the given amount.
86    ///
87    /// This increment will be merged with concurrent increments from other peers,
88    /// rather than overwriting them.
89    pub fn increment(&mut self, amount: i64) {
90        self.pending_delta += amount;
91    }
92
93    /// Decrements the counter by the given amount.
94    ///
95    /// This is equivalent to `increment(-amount)`.
96    pub fn decrement(&mut self, amount: i64) {
97        self.pending_delta -= amount;
98    }
99
100    /// Sets the counter to a specific value.
101    ///
102    /// **Warning**: This uses last-writer-wins semantics and will overwrite
103    /// concurrent changes. Prefer [`increment()`](Self::increment) and
104    /// [`decrement()`](Self::decrement) for collaborative scenarios.
105    pub fn set(&mut self, value: i64) {
106        // Setting resets both value and delta
107        self.value = value;
108        self.pending_delta = 0;
109    }
110
111    /// Returns true if there are pending changes to save.
112    #[must_use]
113    pub fn has_pending_changes(&self) -> bool {
114        self.pending_delta != 0
115    }
116}
117
118impl fmt::Debug for Counter {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        write!(f, "Counter({})", self.value())
121    }
122}
123
124impl fmt::Display for Counter {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        write!(f, "{}", self.value())
127    }
128}
129
130impl From<i64> for Counter {
131    fn from(value: i64) -> Self {
132        Self::new(value)
133    }
134}
135
136impl From<Counter> for i64 {
137    fn from(counter: Counter) -> Self {
138        counter.value()
139    }
140}
141
142// Arithmetic operations
143
144impl Add<i64> for Counter {
145    type Output = Self;
146
147    fn add(mut self, rhs: i64) -> Self::Output {
148        self.increment(rhs);
149        self
150    }
151}
152
153impl AddAssign<i64> for Counter {
154    fn add_assign(&mut self, rhs: i64) {
155        self.increment(rhs);
156    }
157}
158
159impl Sub<i64> for Counter {
160    type Output = Self;
161
162    fn sub(mut self, rhs: i64) -> Self::Output {
163        self.decrement(rhs);
164        self
165    }
166}
167
168impl SubAssign<i64> for Counter {
169    fn sub_assign(&mut self, rhs: i64) {
170        self.decrement(rhs);
171    }
172}
173
174impl Automorph for Counter {
175    type Changes = PrimitiveChanged;
176    type Cursor = ScalarCursor;
177
178    fn save<D: Transactable + ReadDoc>(
179        &self,
180        doc: &mut D,
181        obj: impl AsRef<ObjId>,
182        prop: impl Into<Prop>,
183    ) -> Result<()> {
184        let prop: Prop = prop.into();
185        let obj = obj.as_ref();
186
187        // Check if counter already exists
188        match doc.get(obj, prop.clone())? {
189            Some((Value::Scalar(s), _)) if s.is_counter() => {
190                // Counter exists - apply increment if any
191                if self.pending_delta != 0 {
192                    doc.increment(obj, prop, self.pending_delta)?;
193                }
194            }
195            _ => {
196                // No counter exists or wrong type - create new counter
197                doc.put(obj, prop, ScalarValue::counter(self.value()))?;
198            }
199        }
200
201        Ok(())
202    }
203
204    fn load<D: ReadDoc>(doc: &D, obj: impl AsRef<ObjId>, prop: impl Into<Prop>) -> Result<Self> {
205        let prop: Prop = prop.into();
206        let obj = obj.as_ref();
207
208        match doc.get(obj, prop)? {
209            Some((Value::Scalar(s), _)) => {
210                // Counters and integers both provide i64 values
211                // is_counter() tells us if it's a proper counter type
212                if let Some(int_val) = s.to_i64() {
213                    Ok(Self::new(int_val))
214                } else {
215                    Err(Error::type_mismatch("Counter", Some(format!("{:?}", s))))
216                }
217            }
218            Some((v, _)) => Err(Error::type_mismatch("Counter", Some(format!("{:?}", v)))),
219            None => Err(Error::missing_value()),
220        }
221    }
222
223    fn load_at<D: ReadDoc>(
224        doc: &D,
225        obj: impl AsRef<ObjId>,
226        prop: impl Into<Prop>,
227        heads: &[ChangeHash],
228    ) -> Result<Self> {
229        let prop: Prop = prop.into();
230        let obj = obj.as_ref();
231
232        match doc.get_at(obj, prop, heads)? {
233            Some((Value::Scalar(s), _)) => {
234                // Counters and integers both provide i64 values
235                // is_counter() tells us if it's a proper counter type
236                if let Some(int_val) = s.to_i64() {
237                    Ok(Self::new(int_val))
238                } else {
239                    Err(Error::type_mismatch("Counter", Some(format!("{:?}", s))))
240                }
241            }
242            Some((v, _)) => Err(Error::type_mismatch("Counter", Some(format!("{:?}", v)))),
243            None => Err(Error::missing_value()),
244        }
245    }
246
247    fn diff<D: ReadDoc>(
248        &self,
249        doc: &D,
250        obj: impl AsRef<ObjId>,
251        prop: impl Into<Prop>,
252    ) -> Result<Self::Changes> {
253        let loaded = Self::load(doc, obj, prop)?;
254        Ok(PrimitiveChanged::new(self.value() != loaded.value()))
255    }
256
257    fn diff_at<D: ReadDoc>(
258        &self,
259        doc: &D,
260        obj: impl AsRef<ObjId>,
261        prop: impl Into<Prop>,
262        heads: &[ChangeHash],
263    ) -> Result<Self::Changes> {
264        let loaded = Self::load_at(doc, obj, prop, heads)?;
265        Ok(PrimitiveChanged::new(self.value() != loaded.value()))
266    }
267
268    fn update<D: ReadDoc>(
269        &mut self,
270        doc: &D,
271        obj: impl AsRef<ObjId>,
272        prop: impl Into<Prop>,
273    ) -> Result<Self::Changes> {
274        let loaded = Self::load(doc, obj, prop)?;
275        let changed = self.value() != loaded.value();
276        self.value = loaded.value;
277        self.pending_delta = 0;
278        Ok(PrimitiveChanged::new(changed))
279    }
280
281    fn update_at<D: ReadDoc>(
282        &mut self,
283        doc: &D,
284        obj: impl AsRef<ObjId>,
285        prop: impl Into<Prop>,
286        heads: &[ChangeHash],
287    ) -> Result<Self::Changes> {
288        let loaded = Self::load_at(doc, obj, prop, heads)?;
289        let changed = self.value() != loaded.value();
290        self.value = loaded.value;
291        self.pending_delta = 0;
292        Ok(PrimitiveChanged::new(changed))
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use automerge::{ActorId, AutoCommit, ROOT};
300
301    #[test]
302    fn test_counter_basic_operations() {
303        let mut counter = Counter::new(0);
304        assert_eq!(counter.value(), 0);
305
306        counter.increment(5);
307        assert_eq!(counter.value(), 5);
308
309        counter.decrement(2);
310        assert_eq!(counter.value(), 3);
311    }
312
313    #[test]
314    fn test_counter_roundtrip() {
315        let mut doc = AutoCommit::new();
316
317        let counter = Counter::new(42);
318        counter.save(&mut doc, &ROOT, "count").unwrap();
319
320        let loaded = Counter::load(&doc, &ROOT, "count").unwrap();
321        assert_eq!(loaded.value(), 42);
322    }
323
324    #[test]
325    fn test_counter_increment_merges() {
326        // Create initial document
327        let mut doc1 = AutoCommit::new();
328        let counter = Counter::new(0);
329        counter.save(&mut doc1, &ROOT, "count").unwrap();
330
331        // Fork for concurrent editing
332        let mut doc2 = doc1.fork().with_actor(ActorId::random());
333
334        // User 1 increments by 5
335        let mut counter1 = Counter::load(&doc1, &ROOT, "count").unwrap();
336        counter1.increment(5);
337        counter1.save(&mut doc1, &ROOT, "count").unwrap();
338
339        // User 2 increments by 3 (concurrently)
340        let mut counter2 = Counter::load(&doc2, &ROOT, "count").unwrap();
341        counter2.increment(3);
342        counter2.save(&mut doc2, &ROOT, "count").unwrap();
343
344        // Merge
345        doc1.merge(&mut doc2).unwrap();
346
347        // Both increments should be preserved
348        let merged = Counter::load(&doc1, &ROOT, "count").unwrap();
349        assert_eq!(
350            merged.value(),
351            8,
352            "Both increments should merge: 0 + 5 + 3 = 8"
353        );
354    }
355
356    #[test]
357    fn test_counter_arithmetic_operators() {
358        let mut counter = Counter::new(10);
359
360        counter += 5;
361        assert_eq!(counter.value(), 15);
362
363        counter -= 3;
364        assert_eq!(counter.value(), 12);
365
366        let counter2 = counter + 8;
367        assert_eq!(counter2.value(), 20);
368    }
369}