1use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7pub mod gcounter;
8pub mod gset;
9pub mod lww_map;
10pub mod lww_register;
11pub mod orset;
12pub mod registry_dispatch;
13pub mod rga;
14pub mod vc_register;
15pub mod vector_clock;
16
17pub use gcounter::GCounter;
18pub use gset::GSet;
19pub use lww_map::LWWMap;
20pub use lww_register::LWWRegister;
21pub use orset::ORSet;
22pub use rga::Rga;
23pub use vc_register::VCRegister;
24pub use vector_clock::VectorClock;
25
26#[derive(Error, Debug)]
27pub enum CrdtError {
28 #[error("Type mismatch: cannot merge {0} with {1}")]
29 TypeMismatch(String, String),
30 #[error("Serialization error: {0}")]
31 Serialization(String),
32}
33
34pub trait CrdtMerge {
36 fn merge(&mut self, other: &Self);
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
44#[serde(tag = "t", content = "d")]
45pub enum Crdt {
46 #[serde(rename = "gc")]
47 GCounter(GCounter),
48 #[serde(rename = "gs")]
49 GSet(GSet<String>),
50 #[serde(rename = "os")]
51 ORSet(ORSet<String>),
52 #[serde(rename = "lr")]
53 LWWRegister(LWWRegister<serde_json::Value>),
54 #[serde(rename = "lm")]
55 LWWMap(LWWMap<String, serde_json::Value>),
56 #[serde(rename = "rg")]
57 Rga(Rga<String>),
58 #[serde(rename = "vc")]
59 VectorClock(VectorClock),
60 #[serde(rename = "vr")]
61 VCRegister(VCRegister<serde_json::Value>),
62}
63
64#[macro_export]
72macro_rules! for_each_crdt_variant {
73 ($mac:ident) => {
74 $mac! {
75 GCounter => "GCounter" => "uni-crdt:g-counter",
76 GSet => "GSet" => "uni-crdt:g-set",
77 ORSet => "ORSet" => "uni-crdt:or-set",
78 LWWRegister => "LWWRegister" => "uni-crdt:lww-register",
79 LWWMap => "LWWMap" => "uni-crdt:lww-map",
80 Rga => "Rga" => "uni-crdt:rga",
81 VectorClock => "VectorClock" => "uni-crdt:vector-clock",
82 VCRegister => "VCRegister" => "uni-crdt:vc-register",
83 }
84 };
85}
86
87macro_rules! try_merge_body {
88 ($($variant:ident => $type_name:literal => $kind:literal,)*) => {
89 impl Crdt {
90 pub fn try_merge(&mut self, other: &Self) -> Result<(), CrdtError> {
94 match (self, other) {
95 $(
96 (Crdt::$variant(a), Crdt::$variant(b)) => a.merge(b),
97 )*
98 (a, b) => {
99 return Err(CrdtError::TypeMismatch(
100 a.type_name().to_owned(),
101 b.type_name().to_owned(),
102 ));
103 }
104 }
105 Ok(())
106 }
107
108 pub fn type_name(&self) -> &'static str {
110 match self {
111 $(
112 Crdt::$variant(_) => $type_name,
113 )*
114 }
115 }
116 }
117 };
118}
119for_each_crdt_variant!(try_merge_body);
120
121impl CrdtMerge for Crdt {
122 fn merge(&mut self, other: &Self) {
125 if let Err(e) = self.try_merge(other) {
126 panic!("CRDT merge failed: {e}");
127 }
128 }
129}
130
131impl Crdt {
132 pub fn to_msgpack(&self) -> Result<Vec<u8>, CrdtError> {
134 rmp_serde::to_vec_named(self).map_err(|e| CrdtError::Serialization(e.to_string()))
135 }
136
137 pub fn from_msgpack(bytes: &[u8]) -> Result<Self, CrdtError> {
139 rmp_serde::from_slice(bytes).map_err(|e| CrdtError::Serialization(e.to_string()))
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 #[test]
148 fn test_crdt_serialization() {
149 let mut gc = GCounter::new();
150 gc.increment("actor1", 42);
151 let crdt = Crdt::GCounter(gc);
152
153 let bytes = crdt.to_msgpack().unwrap();
154 let decoded = Crdt::from_msgpack(&bytes).unwrap();
155
156 assert_eq!(crdt, decoded);
157 }
158
159 #[test]
160 fn try_merge_type_mismatch_surfaces_readable_names() {
161 let mut a = Crdt::GCounter(GCounter::new());
165 let b = Crdt::GSet(GSet::new());
166 let err = a.try_merge(&b).expect_err("type mismatch must error");
167 match err {
168 CrdtError::TypeMismatch(left, right) => {
169 assert_eq!(left, "GCounter");
170 assert_eq!(right, "GSet");
171 }
172 other => panic!("expected TypeMismatch, got {other:?}"),
173 }
174 }
175}