Skip to main content

cloudillo_types/
crdt_adapter.rs

1//! CRDT Document Adapter
2//!
3//! Trait and types for pluggable CRDT document backends that store binary updates
4//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
5//!
6//! The adapter handles:
7//! - Persistence of binary CRDT updates (Yjs sync protocol format)
8//! - Change subscriptions for real-time updates
9//! - Document lifecycle (creation, deletion)
10//!
11//! Each adapter implementation provides its own constructor handling backend-specific
12//! initialization (database path, connection settings, etc.).
13//!
14//! The adapter works with binary updates (Uint8Array) rather than typed documents,
15//! allowing flexibility in how updates are stored and reconstructed into Y.Doc instances.
16
17use async_trait::async_trait;
18use futures_core::Stream;
19use serde::{Deserialize, Serialize};
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24use crate::types::TnId;
25
/// A binary CRDT update (serialized Yjs sync protocol message).
///
/// These updates are the fundamental unit of change in CRDT systems.
/// They can be applied in any order and are commutative.
///
/// The payload is carried as opaque bytes: this type neither parses nor
/// validates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrdtUpdate {
	/// Raw bytes of the update in Yjs sync protocol format
	pub data: Vec<u8>,

	/// Optional user/client ID that created this update.
	///
	/// `None` when no client ID was supplied (as with [`CrdtUpdate::new`]).
	pub client_id: Option<Box<str>>,
}
38
39impl CrdtUpdate {
40	/// Create a new CRDT update from raw bytes.
41	pub fn new(data: Vec<u8>) -> Self {
42		Self { data, client_id: None }
43	}
44
45	/// Create a new CRDT update with client ID.
46	pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
47		Self { data, client_id: Some(client_id.into()) }
48	}
49}
50
/// Real-time change notification for a CRDT document.
///
/// This is the item type of the stream returned by `CrdtAdapter::subscribe`.
/// It pairs an update with the ID of the document it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrdtChangeEvent {
	/// ID of the document the update belongs to
	pub doc_id: Box<str>,

	/// The update that caused this change
	pub update: CrdtUpdate,
}
60
/// Options for subscribing to CRDT document changes.
///
/// Passed to `CrdtAdapter::subscribe`. Use
/// [`CrdtSubscriptionOptions::with_snapshot`] or
/// [`CrdtSubscriptionOptions::updates_only`] to build the two common
/// configurations.
#[derive(Debug, Clone)]
pub struct CrdtSubscriptionOptions {
	/// Document ID to subscribe to
	pub doc_id: Box<str>,

	/// If true, send existing updates as initial snapshot;
	/// if false, only updates arriving after subscription are requested.
	pub send_snapshot: bool,
}
70
71impl CrdtSubscriptionOptions {
72	/// Create a subscription to a document with snapshot.
73	pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
74		Self { doc_id: doc_id.into(), send_snapshot: true }
75	}
76
77	/// Create a subscription to future updates only (no snapshot).
78	pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
79		Self { doc_id: doc_id.into(), send_snapshot: false }
80	}
81}
82
/// CRDT Document statistics.
///
/// Returned by `CrdtAdapter::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrdtDocStats {
	/// Document ID
	pub doc_id: Box<str>,

	/// Total size of stored updates in bytes.
	///
	/// The default `CrdtAdapter::stats` implementation computes this as the
	/// sum of each update's `data` length; adapters that override `stats`
	/// may measure it differently.
	pub size_bytes: u64,

	/// Number of updates stored
	pub update_count: u32,
}
95
96/// CRDT Adapter trait.
97///
98/// Unified interface for CRDT document backends. Handles persistence of binary updates
99/// and real-time subscriptions.
100///
101/// # Multi-Tenancy
102///
103/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
104/// - Updates from different tenants are stored separately
105/// - Subscriptions only receive updates for the subscribing tenant
106#[async_trait]
107pub trait CrdtAdapter: Debug + Send + Sync {
108	/// Get all stored updates for a document.
109	///
110	/// Returns updates in the order they were stored. These can be applied
111	/// to a fresh Y.Doc to reconstruct the current state.
112	///
113	/// Returns empty vec if document doesn't exist (safe to treat as new doc).
114	async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
115
116	/// Store a new update for a document.
117	///
118	/// The update is persisted immediately. For high-frequency updates,
119	/// implementations may batch or compress updates.
120	///
121	/// If the document doesn't exist, it's implicitly created.
122	async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
123
124	/// Subscribe to updates for a document.
125	///
126	/// Returns a stream of updates. Depending on subscription options,
127	/// may include a snapshot of existing updates followed by new updates.
128	async fn subscribe(
129		&self,
130		tn_id: TnId,
131		opts: CrdtSubscriptionOptions,
132	) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
133
134	/// Get statistics for a document.
135	async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
136		let updates = self.get_updates(tn_id, doc_id).await?;
137		let update_count = updates.len() as u32;
138		let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
139
140		Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
141	}
142
143	/// Delete a document and all its updates.
144	///
145	/// This removes all stored data for the document. Use with caution.
146	async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
147
148	/// Close/flush a document instance, ensuring all updates are persisted.
149	///
150	/// Some implementations may keep documents in-memory and need explicit
151	/// flush before shutdown. Others may be no-op.
152	async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
153		// Default: no-op. Implementations can override.
154		Ok(())
155	}
156
157	/// List all document IDs for a tenant.
158	///
159	/// Useful for administrative tasks and migrations.
160	async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
161}
162
163// vim: ts=4