Skip to main content

uni_crdt/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7pub mod gcounter;
8pub mod gset;
9pub mod lww_map;
10pub mod lww_register;
11pub mod orset;
12pub mod registry_dispatch;
13pub mod rga;
14pub mod vc_register;
15pub mod vector_clock;
16
17pub use gcounter::GCounter;
18pub use gset::GSet;
19pub use lww_map::LWWMap;
20pub use lww_register::LWWRegister;
21pub use orset::ORSet;
22pub use rga::Rga;
23pub use vc_register::VCRegister;
24pub use vector_clock::VectorClock;
25
26#[derive(Error, Debug)]
27pub enum CrdtError {
28    #[error("Type mismatch: cannot merge {0} with {1}")]
29    TypeMismatch(String, String),
30    #[error("Serialization error: {0}")]
31    Serialization(String),
32}
33
/// Merge contract for state-based CRDTs (CvRDTs).
pub trait CrdtMerge {
    /// Folds the state of `other` into `self`; `other` is only borrowed.
    ///
    /// Implementations are required to be commutative, associative, and
    /// idempotent.
    fn merge(&mut self, other: &Self);
}
40
41/// Dynamic CRDT wrapper for storage and query layers.
42/// Using MessagePack for binary serialization in the storage layer.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
44#[serde(tag = "t", content = "d")]
45pub enum Crdt {
46    #[serde(rename = "gc")]
47    GCounter(GCounter),
48    #[serde(rename = "gs")]
49    GSet(GSet<String>),
50    #[serde(rename = "os")]
51    ORSet(ORSet<String>),
52    #[serde(rename = "lr")]
53    LWWRegister(LWWRegister<serde_json::Value>),
54    #[serde(rename = "lm")]
55    LWWMap(LWWMap<String, serde_json::Value>),
56    #[serde(rename = "rg")]
57    Rga(Rga<String>),
58    #[serde(rename = "vc")]
59    VectorClock(VectorClock),
60    #[serde(rename = "vr")]
61    VCRegister(VCRegister<serde_json::Value>),
62}
63
/// The canonical table of `Crdt` variants.
///
/// Invokes the macro named by `$callback` with one row per variant, each of
/// the form `<Variant> => <type_name_str> => <registry_kind_str>,`.
/// `try_merge` and `type_name` in this file are generated from these rows,
/// and the `kind` lookup in `registry_dispatch` is meant to be driven by the
/// same list, so the per-variant strings live in exactly one place.
#[macro_export]
macro_rules! for_each_crdt_variant {
    ($callback:ident) => {
        $callback! {
            GCounter    => "GCounter"    => "uni-crdt:g-counter",
            GSet        => "GSet"        => "uni-crdt:g-set",
            ORSet       => "ORSet"       => "uni-crdt:or-set",
            LWWRegister => "LWWRegister" => "uni-crdt:lww-register",
            LWWMap      => "LWWMap"      => "uni-crdt:lww-map",
            Rga         => "Rga"         => "uni-crdt:rga",
            VectorClock => "VectorClock" => "uni-crdt:vector-clock",
            VCRegister  => "VCRegister"  => "uni-crdt:vc-register",
        }
    };
}
86
87macro_rules! try_merge_body {
88    ($($variant:ident => $type_name:literal => $kind:literal,)*) => {
89        impl Crdt {
90            /// Try to merge another CRDT into this one.
91            /// Returns an error if the types don't match.
92            /// This is the safe, non-panicking version of merge.
93            pub fn try_merge(&mut self, other: &Self) -> Result<(), CrdtError> {
94                match (self, other) {
95                    $(
96                        (Crdt::$variant(a), Crdt::$variant(b)) => a.merge(b),
97                    )*
98                    (a, b) => {
99                        return Err(CrdtError::TypeMismatch(
100                            a.type_name().to_owned(),
101                            b.type_name().to_owned(),
102                        ));
103                    }
104                }
105                Ok(())
106            }
107
108            /// Returns the type name of this CRDT variant for error messages.
109            pub fn type_name(&self) -> &'static str {
110                match self {
111                    $(
112                        Crdt::$variant(_) => $type_name,
113                    )*
114                }
115            }
116        }
117    };
118}
119for_each_crdt_variant!(try_merge_body);
120
121impl CrdtMerge for Crdt {
122    /// Merge another CRDT into this one.
123    /// Panics if the types don't match. For a non-panicking version, use `try_merge`.
124    fn merge(&mut self, other: &Self) {
125        if let Err(e) = self.try_merge(other) {
126            panic!("CRDT merge failed: {e}");
127        }
128    }
129}
130
131impl Crdt {
132    /// Serialize the CRDT to MessagePack bytes.
133    pub fn to_msgpack(&self) -> Result<Vec<u8>, CrdtError> {
134        rmp_serde::to_vec_named(self).map_err(|e| CrdtError::Serialization(e.to_string()))
135    }
136
137    /// Deserialize a CRDT from MessagePack bytes.
138    pub fn from_msgpack(bytes: &[u8]) -> Result<Self, CrdtError> {
139        rmp_serde::from_slice(bytes).map_err(|e| CrdtError::Serialization(e.to_string()))
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// A populated counter must come back equal after a MessagePack round trip.
    #[test]
    fn test_crdt_serialization() {
        let original = {
            let mut counter = GCounter::new();
            counter.increment("actor1", 42);
            Crdt::GCounter(counter)
        };

        let encoded = original.to_msgpack().unwrap();
        let round_tripped = Crdt::from_msgpack(&encoded).unwrap();

        assert_eq!(original, round_tripped);
    }

    #[test]
    fn try_merge_type_mismatch_surfaces_readable_names() {
        // Regression guard: the mismatch error used to be formatted through
        // `mem::discriminant`, which yielded opaque `Discriminant(...)` text.
        // Both fields must now carry the same human-readable identifiers that
        // `type_name()` returns.
        let mut counter = Crdt::GCounter(GCounter::new());
        let set = Crdt::GSet(GSet::new());

        let failure = counter
            .try_merge(&set)
            .expect_err("type mismatch must error");

        let (left, right) = match failure {
            CrdtError::TypeMismatch(left, right) => (left, right),
            other => panic!("expected TypeMismatch, got {other:?}"),
        };
        assert_eq!(left, "GCounter");
        assert_eq!(right, "GSet");
    }
}
175}