Skip to main content

cdx_core/extensions/collaboration/
crdt.rs

//! CRDT integration for real-time collaboration.
//!
//! This module provides types for CRDT-based real-time collaboration,
//! enabling conflict-free merging of concurrent edits.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8
9/// CRDT format identifier.
10///
11/// Specifies which CRDT library/format is used for real-time collaboration.
12/// Documents using CRDT-based collaboration MUST declare the format to enable
13/// correct interpretation of CRDT state.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::Display)]
15#[serde(rename_all = "kebab-case")]
16#[strum(serialize_all = "kebab-case")]
17pub enum CrdtFormat {
18    /// [Yjs](https://yjs.dev/) CRDT library.
19    Yjs,
20    /// [Automerge](https://automerge.org/) JSON CRDT.
21    Automerge,
22    /// [Diamond Types](https://github.com/josephg/diamond-types) text CRDT.
23    DiamondTypes,
24}
25
26/// CRDT metadata for a content block.
27///
28/// Each content block can carry CRDT metadata for tracking distributed state.
29/// This enables conflict-free merging of concurrent edits.
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct CrdtMetadata {
33    /// Vector clock mapping site IDs to logical timestamps.
34    pub clock: std::collections::HashMap<String, u64>,
35
36    /// Site ID where this version originated.
37    pub origin: String,
38
39    /// Sequence number within the origin site.
40    pub seq: u64,
41}
42
43impl CrdtMetadata {
44    /// Create new CRDT metadata.
45    #[must_use]
46    pub fn new(origin: impl Into<String>, seq: u64) -> Self {
47        let origin = origin.into();
48        let mut clock = std::collections::HashMap::new();
49        clock.insert(origin.clone(), seq);
50        Self { clock, origin, seq }
51    }
52
53    /// Increment the sequence number for a site.
54    pub fn increment(&mut self, site_id: &str) {
55        let count = self.clock.entry(site_id.to_string()).or_insert(0);
56        *count += 1;
57        if site_id == self.origin {
58            self.seq = *count;
59        }
60    }
61
62    /// Merge with another CRDT metadata (takes maximum of each clock entry).
63    pub fn merge(&mut self, other: &Self) {
64        for (site, &count) in &other.clock {
65            let entry = self.clock.entry(site.clone()).or_insert(0);
66            *entry = (*entry).max(count);
67        }
68    }
69
70    /// Check if this metadata happened before another (causally).
71    #[must_use]
72    pub fn happened_before(&self, other: &Self) -> bool {
73        let mut dominated = false;
74        for (site, &self_count) in &self.clock {
75            let other_count = other.clock.get(site).copied().unwrap_or(0);
76            if self_count > other_count {
77                return false;
78            }
79            if self_count < other_count {
80                dominated = true;
81            }
82        }
83        // Check sites only in other
84        for (site, &other_count) in &other.clock {
85            if !self.clock.contains_key(site) && other_count > 0 {
86                dominated = true;
87            }
88        }
89        dominated
90    }
91}
92
93/// CRDT metadata for text content within a block.
94///
95/// For rich text editing, each character can have a unique position ID
96/// enabling character-level conflict resolution.
97#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "camelCase")]
99pub struct TextCrdtMetadata {
100    /// Character positions with unique IDs.
101    pub positions: Vec<TextCrdtPosition>,
102}
103
104impl TextCrdtMetadata {
105    /// Create new text CRDT metadata.
106    #[must_use]
107    pub fn new() -> Self {
108        Self {
109            positions: Vec::new(),
110        }
111    }
112
113    /// Create metadata from text with a site ID prefix.
114    #[must_use]
115    pub fn from_text(text: &str, site_id: &str) -> Self {
116        let positions = text
117            .chars()
118            .enumerate()
119            .map(|(i, c)| TextCrdtPosition {
120                id: format!("{site_id}:{}", i + 1),
121                char: c,
122            })
123            .collect();
124        Self { positions }
125    }
126
127    /// Get the text content.
128    #[must_use]
129    pub fn text(&self) -> String {
130        self.positions.iter().map(|p| p.char).collect()
131    }
132
133    /// Get the number of characters.
134    #[must_use]
135    pub fn len(&self) -> usize {
136        self.positions.len()
137    }
138
139    /// Check if empty.
140    #[must_use]
141    pub fn is_empty(&self) -> bool {
142        self.positions.is_empty()
143    }
144}
145
146impl Default for TextCrdtMetadata {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152/// A single character position in text CRDT.
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct TextCrdtPosition {
155    /// Unique position identifier (e.g., "site1:42").
156    pub id: String,
157
158    /// The character at this position.
159    pub char: char,
160}
161
162/// Synchronization state for CRDT-based collaboration.
163///
164/// Tracks the current sync state of a document for real-time collaboration.
165#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub struct SyncState {
168    /// CRDT format being used.
169    pub crdt_format: CrdtFormat,
170
171    /// Version of the CRDT library.
172    #[serde(default, skip_serializing_if = "Option::is_none")]
173    pub crdt_version: Option<String>,
174
175    /// Logical clock or sequence number for sync state.
176    #[serde(default, skip_serializing_if = "Option::is_none")]
177    pub sync_version: Option<u64>,
178
179    /// Timestamp of last synchronization.
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub last_sync: Option<DateTime<Utc>>,
182
183    /// Known collaboration peers.
184    #[serde(default, skip_serializing_if = "Vec::is_empty")]
185    pub peers: Vec<Peer>,
186}
187
188impl SyncState {
189    /// Create new sync state with the specified CRDT format.
190    #[must_use]
191    pub fn new(crdt_format: CrdtFormat) -> Self {
192        Self {
193            crdt_format,
194            crdt_version: None,
195            sync_version: None,
196            last_sync: None,
197            peers: Vec::new(),
198        }
199    }
200
201    /// Create sync state for Yjs.
202    #[must_use]
203    pub fn yjs() -> Self {
204        Self::new(CrdtFormat::Yjs)
205    }
206
207    /// Create sync state for Automerge.
208    #[must_use]
209    pub fn automerge() -> Self {
210        Self::new(CrdtFormat::Automerge)
211    }
212
213    /// Create sync state for Diamond Types.
214    #[must_use]
215    pub fn diamond_types() -> Self {
216        Self::new(CrdtFormat::DiamondTypes)
217    }
218
219    /// Set the CRDT library version.
220    #[must_use]
221    pub fn with_version(mut self, version: impl Into<String>) -> Self {
222        self.crdt_version = Some(version.into());
223        self
224    }
225
226    /// Set the sync version.
227    #[must_use]
228    pub fn with_sync_version(mut self, version: u64) -> Self {
229        self.sync_version = Some(version);
230        self
231    }
232
233    /// Update the last sync timestamp to now.
234    pub fn mark_synced(&mut self) {
235        self.last_sync = Some(Utc::now());
236    }
237
238    /// Add a peer.
239    pub fn add_peer(&mut self, peer: Peer) {
240        // Update existing peer or add new one
241        if let Some(existing) = self.peers.iter_mut().find(|p| p.id == peer.id) {
242            existing.last_seen = peer.last_seen;
243        } else {
244            self.peers.push(peer);
245        }
246    }
247
248    /// Remove a peer by ID.
249    pub fn remove_peer(&mut self, peer_id: &str) {
250        self.peers.retain(|p| p.id != peer_id);
251    }
252
253    /// Get active peers (seen within the given duration).
254    #[must_use]
255    pub fn active_peers(&self, within: chrono::Duration) -> Vec<&Peer> {
256        let cutoff = Utc::now() - within;
257        self.peers.iter().filter(|p| p.last_seen > cutoff).collect()
258    }
259}
260
261/// A collaboration peer.
262#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
263#[serde(rename_all = "camelCase")]
264pub struct Peer {
265    /// Unique peer identifier.
266    pub id: String,
267
268    /// When the peer was last seen.
269    pub last_seen: DateTime<Utc>,
270
271    /// Optional display name.
272    #[serde(default, skip_serializing_if = "Option::is_none")]
273    pub name: Option<String>,
274}
275
276impl Peer {
277    /// Create a new peer.
278    #[must_use]
279    pub fn new(id: impl Into<String>) -> Self {
280        Self {
281            id: id.into(),
282            last_seen: Utc::now(),
283            name: None,
284        }
285    }
286
287    /// Set the display name.
288    #[must_use]
289    pub fn with_name(mut self, name: impl Into<String>) -> Self {
290        self.name = Some(name.into());
291        self
292    }
293
294    /// Update the last seen timestamp.
295    pub fn touch(&mut self) {
296        self.last_seen = Utc::now();
297    }
298}