Skip to main content

pylon_runtime/
loro_store.rs

1//! Server-side per-row LoroDoc cache with snapshot persistence.
2//!
//! For CRDT-backed entities (`crdt: true` in the manifest, the default),
//! every row corresponds to one [`LoroDoc`]. This store owns those docs
//! in memory, hydrates them on demand from a sidecar SQLite table,
//! writes every commit through to that table, and projects the doc state
//! into the JSON shape Pylon's existing storage layer expects.
8//!
9//! # Persistence shape
10//!
11//! Single sidecar table:
12//!
13//! ```sql
14//! CREATE TABLE _pylon_crdt_snapshots (
15//!     entity     TEXT NOT NULL,
16//!     row_id     TEXT NOT NULL,
17//!     snapshot   BLOB NOT NULL,
18//!     updated_at TEXT NOT NULL,
19//!     PRIMARY KEY (entity, row_id)
20//! );
21//! ```
22//!
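//! Snapshots are full-state Loro snapshots (`ExportMode::Snapshot`),
//! which carry the doc's op history as well as its current state, so we
//! don't track an op log separately. Loro's encoding is compact, but a
//! row's snapshot still grows with its edit history rather than staying
//! bounded.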
26//!
27//! # In-memory cache
28//!
29//! Active rows live in a `HashMap<(entity, row_id), Arc<Mutex<LoroDoc>>>`.
30//! First access for a row hydrates the doc from the sidecar (or creates
31//! a fresh one). Subsequent accesses reuse the in-memory doc — required
32//! both for correctness (Loro's CRDT identity is per-doc-instance) and
33//! perf (snapshot decode is ~100µs per row).
34//!
35//! No eviction yet. Working sets up to ~100K active rows are fine on
36//! commodity hardware (~5-50 MB). For larger working sets a follow-up
37//! adds LRU eviction with snapshot reload on next access.
38//!
39//! # Bandwidth: full snapshot per write (TODO)
40//!
//! Every CRDT-mode write triggers a binary WS broadcast carrying the
//! row's *full* current snapshot, not just the incremental update.
//! Loro encodes snapshots compactly, but the per-write cost still scales
//! with the row's total snapshot size (state plus history), not with the
//! size of the write.
45//!
46//! Concrete numbers:
47//!
48//! | Workload                           | Snapshot/row | Per-write fanout |
49//! |------------------------------------|--------------|------------------|
50//! | Chat message                       | ~200 B       | tiny             |
51//! | Boring CRUD record                 | ~500 B       | tiny             |
52//! | Whiteboard with 1k strokes         | ~30 KB       | uncomfortable    |
53//! | Document with 50K-char body        | ~80 KB       | bad              |
54//!
55//! Multiply by `connected_clients × writes_per_second` to get total
56//! broadcast bandwidth. For chat-shaped workloads it's free. For collab
57//! whiteboards / large documents it bites once you pass ~10 connected
58//! clients on a hot row.
59//!
60//! # Switching to incremental updates
61//!
62//! Loro already supports `export(ExportMode::updates(version_vector))`
63//! returning only the ops a peer hasn't seen — the building block is
64//! there. What's missing is the per-client tracking:
65//!
66//! 1. Subscribe protocol — clients tell the server "I want updates for
67//!    rows X, Y, Z" instead of every CRDT write fanning out to every
68//!    client. Pylon's existing room layer is the natural transport
69//!    once room semantics extend to per-row subscriptions.
70//! 2. Server-side state — `(client_id, entity, row_id) → version_vector`
71//!    so the server knows what each client is missing. Bounded by the
72//!    subscribe set; LRU-evicted with the doc cache.
73//! 3. Encoder swap — `notify_crdt` calls `encode_update_since(vv)`
74//!    instead of `encode_snapshot()` and ships frame type `0x11`
75//!    (CRDT_FRAME_UPDATE) instead of `0x10` (CRDT_FRAME_SNAPSHOT).
76//!    Wire format already reserves both bytes.
77//! 4. New-subscriber bootstrap — first frame is still a snapshot
78//!    (`0x10`), subsequent frames are deltas (`0x11`).
79//!
80//! Estimated effort: ~2 days for a working slice plus a week of
81//! production hardening (correct VV tracking under reconnects,
82//! garbage-collecting subscriptions on disconnect, handling missed
83//! frames via resync request).
84//!
85//! Until then this implementation is fine for chat / boring CRUD /
86//! demo workloads. Don't run a Figma clone on it.
87
88use std::collections::HashMap;
89use std::sync::{Arc, Mutex};
90
91use pylon_crdt::{
92    apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
93    loro::{LoroDoc, VersionVector},
94    project_doc_to_json, CrdtField,
95};
96use rusqlite::{params, Connection};
97use serde_json::Value;
98
99// ---------------------------------------------------------------------------
100// Sidecar table
101// ---------------------------------------------------------------------------
102
/// SQL to create the snapshot sidecar. Idempotent (`CREATE TABLE IF NOT
/// EXISTS`). Called by Runtime constructor for any database where CRDT
/// mode could be in use (always, since `crdt: true` is the default).
///
/// `(entity, row_id)` is the primary key, so the table holds at most one
/// snapshot per row; `LoroStore` overwrites that row on every commit via
/// `INSERT OR REPLACE`. `snapshot` stores the encoded Loro snapshot bytes
/// and `updated_at` the write time as text.
pub const CREATE_SIDECAR_SQL: &str = "
CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (
    entity     TEXT NOT NULL,
    row_id     TEXT NOT NULL,
    snapshot   BLOB NOT NULL,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (entity, row_id)
)
";
115
116/// Create the sidecar table. Safe to call repeatedly.
117pub fn ensure_sidecar(conn: &Connection) -> Result<(), LoroStoreError> {
118    conn.execute(CREATE_SIDECAR_SQL, [])
119        .map(|_| ())
120        .map_err(|e| LoroStoreError::Storage(format!("create sidecar: {e}")))
121}
122
123// ---------------------------------------------------------------------------
124// Errors
125// ---------------------------------------------------------------------------
126
/// Errors produced by the CRDT snapshot store.
///
/// Each variant carries a human-readable detail string; `Display`
/// renders it as `"<kind>: <detail>"`.
#[derive(Debug)]
pub enum LoroStoreError {
    /// Patch contained a value that didn't match the field's CRDT shape
    /// (e.g. number on a Bool field). Schema/caller mismatch.
    Apply(String),
    /// Storage layer error — sidecar create / read / write failed.
    Storage(String),
    /// Loro decode error — corrupted snapshot in the sidecar, or a peer
    /// sent an invalid binary update. The owning code should surface this
    /// to the client (for remote updates) or fail loud (for stored snapshots).
    Decode(String),
}

impl std::fmt::Display for LoroStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the prefix first, then emit a single formatted write.
        let (kind, detail) = match self {
            Self::Apply(detail) => ("apply", detail),
            Self::Storage(detail) => ("storage", detail),
            Self::Decode(detail) => ("decode", detail),
        };
        write!(f, "{kind}: {detail}")
    }
}

/// No wrapped `source()`: the detail string carries the full context.
impl std::error::Error for LoroStoreError {}
151
152/// Lift a `DataError` into a `LoroStoreError::Storage` so closures
153/// passed to `PostgresDataStore::with_client` can return
154/// `LoroStoreError` directly. The `with_client` bound requires
155/// `E: From<DataError>` so the lock-poisoning case can fan out into
156/// the caller's error type.
157impl From<pylon_http::DataError> for LoroStoreError {
158    fn from(e: pylon_http::DataError) -> Self {
159        LoroStoreError::Storage(format!("[{}] {}", e.code, e.message))
160    }
161}
162
163// ---------------------------------------------------------------------------
164// Store
165// ---------------------------------------------------------------------------
166
/// Server-side per-row LoroDoc cache + persistence layer.
///
/// One instance per Runtime. The type is not `Clone`; share one instance
/// across callers by wrapping it in an [`Arc`]. Internally it guards a
/// `HashMap` of doc handles, each itself behind a `Mutex`, so concurrent
/// access to *different* rows doesn't contend on the docs themselves.
#[derive(Default)]
pub struct LoroStore {
    /// Per-row cache keyed by `(entity, row_id)`. The outer Mutex guards
    /// lookup; the inner Mutex guards mutation of the specific doc. We hold
    /// the outer briefly (insert/lookup), then release it before doing any
    /// Loro or SQLite work, so requests targeting different rows only
    /// contend for the short cache lookup itself.
    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
}
180
181impl LoroStore {
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    /// Get the cached doc for a row, hydrating from the sidecar if absent.
187    /// Returns a freshly-created doc if the row has no snapshot yet.
188    fn get_or_hydrate(
189        &self,
190        conn: &Connection,
191        entity: &str,
192        row_id: &str,
193    ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
194        let key = (entity.to_string(), row_id.to_string());
195
196        // Fast path: already cached.
197        {
198            let guard = self.docs.lock().unwrap();
199            if let Some(doc) = guard.get(&key) {
200                return Ok(Arc::clone(doc));
201            }
202        }
203
204        // Slow path: hydrate (or create fresh) outside the cache lock.
205        // Two concurrent first-accesses can both do this; the loser's
206        // doc is dropped after the cache check below. Loro's snapshot
207        // decode is deterministic, so both copies are byte-identical;
208        // the race only wastes a microsecond, never produces divergence.
209        let snapshot: Option<Vec<u8>> = conn
210            .query_row(
211                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = ?1 AND row_id = ?2",
212                params![entity, row_id],
213                |r| r.get(0),
214            )
215            .map(Some)
216            .or_else(|e| {
217                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
218                    Ok(None)
219                } else {
220                    Err(LoroStoreError::Storage(format!("read snapshot: {e}")))
221                }
222            })?;
223
224        let doc = LoroDoc::new();
225        if let Some(bytes) = snapshot {
226            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
227        }
228        let handle = Arc::new(Mutex::new(doc));
229
230        // Re-acquire cache lock and publish, but defer to whatever's
231        // already there if we lost the race.
232        let mut guard = self.docs.lock().unwrap();
233        let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
234        Ok(Arc::clone(entry))
235    }
236
237    /// Persist the current snapshot for a row to the sidecar. Called
238    /// after every commit. Synchronous; tests rely on read-after-write.
239    fn persist_snapshot(
240        &self,
241        conn: &Connection,
242        entity: &str,
243        row_id: &str,
244        doc: &LoroDoc,
245    ) -> Result<(), LoroStoreError> {
246        let snap = encode_snapshot(doc);
247        let now = chrono_now_iso();
248        conn.execute(
249            "INSERT OR REPLACE INTO _pylon_crdt_snapshots
250                (entity, row_id, snapshot, updated_at)
251             VALUES (?1, ?2, ?3, ?4)",
252            params![entity, row_id, snap, now],
253        )
254        .map(|_| ())
255        .map_err(|e| LoroStoreError::Storage(format!("persist snapshot: {e}")))
256    }
257
258    /// Apply a JSON `{field: value}` patch to the row's doc, persist the
259    /// new snapshot, and return the projected JSON (the row shape SQLite
260    /// stores in the materialized view).
261    pub fn apply_patch(
262        &self,
263        conn: &Connection,
264        entity: &str,
265        row_id: &str,
266        fields: &[CrdtField],
267        patch: &Value,
268    ) -> Result<Value, LoroStoreError> {
269        let handle = self.get_or_hydrate(conn, entity, row_id)?;
270        let projected = {
271            let doc = handle.lock().unwrap();
272            apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
273            self.persist_snapshot(conn, entity, row_id, &doc)?;
274            project_doc_to_json(&doc, fields)
275        };
276        Ok(projected)
277    }
278
279    /// Apply a binary update from a peer (typed-protocol client push or
280    /// server-to-server replication). Persists the new snapshot. Returns
281    /// the projected JSON for SQLite materialization so the materialized
282    /// view stays in sync with the CRDT after remote-driven changes.
283    pub fn apply_remote_update(
284        &self,
285        conn: &Connection,
286        entity: &str,
287        row_id: &str,
288        fields: &[CrdtField],
289        update: &[u8],
290    ) -> Result<Value, LoroStoreError> {
291        let handle = self.get_or_hydrate(conn, entity, row_id)?;
292        let projected = {
293            let doc = handle.lock().unwrap();
294            crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
295            self.persist_snapshot(conn, entity, row_id, &doc)?;
296            project_doc_to_json(&doc, fields)
297        };
298        Ok(projected)
299    }
300
301    /// Get the full snapshot for a row. Sent to a fresh client when it
302    /// subscribes. Returns an empty `Vec` for rows that don't exist yet.
303    pub fn snapshot(
304        &self,
305        conn: &Connection,
306        entity: &str,
307        row_id: &str,
308    ) -> Result<Vec<u8>, LoroStoreError> {
309        let handle = self.get_or_hydrate(conn, entity, row_id)?;
310        let doc = handle.lock().unwrap();
311        Ok(encode_snapshot(&doc))
312    }
313
314    /// Get an incremental update since `since` — only the ops the peer
315    /// hasn't seen. Used to catch up a peer that's been disconnected.
316    pub fn update_since(
317        &self,
318        conn: &Connection,
319        entity: &str,
320        row_id: &str,
321        since: &VersionVector,
322    ) -> Result<Vec<u8>, LoroStoreError> {
323        let handle = self.get_or_hydrate(conn, entity, row_id)?;
324        let doc = handle.lock().unwrap();
325        Ok(encode_update_since(&doc, since))
326    }
327
328    /// Drop a row's doc from the in-memory cache. Useful for tests and
329    /// for the eventual eviction policy. Doesn't touch the sidecar; the
330    /// next read will re-hydrate from disk.
331    pub fn evict(&self, entity: &str, row_id: &str) {
332        self.docs
333            .lock()
334            .unwrap()
335            .remove(&(entity.to_string(), row_id.to_string()));
336    }
337
338    /// Number of rows currently held in memory. Diagnostic.
339    pub fn cached_rows(&self) -> usize {
340        self.docs.lock().unwrap().len()
341    }
342}
343
344// ---------------------------------------------------------------------------
345// Helpers
346// ---------------------------------------------------------------------------
347
/// Current wall-clock time as an ISO-8601 / RFC 3339 UTC timestamp
/// (`YYYY-MM-DDTHH:MM:SSZ`), used for the sidecar's `updated_at` column.
///
/// The previous implementation returned `"<unix-seconds>Z"` (for example
/// `1700000000Z`), which is not ISO-8601 despite the name. A system clock
/// set before the Unix epoch collapses to `1970-01-01T00:00:00Z` rather
/// than failing the write.
fn chrono_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format_unix_secs_iso(secs)
}

/// Format seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
///
/// Uses Howard Hinnant's `civil_from_days` algorithm (proleptic
/// Gregorian calendar), so no date/time crate is required.
fn format_unix_secs_iso(secs: u64) -> String {
    let days = secs / 86_400;
    let secs_of_day = secs % 86_400;
    let (hour, minute, second) = (
        secs_of_day / 3_600,
        (secs_of_day % 3_600) / 60,
        secs_of_day % 60,
    );

    // Shift the day count so day 0 is 0000-03-01: leap days then fall at
    // the end of each (March-based) year. 719_468 days separate
    // 0000-03-01 from 1970-01-01.
    let z = days + 719_468;
    let era = z / 146_097; // 400-year eras
    let doe = z % 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    // January and February belong to the *next* civil year in March-based counting.
    let year = era * 400 + yoe + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
356
357// ---------------------------------------------------------------------------
358// Tests
359// ---------------------------------------------------------------------------
360
#[cfg(test)]
mod tests {
    use super::*;
    use pylon_crdt::CrdtFieldKind;

    /// Fresh in-memory SQLite database with the sidecar table created.
    fn open_test_db() -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        ensure_sidecar(&conn).unwrap();
        conn
    }

    /// Schema shared by every test: an LWW string, a collaborative text
    /// field, and an LWW number.
    fn fields() -> Vec<CrdtField> {
        vec![
            CrdtField {
                name: "title".into(),
                kind: CrdtFieldKind::LwwString,
            },
            CrdtField {
                name: "body".into(),
                kind: CrdtFieldKind::Text,
            },
            CrdtField {
                name: "qty".into(),
                kind: CrdtFieldKind::LwwNumber,
            },
        ]
    }

    #[test]
    fn sidecar_is_idempotent() {
        let conn = open_test_db();
        ensure_sidecar(&conn).unwrap(); // Re-create OK.
    }

    #[test]
    fn apply_patch_persists_and_projects() {
        let conn = open_test_db();
        let store = LoroStore::new();
        let projected = store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "Hello", "body": "world", "qty": 7}),
            )
            .unwrap();
        assert_eq!(projected["title"], "Hello");
        assert_eq!(projected["body"], "world");
        assert_eq!(projected["qty"], 7.0);

        // Sidecar row exists.
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity='Note' AND row_id='n1'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn second_open_hydrates_from_sidecar() {
        let conn = open_test_db();
        let store = LoroStore::new();
        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "A", "qty": 1}),
            )
            .unwrap();

        // Drop in-memory cache; next read must rehydrate from disk.
        store.evict("Note", "n1");
        assert_eq!(store.cached_rows(), 0);

        let snap = store.snapshot(&conn, "Note", "n1").unwrap();
        assert!(
            !snap.is_empty(),
            "snapshot should be non-empty after writes"
        );
        assert_eq!(store.cached_rows(), 1, "snapshot() rehydrated the cache");

        // The rehydrated doc must carry the persisted values, not merely be
        // non-empty. An empty patch projects the current state unchanged.
        let projected = store
            .apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
            .unwrap();
        assert_eq!(projected["title"], "A");
        assert_eq!(projected["qty"], 1.0);
    }

    #[test]
    fn missing_row_snapshot_round_trips() {
        let conn = open_test_db();
        let store = LoroStore::new();
        let snap = store.snapshot(&conn, "Note", "missing").unwrap();
        // A row with no stored snapshot still yields a snapshot: an empty
        // Loro doc encodes a small payload with version bookkeeping. Assert
        // only that it round-trips into another store, not its size.
        let store2 = LoroStore::new();
        store2
            .apply_remote_update(&conn, "Note", "missing", &fields(), &snap)
            .unwrap();
    }

    #[test]
    fn remote_update_merges_with_local_state() {
        let conn = open_test_db();

        // Server has a row with title=A, qty=1.
        let server = LoroStore::new();
        server
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "A", "qty": 1}),
            )
            .unwrap();
        let server_snap = server.snapshot(&conn, "Note", "n1").unwrap();

        // A different LoroStore (think: peer / replica) starts from a
        // fresh DB, applies the snapshot, then makes a divergent edit.
        let conn2 = open_test_db();
        let peer = LoroStore::new();
        peer.apply_remote_update(&conn2, "Note", "n1", &fields(), &server_snap)
            .unwrap();
        peer.apply_patch(
            &conn2,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"qty": 2}),
        )
        .unwrap();
        let peer_update = peer.snapshot(&conn2, "Note", "n1").unwrap();

        // Server applies the peer's update. Both fields converge.
        let projected = server
            .apply_remote_update(&conn, "Note", "n1", &fields(), &peer_update)
            .unwrap();
        assert_eq!(projected["title"], "A");
        assert_eq!(projected["qty"], 2.0);
    }

    #[test]
    fn concurrent_text_writes_converge() {
        let conn_a = open_test_db();
        let conn_b = open_test_db();
        let a = LoroStore::new();
        let b = LoroStore::new();

        a.apply_patch(
            &conn_a,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"body": "from-a"}),
        )
        .unwrap();
        b.apply_patch(
            &conn_b,
            "Note",
            "n1",
            &fields(),
            &serde_json::json!({"body": "from-b"}),
        )
        .unwrap();

        let snap_a = a.snapshot(&conn_a, "Note", "n1").unwrap();
        let snap_b = b.snapshot(&conn_b, "Note", "n1").unwrap();

        let projected_a = a
            .apply_remote_update(&conn_a, "Note", "n1", &fields(), &snap_b)
            .unwrap();
        let projected_b = b
            .apply_remote_update(&conn_b, "Note", "n1", &fields(), &snap_a)
            .unwrap();

        // Both stores converge to the same projected state.
        assert_eq!(projected_a, projected_b);
        let body = projected_a["body"].as_str().unwrap();
        assert!(!body.is_empty(), "body should contain merged text");
    }

    #[test]
    fn incremental_update_carries_only_delta() {
        let conn = open_test_db();
        let store = LoroStore::new();

        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "v1", "qty": 1}),
            )
            .unwrap();

        // Capture the version vector before the next edit. It represents
        // what a connected peer has already seen.
        let early_vv = {
            let handle = store.get_or_hydrate(&conn, "Note", "n1").unwrap();
            // Bound to a local so the MutexGuard temporary is dropped at the
            // end of this statement, before `handle` goes out of scope.
            let vv = handle.lock().unwrap().oplog_vv();
            vv
        };
        let snap_full = store.snapshot(&conn, "Note", "n1").unwrap();

        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"qty": 7}),
            )
            .unwrap();

        let delta = store.update_since(&conn, "Note", "n1", &early_vv).unwrap();
        assert!(
            delta.len() < snap_full.len(),
            "incremental delta ({}) must be smaller than full snapshot ({})",
            delta.len(),
            snap_full.len()
        );
    }

    #[test]
    fn cache_keeps_distinct_rows_separate() {
        let conn = open_test_db();
        let store = LoroStore::new();
        store
            .apply_patch(
                &conn,
                "Note",
                "n1",
                &fields(),
                &serde_json::json!({"title": "first"}),
            )
            .unwrap();
        store
            .apply_patch(
                &conn,
                "Note",
                "n2",
                &fields(),
                &serde_json::json!({"title": "second"}),
            )
            .unwrap();
        assert_eq!(store.cached_rows(), 2);

        let p1 = store
            .apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
            .unwrap();
        let p2 = store
            .apply_patch(&conn, "Note", "n2", &fields(), &serde_json::json!({}))
            .unwrap();
        assert_eq!(p1["title"], "first");
        assert_eq!(p2["title"], "second");
    }
}