Skip to main content

pylon_runtime/
pg_loro_store.rs

1//! Postgres-backed CRDT snapshot store.
2//!
3//! Mirrors `loro_store::LoroStore` (the SQLite path) but persists the
4//! per-row Loro snapshots into a PG `_pylon_crdt_snapshots` table
5//! instead of a SQLite sidecar. Cache shape, hydrate-on-miss, and
6//! locking model are identical so the CRDT semantics don't drift
7//! between backends.
8//!
9//! Every method is generic over `PgConn`, which is implemented for
10//! both `postgres::Client` and `postgres::Transaction`. That's what
11//! lets the runtime call `apply_patch` *inside* the same transaction
12//! that writes the materialized entity row + maintains the FTS
13//! shadow — sidecar write, entity write, and FTS write either all
14//! commit or all roll back, so a crash mid-write can't desync the
15//! CRDT snapshot from the materialized columns.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20use postgres::Client;
21use pylon_crdt::{
22    apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
23    loro::{LoroDoc, VersionVector},
24    project_doc_to_json, CrdtField,
25};
26use pylon_storage::pg_exec::PgConn;
27use serde_json::Value;
28
29use crate::loro_store::LoroStoreError;
30
/// DDL for the Postgres sidecar table that holds one encoded Loro
/// snapshot per `(entity, row_id)` pair. The statement carries
/// `IF NOT EXISTS`, so executing it against a database that already
/// has the table is a no-op.
pub const CREATE_PG_SIDECAR_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (",
    "entity     text NOT NULL,",
    "row_id     text NOT NULL,",
    "snapshot   bytea NOT NULL,",
    "updated_at timestamptz NOT NULL DEFAULT now(),",
    "PRIMARY KEY (entity, row_id)",
    ")",
);
42
43pub fn ensure_sidecar(client: &mut Client) -> Result<(), LoroStoreError> {
44    client
45        .execute(CREATE_PG_SIDECAR_SQL, &[])
46        .map(|_| ())
47        .map_err(|e| LoroStoreError::Storage(format!("create pg sidecar: {e}")))
48}
49
50/// PG analogue of `LoroStore`. Lives on the Postgres-backed runtime;
51/// holds the per-row LoroDoc cache (mutated only behind the inner
52/// per-row Mutex) and persists snapshots to the PG sidecar table.
53#[derive(Default)]
54pub struct PgLoroStore {
55    docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
56}
57
58impl PgLoroStore {
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    /// Hydrate a doc for a CRDT write, taking a transaction-scoped
64    /// advisory lock keyed on (entity, row_id). The lock auto-releases
65    /// at COMMIT/ROLLBACK.
66    ///
67    /// Why advisory + not row-level: `SELECT ... FOR UPDATE` only
68    /// locks rows that exist. The very first CRDT write to a row has
69    /// no sidecar row to lock, so two replicas would both hydrate
70    /// empty docs and race the UPSERT. The advisory lock is keyed on
71    /// the (entity, row_id) hash and works whether the sidecar row
72    /// exists or not. Codex flagged this.
73    ///
74    /// Bypass the in-memory cache on the write path — the cache is
75    /// only safe to read across tx boundaries when every write goes
76    /// through ONE process. For multi-replica it's a foot-gun.
77    /// Re-decoding the snapshot per write is cheap (a few hundred µs
78    /// for a typical row) compared to the round-trip we already pay.
79    fn hydrate_for_write<C: PgConn>(
80        conn: &mut C,
81        entity: &str,
82        row_id: &str,
83    ) -> Result<LoroDoc, LoroStoreError> {
84        // pg_advisory_xact_lock(key1, key2) — two-key form fits the
85        // (entity, row_id) tuple naturally. We hash each side into an
86        // i32 so the same logical row maps to the same lock across
87        // processes. Released automatically at tx end.
88        let entity_key = pg_advisory_key(entity);
89        let row_key = pg_advisory_key(row_id);
90        conn.execute(
91            "SELECT pg_advisory_xact_lock($1::int, $2::int)",
92            &[&entity_key, &row_key],
93        )
94        .map_err(|e| LoroStoreError::Storage(format!("crdt advisory lock: {e}")))?;
95
96        let snapshot: Option<Vec<u8>> = conn
97            .query_opt(
98                "SELECT snapshot FROM _pylon_crdt_snapshots \
99                 WHERE entity = $1 AND row_id = $2",
100                &[&entity, &row_id],
101            )
102            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
103            .map(|r| r.get::<_, Vec<u8>>(0));
104
105        let doc = LoroDoc::new();
106        if let Some(bytes) = snapshot {
107            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
108        }
109        Ok(doc)
110    }
111}
112
/// Hashes a string into an `i32` usable as one half of the two-key
/// `pg_advisory_xact_lock(int, int)` form.
///
/// The key has to be identical in every process that may lock the
/// same logical row, including replicas built with different Rust
/// toolchains. `std::collections::hash_map::DefaultHasher` does not
/// promise that (its algorithm is explicitly unspecified between
/// releases), so this uses 32-bit FNV-1a, whose output is fixed by
/// the algorithm itself.
///
/// Collisions are possible. The worst outcome is two unrelated rows
/// briefly serialising on the same lock, never lost exclusion.
///
/// NOTE(review): binaries that still derive keys with the previous
/// hasher will compute different keys; avoid running old and new
/// writers side by side while rolling this out.
fn pg_advisory_key(s: &str) -> i32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let hash = s.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });

    // Reinterpret the 32-bit pattern as signed: Postgres int4 covers
    // the whole i32 range, so no bits are discarded.
    i32::from_ne_bytes(hash.to_ne_bytes())
}
128
129impl PgLoroStore {
130
131    /// Read-only hydrate — no FOR UPDATE lock. Used by `snapshot()`
132    /// and `update_since()` which don't mutate. Hits the in-memory
133    /// cache on a hit so repeated reads of the same row don't pay
134    /// the decode cost.
135    fn get_or_hydrate_read<C: PgConn>(
136        &self,
137        conn: &mut C,
138        entity: &str,
139        row_id: &str,
140    ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
141        let key = (entity.to_string(), row_id.to_string());
142
143        // Fast path: already cached.
144        {
145            let guard = self.docs.lock().unwrap();
146            if let Some(doc) = guard.get(&key) {
147                return Ok(Arc::clone(doc));
148            }
149        }
150
151        let snapshot: Option<Vec<u8>> = conn
152            .query_opt(
153                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
154                &[&entity, &row_id],
155            )
156            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
157            .map(|r| r.get::<_, Vec<u8>>(0));
158
159        let doc = LoroDoc::new();
160        if let Some(bytes) = snapshot {
161            crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
162        }
163        let handle = Arc::new(Mutex::new(doc));
164
165        let mut guard = self.docs.lock().unwrap();
166        let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
167        Ok(Arc::clone(entry))
168    }
169
170    /// Persist the current snapshot via UPSERT. Called after every
171    /// apply.
172    fn persist_snapshot<C: PgConn>(
173        conn: &mut C,
174        entity: &str,
175        row_id: &str,
176        doc: &LoroDoc,
177    ) -> Result<(), LoroStoreError> {
178        let snap = encode_snapshot(doc);
179        conn.execute(
180            "INSERT INTO _pylon_crdt_snapshots (entity, row_id, snapshot, updated_at) \
181             VALUES ($1, $2, $3, now()) \
182             ON CONFLICT (entity, row_id) DO UPDATE \
183             SET snapshot = EXCLUDED.snapshot, updated_at = EXCLUDED.updated_at",
184            &[&entity, &row_id, &snap],
185        )
186        .map(|_| ())
187        .map_err(|e| LoroStoreError::Storage(format!("persist pg snapshot: {e}")))
188    }
189
190    /// Apply a JSON patch, persist the new snapshot, return the
191    /// projected JSON. Caller is responsible for materializing the
192    /// projected JSON into the entity row — typically done in the
193    /// same `with_transaction_raw` so both writes share BEGIN/COMMIT.
194    ///
195    /// Multi-replica safe: hydrates with `SELECT ... FOR UPDATE`,
196    /// which serializes concurrent updates to the same row across
197    /// processes. Bypasses the in-memory cache on the write path —
198    /// the cache only updates after commit (see `cache_after_commit`),
199    /// so a stale cache from a different process can't shadow the
200    /// row-locked snapshot we just read.
201    pub fn apply_patch<C: PgConn>(
202        &self,
203        conn: &mut C,
204        entity: &str,
205        row_id: &str,
206        fields: &[CrdtField],
207        patch: &Value,
208    ) -> Result<Value, LoroStoreError> {
209        let doc = Self::hydrate_for_write(conn, entity, row_id)?;
210        apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
211        Self::persist_snapshot(conn, entity, row_id, &doc)?;
212        let projected = project_doc_to_json(&doc, fields);
213        // Cache update happens through `cache_after_commit` from the
214        // runtime layer once the surrounding tx commits. If the tx
215        // rolls back, no cache write happens — so the next read
216        // hydrates from the (unchanged) sidecar.
217        Ok(projected)
218    }
219
220    /// Apply a binary update from a peer. Returns the projected JSON
221    /// for re-materialization on the entity row. Same locking shape
222    /// as `apply_patch`.
223    pub fn apply_remote_update<C: PgConn>(
224        &self,
225        conn: &mut C,
226        entity: &str,
227        row_id: &str,
228        fields: &[CrdtField],
229        update: &[u8],
230    ) -> Result<Value, LoroStoreError> {
231        let doc = Self::hydrate_for_write(conn, entity, row_id)?;
232        crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
233        Self::persist_snapshot(conn, entity, row_id, &doc)?;
234        let projected = project_doc_to_json(&doc, fields);
235        Ok(projected)
236    }
237
238    /// Full snapshot for the row. Returns the encoded LoroDoc bytes
239    /// (empty doc if the row hasn't been written yet — same shape as
240    /// the SQLite path). Read-only, no FOR UPDATE.
241    pub fn snapshot<C: PgConn>(
242        &self,
243        conn: &mut C,
244        entity: &str,
245        row_id: &str,
246    ) -> Result<Vec<u8>, LoroStoreError> {
247        let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
248        let doc = handle.lock().unwrap();
249        Ok(encode_snapshot(&doc))
250    }
251
252    /// Incremental update since `since` for catch-up. Same shape as
253    /// the SQLite path's `update_since`.
254    pub fn update_since<C: PgConn>(
255        &self,
256        conn: &mut C,
257        entity: &str,
258        row_id: &str,
259        since: &VersionVector,
260    ) -> Result<Vec<u8>, LoroStoreError> {
261        let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
262        let doc = handle.lock().unwrap();
263        Ok(encode_update_since(&doc, since))
264    }
265
266    /// Read the snapshot bytes directly through the supplied
267    /// connection, bypassing the in-memory cache. Used by the
268    /// crdt_apply_update path: the cache may hold stale bytes from a
269    /// prior read (snapshot() populates it), and we need the bytes
270    /// that *just* committed to land in the broadcast.
271    pub fn read_snapshot_via_conn<C: PgConn>(
272        conn: &mut C,
273        entity: &str,
274        row_id: &str,
275    ) -> Result<Vec<u8>, LoroStoreError> {
276        let snap: Option<Vec<u8>> = conn
277            .query_opt(
278                "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
279                &[&entity, &row_id],
280            )
281            .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
282            .map(|r| r.get::<_, Vec<u8>>(0));
283        let bytes = snap.unwrap_or_default();
284        // If the row exists, return its bytes verbatim. If it
285        // doesn't, return an encoded empty doc so the broadcast
286        // shape stays consistent with the SQLite path.
287        if bytes.is_empty() {
288            let doc = LoroDoc::new();
289            Ok(encode_snapshot(&doc))
290        } else {
291            Ok(bytes)
292        }
293    }
294
295    /// Refresh the in-memory cache entry for a row from the
296    /// just-committed sidecar bytes. Called by the runtime layer
297    /// after `with_transaction_raw` commits the CRDT write — this
298    /// way the cache only ever reflects what's on disk, and a
299    /// rolled-back tx leaves no cache poison.
300    ///
301    /// On any read error we evict instead of caching stale state.
302    pub fn cache_after_commit<C: PgConn>(
303        &self,
304        conn: &mut C,
305        entity: &str,
306        row_id: &str,
307    ) {
308        let snap_result = conn.query_opt(
309            "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
310            &[&entity, &row_id],
311        );
312        let bytes = match snap_result {
313            Ok(Some(row)) => row.get::<_, Vec<u8>>(0),
314            _ => {
315                self.evict(entity, row_id);
316                return;
317            }
318        };
319        let doc = LoroDoc::new();
320        if crdt_apply_update(&doc, &bytes).is_err() {
321            self.evict(entity, row_id);
322            return;
323        }
324        let handle = Arc::new(Mutex::new(doc));
325        let mut guard = self.docs.lock().unwrap();
326        guard.insert((entity.to_string(), row_id.to_string()), handle);
327    }
328
329    /// Drop a row's cached doc. Next access re-hydrates from the PG
330    /// sidecar.
331    pub fn evict(&self, entity: &str, row_id: &str) {
332        self.docs
333            .lock()
334            .unwrap()
335            .remove(&(entity.to_string(), row_id.to_string()));
336    }
337
338    /// Diagnostic — number of rows cached in memory.
339    pub fn cached_rows(&self) -> usize {
340        self.docs.lock().unwrap().len()
341    }
342}
343
344// ---------------------------------------------------------------------------
345// PgCrdtHook impl — bridges pylon-storage's PgTxStore to PgLoroStore so
346// TS-mutation `ctx.db.X` calls maintain the CRDT sidecar in the same tx.
347// ---------------------------------------------------------------------------
348
349use pylon_kernel::AppManifest;
350use pylon_storage::pg_tx_store::PgCrdtHook;
351
352/// Bridge struct that lets PgTxStore (in pylon-storage) call back
353/// into the runtime's CRDT machinery without a direct dependency.
354/// Lives only for the duration of a single mutation tx.
355pub struct PgCrdtHookImpl {
356    /// Reference to the runtime's PgLoroStore. `Arc` so the trait
357    /// object can be cloned across the storage / runtime boundary.
358    pub crdt: std::sync::Arc<PgLoroStore>,
359    /// Shared with the runtime so we can resolve the field-shape
360    /// for each CRDT entity (which Loro types each field uses).
361    pub manifest: std::sync::Arc<AppManifest>,
362}
363
364impl PgCrdtHook for PgCrdtHookImpl {
365    fn before_insert(
366        &self,
367        tx: &mut postgres::Transaction<'_>,
368        entity: &str,
369        data: &serde_json::Value,
370    ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
371        let ent = self
372            .manifest
373            .entities
374            .iter()
375            .find(|e| e.name == entity)
376            .ok_or_else(|| pylon_http::DataError {
377                code: "ENTITY_NOT_FOUND".into(),
378                message: format!("Unknown entity: {entity}"),
379            })?;
380        let crdt_fields = crdt_fields_for(ent)?;
381
382        // If the caller supplied an `id`, reuse it as the snapshot
383        // key so the materialized row and the sidecar stay aligned.
384        // Otherwise generate one and inject it back into the data
385        // (build_insert_sql honors `data["id"]`).
386        let id = data
387            .get("id")
388            .and_then(|v| v.as_str())
389            .map(|s| s.to_string())
390            .unwrap_or_else(crate::generate_id);
391
392        self.crdt
393            .apply_patch(tx, entity, &id, &crdt_fields, data)
394            .map_err(|e| pylon_http::DataError {
395                code: "CRDT_APPLY_FAILED".into(),
396                message: format!("crdt write {entity}/{id}: {e}"),
397            })?;
398
399        // Bake the id back into the row so PgTxStore's tx_insert
400        // uses it instead of generating a fresh one.
401        let mut row = data.clone();
402        if let Some(obj) = row.as_object_mut() {
403            obj.insert("id".into(), serde_json::Value::String(id.clone()));
404        }
405        Ok(Some(row))
406    }
407
408    fn before_update(
409        &self,
410        tx: &mut postgres::Transaction<'_>,
411        entity: &str,
412        id: &str,
413        data: &serde_json::Value,
414    ) -> Result<(), pylon_http::DataError> {
415        let ent = self
416            .manifest
417            .entities
418            .iter()
419            .find(|e| e.name == entity)
420            .ok_or_else(|| pylon_http::DataError {
421                code: "ENTITY_NOT_FOUND".into(),
422                message: format!("Unknown entity: {entity}"),
423            })?;
424        let crdt_fields = crdt_fields_for(ent)?;
425        self.crdt
426            .apply_patch(tx, entity, id, &crdt_fields, data)
427            .map(|_| ())
428            .map_err(|e| pylon_http::DataError {
429                code: "CRDT_APPLY_FAILED".into(),
430                message: format!("crdt update {entity}/{id}: {e}"),
431            })
432    }
433
434    fn before_delete(
435        &self,
436        tx: &mut postgres::Transaction<'_>,
437        entity: &str,
438        id: &str,
439    ) -> Result<(), pylon_http::DataError> {
440        // Drop the sidecar row inside the same tx; runtime evicts
441        // cache entry on commit via after_commit/on_rollback.
442        tx.execute(
443            "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
444            &[&entity, &id],
445        )
446        .map(|_| ())
447        .map_err(|e| pylon_http::DataError {
448            code: "CRDT_SIDECAR_DELETE_FAILED".into(),
449            message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
450        })
451    }
452
453    fn after_commit(&self, entity: &str, id: &str) {
454        // Refresh cache via a fresh client connection. Can't pass
455        // the tx in here since it's already committed and dropped.
456        // The cache_after_commit method on PgLoroStore expects a
457        // PgConn — we don't have one here. Simplest: evict so the
458        // next read re-hydrates from the persisted snapshot. This
459        // is correct (just one extra round-trip for the next read);
460        // the alternative would require the runtime to hand us a
461        // fresh client which is more plumbing for marginal benefit.
462        self.crdt.evict(entity, id);
463    }
464
465    fn on_rollback(&self, entity: &str, id: &str) {
466        // Rolled-back tx: the in-memory doc may have been mutated
467        // in place by apply_patch. Evict to force re-hydration from
468        // the (unchanged) persisted snapshot.
469        self.crdt.evict(entity, id);
470    }
471}
472
473/// Resolve the CRDT field shape for an entity. Same logic as
474/// `Runtime::crdt_fields_for` but without the runtime borrow — the
475/// hook lives across the storage/runtime boundary and only needs
476/// the manifest. Returns `Err` if any field's CRDT annotation is
477/// invalid, matching `Runtime::crdt_fields_for`'s strict behavior:
478/// silently dropping an invalid field would commit the SQL row
479/// while omitting that field from the snapshot — exactly the
480/// sidecar/row divergence we're trying to prevent. Codex flagged.
481fn crdt_fields_for(
482    ent: &pylon_kernel::ManifestEntity,
483) -> Result<Vec<pylon_crdt::CrdtField>, pylon_http::DataError> {
484    let mut out = Vec::with_capacity(ent.fields.len());
485    for f in &ent.fields {
486        if f.name == "id" {
487            continue;
488        }
489        let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| {
490            pylon_http::DataError {
491                code: "INVALID_CRDT_FIELD".into(),
492                message: format!(
493                    "{}.{}: {e} (declared type={}, crdt={:?})",
494                    ent.name, f.name, f.field_type, f.crdt
495                ),
496            }
497        })?;
498        out.push(pylon_crdt::CrdtField {
499            name: f.name.clone(),
500            kind,
501        });
502    }
503    Ok(out)
504}