Skip to main content

pylon_runtime/
loro_store.rs

1//! Server-side per-row LoroDoc cache with snapshot persistence.
2//!
3//! For CRDT-backed entities (`crdt: true` in the manifest, the default),
4//! every row corresponds to one [`LoroDoc`]. This store owns those docs
5//! in memory, hydrates them on demand from a sidecar SQLite table,
//! writes through on every commit, and projects the doc state into the JSON
7//! shape Pylon's existing storage layer expects.
8//!
9//! # Persistence shape
10//!
11//! Single sidecar table:
12//!
13//! ```sql
14//! CREATE TABLE _pylon_crdt_snapshots (
15//!     entity     TEXT NOT NULL,
16//!     row_id     TEXT NOT NULL,
17//!     snapshot   BLOB NOT NULL,
18//!     updated_at TEXT NOT NULL,
19//!     PRIMARY KEY (entity, row_id)
20//! );
21//! ```
22//!
23//! Snapshots are full-state Loro snapshots (`ExportMode::Snapshot`).
24//! Loro applies internal compaction so the snapshot size stays bounded;
25//! we don't track an op log separately.
26//!
27//! # In-memory cache
28//!
29//! Active rows live in a `HashMap<(entity, row_id), Arc<Mutex<LoroDoc>>>`.
30//! First access for a row hydrates the doc from the sidecar (or creates
31//! a fresh one). Subsequent accesses reuse the in-memory doc — required
32//! both for correctness (Loro's CRDT identity is per-doc-instance) and
33//! perf (snapshot decode is ~100µs per row).
34//!
35//! No eviction yet. Working sets up to ~100K active rows are fine on
36//! commodity hardware (~5-50 MB). For larger working sets a follow-up
37//! adds LRU eviction with snapshot reload on next access.
38//!
39//! # Bandwidth: full snapshot per write (TODO)
40//!
41//! Every CRDT-mode write triggers a binary WS broadcast carrying the
42//! row's *full* current snapshot, not just the incremental update.
43//! Loro's compaction bounds individual snapshots, but the per-write
44//! cost still scales with total state size, not write size.
45//!
46//! Concrete numbers:
47//!
48//! | Workload                           | Snapshot/row | Per-write fanout |
49//! |------------------------------------|--------------|------------------|
50//! | Chat message                       | ~200 B       | tiny             |
51//! | Boring CRUD record                 | ~500 B       | tiny             |
52//! | Whiteboard with 1k strokes         | ~30 KB       | uncomfortable    |
53//! | Document with 50K-char body        | ~80 KB       | bad              |
54//!
55//! Multiply by `connected_clients × writes_per_second` to get total
56//! broadcast bandwidth. For chat-shaped workloads it's free. For collab
57//! whiteboards / large documents it bites once you pass ~10 connected
58//! clients on a hot row.
59//!
60//! # Switching to incremental updates
61//!
62//! Loro already supports `export(ExportMode::updates(version_vector))`
63//! returning only the ops a peer hasn't seen — the building block is
64//! there. What's missing is the per-client tracking:
65//!
66//! 1. Subscribe protocol — clients tell the server "I want updates for
67//!    rows X, Y, Z" instead of every CRDT write fanning out to every
68//!    client. Pylon's existing room layer is the natural transport
69//!    once room semantics extend to per-row subscriptions.
70//! 2. Server-side state — `(client_id, entity, row_id) → version_vector`
71//!    so the server knows what each client is missing. Bounded by the
72//!    subscribe set; LRU-evicted with the doc cache.
73//! 3. Encoder swap — `notify_crdt` calls `encode_update_since(vv)`
74//!    instead of `encode_snapshot()` and ships frame type `0x11`
75//!    (CRDT_FRAME_UPDATE) instead of `0x10` (CRDT_FRAME_SNAPSHOT).
76//!    Wire format already reserves both bytes.
77//! 4. New-subscriber bootstrap — first frame is still a snapshot
78//!    (`0x10`), subsequent frames are deltas (`0x11`).
79//!
80//! Estimated effort: ~2 days for a working slice plus a week of
81//! production hardening (correct VV tracking under reconnects,
82//! garbage-collecting subscriptions on disconnect, handling missed
83//! frames via resync request).
84//!
85//! Until then this implementation is fine for chat / boring CRUD /
86//! demo workloads. Don't run a Figma clone on it.
87
88use std::collections::HashMap;
89use std::sync::{Arc, Mutex};
90
91use pylon_crdt::{
92    apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
93    loro::{LoroDoc, VersionVector},
94    project_doc_to_json, CrdtField,
95};
96use rusqlite::{params, Connection};
97use serde_json::Value;
98
99// ---------------------------------------------------------------------------
100// Sidecar table
101// ---------------------------------------------------------------------------
102
/// SQL to create the snapshot sidecar. Idempotent (`IF NOT EXISTS`).
/// Called by Runtime constructor for any database where CRDT mode could
/// be in use (always, since `crdt: true` is the default).
///
/// One row per `(entity, row_id)` pair: `snapshot` holds the encoded doc
/// bytes and `updated_at` the write-time timestamp text, both written by
/// `LoroStore::persist_snapshot`.
pub const CREATE_SIDECAR_SQL: &str = "
CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (
    entity     TEXT NOT NULL,
    row_id     TEXT NOT NULL,
    snapshot   BLOB NOT NULL,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (entity, row_id)
)
";
115
116/// Create the sidecar table. Safe to call repeatedly.
117pub fn ensure_sidecar(conn: &Connection) -> Result<(), LoroStoreError> {
118    conn.execute(CREATE_SIDECAR_SQL, [])
119        .map(|_| ())
120        .map_err(|e| LoroStoreError::Storage(format!("create sidecar: {e}")))
121}
122
123// ---------------------------------------------------------------------------
124// Errors
125// ---------------------------------------------------------------------------
126
/// Failure modes of the Loro store. Each variant carries a
/// human-readable message describing what went wrong.
#[derive(Debug)]
pub enum LoroStoreError {
    /// A patch value did not fit the CRDT shape of its field (for
    /// example a number written to a Bool field). Indicates a mismatch
    /// between the schema and the caller.
    Apply(String),
    /// The storage layer failed: creating, reading, or writing the
    /// sidecar table did not succeed.
    Storage(String),
    /// Loro could not decode the bytes it was given: either a corrupted
    /// snapshot in the sidecar or an invalid binary update from a peer.
    /// The owning code should surface this to the client (for remote
    /// updates) or fail loud (for stored snapshots).
    Decode(String),
}

