cloudillo_types/crdt_adapter.rs
1//! CRDT Document Adapter
2//!
3//! Trait and types for pluggable CRDT document backends that store binary updates
4//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
5//!
6//! The adapter handles:
7//! - Persistence of binary CRDT updates (Yjs sync protocol format)
8//! - Change subscriptions for real-time updates
9//! - Document lifecycle (creation, deletion)
10//!
11//! Each adapter implementation provides its own constructor handling backend-specific
12//! initialization (database path, connection settings, etc.).
13//!
14//! The adapter works with binary updates (Uint8Array) rather than typed documents,
15//! allowing flexibility in how updates are stored and reconstructed into Y.Doc instances.
16
17use async_trait::async_trait;
18use futures_core::Stream;
19use serde::{Deserialize, Serialize};
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24
25/// A binary CRDT update (serialized Yjs sync protocol message).
26///
27/// These updates are the fundamental unit of change in CRDT systems.
28/// They can be applied in any order and are commutative.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct CrdtUpdate {
31 /// Raw bytes of the update in Yjs sync protocol format
32 pub data: Vec<u8>,
33
34 /// Optional user/client ID that created this update
35 pub client_id: Option<Box<str>>,
36}
37
38impl CrdtUpdate {
39 /// Create a new CRDT update from raw bytes.
40 pub fn new(data: Vec<u8>) -> Self {
41 Self { data, client_id: None }
42 }
43
44 /// Create a new CRDT update with client ID.
45 pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
46 Self { data, client_id: Some(client_id.into()) }
47 }
48}
49
50/// Real-time change notification for a CRDT document.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct CrdtChangeEvent {
53 /// Document ID
54 pub doc_id: Box<str>,
55
56 /// The update that caused this change
57 pub update: CrdtUpdate,
58}
59
/// Parameters controlling a subscription to a CRDT document's changes.
#[derive(Clone, Debug)]
pub struct CrdtSubscriptionOptions {
    /// ID of the document to subscribe to.
    pub doc_id: Box<str>,

    /// When `true`, existing updates are sent first as an initial snapshot.
    pub send_snapshot: bool,
}
69
70impl CrdtSubscriptionOptions {
71 /// Create a subscription to a document with snapshot.
72 pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
73 Self { doc_id: doc_id.into(), send_snapshot: true }
74 }
75
76 /// Create a subscription to future updates only (no snapshot).
77 pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
78 Self { doc_id: doc_id.into(), send_snapshot: false }
79 }
80}
81
82/// CRDT Document statistics.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct CrdtDocStats {
85 /// Document ID
86 pub doc_id: Box<str>,
87
88 /// Total size of stored updates in bytes
89 pub size_bytes: u64,
90
91 /// Number of updates stored
92 pub update_count: u32,
93}
94
95/// CRDT Adapter trait.
96///
97/// Unified interface for CRDT document backends. Handles persistence of binary updates
98/// and real-time subscriptions.
99///
100/// # Multi-Tenancy
101///
102/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
103/// - Updates from different tenants are stored separately
104/// - Subscriptions only receive updates for the subscribing tenant
105#[async_trait]
106pub trait CrdtAdapter: Debug + Send + Sync {
107 /// Get all stored updates for a document.
108 ///
109 /// Returns updates in the order they were stored. These can be applied
110 /// to a fresh Y.Doc to reconstruct the current state.
111 ///
112 /// Returns empty vec if document doesn't exist (safe to treat as new doc).
113 async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
114
115 /// Store a new update for a document.
116 ///
117 /// The update is persisted immediately. For high-frequency updates,
118 /// implementations may batch or compress updates.
119 ///
120 /// If the document doesn't exist, it's implicitly created.
121 async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
122
123 /// Subscribe to updates for a document.
124 ///
125 /// Returns a stream of updates. Depending on subscription options,
126 /// may include a snapshot of existing updates followed by new updates.
127 async fn subscribe(
128 &self,
129 tn_id: TnId,
130 opts: CrdtSubscriptionOptions,
131 ) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
132
133 /// Get statistics for a document.
134 async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
135 let updates = self.get_updates(tn_id, doc_id).await?;
136 let update_count = u32::try_from(updates.len()).unwrap_or_default();
137 let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
138
139 Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
140 }
141
142 /// Delete a document and all its updates.
143 ///
144 /// This removes all stored data for the document. Use with caution.
145 async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
146
147 /// Close/flush a document instance, ensuring all updates are persisted.
148 ///
149 /// Some implementations may keep documents in-memory and need explicit
150 /// flush before shutdown. Others may be no-op.
151 async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
152 // Default: no-op. Implementations can override.
153 Ok(())
154 }
155
156 /// List all document IDs for a tenant.
157 ///
158 /// Useful for administrative tasks and migrations.
159 async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
160}
161
162// vim: ts=4