cloudillo_types/crdt_adapter.rs
1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! CRDT Document Adapter
5//!
6//! Trait and types for pluggable CRDT document backends that store binary updates
7//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
8//!
9//! The adapter handles:
10//! - Persistence of binary CRDT updates (Yjs sync protocol format)
11//! - Change subscriptions for real-time updates
12//! - Document lifecycle (creation, deletion)
13//!
14//! Each adapter implementation provides its own constructor handling backend-specific
15//! initialization (database path, connection settings, etc.).
16//!
17//! The adapter works with binary updates (Uint8Array) rather than typed documents,
18//! allowing flexibility in how updates are stored and reconstructed into Y.Doc instances.
19
20use async_trait::async_trait;
21use futures_core::Stream;
22use serde::{Deserialize, Serialize};
23use std::fmt::Debug;
24use std::pin::Pin;
25
26use crate::prelude::*;
27
28/// A binary CRDT update (serialized Yjs sync protocol message).
29///
30/// These updates are the fundamental unit of change in CRDT systems.
31/// They can be applied in any order and are commutative.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct CrdtUpdate {
34 /// Raw bytes of the update in Yjs sync protocol format
35 pub data: Vec<u8>,
36
37 /// Optional user/client ID that created this update
38 pub client_id: Option<Box<str>>,
39
40 /// Storage sequence number (populated by get_updates, used by compact_updates)
41 #[serde(skip)]
42 pub seq: Option<u64>,
43}
44
45impl CrdtUpdate {
46 /// Create a new CRDT update from raw bytes.
47 pub fn new(data: Vec<u8>) -> Self {
48 Self { data, client_id: None, seq: None }
49 }
50
51 /// Create a new CRDT update with client ID.
52 pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
53 Self { data, client_id: Some(client_id.into()), seq: None }
54 }
55}
56
57/// Real-time change notification for a CRDT document.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct CrdtChangeEvent {
60 /// Document ID
61 pub doc_id: Box<str>,
62
63 /// The update that caused this change
64 pub update: CrdtUpdate,
65}
66
/// Parameters describing a subscription to a single CRDT document.
#[derive(Debug, Clone)]
pub struct CrdtSubscriptionOptions {
    /// ID of the document whose changes should be delivered.
    pub doc_id: Box<str>,

    /// When `true`, already-stored updates are sent first as an initial snapshot.
    pub send_snapshot: bool,
}

impl CrdtSubscriptionOptions {
    /// Subscribe to `doc_id`, requesting the existing updates as an initial snapshot.
    pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
        let doc_id: Box<str> = doc_id.into();
        CrdtSubscriptionOptions { doc_id, send_snapshot: true }
    }

    /// Subscribe to `doc_id` for future updates only; no snapshot is requested.
    pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
        let doc_id: Box<str> = doc_id.into();
        CrdtSubscriptionOptions { doc_id, send_snapshot: false }
    }
}
88
89/// CRDT Document statistics.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct CrdtDocStats {
92 /// Document ID
93 pub doc_id: Box<str>,
94
95 /// Total size of stored updates in bytes
96 pub size_bytes: u64,
97
98 /// Number of updates stored
99 pub update_count: u32,
100}
101
102/// CRDT Adapter trait.
103///
104/// Unified interface for CRDT document backends. Handles persistence of binary updates
105/// and real-time subscriptions.
106///
107/// # Multi-Tenancy
108///
109/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
110/// - Updates from different tenants are stored separately
111/// - Subscriptions only receive updates for the subscribing tenant
112#[async_trait]
113pub trait CrdtAdapter: Debug + Send + Sync {
114 /// Get all stored updates for a document.
115 ///
116 /// Returns updates in the order they were stored. These can be applied
117 /// to a fresh Y.Doc to reconstruct the current state.
118 ///
119 /// Returns empty vec if document doesn't exist (safe to treat as new doc).
120 async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
121
122 /// Store a new update for a document.
123 ///
124 /// The update is persisted immediately. For high-frequency updates,
125 /// implementations may batch or compress updates.
126 ///
127 /// If the document doesn't exist, it's implicitly created.
128 async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
129
130 /// Subscribe to updates for a document.
131 ///
132 /// Returns a stream of updates. Depending on subscription options,
133 /// may include a snapshot of existing updates followed by new updates.
134 async fn subscribe(
135 &self,
136 tn_id: TnId,
137 opts: CrdtSubscriptionOptions,
138 ) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
139
140 /// Get statistics for a document.
141 async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
142 let updates = self.get_updates(tn_id, doc_id).await?;
143 let update_count = u32::try_from(updates.len()).unwrap_or_default();
144 let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
145
146 Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
147 }
148
149 /// Atomically replace specific updates with a single compacted update.
150 ///
151 /// Deletes the updates identified by `remove_seqs` and inserts the
152 /// `replacement` update, all in a single transaction. Updates not listed
153 /// in `remove_seqs` (e.g., ones that failed to decode) are preserved.
154 ///
155 /// Non-existent seqs in `remove_seqs` are silently ignored.
156 ///
157 /// **Important:** This method does not broadcast a change event. It should
158 /// only be called when no active subscribers exist (e.g., after the last
159 /// connection to the document has closed).
160 async fn compact_updates(
161 &self,
162 tn_id: TnId,
163 doc_id: &str,
164 remove_seqs: &[u64],
165 replacement: CrdtUpdate,
166 ) -> ClResult<()>;
167
168 /// Delete a document and all its updates.
169 ///
170 /// This removes all stored data for the document. Use with caution.
171 async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
172
173 /// Close/flush a document instance, ensuring all updates are persisted.
174 ///
175 /// Some implementations may keep documents in-memory and need explicit
176 /// flush before shutdown. Others may be no-op.
177 async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
178 // Default: no-op. Implementations can override.
179 Ok(())
180 }
181
182 /// List all document IDs for a tenant.
183 ///
184 /// Useful for administrative tasks and migrations.
185 async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
186}
187
188// vim: ts=4