pylon_runtime/loro_store.rs
1//! Server-side per-row LoroDoc cache with snapshot persistence.
2//!
3//! For CRDT-backed entities (`crdt: true` in the manifest, the default),
4//! every row corresponds to one [`LoroDoc`]. This store owns those docs
5//! in memory, hydrates them on demand from a sidecar SQLite table,
6//! write-throughs every commit, and projects the doc state into the JSON
7//! shape Pylon's existing storage layer expects.
8//!
9//! # Persistence shape
10//!
//! A single sidecar table:
12//!
13//! ```sql
14//! CREATE TABLE _pylon_crdt_snapshots (
15//! entity TEXT NOT NULL,
16//! row_id TEXT NOT NULL,
17//! snapshot BLOB NOT NULL,
18//! updated_at TEXT NOT NULL,
19//! PRIMARY KEY (entity, row_id)
20//! );
21//! ```
22//!
23//! Snapshots are full-state Loro snapshots (`ExportMode::Snapshot`).
24//! Loro applies internal compaction so the snapshot size stays bounded;
25//! we don't track an op log separately.
26//!
27//! # In-memory cache
28//!
29//! Active rows live in a `HashMap<(entity, row_id), Arc<Mutex<LoroDoc>>>`.
30//! First access for a row hydrates the doc from the sidecar (or creates
31//! a fresh one). Subsequent accesses reuse the in-memory doc — required
32//! both for correctness (Loro's CRDT identity is per-doc-instance) and
33//! perf (snapshot decode is ~100µs per row).
34//!
//! No eviction yet. Working sets up to ~100K active rows are fine on
//! commodity hardware (~5-50 MB). For larger working sets, a planned
//! follow-up will add LRU eviction with snapshot reload on next access.
38//!
39//! # Bandwidth: full snapshot per write (TODO)
40//!
41//! Every CRDT-mode write triggers a binary WS broadcast carrying the
42//! row's *full* current snapshot, not just the incremental update.
43//! Loro's compaction bounds individual snapshots, but the per-write
44//! cost still scales with total state size, not write size.
45//!
46//! Concrete numbers:
47//!
48//! | Workload | Snapshot/row | Per-write fanout |
49//! |------------------------------------|--------------|------------------|
50//! | Chat message | ~200 B | tiny |
51//! | Boring CRUD record | ~500 B | tiny |
52//! | Whiteboard with 1k strokes | ~30 KB | uncomfortable |
53//! | Document with 50K-char body | ~80 KB | bad |
54//!
55//! Multiply by `connected_clients × writes_per_second` to get total
56//! broadcast bandwidth. For chat-shaped workloads it's free. For collab
57//! whiteboards / large documents it bites once you pass ~10 connected
58//! clients on a hot row.
59//!
60//! # Switching to incremental updates
61//!
62//! Loro already supports `export(ExportMode::updates(version_vector))`
63//! returning only the ops a peer hasn't seen — the building block is
64//! there. What's missing is the per-client tracking:
65//!
66//! 1. Subscribe protocol — clients tell the server "I want updates for
67//! rows X, Y, Z" instead of every CRDT write fanning out to every
68//! client. Pylon's existing room layer is the natural transport
69//! once room semantics extend to per-row subscriptions.
70//! 2. Server-side state — `(client_id, entity, row_id) → version_vector`
71//! so the server knows what each client is missing. Bounded by the
72//! subscribe set; LRU-evicted with the doc cache.
73//! 3. Encoder swap — `notify_crdt` calls `encode_update_since(vv)`
74//! instead of `encode_snapshot()` and ships frame type `0x11`
75//! (CRDT_FRAME_UPDATE) instead of `0x10` (CRDT_FRAME_SNAPSHOT).
76//! Wire format already reserves both bytes.
77//! 4. New-subscriber bootstrap — first frame is still a snapshot
78//! (`0x10`), subsequent frames are deltas (`0x11`).
79//!
80//! Estimated effort: ~2 days for a working slice plus a week of
81//! production hardening (correct VV tracking under reconnects,
82//! garbage-collecting subscriptions on disconnect, handling missed
83//! frames via resync request).
84//!
85//! Until then this implementation is fine for chat / boring CRUD /
86//! demo workloads. Don't run a Figma clone on it.
87
88use std::collections::HashMap;
89use std::sync::{Arc, Mutex};
90
91use pylon_crdt::{
92 apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
93 loro::{LoroDoc, VersionVector},
94 project_doc_to_json, CrdtField,
95};
96use rusqlite::{params, Connection};
97use serde_json::Value;
98
99// ---------------------------------------------------------------------------
100// Sidecar table
101// ---------------------------------------------------------------------------
102
/// DDL for the snapshot sidecar table: one row per `(entity, row_id)`.
///
/// Idempotent because of `IF NOT EXISTS`. Called by the Runtime
/// constructor for any database where CRDT mode could be in use (always,
/// since `crdt: true` is the default).
///
/// Columns:
/// - `entity`, `row_id` — composite primary key identifying the row.
/// - `snapshot` — the row's full Loro snapshot bytes, rewritten on every
///   persist.
/// - `updated_at` — timestamp text written alongside each persist.
pub const CREATE_SIDECAR_SQL: &str = "
CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (
 entity TEXT NOT NULL,
 row_id TEXT NOT NULL,
 snapshot BLOB NOT NULL,
 updated_at TEXT NOT NULL,
 PRIMARY KEY (entity, row_id)
)
";
115
116/// Create the sidecar table. Safe to call repeatedly.
117pub fn ensure_sidecar(conn: &Connection) -> Result<(), LoroStoreError> {
118 conn.execute(CREATE_SIDECAR_SQL, [])
119 .map(|_| ())
120 .map_err(|e| LoroStoreError::Storage(format!("create sidecar: {e}")))
121}
122
123// ---------------------------------------------------------------------------
124// Errors
125// ---------------------------------------------------------------------------
126
/// Errors produced by [`LoroStore`] and [`ensure_sidecar`].
///
/// Each variant carries a human-readable message; the underlying SQLite or
/// Loro error is flattened into that string. Deriving `Clone`/`PartialEq`
/// lets callers and tests compare and propagate errors without matching
/// on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoroStoreError {
    /// Patch contained a value that didn't match the field's CRDT shape
    /// (e.g. number on a Bool field). Schema/caller mismatch.
    Apply(String),
    /// Storage layer error — sidecar create / read / write failed.
    Storage(String),
    /// Loro decode error — corrupted snapshot in the sidecar, or a peer
    /// sent an invalid binary update. The owning code should surface this
    /// to the client (for remote updates) or fail loud (for stored snapshots).
    Decode(String),
}

