use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use pylon_crdt::{
apply_patch, apply_update as crdt_apply_update, encode_snapshot, encode_update_since,
loro::{LoroDoc, VersionVector},
project_doc_to_json, CrdtField,
};
use rusqlite::{params, Connection};
use serde_json::Value;
pub const CREATE_SIDECAR_SQL: &str = "
CREATE TABLE IF NOT EXISTS _pylon_crdt_snapshots (
entity TEXT NOT NULL,
row_id TEXT NOT NULL,
snapshot BLOB NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (entity, row_id)
)
";
pub fn ensure_sidecar(conn: &Connection) -> Result<(), LoroStoreError> {
conn.execute(CREATE_SIDECAR_SQL, [])
.map(|_| ())
.map_err(|e| LoroStoreError::Storage(format!("create sidecar: {e}")))
}
#[derive(Debug)]
pub enum LoroStoreError {
Apply(String),
Storage(String),
Decode(String),
}
impl std::fmt::Display for LoroStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Apply(m) => write!(f, "apply: {m}"),
Self::Storage(m) => write!(f, "storage: {m}"),
Self::Decode(m) => write!(f, "decode: {m}"),
}
}
}
impl std::error::Error for LoroStoreError {}
impl From<pylon_http::DataError> for LoroStoreError {
fn from(e: pylon_http::DataError) -> Self {
LoroStoreError::Storage(format!("[{}] {}", e.code, e.message))
}
}
#[derive(Default)]
pub struct LoroStore {
docs: Mutex<HashMap<(String, String), Arc<Mutex<LoroDoc>>>>,
}
impl LoroStore {
pub fn new() -> Self {
Self::default()
}
fn get_or_hydrate(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
) -> Result<Arc<Mutex<LoroDoc>>, LoroStoreError> {
let key = (entity.to_string(), row_id.to_string());
{
let guard = self.docs.lock().unwrap();
if let Some(doc) = guard.get(&key) {
return Ok(Arc::clone(doc));
}
}
let snapshot: Option<Vec<u8>> = conn
.query_row(
"SELECT snapshot FROM _pylon_crdt_snapshots WHERE entity = ?1 AND row_id = ?2",
params![entity, row_id],
|r| r.get(0),
)
.map(Some)
.or_else(|e| {
if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
Ok(None)
} else {
Err(LoroStoreError::Storage(format!("read snapshot: {e}")))
}
})?;
let doc = LoroDoc::new();
if let Some(bytes) = snapshot {
crdt_apply_update(&doc, &bytes).map_err(LoroStoreError::Decode)?;
}
let handle = Arc::new(Mutex::new(doc));
let mut guard = self.docs.lock().unwrap();
let entry = guard.entry(key).or_insert_with(|| Arc::clone(&handle));
Ok(Arc::clone(entry))
}
fn persist_snapshot(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
doc: &LoroDoc,
) -> Result<(), LoroStoreError> {
let snap = encode_snapshot(doc);
let now = chrono_now_iso();
conn.execute(
"INSERT OR REPLACE INTO _pylon_crdt_snapshots
(entity, row_id, snapshot, updated_at)
VALUES (?1, ?2, ?3, ?4)",
params![entity, row_id, snap, now],
)
.map(|_| ())
.map_err(|e| LoroStoreError::Storage(format!("persist snapshot: {e}")))
}
pub fn apply_patch(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
fields: &[CrdtField],
patch: &Value,
) -> Result<Value, LoroStoreError> {
let handle = self.get_or_hydrate(conn, entity, row_id)?;
let projected = {
let doc = handle.lock().unwrap();
apply_patch(&doc, fields, patch).map_err(LoroStoreError::Apply)?;
self.persist_snapshot(conn, entity, row_id, &doc)?;
project_doc_to_json(&doc, fields)
};
Ok(projected)
}
pub fn apply_remote_update(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
fields: &[CrdtField],
update: &[u8],
) -> Result<Value, LoroStoreError> {
let handle = self.get_or_hydrate(conn, entity, row_id)?;
let projected = {
let doc = handle.lock().unwrap();
crdt_apply_update(&doc, update).map_err(LoroStoreError::Decode)?;
self.persist_snapshot(conn, entity, row_id, &doc)?;
project_doc_to_json(&doc, fields)
};
Ok(projected)
}
pub fn snapshot(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
) -> Result<Vec<u8>, LoroStoreError> {
let handle = self.get_or_hydrate(conn, entity, row_id)?;
let doc = handle.lock().unwrap();
Ok(encode_snapshot(&doc))
}
pub fn update_since(
&self,
conn: &Connection,
entity: &str,
row_id: &str,
since: &VersionVector,
) -> Result<Vec<u8>, LoroStoreError> {
let handle = self.get_or_hydrate(conn, entity, row_id)?;
let doc = handle.lock().unwrap();
Ok(encode_update_since(&doc, since))
}
pub fn evict(&self, entity: &str, row_id: &str) {
self.docs
.lock()
.unwrap()
.remove(&(entity.to_string(), row_id.to_string()));
}
pub fn cached_rows(&self) -> usize {
self.docs.lock().unwrap().len()
}
}
fn chrono_now_iso() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
format!("{}Z", secs)
}
#[cfg(test)]
mod tests {
use super::*;
use pylon_crdt::CrdtFieldKind;
fn open_test_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
ensure_sidecar(&conn).unwrap();
conn
}
fn fields() -> Vec<CrdtField> {
vec![
CrdtField {
name: "title".into(),
kind: CrdtFieldKind::LwwString,
},
CrdtField {
name: "body".into(),
kind: CrdtFieldKind::Text,
},
CrdtField {
name: "qty".into(),
kind: CrdtFieldKind::LwwNumber,
},
]
}
#[test]
fn sidecar_is_idempotent() {
let conn = open_test_db();
ensure_sidecar(&conn).unwrap(); }
#[test]
fn apply_patch_persists_and_projects() {
let conn = open_test_db();
let store = LoroStore::new();
let projected = store
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"title": "Hello", "body": "world", "qty": 7}),
)
.unwrap();
assert_eq!(projected["title"], "Hello");
assert_eq!(projected["body"], "world");
assert_eq!(projected["qty"], 7.0);
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity='Note' AND row_id='n1'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 1);
}
#[test]
fn second_open_hydrates_from_sidecar() {
let conn = open_test_db();
let store = LoroStore::new();
store
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"title": "A", "qty": 1}),
)
.unwrap();
store.evict("Note", "n1");
assert_eq!(store.cached_rows(), 0);
let snap = store.snapshot(&conn, "Note", "n1").unwrap();
assert!(
!snap.is_empty(),
"snapshot should be non-empty after writes"
);
assert_eq!(store.cached_rows(), 1, "snapshot() rehydrated the cache");
}
#[test]
fn empty_row_yields_empty_snapshot() {
let conn = open_test_db();
let store = LoroStore::new();
let snap = store.snapshot(&conn, "Note", "missing").unwrap();
let store2 = LoroStore::new();
store2
.apply_remote_update(&conn, "Note", "missing", &fields(), &snap)
.unwrap();
}
#[test]
fn remote_update_merges_with_local_state() {
let conn = open_test_db();
let server = LoroStore::new();
server
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"title": "A", "qty": 1}),
)
.unwrap();
let server_snap = server.snapshot(&conn, "Note", "n1").unwrap();
let conn2 = open_test_db();
let peer = LoroStore::new();
peer.apply_remote_update(&conn2, "Note", "n1", &fields(), &server_snap)
.unwrap();
peer.apply_patch(
&conn2,
"Note",
"n1",
&fields(),
&serde_json::json!({"qty": 2}),
)
.unwrap();
let peer_update = peer.snapshot(&conn2, "Note", "n1").unwrap();
let projected = server
.apply_remote_update(&conn, "Note", "n1", &fields(), &peer_update)
.unwrap();
assert_eq!(projected["title"], "A");
assert_eq!(projected["qty"], 2.0);
}
#[test]
fn concurrent_text_writes_converge() {
let conn_a = open_test_db();
let conn_b = open_test_db();
let a = LoroStore::new();
let b = LoroStore::new();
a.apply_patch(
&conn_a,
"Note",
"n1",
&fields(),
&serde_json::json!({"body": "from-a"}),
)
.unwrap();
b.apply_patch(
&conn_b,
"Note",
"n1",
&fields(),
&serde_json::json!({"body": "from-b"}),
)
.unwrap();
let snap_a = a.snapshot(&conn_a, "Note", "n1").unwrap();
let snap_b = b.snapshot(&conn_b, "Note", "n1").unwrap();
let projected_a = a
.apply_remote_update(&conn_a, "Note", "n1", &fields(), &snap_b)
.unwrap();
let projected_b = b
.apply_remote_update(&conn_b, "Note", "n1", &fields(), &snap_a)
.unwrap();
assert_eq!(projected_a, projected_b);
let body = projected_a["body"].as_str().unwrap();
assert!(!body.is_empty(), "body should contain merged text");
}
#[test]
fn incremental_update_carries_only_delta() {
let conn = open_test_db();
let store = LoroStore::new();
store
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"title": "v1", "qty": 1}),
)
.unwrap();
let early_vv = {
let handle = store.get_or_hydrate(&conn, "Note", "n1").unwrap();
let vv = handle.lock().unwrap().oplog_vv();
vv
};
let snap_full = store.snapshot(&conn, "Note", "n1").unwrap();
store
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"qty": 7}),
)
.unwrap();
let delta = store.update_since(&conn, "Note", "n1", &early_vv).unwrap();
assert!(
delta.len() < snap_full.len(),
"incremental delta ({}) must be smaller than full snapshot ({})",
delta.len(),
snap_full.len()
);
}
#[test]
fn cache_keeps_distinct_rows_separate() {
let conn = open_test_db();
let store = LoroStore::new();
store
.apply_patch(
&conn,
"Note",
"n1",
&fields(),
&serde_json::json!({"title": "first"}),
)
.unwrap();
store
.apply_patch(
&conn,
"Note",
"n2",
&fields(),
&serde_json::json!({"title": "second"}),
)
.unwrap();
assert_eq!(store.cached_rows(), 2);
let p1 = store
.apply_patch(&conn, "Note", "n1", &fields(), &serde_json::json!({}))
.unwrap();
let p2 = store
.apply_patch(&conn, "Note", "n2", &fields(), &serde_json::json!({}))
.unwrap();
assert_eq!(p1["title"], "first");
assert_eq!(p2["title"], "second");
}
}