pylon_runtime/loro_store.rs
1//! Server-side per-row LoroDoc cache with snapshot persistence.
2//!
3//! For CRDT-backed entities (`crdt: true` in the manifest, the default),
4//! every row corresponds to one [`LoroDoc`]. This store owns those docs
//! in memory, hydrates them on demand from a sidecar SQLite table,
//! writes every commit through to that table, and projects the doc
//! state into the JSON shape Pylon's existing storage layer expects.
8//!
9//! # Persistence shape
10//!
11//! Single sidecar table:
12//!
13//! ```sql
14//! CREATE TABLE _pylon_crdt_snapshots (
15//! entity TEXT NOT NULL,
16//! row_id TEXT NOT NULL,
17//! snapshot BLOB NOT NULL,
18//! updated_at TEXT NOT NULL,
19//! PRIMARY KEY (entity, row_id)
20//! );
21//! ```
22//!
23//! Snapshots are full-state Loro snapshots (`ExportMode::Snapshot`).
24//! Loro applies internal compaction so the snapshot size stays bounded;
25//! we don't track an op log separately.
26//!
27//! # In-memory cache
28//!
29//! Active rows live in a `HashMap<(entity, row_id), Arc<Mutex<LoroDoc>>>`.
30//! First access for a row hydrates the doc from the sidecar (or creates
31//! a fresh one). Subsequent accesses reuse the in-memory doc — required
32//! both for correctness (Loro's CRDT identity is per-doc-instance) and
33//! perf (snapshot decode is ~100µs per row).
34//!
35//! No eviction yet. Working sets up to ~100K active rows are fine on
36//! commodity hardware (~5-50 MB). For larger working sets a follow-up
37//! adds LRU eviction with snapshot reload on next access.
38//!
39//! # Bandwidth: full snapshot per write (TODO)
40//!
41//! Every CRDT-mode write triggers a binary WS broadcast carrying the
42//! row's *full* current snapshot, not just the incremental update.
43//! Loro's compaction bounds individual snapshots, but the per-write
44//! cost still scales with total state size, not write size.
45//!
46//! Concrete numbers:
47//!
48//! | Workload | Snapshot/row | Per-write fanout |
49//! |------------------------------------|--------------|------------------|
50//! | Chat message | ~200 B | tiny |
51//! | Boring CRUD record | ~500 B | tiny |
52//! | Whiteboard with 1k strokes | ~30 KB | uncomfortable |
53//! | Document with 50K-char body | ~80 KB | bad |
54//!
55//! Multiply by `connected_clients × writes_per_second` to get total
56//! broadcast bandwidth. For chat-shaped workloads it's free. For collab
57//! whiteboards / large documents it bites once you pass ~10 connected
58//! clients on a hot row.
59//!
60//! # Switching to incremental updates
61//!
62//! Loro already supports `export(ExportMode::updates(version_vector))`
63//! returning only the ops a peer hasn't seen — the building block is
64//! there. What's missing is the per-client tracking:
65//!
66//! 1. Subscribe protocol — clients tell the server "I want updates for
67//! rows X, Y, Z" instead of every CRDT write fanning out to every
68//! client. Pylon's existing room layer is the natural transport
69//! once room semantics extend to per-row subscriptions.
70//! 2. Server-side state — `(client_id, entity, row_id) → version_vector`
71//! so the server knows what each client is missing. Bounded by the
72//! subscribe set; LRU-evicted with the doc cache.
73//! 3. Encoder swap — `notify_crdt` calls `encode_update_since(vv)`
74//! instead of `encode_snapshot()` and ships frame type `0x11`
75//! (CRDT_FRAME_UPDATE) instead of `0x10` (CRDT_FRAME_SNAPSHOT).
76//! Wire format already reserves both bytes.
77//! 4. New-subscriber bootstrap — first frame is still a snapshot
78//! (`0x10`), subsequent frames are deltas (`0x11`).
79//!
80//! Estimated effort: ~2 days for a working slice plus a week of
81//! production hardening (correct VV tracking under reconnects,
82//! garbage-collecting subscriptions on disconnect, handling missed
83//! frames via resync request).
84//!
85//! Until then this implementation is fine for chat / boring CRUD /
86//! demo workloads. Don't run a Figma clone on it.
87
88use std::collections::HashMap;
89use std::sync::{Arc, Mutex};
90
91use pylon_crdt::{
92 apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
93 loro::{LoroDoc, VersionVector},
94 project_doc_to_json, CrdtField,
95};
96use rusqlite::{params, Connection};
97use serde_json::Value;
98
99// ---------------------------------------------------------------------------
100// Sidecar table
101// ---------------------------------------------------------------------------
102
/// DDL for the snapshot sidecar table `_pylon_crdt_snapshots`.
///
/// The table holds one row per `(entity, row_id)` pair (the primary key),
/// with the encoded doc in `snapshot` and a text timestamp in
/// `updated_at`. The statement uses `IF NOT EXISTS`, so running it again
/// on a database that already has the table is a no-op.
///
/// Executed by [`ensure_sidecar`]. Per the original note, the Runtime
/// constructor runs it for every database, because `crdt: true` is the
/// default and CRDT mode could therefore always be in use (the caller is
/// not visible in this file).
pub const CREATE_SIDECAR_SQL: &str = "
CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (
    entity TEXT NOT NULL,
    row_id TEXT NOT NULL,
    snapshot BLOB NOT NULL,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (entity, row_id)
)
";
115
116/// Create the sidecar table. Safe to call repeatedly.
117pub fn ensure_sidecar(conn: &Connection) -> Result<(), LoroStoreError> {
118 conn.execute(CREATE_SIDECAR_SQL, [])
119 .map(|_| ())
120 .map_err(|e| LoroStoreError::Storage(format!("create sidecar: {e}")))
121}
122
123// ---------------------------------------------------------------------------
124// Errors
125// ---------------------------------------------------------------------------
126
/// Failure categories reported by the Loro-backed row store.
///
/// Every variant carries a human-readable message. The
/// [`std::fmt::Display`] impl renders it as `<category>: <message>`.
#[derive(Debug)]
pub enum LoroStoreError {
    /// A patch value did not fit the target field's CRDT shape (for
    /// example a number written to a Bool field). Indicates a
    /// schema/caller mismatch.
    Apply(String),
    /// The storage layer failed while creating, reading, or writing the
    /// sidecar.
    Storage(String),
    /// Loro could not decode its input: either a corrupted snapshot in
    /// the sidecar or an invalid binary update from a peer. Callers are
    /// expected to report it to the client (remote updates) or fail
    /// loudly (stored snapshots).
    Decode(String),
}