impl std::fmt::Display for LoroStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Prefix every message with the failing stage so logs stay greppable.
        let (stage, msg) = match self {
            Self::Apply(m) => ("apply", m),
            Self::Storage(m) => ("storage", m),
            Self::Decode(m) => ("decode", m),
        };
        write!(f, "{stage}: {msg}")
    }
}

impl std::error::Error for LoroStoreError {}
151
152// ---------------------------------------------------------------------------
153// Store
154// ---------------------------------------------------------------------------
155
/// Server-side per-row LoroDoc cache + persistence layer.
///
/// One instance per Runtime. The type itself does not implement `Clone`;
/// share it by wrapping it in an [`Arc`]. Internally it guards a `HashMap`
/// of doc handles, each behind its own `Mutex`, so concurrent access to
/// *different* rows doesn't contend on a doc lock.
#[derive(Default)]
pub struct LoroStore {
    /// Per-row cache keyed by `(entity, row_id)`. The outer Mutex guards
    /// the map itself and is held only briefly (lookup / insert / remove);
    /// the inner Mutex serializes mutation of one specific doc. Callers
    /// clone the inner `Arc` and release the outer lock before doing any
    /// Loro work, so requests for different rows don't block each other.
    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
}
169
170impl LoroStore {
171 pub fn new() -> Self {
172 Self::default()
173 }
174
175 /// Get the cached doc for a row, hydrating from the sidecar if absent.
176 /// Returns a freshly-created doc if the row has no snapshot yet.
177 fn get_or_hydrate(
178 &self,
179 conn: &Connection,
180 entity: &str,
181 row_id: &str,
182 ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
183 let key = (entity.to_string(), row_id.to_string());
184
185 // Fast path: already cached.
186 {
187 let guard = self.docs.lock().unwrap();
188 if let Some(doc) = guard.get(&key) {
189 return Ok(Arc::clone(doc));
190 }
191 }
192
193 // Slow path: hydrate (or create fresh) outside the cache lock.
194 // Two concurrent first-accesses can both do this; the loser's
195 // doc is dropped after the cache check below. Loro's snapshot
196 // decode is deterministic, so both copies are byte-identical;
197 // the race only wastes a microsecond, never produces divergence.
198 let snapshot: Option<Vec<u8>> = conn
199 .query_row(
200 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = ?1 AND row_id = ?2",
201 params![entity, row_id],
202 |r| r.get(0),
203 )
204 .map(Some)
205 .or_else(|e| {
206 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
207 Ok(None)
208 } else {
209 Err(LoroStoreError::Storage(format!("read snapshot: {e}")))
210 }
211 })?;
212
213 let doc = LoroDoc::new();
214 if let Some(bytes) = snapshot {
215 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
216 }
217 let handle = Arc::new(Mutex::new(doc));
218
219 // Re-acquire cache lock and publish, but defer to whatever's
220 // already there if we lost the race.
221 let mut guard = self.docs.lock().unwrap();
222 let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
223 Ok(Arc::clone(entry))
224 }
225
226 /// Persist the current snapshot for a row to the sidecar. Called
227 /// after every commit. Synchronous; tests rely on read-after-write.
228 fn persist_snapshot(
229 &self,
230 conn: &Connection,
231 entity: &str,
232 row_id: &str,
233 doc: &LoroDoc,
234 ) -> Result<(), LoroStoreError> {
235 let snap = encode_snapshot(doc);
236 let now = chrono_now_iso();
237 conn.execute(
238 "INSERT OR REPLACE INTO _pylon_crdt_snapshots
239 (entity, row_id, snapshot, updated_at)
240 VALUES (?1, ?2, ?3, ?4)",
241 params![entity, row_id, snap, now],
242 )
243 .map(|_| ())
244 .map_err(|e| LoroStoreError::Storage(format!("persist snapshot: {e}")))
245 }
246
247 /// Apply a JSON `{field: value}` patch to the row's doc, persist the
248 /// new snapshot, and return the projected JSON (the row shape SQLite
249 /// stores in the materialized view).
250 pub fn apply_patch(
251 &self,
252 conn: &Connection,
253 entity: &str,
254 row_id: &str,
255 fields: &[CrdtField],
256 patch: &Value,
257 ) -> Result<Value, LoroStoreError> {
258 let handle = self.get_or_hydrate(conn, entity, row_id)?;
259 let projected = {
260 let doc = handle.lock().unwrap();
261 apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
262 self.persist_snapshot(conn, entity, row_id, &doc)?;
263 project_doc_to_json(&doc, fields)
264 };
265 Ok(projected)
266 }
267
268 /// Apply a binary update from a peer (typed-protocol client push or
269 /// server-to-server replication). Persists the new snapshot. Returns
270 /// the projected JSON for SQLite materialization so the materialized
271 /// view stays in sync with the CRDT after remote-driven changes.
272 pub fn apply_remote_update(
273 &self,
274 conn: &Connection,
275 entity: &str,
276 row_id: &str,
277 fields: &[CrdtField],
278 update: &[u8],
279 ) -> Result<Value, LoroStoreError> {
280 let handle = self.get_or_hydrate(conn, entity, row_id)?;
281 let projected = {
282 let doc = handle.lock().unwrap();
283 crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
284 self.persist_snapshot(conn, entity, row_id, &doc)?;
285 project_doc_to_json(&doc, fields)
286 };
287 Ok(projected)
288 }
289
290 /// Get the full snapshot for a row. Sent to a fresh client when it
291 /// subscribes. Returns an empty `Vec` for rows that don't exist yet.
292 pub fn snapshot(
293 &self,
294 conn: &Connection,
295 entity: &str,
296 row_id: &str,
297 ) -> Result<Vec<u8>, LoroStoreError> {
298 let handle = self.get_or_hydrate(conn, entity, row_id)?;
299 let doc = handle.lock().unwrap();
300 Ok(encode_snapshot(&doc))
301 }
302
303 /// Get an incremental update since `since` — only the ops the peer
304 /// hasn't seen. Used to catch up a peer that's been disconnected.
305 pub fn update_since(
306 &self,
307 conn: &Connection,
308 entity: &str,
309 row_id: &str,
310 since: &VersionVector,
311 ) -> Result<Vec<u8>, LoroStoreError> {
312 let handle = self.get_or_hydrate(conn, entity, row_id)?;
313 let doc = handle.lock().unwrap();
314 Ok(encode_update_since(&doc, since))
315 }
316
317 /// Drop a row's doc from the in-memory cache. Useful for tests and
318 /// for the eventual eviction policy. Doesn't touch the sidecar; the
319 /// next read will re-hydrate from disk.
320 pub fn evict(&self, entity: &str, row_id: &str) {
321 self.docs
322 .lock()
323 .unwrap()
324 .remove(&(entity.to_string(), row_id.to_string()));
325 }
326
327 /// Number of rows currently held in memory. Diagnostic.
328 pub fn cached_rows(&self) -> usize {
329 self.docs.lock().unwrap().len()
330 }
331}
332
333// ---------------------------------------------------------------------------
334// Helpers
335// ---------------------------------------------------------------------------
336
/// Current UTC time as an RFC 3339 / ISO-8601 timestamp with second
/// precision, e.g. `2024-02-29T13:45:00Z`. Written to the sidecar's
/// `updated_at` column.
///
/// A system clock set before the Unix epoch is clamped to
/// `1970-01-01T00:00:00Z` rather than failing the write.
///
/// NOTE(review): earlier revisions of this helper wrote `<unix-seconds>Z`
/// (e.g. `1712345678Z`), so existing sidecar rows may still hold that form.
fn chrono_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format_unix_secs_iso(secs)
}

