pylon_runtime/pg_loro_store.rs
1//! Postgres-backed CRDT snapshot store.
2//!
3//! Mirrors `loro_store::LoroStore` (the SQLite path) but persists the
4//! per-row Loro snapshots into a PG `_pylon_crdt_snapshots` table
5//! instead of a SQLite sidecar. Cache shape, hydrate-on-miss, and
6//! locking model are identical so the CRDT semantics don't drift
7//! between backends.
8//!
9//! Every method is generic over `PgConn`, which is implemented for
10//! both `postgres::Client` and `postgres::Transaction`. That's what
11//! lets the runtime call `apply_patch` *inside* the same transaction
12//! that writes the materialized entity row + maintains the FTS
13//! shadow — sidecar write, entity write, and FTS write either all
14//! commit or all roll back, so a crash mid-write can't desync the
15//! CRDT snapshot from the materialized columns.
16
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20use postgres::Client;
21use pylon_crdt::{
22 apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
23 loro::{LoroDoc, VersionVector},
24 project_doc_to_json, CrdtField,
25};
26use pylon_storage::pg_exec::PgConn;
27use serde_json::Value;
28
29use crate::loro_store::LoroStoreError;
30
/// DDL for the Postgres sidecar table holding one encoded Loro
/// snapshot per `(entity, row_id)` pair.
///
/// The statement uses `IF NOT EXISTS`, so it is safe to run on every
/// startup: a fresh database gets the table without a separate
/// migration step, and an existing table is left untouched.
pub const CREATE_PG_SIDECAR_SQL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (",
    "entity text NOT NULL,",
    "row_id text NOT NULL,",
    "snapshot bytea NOT NULL,",
    "updated_at timestamptz NOT NULL DEFAULT now(),",
    "PRIMARY KEY (entity, row_id)",
    ")",
);
42
43pub fn ensure_sidecar(client: &mut Client) -> Result<(), LoroStoreError> {
44 client
45 .execute(CREATE_PG_SIDECAR_SQL, &[])
46 .map(|_| ())
47 .map_err(|e| LoroStoreError::Storage(format!("create pg sidecar: {e}")))
48}
49
50/// PG analogue of `LoroStore`. Lives on the Postgres-backed runtime;
51/// holds the per-row LoroDoc cache (mutated only behind the inner
52/// per-row Mutex) and persists snapshots to the PG sidecar table.
53#[derive(Default)]
54pub struct PgLoroStore {
55 docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
56}
57
58impl PgLoroStore {
59 pub fn new() -> Self {
60 Self::default()
61 }
62
63 /// Hydrate a doc for a CRDT write, taking a transaction-scoped
64 /// advisory lock keyed on (entity, row_id). The lock auto-releases
65 /// at COMMIT/ROLLBACK.
66 ///
67 /// Why advisory + not row-level: `SELECT ... FOR UPDATE` only
68 /// locks rows that exist. The very first CRDT write to a row has
69 /// no sidecar row to lock, so two replicas would both hydrate
70 /// empty docs and race the UPSERT. The advisory lock is keyed on
71 /// the (entity, row_id) hash and works whether the sidecar row
72 /// exists or not. Codex flagged this.
73 ///
74 /// Bypass the in-memory cache on the write path — the cache is
75 /// only safe to read across tx boundaries when every write goes
76 /// through ONE process. For multi-replica it's a foot-gun.
77 /// Re-decoding the snapshot per write is cheap (a few hundred µs
78 /// for a typical row) compared to the round-trip we already pay.
79 fn hydrate_for_write<C: PgConn>(
80 conn: &mut C,
81 entity: &str,
82 row_id: &str,
83 ) -> Result<LoroDoc, LoroStoreError> {
84 // pg_advisory_xact_lock(key1, key2) — two-key form fits the
85 // (entity, row_id) tuple naturally. We hash each side into an
86 // i32 so the same logical row maps to the same lock across
87 // processes. Released automatically at tx end.
88 let entity_key = pg_advisory_key(entity);
89 let row_key = pg_advisory_key(row_id);
90 conn.execute(
91 "SELECT pg_advisory_xact_lock($1::int, $2::int)",
92 &[&entity_key, &row_key],
93 )
94 .map_err(|e| LoroStoreError::Storage(format!("crdt advisory lock: {e}")))?;
95
96 let snapshot: Option<Vec<u8>> = conn
97 .query_opt(
98 "SELECT snapshot FROM _pylon_crdt_snapshots \
99 WHERE entity = $1 AND row_id = $2",
100 &[&entity, &row_id],
101 )
102 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
103 .map(|r| r.get::<_, Vec<u8>>(0));
104
105 let doc = LoroDoc::new();
106 if let Some(bytes) = snapshot {
107 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
108 }
109 Ok(doc)
110 }
111}
112
/// Map a string to the `int4` key expected by the two-argument form
/// of `pg_advisory_xact_lock`.
///
/// Every process contending on the same row must derive the same
/// key, so the hash has to be a fixed, specified algorithm. This uses
/// 32-bit FNV-1a over the UTF-8 bytes of `s`. `std`'s `DefaultHasher`
/// is unsuitable for this purpose: its algorithm is documented as
/// unspecified and may change between Rust releases, which would let
/// replicas built with different toolchains take different locks for
/// the same row.
///
/// Collisions are possible. The worst outcome is that two unrelated
/// keys serialize behind the same lock.
///
/// NOTE(review): a process still running a build that derived keys
/// with a different algorithm will not contend with this one; avoid
/// mixing such builds on the write path during rollout.
fn pg_advisory_key(s: &str) -> i32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;

    let hash = s.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });

    // `as` between same-width integers never panics: it reinterprets
    // the bit pattern, so the full int4 range is used.
    hash as i32
}
128
129impl PgLoroStore {
130 /// Read-only hydrate — no FOR UPDATE lock. Used by `snapshot()`
131 /// and `update_since()` which don't mutate. Hits the in-memory
132 /// cache on a hit so repeated reads of the same row don't pay
133 /// the decode cost.
134 fn get_or_hydrate_read<C: PgConn>(
135 &self,
136 conn: &mut C,
137 entity: &str,
138 row_id: &str,
139 ) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
140 let key = (entity.to_string(), row_id.to_string());
141
142 // Fast path: already cached.
143 {
144 let guard = self.docs.lock().unwrap();
145 if let Some(doc) = guard.get(&key) {
146 return Ok(Arc::clone(doc));
147 }
148 }
149
150 let snapshot: Option<Vec<u8>> = conn
151 .query_opt(
152 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
153 &[&entity, &row_id],
154 )
155 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
156 .map(|r| r.get::<_, Vec<u8>>(0));
157
158 let doc = LoroDoc::new();
159 if let Some(bytes) = snapshot {
160 crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
161 }
162 let handle = Arc::new(Mutex::new(doc));
163
164 let mut guard = self.docs.lock().unwrap();
165 let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
166 Ok(Arc::clone(entry))
167 }
168
169 /// Persist the current snapshot via UPSERT. Called after every
170 /// apply.
171 fn persist_snapshot<C: PgConn>(
172 conn: &mut C,
173 entity: &str,
174 row_id: &str,
175 doc: &LoroDoc,
176 ) -> Result<(), LoroStoreError> {
177 let snap = encode_snapshot(doc);
178 conn.execute(
179 "INSERT INTO _pylon_crdt_snapshots (entity, row_id, snapshot, updated_at) \
180 VALUES ($1, $2, $3, now()) \
181 ON CONFLICT (entity, row_id) DO UPDATE \
182 SET snapshot = EXCLUDED.snapshot, updated_at = EXCLUDED.updated_at",
183 &[&entity, &row_id, &snap],
184 )
185 .map(|_| ())
186 .map_err(|e| LoroStoreError::Storage(format!("persist pg snapshot: {e}")))
187 }
188
189 /// Apply a JSON patch, persist the new snapshot, return the
190 /// projected JSON. Caller is responsible for materializing the
191 /// projected JSON into the entity row — typically done in the
192 /// same `with_transaction_raw` so both writes share BEGIN/COMMIT.
193 ///
194 /// Multi-replica safe: hydrates with `SELECT ... FOR UPDATE`,
195 /// which serializes concurrent updates to the same row across
196 /// processes. Bypasses the in-memory cache on the write path —
197 /// the cache only updates after commit (see `cache_after_commit`),
198 /// so a stale cache from a different process can't shadow the
199 /// row-locked snapshot we just read.
200 pub fn apply_patch<C: PgConn>(
201 &self,
202 conn: &mut C,
203 entity: &str,
204 row_id: &str,
205 fields: &[CrdtField],
206 patch: &Value,
207 ) -> Result<Value, LoroStoreError> {
208 let doc = Self::hydrate_for_write(conn, entity, row_id)?;
209 apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
210 Self::persist_snapshot(conn, entity, row_id, &doc)?;
211 let projected = project_doc_to_json(&doc, fields);
212 // Cache update happens through `cache_after_commit` from the
213 // runtime layer once the surrounding tx commits. If the tx
214 // rolls back, no cache write happens — so the next read
215 // hydrates from the (unchanged) sidecar.
216 Ok(projected)
217 }
218
219 /// Apply a binary update from a peer. Returns the projected JSON
220 /// for re-materialization on the entity row. Same locking shape
221 /// as `apply_patch`.
222 pub fn apply_remote_update<C: PgConn>(
223 &self,
224 conn: &mut C,
225 entity: &str,
226 row_id: &str,
227 fields: &[CrdtField],
228 update: &[u8],
229 ) -> Result<Value, LoroStoreError> {
230 let doc = Self::hydrate_for_write(conn, entity, row_id)?;
231 crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
232 Self::persist_snapshot(conn, entity, row_id, &doc)?;
233 let projected = project_doc_to_json(&doc, fields);
234 Ok(projected)
235 }
236
237 /// Full snapshot for the row. Returns the encoded LoroDoc bytes
238 /// (empty doc if the row hasn't been written yet — same shape as
239 /// the SQLite path). Read-only, no FOR UPDATE.
240 pub fn snapshot<C: PgConn>(
241 &self,
242 conn: &mut C,
243 entity: &str,
244 row_id: &str,
245 ) -> Result<Vec<u8>, LoroStoreError> {
246 let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
247 let doc = handle.lock().unwrap();
248 Ok(encode_snapshot(&doc))
249 }
250
251 /// Incremental update since `since` for catch-up. Same shape as
252 /// the SQLite path's `update_since`.
253 pub fn update_since<C: PgConn>(
254 &self,
255 conn: &mut C,
256 entity: &str,
257 row_id: &str,
258 since: &VersionVector,
259 ) -> Result<Vec<u8>, LoroStoreError> {
260 let handle = self.get_or_hydrate_read(conn, entity, row_id)?;
261 let doc = handle.lock().unwrap();
262 Ok(encode_update_since(&doc, since))
263 }
264
265 /// Read the snapshot bytes directly through the supplied
266 /// connection, bypassing the in-memory cache. Used by the
267 /// crdt_apply_update path: the cache may hold stale bytes from a
268 /// prior read (snapshot() populates it), and we need the bytes
269 /// that *just* committed to land in the broadcast.
270 pub fn read_snapshot_via_conn<C: PgConn>(
271 conn: &mut C,
272 entity: &str,
273 row_id: &str,
274 ) -> Result<Vec<u8>, LoroStoreError> {
275 let snap: Option<Vec<u8>> = conn
276 .query_opt(
277 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
278 &[&entity, &row_id],
279 )
280 .map_err(|e| LoroStoreError::Storage(format!("read pg snapshot: {e}")))?
281 .map(|r| r.get::<_, Vec<u8>>(0));
282 let bytes = snap.unwrap_or_default();
283 // If the row exists, return its bytes verbatim. If it
284 // doesn't, return an encoded empty doc so the broadcast
285 // shape stays consistent with the SQLite path.
286 if bytes.is_empty() {
287 let doc = LoroDoc::new();
288 Ok(encode_snapshot(&doc))
289 } else {
290 Ok(bytes)
291 }
292 }
293
294 /// Refresh the in-memory cache entry for a row from the
295 /// just-committed sidecar bytes. Called by the runtime layer
296 /// after `with_transaction_raw` commits the CRDT write — this
297 /// way the cache only ever reflects what's on disk, and a
298 /// rolled-back tx leaves no cache poison.
299 ///
300 /// On any read error we evict instead of caching stale state.
301 pub fn cache_after_commit<C: PgConn>(&self, conn: &mut C, entity: &str, row_id: &str) {
302 let snap_result = conn.query_opt(
303 "SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
304 &[&entity, &row_id],
305 );
306 let bytes = match snap_result {
307 Ok(Some(row)) => row.get::<_, Vec<u8>>(0),
308 _ => {
309 self.evict(entity, row_id);
310 return;
311 }
312 };
313 let doc = LoroDoc::new();
314 if crdt_apply_update(&doc, &bytes).is_err() {
315 self.evict(entity, row_id);
316 return;
317 }
318 let handle = Arc::new(Mutex::new(doc));
319 let mut guard = self.docs.lock().unwrap();
320 guard.insert((entity.to_string(), row_id.to_string()), handle);
321 }
322
323 /// Drop a row's cached doc. Next access re-hydrates from the PG
324 /// sidecar.
325 pub fn evict(&self, entity: &str, row_id: &str) {
326 self.docs
327 .lock()
328 .unwrap()
329 .remove(&(entity.to_string(), row_id.to_string()));
330 }
331
332 /// Diagnostic — number of rows cached in memory.
333 pub fn cached_rows(&self) -> usize {
334 self.docs.lock().unwrap().len()
335 }
336}
337
338// ---------------------------------------------------------------------------
339// PgCrdtHook impl — bridges pylon-storage's PgTxStore to PgLoroStore so
340// TS-mutation `ctx.db.X` calls maintain the CRDT sidecar in the same tx.
341// ---------------------------------------------------------------------------
342
343use pylon_kernel::AppManifest;
344use pylon_storage::pg_tx_store::PgCrdtHook;
345
346/// Bridge struct that lets PgTxStore (in pylon-storage) call back
347/// into the runtime's CRDT machinery without a direct dependency.
348/// Lives only for the duration of a single mutation tx.
349pub struct PgCrdtHookImpl {
350 /// Reference to the runtime's PgLoroStore. `Arc` so the trait
351 /// object can be cloned across the storage / runtime boundary.
352 pub crdt: std::sync::Arc<PgLoroStore>,
353 /// Shared with the runtime so we can resolve the field-shape
354 /// for each CRDT entity (which Loro types each field uses).
355 pub manifest: std::sync::Arc<AppManifest>,
356}
357
358impl PgCrdtHook for PgCrdtHookImpl {
359 fn before_insert(
360 &self,
361 tx: &mut postgres::Transaction<'_>,
362 entity: &str,
363 data: &serde_json::Value,
364 ) -> Result<Option<serde_json::Value>, pylon_http::DataError> {
365 let ent = self
366 .manifest
367 .entities
368 .iter()
369 .find(|e| e.name == entity)
370 .ok_or_else(|| pylon_http::DataError {
371 code: "ENTITY_NOT_FOUND".into(),
372 message: format!("Unknown entity: {entity}"),
373 })?;
374 let crdt_fields = crdt_fields_for(ent)?;
375
376 // If the caller supplied an `id`, reuse it as the snapshot
377 // key so the materialized row and the sidecar stay aligned.
378 // Otherwise generate one and inject it back into the data
379 // (build_insert_sql honors `data["id"]`).
380 let id = data
381 .get("id")
382 .and_then(|v| v.as_str())
383 .map(|s| s.to_string())
384 .unwrap_or_else(crate::generate_id);
385
386 self.crdt
387 .apply_patch(tx, entity, &id, &crdt_fields, data)
388 .map_err(|e| pylon_http::DataError {
389 code: "CRDT_APPLY_FAILED".into(),
390 message: format!("crdt write {entity}/{id}: {e}"),
391 })?;
392
393 // Bake the id back into the row so PgTxStore's tx_insert
394 // uses it instead of generating a fresh one.
395 let mut row = data.clone();
396 if let Some(obj) = row.as_object_mut() {
397 obj.insert("id".into(), serde_json::Value::String(id.clone()));
398 }
399 Ok(Some(row))
400 }
401
402 fn before_update(
403 &self,
404 tx: &mut postgres::Transaction<'_>,
405 entity: &str,
406 id: &str,
407 data: &serde_json::Value,
408 ) -> Result<(), pylon_http::DataError> {
409 let ent = self
410 .manifest
411 .entities
412 .iter()
413 .find(|e| e.name == entity)
414 .ok_or_else(|| pylon_http::DataError {
415 code: "ENTITY_NOT_FOUND".into(),
416 message: format!("Unknown entity: {entity}"),
417 })?;
418 let crdt_fields = crdt_fields_for(ent)?;
419 self.crdt
420 .apply_patch(tx, entity, id, &crdt_fields, data)
421 .map(|_| ())
422 .map_err(|e| pylon_http::DataError {
423 code: "CRDT_APPLY_FAILED".into(),
424 message: format!("crdt update {entity}/{id}: {e}"),
425 })
426 }
427
428 fn before_delete(
429 &self,
430 tx: &mut postgres::Transaction<'_>,
431 entity: &str,
432 id: &str,
433 ) -> Result<(), pylon_http::DataError> {
434 // Drop the sidecar row inside the same tx; runtime evicts
435 // cache entry on commit via after_commit/on_rollback.
436 tx.execute(
437 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
438 &[&entity, &id],
439 )
440 .map(|_| ())
441 .map_err(|e| pylon_http::DataError {
442 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
443 message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
444 })
445 }
446
447 fn after_commit(&self, entity: &str, id: &str) {
448 // Refresh cache via a fresh client connection. Can't pass
449 // the tx in here since it's already committed and dropped.
450 // The cache_after_commit method on PgLoroStore expects a
451 // PgConn — we don't have one here. Simplest: evict so the
452 // next read re-hydrates from the persisted snapshot. This
453 // is correct (just one extra round-trip for the next read);
454 // the alternative would require the runtime to hand us a
455 // fresh client which is more plumbing for marginal benefit.
456 self.crdt.evict(entity, id);
457 }
458
459 fn on_rollback(&self, entity: &str, id: &str) {
460 // Rolled-back tx: the in-memory doc may have been mutated
461 // in place by apply_patch. Evict to force re-hydration from
462 // the (unchanged) persisted snapshot.
463 self.crdt.evict(entity, id);
464 }
465}
466
467/// Resolve the CRDT field shape for an entity. Same logic as
468/// `Runtime::crdt_fields_for` but without the runtime borrow — the
469/// hook lives across the storage/runtime boundary and only needs
470/// the manifest. Returns `Err` if any field's CRDT annotation is
471/// invalid, matching `Runtime::crdt_fields_for`'s strict behavior:
472/// silently dropping an invalid field would commit the SQL row
473/// while omitting that field from the snapshot — exactly the
474/// sidecar/row divergence we're trying to prevent. Codex flagged.
475fn crdt_fields_for(
476 ent: &pylon_kernel::ManifestEntity,
477) -> Result<Vec<pylon_crdt::CrdtField>, pylon_http::DataError> {
478 let mut out = Vec::with_capacity(ent.fields.len());
479 for f in &ent.fields {
480 if f.name == "id" {
481 continue;
482 }
483 let kind =
484 pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| pylon_http::DataError {
485 code: "INVALID_CRDT_FIELD".into(),
486 message: format!(
487 "{}.{}: {e} (declared type={}, crdt={:?})",
488 ent.name, f.name, f.field_type, f.crdt
489 ),
490 })?;
491 out.push(pylon_crdt::CrdtField {
492 name: f.name.clone(),
493 kind,
494 });
495 }
496 Ok(out)
497}