impl std::fmt::Display for LoroStoreError {
    /// Writes the lowercase category name, a colon and space, then the
    /// carried message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, message) = match self {
            Self::Apply(message) => ("apply", message),
            Self::Storage(message) => ("storage", message),
            Self::Decode(message) => ("decode", message),
        };
        write!(f, "{category}: {message}")
    }
}

impl std::error::Error for LoroStoreError {}
151
152/// Lift a `DataError` into a `LoroStoreError::Storage` so closures
153/// passed to `PostgresDataStore::with_client` can return
154/// `LoroStoreError` directly. The `with_client` bound requires
155/// `E: From<DataError>` so the lock-poisoning case can fan out into
156/// the caller's error type.
157impl From<pylon_http::DataError> for LoroStoreError {
158 fn from(e: pylon_http::DataError) -> Self {
159 LoroStoreError::Storage(format!("[{}] {}", e.code, e.message))
160 }
161}
162
163// ---------------------------------------------------------------------------
164// Store
165// ---------------------------------------------------------------------------
166
/// Server-side per-row LoroDoc cache + persistence layer.
///
/// Intended to be used as one instance per Runtime. The type does not
/// implement `Clone`; to share it between owners, wrap it in an [`Arc`].
/// Internally it guards a `HashMap` of doc handles, each itself behind a
/// `Mutex`, so work on *different* rows does not contend on a single
/// doc lock.
///
/// # Panics
///
/// Methods acquire these mutexes with `lock().unwrap()`, so they panic
/// if a previous holder panicked and poisoned the lock.
#[derive(Default)]
pub struct LoroStore {
    /// Per-row cache keyed by `(entity, row_id)`. The outer Mutex guards
    /// lookup and insertion; the inner Mutex guards mutation of one
    /// specific doc. The outer lock is held only briefly (insert/lookup)
    /// and released before any Loro work, so two requests targeting
    /// different rows never block each other on it for long.
    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
}
180
181impl LoroStore {
182 pub fn new() -> Self {
183 Self::default()
184 }
185
186 /// Get the cached doc for a row, hydrating from the sidecar if absent.
187 /// Returns a freshly-created doc if the row has no snapshot yet.
188 fn get_or_hydrate(
189 &self,
190 conn: &Connection,
191 entity: &str,
192 row_id: &str,
193 ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
194 let key = (entity.to_string(), row_id.to_string());
195
196 // Fast path: already cached.
197 {
198 let guard = self.docs.lock().unwrap();
199 if let Some(doc) = guard.get(&key) {
200 return Ok(Arc::clone(doc));
201 }
202 }
203
204 // Slow path: hydrate (or create fresh) outside the cache lock.
205 // Two concurrent first-accesses can both do this; the loser's
206 // doc is dropped after the cache check below. Loro's snapshot
207 // decode is deterministic, so both copies are byte-identical;
208 // the race only wastes a microsecond, never produces divergence.
209 let snapshot: Option<Vec<u8>> = conn
210 .query_row(
211 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = ?1 AND row_id = ?2",
212 params![entity, row_id],
213 |r| r.get(0),
214 )
215 .map(Some)
216 .or_else(|e| {
217 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
218 Ok(None)
219 } else {
220 Err(LoroStoreError::Storage(format!("read snapshot: {e}")))
221 }
222 })?;
223
224 let doc = LoroDoc::new();
225 if let Some(bytes) = snapshot {
226 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
227 }
228 let handle = Arc::new(Mutex::new(doc));
229
230 // Re-acquire cache lock and publish, but defer to whatever's
231 // already there if we lost the race.
232 let mut guard = self.docs.lock().unwrap();
233 let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
234 Ok(Arc::clone(entry))
235 }
236
237 /// Persist the current snapshot for a row to the sidecar. Called
238 /// after every commit. Synchronous; tests rely on read-after-write.
239 fn persist_snapshot(
240 &self,
241 conn: &Connection,
242 entity: &str,
243 row_id: &str,
244 doc: &LoroDoc,
245 ) -> Result<(), LoroStoreError> {
246 let snap = encode_snapshot(doc);
247 let now = chrono_now_iso();
248 conn.execute(
249 "INSERT OR REPLACE INTO _pylon_crdt_snapshots
250 (entity, row_id, snapshot, updated_at)
251 VALUES (?1, ?2, ?3, ?4)",
252 params![entity, row_id, snap, now],
253 )
254 .map(|_| ())
255 .map_err(|e| LoroStoreError::Storage(format!("persist snapshot: {e}")))
256 }
257
258 /// Apply a JSON `{field: value}` patch to the row's doc, persist the
259 /// new snapshot, and return the projected JSON (the row shape SQLite
260 /// stores in the materialized view).
261 pub fn apply_patch(
262 &self,
263 conn: &Connection,
264 entity: &str,
265 row_id: &str,
266 fields: &[CrdtField],
267 patch: &Value,
268 ) -> Result<Value, LoroStoreError> {
269 let handle = self.get_or_hydrate(conn, entity, row_id)?;
270 let projected = {
271 let doc = handle.lock().unwrap();
272 apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
273 self.persist_snapshot(conn, entity, row_id, &doc)?;
274 project_doc_to_json(&doc, fields)
275 };
276 Ok(projected)
277 }
278
279 /// Apply a binary update from a peer (typed-protocol client push or
280 /// server-to-server replication). Persists the new snapshot. Returns
281 /// the projected JSON for SQLite materialization so the materialized
282 /// view stays in sync with the CRDT after remote-driven changes.
283 pub fn apply_remote_update(
284 &self,
285 conn: &Connection,
286 entity: &str,
287 row_id: &str,
288 fields: &[CrdtField],
289 update: &[u8],
290 ) -> Result<Value, LoroStoreError> {
291 let handle = self.get_or_hydrate(conn, entity, row_id)?;
292 let projected = {
293 let doc = handle.lock().unwrap();
294 crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
295 self.persist_snapshot(conn, entity, row_id, &doc)?;
296 project_doc_to_json(&doc, fields)
297 };
298 Ok(projected)
299 }
300
301 /// Get the full snapshot for a row. Sent to a fresh client when it
302 /// subscribes. Returns an empty `Vec` for rows that don't exist yet.
303 pub fn snapshot(
304 &self,
305 conn: &Connection,
306 entity: &str,
307 row_id: &str,
308 ) -> Result<Vec<u8>, LoroStoreError> {
309 let handle = self.get_or_hydrate(conn, entity, row_id)?;
310 let doc = handle.lock().unwrap();
311 Ok(encode_snapshot(&doc))
312 }
313
314 /// Get an incremental update since `since` — only the ops the peer
315 /// hasn't seen. Used to catch up a peer that's been disconnected.
316 pub fn update_since(
317 &self,
318 conn: &Connection,
319 entity: &str,
320 row_id: &str,
321 since: &VersionVector,
322 ) -> Result<Vec<u8>, LoroStoreError> {
323 let handle = self.get_or_hydrate(conn, entity, row_id)?;
324 let doc = handle.lock().unwrap();
325 Ok(encode_update_since(&doc, since))
326 }
327
328 /// Drop a row's doc from the in-memory cache. Useful for tests and
329 /// for the eventual eviction policy. Doesn't touch the sidecar; the
330 /// next read will re-hydrate from disk.
331 pub fn evict(&self, entity: &str, row_id: &str) {
332 self.docs
333 .lock()
334 .unwrap()
335 .remove(&(entity.to_string(), row_id.to_string()));
336 }
337
338 /// Number of rows currently held in memory. Diagnostic.
339 pub fn cached_rows(&self) -> usize {
340 self.docs.lock().unwrap().len()
341 }
342}
343
344// ---------------------------------------------------------------------------
345// Helpers
346// ---------------------------------------------------------------------------
347
/// Returns the current UTC time as an ISO-8601 / RFC 3339 timestamp with
/// second precision: `YYYY-MM-DDTHH:MM:SSZ` (e.g. `2023-11-14T22:13:20Z`).
///
/// This used to return raw Unix seconds with a `Z` suffix
/// (`1700000000Z`), which is not ISO-8601 despite the function name.
/// Rows written before this change keep that older format.
///
/// If the system clock reads earlier than the Unix epoch, the epoch
/// itself (`1970-01-01T00:00:00Z`) is returned rather than failing.
fn chrono_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format_unix_secs_iso(secs)
}

