Skip to main content

cloudillo_types/
crdt_adapter.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! CRDT Document Adapter
5//!
6//! Trait and types for pluggable CRDT document backends that store binary updates
7//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
8//!
9//! The adapter handles:
10//! - Persistence of binary CRDT updates (Yjs sync protocol format)
11//! - Change subscriptions for real-time updates
12//! - Document lifecycle (creation, deletion)
13//!
14//! Each adapter implementation provides its own constructor handling backend-specific
15//! initialization (database path, connection settings, etc.).
16//!
//! The adapter works with opaque binary updates (`Vec<u8>` here, `Uint8Array` on the
//! JavaScript side) rather than typed documents. This leaves backends free to choose how
//! updates are stored and how they are reconstructed into Y.Doc instances.
19
20use async_trait::async_trait;
21use futures_core::Stream;
22use serde::{Deserialize, Serialize};
23use std::fmt::Debug;
24use std::pin::Pin;
25
26use crate::prelude::*;
27
28/// A binary CRDT update (serialized Yjs sync protocol message).
29///
30/// These updates are the fundamental unit of change in CRDT systems.
31/// They can be applied in any order and are commutative.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct CrdtUpdate {
34	/// Raw bytes of the update in Yjs sync protocol format
35	pub data: Vec<u8>,
36
37	/// Optional user/client ID that created this update
38	pub client_id: Option<Box<str>>,
39
40	/// Storage sequence number (populated by get_updates, used by compact_updates)
41	#[serde(skip)]
42	pub seq: Option<u64>,
43}
44
45impl CrdtUpdate {
46	/// Create a new CRDT update from raw bytes.
47	pub fn new(data: Vec<u8>) -> Self {
48		Self { data, client_id: None, seq: None }
49	}
50
51	/// Create a new CRDT update with client ID.
52	pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
53		Self { data, client_id: Some(client_id.into()), seq: None }
54	}
55}
56
57/// Real-time change notification for a CRDT document.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct CrdtChangeEvent {
60	/// Document ID
61	pub doc_id: Box<str>,
62
63	/// The update that caused this change
64	pub update: CrdtUpdate,
65}
66
/// Parameters controlling a subscription to one CRDT document's changes.
#[derive(Clone, Debug)]
pub struct CrdtSubscriptionOptions {
	/// ID of the document whose changes should be streamed.
	pub doc_id: Box<str>,

	/// When `true`, the already-stored updates are delivered first as an
	/// initial snapshot.
	pub send_snapshot: bool,
}

impl CrdtSubscriptionOptions {
	/// Subscribe to `doc_id`, starting with a snapshot of its stored updates.
	pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
		Self::from_parts(doc_id.into(), true)
	}

	/// Subscribe to `doc_id`, receiving only future updates (no snapshot).
	pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
		Self::from_parts(doc_id.into(), false)
	}

	/// Shared constructor behind the public convenience functions.
	fn from_parts(doc_id: Box<str>, send_snapshot: bool) -> Self {
		Self { doc_id, send_snapshot }
	}
}
88
89/// CRDT Document statistics.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct CrdtDocStats {
92	/// Document ID
93	pub doc_id: Box<str>,
94
95	/// Total size of stored updates in bytes
96	pub size_bytes: u64,
97
98	/// Number of updates stored
99	pub update_count: u32,
100}
101
102/// CRDT Adapter trait.
103///
104/// Unified interface for CRDT document backends. Handles persistence of binary updates
105/// and real-time subscriptions.
106///
107/// # Multi-Tenancy
108///
109/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
110/// - Updates from different tenants are stored separately
111/// - Subscriptions only receive updates for the subscribing tenant
112#[async_trait]
113pub trait CrdtAdapter: Debug + Send + Sync {
114	/// Get all stored updates for a document.
115	///
116	/// Returns updates in the order they were stored. These can be applied
117	/// to a fresh Y.Doc to reconstruct the current state.
118	///
119	/// Returns empty vec if document doesn't exist (safe to treat as new doc).
120	async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
121
122	/// Store a new update for a document.
123	///
124	/// The update is persisted immediately. For high-frequency updates,
125	/// implementations may batch or compress updates.
126	///
127	/// If the document doesn't exist, it's implicitly created.
128	async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
129
130	/// Subscribe to updates for a document.
131	///
132	/// Returns a stream of updates. Depending on subscription options,
133	/// may include a snapshot of existing updates followed by new updates.
134	async fn subscribe(
135		&self,
136		tn_id: TnId,
137		opts: CrdtSubscriptionOptions,
138	) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
139
140	/// Get statistics for a document.
141	async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
142		let updates = self.get_updates(tn_id, doc_id).await?;
143		let update_count = u32::try_from(updates.len()).unwrap_or_default();
144		let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
145
146		Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
147	}
148
149	/// Atomically replace specific updates with a single compacted update.
150	///
151	/// Deletes the updates identified by `remove_seqs` and inserts the
152	/// `replacement` update, all in a single transaction. Updates not listed
153	/// in `remove_seqs` (e.g., ones that failed to decode) are preserved.
154	///
155	/// Non-existent seqs in `remove_seqs` are silently ignored.
156	///
157	/// **Important:** This method does not broadcast a change event. It should
158	/// only be called when no active subscribers exist (e.g., after the last
159	/// connection to the document has closed).
160	async fn compact_updates(
161		&self,
162		tn_id: TnId,
163		doc_id: &str,
164		remove_seqs: &[u64],
165		replacement: CrdtUpdate,
166	) -> ClResult<()>;
167
168	/// Delete a document and all its updates.
169	///
170	/// This removes all stored data for the document. Use with caution.
171	async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
172
173	/// Close/flush a document instance, ensuring all updates are persisted.
174	///
175	/// Some implementations may keep documents in-memory and need explicit
176	/// flush before shutdown. Others may be no-op.
177	async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
178		// Default: no-op. Implementations can override.
179		Ok(())
180	}
181
182	/// List all document IDs for a tenant.
183	///
184	/// Useful for administrative tasks and migrations.
185	async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
186}
187
188// vim: ts=4