/// Format seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_unix_secs_iso(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    let (days, time_of_day) = (secs / SECS_PER_DAY, secs % SECS_PER_DAY);
    let (year, month, day) = civil_from_days(days);
    let hour = time_of_day / 3_600;
    let minute = time_of_day % 3_600 / 60;
    let second = time_of_day % 60;
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}

/// Convert days since 1970-01-01 to a proleptic Gregorian `(year, month, day)`.
///
/// Howard Hinnant's `civil_from_days`, specialised to non-negative day
/// counts so it stays in unsigned arithmetic. It counts in 400-year eras
/// (146 097 days) starting on 0000-03-01, which puts each leap day at the
/// end of a computed year.
fn civil_from_days(days: u64) -> (u64, u64, u64) {
    // Shift the epoch from 1970-01-01 to 0000-03-01.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February fall in the next civil year of a March-based count.
    let year = era * 400 + yoe + u64::from(month <= 2);
    (year, month, day)
}
345
346// ---------------------------------------------------------------------------
347// Tests
348// ---------------------------------------------------------------------------
349
#[cfg(test)]
mod tests {
    use super::*;
    use pylon_crdt::CrdtFieldKind;

    /// Fresh in-memory SQLite database with the sidecar table created.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        ensure_sidecar(&conn).unwrap();
        conn
    }

    /// Schema shared by every test: one LWW string, one text, one LWW number.
    fn fields() -> Vec<CrdtField> {
        vec![
            CrdtField {
                name: "title".into(),
                kind: CrdtFieldKind::LwwString,
            },
            CrdtField {
                name: "body".into(),
                kind: CrdtFieldKind::Text,
            },
            CrdtField {
                name: "qty".into(),
                kind: CrdtFieldKind::LwwNumber,
            },
        ]
    }

    #[test]
    fn sidecar_is_idempotent() {
        let conn = open_test_db();
        ensure_sidecar(&conn).unwrap(); // Re-create OK.
    }

    #[test]
    fn apply_patch_persists_and_projects() {
        let conn = open_test_db();
        let store = LoroStore::new();
        let projected = store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "Hello", "body": "world", "qty": 7}),
            )
            .unwrap();
        assert_eq!(projected["title"], "Hello");
        assert_eq!(projected["body"], "world");
        assert_eq!(projected["qty"], 7.0);

        // Sidecar row exists.
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity='Note' AND row_id='n1'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn second_open_hydrates_from_sidecar() {
        let conn = open_test_db();
        let store = LoroStore::new();
        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "A", "qty": 1}),
            )
            .unwrap();

        // Drop in-memory cache; next read must rehydrate from disk.
        store.evict("Note", "n1");
        assert_eq!(store.cached_rows(), 0);

        let snap = store.snapshot(&conn, "Note", "n1").unwrap();
        assert!(
            !snap.is_empty(),
            "snapshot should be non-empty after writes"
        );
        assert_eq!(store.cached_rows(), 1, "snapshot() rehydrated the cache");

        // A non-empty snapshot alone doesn't prove hydration worked: an
        // empty doc also encodes version bookkeeping. Project the
        // rehydrated doc via a no-op patch and check the persisted values
        // actually came back from the sidecar.
        let projected = store
            .apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
            .unwrap();
        assert_eq!(projected["title"], "A");
        assert_eq!(projected["qty"], 1.0);
    }

    #[test]
    fn empty_row_yields_empty_snapshot() {
        let conn = open_test_db();
        let store = LoroStore::new();
        let snap = store.snapshot(&conn, "Note", "missing").unwrap();
        // Despite the test name, an empty Loro doc still produces a small
        // snapshot with version bookkeeping, so this asserts that the
        // snapshot round-trips into another store rather than its size.
        let store2 = LoroStore::new();
        store2
            .apply_remote_update(&conn, "Note", "missing", &fields(), &snap)
            .unwrap();
        assert_eq!(store2.cached_rows(), 1, "round-trip hydrated the row");
    }

    #[test]
    fn remote_update_merges_with_local_state() {
        let conn = open_test_db();

        // Server has a row with title=A, qty=1.
        let server = LoroStore::new();
        server
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "A", "qty": 1}),
            )
            .unwrap();
        let server_snap = server.snapshot(&conn, "Note", "n1").unwrap();

        // A different LoroStore (think: peer / replica) starts from a
        // fresh DB, applies the snapshot, then makes a divergent edit.
        let conn2 = open_test_db();
        let peer = LoroStore::new();
        peer.apply_remote_update(&conn2, "Note", "n1", &fields(), &server_snap)
            .unwrap();
        peer.apply_patch(
            &conn2,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"qty": 2}),
        )
        .unwrap();
        // The peer ships its full snapshot here; applying it is still a
        // merge because Loro imports only the ops the server lacks.
        let peer_update = peer.snapshot(&conn2, "Note", "n1").unwrap();

        // Server applies the peer's update. Both fields converge.
        let projected = server
            .apply_remote_update(&conn, "Note", "n1", &fields(), &peer_update)
            .unwrap();
        assert_eq!(projected["title"], "A");
        assert_eq!(projected["qty"], 2.0);
    }

    #[test]
    fn concurrent_text_writes_converge() {
        let conn_a = open_test_db();
        let conn_b = open_test_db();
        let a = LoroStore::new();
        let b = LoroStore::new();

        a.apply_patch(
            &conn_a,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"body": "from-a"}),
        )
        .unwrap();
        b.apply_patch(
            &conn_b,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"body": "from-b"}),
        )
        .unwrap();

        let snap_a = a.snapshot(&conn_a, "Note", "n1").unwrap();
        let snap_b = b.snapshot(&conn_b, "Note", "n1").unwrap();

        let projected_a = a
            .apply_remote_update(&conn_a, "Note", "n1", &fields(), &snap_b)
            .unwrap();
        let projected_b = b
            .apply_remote_update(&conn_b, "Note", "n1", &fields(), &snap_a)
            .unwrap();

        // Both stores converge to the same projected JSON.
        assert_eq!(projected_a, projected_b);
        let body = projected_a["body"].as_str().unwrap();
        assert!(!body.is_empty(), "body should contain merged text");
    }

    #[test]
    fn incremental_update_carries_only_delta() {
        let conn = open_test_db();
        let store = LoroStore::new();

        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "v1", "qty": 1}),
            )
            .unwrap();

        // Version vector before the next edit — represents what a
        // connected peer has already seen. The `let vv = ...; vv` binding
        // keeps the MutexGuard temporary from outliving `handle`
        // (tail-expression temporaries are dropped after block locals
        // before Rust 2024).
        let early_vv = {
            let handle = store.get_or_hydrate(&conn, "Note", "n1").unwrap();
            let vv = handle.lock().unwrap().oplog_vv();
            vv
        };
        let snap_full = store.snapshot(&conn, "Note", "n1").unwrap();

        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"qty": 7}),
            )
            .unwrap();

        let delta = store.update_since(&conn, "Note", "n1", &early_vv).unwrap();
        assert!(
            delta.len() < snap_full.len(),
            "incremental delta ({}) must be smaller than full snapshot ({})",
            delta.len(),
            snap_full.len()
        );
    }

    #[test]
    fn cache_keeps_distinct_rows_separate() {
        let conn = open_test_db();
        let store = LoroStore::new();
        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "first"}),
            )
            .unwrap();
        store
            .apply_patch(
                &conn,
                "Note",
                "n2",
                &fields(),
                &serde_json::json!({"title": "second"}),
            )
            .unwrap();
        assert_eq!(store.cached_rows(), 2);

        // Empty patches are no-op writes that just project current state.
        let p1 = store
            .apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
            .unwrap();
        let p2 = store
            .apply_patch(&conn, "Note", "n2", &fields(), &serde_json::json!({}))
            .unwrap();
        assert_eq!(p1["title"], "first");
        assert_eq!(p2["title"], "second");
    }
}