impl std::fmt::Display for LoroStoreError {
    /// Renders as `<category>: <message>`, where the category is
    /// `apply`, `storage`, or `decode`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, message) = match self {
            Self::Apply(message) => ("apply", message),
            Self::Storage(message) => ("storage", message),
            Self::Decode(message) => ("decode", message),
        };
        write!(f, "{category}: {message}")
    }
}

impl std::error::Error for LoroStoreError {}
151
152// ---------------------------------------------------------------------------
153// Store
154// ---------------------------------------------------------------------------
155
/// Server-side per-row LoroDoc cache + persistence layer.
///
/// One instance per Runtime. The type does not implement `Clone`; every
/// method takes `&self`, so share it by wrapping it in an [`Arc`].
/// Internally it guards a `HashMap` of doc handles, each itself behind a
/// `Mutex` so concurrent access to *different* rows doesn't contend.
#[derive(Default)]
pub struct LoroStore {
    /// Per-row cache keyed by `(entity, row_id)`. The outer Mutex guards
    /// lookup; the inner Mutex guards mutation of the specific doc. We
    /// hold the outer briefly (insert/lookup), then release before doing
    /// any Loro work, so two requests targeting different rows never
    /// block each other.
    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
}
169
170impl LoroStore {
171    pub fn new() -> Self {
172        Self::default()
173    }
174
175    /// Get the cached doc for a row, hydrating from the sidecar if absent.
176    /// Returns a freshly-created doc if the row has no snapshot yet.
177    fn get_or_hydrate(
178        &self,
179        conn: &Connection,
180        entity: &str,
181        row_id: &str,
182    ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
183        let key = (entity.to_string(), row_id.to_string());
184
185        // Fast path: already cached.
186        {
187            let guard = self.docs.lock().unwrap();
188            if let Some(doc) = guard.get(&key) {
189                return Ok(Arc::clone(doc));
190            }
191        }
192
193        // Slow path: hydrate (or create fresh) outside the cache lock.
194        // Two concurrent first-accesses can both do this; the loser's
195        // doc is dropped after the cache check below. Loro's snapshot
196        // decode is deterministic, so both copies are byte-identical;
197        // the race only wastes a microsecond, never produces divergence.
198        let snapshot: Option<Vec<u8>> = conn
199            .query_row(
200                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = ?1 AND row_id = ?2",
201                params![entity, row_id],
202                |r| r.get(0),
203            )
204            .map(Some)
205            .or_else(|e| {
206                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
207                    Ok(None)
208                } else {
209                    Err(LoroStoreError::Storage(format!("read snapshot: {e}")))
210                }
211            })?;
212
213        let doc = LoroDoc::new();
214        if let Some(bytes) = snapshot {
215            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
216        }
217        let handle = Arc::new(Mutex::new(doc));
218
219        // Re-acquire cache lock and publish, but defer to whatever's
220        // already there if we lost the race.
221        let mut guard = self.docs.lock().unwrap();
222        let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
223        Ok(Arc::clone(entry))
224    }
225
226    /// Persist the current snapshot for a row to the sidecar. Called
227    /// after every commit. Synchronous; tests rely on read-after-write.
228    fn persist_snapshot(
229        &self,
230        conn: &Connection,
231        entity: &str,
232        row_id: &str,
233        doc: &LoroDoc,
234    ) -> Result<(), LoroStoreError> {
235        let snap = encode_snapshot(doc);
236        let now = chrono_now_iso();
237        conn.execute(
238            "INSERT OR REPLACE INTO _pylon_crdt_snapshots
239                (entity, row_id, snapshot, updated_at)
240             VALUES (?1, ?2, ?3, ?4)",
241            params![entity, row_id, snap, now],
242        )
243        .map(|_| ())
244        .map_err(|e| LoroStoreError::Storage(format!("persist snapshot: {e}")))
245    }
246
247    /// Apply a JSON `{field: value}` patch to the row's doc, persist the
248    /// new snapshot, and return the projected JSON (the row shape SQLite
249    /// stores in the materialized view).
250    pub fn apply_patch(
251        &self,
252        conn: &Connection,
253        entity: &str,
254        row_id: &str,
255        fields: &[CrdtField],
256        patch: &Value,
257    ) -> Result<Value, LoroStoreError> {
258        let handle = self.get_or_hydrate(conn, entity, row_id)?;
259        let projected = {
260            let doc = handle.lock().unwrap();
261            apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
262            self.persist_snapshot(conn, entity, row_id, &doc)?;
263            project_doc_to_json(&doc, fields)
264        };
265        Ok(projected)
266    }
267
268    /// Apply a binary update from a peer (typed-protocol client push or
269    /// server-to-server replication). Persists the new snapshot. Returns
270    /// the projected JSON for SQLite materialization so the materialized
271    /// view stays in sync with the CRDT after remote-driven changes.
272    pub fn apply_remote_update(
273        &self,
274        conn: &Connection,
275        entity: &str,
276        row_id: &str,
277        fields: &[CrdtField],
278        update: &[u8],
279    ) -> Result<Value, LoroStoreError> {
280        let handle = self.get_or_hydrate(conn, entity, row_id)?;
281        let projected = {
282            let doc = handle.lock().unwrap();
283            crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
284            self.persist_snapshot(conn, entity, row_id, &doc)?;
285            project_doc_to_json(&doc, fields)
286        };
287        Ok(projected)
288    }
289
290    /// Get the full snapshot for a row. Sent to a fresh client when it
291    /// subscribes. Returns an empty `Vec` for rows that don't exist yet.
292    pub fn snapshot(
293        &self,
294        conn: &Connection,
295        entity: &str,
296        row_id: &str,
297    ) -> Result<Vec<u8>, LoroStoreError> {
298        let handle = self.get_or_hydrate(conn, entity, row_id)?;
299        let doc = handle.lock().unwrap();
300        Ok(encode_snapshot(&doc))
301    }
302
303    /// Get an incremental update since `since` — only the ops the peer
304    /// hasn't seen. Used to catch up a peer that's been disconnected.
305    pub fn update_since(
306        &self,
307        conn: &Connection,
308        entity: &str,
309        row_id: &str,
310        since: &VersionVector,
311    ) -> Result<Vec<u8>, LoroStoreError> {
312        let handle = self.get_or_hydrate(conn, entity, row_id)?;
313        let doc = handle.lock().unwrap();
314        Ok(encode_update_since(&doc, since))
315    }
316
317    /// Drop a row's doc from the in-memory cache. Useful for tests and
318    /// for the eventual eviction policy. Doesn't touch the sidecar; the
319    /// next read will re-hydrate from disk.
320    pub fn evict(&self, entity: &str, row_id: &str) {
321        self.docs
322            .lock()
323            .unwrap()
324            .remove(&(entity.to_string(), row_id.to_string()));
325    }
326
327    /// Number of rows currently held in memory. Diagnostic.
328    pub fn cached_rows(&self) -> usize {
329        self.docs.lock().unwrap().len()
330    }
331}
332
333// ---------------------------------------------------------------------------
334// Helpers
335// ---------------------------------------------------------------------------
336
/// Current UTC wall-clock time as an ISO-8601 / RFC 3339 timestamp of
/// the form `YYYY-MM-DDTHH:MM:SSZ` (second precision).
///
/// Used to stamp the sidecar's `updated_at` column. The fixed-width,
/// zero-padded layout sorts lexicographically in chronological order.
///
/// If the system clock reports a time before the Unix epoch, the epoch
/// itself (`1970-01-01T00:00:00Z`) is returned rather than failing.
fn chrono_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format_epoch_secs_iso(secs)
}

