datacake_sqlite/
lib.rs

1//! # Datacake SQLite
2//!
//! A pre-built implementation of the datacake-eventual-consistency `Storage` trait. It allows you to set up
//! a persistent cluster immediately, without the hassle of implementing a correct store yourself.
5//!
6//! For more info see <https://github.com/lnx-search/datacake>
7//!
8//! ## Setup
//! Note that this crate bundles SQLite by default; bundling can be disabled by passing
//! `default-features = false`.
11//!
12//! ## Example
13//!
14//! ```rust
15//! use anyhow::Result;
16//! use datacake_eventual_consistency::EventuallyConsistentStoreExtension;
17//! use datacake_node::{
18//!     ConnectionConfig,
19//!     Consistency,
20//!     DCAwareSelector,
21//!     DatacakeNodeBuilder,
22//! };
23//! use datacake_sqlite::SqliteStorage;
24//!
25//! static KEYSPACE: &str = "sqlite-store";
26//!
27//! #[tokio::test]
28//! async fn test_basic_sqlite_cluster() -> Result<()> {
29//!     let _ = tracing_subscriber::fmt::try_init();
30//!
31//!     let store = SqliteStorage::open_in_memory().await?;
32//!
33//!     let addr = test_helper::get_unused_addr();
34//!     let connection_cfg = ConnectionConfig::new(addr, addr, Vec::<String>::new());
35//!
36//!     let node = DatacakeNodeBuilder::<DCAwareSelector>::new(1, connection_cfg)
37//!         .connect()
38//!         .await?;
39//!     let store = node
40//!         .add_extension(EventuallyConsistentStoreExtension::new(store))
41//!         .await?;
42//!
43//!     let handle = store.handle();
44//!
45//!     handle
46//!         .put(KEYSPACE, 1, b"Hello, world".to_vec(), Consistency::All)
47//!         .await
48//!         .expect("Put value.");
49//!
50//!     let doc = handle
51//!         .get(KEYSPACE, 1)
52//!         .await
53//!         .expect("Get value.")
54//!         .expect("Document should not be none");
55//!     assert_eq!(doc.id(), 1);
56//!     assert_eq!(doc.data(), b"Hello, world");
57//!
58//!     handle
59//!         .del(KEYSPACE, 1, Consistency::All)
60//!         .await
61//!         .expect("Del value.");
62//!     let doc = handle.get(KEYSPACE, 1).await.expect("Get value.");
63//!     assert!(doc.is_none(), "No document should not exist!");
64//!
65//!     handle
66//!         .del(KEYSPACE, 2, Consistency::All)
67//!         .await
68//!         .expect("Del value which doesnt exist locally.");
69//!     let doc = handle.get(KEYSPACE, 2).await.expect("Get value.");
70//!     assert!(doc.is_none(), "No document should not exist!");
71//!
72//!     node.shutdown().await;
73//!
74//!     Ok(())
75//! }
76//! ```
77
78mod db;
79mod from_row_impl;
80
81use std::path::Path;
82
83use async_trait::async_trait;
84use datacake_crdt::{HLCTimestamp, Key};
85use datacake_eventual_consistency::{
86    BulkMutationError,
87    Document,
88    DocumentMetadata,
89    Storage,
90};
91pub use db::FromRow;
92
93pub use crate::db::StorageHandle;
94
95/// A [Storage] implementation based on an SQLite database.
96pub struct SqliteStorage {
97    inner: StorageHandle,
98}
99
100impl SqliteStorage {
101    /// Opens a new SQLite database in the given path.
102    ///
103    /// If the database does not already exist it will be created.
104    ///
105    /// ```rust
106    /// use datacake_sqlite::SqliteStorage;
107    ///
108    /// # #[tokio::main]
109    /// # async fn main() {
110    /// let storage = SqliteStorage::open("./data.db").await.expect("Create database");
111    /// # drop(storage);
112    /// # let _ = std::fs::remove_file("./data.db");
113    /// # let _ = std::fs::remove_file("./data.db-shm");
114    /// # let _ = std::fs::remove_file("./data.db-wal");
115    /// # }
116    /// ```
117    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, rusqlite::Error> {
118        let inner = StorageHandle::open(path.as_ref()).await?;
119        setup_db(inner.clone()).await?;
120        Ok(Self { inner })
121    }
122
123    /// Opens a new SQLite database in memory.
124    ///
125    /// ```rust
126    /// use datacake_sqlite::SqliteStorage;
127    ///
128    /// # #[tokio::main]
129    /// # async fn main() {
130    /// let storage = SqliteStorage::open_in_memory().await.expect("Create database");
131    /// # drop(storage);
132    /// # }
133    /// ```
134    pub async fn open_in_memory() -> Result<Self, rusqlite::Error> {
135        let inner = StorageHandle::open_in_memory().await?;
136        setup_db(inner.clone()).await?;
137        Ok(Self { inner })
138    }
139
140    /// Creates a new [SqliteStorage] instances from an existing storage handle.
141    pub fn from_handle(handle: StorageHandle) -> Self {
142        Self { inner: handle }
143    }
144
145    /// Creates a copy of the storage handle to be used in other sections of code
146    /// which do not need to be distributed.
147    ///
148    /// WARNING:
149    /// Any changes made to this will not be reflected in the cluster, it is primarily
150    /// only provided for ease of reading.
151    ///
152    /// The table `state_entries` is already created and reserved.
153    pub fn handle(&self) -> StorageHandle {
154        self.inner.clone()
155    }
156}
157
158#[async_trait]
159impl Storage for SqliteStorage {
160    type Error = rusqlite::Error;
161    type DocsIter = Box<dyn Iterator<Item = Document>>;
162    type MetadataIter = Box<dyn Iterator<Item = (Key, HLCTimestamp, bool)>>;
163
164    async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error> {
165        let list = self
166            .inner
167            .fetch_all::<_, (String,)>(queries::SELECT_KEYSPACE_LIST, ())
168            .await?
169            .into_iter()
170            .map(|row| row.0)
171            .collect();
172        Ok(list)
173    }
174
175    async fn iter_metadata(
176        &self,
177        keyspace: &str,
178    ) -> Result<Self::MetadataIter, Self::Error> {
179        let list = self
180            .inner
181            .fetch_all::<_, models::Metadata>(
182                queries::SELECT_METADATA_LIST,
183                (keyspace.to_string(),),
184            )
185            .await?
186            .into_iter()
187            .map(|metadata| (metadata.0, metadata.1, metadata.2));
188        Ok(Box::new(list))
189    }
190
191    async fn remove_tombstones(
192        &self,
193        keyspace: &str,
194        keys: impl Iterator<Item = Key> + Send,
195    ) -> Result<(), BulkMutationError<Self::Error>> {
196        let params = keys
197            .map(|doc_id| (keyspace.to_string(), doc_id as i64))
198            .collect::<Vec<_>>();
199        self.inner
200            .execute_many(queries::DELETE_TOMBSTONE, params)
201            .await // Safe as we're in a transaction.
202            .map_err(BulkMutationError::empty_with_error)?;
203        Ok(())
204    }
205
206    async fn put(&self, keyspace: &str, doc: Document) -> Result<(), Self::Error> {
207        self.inner
208            .execute(
209                queries::INSERT,
210                (
211                    keyspace.to_string(),
212                    doc.id() as i64,
213                    doc.last_updated().to_string(),
214                    doc.data().to_vec(),
215                ),
216            )
217            .await?;
218        Ok(())
219    }
220
221    async fn multi_put(
222        &self,
223        keyspace: &str,
224        documents: impl Iterator<Item = Document> + Send,
225    ) -> Result<(), BulkMutationError<Self::Error>> {
226        let params = documents
227            .map(|doc| {
228                (
229                    keyspace.to_string(),
230                    doc.id() as i64,
231                    doc.last_updated().to_string(),
232                    doc.data().to_vec(),
233                )
234            })
235            .collect::<Vec<_>>();
236        self.inner
237            .execute_many(queries::INSERT, params)
238            .await // Safe as we're in a transaction.
239            .map_err(BulkMutationError::empty_with_error)?;
240        Ok(())
241    }
242
243    async fn mark_as_tombstone(
244        &self,
245        keyspace: &str,
246        doc_id: Key,
247        timestamp: HLCTimestamp,
248    ) -> Result<(), Self::Error> {
249        self.inner
250            .execute(
251                queries::SET_TOMBSTONE,
252                (keyspace.to_string(), doc_id as i64, timestamp.to_string()),
253            )
254            .await?;
255        Ok(())
256    }
257
258    async fn mark_many_as_tombstone(
259        &self,
260        keyspace: &str,
261        documents: impl Iterator<Item = DocumentMetadata> + Send,
262    ) -> Result<(), BulkMutationError<Self::Error>> {
263        let params = documents
264            .map(|doc| {
265                (
266                    keyspace.to_string(),
267                    doc.id as i64,
268                    doc.last_updated.to_string(),
269                )
270            })
271            .collect::<Vec<_>>();
272        self.inner
273            .execute_many(queries::SET_TOMBSTONE, params)
274            .await // Safe as we're in a transaction.
275            .map_err(BulkMutationError::empty_with_error)?;
276        Ok(())
277    }
278
279    async fn get(
280        &self,
281        keyspace: &str,
282        doc_id: Key,
283    ) -> Result<Option<Document>, Self::Error> {
284        let entry = self
285            .inner
286            .fetch_one::<_, models::Doc>(
287                queries::SELECT_DOC,
288                (keyspace.to_string(), doc_id as i64),
289            )
290            .await?;
291
292        Ok(entry.map(|d| d.0))
293    }
294
295    async fn multi_get(
296        &self,
297        keyspace: &str,
298        doc_ids: impl Iterator<Item = Key> + Send,
299    ) -> Result<Self::DocsIter, Self::Error> {
300        let doc_ids = doc_ids
301            .map(|id| (keyspace.to_string(), id))
302            .collect::<Vec<_>>();
303        let docs = self
304            .inner
305            .fetch_many::<_, models::Doc>(queries::SELECT_DOC, doc_ids)
306            .await?
307            .into_iter()
308            .map(|d| d.0);
309
310        Ok(Box::new(docs))
311    }
312}
313
/// SQL statements used by the [`Storage`] implementation.
///
/// Every statement operates on the `state_entries` table; a row whose `data`
/// column is `NULL` represents a tombstone.
mod queries {
    // ---- Reads ----

