uni-crdt 2.2.0

Conflict-free Replicated Data Types for Uni graph database
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use crate::CrdtMerge;
use fxhash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use uuid::Uuid;

/// A causal dot: `(actor id, per-actor monotonic counter)`. Globally unique by
/// construction — a given actor never re-issues a counter.
pub type Dot = (String, u64);

/// Reserved actor id used when upgrading a legacy v1 (`{elements, tombstones}`)
/// payload: each live v1 element is reassigned a synthetic dot under this actor.
const LEGACY_ACTOR: &str = "__legacy__";

/// Mint a fresh, globally-unique actor id for a replica.
fn new_actor() -> String {
    Uuid::new_v4().to_string()
}

/// An Observed-Remove Set, implemented as an **ORSWOT** (Observed-Remove Set
/// Without Tombstones).
///
/// Conflict resolution is add-wins (a concurrent add + remove leaves the element
/// present). Unlike a classic tombstone-based OR-Set, this representation is
/// **tombstone-free**: element provenance is tracked with causal *dots*
/// `(actor, counter)` plus a *version vector* recording the highest counter seen
/// per actor. `remove` simply drops the element's dots; `merge` keeps a dot when
/// both replicas still hold it, or when the *other* replica's version vector has
/// not yet observed it (i.e. it is a genuinely new add, not a remove). State is
/// therefore bounded by the number of *live* elements and participating actors,
/// not by the total number of operations ever performed.
///
/// ## Wire format & backward compatibility
///
/// Serialized (v2) shape is `{ dots, vv }`. Legacy v1 payloads
/// (`{ elements, tombstones }`, opaque `Uuid` tags) are still accepted on
/// decode and upgraded in place: every element with at least one non-tombstoned
/// tag is preserved (its dead tags discarded). The per-replica `actor` is
/// runtime-only and never serialized — it is minted fresh on construction and on
/// decode, so two processes loading the same persisted blob can never collide on
/// a shared actor id.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(
    from = "ORSetWire<T>",
    into = "ORSetWireV2<T>",
    bound(
        serialize = "T: Serialize + Hash + Eq + Clone",
        deserialize = "T: Deserialize<'de> + Hash + Eq + Clone"
    )
)]
pub struct ORSet<T: Hash + Eq + Clone> {
    /// element -> set of causal dots that currently keep it present.
    dots: FxHashMap<T, FxHashSet<Dot>>,
    /// version vector: actor -> highest counter observed from that actor.
    vv: FxHashMap<String, u64>,
    /// This replica's actor id. Runtime-only (not serialized).
    actor: String,
}

impl<T: Hash + Eq + Clone> Default for ORSet<T> {
    fn default() -> Self {
        Self {
            dots: FxHashMap::default(),
            vv: FxHashMap::default(),
            actor: new_actor(),
        }
    }
}

impl<T: Hash + Eq + Clone> ORSet<T> {
    /// Create a new, empty ORSet with a fresh replica actor id.
    pub fn new() -> Self {
        Self::default()
    }

    /// Create an independent replica: an exact copy of the observed state but
    /// with a **new** actor id. Use this — not `clone()` — when forking a set so
    /// that the two replicas can be mutated concurrently and later merged
    /// without minting colliding dots. (`clone()` is an exact snapshot copy,
    /// including the actor id.)
    pub fn fork(&self) -> Self {
        let mut forked = self.clone();
        forked.actor = new_actor();
        forked
    }

    /// Add an element. Mints a fresh dot under this replica's actor that
    /// supersedes any earlier dots for the element, and returns it.
    pub fn add(&mut self, element: T) -> Dot {
        let counter = self.vv.entry(self.actor.clone()).or_insert(0);
        *counter += 1;
        let dot: Dot = (self.actor.clone(), *counter);
        let mut set = FxHashSet::default();
        set.insert(dot.clone());
        // A new add supersedes the element's prior dots on this replica;
        // concurrent dots from other replicas are reconciled in `merge`.
        self.dots.insert(element, set);
        dot
    }

    /// Remove an element by dropping its dots. No tombstone is retained: the
    /// version vector already records these dots as "observed", so a stale copy
    /// of them cannot resurrect the element on merge, while a concurrent add
    /// (a dot the remover never saw) still wins.
    pub fn remove(&mut self, element: &T) {
        self.dots.remove(element);
    }

    /// Check if an element is in the set (present iff it has at least one dot).
    pub fn contains(&self, element: &T) -> bool {
        self.dots.get(element).is_some_and(|dots| !dots.is_empty())
    }

    /// Returns a vector of all visible elements in the set.
    pub fn elements(&self) -> Vec<T> {
        self.dots
            .iter()
            .filter(|(_, dots)| !dots.is_empty())
            .map(|(elem, _)| elem.clone())
            .collect()
    }

    /// Returns the number of visible elements.
    pub fn len(&self) -> usize {
        self.dots.values().filter(|dots| !dots.is_empty()).count()
    }