/// Render whole seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`
/// in the proleptic Gregorian calendar (UTC, no leap seconds).
fn format_epoch_secs_iso(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = secs / SECS_PER_DAY;
    let in_day = secs % SECS_PER_DAY;
    let (hour, minute, second) = (in_day / 3_600, in_day % 3_600 / 60, in_day % 60);

    // Days-to-civil conversion (Howard Hinnant's `civil_from_days`).
    // Shifting the origin to 0000-03-01 puts the leap day at the end of
    // each "year", so every 400-year era has the same day layout.
    // `secs` is unsigned, so every intermediate stays non-negative.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let shifted_month = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1; // [1, 31]
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    }; // [1, 12]
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
345
346// ---------------------------------------------------------------------------
347// Tests
348// ---------------------------------------------------------------------------
349
#[cfg(test)]
mod tests {
    use super::*;
    use pylon_crdt::CrdtFieldKind;

    /// Fresh in-memory SQLite database with the sidecar table created.
    fn open_test_db() -> Connection {
        let db = Connection::open_in_memory().unwrap();
        ensure_sidecar(&db).unwrap();
        db
    }

    /// Schema shared by every test: `title`, `body`, and `qty`.
    fn fields() -> Vec<CrdtField> {
        vec![
            ("title", CrdtFieldKind::LwwString),
            ("body", CrdtFieldKind::Text),
            ("qty", CrdtFieldKind::LwwNumber),
        ]
        .into_iter()
        .map(|(name, kind)| CrdtField {
            name: name.into(),
            kind,
        })
        .collect()
    }