    /// Lists each keyspace that has at least one row.
    pub static SELECT_KEYSPACE_LIST: &str = r#"
        SELECT DISTINCT keyspace FROM state_entries GROUP BY keyspace;
        "#;

    /// Lists id, timestamp and tombstone flag for every row of one keyspace.
    pub static SELECT_METADATA_LIST: &str = r#"
        SELECT doc_id, ts, (data IS NULL) as tombstone FROM state_entries WHERE keyspace = ?;
        "#;

    /// Fetches a single live (non-tombstone) document by keyspace and id.
    pub static SELECT_DOC: &str = r#"
        SELECT doc_id, ts, data FROM state_entries WHERE keyspace = ? AND doc_id = ? AND data IS NOT NULL;
        "#;

    // ---- Writes ----

    /// Upserts a document, replacing the timestamp and data of an existing row.
    pub static INSERT: &str = r#"
        INSERT INTO state_entries (keyspace, doc_id, ts, data) VALUES (?, ?, ?, ?)
            ON CONFLICT (keyspace, doc_id) DO UPDATE SET ts = excluded.ts, data = excluded.data;
        "#;

    /// Upserts a tombstone: the row keeps its key but its data becomes `NULL`.
    pub static SET_TOMBSTONE: &str = r#"
        INSERT INTO state_entries (keyspace, doc_id, ts, data) VALUES (?, ?, ?, NULL)
            ON CONFLICT (keyspace, doc_id) DO UPDATE SET ts = excluded.ts, data = NULL;
        "#;

