pluresdb_core/
lib.rs

1//! Core data structures, CRDT logic, and domain models that power PluresDB.
2//!
3//! The goal of this crate is to offer a lightweight, dependency-free-on-FFI
4//! foundation that can be reused across the native CLI, the Node addon, and
5//! any future host integrations.
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use parking_lot::Mutex;
15use rusqlite::types::{Value as SqliteValue, ValueRef};
16use rusqlite::{params_from_iter, Connection, OpenFlags, Transaction};
17use serde::{Deserialize, Serialize};
18use serde_json::{json, Value as JsonValue};
19use thiserror::Error;
20use tracing::debug;
21use uuid::Uuid;
22
23/// Unique identifier for a stored node.
24pub type NodeId = String;
25
26/// Logical actor identifier used when merging CRDT updates.
27pub type ActorId = String;
28
29/// A key-value map of logical clocks per actor.
30pub type VectorClock = HashMap<ActorId, u64>;
31
32/// Arbitrary JSON payload that callers persist inside PluresDB.
33pub type NodeData = JsonValue;
34
35/// Metadata associated with a persisted node in the CRDT store.
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37pub struct NodeRecord {
38    pub id: NodeId,
39    pub data: NodeData,
40    pub clock: VectorClock,
41    pub timestamp: DateTime<Utc>,
42}
43
44impl NodeRecord {
45    /// Creates a new node record with a fresh logical clock entry for the actor.
46    pub fn new(id: NodeId, actor: impl Into<ActorId>, data: NodeData) -> Self {
47        let actor = actor.into();
48        let mut clock = VectorClock::default();
49        clock.insert(actor.clone(), 1);
50        Self {
51            id,
52            data,
53            clock,
54            timestamp: Utc::now(),
55        }
56    }
57
58    /// Increments the logical clock for the given actor and updates the payload.
59    pub fn merge_update(&mut self, actor: impl Into<ActorId>, data: NodeData) {
60        let actor = actor.into();
61        let counter = self.clock.entry(actor).or_insert(0);
62        *counter += 1;
63        self.timestamp = Utc::now();
64        self.data = data;
65    }
66}
67
68/// Errors that can be produced by the CRDT store.
69#[derive(Debug, Error)]
70pub enum StoreError {
71    #[error("node not found: {0}")]
72    NotFound(NodeId),
73}
74
75/// CRDT operations that clients may apply to the store.
76#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77pub enum CrdtOperation {
78    Put {
79        id: NodeId,
80        actor: ActorId,
81        data: NodeData,
82    },
83    Delete {
84        id: NodeId,
85    },
86}
87
88/// A simple conflict-free replicated data store backed by a concurrent map.
89#[derive(Debug, Default)]
90pub struct CrdtStore {
91    nodes: DashMap<NodeId, NodeRecord>,
92}
93
94impl CrdtStore {
95    /// Inserts or updates a node using CRDT semantics.
96    pub fn put(&self, id: impl Into<NodeId>, actor: impl Into<ActorId>, data: NodeData) -> NodeId {
97        let id = id.into();
98        let actor = actor.into();
99        self.nodes
100            .entry(id.clone())
101            .and_modify(|record| record.merge_update(actor.clone(), data.clone()))
102            .or_insert_with(|| NodeRecord::new(id.clone(), actor, data));
103        id
104    }
105
106    /// Removes a node from the store.
107    pub fn delete(&self, id: impl AsRef<str>) -> Result<(), StoreError> {
108        self.nodes
109            .remove(id.as_ref())
110            .map(|_| ())
111            .ok_or_else(|| StoreError::NotFound(id.as_ref().to_owned()))
112    }
113
114    /// Fetches a node by identifier.
115    pub fn get(&self, id: impl AsRef<str>) -> Option<NodeRecord> {
116        self.nodes
117            .get(id.as_ref())
118            .map(|entry| entry.value().clone())
119    }
120
121    /// Lists all nodes currently stored.
122    pub fn list(&self) -> Vec<NodeRecord> {
123        self.nodes
124            .iter()
125            .map(|entry| entry.value().clone())
126            .collect()
127    }
128
129    /// Applies a CRDT operation, returning the resulting node identifier when relevant.
130    pub fn apply(&self, op: CrdtOperation) -> Result<Option<NodeId>, StoreError> {
131        match op {
132            CrdtOperation::Put { id, actor, data } => Ok(Some(self.put(id, actor, data))),
133            CrdtOperation::Delete { id } => {
134                self.delete(&id)?;
135                Ok(None)
136            }
137        }
138    }
139
140    /// Generates a CRDT operation representing the provided node data.
141    pub fn operation_for(
142        &self,
143        actor: impl Into<ActorId>,
144        data: NodeData,
145    ) -> (NodeId, CrdtOperation) {
146        let id = Uuid::new_v4().to_string();
147        let op = CrdtOperation::Put {
148            id: id.clone(),
149            actor: actor.into(),
150            data,
151        };
152        (id, op)
153    }
154}
155
156/// Primitive SQLite values returned by the native engine.
157#[derive(Debug, Clone, PartialEq)]
158pub enum SqlValue {
159    Null,
160    Integer(i64),
161    Real(f64),
162    Text(String),
163    Blob(Vec<u8>),
164}
165
166impl SqlValue {
167    pub fn as_i64(&self) -> Option<i64> {
168        if let Self::Integer(value) = self {
169            Some(*value)
170        } else {
171            None
172        }
173    }
174
175    pub fn as_f64(&self) -> Option<f64> {
176        if let Self::Real(value) = self {
177            Some(*value)
178        } else {
179            None
180        }
181    }
182
183    pub fn as_str(&self) -> Option<&str> {
184        if let Self::Text(value) = self {
185            Some(value.as_str())
186        } else {
187            None
188        }
189    }
190
191    pub fn as_blob(&self) -> Option<&[u8]> {
192        if let Self::Blob(value) = self {
193            Some(value.as_slice())
194        } else {
195            None
196        }
197    }
198
199    pub fn to_json(&self) -> JsonValue {
200        match self {
201            SqlValue::Null => JsonValue::Null,
202            SqlValue::Integer(value) => json!(value),
203            SqlValue::Real(value) => json!(value),
204            SqlValue::Text(value) => json!(value),
205            SqlValue::Blob(bytes) => json!(bytes),
206        }
207    }
208}
209
210#[derive(Debug, Clone, PartialEq)]
211pub struct QueryResult {
212    pub columns: Vec<String>,
213    pub rows: Vec<Vec<SqlValue>>,
214    pub changes: u64,
215    pub last_insert_rowid: i64,
216}
217
218impl QueryResult {
219    pub fn rows_as_maps(&self) -> Vec<HashMap<String, SqlValue>> {
220        self.rows
221            .iter()
222            .map(|row| {
223                let mut map = HashMap::new();
224                for (index, value) in row.iter().cloned().enumerate() {
225                    if let Some(column) = self.columns.get(index) {
226                        map.insert(column.clone(), value);
227                    }
228                }
229                map
230            })
231            .collect()
232    }
233
234    pub fn rows_as_json(&self) -> Vec<JsonValue> {
235        self.rows_as_maps()
236            .into_iter()
237            .map(|row| {
238                let json_object: HashMap<String, JsonValue> = row
239                    .into_iter()
240                    .map(|(key, value)| (key, value.to_json()))
241                    .collect();
242                json!(json_object)
243            })
244            .collect()
245    }
246}
247
/// Connection counters captured after executing a statement that returns no rows.
#[derive(Clone, Debug, PartialEq)]
pub struct ExecutionResult {
    /// Change counter reported by the connection.
    pub changes: u64,
    /// Last-insert rowid reported by the connection.
    pub last_insert_rowid: i64,
}
253
/// Where a database lives.
#[derive(Clone, Debug, PartialEq)]
pub enum DatabasePath {
    /// A private database held in memory.
    InMemory,
    /// A database stored at the given filesystem path.
    File(PathBuf),
}
259
260#[derive(Debug, Clone)]
261pub struct DatabaseOptions {
262    pub path: DatabasePath,
263    pub read_only: bool,
264    pub create_if_missing: bool,
265    pub apply_default_pragmas: bool,
266    pub custom_pragmas: Vec<(String, String)>,
267    pub busy_timeout: Option<Duration>,
268}
269
270impl Default for DatabaseOptions {
271    fn default() -> Self {
272        Self {
273            path: DatabasePath::InMemory,
274            read_only: false,
275            create_if_missing: true,
276            apply_default_pragmas: true,
277            custom_pragmas: Vec::new(),
278            busy_timeout: Some(Duration::from_millis(5_000)),
279        }
280    }
281}
282
283impl DatabaseOptions {
284    pub fn in_memory() -> Self {
285        Self::default()
286    }
287
288    pub fn with_file(path: impl Into<PathBuf>) -> Self {
289        Self {
290            path: DatabasePath::File(path.into()),
291            ..Default::default()
292        }
293    }
294
295    pub fn read_only(mut self, flag: bool) -> Self {
296        self.read_only = flag;
297        self
298    }
299
300    pub fn create_if_missing(mut self, flag: bool) -> Self {
301        self.create_if_missing = flag;
302        self
303    }
304
305    pub fn apply_default_pragmas(mut self, flag: bool) -> Self {
306        self.apply_default_pragmas = flag;
307        self
308    }
309
310    pub fn add_pragma(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
311        self.custom_pragmas.push((name.into(), value.into()));
312        self
313    }
314
315    pub fn busy_timeout(mut self, timeout: Option<Duration>) -> Self {
316        self.busy_timeout = timeout;
317        self
318    }
319}
320
321#[derive(Debug, Clone)]
322pub struct Database {
323    conn: Arc<Mutex<Connection>>,
324    path: DatabasePath,
325}
326
327#[derive(Debug, Error)]
328pub enum DatabaseError {
329    #[error("sqlite error: {0}")]
330    Sqlite(#[from] rusqlite::Error),
331}
332
333pub type DbResult<T> = Result<T, DatabaseError>;
334
/// Pragmas applied, in order, to every connection opened with
/// `apply_default_pragmas` enabled.
///
/// `page_size` is deliberately first: SQLite only honours it before the
/// database file is created, and switching to WAL journaling initialises a new
/// file, so issuing it after `journal_mode = WAL` would be a silent no-op.
const DEFAULT_PRAGMAS: &[(&str, &str)] = &[
    ("page_size", "4096"),
    ("journal_mode", "WAL"),
    ("synchronous", "NORMAL"),
    ("temp_store", "MEMORY"),
    ("mmap_size", "30000000000"),
    ("cache_size", "-64000"),
];
343
344impl Database {
345    pub fn open(options: DatabaseOptions) -> DbResult<Self> {
346        let connection = match &options.path {
347            DatabasePath::InMemory => Connection::open_in_memory()?,
348            DatabasePath::File(path) => {
349                Connection::open_with_flags(path, build_open_flags(&options))?
350            }
351        };
352
353        if let Some(timeout) = options.busy_timeout {
354            connection.busy_timeout(timeout)?;
355        }
356
357        if options.apply_default_pragmas {
358            apply_pragmas(&connection, DEFAULT_PRAGMAS);
359        }
360
361        if !options.custom_pragmas.is_empty() {
362            let custom: Vec<(&str, &str)> = options
363                .custom_pragmas
364                .iter()
365                .map(|(name, value)| (name.as_str(), value.as_str()))
366                .collect();
367            apply_pragmas(&connection, &custom);
368        }
369
370        Ok(Self {
371            conn: Arc::new(Mutex::new(connection)),
372            path: options.path,
373        })
374    }
375
376    pub fn path(&self) -> &DatabasePath {
377        &self.path
378    }
379
380    pub fn prepare(&self, sql: impl Into<String>) -> DbResult<Statement> {
381        Ok(Statement {
382            database: self.clone(),
383            sql: sql.into(),
384        })
385    }
386
387    pub fn exec(&self, sql: &str) -> DbResult<ExecutionResult> {
388        self.with_connection(|conn| {
389            conn.execute_batch(sql)?;
390            Ok(ExecutionResult {
391                changes: conn.changes() as u64,
392                last_insert_rowid: conn.last_insert_rowid(),
393            })
394        })
395    }
396
397    pub fn query(&self, sql: &str, params: &[SqlValue]) -> DbResult<QueryResult> {
398        Statement {
399            database: self.clone(),
400            sql: sql.to_owned(),
401        }
402        .query_internal(params)
403    }
404
405    pub fn pragma(&self, pragma: &str) -> DbResult<QueryResult> {
406        let normalized = if pragma.trim_start().to_lowercase().starts_with("pragma") {
407            pragma.trim().to_owned()
408        } else {
409            format!("PRAGMA {}", pragma)
410        };
411        self.query(&normalized, &[])
412    }
413
414    pub fn transaction<F, T>(&self, f: F) -> DbResult<T>
415    where
416        F: FnOnce(&Transaction<'_>) -> DbResult<T>,
417    {
418        self.with_connection(|conn| {
419            let tx = conn.transaction()?;
420            let result = f(&tx)?;
421            tx.commit()?;
422            Ok(result)
423        })
424    }
425
426    fn with_connection<T, F>(&self, f: F) -> DbResult<T>
427    where
428        F: FnOnce(&mut Connection) -> DbResult<T>,
429    {
430        let mut guard = self.conn.lock();
431        f(&mut guard)
432    }
433}
434
435#[derive(Debug, Clone)]
436pub struct Statement {
437    database: Database,
438    sql: String,
439}
440
441impl Statement {
442    pub fn sql(&self) -> &str {
443        &self.sql
444    }
445
446    pub fn run(&self, params: &[SqlValue]) -> DbResult<ExecutionResult> {
447        self.database.with_connection(|conn| {
448            let mut stmt = conn.prepare(&self.sql)?;
449            let values = params_to_values(params);
450            let changes = stmt.execute(params_from_iter(values.iter()))? as u64;
451            Ok(ExecutionResult {
452                changes,
453                last_insert_rowid: conn.last_insert_rowid(),
454            })
455        })
456    }
457
458    pub fn all(&self, params: &[SqlValue]) -> DbResult<QueryResult> {
459        self.query_internal(params)
460    }
461
462    pub fn get(&self, params: &[SqlValue]) -> DbResult<Option<HashMap<String, SqlValue>>> {
463        let result = self.query_internal(params)?;
464        Ok(result.rows_as_maps().into_iter().next())
465    }
466
467    pub fn columns(&self) -> DbResult<Vec<String>> {
468        self.database.with_connection(|conn| {
469            let stmt = conn.prepare(&self.sql)?;
470            Ok(stmt
471                .column_names()
472                .iter()
473                .map(|name| name.to_string())
474                .collect())
475        })
476    }
477
478    fn query_internal(&self, params: &[SqlValue]) -> DbResult<QueryResult> {
479        self.database.with_connection(|conn| {
480            let mut stmt = conn.prepare(&self.sql)?;
481            let columns = stmt
482                .column_names()
483                .iter()
484                .map(|name| name.to_string())
485                .collect::<Vec<_>>();
486            let values = params_to_values(params);
487            let column_count = columns.len();
488            let mut rows_iter = stmt.query(params_from_iter(values.iter()))?;
489            let mut rows = Vec::new();
490            while let Some(row) = rows_iter.next()? {
491                rows.push(read_row(&row, column_count)?);
492            }
493            Ok(QueryResult {
494                columns,
495                rows,
496                changes: conn.changes() as u64,
497                last_insert_rowid: conn.last_insert_rowid(),
498            })
499        })
500    }
501}
502
503fn build_open_flags(options: &DatabaseOptions) -> OpenFlags {
504    let mut flags = OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX;
505    if options.read_only {
506        flags |= OpenFlags::SQLITE_OPEN_READ_ONLY;
507    } else {
508        flags |= OpenFlags::SQLITE_OPEN_READ_WRITE;
509        if options.create_if_missing {
510            flags |= OpenFlags::SQLITE_OPEN_CREATE;
511        }
512    }
513    flags
514}
515
516fn apply_pragmas(connection: &Connection, pragmas: &[(&str, &str)]) {
517    for (name, value) in pragmas {
518        if let Err(error) = connection.pragma_update(None, name, value) {
519            debug!(pragma = %name, "failed to apply pragma: {error}");
520        }
521    }
522}
523
524fn params_to_values(params: &[SqlValue]) -> Vec<SqliteValue> {
525    params
526        .iter()
527        .map(|value| match value {
528            SqlValue::Null => SqliteValue::Null,
529            SqlValue::Integer(v) => SqliteValue::Integer(*v),
530            SqlValue::Real(v) => SqliteValue::Real(*v),
531            SqlValue::Text(v) => SqliteValue::Text(v.clone()),
532            SqlValue::Blob(v) => SqliteValue::Blob(v.clone()),
533        })
534        .collect()
535}
536
537fn read_row(row: &rusqlite::Row<'_>, column_count: usize) -> Result<Vec<SqlValue>, rusqlite::Error> {
538    let mut values = Vec::with_capacity(column_count);
539    for index in 0..column_count {
540        let value = match row.get_ref(index)? {
541            ValueRef::Null => SqlValue::Null,
542            ValueRef::Integer(v) => SqlValue::Integer(v),
543            ValueRef::Real(v) => SqlValue::Real(v),
544            ValueRef::Text(v) => SqlValue::Text(String::from_utf8_lossy(v).into_owned()),
545            ValueRef::Blob(v) => SqlValue::Blob(v.to_vec()),
546        };
547        values.push(value);
548    }
549    Ok(values)
550}
551
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::ErrorCode;

    /// Opens an in-memory database with the default options.
    fn open_memory_db() -> Database {
        Database::open(DatabaseOptions::default()).expect("open database")
    }

    /// A freshly written node is readable and carries the writer's clock entry.
    #[test]
    fn put_and_get_round_trip() {
        let store = CrdtStore::default();
        let payload = serde_json::json!({"hello": "world"});
        let node_id = store.put("node-1", "actor-a", payload);

        let stored = store.get(&node_id).expect("record should exist");
        assert_eq!(stored.data["hello"], "world");
        assert_eq!(stored.clock.get("actor-a"), Some(&1));
    }

    /// Deleting a node makes it unreachable through `get`.
    #[test]
    fn delete_removes_node() {
        let store = CrdtStore::default();
        let payload = serde_json::json!({"name": "plures"});
        let node_id = store.put("node-2", "actor-a", payload);

        store.delete(&node_id).expect("delete succeeds");
        assert!(store.get(&node_id).is_none());
    }

    /// `apply` returns the id for a put and `None` for a delete.
    #[test]
    fn apply_operations() {
        let store = CrdtStore::default();

        let put = CrdtOperation::Put {
            id: "node-3".to_string(),
            actor: "actor-a".to_string(),
            data: serde_json::json!({"count": 1}),
        };
        let put_outcome = store.apply(put).expect("apply succeeds");
        assert_eq!(put_outcome, Some("node-3".to_string()));

        let delete = CrdtOperation::Delete {
            id: "node-3".to_string(),
        };
        let delete_outcome = store.apply(delete).expect("delete succeeds");
        assert_eq!(delete_outcome, None);
        assert!(store.get("node-3").is_none());
    }

    /// A row inserted through a statement comes back from a query.
    #[test]
    fn database_exec_and_query() {
        let db = open_memory_db();
        db.exec("CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL)")
            .expect("create table");

        db.prepare("INSERT INTO users (name) VALUES (?1)")
            .expect("prepare insert")
            .run(&[SqlValue::Text("Alice".to_string())])
            .expect("insert row");

        let fetched = db
            .prepare("SELECT id, name FROM users ORDER BY id")
            .expect("prepare select")
            .all(&[])
            .expect("query rows");
        assert_eq!(fetched.columns, vec!["id".to_string(), "name".to_string()]);
        assert_eq!(fetched.rows.len(), 1);

        let name_cell = &fetched.rows[0][1];
        if let SqlValue::Text(name) = name_cell {
            assert_eq!(name, "Alice");
        } else {
            panic!("unexpected value: {:?}", name_cell);
        }
    }

    /// A file-backed database opened with defaults ends up in WAL mode.
    #[test]
    fn database_default_pragmas_applied() {
        let temp = tempfile::NamedTempFile::new().expect("create temp file");
        let options = DatabaseOptions::with_file(temp.path());
        let db = Database::open(options).expect("open database");

        let reported = db.pragma("journal_mode").expect("run pragma");
        assert!(!reported.rows.is_empty());

        let mode_cell = &reported.rows[0][0];
        if let SqlValue::Text(mode) = mode_cell {
            assert_eq!(mode.to_lowercase(), "wal");
        } else {
            panic!("unexpected pragma value: {:?}", mode_cell);
        }
    }

    /// `get` yields `None` when the query matches nothing.
    #[test]
    fn statement_get_returns_none_when_no_rows() {
        let db = open_memory_db();
        db.exec("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
            .expect("create table");

        let first_row = db
            .prepare("SELECT name FROM items WHERE id = ?1")
            .expect("prepare select")
            .get(&[SqlValue::Integer(42)])
            .expect("query should succeed");
        assert!(first_row.is_none());
    }

    /// A constraint violation surfaces as `DatabaseError::Sqlite`.
    #[test]
    fn statement_run_propagates_sql_errors() {
        let db = open_memory_db();
        db.exec("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE NOT NULL)")
            .expect("create table");

        let insert = db
            .prepare("INSERT INTO users (email) VALUES (?1)")
            .expect("prepare insert");
        let email = [SqlValue::Text("alice@example.com".into())];
        insert.run(&email).expect("first insert succeeds");

        let duplicate = insert
            .run(&email)
            .expect_err("second insert should fail");
        let DatabaseError::Sqlite(inner) = duplicate;
        assert_eq!(inner.sqlite_error_code(), Some(ErrorCode::ConstraintViolation));
    }

    /// Blob parameters round-trip and column names are reported in order.
    #[test]
    fn statement_handles_blob_parameters_and_columns() {
        let db = open_memory_db();
        db.exec("CREATE TABLE files (id INTEGER PRIMARY KEY, data BLOB NOT NULL)")
            .expect("create table");

        let blob = vec![0_u8, 1, 2, 3];
        db.prepare("INSERT INTO files (id, data) VALUES (?1, ?2)")
            .expect("prepare insert")
            .run(&[SqlValue::Integer(1), SqlValue::Blob(blob.clone())])
            .expect("insert blob row");

        let select = db
            .prepare("SELECT id, data FROM files WHERE id = ?1")
            .expect("prepare select");
        let column_names = select.columns().expect("inspect columns");
        assert_eq!(column_names, vec!["id".to_string(), "data".to_string()]);

        let fetched = select
            .all(&[SqlValue::Integer(1)])
            .expect("query single row");
        assert_eq!(fetched.rows.len(), 1);

        let data_cell = &fetched.rows[0][1];
        if let SqlValue::Blob(bytes) = data_cell {
            assert_eq!(bytes, &blob);
        } else {
            panic!("unexpected value: {:?}", data_cell);
        }
    }
}
699