    #[test]
    fn sidecar_is_idempotent() {
        // `open_test_db` already created the table; a second create must
        // also succeed.
        let db = open_test_db();
        ensure_sidecar(&db).unwrap();
    }

    #[test]
    fn apply_patch_persists_and_projects() {
        let db = open_test_db();
        let store = LoroStore::new();
        let schema = fields();
        let patch = serde_json::json!({"title": "Hello", "body": "world", "qty": 7});

        let row = store
            .apply_patch(&db, "Note", "n1", &schema, &patch)
            .unwrap();
        assert_eq!(row["title"], "Hello");
        assert_eq!(row["body"], "world");
        assert_eq!(row["qty"], 7.0);

        // The write must have produced exactly one sidecar row.
        let sidecar_rows: i64 = db
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity='Note' AND row_id='n1'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(sidecar_rows, 1);
    }

    #[test]
    fn second_open_hydrates_from_sidecar() {
        let db = open_test_db();
        let store = LoroStore::new();
        let schema = fields();
        let patch = serde_json::json!({"title": "A", "qty": 1});
        store
            .apply_patch(&db, "Note", "n1", &schema, &patch)
            .unwrap();

        // With the cache emptied, the next read has to come from disk.
        store.evict("Note", "n1");
        assert_eq!(store.cached_rows(), 0);

        let reloaded = store.snapshot(&db, "Note", "n1").unwrap();
        assert!(
            !reloaded.is_empty(),
            "snapshot should be non-empty after writes"
        );
        assert_eq!(store.cached_rows(), 1, "snapshot() rehydrated the cache");
    }

    #[test]
    fn empty_row_yields_empty_snapshot() {
        let db = open_test_db();
        let store = LoroStore::new();
        let blank = store.snapshot(&db, "Note", "missing").unwrap();

        // Even an empty Loro doc encodes some version bookkeeping, so
        // check that the bytes round-trip into another store rather than
        // asserting on their size.
        let replica = LoroStore::new();
        let schema = fields();
        replica
            .apply_remote_update(&db, "Note", "missing", &schema, &blank)
            .unwrap();
    }

