cloudillo_types/crdt_adapter.rs
1//! CRDT Document Adapter
2//!
3//! Trait and types for pluggable CRDT document backends that store binary updates
4//! for collaborative documents using Yjs/yrs (Rust port of Yjs).
5//!
6//! The adapter handles:
7//! - Persistence of binary CRDT updates (Yjs sync protocol format)
8//! - Change subscriptions for real-time updates
9//! - Document lifecycle (creation, deletion)
10//!
11//! Each adapter implementation provides its own constructor handling backend-specific
12//! initialization (database path, connection settings, etc.).
13//!
//! The adapter works with opaque binary updates (`Vec<u8>` here; the counterpart of a
//! Yjs `Uint8Array` update) rather than typed documents, allowing flexibility in how
//! updates are stored and reconstructed into `Y.Doc` instances.
16
17use async_trait::async_trait;
18use futures_core::Stream;
19use serde::{Deserialize, Serialize};
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24use crate::types::TnId;
25
26/// A binary CRDT update (serialized Yjs sync protocol message).
27///
28/// These updates are the fundamental unit of change in CRDT systems.
29/// They can be applied in any order and are commutative.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct CrdtUpdate {
32 /// Raw bytes of the update in Yjs sync protocol format
33 pub data: Vec<u8>,
34
35 /// Optional user/client ID that created this update
36 pub client_id: Option<Box<str>>,
37}
38
39impl CrdtUpdate {
40 /// Create a new CRDT update from raw bytes.
41 pub fn new(data: Vec<u8>) -> Self {
42 Self { data, client_id: None }
43 }
44
45 /// Create a new CRDT update with client ID.
46 pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
47 Self { data, client_id: Some(client_id.into()) }
48 }
49}
50
51/// Real-time change notification for a CRDT document.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CrdtChangeEvent {
54 /// Document ID
55 pub doc_id: Box<str>,
56
57 /// The update that caused this change
58 pub update: CrdtUpdate,
59}
60
/// Parameters for subscribing to changes on a single CRDT document.
#[derive(Debug, Clone)]
pub struct CrdtSubscriptionOptions {
    /// Identifier of the document to watch.
    pub doc_id: Box<str>,

    /// When `true`, the already-stored updates are delivered first as an initial snapshot.
    pub send_snapshot: bool,
}
70
71impl CrdtSubscriptionOptions {
72 /// Create a subscription to a document with snapshot.
73 pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
74 Self { doc_id: doc_id.into(), send_snapshot: true }
75 }
76
77 /// Create a subscription to future updates only (no snapshot).
78 pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
79 Self { doc_id: doc_id.into(), send_snapshot: false }
80 }
81}
82
83/// CRDT Document statistics.
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct CrdtDocStats {
86 /// Document ID
87 pub doc_id: Box<str>,
88
89 /// Total size of stored updates in bytes
90 pub size_bytes: u64,
91
92 /// Number of updates stored
93 pub update_count: u32,
94}
95
96/// CRDT Adapter trait.
97///
98/// Unified interface for CRDT document backends. Handles persistence of binary updates
99/// and real-time subscriptions.
100///
101/// # Multi-Tenancy
102///
103/// All operations are tenant-aware (tn_id parameter). Adapters must ensure:
104/// - Updates from different tenants are stored separately
105/// - Subscriptions only receive updates for the subscribing tenant
106#[async_trait]
107pub trait CrdtAdapter: Debug + Send + Sync {
108 /// Get all stored updates for a document.
109 ///
110 /// Returns updates in the order they were stored. These can be applied
111 /// to a fresh Y.Doc to reconstruct the current state.
112 ///
113 /// Returns empty vec if document doesn't exist (safe to treat as new doc).
114 async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
115
116 /// Store a new update for a document.
117 ///
118 /// The update is persisted immediately. For high-frequency updates,
119 /// implementations may batch or compress updates.
120 ///
121 /// If the document doesn't exist, it's implicitly created.
122 async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
123
124 /// Subscribe to updates for a document.
125 ///
126 /// Returns a stream of updates. Depending on subscription options,
127 /// may include a snapshot of existing updates followed by new updates.
128 async fn subscribe(
129 &self,
130 tn_id: TnId,
131 opts: CrdtSubscriptionOptions,
132 ) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
133
134 /// Get statistics for a document.
135 async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
136 let updates = self.get_updates(tn_id, doc_id).await?;
137 let update_count = updates.len() as u32;
138 let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
139
140 Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
141 }
142
143 /// Delete a document and all its updates.
144 ///
145 /// This removes all stored data for the document. Use with caution.
146 async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
147
148 /// Close/flush a document instance, ensuring all updates are persisted.
149 ///
150 /// Some implementations may keep documents in-memory and need explicit
151 /// flush before shutdown. Others may be no-op.
152 async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
153 // Default: no-op. Implementations can override.
154 Ok(())
155 }
156
157 /// List all document IDs for a tenant.
158 ///
159 /// Useful for administrative tasks and migrations.
160 async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
161}
162
163// vim: ts=4