Skip to main content

diamond_types_extended/
lib.rs

1//! # Diamond Types Extended - Unified CRDT Library
2//!
3//! Diamond Types Extended provides high-performance CRDTs (Conflict-free Replicated Data Types) for
4//! collaborative applications. It's a fork of [diamond-types](https://github.com/josephg/diamond-types)
5//! with an ergonomic, unified API.
6//!
7//! ## Quick Start
8//!
9//! ```
10//! use diamond_types_extended::{Document, Uuid, Value};
11//!
12//! // Create a document
13//! let mut doc = Document::new();
14//! let alice = doc.create_agent(Uuid::now_v7());
15//!
16//! // All mutations happen in transactions
17//! doc.transact(alice, |tx| {
18//!     tx.root().set("title", "My Document");
19//!     tx.root().set("count", 42);
20//!     tx.root().create_text("content");
21//! });
22//!
23//! // Access the text CRDT
24//! doc.transact(alice, |tx| {
25//!     if let Some(mut text) = tx.get_text_mut(&["content"]) {
26//!         text.insert(0, "Hello, world!");
27//!     }
28//! });
29//!
30//! // Read values directly
31//! assert_eq!(doc.root().get("title").unwrap().as_str(), Some("My Document"));
32//! assert_eq!(doc.root().get_text("content").unwrap().content(), "Hello, world!");
33//! ```
34//!
35//! ## CRDT Types
36//!
37//! - **Map**: Key-value container with LWW (Last-Writer-Wins) registers per key
38//! - **Text**: Sequence CRDT for collaborative text editing
39//! - **Set**: OR-Set (Observed-Remove) with add-wins semantics
40//! - **Register**: Single-value LWW container
41//!
42//! All types can be nested within Maps.
43//!
44//! ## Replication
45//!
46//! ```ignore
47//! // Peer A creates some changes
48//! let ops = doc_a.ops_since(&Frontier::root()).into_owned();
49//!
50//! // Peer B merges them
51//! doc_b.merge_ops(ops)?;
52//! ```
53//!
54//! ## Attribution
55//!
56//! Diamond Types Extended is built on diamond-types by Joseph Gentle. See ATTRIBUTION.md for details.
57
58#![allow(clippy::module_inception)]
59
60use std::collections::{BTreeMap, BTreeSet};
61use std::fmt::{Debug, Formatter};
62
63use jumprope::JumpRopeBuf;
64use serde::{Deserialize, Serialize};
65use smallvec::SmallVec;
66use smartstring::alias::String as SmartString;
67
68// HasLength is re-exported from our vendored rle module for internal use.
69use causalgraph::graph::Graph;
70pub use frontier::Frontier;
71
72pub use uuid::Uuid;
73pub use ordered_float::NotNan;
74pub use crate::causalgraph::agent_assignment::remote_ids::{RemoteVersion, RemoteFrontier};
75use crate::causalgraph::agent_span::AgentVersion;
76pub(crate) use crate::causalgraph::CausalGraph;
77pub(crate) use crate::dtrange::DTRange;
78use crate::list::op_metrics::{ListOperationCtx, ListOpMetrics};
79
80use crate::rle::{KVPair, RleVec};
81use crate::textinfo::TextInfo;
82
83// use crate::list::internal_op::OperationInternal as TextOpInternal;
84
85// ============ New Diamond Types Extended Public API ============
86mod value;
87mod document;
88mod refs;
89mod muts;
90
91pub use value::{Value, PrimitiveValue, MaterializedValue, CrdtId, Conflicted};
92pub use document::{Document, DocumentWriter, Transaction};
93pub use refs::{MapRef, TextRef, SetRef, RegisterRef};
94pub use muts::{MapMut, TextMut, SetMut};
95pub use encoding::parseerror::ParseError;
96
97// ============ Original diamond-types modules (upstream internals) ============
98// These modules contain upstream diamond-types code. Many have WIP features
99// and unused items that we preserve for compatibility. Dead code warnings are
100// suppressed at the module level rather than per-item.
101#[allow(dead_code)]
102pub(crate) mod list;
103#[allow(dead_code)]
104pub(crate) mod register;
105#[allow(dead_code)]
106pub(crate) mod map;
107#[allow(dead_code)]
108pub(crate) mod set;
109
110#[allow(dead_code)]
111pub(crate) mod rle;
112#[allow(dead_code)]
113mod dtrange;
114mod unicount;
115#[allow(dead_code)]
116mod rev_range;
117#[allow(dead_code)]
118pub(crate) mod frontier;
119mod check;
120#[allow(dead_code)]
121pub(crate) mod encoding;
122#[allow(dead_code)]
123pub(crate) mod causalgraph;
124#[allow(dead_code)]
125mod wal;
126#[allow(dead_code)]
127mod ost;
128
129#[allow(dead_code, unused_imports)]
130pub(crate) mod serde_helpers;
131
132#[allow(dead_code)]
133mod listmerge;
134
135#[cfg(any(test, feature = "gen_test_data"))]
136mod list_fuzzer_tools;
137#[cfg(test)]
138mod fuzzer;
139#[allow(dead_code)]
140mod branch;
141mod textinfo;
142#[allow(dead_code)]
143mod oplog;
144#[cfg(feature = "storage")]
145#[allow(dead_code)]
146mod storage;
147#[allow(dead_code)]
148mod simple_checkout;
149#[allow(dead_code)]
150mod stats;
151
/// Numeric identifier for an agent (an editing peer) within a document.
///
/// Agents are registered from a [`Uuid`] (see `Document::create_agent` in the crate-level
/// example). The value `AgentId::MAX` is used as the agent half of the root CRDT's
/// `AgentVersion` (`ROOT_CRDT_ID_AV`), so presumably it is never assigned to a real agent.
pub type AgentId = u32;
153
// TODO: Consider changing this to u64 to add support for very long lived documents even on 32 bit
// systems like wasm32
/// An LV (LocalVersion) is used all over the place internally to identify a single operation.
///
/// A local version (as the name implies) is local-only. Local versions generally need to be
/// converted to [`RemoteVersion`]s before being sent over the wire or saved to disk (the
/// serialized op batches carry `RemoteVersion`s, not `LV`s).
///
/// `LV::MAX` is used as the sentinel ID of the document's root CRDT (`ROOT_CRDT_ID`).
pub type LV = usize;
161
162#[derive(Clone, Eq, PartialEq, Ord, PartialOrd)]
163#[derive(Serialize, Deserialize)]
164pub(crate) enum Primitive {
165    Nil,
166    Bool(bool),
167    I64(i64),
168    F64(NotNan<f64>),
169    Str(SmartString),
170
171    #[serde(skip)]
172    #[allow(dead_code)]
173    InvalidUninitialized,
174}
175
176impl Debug for Primitive {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        match self {
179            Primitive::Nil => f.debug_struct("Nil").finish(),
180            Primitive::Bool(val) => val.fmt(f),
181            // Primitive::I64(val) => f.debug_tuple("I64").field(val).finish(),
182            Primitive::I64(val) => val.fmt(f),
183            Primitive::F64(val) => val.fmt(f),
184            Primitive::Str(val) => val.fmt(f),
185            Primitive::InvalidUninitialized => f.debug_tuple("InvalidUninitialized").finish()
186        }
187    }
188}
189
190// #[derive(Debug, Eq, PartialEq, Copy, Clone, TryFromPrimitive)]
191#[derive(Debug, Eq, PartialEq, Copy, Clone)]
192#[derive(Serialize, Deserialize)]
193pub(crate) enum CRDTKind {
194    Map,        // String => Register (like a JS object)
195    Register,   // Single LWW value
196    #[allow(dead_code)]
197    Collection, // SQL table / mongo collection
198    Text,       // Text/sequence CRDT
199    Set,        // OR-Set (add-wins semantics)
200}
201
202#[derive(Debug, Clone, Eq, PartialEq)]
203#[derive(Serialize, Deserialize)]
204pub(crate) enum CreateValue {
205    Primitive(Primitive),
206    NewCRDT(CRDTKind),
207    // Deleted, // Marks that the key / contents should be deleted.
208}
209
210// #[derive(Debug, Clone, Eq, PartialEq)]
211// pub enum CollectionOp {
212//     Insert(CreateValue),
213//     Remove(LV),
214// }
215
216// #[derive(Debug, Clone, Eq, PartialEq)]
217// pub(crate) enum OpContents {
218//     RegisterSet(CreateValue),
219//     MapSet(SmartString, CreateValue), // TODO: Inline the index here.
220//     MapDelete(SmartString), // TODO: And here.
221//     Collection(CollectionOp), // TODO: Consider just inlining this.
222//     Text(ListOpMetrics),
223//
224//
225//     // The other way to write this would be:
226//
227//
228//     // SetInsert(CRDTKind),
229//     // SetRemove(Time),
230//
231//     // TextInsert(..),
232//     // TextRemove(..)
233// }
234
235// #[derive(Debug, Clone, Eq, PartialEq)]
236// pub(crate) struct Op {
237//     pub target_id: LV,
238//     pub contents: OpContents,
239// }
240
241// #[derive(Debug, Clone, Eq, PartialEq, Default)]
242// pub(crate) struct Ops {
243//     /// Local version + op pairs
244//     ops: RleVec<KVPair<Op>>,
245//     list_ctx: ListOperationCtx,
246// }
247
248pub(crate) const ROOT_CRDT_ID: LV = usize::MAX;
249#[allow(dead_code)]
250pub(crate) const ROOT_CRDT_ID_AV: AgentVersion = (AgentId::MAX, 0);
251
252
253// #[derive(Debug, Clone, Eq, PartialEq)]
254// pub enum SnapshotValue {
255//     Primitive(Primitive),
256//     InnerCRDT(LV),
257//     // Ref(LV),
258// }
259//
260// #[derive(Debug, Clone, Eq, PartialEq)]
261// struct RegisterState {
262//     value: SnapshotValue,
263//     version: LV,
264// }
265
266// /// Guaranteed to always have at least 1 value inside.
267// type MVRegister = SmallVec<RegisterState, 1>;
268
269// // TODO: Probably should also store a dirty flag for when we flush to disk.
270// #[derive(Debug, Clone, Eq, PartialEq)]
271// enum OverlayValue {
272//     Register(MVRegister),
273//     Map(BTreeMap<SmartString, MVRegister>),
274//     Collection(BTreeMap<LV, SnapshotValue>),
275//     Text(Box<JumpRope>),
276// }
277
278// type Pair<T> = (LV, T);
279type ValPair = (LV, CreateValue);
280// type RawPair<'a, T> = (RemoteVersion<'a>, T);
281type LVKey = LV;
282
283
284#[derive(Debug, Clone, Default)]
285pub(crate) struct RegisterInfo {
286    // I bet there's a clever way to use RLE to optimize this. Right now this contains the full
287    // history of values this register has ever held.
288    ops: Vec<ValPair>,
289
290    /// Cached version(s) which together store the current HEAD for this register.
291    supremum: SmallVec<usize, 2>,
292}
293
294#[derive(Debug, Clone, PartialEq, Eq)]
295pub(crate) enum RegisterValue {
296    Primitive(Primitive),
297    OwnedCRDT(CRDTKind, LVKey),
298}
299
300
301#[derive(Debug, Clone, Default)]
302pub(crate) struct OpLog {
303    pub(crate) cg: CausalGraph,
304
305
306    // cg_storage: Option<CGStorage>,
307    // wal_storage: Option<WriteAheadLog>,
308
309    // Information about whether the map still exists!
310    // maps: BTreeMap<LVKey, MapInfo>,
311
312    /// (CRDT ID, key) -> MVRegister.
313    map_keys: BTreeMap<(LVKey, SmartString), RegisterInfo>,
314    /// CRDT ID -> Text CRDT.
315    texts: BTreeMap<LVKey, TextInfo>,
316
317    // These are always inserted at the end, but items in the middle are removed. There's probably
318    // a better data structure to accomplish this.
319    map_index: BTreeMap<LV, (LVKey, SmartString)>,
320    text_index: BTreeMap<LV, LVKey>,
321
322    /// Standalone registers (not inside maps).
323    registers: BTreeMap<LVKey, RegisterInfo>,
324    /// Index from operation LV to register CRDT ID.
325    #[allow(dead_code)]
326    register_index: BTreeMap<LV, LVKey>,
327
328    /// OR-Sets storing Primitive values.
329    sets: BTreeMap<LVKey, set::SetInfo<Primitive>>,
330    /// Index from operation LV to set CRDT ID.
331    set_index: BTreeMap<LV, LVKey>,
332
333    // The set of CRDTs which have been deleted or superseded in the current version. This data is
334    // pretty similar to the _index data, in that its mainly just useful for branches doing
335    // checkouts.
336    deleted_crdts: BTreeSet<LVKey>,
337}
338
339#[derive(Debug, Clone, Eq, PartialEq)]
340pub(crate) struct Branch {
341    pub(crate) frontier: Frontier,
342
343    // Objects are always created at the highest version ID, but can be deleted anywhere in the
344    // range.
345    //
346    // TODO: Replace BTreeMap with something more appropriate later.
347    maps: BTreeMap<LVKey, BTreeMap<SmartString, RegisterState>>, // any objects.
348    pub(crate) texts: BTreeMap<LVKey, JumpRopeBuf>,
349    /// Standalone registers (not inside maps).
350    pub(crate) registers: BTreeMap<LVKey, RegisterState>,
351    /// OR-Sets storing Primitive values.
352    pub(crate) sets: BTreeMap<LVKey, BTreeSet<Primitive>>,
353}
354
355/// The register stores the specified value, but if conflicts_with is not empty, it has some
356/// conflicting concurrent values too. The `value` field will be consistent across all peers.
357#[derive(Debug, Clone, PartialEq, Eq)]
358pub(crate) struct RegisterState {
359    /// The winning value according to LWW semantics.
360    pub(crate) value: RegisterValue,
361    /// Any concurrent values that lost the LWW tie-break.
362    pub(crate) conflicts_with: Vec<RegisterValue>,
363}
364
365#[derive(Debug, Clone)]
366#[derive(Serialize, Deserialize)]
367pub struct SerializedOps<'a> {
368    cg_changes: Vec<u8>,
369
370    // The version of the op, and the name of the containing CRDT.
371    #[serde(borrow)]
372    map_ops: Vec<(RemoteVersion, RemoteVersion, &'a str, CreateValue)>,
373    text_ops: Vec<(RemoteVersion, RemoteVersion, ListOpMetrics)>,
374    text_context: ListOperationCtx,
375    /// OR-Set operations: (crdt_name, op_version, `SerializedSetOp<Primitive>`)
376    set_ops: Vec<(RemoteVersion, RemoteVersion, set::SerializedSetOp<Primitive>)>,
377}
378
379impl<'a> From<SerializedOps<'a>> for SerializedOpsOwned {
380    fn from(ops: SerializedOps<'a>) -> Self {
381        Self {
382            cg_changes: ops.cg_changes,
383            map_ops: ops.map_ops.into_iter().map(|(crdt_name, rv, key, val)| {
384                (crdt_name, rv, SmartString::from(key), val)
385            }).collect(),
386            text_ops: ops.text_ops,
387            text_context: ops.text_context,
388            set_ops: ops.set_ops,
389        }
390    }
391}
392
393impl<'a> SerializedOps<'a> {
394    /// Convert to an owned version that can be sent across threads.
395    pub fn into_owned(self) -> SerializedOpsOwned {
396        self.into()
397    }
398
399    /// Check if this contains no operations.
400    pub fn is_empty(&self) -> bool {
401        self.cg_changes.is_empty()
402            && self.map_ops.is_empty()
403            && self.text_ops.is_empty()
404            && self.set_ops.is_empty()
405    }
406}
407
408#[derive(Debug, Clone)]
409#[derive(Serialize, Deserialize)]
410pub struct SerializedOpsOwned {
411    cg_changes: Vec<u8>,
412
413    // The version of the op, and the name of the containing CRDT.
414    map_ops: Vec<(RemoteVersion, RemoteVersion, SmartString, CreateValue)>,
415    text_ops: Vec<(RemoteVersion, RemoteVersion, ListOpMetrics)>,
416    text_context: ListOperationCtx,
417    /// OR-Set operations: (crdt_name, op_version, `SerializedSetOp<Primitive>`)
418    set_ops: Vec<(RemoteVersion, RemoteVersion, set::SerializedSetOp<Primitive>)>,
419}
420
421impl SerializedOpsOwned {
422    /// Check if this contains no operations.
423    pub fn is_empty(&self) -> bool {
424        self.cg_changes.is_empty()
425            && self.map_ops.is_empty()
426            && self.text_ops.is_empty()
427            && self.set_ops.is_empty()
428    }
429}
430
431/// This is used for checkouts. This is a value tree.
432#[derive(Debug, Clone, Eq, PartialEq)]
433#[derive(Serialize, Deserialize)]
434pub(crate) enum DTValue {
435    Primitive(Primitive),
436    /// A register containing a value (which could be a nested CRDT).
437    Register(Box<DTValue>),
438    Map(BTreeMap<SmartString, Box<DTValue>>),
439    // Collection(BTreeMap<LV, Box<DTValue>>),
440    Text(String),
441    /// An OR-Set containing primitive values.
442    Set(BTreeSet<Primitive>),
443}