pylon_runtime/pg_loro_store.rs
1//! Postgres-backed CRDT snapshot store.
2//!
3//! Mirrors `loro_store::LoroStore` (the SQLite path) but persists the
4//! per-row Loro snapshots into a PG `_pylon_crdt_snapshots` table
5//! instead of a SQLite sidecar. Cache shape, hydrate-on-miss, and
6//! locking model are identical so the CRDT semantics don't drift
7//! between backends.
8//!
9//! Every method is generic over `PgConn`, which is implemented for
10//! both `postgres::Client` and `postgres::Transaction`. That's what
11//! lets the runtime call `apply_patch` *inside* the same transaction
12//! that writes the materialized entity row + maintains the FTS
13//! shadow — sidecar write, entity write, and FTS write either all
14//! commit or all roll back, so a crash mid-write can't desync the
15//! CRDT snapshot from the materialized columns.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20use postgres::Client;
21use pylon_crdt::{
22 apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
23 loro::{LoroDoc, VersionVector},
24 project_doc_to_json, CrdtField,
25};
26use pylon_storage::pg_exec::PgConn;
27use serde_json::Value;
28
29use crate::loro_store::LoroStoreError;
30
/// DDL for the Postgres CRDT sidecar table.
///
/// The table holds one row per `(entity, row_id)` pair carrying that row's
/// encoded snapshot bytes plus an `updated_at` timestamp. `IF NOT EXISTS`
/// makes the statement safe to run repeatedly, so it is meant to be executed
/// whenever a Postgres backend is opened instead of via a manual migration.
pub const CREATE_PG_SIDECAR_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (",
    "entity text NOT NULL,",
    "row_id text NOT NULL,",
    "snapshot bytea NOT NULL,",
    "updated_at timestamptz NOT NULL DEFAULT now(),",
    "PRIMARY KEY (entity, row_id)",
    ")",
);
42
43pub fn ensure_sidecar(client: &mut Client) -> Result<(), LoroStoreError> {
44 client
45 .execute(CREATE_PG_SIDECAR_SQL, &[])
46 .map(|_| ())
47 .map_err(|e| LoroStoreError::Storage(format!("create pg sidecar: {e}")))
48}
49
50/// PG analogue of `LoroStore`. Lives on the Postgres-backed runtime;
51/// holds the per-row LoroDoc cache (mutated only behind the inner
52/// per-row Mutex) and persists snapshots to the PG sidecar table.
53#[derive(Default)]
54pub struct PgLoroStore {
55 docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
56}
57
58impl PgLoroStore {
59 pub fn new() -> Self {
60 Self::default()
61 }
62
63 /// Hydrate a doc for a CRDT write, taking a transaction-scoped
64 /// advisory lock keyed on (entity, row_id). The lock auto-releases
65 /// at COMMIT/ROLLBACK.
66 ///
67 /// Why advisory + not row-level: `SELECT ... FOR UPDATE` only
68 /// locks rows that exist. The very first CRDT write to a row has
69 /// no sidecar row to lock, so two replicas would both hydrate
70 /// empty docs and race the UPSERT. The advisory lock is keyed on
71 /// the (entity, row_id) hash and works whether the sidecar row
72 /// exists or not. Codex flagged this.
73 ///
74 /// Bypass the in-memory cache on the write path — the cache is
75 /// only safe to read across tx boundaries when every write goes
76 /// through ONE process. For multi-replica it's a foot-gun.
77 /// Re-decoding the snapshot per write is cheap (a few hundred µs
78 /// for a typical row) compared to the round-trip we already pay.
79 fn hydrate_for_write<C: PgConn>(
80 conn: &mut C,
81 entity: &str,
82 row_id: &str,
83 ) -> Result<LoroDoc, LoroStoreError> {
84 // pg_advisory_xact_lock(key1, key2) — two-key form fits the
85 // (entity, row_id) tuple naturally. We hash each side into an
86 // i32 so the same logical row maps to the same lock across
87 // processes. Released automatically at tx end.
88 let entity_key = pg_advisory_key(entity);
89 let row_key = pg_advisory_key(row_id);
90 conn.execute(
91 "SELECT pg_advisory_xact_lock($1::int, $2::int)",
92 &[&entity_key, &row_key],
93 )
94 .map_err(|e| LoroStoreError::Storage(format!("crdt advisory lock: {e}")))?;
95
96 let snapshot: Option<Vec<u8>> = conn
97 .query_opt(
98 "SELECT snapshot FROM _pylon_crdt_snapshots \
99 WHERE entity = $1 AND row_id = $2",
100 &[&entity, &row_id],
101 )
102 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
103 .map(|r| r.get::<_, Vec<u8>>(0));
104
105 let doc = LoroDoc::new();
106 if let Some(bytes) = snapshot {
107 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
108 }
109 Ok(doc)
110 }
111}
112
/// Hash a string into an `i32` suitable for the two-key form of
/// `pg_advisory_xact_lock`, which takes int4 arguments.
///
/// The key is part of a cross-process protocol: every replica must derive
/// the same value for the same string or the advisory lock stops
/// serializing writers. For that reason this uses 32-bit FNV-1a, a fully
/// specified algorithm, rather than `std`'s `DefaultHasher`, whose algorithm
/// is documented as unspecified and subject to change between Rust releases.
///
/// Collisions are possible; the worst outcome is two unrelated rows briefly
/// blocking each other, never a correctness loss.
///
/// NOTE(review): binaries built before this change derive different keys, so
/// old and new replicas must not write concurrently during the rollout.
fn pg_advisory_key(s: &str) -> i32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let hash = s.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });

    // Reinterpret the 32-bit pattern as a signed value; Postgres accepts the
    // full int4 range, negative keys included.
    i32::from_ne_bytes(hash.to_ne_bytes())
}
128
129impl PgLoroStore {
130
131 /// Read-only hydrate — no FOR UPDATE lock. Used by `snapshot()`
132 /// and `update_since()` which don't mutate. Hits the in-memory
133 /// cache on a hit so repeated reads of the same row don't pay
134 /// the decode cost.
135 fn get_or_hydrate_read<C: PgConn>(
136 &self,
137 conn: &mut C,
138 entity: &str,
139 row_id: &str,
140 ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
141 let key = (entity.to_string(), row_id.to_string());
142
143 // Fast path: already cached.
144 {
145 let guard = self.docs.lock().unwrap();
146 if let Some(doc) = guard.get(&key) {
147 return Ok(Arc::clone(doc));
148 }
149 }
150
151 let snapshot: Option<Vec<u8>> = conn
152 .query_opt(
153 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
154 &[&entity, &row_id],
155 )
156 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
157 .map(|r| r.get::<_, Vec<u8>>(0));
158
159 let doc = LoroDoc::new();
160 if let Some(bytes) = snapshot {
161 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
162 }
163 let handle = Arc::new(Mutex::new(doc));
164
165 let mut guard = self.docs.lock().unwrap();
166 let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
167 Ok(Arc::clone(entry))
168 }
169
170 /// Persist the current snapshot via UPSERT. Called after every
171 /// apply.
172 fn persist_snapshot<C: PgConn>(
173 conn: &mut C,
174 entity: &str,
175 row_id: &str,
176 doc: &LoroDoc,
177 ) -> Result<(), LoroStoreError> {
178 let snap = encode_snapshot(doc);
179 conn.execute(
180 "INSERT INTO _pylon_crdt_snapshots (entity, row_id, snapshot, updated_at) \
181 VALUES ($1, $2, $3, now()) \
182 ON CONFLICT (entity, row_id) DO UPDATE \
183 SET snapshot = EXCLUDED.snapshot, updated_at = EXCLUDED.updated_at",
184 &[&entity, &row_id, &snap],
185 )
186 .map(|_| ())
187 .map_err(|e| LoroStoreError::Storage(format!("persist pg snapshot: {e}")))
188 }
189
190 /// Apply a JSON patch, persist the new snapshot, return the
191 /// projected JSON. Caller is responsible for materializing the
192 /// projected JSON into the entity row — typically done in the
193 /// same `with_transaction_raw` so both writes share BEGIN/COMMIT.
194 ///
195 /// Multi-replica safe: hydrates with `SELECT ... FOR UPDATE`,
196 /// which serializes concurrent updates to the same row across
197 /// processes. Bypasses the in-memory cache on the write path —
198 /// the cache only updates after commit (see `cache_after_commit`),
199 /// so a stale cache from a different process can't shadow the
200 /// row-locked snapshot we just read.
201 pub fn apply_patch<C: PgConn>(
202 &self,
203 conn: &mut C,
204 entity: &str,
205 row_id: &str,
206 fields: &[CrdtField],
207 patch: &Value,
208 ) -> Result<Value, LoroStoreError> {
209 let doc = Self::hydrate_for_write(conn, entity, row_id)?;
210 apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
211 Self::persist_snapshot(conn, entity, row_id, &doc)?;
212 let projected = project_doc_to_json(&doc, fields);
213 // Cache update happens through `cache_after_commit` from the
214 // runtime layer once the surrounding tx commits. If the tx
215 // rolls back, no cache write happens — so the next read
216 // hydrates from the (unchanged) sidecar.
217 Ok(projected)
218 }
219
220 /// Apply a binary update from a peer. Returns the projected JSON
221 /// for re-materialization on the entity row. Same locking shape
222 /// as `apply_patch`.
223 pub fn apply_remote_update<C: PgConn>(
224 &self,
225 conn: &mut C,
226 entity: &str,
227 row_id: &str,
228 fields: &[CrdtField],
229 update: &[u8],
230 ) -> Result<Value, LoroStoreError> {
231 let doc = Self::hydrate_for_write(conn, entity, row_id)?;
232 crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
233 Self::persist_snapshot(conn, entity, row_id, &doc)?;
234 let projected = project_doc_to_json(&doc, fields);
235 Ok(projected)
236 }
237
238 /// Full snapshot for the row. Returns the encoded LoroDoc bytes
239 /// (empty doc if the row hasn't been written yet — same shape as
240 /// the SQLite path). Read-only, no FOR UPDATE.
241 pub fn snapshot<C: PgConn>(
242 &self,
243 conn: &mut C,
244 entity: &str,
245 row_id: &str,
246 ) -> Result<Vec<u8>, LoroStoreError> {
247 let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
248 let doc = handle.lock().unwrap();
249 Ok(encode_snapshot(&doc))
250 }
251
252 /// Incremental update since `since` for catch-up. Same shape as
253 /// the SQLite path's `update_since`.
254 pub fn update_since<C: PgConn>(
255 &self,
256 conn: &mut C,
257 entity: &str,
258 row_id: &str,
259 since: &VersionVector,
260 ) -> Result<Vec<u8>, LoroStoreError> {
261 let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
262 let doc = handle.lock().unwrap();
263 Ok(encode_update_since(&doc, since))
264 }
265
266 /// Read the snapshot bytes directly through the supplied
267 /// connection, bypassing the in-memory cache. Used by the
268 /// crdt_apply_update path: the cache may hold stale bytes from a
269 /// prior read (snapshot() populates it), and we need the bytes
270 /// that *just* committed to land in the broadcast.
271 pub fn read_snapshot_via_conn<C: PgConn>(
272 conn: &mut C,
273 entity: &str,
274 row_id: &str,
275 ) -> Result<Vec<u8>, LoroStoreError> {
276 let snap: Option<Vec<u8>> = conn
277 .query_opt(
278 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
279 &[&entity, &row_id],
280 )
281 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
282 .map(|r| r.get::<_, Vec<u8>>(0));
283 let bytes = snap.unwrap_or_default();
284 // If the row exists, return its bytes verbatim. If it
285 // doesn't, return an encoded empty doc so the broadcast
286 // shape stays consistent with the SQLite path.
287 if bytes.is_empty() {
288 let doc = LoroDoc::new();
289 Ok(encode_snapshot(&doc))
290 } else {
291 Ok(bytes)
292 }
293 }
294
295 /// Refresh the in-memory cache entry for a row from the
296 /// just-committed sidecar bytes. Called by the runtime layer
297 /// after `with_transaction_raw` commits the CRDT write — this
298 /// way the cache only ever reflects what's on disk, and a
299 /// rolled-back tx leaves no cache poison.
300 ///
301 /// On any read error we evict instead of caching stale state.
302 pub fn cache_after_commit<C: PgConn>(
303 &self,
304 conn: &mut C,
305 entity: &str,
306 row_id: &str,
307 ) {
308 let snap_result = conn.query_opt(
309 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
310 &[&entity, &row_id],
311 );
312 let bytes = match snap_result {
313 Ok(Some(row)) => row.get::<_, Vec<u8>>(0),
314 _ => {
315 self.evict(entity, row_id);
316 return;
317 }
318 };
319 let doc = LoroDoc::new();
320 if crdt_apply_update(&doc, &bytes).is_err() {
321 self.evict(entity, row_id);
322 return;
323 }
324 let handle = Arc::new(Mutex::new(doc));
325 let mut guard = self.docs.lock().unwrap();
326 guard.insert((entity.to_string(), row_id.to_string()), handle);
327 }
328
329 /// Drop a row's cached doc. Next access re-hydrates from the PG
330 /// sidecar.
331 pub fn evict(&self, entity: &str, row_id: &str) {
332 self.docs
333 .lock()
334 .unwrap()
335 .remove(&(entity.to_string(), row_id.to_string()));
336 }
337
338 /// Diagnostic — number of rows cached in memory.
339 pub fn cached_rows(&self) -> usize {
340 self.docs.lock().unwrap().len()
341 }
342}
343
344// ---------------------------------------------------------------------------
345// PgCrdtHook impl — bridges pylon-storage's PgTxStore to PgLoroStore so
346// TS-mutation `ctx.db.X` calls maintain the CRDT sidecar in the same tx.
347// ---------------------------------------------------------------------------
348
349use pylon_kernel::AppManifest;
350use pylon_storage::pg_tx_store::PgCrdtHook;
351
352/// Bridge struct that lets PgTxStore (in pylon-storage) call back
353/// into the runtime's CRDT machinery without a direct dependency.
354/// Lives only for the duration of a single mutation tx.
355pub struct PgCrdtHookImpl {
356 /// Reference to the runtime's PgLoroStore. `Arc` so the trait
357 /// object can be cloned across the storage / runtime boundary.
358 pub crdt: std::sync::Arc<PgLoroStore>,
359 /// Shared with the runtime so we can resolve the field-shape
360 /// for each CRDT entity (which Loro types each field uses).
361 pub manifest: std::sync::Arc<AppManifest>,
362}
363
364impl PgCrdtHook for PgCrdtHookImpl {
365 fn before_insert(
366 &self,
367 tx: &mut postgres::Transaction<'_>,
368 entity: &str,
369 data: &serde_json::Value,
370 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
371 let ent = self
372 .manifest
373 .entities
374 .iter()
375 .find(|e| e.name == entity)
376 .ok_or_else(|| pylon_http::DataError {
377 code: "ENTITY_NOT_FOUND".into(),
378 message: format!("Unknown entity: {entity}"),
379 })?;
380 let crdt_fields = crdt_fields_for(ent)?;
381
382 // If the caller supplied an `id`, reuse it as the snapshot
383 // key so the materialized row and the sidecar stay aligned.
384 // Otherwise generate one and inject it back into the data
385 // (build_insert_sql honors `data["id"]`).
386 let id = data
387 .get("id")
388 .and_then(|v| v.as_str())
389 .map(|s| s.to_string())
390 .unwrap_or_else(crate::generate_id);
391
392 self.crdt
393 .apply_patch(tx, entity, &id, &crdt_fields, data)
394 .map_err(|e| pylon_http::DataError {
395 code: "CRDT_APPLY_FAILED".into(),
396 message: format!("crdt write {entity}/{id}: {e}"),
397 })?;
398
399 // Bake the id back into the row so PgTxStore's tx_insert
400 // uses it instead of generating a fresh one.
401 let mut row = data.clone();
402 if let Some(obj) = row.as_object_mut() {
403 obj.insert("id".into(), serde_json::Value::String(id.clone()));
404 }
405 Ok(Some(row))
406 }
407
408 fn before_update(
409 &self,
410 tx: &mut postgres::Transaction<'_>,
411 entity: &str,
412 id: &str,
413 data: &serde_json::Value,
414 ) -> Result<(), pylon_http::DataError> {
415 let ent = self
416 .manifest
417 .entities
418 .iter()
419 .find(|e| e.name == entity)
420 .ok_or_else(|| pylon_http::DataError {
421 code: "ENTITY_NOT_FOUND".into(),
422 message: format!("Unknown entity: {entity}"),
423 })?;
424 let crdt_fields = crdt_fields_for(ent)?;
425 self.crdt
426 .apply_patch(tx, entity, id, &crdt_fields, data)
427 .map(|_| ())
428 .map_err(|e| pylon_http::DataError {
429 code: "CRDT_APPLY_FAILED".into(),
430 message: format!("crdt update {entity}/{id}: {e}"),
431 })
432 }
433
434 fn before_delete(
435 &self,
436 tx: &mut postgres::Transaction<'_>,
437 entity: &str,
438 id: &str,
439 ) -> Result<(), pylon_http::DataError> {
440 // Drop the sidecar row inside the same tx; runtime evicts
441 // cache entry on commit via after_commit/on_rollback.
442 tx.execute(
443 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
444 &[&entity, &id],
445 )
446 .map(|_| ())
447 .map_err(|e| pylon_http::DataError {
448 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
449 message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
450 })
451 }
452
453 fn after_commit(&self, entity: &str, id: &str) {
454 // Refresh cache via a fresh client connection. Can't pass
455 // the tx in here since it's already committed and dropped.
456 // The cache_after_commit method on PgLoroStore expects a
457 // PgConn — we don't have one here. Simplest: evict so the
458 // next read re-hydrates from the persisted snapshot. This
459 // is correct (just one extra round-trip for the next read);
460 // the alternative would require the runtime to hand us a
461 // fresh client which is more plumbing for marginal benefit.
462 self.crdt.evict(entity, id);
463 }
464
465 fn on_rollback(&self, entity: &str, id: &str) {
466 // Rolled-back tx: the in-memory doc may have been mutated
467 // in place by apply_patch. Evict to force re-hydration from
468 // the (unchanged) persisted snapshot.
469 self.crdt.evict(entity, id);
470 }
471}
472
473/// Resolve the CRDT field shape for an entity. Same logic as
474/// `Runtime::crdt_fields_for` but without the runtime borrow — the
475/// hook lives across the storage/runtime boundary and only needs
476/// the manifest. Returns `Err` if any field's CRDT annotation is
477/// invalid, matching `Runtime::crdt_fields_for`'s strict behavior:
478/// silently dropping an invalid field would commit the SQL row
479/// while omitting that field from the snapshot — exactly the
480/// sidecar/row divergence we're trying to prevent. Codex flagged.
481fn crdt_fields_for(
482 ent: &pylon_kernel::ManifestEntity,
483) -> Result<Vec<pylon_crdt::CrdtField>, pylon_http::DataError> {
484 let mut out = Vec::with_capacity(ent.fields.len());
485 for f in &ent.fields {
486 if f.name == "id" {
487 continue;
488 }
489 let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| {
490 pylon_http::DataError {
491 code: "INVALID_CRDT_FIELD".into(),
492 message: format!(
493 "{}.{}: {e} (declared type={}, crdt={:?})",
494 ent.name, f.name, f.field_type, f.crdt
495 ),
496 }
497 })?;
498 out.push(pylon_crdt::CrdtField {
499 name: f.name.clone(),
500 kind,
501 });
502 }
503 Ok(out)
504}