host_extensions/first_party/
crdt_runtime.rs1use std::collections::HashMap;
2use std::sync::Mutex;
3
4use crate::executor_contract::CrdtJoinResult;
5use crate::HostPushEvent;
6
7pub trait CrdtRuntime: Send + Sync + 'static {
20 fn join(&self, room_id: &str, transport: &str) -> Result<CrdtJoinResult, CrdtRuntimeError>;
22
23 fn apply_update(&self, room_id: &str, update_base64: &str) -> Result<bool, CrdtRuntimeError>;
25
26 fn get_state_vector(&self, room_id: &str) -> Result<String, CrdtRuntimeError>;
28
29 fn get_full_state(&self, room_id: &str) -> Result<String, CrdtRuntimeError>;
31
32 fn set_awareness(&self, room_id: &str, state: &str) -> Result<bool, CrdtRuntimeError>;
34
35 fn destroy(&self, room_id: &str) -> Result<bool, CrdtRuntimeError>;
37
38 fn drain_events(&self) -> Vec<HostPushEvent> {
46 Vec::new()
47 }
48}
49
50#[derive(Debug, thiserror::Error)]
54pub enum CrdtRuntimeError {
55 #[error("crdt room not found")]
56 RoomNotFound,
57
58 #[error("roomId is required")]
59 RoomIdRequired,
60
61 #[error("{0}")]
62 Other(String),
63}
64
65const MAX_UPDATES_PER_ROOM: usize = 10_000;
71
72#[derive(Debug, Default)]
80pub struct InMemoryCrdtRuntime {
81 state: Mutex<InMemoryCrdtState>,
82}
83
84#[derive(Debug, Default)]
85struct InMemoryCrdtState {
86 rooms: HashMap<String, RoomState>,
87}
88
89#[derive(Debug, Default)]
90struct RoomState {
91 transport: String,
92 updates: Vec<String>,
94 awareness: Option<String>,
95}
96
97impl InMemoryCrdtRuntime {
98 pub fn new() -> Self {
99 Self {
100 state: Mutex::new(InMemoryCrdtState::default()),
101 }
102 }
103}
104
105fn room_not_found() -> CrdtRuntimeError {
106 CrdtRuntimeError::RoomNotFound
107}
108
109impl CrdtRuntime for InMemoryCrdtRuntime {
110 fn join(&self, room_id: &str, transport: &str) -> Result<CrdtJoinResult, CrdtRuntimeError> {
111 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
112 let room = state
113 .rooms
114 .entry(room_id.to_string())
115 .or_insert_with(|| RoomState {
116 transport: transport.to_string(),
117 ..Default::default()
118 });
119 Ok(CrdtJoinResult {
120 room_id: room_id.to_string(),
121 transport: room.transport.clone(),
122 })
123 }
124
125 fn apply_update(&self, room_id: &str, update_base64: &str) -> Result<bool, CrdtRuntimeError> {
126 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
127 let room = state.rooms.get_mut(room_id).ok_or_else(room_not_found)?;
128 if room.updates.len() >= MAX_UPDATES_PER_ROOM {
129 return Err(CrdtRuntimeError::Other(
130 "in-memory update buffer full (test-only runtime)".into(),
131 ));
132 }
133 room.updates.push(update_base64.to_string());
134 Ok(true)
135 }
136
137 fn get_state_vector(&self, room_id: &str) -> Result<String, CrdtRuntimeError> {
138 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
139 let room = state.rooms.get(room_id).ok_or_else(room_not_found)?;
140 Ok(format!("{}", room.updates.len()))
142 }
143
144 fn get_full_state(&self, room_id: &str) -> Result<String, CrdtRuntimeError> {
145 let state = self.state.lock().unwrap_or_else(|e| e.into_inner());
146 let room = state.rooms.get(room_id).ok_or_else(room_not_found)?;
147 Ok(room.updates.join(","))
149 }
150
151 fn set_awareness(
152 &self,
153 room_id: &str,
154 awareness_state: &str,
155 ) -> Result<bool, CrdtRuntimeError> {
156 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
157 let room = state.rooms.get_mut(room_id).ok_or_else(room_not_found)?;
158 room.awareness = Some(awareness_state.to_string());
159 Ok(true)
160 }
161
162 fn destroy(&self, room_id: &str) -> Result<bool, CrdtRuntimeError> {
163 let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
164 state
165 .rooms
166 .remove(room_id)
167 .map(|_| true)
168 .ok_or_else(room_not_found)
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn join_creates_room() {
178 let rt = InMemoryCrdtRuntime::new();
179 let result = rt.join("doc-1", "relay").unwrap();
180 assert_eq!(result.room_id, "doc-1");
181 assert_eq!(result.transport, "relay");
182 }
183
184 #[test]
185 fn join_is_idempotent_and_preserves_state() {
186 let rt = InMemoryCrdtRuntime::new();
187 rt.join("doc-1", "relay").unwrap();
188 rt.apply_update("doc-1", "AQID").unwrap();
189 let result = rt.join("doc-1", "relay").unwrap();
190 assert_eq!(result.room_id, "doc-1");
191 assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID");
192 }
193
194 #[test]
195 fn join_returns_stored_transport_on_rejoin() {
196 let rt = InMemoryCrdtRuntime::new();
197 rt.join("doc-1", "relay").unwrap();
198 let result = rt.join("doc-1", "p2p").unwrap();
199 assert_eq!(result.transport, "relay");
200 }
201
202 #[test]
203 fn apply_update_stores_data() {
204 let rt = InMemoryCrdtRuntime::new();
205 rt.join("doc-1", "relay").unwrap();
206 assert!(rt.apply_update("doc-1", "AQID").unwrap());
207 assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID");
208 }
209
210 #[test]
211 fn apply_update_rejects_missing_room() {
212 let rt = InMemoryCrdtRuntime::new();
213 assert!(matches!(
214 rt.apply_update("missing", "AQID").unwrap_err(),
215 CrdtRuntimeError::RoomNotFound
216 ));
217 }
218
219 #[test]
220 fn apply_update_rejects_when_buffer_full() {
221 let rt = InMemoryCrdtRuntime::new();
222 rt.join("doc-1", "relay").unwrap();
223 for i in 0..MAX_UPDATES_PER_ROOM {
224 rt.apply_update("doc-1", &format!("u{i}")).unwrap();
225 }
226 let err = rt.apply_update("doc-1", "overflow").unwrap_err();
227 assert!(
228 matches!(err, CrdtRuntimeError::Other(ref msg) if msg.contains("buffer full")),
229 "expected buffer full error, got: {err}"
230 );
231 }
232
233 #[test]
234 fn get_state_vector_reflects_update_count() {
235 let rt = InMemoryCrdtRuntime::new();
236 rt.join("doc-1", "relay").unwrap();
237 assert_eq!(rt.get_state_vector("doc-1").unwrap(), "0");
238 rt.apply_update("doc-1", "AQID").unwrap();
239 assert_eq!(rt.get_state_vector("doc-1").unwrap(), "1");
240 rt.apply_update("doc-1", "BAUE").unwrap();
241 assert_eq!(rt.get_state_vector("doc-1").unwrap(), "2");
242 }
243
244 #[test]
245 fn get_state_vector_rejects_missing_room() {
246 let rt = InMemoryCrdtRuntime::new();
247 assert!(matches!(
248 rt.get_state_vector("missing").unwrap_err(),
249 CrdtRuntimeError::RoomNotFound
250 ));
251 }
252
253 #[test]
254 fn get_full_state_concatenates_updates() {
255 let rt = InMemoryCrdtRuntime::new();
256 rt.join("doc-1", "relay").unwrap();
257 rt.apply_update("doc-1", "AQID").unwrap();
258 rt.apply_update("doc-1", "BAUE").unwrap();
259 assert_eq!(rt.get_full_state("doc-1").unwrap(), "AQID,BAUE");
260 }
261
262 #[test]
263 fn get_full_state_rejects_missing_room() {
264 let rt = InMemoryCrdtRuntime::new();
265 assert!(matches!(
266 rt.get_full_state("missing").unwrap_err(),
267 CrdtRuntimeError::RoomNotFound
268 ));
269 }
270
271 #[test]
272 fn set_awareness_stores_state() {
273 let rt = InMemoryCrdtRuntime::new();
274 rt.join("doc-1", "relay").unwrap();
275 assert!(rt.set_awareness("doc-1", r#"{"cursor":5}"#).unwrap());
276 }
277
278 #[test]
279 fn set_awareness_rejects_missing_room() {
280 let rt = InMemoryCrdtRuntime::new();
281 assert!(matches!(
282 rt.set_awareness("missing", "{}").unwrap_err(),
283 CrdtRuntimeError::RoomNotFound
284 ));
285 }
286
287 #[test]
288 fn destroy_removes_room() {
289 let rt = InMemoryCrdtRuntime::new();
290 rt.join("doc-1", "relay").unwrap();
291 assert!(rt.destroy("doc-1").unwrap());
292 assert!(matches!(
294 rt.apply_update("doc-1", "AQID").unwrap_err(),
295 CrdtRuntimeError::RoomNotFound
296 ));
297 }
298
299 #[test]
300 fn destroy_rejects_missing_room() {
301 let rt = InMemoryCrdtRuntime::new();
302 assert!(matches!(
303 rt.destroy("missing").unwrap_err(),
304 CrdtRuntimeError::RoomNotFound
305 ));
306 }
307
308 #[test]
309 fn double_destroy_returns_error() {
310 let rt = InMemoryCrdtRuntime::new();
311 rt.join("doc-1", "relay").unwrap();
312 assert!(rt.destroy("doc-1").unwrap());
313 assert!(matches!(
314 rt.destroy("doc-1").unwrap_err(),
315 CrdtRuntimeError::RoomNotFound
316 ));
317 }
318
319 #[test]
320 fn drain_events_returns_empty_by_default() {
321 let rt = InMemoryCrdtRuntime::new();
322 assert!(rt.drain_events().is_empty());
323 }
324
325 #[test]
326 fn crdt_runtime_error_display_matches_contract() {
327 assert_eq!(
328 CrdtRuntimeError::RoomNotFound.to_string(),
329 crate::executor_contract::ERR_CRDT_ROOM_NOT_FOUND
330 );
331 assert_eq!(
332 CrdtRuntimeError::RoomIdRequired.to_string(),
333 crate::executor_contract::ERR_CRDT_ROOM_ID_REQUIRED
334 );
335 }
336}