Skip to main content

pylon_runtime/
pg_loro_store.rs

1//! Postgres-backed CRDT snapshot store.
2//!
3//! Mirrors `loro_store::LoroStore` (the SQLite path) but persists the
4//! per-row Loro snapshots into a PG `_pylon_crdt_snapshots` table
5//! instead of a SQLite sidecar. Cache shape, hydrate-on-miss, and
6//! locking model are identical so the CRDT semantics don't drift
7//! between backends.
8//!
9//! Every method is generic over `PgConn`, which is implemented for
10//! both `postgres::Client` and `postgres::Transaction`. That's what
11//! lets the runtime call `apply_patch` *inside* the same transaction
12//! that writes the materialized entity row + maintains the FTS
13//! shadow — sidecar write, entity write, and FTS write either all
14//! commit or all roll back, so a crash mid-write can't desync the
15//! CRDT snapshot from the materialized columns.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20use postgres::Client;
21use pylon_crdt::{
22    apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
23    loro::{LoroDoc, VersionVector},
24    project_doc_to_json, CrdtField,
25};
26use pylon_storage::pg_exec::PgConn;
27use serde_json::Value;
28
29use crate::loro_store::LoroStoreError;
30
/// DDL for the Postgres CRDT sidecar table `_pylon_crdt_snapshots`.
///
/// The statement is `CREATE TABLE IF NOT EXISTS`, so running it again
/// against a database that already has the table is a no-op. That is
/// what lets the runtime execute it on every backend open instead of
/// requiring a manual migration.
///
/// One row per `(entity, row_id)` pair, holding the encoded snapshot
/// and the time it was last written.
pub const CREATE_PG_SIDECAR_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (",
    "entity     text NOT NULL,",
    "row_id     text NOT NULL,",
    "snapshot   bytea NOT NULL,",
    "updated_at timestamptz NOT NULL DEFAULT now(),",
    "PRIMARY KEY (entity, row_id)",
    ")",
);
42
43pub fn ensure_sidecar(client: &mut Client) -> Result<(), LoroStoreError> {
44    client
45        .execute(CREATE_PG_SIDECAR_SQL, &[])
46        .map(|_| ())
47        .map_err(|e| LoroStoreError::Storage(format!("create pg sidecar: {e}")))
48}
49
50/// PG analogue of `LoroStore`. Lives on the Postgres-backed runtime;
51/// holds the per-row LoroDoc cache (mutated only behind the inner
52/// per-row Mutex) and persists snapshots to the PG sidecar table.
53#[derive(Default)]
54pub struct PgLoroStore {
55    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
56}
57
58impl PgLoroStore {
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    /// Hydrate a doc for a CRDT write, taking a transaction-scoped
64    /// advisory lock keyed on (entity, row_id). The lock auto-releases
65    /// at COMMIT/ROLLBACK.
66    ///
67    /// Why advisory + not row-level: `SELECT ... FOR UPDATE` only
68    /// locks rows that exist. The very first CRDT write to a row has
69    /// no sidecar row to lock, so two replicas would both hydrate
70    /// empty docs and race the UPSERT. The advisory lock is keyed on
71    /// the (entity, row_id) hash and works whether the sidecar row
72    /// exists or not. Codex flagged this.
73    ///
74    /// Bypass the in-memory cache on the write path — the cache is
75    /// only safe to read across tx boundaries when every write goes
76    /// through ONE process. For multi-replica it's a foot-gun.
77    /// Re-decoding the snapshot per write is cheap (a few hundred µs
78    /// for a typical row) compared to the round-trip we already pay.
79    fn hydrate_for_write<C: PgConn>(
80        conn: &mut C,
81        entity: &str,
82        row_id: &str,
83    ) -> Result<LoroDoc, LoroStoreError> {
84        // pg_advisory_xact_lock(key1, key2) — two-key form fits the
85        // (entity, row_id) tuple naturally. We hash each side into an
86        // i32 so the same logical row maps to the same lock across
87        // processes. Released automatically at tx end.
88        let entity_key = pg_advisory_key(entity);
89        let row_key = pg_advisory_key(row_id);
90        conn.execute(
91            "SELECT pg_advisory_xact_lock($1::int, $2::int)",
92            &[&entity_key, &row_key],
93        )
94        .map_err(|e| LoroStoreError::Storage(format!("crdt advisory lock: {e}")))?;
95
96        let snapshot: Option<Vec<u8>> = conn
97            .query_opt(
98                "SELECT snapshot FROM _pylon_crdt_snapshots \
99                 WHERE entity = $1 AND row_id = $2",
100                &[&entity, &row_id],
101            )
102            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
103            .map(|r| r.get::<_, Vec<u8>>(0));
104
105        let doc = LoroDoc::new();
106        if let Some(bytes) = snapshot {
107            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
108        }
109        Ok(doc)
110    }
111}
112
/// Hash a string into an `i32` key for `pg_advisory_xact_lock`.
///
/// The two-key advisory lock form takes int4 arguments, so the string
/// is reduced to 32 bits with FNV-1a.
///
/// The hash is implemented inline instead of using
/// `std::collections::hash_map::DefaultHasher`. The standard library
/// does not specify that hasher's algorithm and may change it between
/// releases, so two replicas built with different toolchains could
/// compute different keys for the same row and fail to serialize
/// against each other. FNV-1a is fully specified, which makes the key
/// a pure function of the input bytes.
///
/// Collisions are possible. The only consequence is that two
/// unrelated rows briefly wait on the same lock; correctness is not
/// affected.
fn pg_advisory_key(s: &str) -> i32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let hash = s.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });

    // Same-width `u32 -> i32` cast: reinterprets the bit pattern and
    // never panics. Postgres accepts the full int4 range, negative
    // values included.
    hash as i32
}
128
129impl PgLoroStore {
130    /// Read-only hydrate — no FOR UPDATE lock. Used by `snapshot()`
131    /// and `update_since()` which don't mutate. Hits the in-memory
132    /// cache on a hit so repeated reads of the same row don't pay
133    /// the decode cost.
134    fn get_or_hydrate_read<C: PgConn>(
135        &self,
136        conn: &mut C,
137        entity: &str,
138        row_id: &str,
139    ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
140        let key = (entity.to_string(), row_id.to_string());
141
142        // Fast path: already cached.
143        {
144            let guard = self.docs.lock().unwrap();
145            if let Some(doc) = guard.get(&key) {
146                return Ok(Arc::clone(doc));
147            }
148        }
149
150        let snapshot: Option<Vec<u8>> = conn
151            .query_opt(
152                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
153                &[&entity, &row_id],
154            )
155            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
156            .map(|r| r.get::<_, Vec<u8>>(0));
157
158        let doc = LoroDoc::new();
159        if let Some(bytes) = snapshot {
160            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
161        }
162        let handle = Arc::new(Mutex::new(doc));
163
164        let mut guard = self.docs.lock().unwrap();
165        let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
166        Ok(Arc::clone(entry))
167    }
168
169    /// Persist the current snapshot via UPSERT. Called after every
170    /// apply.
171    fn persist_snapshot<C: PgConn>(
172        conn: &mut C,
173        entity: &str,
174        row_id: &str,
175        doc: &LoroDoc,
176    ) -> Result<(), LoroStoreError> {
177        let snap = encode_snapshot(doc);
178        conn.execute(
179            "INSERT INTO _pylon_crdt_snapshots (entity, row_id, snapshot, updated_at) \
180             VALUES ($1, $2, $3, now()) \
181             ON CONFLICT (entity, row_id) DO UPDATE \
182             SET snapshot = EXCLUDED.snapshot, updated_at = EXCLUDED.updated_at",
183            &[&entity, &row_id, &snap],
184        )
185        .map(|_| ())
186        .map_err(|e| LoroStoreError::Storage(format!("persist pg snapshot: {e}")))
187    }
188
189    /// Apply a JSON patch, persist the new snapshot, return the
190    /// projected JSON. Caller is responsible for materializing the
191    /// projected JSON into the entity row — typically done in the
192    /// same `with_transaction_raw` so both writes share BEGIN/COMMIT.
193    ///
194    /// Multi-replica safe: hydrates with `SELECT ... FOR UPDATE`,
195    /// which serializes concurrent updates to the same row across
196    /// processes. Bypasses the in-memory cache on the write path —
197    /// the cache only updates after commit (see `cache_after_commit`),
198    /// so a stale cache from a different process can't shadow the
199    /// row-locked snapshot we just read.
200    pub fn apply_patch<C: PgConn>(
201        &self,
202        conn: &mut C,
203        entity: &str,
204        row_id: &str,
205        fields: &[CrdtField],
206        patch: &Value,
207    ) -> Result<Value, LoroStoreError> {
208        let doc = Self::hydrate_for_write(conn, entity, row_id)?;
209        apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
210        Self::persist_snapshot(conn, entity, row_id, &doc)?;
211        let projected = project_doc_to_json(&doc, fields);
212        // Cache update happens through `cache_after_commit` from the
213        // runtime layer once the surrounding tx commits. If the tx
214        // rolls back, no cache write happens — so the next read
215        // hydrates from the (unchanged) sidecar.
216        Ok(projected)
217    }
218
219    /// Apply a binary update from a peer. Returns the projected JSON
220    /// for re-materialization on the entity row. Same locking shape
221    /// as `apply_patch`.
222    pub fn apply_remote_update<C: PgConn>(
223        &self,
224        conn: &mut C,
225        entity: &str,
226        row_id: &str,
227        fields: &[CrdtField],
228        update: &[u8],
229    ) -> Result<Value, LoroStoreError> {
230        let doc = Self::hydrate_for_write(conn, entity, row_id)?;
231        crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
232        Self::persist_snapshot(conn, entity, row_id, &doc)?;
233        let projected = project_doc_to_json(&doc, fields);
234        Ok(projected)
235    }
236
237    /// Full snapshot for the row. Returns the encoded LoroDoc bytes
238    /// (empty doc if the row hasn't been written yet — same shape as
239    /// the SQLite path). Read-only, no FOR UPDATE.
240    pub fn snapshot<C: PgConn>(
241        &self,
242        conn: &mut C,
243        entity: &str,
244        row_id: &str,
245    ) -> Result<Vec<u8>, LoroStoreError> {
246        let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
247        let doc = handle.lock().unwrap();
248        Ok(encode_snapshot(&doc))
249    }
250
251    /// Incremental update since `since` for catch-up. Same shape as
252    /// the SQLite path's `update_since`.
253    pub fn update_since<C: PgConn>(
254        &self,
255        conn: &mut C,
256        entity: &str,
257        row_id: &str,
258        since: &VersionVector,
259    ) -> Result<Vec<u8>, LoroStoreError> {
260        let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
261        let doc = handle.lock().unwrap();
262        Ok(encode_update_since(&doc, since))
263    }
264
265    /// Read the snapshot bytes directly through the supplied
266    /// connection, bypassing the in-memory cache. Used by the
267    /// crdt_apply_update path: the cache may hold stale bytes from a
268    /// prior read (snapshot() populates it), and we need the bytes
269    /// that *just* committed to land in the broadcast.
270    pub fn read_snapshot_via_conn<C: PgConn>(
271        conn: &mut C,
272        entity: &str,
273        row_id: &str,
274    ) -> Result<Vec<u8>, LoroStoreError> {
275        let snap: Option<Vec<u8>> = conn
276            .query_opt(
277                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
278                &[&entity, &row_id],
279            )
280            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
281            .map(|r| r.get::<_, Vec<u8>>(0));
282        let bytes = snap.unwrap_or_default();
283        // If the row exists, return its bytes verbatim. If it
284        // doesn't, return an encoded empty doc so the broadcast
285        // shape stays consistent with the SQLite path.
286        if bytes.is_empty() {
287            let doc = LoroDoc::new();
288            Ok(encode_snapshot(&doc))
289        } else {
290            Ok(bytes)
291        }
292    }
293
294    /// Refresh the in-memory cache entry for a row from the
295    /// just-committed sidecar bytes. Called by the runtime layer
296    /// after `with_transaction_raw` commits the CRDT write — this
297    /// way the cache only ever reflects what's on disk, and a
298    /// rolled-back tx leaves no cache poison.
299    ///
300    /// On any read error we evict instead of caching stale state.
301    pub fn cache_after_commit<C: PgConn>(&self, conn: &mut C, entity: &str, row_id: &str) {
302        let snap_result = conn.query_opt(
303            "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
304            &[&entity, &row_id],
305        );
306        let bytes = match snap_result {
307            Ok(Some(row)) => row.get::<_, Vec<u8>>(0),
308            _ => {
309                self.evict(entity, row_id);
310                return;
311            }
312        };
313        let doc = LoroDoc::new();
314        if crdt_apply_update(&doc, &bytes).is_err() {
315            self.evict(entity, row_id);
316            return;
317        }
318        let handle = Arc::new(Mutex::new(doc));
319        let mut guard = self.docs.lock().unwrap();
320        guard.insert((entity.to_string(), row_id.to_string()), handle);
321    }
322
323    /// Drop a row's cached doc. Next access re-hydrates from the PG
324    /// sidecar.
325    pub fn evict(&self, entity: &str, row_id: &str) {
326        self.docs
327            .lock()
328            .unwrap()
329            .remove(&(entity.to_string(), row_id.to_string()));
330    }
331
332    /// Diagnostic — number of rows cached in memory.
333    pub fn cached_rows(&self) -> usize {
334        self.docs.lock().unwrap().len()
335    }
336}
337
338// ---------------------------------------------------------------------------
339// PgCrdtHook impl — bridges pylon-storage's PgTxStore to PgLoroStore so
340// TS-mutation `ctx.db.X` calls maintain the CRDT sidecar in the same tx.
341// ---------------------------------------------------------------------------
342
343use pylon_kernel::AppManifest;
344use pylon_storage::pg_tx_store::PgCrdtHook;
345
346/// Bridge struct that lets PgTxStore (in pylon-storage) call back
347/// into the runtime's CRDT machinery without a direct dependency.
348/// Lives only for the duration of a single mutation tx.
349pub struct PgCrdtHookImpl {
350    /// Reference to the runtime's PgLoroStore. `Arc` so the trait
351    /// object can be cloned across the storage / runtime boundary.
352    pub crdt: std::sync::Arc<PgLoroStore>,
353    /// Shared with the runtime so we can resolve the field-shape
354    /// for each CRDT entity (which Loro types each field uses).
355    pub manifest: std::sync::Arc<AppManifest>,
356}
357
358impl PgCrdtHook for PgCrdtHookImpl {
359    fn before_insert(
360        &self,
361        tx: &mut postgres::Transaction<'_>,
362        entity: &str,
363        data: &serde_json::Value,
364    ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
365        let ent = self
366            .manifest
367            .entities
368            .iter()
369            .find(|e| e.name == entity)
370            .ok_or_else(|| pylon_http::DataError {
371                code: "ENTITY_NOT_FOUND".into(),
372                message: format!("Unknown entity: {entity}"),
373            })?;
374        let crdt_fields = crdt_fields_for(ent)?;
375
376        // If the caller supplied an `id`, reuse it as the snapshot
377        // key so the materialized row and the sidecar stay aligned.
378        // Otherwise generate one and inject it back into the data
379        // (build_insert_sql honors `data["id"]`).
380        let id = data
381            .get("id")
382            .and_then(|v| v.as_str())
383            .map(|s| s.to_string())
384            .unwrap_or_else(crate::generate_id);
385
386        self.crdt
387            .apply_patch(tx, entity, &id, &crdt_fields, data)
388            .map_err(|e| pylon_http::DataError {
389                code: "CRDT_APPLY_FAILED".into(),
390                message: format!("crdt write {entity}/{id}: {e}"),
391            })?;
392
393        // Bake the id back into the row so PgTxStore's tx_insert
394        // uses it instead of generating a fresh one.
395        let mut row = data.clone();
396        if let Some(obj) = row.as_object_mut() {
397            obj.insert("id".into(), serde_json::Value::String(id.clone()));
398        }
399        Ok(Some(row))
400    }
401
402    fn before_update(
403        &self,
404        tx: &mut postgres::Transaction<'_>,
405        entity: &str,
406        id: &str,
407        data: &serde_json::Value,
408    ) -> Result<(), pylon_http::DataError> {
409        let ent = self
410            .manifest
411            .entities
412            .iter()
413            .find(|e| e.name == entity)
414            .ok_or_else(|| pylon_http::DataError {
415                code: "ENTITY_NOT_FOUND".into(),
416                message: format!("Unknown entity: {entity}"),
417            })?;
418        let crdt_fields = crdt_fields_for(ent)?;
419        self.crdt
420            .apply_patch(tx, entity, id, &crdt_fields, data)
421            .map(|_| ())
422            .map_err(|e| pylon_http::DataError {
423                code: "CRDT_APPLY_FAILED".into(),
424                message: format!("crdt update {entity}/{id}: {e}"),
425            })
426    }
427
428    fn before_delete(
429        &self,
430        tx: &mut postgres::Transaction<'_>,
431        entity: &str,
432        id: &str,
433    ) -> Result<(), pylon_http::DataError> {
434        // Drop the sidecar row inside the same tx; runtime evicts
435        // cache entry on commit via after_commit/on_rollback.
436        tx.execute(
437            "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
438            &[&entity, &id],
439        )
440        .map(|_| ())
441        .map_err(|e| pylon_http::DataError {
442            code: "CRDT_SIDECAR_DELETE_FAILED".into(),
443            message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
444        })
445    }
446
447    fn after_commit(&self, entity: &str, id: &str) {
448        // Refresh cache via a fresh client connection. Can't pass
449        // the tx in here since it's already committed and dropped.
450        // The cache_after_commit method on PgLoroStore expects a
451        // PgConn — we don't have one here. Simplest: evict so the
452        // next read re-hydrates from the persisted snapshot. This
453        // is correct (just one extra round-trip for the next read);
454        // the alternative would require the runtime to hand us a
455        // fresh client which is more plumbing for marginal benefit.
456        self.crdt.evict(entity, id);
457    }
458
459    fn on_rollback(&self, entity: &str, id: &str) {
460        // Rolled-back tx: the in-memory doc may have been mutated
461        // in place by apply_patch. Evict to force re-hydration from
462        // the (unchanged) persisted snapshot.
463        self.crdt.evict(entity, id);
464    }
465}
466
467/// Resolve the CRDT field shape for an entity. Same logic as
468/// `Runtime::crdt_fields_for` but without the runtime borrow — the
469/// hook lives across the storage/runtime boundary and only needs
470/// the manifest. Returns `Err` if any field's CRDT annotation is
471/// invalid, matching `Runtime::crdt_fields_for`'s strict behavior:
472/// silently dropping an invalid field would commit the SQL row
473/// while omitting that field from the snapshot — exactly the
474/// sidecar/row divergence we're trying to prevent. Codex flagged.
475fn crdt_fields_for(
476    ent: &pylon_kernel::ManifestEntity,
477) -> Result<Vec<pylon_crdt::CrdtField>, pylon_http::DataError> {
478    let mut out = Vec::with_capacity(ent.fields.len());
479    for f in &ent.fields {
480        if f.name == "id" {
481            continue;
482        }
483        let kind =
484            pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| pylon_http::DataError {
485                code: "INVALID_CRDT_FIELD".into(),
486                message: format!(
487                    "{}.{}: {e} (declared type={}, crdt={:?})",
488                    ent.name, f.name, f.field_type, f.crdt
489                ),
490            })?;
491        out.push(pylon_crdt::CrdtField {
492            name: f.name.clone(),
493            kind,
494        });
495    }
496    Ok(out)
497}