1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use parking_lot::Mutex;
15use rusqlite::types::{Value as SqliteValue, ValueRef};
16use rusqlite::{params_from_iter, Connection, OpenFlags, Transaction};
17use serde::{Deserialize, Serialize};
18use serde_json::{json, Value as JsonValue};
19use thiserror::Error;
20use tracing::debug;
21use uuid::Uuid;
22
23pub type NodeId = String;
25
26pub type ActorId = String;
28
29pub type VectorClock = HashMap<ActorId, u64>;
31
32pub type NodeData = JsonValue;
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37pub struct NodeRecord {
38 pub id: NodeId,
39 pub data: NodeData,
40 pub clock: VectorClock,
41 pub timestamp: DateTime<Utc>,
42}
43
44impl NodeRecord {
45 pub fn new(id: NodeId, actor: impl Into<ActorId>, data: NodeData) -> Self {
47 let actor = actor.into();
48 let mut clock = VectorClock::default();
49 clock.insert(actor.clone(), 1);
50 Self {
51 id,
52 data,
53 clock,
54 timestamp: Utc::now(),
55 }
56 }
57
58 pub fn merge_update(&mut self, actor: impl Into<ActorId>, data: NodeData) {
60 let actor = actor.into();
61 let counter = self.clock.entry(actor).or_insert(0);
62 *counter += 1;
63 self.timestamp = Utc::now();
64 self.data = data;
65 }
66}
67
68#[derive(Debug, Error)]
70pub enum StoreError {
71 #[error("node not found: {0}")]
72 NotFound(NodeId),
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77pub enum CrdtOperation {
78 Put {
79 id: NodeId,
80 actor: ActorId,
81 data: NodeData,
82 },
83 Delete {
84 id: NodeId,
85 },
86}
87
88#[derive(Debug, Default)]
90pub struct CrdtStore {
91 nodes: DashMap<NodeId, NodeRecord>,
92}
93
94impl CrdtStore {
95 pub fn put(&self, id: impl Into<NodeId>, actor: impl Into<ActorId>, data: NodeData) -> NodeId {
97 let id = id.into();
98 let actor = actor.into();
99 self.nodes
100 .entry(id.clone())
101 .and_modify(|record| record.merge_update(actor.clone(), data.clone()))
102 .or_insert_with(|| NodeRecord::new(id.clone(), actor, data));
103 id
104 }
105
106 pub fn delete(&self, id: impl AsRef<str>) -> Result<(), StoreError> {
108 self.nodes
109 .remove(id.as_ref())
110 .map(|_| ())
111 .ok_or_else(|| StoreError::NotFound(id.as_ref().to_owned()))
112 }
113
114 pub fn get(&self, id: impl AsRef<str>) -> Option<NodeRecord> {
116 self.nodes
117 .get(id.as_ref())
118 .map(|entry| entry.value().clone())
119 }
120
121 pub fn list(&self) -> Vec<NodeRecord> {
123 self.nodes
124 .iter()
125 .map(|entry| entry.value().clone())
126 .collect()
127 }
128
129 pub fn apply(&self, op: CrdtOperation) -> Result<Option<NodeId>, StoreError> {
131 match op {
132 CrdtOperation::Put { id, actor, data } => Ok(Some(self.put(id, actor, data))),
133 CrdtOperation::Delete { id } => {
134 self.delete(&id)?;
135 Ok(None)
136 }
137 }
138 }
139
140 pub fn operation_for(
142 &self,
143 actor: impl Into<ActorId>,
144 data: NodeData,
145 ) -> (NodeId, CrdtOperation) {
146 let id = Uuid::new_v4().to_string();
147 let op = CrdtOperation::Put {
148 id: id.clone(),
149 actor: actor.into(),
150 data,
151 };
152 (id, op)
153 }
154}
155
156#[derive(Debug, Clone, PartialEq)]
158pub enum SqlValue {
159 Null,
160 Integer(i64),
161 Real(f64),
162 Text(String),
163 Blob(Vec<u8>),
164}
165
166impl SqlValue {
167 pub fn as_i64(&self) -> Option<i64> {
168 if let Self::Integer(value) = self {
169 Some(*value)
170 } else {
171 None
172 }
173 }
174
175 pub fn as_f64(&self) -> Option<f64> {
176 if let Self::Real(value) = self {
177 Some(*value)
178 } else {
179 None
180 }
181 }
182
183 pub fn as_str(&self) -> Option<&str> {
184 if let Self::Text(value) = self {
185 Some(value.as_str())
186 } else {
187 None
188 }
189 }
190
191 pub fn as_blob(&self) -> Option<&[u8]> {
192 if let Self::Blob(value) = self {
193 Some(value.as_slice())
194 } else {
195 None
196 }
197 }
198
199 pub fn to_json(&self) -> JsonValue {
200 match self {
201 SqlValue::Null => JsonValue::Null,
202 SqlValue::Integer(value) => json!(value),
203 SqlValue::Real(value) => json!(value),
204 SqlValue::Text(value) => json!(value),
205 SqlValue::Blob(bytes) => json!(bytes),
206 }
207 }
208}
209
210#[derive(Debug, Clone, PartialEq)]
211pub struct QueryResult {
212 pub columns: Vec<String>,
213 pub rows: Vec<Vec<SqlValue>>,
214 pub changes: u64,
215 pub last_insert_rowid: i64,
216}
217
218impl QueryResult {
219 pub fn rows_as_maps(&self) -> Vec<HashMap<String, SqlValue>> {
220 self.rows
221 .iter()
222 .map(|row| {
223 let mut map = HashMap::new();
224 for (index, value) in row.iter().cloned().enumerate() {
225 if let Some(column) = self.columns.get(index) {
226 map.insert(column.clone(), value);
227 }
228 }
229 map
230 })
231 .collect()
232 }
233
234 pub fn rows_as_json(&self) -> Vec<JsonValue> {
235 self.rows_as_maps()
236 .into_iter()
237 .map(|row| {
238 let json_object: HashMap<String, JsonValue> = row
239 .into_iter()
240 .map(|(key, value)| (key, value.to_json()))
241 .collect();
242 json!(json_object)
243 })
244 .collect()
245 }
246}
247
248#[derive(Debug, Clone, PartialEq)]
249pub struct ExecutionResult {
250 pub changes: u64,
251 pub last_insert_rowid: i64,
252}
253
254#[derive(Debug, Clone, PartialEq)]
255pub enum DatabasePath {
256 InMemory,
257 File(PathBuf),
258}
259
260#[derive(Debug, Clone)]
261pub struct DatabaseOptions {
262 pub path: DatabasePath,
263 pub read_only: bool,
264 pub create_if_missing: bool,
265 pub apply_default_pragmas: bool,
266 pub custom_pragmas: Vec<(String, String)>,
267 pub busy_timeout: Option<Duration>,
268}
269
270impl Default for DatabaseOptions {
271 fn default() -> Self {
272 Self {
273 path: DatabasePath::InMemory,
274 read_only: false,
275 create_if_missing: true,
276 apply_default_pragmas: true,
277 custom_pragmas: Vec::new(),
278 busy_timeout: Some(Duration::from_millis(5_000)),
279 }
280 }
281}
282
283impl DatabaseOptions {
284 pub fn in_memory() -> Self {
285 Self::default()
286 }
287
288 pub fn with_file(path: impl Into<PathBuf>) -> Self {
289 Self {
290 path: DatabasePath::File(path.into()),
291 ..Default::default()
292 }
293 }
294
295 pub fn read_only(mut self, flag: bool) -> Self {
296 self.read_only = flag;
297 self
298 }
299
300 pub fn create_if_missing(mut self, flag: bool) -> Self {
301 self.create_if_missing = flag;
302 self
303 }
304
305 pub fn apply_default_pragmas(mut self, flag: bool) -> Self {
306 self.apply_default_pragmas = flag;
307 self
308 }
309
310 pub fn add_pragma(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
311 self.custom_pragmas.push((name.into(), value.into()));
312 self
313 }
314
315 pub fn busy_timeout(mut self, timeout: Option<Duration>) -> Self {
316 self.busy_timeout = timeout;
317 self
318 }
319}
320
321#[derive(Debug, Clone)]
322pub struct Database {
323 conn: Arc<Mutex<Connection>>,
324 path: DatabasePath,
325}
326
327#[derive(Debug, Error)]
328pub enum DatabaseError {
329 #[error("sqlite error: {0}")]
330 Sqlite(#[from] rusqlite::Error),
331}
332
333pub type DbResult<T> = Result<T, DatabaseError>;
334
335const DEFAULT_PRAGMAS: &[(&str, &str)] = &[
336 ("journal_mode", "WAL"),
337 ("synchronous", "NORMAL"),
338 ("temp_store", "MEMORY"),
339 ("mmap_size", "30000000000"),
340 ("page_size", "4096"),
341 ("cache_size", "-64000"),
342];
343
344impl Database {
345 pub fn open(options: DatabaseOptions) -> DbResult<Self> {
346 let connection = match &options.path {
347 DatabasePath::InMemory => Connection::open_in_memory()?,
348 DatabasePath::File(path) => {
349 Connection::open_with_flags(path, build_open_flags(&options))?
350 }
351 };
352
353 if let Some(timeout) = options.busy_timeout {
354 connection.busy_timeout(timeout)?;
355 }
356
357 if options.apply_default_pragmas {
358 apply_pragmas(&connection, DEFAULT_PRAGMAS);
359 }
360
361 if !options.custom_pragmas.is_empty() {
362 let custom: Vec<(&str, &str)> = options
363 .custom_pragmas
364 .iter()
365 .map(|(name, value)| (name.as_str(), value.as_str()))
366 .collect();
367 apply_pragmas(&connection, &custom);
368 }
369
370 Ok(Self {
371 conn: Arc::new(Mutex::new(connection)),
372 path: options.path,
373 })
374 }
375
376 pub fn path(&self) -> &DatabasePath {
377 &self.path
378 }
379
380 pub fn prepare(&self, sql: impl Into<String>) -> DbResult<Statement> {
381 Ok(Statement {
382 database: self.clone(),
383 sql: sql.into(),
384 })
385 }
386
387 pub fn exec(&self, sql: &str) -> DbResult<ExecutionResult> {
388 self.with_connection(|conn| {
389 conn.execute_batch(sql)?;
390 Ok(ExecutionResult {
391 changes: conn.changes() as u64,
392 last_insert_rowid: conn.last_insert_rowid(),
393 })
394 })
395 }
396
397 pub fn query(&self, sql: &str, params: &[SqlValue]) -> DbResult<QueryResult> {
398 Statement {
399 database: self.clone(),
400 sql: sql.to_owned(),
401 }
402 .query_internal(params)
403 }
404
405 pub fn pragma(&self, pragma: &str) -> DbResult<QueryResult> {
406 let normalized = if pragma.trim_start().to_lowercase().starts_with("pragma") {
407 pragma.trim().to_owned()
408 } else {
409 format!("PRAGMA {}", pragma)
410 };
411 self.query(&normalized, &[])
412 }
413
414 pub fn transaction<F, T>(&self, f: F) -> DbResult<T>
415 where
416 F: FnOnce(&Transaction<'_>) -> DbResult<T>,
417 {
418 self.with_connection(|conn| {
419 let tx = conn.transaction()?;
420 let result = f(&tx)?;
421 tx.commit()?;
422 Ok(result)
423 })
424 }
425
426 fn with_connection<T, F>(&self, f: F) -> DbResult<T>
427 where
428 F: FnOnce(&mut Connection) -> DbResult<T>,
429 {
430 let mut guard = self.conn.lock();
431 f(&mut guard)
432 }
433}
434
435#[derive(Debug, Clone)]
436pub struct Statement {
437 database: Database,
438 sql: String,
439}
440
441impl Statement {
442 pub fn sql(&self) -> &str {
443 &self.sql
444 }
445
446 pub fn run(&self, params: &[SqlValue]) -> DbResult<ExecutionResult> {
447 self.database.with_connection(|conn| {
448 let mut stmt = conn.prepare(&self.sql)?;
449 let values = params_to_values(params);
450 let changes = stmt.execute(params_from_iter(values.iter()))? as u64;
451 Ok(ExecutionResult {
452 changes,
453 last_insert_rowid: conn.last_insert_rowid(),
454 })
455 })
456 }
457
458 pub fn all(&self, params: &[SqlValue]) -> DbResult<QueryResult> {
459 self.query_internal(params)
460 }
461
462 pub fn get(&self, params: &[SqlValue]) -> DbResult<Option<HashMap<String, SqlValue>>> {
463 let result = self.query_internal(params)?;
464 Ok(result.rows_as_maps().into_iter().next())
465 }
466
467 pub fn columns(&self) -> DbResult<Vec<String>> {
468 self.database.with_connection(|conn| {
469 let stmt = conn.prepare(&self.sql)?;
470 Ok(stmt
471 .column_names()
472 .iter()
473 .map(|name| name.to_string())
474 .collect())
475 })
476 }
477
478 fn query_internal(&self, params: &[SqlValue]) -> DbResult<QueryResult> {
479 self.database.with_connection(|conn| {
480 let mut stmt = conn.prepare(&self.sql)?;
481 let columns = stmt
482 .column_names()
483 .iter()
484 .map(|name| name.to_string())
485 .collect::<Vec<_>>();
486 let values = params_to_values(params);
487 let column_count = columns.len();
488 let mut rows_iter = stmt.query(params_from_iter(values.iter()))?;
489 let mut rows = Vec::new();
490 while let Some(row) = rows_iter.next()? {
491 rows.push(read_row(&row, column_count)?);
492 }
493 Ok(QueryResult {
494 columns,
495 rows,
496 changes: conn.changes() as u64,
497 last_insert_rowid: conn.last_insert_rowid(),
498 })
499 })
500 }
501}
502
503fn build_open_flags(options: &DatabaseOptions) -> OpenFlags {
504 let mut flags = OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX;
505 if options.read_only {
506 flags |= OpenFlags::SQLITE_OPEN_READ_ONLY;
507 } else {
508 flags |= OpenFlags::SQLITE_OPEN_READ_WRITE;
509 if options.create_if_missing {
510 flags |= OpenFlags::SQLITE_OPEN_CREATE;
511 }
512 }
513 flags
514}
515
516fn apply_pragmas(connection: &Connection, pragmas: &[(&str, &str)]) {
517 for (name, value) in pragmas {
518 if let Err(error) = connection.pragma_update(None, name, value) {
519 debug!(pragma = %name, "failed to apply pragma: {error}");
520 }
521 }
522}
523
524fn params_to_values(params: &[SqlValue]) -> Vec<SqliteValue> {
525 params
526 .iter()
527 .map(|value| match value {
528 SqlValue::Null => SqliteValue::Null,
529 SqlValue::Integer(v) => SqliteValue::Integer(*v),
530 SqlValue::Real(v) => SqliteValue::Real(*v),
531 SqlValue::Text(v) => SqliteValue::Text(v.clone()),
532 SqlValue::Blob(v) => SqliteValue::Blob(v.clone()),
533 })
534 .collect()
535}
536
537fn read_row(row: &rusqlite::Row<'_>, column_count: usize) -> Result<Vec<SqlValue>, rusqlite::Error> {
538 let mut values = Vec::with_capacity(column_count);
539 for index in 0..column_count {
540 let value = match row.get_ref(index)? {
541 ValueRef::Null => SqlValue::Null,
542 ValueRef::Integer(v) => SqlValue::Integer(v),
543 ValueRef::Real(v) => SqlValue::Real(v),
544 ValueRef::Text(v) => SqlValue::Text(String::from_utf8_lossy(v).into_owned()),
545 ValueRef::Blob(v) => SqlValue::Blob(v.to_vec()),
546 };
547 values.push(value);
548 }
549 Ok(values)
550}
551
552#[cfg(test)]
553mod tests {
554 use super::*;
555 use rusqlite::ErrorCode;
556
557 #[test]
558 fn put_and_get_round_trip() {
559 let store = CrdtStore::default();
560 let id = store.put("node-1", "actor-a", serde_json::json!({"hello": "world"}));
561 let record = store.get(&id).expect("record should exist");
562 assert_eq!(record.data["hello"], "world");
563 assert_eq!(record.clock.get("actor-a"), Some(&1));
564 }
565
566 #[test]
567 fn delete_removes_node() {
568 let store = CrdtStore::default();
569 let id = store.put("node-2", "actor-a", serde_json::json!({"name": "plures"}));
570 store.delete(&id).expect("delete succeeds");
571 assert!(store.get(&id).is_none());
572 }
573
574 #[test]
575 fn apply_operations() {
576 let store = CrdtStore::default();
577 let op = CrdtOperation::Put {
578 id: "node-3".to_string(),
579 actor: "actor-a".to_string(),
580 data: serde_json::json!({"count": 1}),
581 };
582 let result = store.apply(op).expect("apply succeeds");
583 assert_eq!(result, Some("node-3".to_string()));
584
585 let delete = CrdtOperation::Delete {
586 id: "node-3".to_string(),
587 };
588 let result = store.apply(delete).expect("delete succeeds");
589 assert_eq!(result, None);
590 assert!(store.get("node-3").is_none());
591 }
592
593 #[test]
594 fn database_exec_and_query() {
595 let db = Database::open(DatabaseOptions::default()).expect("open database");
596 db.exec("CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL)")
597 .expect("create table");
598
599 let insert = db
600 .prepare("INSERT INTO users (name) VALUES (?1)")
601 .expect("prepare insert");
602 insert
603 .run(&[SqlValue::Text("Alice".to_string())])
604 .expect("insert row");
605
606 let query = db
607 .prepare("SELECT id, name FROM users ORDER BY id")
608 .expect("prepare select");
609 let result = query.all(&[]).expect("query rows");
610 assert_eq!(result.columns, vec!["id".to_string(), "name".to_string()]);
611 assert_eq!(result.rows.len(), 1);
612 match &result.rows[0][1] {
613 SqlValue::Text(value) => assert_eq!(value, "Alice"),
614 other => panic!("unexpected value: {:?}", other),
615 }
616 }
617
618 #[test]
619 fn database_default_pragmas_applied() {
620 let temp = tempfile::NamedTempFile::new().expect("create temp file");
621 let db = Database::open(DatabaseOptions::with_file(temp.path()))
622 .expect("open database");
623 let result = db.pragma("journal_mode").expect("run pragma");
624 assert!(!result.rows.is_empty());
625 match &result.rows[0][0] {
626 SqlValue::Text(mode) => assert_eq!(mode.to_lowercase(), "wal"),
627 other => panic!("unexpected pragma value: {:?}", other),
628 }
629 }
630
631 #[test]
632 fn statement_get_returns_none_when_no_rows() {
633 let db = Database::open(DatabaseOptions::default()).expect("open database");
634 db.exec("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
635 .expect("create table");
636
637 let select = db
638 .prepare("SELECT name FROM items WHERE id = ?1")
639 .expect("prepare select");
640 let result = select
641 .get(&[SqlValue::Integer(42)])
642 .expect("query should succeed");
643 assert!(result.is_none());
644 }
645
646 #[test]
647 fn statement_run_propagates_sql_errors() {
648 let db = Database::open(DatabaseOptions::default()).expect("open database");
649 db.exec("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE NOT NULL)")
650 .expect("create table");
651
652 let insert = db
653 .prepare("INSERT INTO users (email) VALUES (?1)")
654 .expect("prepare insert");
655 insert
656 .run(&[SqlValue::Text("alice@example.com".into())])
657 .expect("first insert succeeds");
658
659 let err = insert
660 .run(&[SqlValue::Text("alice@example.com".into())])
661 .expect_err("second insert should fail");
662 match err {
663 DatabaseError::Sqlite(inner) => {
664 assert_eq!(inner.sqlite_error_code(), Some(ErrorCode::ConstraintViolation));
665 }
666 }
667 }
668
669 #[test]
670 fn statement_handles_blob_parameters_and_columns() {
671 let db = Database::open(DatabaseOptions::default()).expect("open database");
672 db.exec("CREATE TABLE files (id INTEGER PRIMARY KEY, data BLOB NOT NULL)")
673 .expect("create table");
674
675 let blob = vec![0_u8, 1, 2, 3];
676 let insert = db
677 .prepare("INSERT INTO files (id, data) VALUES (?1, ?2)")
678 .expect("prepare insert");
679 insert
680 .run(&[SqlValue::Integer(1), SqlValue::Blob(blob.clone())])
681 .expect("insert blob row");
682
683 let select = db
684 .prepare("SELECT id, data FROM files WHERE id = ?1")
685 .expect("prepare select");
686 let columns = select.columns().expect("inspect columns");
687 assert_eq!(columns, vec!["id".to_string(), "data".to_string()]);
688
689 let result = select
690 .all(&[SqlValue::Integer(1)])
691 .expect("query single row");
692 assert_eq!(result.rows.len(), 1);
693 match &result.rows[0][1] {
694 SqlValue::Blob(value) => assert_eq!(value, &blob),
695 other => panic!("unexpected value: {:?}", other),
696 }
697 }
698}
699