    /// Returns true if the set has no visible elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Semantic equality: two replicas are equal when they hold the same observed
/// state, regardless of their (runtime-only) actor ids.
impl<T: Hash + Eq + Clone> PartialEq for ORSet<T> {
    fn eq(&self, other: &Self) -> bool {
        self.dots == other.dots && self.vv == other.vv
    }
}

impl<T: Hash + Eq + Clone> CrdtMerge for ORSet<T> {
    fn merge(&mut self, other: &Self) {
        // Union of element keys across both replicas.
        let mut keys: Vec<T> = Vec::new();
        {
            let mut seen: FxHashSet<&T> = FxHashSet::default();
            for k in self.dots.keys().chain(other.dots.keys()) {
                if seen.insert(k) {
                    keys.push(k.clone());
                }
            }
        }

        let empty: FxHashSet<Dot> = FxHashSet::default();
        for key in keys {
            // Clone self's dots for this key so we can mutate `self.dots` below.
            let sd: FxHashSet<Dot> = self.dots.get(&key).cloned().unwrap_or_default();
            let od: &FxHashSet<Dot> = other.dots.get(&key).unwrap_or(&empty);

            let mut surviving: FxHashSet<Dot> = FxHashSet::default();
            // Dots both replicas still hold are kept.
            for d in sd.intersection(od) {
                surviving.insert(d.clone());
            }
            // Self-only dots survive unless `other` has already observed them
            // (observed-but-absent ⇒ removed by other).
            for d in sd.difference(od) {
                if d.1 > other.vv.get(&d.0).copied().unwrap_or(0) {
                    surviving.insert(d.clone());
                }
            }
            // Symmetric: other-only dots survive unless self has observed them.
            for d in od.difference(&sd) {
                if d.1 > self.vv.get(&d.0).copied().unwrap_or(0) {
                    surviving.insert(d.clone());
                }
            }

            if surviving.is_empty() {
                self.dots.remove(&key);
            } else {
                self.dots.insert(key, surviving);
            }
        }

        // Join the version vectors (pointwise max).
        for (actor, &counter) in &other.vv {
            let entry = self.vv.entry(actor.clone()).or_insert(0);
            if counter > *entry {
                *entry = counter;
            }
        }
    }
}

// --- Serialization wire formats -------------------------------------------

/// v2 on-disk shape (the only form we *write*).
#[derive(Serialize)]
#[serde(bound(serialize = "T: Serialize + Hash + Eq + Clone"))]
struct ORSetWireV2<T: Hash + Eq + Clone> {
    dots: FxHashMap<T, FxHashSet<Dot>>,
    vv: FxHashMap<String, u64>,
}

impl<T: Hash + Eq + Clone> From<ORSet<T>> for ORSetWireV2<T> {
    fn from(set: ORSet<T>) -> Self {
        ORSetWireV2 {
            dots: set.dots,
            vv: set.vv,
        }
    }
}

/// Permissive decode shape accepting both v2 (`dots`/`vv`) and legacy v1
/// (`elements`/`tombstones`) payloads. All fields are optional so a payload
/// carrying only one shape's fields deserializes cleanly.
#[derive(Deserialize)]
#[serde(bound(deserialize = "T: Deserialize<'de> + Hash + Eq + Clone"))]
struct ORSetWire<T: Hash + Eq + Clone> {
    #[serde(default)]
    dots: Option<FxHashMap<T, FxHashSet<Dot>>>,
    #[serde(default)]
    vv: Option<FxHashMap<String, u64>>,
    // Legacy v1 fields (opaque Uuid tags).
    #[serde(default)]
    elements: Option<FxHashMap<T, FxHashSet<Uuid>>>,
    #[serde(default)]
    tombstones: Option<FxHashSet<Uuid>>,
}

impl<T: Hash + Eq + Clone> From<ORSetWire<T>> for ORSet<T> {
    fn from(wire: ORSetWire<T>) -> Self {
        let ORSetWire {
            dots,
            vv,
            elements,
            tombstones,
        } = wire;

        // v2: use as-is, mint a fresh local actor.
        if let (Some(dots), Some(vv)) = (dots, vv) {
            return ORSet {
                dots,
                vv,
                actor: new_actor(),
            };
        }

        // v1 → v2 upgrade: keep elements with at least one live tag, assigning
        // each a synthetic dot under the legacy actor; discard tombstones.
        let tombstones = tombstones.unwrap_or_default();
        let mut new_dots: FxHashMap<T, FxHashSet<Dot>> = FxHashMap::default();
        let mut counter: u64 = 0;
        if let Some(elements) = elements {
            for (elem, tags) in elements {
                if tags.iter().any(|tag| !tombstones.contains(tag)) {
                    counter += 1;
                    let mut set = FxHashSet::default();
                    set.insert((LEGACY_ACTOR.to_string(), counter));
                    new_dots.insert(elem, set);
                }
            }
        }
        let mut new_vv = FxHashMap::default();
        if counter > 0 {
            new_vv.insert(LEGACY_ACTOR.to_string(), counter);
        }
        ORSet {
            dots: new_dots,
            vv: new_vv,
            actor: new_actor(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Crdt;

    #[test]
    fn test_add_remove() {
        let mut os = ORSet::new();
        os.add("apple".to_string());
        assert!(os.contains(&"apple".to_string()));

        os.remove(&"apple".to_string());
        assert!(!os.contains(&"apple".to_string()));
    }

    #[test]
    fn test_add_wins() {
        let mut a = ORSet::new();
        a.add("apple".to_string());

        // Fork to an independent replica (new actor) before diverging.
        let mut b = a.fork();
        b.remove(&"apple".to_string());

        // Concurrent add on 'a'.
        a.add("apple".to_string());

        a.merge(&b);

        // Add wins: a's new dot was never observed by b, so b's remove can't
        // suppress it.
        assert!(a.contains(&"apple".to_string()));
    }

    #[test]
    fn test_merge() {
        let mut a = ORSet::new();
        a.add(1);
        a.add(2);

        let mut b = ORSet::new();
        b.add(2);
        b.add(3);

        a.merge(&b);

        let elements = a.elements();
        assert!(elements.contains(&1));
        assert!(elements.contains(&2));
        assert!(elements.contains(&3));
        assert_eq!(elements.len(), 3);
    }

    #[test]
    fn merge_is_commutative_and_idempotent() {
        let mut a = ORSet::new();
        a.add("x".to_string());
        let mut b = a.fork();
        b.add("y".to_string());
        b.remove(&"x".to_string());

        let mut ab = a.clone();
        ab.merge(&b);
        let mut ba = b.clone();
        ba.merge(&a);
        assert_eq!(ab, ba, "merge must be commutative");

        // Idempotent: merging again changes nothing.
        let mut ab2 = ab.clone();
        ab2.merge(&b);
        assert_eq!(ab, ab2, "merge must be idempotent");
    }

    /// The core H10 fix: serialized state stays bounded under churn instead of
    /// growing with the number of operations (the old tombstone set leaked).
    #[test]
    fn serialized_size_bounded_under_churn() {
        let mut a = ORSet::new();
        let mut b = a.fork();

        let size_after =
            |set: &ORSet<String>| -> usize { Crdt::ORSet(set.clone()).to_msgpack().unwrap().len() };

        // Churn the same element many times across two replicas + merge.
        for _ in 0..1000 {
            a.add("k".to_string());
            a.remove(&"k".to_string());
            b.add("k".to_string());
            b.remove(&"k".to_string());
            a.merge(&b);
            b.merge(&a);
        }

        // Two actors, no live elements → tiny, O(actors) state. The old
        // tombstone-based ORSet grew ~4000 dead tags here.
        let bytes = size_after(&a);
        assert!(
            bytes < 256,
            "serialized churned ORSet should stay small, got {bytes} bytes"
        );
        assert!(a.is_empty());
    }

    /// Legacy v1 `{elements, tombstones}` payloads must still decode, recovering
    /// exactly the live element set and dropping tombstoned ones.
    #[test]
    fn v1_payload_decodes_and_upgrades() {
        // Hand-built v1 JSON: "live" present, "dead" tombstoned.
        let v1 = serde_json::json!({
            "t": "os",
            "d": {
                "elements": {
                    "live": ["6f9619ff-8b86-d011-b42d-00cf4fc964ff"],
                    "dead": ["7f9619ff-8b86-d011-b42d-00cf4fc964ff"]
                },
                "tombstones": ["7f9619ff-8b86-d011-b42d-00cf4fc964ff"]
            }
        });

        let crdt: Crdt = serde_json::from_value(v1).expect("v1 payload must decode");
        let Crdt::ORSet(os) = crdt else {
            panic!("expected ORSet");
        };
        assert!(os.contains(&"live".to_string()), "live element preserved");
        assert!(
            !os.contains(&"dead".to_string()),
            "tombstoned element dropped"
        );
        assert_eq!(os.len(), 1);

        // And it re-serializes in the v2 shape.
        let json = serde_json::to_value(Crdt::ORSet(os)).unwrap();
        let d = json.get("d").unwrap();
        assert!(d.get("dots").is_some(), "re-serializes as v2 (dots)");
        assert!(d.get("vv").is_some(), "re-serializes as v2 (vv)");
    }

    #[test]
    fn v2_roundtrip_preserves_visibility() {
        let mut os = ORSet::new();
        os.add("a".to_string());
        os.add("b".to_string());
        os.remove(&"b".to_string());

        let bytes = Crdt::ORSet(os.clone()).to_msgpack().unwrap();
        let Crdt::ORSet(decoded) = Crdt::from_msgpack(&bytes).unwrap() else {
            panic!("expected ORSet");
        };
        assert!(decoded.contains(&"a".to_string()));
        assert!(!decoded.contains(&"b".to_string()));
        assert_eq!(os, decoded, "v2 round-trip is state-preserving");
    }
}