    /// Removes the row for a given keyspace and id entirely.
    pub static DELETE_TOMBSTONE: &str = r#"
        DELETE FROM state_entries WHERE keyspace = ? AND doc_id = ?;
        "#;
}
336
337mod models {
338    use std::str::FromStr;
339
340    use datacake_crdt::{HLCTimestamp, Key};
341    use datacake_eventual_consistency::Document;
342    use rusqlite::Row;
343
344    use crate::FromRow;
345
346    pub struct Doc(pub Document);
347    impl FromRow for Doc {
348        fn from_row(row: &Row) -> rusqlite::Result<Self> {
349            let id = row.get::<_, i64>(0)? as Key;
350            let ts = row.get::<_, String>(1)?;
351            let data = row.get::<_, Vec<u8>>(2)?;
352
353            let ts = HLCTimestamp::from_str(&ts)
354                .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
355
356            Ok(Self(Document::new(id, ts, data)))
357        }
358    }
359
360    pub struct Metadata(pub Key, pub HLCTimestamp, pub bool);
361    impl FromRow for Metadata {
362        fn from_row(row: &Row) -> rusqlite::Result<Self> {
363            let id = row.get::<_, i64>(0)? as Key;
364            let ts = row.get::<_, String>(1)?;
365            let is_tombstone = row.get::<_, bool>(2)?;
366
367            let ts = HLCTimestamp::from_str(&ts)
368                .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
369
370            Ok(Self(id, ts, is_tombstone))
371        }
372    }
373}
374
375async fn setup_db(handle: StorageHandle) -> rusqlite::Result<()> {
376    let table = r#"
377        CREATE TABLE IF NOT EXISTS state_entries (
378            keyspace TEXT,
379            doc_id BIGINT,
380            ts TEXT,
381            data BLOB,
382            PRIMARY KEY (keyspace, doc_id)
383        );
384    "#;
385    handle.execute(table, ()).await?;
386
387    Ok(())
388}
389
#[cfg(test)]
mod tests {
    use datacake_eventual_consistency::test_suite::run_test_suite;

    use super::SqliteStorage;

    /// Runs `test_suite::run_test_suite` from `datacake_eventual_consistency`
    /// against an in-memory [`SqliteStorage`].
    #[tokio::test]
    async fn test_storage_logic() {
        let in_memory = SqliteStorage::open_in_memory().await.unwrap();
        run_test_suite(in_memory).await;
    }
}
401}