Skip to main content

cloudillo_types/
crdt_adapter.rs

1//! CRDT Document Adapter
2//!
3//! Trait and types for pluggable CRDT document backends that store binary updates
4//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
5//!
6//! The adapter handles:
7//! - Persistence of binary CRDT updates (Yjs sync protocol format)
8//! - Change subscriptions for real-time updates
9//! - Document lifecycle (creation, deletion)
10//!
11//! Each adapter implementation provides its own constructor handling backend-specific
12//! initialization (database path, connection settings, etc.).
13//!
14//! The adapter works with binary updates (Uint8Array) rather than typed documents,
15//! allowing flexibility in how updates are stored and reconstructed into Y.Doc instances.
16
17use async_trait::async_trait;
18use futures_core::Stream;
19use serde::{Deserialize, Serialize};
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24
25/// A binary CRDT update (serialized Yjs sync protocol message).
26///
27/// These updates are the fundamental unit of change in CRDT systems.
28/// They can be applied in any order and are commutative.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct CrdtUpdate {
31	/// Raw bytes of the update in Yjs sync protocol format
32	pub data: Vec<u8>,
33
34	/// Optional user/client ID that created this update
35	pub client_id: Option<Box<str>>,
36}
37
38impl CrdtUpdate {
39	/// Create a new CRDT update from raw bytes.
40	pub fn new(data: Vec<u8>) -> Self {
41		Self { data, client_id: None }
42	}
43
44	/// Create a new CRDT update with client ID.
45	pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
46		Self { data, client_id: Some(client_id.into()) }
47	}
48}
49
50/// Real-time change notification for a CRDT document.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct CrdtChangeEvent {
53	/// Document ID
54	pub doc_id: Box<str>,
55
56	/// The update that caused this change
57	pub update: CrdtUpdate,
58}
59
60/// Options for subscribing to CRDT document changes.
61#[derive(Debug, Clone)]
62pub struct CrdtSubscriptionOptions {
63	/// Document ID to subscribe to
64	pub doc_id: Box<str>,
65
66	/// If true, send existing updates as initial snapshot
67	pub send_snapshot: bool,
68}
69
70impl CrdtSubscriptionOptions {
71	/// Create a subscription to a document with snapshot.
72	pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
73		Self { doc_id: doc_id.into(), send_snapshot: true }
74	}
75
76	/// Create a subscription to future updates only (no snapshot).
77	pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
78		Self { doc_id: doc_id.into(), send_snapshot: false }
79	}
80}
81
82/// CRDT Document statistics.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct CrdtDocStats {
85	/// Document ID
86	pub doc_id: Box<str>,
87
88	/// Total size of stored updates in bytes
89	pub size_bytes: u64,
90
91	/// Number of updates stored
92	pub update_count: u32,
93}
94
95/// CRDT Adapter trait.
96///
97/// Unified interface for CRDT document backends. Handles persistence of binary updates
98/// and real-time subscriptions.
99///
100/// # Multi-Tenancy
101///
102/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
103/// - Updates from different tenants are stored separately
104/// - Subscriptions only receive updates for the subscribing tenant
105#[async_trait]
106pub trait CrdtAdapter: Debug + Send + Sync {
107	/// Get all stored updates for a document.
108	///
109	/// Returns updates in the order they were stored. These can be applied
110	/// to a fresh Y.Doc to reconstruct the current state.
111	///
112	/// Returns empty vec if document doesn't exist (safe to treat as new doc).
113	async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
114
115	/// Store a new update for a document.
116	///
117	/// The update is persisted immediately. For high-frequency updates,
118	/// implementations may batch or compress updates.
119	///
120	/// If the document doesn't exist, it's implicitly created.
121	async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
122
123	/// Subscribe to updates for a document.
124	///
125	/// Returns a stream of updates. Depending on subscription options,
126	/// may include a snapshot of existing updates followed by new updates.
127	async fn subscribe(
128		&self,
129		tn_id: TnId,
130		opts: CrdtSubscriptionOptions,
131	) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
132
133	/// Get statistics for a document.
134	async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
135		let updates = self.get_updates(tn_id, doc_id).await?;
136		let update_count = u32::try_from(updates.len()).unwrap_or_default();
137		let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
138
139		Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
140	}
141
142	/// Delete a document and all its updates.
143	///
144	/// This removes all stored data for the document. Use with caution.
145	async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
146
147	/// Close/flush a document instance, ensuring all updates are persisted.
148	///
149	/// Some implementations may keep documents in-memory and need explicit
150	/// flush before shutdown. Others may be no-op.
151	async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
152		// Default: no-op. Implementations can override.
153		Ok(())
154	}
155
156	/// List all document IDs for a tenant.
157	///
158	/// Useful for administrative tasks and migrations.
159	async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
160}
161
162// vim: ts=4