/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC,
/// proleptic Gregorian calendar) using only integer arithmetic.
fn format_unix_secs_iso(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = secs / SECS_PER_DAY;
    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = secs_of_day % 3_600 / 60;
    let second = secs_of_day % 60;

    // Days-to-civil conversion (Howard Hinnant's `civil_from_days`).
    // Shift the epoch to 0000-03-01 so the leap day falls at the end of
    // the computational year, then peel off 400-year eras.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1; // [1, 31]
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }; // [1, 12]
    // January and February belong to the next civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
356
357// ---------------------------------------------------------------------------
358// Tests
359// ---------------------------------------------------------------------------
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364 use pylon_crdt::CrdtFieldKind;
365
366 fn open_test_db() -> Connection {
367 let conn = Connection::open_in_memory().unwrap();
368 ensure_sidecar(&conn).unwrap();
369 conn
370 }
371
372 fn fields() -> Vec<CrdtField> {
373 vec![
374 CrdtField {
375 name: "title".into(),
376 kind: CrdtFieldKind::LwwString,
377 },
378 CrdtField {
379 name: "body".into(),
380 kind: CrdtFieldKind::Text,
381 },
382 CrdtField {
383 name: "qty".into(),
384 kind: CrdtFieldKind::LwwNumber,
385 },
386 ]
387 }
388
389 #[test]
390 fn sidecar_is_idempotent() {
391 let conn = open_test_db();
392 ensure_sidecar(&conn).unwrap(); // Re-create OK.
393 }
394
395 #[test]
396 fn apply_patch_persists_and_projects() {
397 let conn = open_test_db();
398 let store = LoroStore::new();
399 let projected = store
400 .apply_patch(
401 &conn,
402 "Note",
403 "n1",
404 &fields(),
405 &serde_json::json!({"title": "Hello", "body": "world", "qty": 7}),
406 )
407 .unwrap();
408 assert_eq!(projected["title"], "Hello");
409 assert_eq!(projected["body"], "world");
410 assert_eq!(projected["qty"], 7.0);
411
412 // Sidecar row exists.
413 let count: i64 = conn
414 .query_row(
415 "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity='Note' AND row_id='n1'",
416 [],
417 |r| r.get(0),
418 )
419 .unwrap();
420 assert_eq!(count, 1);
421 }
422
423 #[test]
424 fn second_open_hydrates_from_sidecar() {
425 let conn = open_test_db();
426 let store = LoroStore::new();
427 store
428 .apply_patch(
429 &conn,
430 "Note",
431 "n1",
432 &fields(),
433 &serde_json::json!({"title": "A", "qty": 1}),
434 )
435 .unwrap();
436
437 // Drop in-memory cache; next read must rehydrate from disk.
438 store.evict("Note", "n1");
439 assert_eq!(store.cached_rows(), 0);
440
441 let snap = store.snapshot(&conn, "Note", "n1").unwrap();
442 assert!(
443 !snap.is_empty(),
444 "snapshot should be non-empty after writes"
445 );
446 assert_eq!(store.cached_rows(), 1, "snapshot() rehydrated the cache");
447 }
448
449 #[test]
450 fn empty_row_yields_empty_snapshot() {
451 let conn = open_test_db();
452 let store = LoroStore::new();
453 let snap = store.snapshot(&conn, "Note", "missing").unwrap();
454 // An empty Loro doc still produces a small snapshot with version
455 // bookkeeping — just assert it round-trips, not its size.
456 let store2 = LoroStore::new();
457 store2
458 .apply_remote_update(&conn, "Note", "missing", &fields(), &snap)
459 .unwrap();
460 }
461
462 #[test]
463 fn remote_update_merges_with_local_state() {
464 let conn = open_test_db();
465
466 // Server has a row with title=A, qty=1.
467 let server = LoroStore::new();
468 server
469 .apply_patch(
470 &conn,
471 "Note",
472 "n1",
473 &fields(),
474 &serde_json::json!({"title": "A", "qty": 1}),
475 )
476 .unwrap();
477 let server_snap = server.snapshot(&conn, "Note", "n1").unwrap();
478
479 // A different LoroStore (think: peer / replica) starts from a
480 // fresh DB, applies the snapshot, then makes a divergent edit.
481 let conn2 = open_test_db();
482 let peer = LoroStore::new();
483 peer.apply_remote_update(&conn2, "Note", "n1", &fields(), &server_snap)
484 .unwrap();
485 peer.apply_patch(
486 &conn2,
487 "Note",
488 "n1",
489 &fields(),
490 &serde_json::json!({"qty": 2}),
491 )
492 .unwrap();
493 let peer_update = peer.snapshot(&conn2, "Note", "n1").unwrap();
494
495 // Server applies the peer's update. Both fields converge.
496 let projected = server
497 .apply_remote_update(&conn, "Note", "n1", &fields(), &peer_update)
498 .unwrap();
499 assert_eq!(projected["title"], "A");
500 assert_eq!(projected["qty"], 2.0);
501 }
502
503 #[test]
504 fn concurrent_text_writes_converge() {
505 let conn_a = open_test_db();
506 let conn_b = open_test_db();
507 let a = LoroStore::new();
508 let b = LoroStore::new();
509
510 a.apply_patch(
511 &conn_a,
512 "Note",
513 "n1",
514 &fields(),
515 &serde_json::json!({"body": "from-a"}),
516 )
517 .unwrap();
518 b.apply_patch(
519 &conn_b,
520 "Note",
521 "n1",
522 &fields(),
523 &serde_json::json!({"body": "from-b"}),
524 )
525 .unwrap();
526
527 let snap_a = a.snapshot(&conn_a, "Note", "n1").unwrap();
528 let snap_b = b.snapshot(&conn_b, "Note", "n1").unwrap();
529
530 let projected_a = a
531 .apply_remote_update(&conn_a, "Note", "n1", &fields(), &snap_b)
532 .unwrap();
533 let projected_b = b
534 .apply_remote_update(&conn_b, "Note", "n1", &fields(), &snap_a)
535 .unwrap();
536
537 // Both stores converge to the same byte-for-byte state.
538 assert_eq!(projected_a, projected_b);
539 let body = projected_a["body"].as_str().unwrap();
540 assert!(!body.is_empty(), "body should contain merged text");
541 }
542
543 #[test]
544 fn incremental_update_carries_only_delta() {
545 let conn = open_test_db();
546 let store = LoroStore::new();
547
548 store
549 .apply_patch(
550 &conn,
551 "Note",
552 "n1",
553 &fields(),
554 &serde_json::json!({"title": "v1", "qty": 1}),
555 )
556 .unwrap();
557
558 // Snapshot before the next edit — represents what a connected
559 // peer has already seen.
560 let early_vv = {
561 let handle = store.get_or_hydrate(&conn, "Note", "n1").unwrap();
562 let vv = handle.lock().unwrap().oplog_vv();
563 vv
564 };
565 let snap_full = store.snapshot(&conn, "Note", "n1").unwrap();
566
567 store
568 .apply_patch(
569 &conn,
570 "Note",
571 "n1",
572 &fields(),
573 &serde_json::json!({"qty": 7}),
574 )
575 .unwrap();
576
577 let delta = store.update_since(&conn, "Note", "n1", &early_vv).unwrap();
578 assert!(
579 delta.len() < snap_full.len(),
580 "incremental delta ({}) must be smaller than full snapshot ({})",
581 delta.len(),
582 snap_full.len()
583 );
584 }
585
586 #[test]
587 fn cache_keeps_distinct_rows_separate() {
588 let conn = open_test_db();
589 let store = LoroStore::new();
590 store
591 .apply_patch(
592 &conn,
593 "Note",
594 "n1",
595 &fields(),
596 &serde_json::json!({"title": "first"}),
597 )
598 .unwrap();
599 store
600 .apply_patch(
601 &conn,
602 "Note",
603 "n2",
604 &fields(),
605 &serde_json::json!({"title": "second"}),
606 )
607 .unwrap();
608 assert_eq!(store.cached_rows(), 2);
609
610 let p1 = store
611 .apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
612 .unwrap();
613 let p2 = store
614 .apply_patch(&conn, "Note", "n2", &fields(), &serde_json::json!({}))
615 .unwrap();
616 assert_eq!(p1["title"], "first");
617 assert_eq!(p2["title"], "second");
618 }
619}