    #[test]
    fn remote_update_merges_with_local_state() {
        let schema = fields();

        // Server side: row starts as title=A, qty=1.
        let server_db = open_test_db();
        let server = LoroStore::new();
        let initial = serde_json::json!({"title": "A", "qty": 1});
        server
            .apply_patch(&server_db, "Note", "n1", &schema, &initial)
            .unwrap();
        let from_server = server.snapshot(&server_db, "Note", "n1").unwrap();

        // Peer side (think: replica): separate store and database. It
        // imports the server's snapshot, then makes its own edit.
        let peer_db = open_test_db();
        let peer = LoroStore::new();
        peer.apply_remote_update(&peer_db, "Note", "n1", &schema, &from_server)
            .unwrap();
        let peer_edit = serde_json::json!({"qty": 2});
        peer.apply_patch(&peer_db, "Note", "n1", &schema, &peer_edit)
            .unwrap();
        let from_peer = peer.snapshot(&peer_db, "Note", "n1").unwrap();

        // Server imports the peer's state; both fields must converge.
        let merged = server
            .apply_remote_update(&server_db, "Note", "n1", &schema, &from_peer)
            .unwrap();
        assert_eq!(merged["title"], "A");
        assert_eq!(merged["qty"], 2.0);
    }

    #[test]
    fn concurrent_text_writes_converge() {
        let schema = fields();
        let db_a = open_test_db();
        let db_b = open_test_db();
        let store_a = LoroStore::new();
        let store_b = LoroStore::new();

        // Each side writes the same field independently.
        let edit_a = serde_json::json!({"body": "from-a"});
        store_a
            .apply_patch(&db_a, "Note", "n1", &schema, &edit_a)
            .unwrap();
        let edit_b = serde_json::json!({"body": "from-b"});
        store_b
            .apply_patch(&db_b, "Note", "n1", &schema, &edit_b)
            .unwrap();

        // Capture both states before exchanging them.
        let state_a = store_a.snapshot(&db_a, "Note", "n1").unwrap();
        let state_b = store_b.snapshot(&db_b, "Note", "n1").unwrap();

        let merged_a = store_a
            .apply_remote_update(&db_a, "Note", "n1", &schema, &state_b)
            .unwrap();
        let merged_b = store_b
            .apply_remote_update(&db_b, "Note", "n1", &schema, &state_a)
            .unwrap();

        // After the exchange both sides project to the same JSON.
        assert_eq!(merged_a, merged_b);
        let body = merged_a["body"].as_str().unwrap();
        assert!(!body.is_empty(), "body should contain merged text");
    }

    #[test]
    fn incremental_update_carries_only_delta() {
        let db = open_test_db();
        let store = LoroStore::new();
        let schema = fields();

        let first = serde_json::json!({"title": "v1", "qty": 1});
        store
            .apply_patch(&db, "Note", "n1", &schema, &first)
            .unwrap();

        // Version vector as of now: what an already-connected peer would
        // have seen before the next edit.
        let seen_vv = {
            let handle = store.get_or_hydrate(&db, "Note", "n1").unwrap();
            // Bind to a local so the lock guard is dropped before `handle`.
            let vv = handle.lock().unwrap().oplog_vv();
            vv
        };
        let full = store.snapshot(&db, "Note", "n1").unwrap();

        let second = serde_json::json!({"qty": 7});
        store
            .apply_patch(&db, "Note", "n1", &schema, &second)
            .unwrap();

        let delta = store.update_since(&db, "Note", "n1", &seen_vv).unwrap();
        assert!(
            delta.len() < full.len(),
            "incremental delta ({}) must be smaller than full snapshot ({})",
            delta.len(),
            full.len()
        );
    }

    #[test]
    fn cache_keeps_distinct_rows_separate() {
        let db = open_test_db();
        let store = LoroStore::new();
        let schema = fields();

        let first = serde_json::json!({"title": "first"});
        store
            .apply_patch(&db, "Note", "n1", &schema, &first)
            .unwrap();
        let second = serde_json::json!({"title": "second"});
        store
            .apply_patch(&db, "Note", "n2", &schema, &second)
            .unwrap();
        assert_eq!(store.cached_rows(), 2);

        // An empty patch changes nothing; it is only a way to read back
        // each row's projection.
        let noop = serde_json::json!({});
        let row1 = store
            .apply_patch(&db, "Note", "n1", &schema, &noop)
            .unwrap();
        let row2 = store
            .apply_patch(&db, "Note", "n2", &schema, &noop)
            .unwrap();
        assert_eq!(row1["title"], "first");
        assert_eq!(row2["title"], "second");
    }
}