cdx_core/extensions/collaboration/
crdt.rs1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
15#[serde(rename_all = "kebab-case")]
16#[strum(serialize_all = "kebab-case")]
17pub enum CrdtFormat {
18 Yjs,
20 Automerge,
22 DiamondTypes,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct CrdtMetadata {
33 pub clock: std::collections::HashMap<String, u64>,
35
36 pub origin: String,
38
39 pub seq: u64,
41}
42
43impl CrdtMetadata {
44 #[must_use]
46 pub fn new(origin: impl Into<String>, seq: u64) -> Self {
47 let origin = origin.into();
48 let mut clock = std::collections::HashMap::new();
49 clock.insert(origin.clone(), seq);
50 Self { clock, origin, seq }
51 }
52
53 pub fn increment(&mut self, site_id: &str) {
55 let count = self.clock.entry(site_id.to_string()).or_insert(0);
56 *count += 1;
57 if site_id == self.origin {
58 self.seq = *count;
59 }
60 }
61
62 pub fn merge(&mut self, other: &Self) {
64 for (site, &count) in &other.clock {
65 let entry = self.clock.entry(site.clone()).or_insert(0);
66 *entry = (*entry).max(count);
67 }
68 }
69
70 #[must_use]
72 pub fn happened_before(&self, other: &Self) -> bool {
73 let mut dominated = false;
74 for (site, &self_count) in &self.clock {
75 let other_count = other.clock.get(site).copied().unwrap_or(0);
76 if self_count > other_count {
77 return false;
78 }
79 if self_count < other_count {
80 dominated = true;
81 }
82 }
83 for (site, &other_count) in &other.clock {
85 if !self.clock.contains_key(site) && other_count > 0 {
86 dominated = true;
87 }
88 }
89 dominated
90 }
91}
92
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "camelCase")]
99pub struct TextCrdtMetadata {
100 pub positions: Vec<TextCrdtPosition>,
102}
103
104impl TextCrdtMetadata {
105 #[must_use]
107 pub fn new() -> Self {
108 Self {
109 positions: Vec::new(),
110 }
111 }
112
113 #[must_use]
115 pub fn from_text(text: &str, site_id: &str) -> Self {
116 let positions = text
117 .chars()
118 .enumerate()
119 .map(|(i, c)| TextCrdtPosition {
120 id: format!("{site_id}:{}", i + 1),
121 char: c,
122 })
123 .collect();
124 Self { positions }
125 }
126
127 #[must_use]
129 pub fn text(&self) -> String {
130 self.positions.iter().map(|p| p.char).collect()
131 }
132
133 #[must_use]
135 pub fn len(&self) -> usize {
136 self.positions.len()
137 }
138
139 #[must_use]
141 pub fn is_empty(&self) -> bool {
142 self.positions.is_empty()
143 }
144}
145
146impl Default for TextCrdtMetadata {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct TextCrdtPosition {
155 pub id: String,
157
158 pub char: char,
160}
161
162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub struct SyncState {
168 pub crdt_format: CrdtFormat,
170
171 #[serde(default, skip_serializing_if = "Option::is_none")]
173 pub crdt_version: Option<String>,
174
175 #[serde(default, skip_serializing_if = "Option::is_none")]
177 pub sync_version: Option<u64>,
178
179 #[serde(default, skip_serializing_if = "Option::is_none")]
181 pub last_sync: Option<DateTime<Utc>>,
182
183 #[serde(default, skip_serializing_if = "Vec::is_empty")]
185 pub peers: Vec<Peer>,
186}
187
188impl SyncState {
189 #[must_use]
191 pub fn new(crdt_format: CrdtFormat) -> Self {
192 Self {
193 crdt_format,
194 crdt_version: None,
195 sync_version: None,
196 last_sync: None,
197 peers: Vec::new(),
198 }
199 }
200
201 #[must_use]
203 pub fn yjs() -> Self {
204 Self::new(CrdtFormat::Yjs)
205 }
206
207 #[must_use]
209 pub fn automerge() -> Self {
210 Self::new(CrdtFormat::Automerge)
211 }
212
213 #[must_use]
215 pub fn diamond_types() -> Self {
216 Self::new(CrdtFormat::DiamondTypes)
217 }
218
219 #[must_use]
221 pub fn with_version(mut self, version: impl Into<String>) -> Self {
222 self.crdt_version = Some(version.into());
223 self
224 }
225
226 #[must_use]
228 pub fn with_sync_version(mut self, version: u64) -> Self {
229 self.sync_version = Some(version);
230 self
231 }
232
233 pub fn mark_synced(&mut self) {
235 self.last_sync = Some(Utc::now());
236 }
237
238 pub fn add_peer(&mut self, peer: Peer) {
240 if let Some(existing) = self.peers.iter_mut().find(|p| p.id == peer.id) {
242 existing.last_seen = peer.last_seen;
243 } else {
244 self.peers.push(peer);
245 }
246 }
247
248 pub fn remove_peer(&mut self, peer_id: &str) {
250 self.peers.retain(|p| p.id != peer_id);
251 }
252
253 #[must_use]
255 pub fn active_peers(&self, within: chrono::Duration) -> Vec<&Peer> {
256 let cutoff = Utc::now() - within;
257 self.peers.iter().filter(|p| p.last_seen > cutoff).collect()
258 }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
263#[serde(rename_all = "camelCase")]
264pub struct Peer {
265 pub id: String,
267
268 pub last_seen: DateTime<Utc>,
270
271 #[serde(default, skip_serializing_if = "Option::is_none")]
273 pub name: Option<String>,
274}
275
276impl Peer {
277 #[must_use]
279 pub fn new(id: impl Into<String>) -> Self {
280 Self {
281 id: id.into(),
282 last_seen: Utc::now(),
283 name: None,
284 }
285 }
286
287 #[must_use]
289 pub fn with_name(mut self, name: impl Into<String>) -> Self {
290 self.name = Some(name.into());
291 self
292 }
293
294 pub fn touch(&mut self) {
296 self.last_seen = Utc::now();
297 }
298}