Skip to main content

host_extensions/first_party/
crdt_runtime.rs

1use std::collections::HashMap;
2use std::sync::Mutex;
3
4use crate::executor_contract::CrdtJoinResult;
5use crate::HostPushEvent;
6
7/// Pluggable CRDT backend — host applications inject their own implementation
8/// (e.g. Statement Store relay) while tests and headless hosts use
9/// [`InMemoryCrdtRuntime`].
10///
11/// # Synchronous contract
12///
13/// All methods are synchronous because [`HostExtension::handle_message`] is
14/// synchronous by design (all extensions share this constraint). Real backends
15/// should accept operations synchronously (e.g., enqueue to a background task)
16/// and deliver asynchronous results (remote updates, peer changes) via
17/// [`drain_events`](CrdtRuntime::drain_events), which the host polls on a
18/// regular tick cycle.
19pub trait CrdtRuntime: Send + Sync + 'static {
20    /// Join (or create) a room.  Returns a [`CrdtJoinResult`].
21    fn join(&self, room_id: &str, transport: &str) -> Result<CrdtJoinResult, CrdtRuntimeError>;
22
23    /// Apply a local CRDT update (base64-encoded binary).
24    fn apply_update(&self, room_id: &str, update_base64: &str) -> Result<bool, CrdtRuntimeError>;
25
26    /// Return the current state vector (base64-encoded).
27    fn get_state_vector(&self, room_id: &str) -> Result<String, CrdtRuntimeError>;
28
29    /// Return a full snapshot of the document (base64-encoded).
30    fn get_full_state(&self, room_id: &str) -> Result<String, CrdtRuntimeError>;
31
32    /// Set ephemeral awareness/presence state (JSON string).
33    fn set_awareness(&self, room_id: &str, state: &str) -> Result<bool, CrdtRuntimeError>;
34
35    /// Destroy a room and release its resources.
36    fn destroy(&self, room_id: &str) -> Result<bool, CrdtRuntimeError>;
37
38    /// Drain queued push events destined for the SPA.
39    ///
40    /// The default implementation returns an empty vec. Production runtimes
41    /// should override this to deliver `crdtRemoteUpdate`, `crdtAwareness`,
42    /// and `crdtPeerChange` events received from the network.
43    /// [`InMemoryCrdtRuntime`] intentionally uses the default (no events)
44    /// because it has no network layer.
45    fn drain_events(&self) -> Vec<HostPushEvent> {
46        Vec::new()
47    }
48}
49
50// ── Error type ──────────────────────────────────────────────────────────
51
52/// Structured error type for [`CrdtRuntime`] operations.
53#[derive(Debug, thiserror::Error)]
54pub enum CrdtRuntimeError {
55    #[error("crdt room not found")]
56    RoomNotFound,
57
58    #[error("roomId is required")]
59    RoomIdRequired,
60
61    #[error("{0}")]
62    Other(String),
63}
64
65// ── In-memory implementation ────────────────────────────────────────────
66
67/// Maximum updates stored per room in the test-only [`InMemoryCrdtRuntime`].
68/// Production runtimes manage their own storage; this cap prevents unbounded
69/// memory growth during long-running headless-host test sessions.
70const MAX_UPDATES_PER_ROOM: usize = 10_000;
71
72/// Minimal in-memory CRDT runtime for testing and headless-host scenarios.
73///
74/// This implementation does **not** perform real CRDT merge/encoding. Updates
75/// are stored as opaque base64 strings and returned verbatim. State vectors
76/// are represented as simple update counts. Production hosts must inject a
77/// real CRDT backend (e.g., Yjs/y-crdt over Statement Store relay) via
78/// [`CrdtExtension::with_runtime`](super::crdt::CrdtExtension::with_runtime).
79#[derive(Debug, Default)]
80pub struct InMemoryCrdtRuntime {
81    state: Mutex<InMemoryCrdtState>,
82}
83
84#[derive(Debug, Default)]
85struct InMemoryCrdtState {
86    rooms: HashMap<String, RoomState>,
87}
88
89#[derive(Debug, Default)]
90struct RoomState {
91    transport: String,
92    /// Accumulated updates (base64 chunks), simulating a document.
93    updates: Vec<String>,
94    awareness: Option<String>,
95}
96
97impl InMemoryCrdtRuntime {
98    pub fn new() -> Self {
99        Self {
100            state: Mutex::new(InMemoryCrdtState::default()),
101        }
102    }
103}
104
105fn room_not_found() -> CrdtRuntimeError {
106    CrdtRuntimeError::RoomNotFound
107}
108
109impl CrdtRuntime for InMemoryCrdtRuntime {
110    fn join(&self, room_id: &str, transport: &str) -> Result<CrdtJoinResult, CrdtRuntimeError> {
111        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
112        let room = state
113            .rooms
114            .entry(room_id.to_string())
115            .or_insert_with(|| RoomState {
116                transport: transport.to_string(),
117                ..Default::default()
118            });
119        Ok(CrdtJoinResult {
120            room_id: room_id.to_string(),
121            transport: room.transport.clone(),
122        })
123    }
124
125    fn apply_update(&self, room_id: &str, update_base64: &str) -> Result<bool, CrdtRuntimeError> {
126        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
127        let room = state.rooms.get_mut(room_id).ok_or_else(room_not_found)?;
128        if room.updates.len() >= MAX_UPDATES_PER_ROOM {
129            return Err(CrdtRuntimeError::Other(
130                "in-memory update buffer full (test-only runtime)".into(),
131            ));
132        }
133        room.updates.push(update_base64.to_string());
134        Ok(true)
135    }
136
137    fn get_state_vector(&self, room_id: &str) -> Result<String, CrdtRuntimeError> {
138        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
139        let room = state.rooms.get(room_id).ok_or_else(room_not_found)?;
140        // Test stand-in: returns the count of updates, not a real CRDT state vector.
141        Ok(format!("{}", room.updates.len()))
142    }
143
144    fn get_full_state(&self, room_id: &str) -> Result<String, CrdtRuntimeError> {
145        let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
146        let room = state.rooms.get(room_id).ok_or_else(room_not_found)?;
147        // Test stand-in: concatenates base64 chunks, not a real CRDT snapshot.
148        Ok(room.updates.join(","))
149    }
150
151    fn set_awareness(
152        &self,
153        room_id: &str,
154        awareness_state: &str,
155    ) -> Result<bool, CrdtRuntimeError> {
156        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
157        let room = state.rooms.get_mut(room_id).ok_or_else(room_not_found)?;
158        room.awareness = Some(awareness_state.to_string());
159        Ok(true)
160    }
161
162    fn destroy(&self, room_id: &str) -> Result<bool, CrdtRuntimeError> {
163        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
164        state
165            .rooms
166            .remove(room_id)
167            .map(|_| true)
168            .ok_or_else(room_not_found)
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175
176    #[test]
177    fn join_creates_room() {
178        let rt = InMemoryCrdtRuntime::new();
179        let result = rt.join("doc-1", "relay").unwrap();
180        assert_eq!(result.room_id, "doc-1");
181        assert_eq!(result.transport, "relay");
182    }
183
184    #[test]
185    fn join_is_idempotent_and_preserves_state() {
186        let rt = InMemoryCrdtRuntime::new();
187        rt.join("doc-1", "relay").unwrap();
188        rt.apply_update("doc-1", "AQID").unwrap();
189        let result = rt.join("doc-1", "relay").unwrap();
190        assert_eq!(result.room_id, "doc-1");
191        assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID");
192    }
193
194    #[test]
195    fn join_returns_stored_transport_on_rejoin() {
196        let rt = InMemoryCrdtRuntime::new();
197        rt.join("doc-1", "relay").unwrap();
198        let result = rt.join("doc-1", "p2p").unwrap();
199        assert_eq!(result.transport, "relay");
200    }
201
202    #[test]
203    fn apply_update_stores_data() {
204        let rt = InMemoryCrdtRuntime::new();
205        rt.join("doc-1", "relay").unwrap();
206        assert!(rt.apply_update("doc-1", "AQID").unwrap());
207        assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID");
208    }
209
210    #[test]
211    fn apply_update_rejects_missing_room() {
212        let rt = InMemoryCrdtRuntime::new();
213        assert!(matches!(
214            rt.apply_update("missing", "AQID").unwrap_err(),
215            CrdtRuntimeError::RoomNotFound
216        ));
217    }
218
219    #[test]
220    fn apply_update_rejects_when_buffer_full() {
221        let rt = InMemoryCrdtRuntime::new();
222        rt.join("doc-1", "relay").unwrap();
223        for i in 0..MAX_UPDATES_PER_ROOM {
224            rt.apply_update("doc-1", &format!("u{i}")).unwrap();
225        }
226        let err = rt.apply_update("doc-1", "overflow").unwrap_err();
227        assert!(
228            matches!(err, CrdtRuntimeError::Other(ref msg) if msg.contains("buffer full")),
229            "expected buffer full error, got: {err}"
230        );
231    }
232
233    #[test]
234    fn get_state_vector_reflects_update_count() {
235        let rt = InMemoryCrdtRuntime::new();
236        rt.join("doc-1", "relay").unwrap();
237        assert_eq!(rt.get_state_vector("doc-1").unwrap(), "0");
238        rt.apply_update("doc-1", "AQID").unwrap();
239        assert_eq!(rt.get_state_vector("doc-1").unwrap(), "1");
240        rt.apply_update("doc-1", "BAUE").unwrap();
241        assert_eq!(rt.get_state_vector("doc-1").unwrap(), "2");
242    }
243
244    #[test]
245    fn get_state_vector_rejects_missing_room() {
246        let rt = InMemoryCrdtRuntime::new();
247        assert!(matches!(
248            rt.get_state_vector("missing").unwrap_err(),
249            CrdtRuntimeError::RoomNotFound
250        ));
251    }
252
253    #[test]
254    fn get_full_state_concatenates_updates() {
255        let rt = InMemoryCrdtRuntime::new();
256        rt.join("doc-1", "relay").unwrap();
257        rt.apply_update("doc-1", "AQID").unwrap();
258        rt.apply_update("doc-1", "BAUE").unwrap();
259        assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID,BAUE");
260    }
261
262    #[test]
263    fn get_full_state_rejects_missing_room() {
264        let rt = InMemoryCrdtRuntime::new();
265        assert!(matches!(
266            rt.get_full_state("missing").unwrap_err(),
267            CrdtRuntimeError::RoomNotFound
268        ));
269    }
270
271    #[test]
272    fn set_awareness_stores_state() {
273        let rt = InMemoryCrdtRuntime::new();
274        rt.join("doc-1", "relay").unwrap();
275        assert!(rt.set_awareness("doc-1", r#"{"cursor":5}"#).unwrap());
276    }
277
278    #[test]
279    fn set_awareness_rejects_missing_room() {
280        let rt = InMemoryCrdtRuntime::new();
281        assert!(matches!(
282            rt.set_awareness("missing", "{}").unwrap_err(),
283            CrdtRuntimeError::RoomNotFound
284        ));
285    }
286
287    #[test]
288    fn destroy_removes_room() {
289        let rt = InMemoryCrdtRuntime::new();
290        rt.join("doc-1", "relay").unwrap();
291        assert!(rt.destroy("doc-1").unwrap());
292        // All operations should fail after destroy.
293        assert!(matches!(
294            rt.apply_update("doc-1", "AQID").unwrap_err(),
295            CrdtRuntimeError::RoomNotFound
296        ));
297    }
298
299    #[test]
300    fn destroy_rejects_missing_room() {
301        let rt = InMemoryCrdtRuntime::new();
302        assert!(matches!(
303            rt.destroy("missing").unwrap_err(),
304            CrdtRuntimeError::RoomNotFound
305        ));
306    }
307
308    #[test]
309    fn double_destroy_returns_error() {
310        let rt = InMemoryCrdtRuntime::new();
311        rt.join("doc-1", "relay").unwrap();
312        assert!(rt.destroy("doc-1").unwrap());
313        assert!(matches!(
314            rt.destroy("doc-1").unwrap_err(),
315            CrdtRuntimeError::RoomNotFound
316        ));
317    }
318
319    #[test]
320    fn drain_events_returns_empty_by_default() {
321        let rt = InMemoryCrdtRuntime::new();
322        assert!(rt.drain_events().is_empty());
323    }
324
325    #[test]
326    fn crdt_runtime_error_display_matches_contract() {
327        assert_eq!(
328            CrdtRuntimeError::RoomNotFound.to_string(),
329            crate::executor_contract::ERR_CRDT_ROOM_NOT_FOUND
330        );
331        assert_eq!(
332            CrdtRuntimeError::RoomIdRequired.to_string(),
333            crate::executor_contract::ERR_CRDT_ROOM_ID_REQUIRED
334        );
335    }
336}