1mod db;
79mod from_row_impl;
80
81use std::path::Path;
82
83use async_trait::async_trait;
84use datacake_crdt::{HLCTimestamp, Key};
85use datacake_eventual_consistency::{
86 BulkMutationError,
87 Document,
88 DocumentMetadata,
89 Storage,
90};
91pub use db::FromRow;
92
93pub use crate::db::StorageHandle;
94
95pub struct SqliteStorage {
97 inner: StorageHandle,
98}
99
100impl SqliteStorage {
101 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, rusqlite::Error> {
118 let inner = StorageHandle::open(path.as_ref()).await?;
119 setup_db(inner.clone()).await?;
120 Ok(Self { inner })
121 }
122
123 pub async fn open_in_memory() -> Result<Self, rusqlite::Error> {
135 let inner = StorageHandle::open_in_memory().await?;
136 setup_db(inner.clone()).await?;
137 Ok(Self { inner })
138 }
139
140 pub fn from_handle(handle: StorageHandle) -> Self {
142 Self { inner: handle }
143 }
144
145 pub fn handle(&self) -> StorageHandle {
154 self.inner.clone()
155 }
156}
157
158#[async_trait]
159impl Storage for SqliteStorage {
160 type Error = rusqlite::Error;
161 type DocsIter = Box<dyn Iterator<Item = Document>>;
162 type MetadataIter = Box<dyn Iterator<Item = (Key, HLCTimestamp, bool)>>;
163
164 async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error> {
165 let list = self
166 .inner
167 .fetch_all::<_, (String,)>(queries::SELECT_KEYSPACE_LIST, ())
168 .await?
169 .into_iter()
170 .map(|row| row.0)
171 .collect();
172 Ok(list)
173 }
174
175 async fn iter_metadata(
176 &self,
177 keyspace: &str,
178 ) -> Result<Self::MetadataIter, Self::Error> {
179 let list = self
180 .inner
181 .fetch_all::<_, models::Metadata>(
182 queries::SELECT_METADATA_LIST,
183 (keyspace.to_string(),),
184 )
185 .await?
186 .into_iter()
187 .map(|metadata| (metadata.0, metadata.1, metadata.2));
188 Ok(Box::new(list))
189 }
190
191 async fn remove_tombstones(
192 &self,
193 keyspace: &str,
194 keys: impl Iterator<Item = Key> + Send,
195 ) -> Result<(), BulkMutationError<Self::Error>> {
196 let params = keys
197 .map(|doc_id| (keyspace.to_string(), doc_id as i64))
198 .collect::<Vec<_>>();
199 self.inner
200 .execute_many(queries::DELETE_TOMBSTONE, params)
201 .await .map_err(BulkMutationError::empty_with_error)?;
203 Ok(())
204 }
205
206 async fn put(&self, keyspace: &str, doc: Document) -> Result<(), Self::Error> {
207 self.inner
208 .execute(
209 queries::INSERT,
210 (
211 keyspace.to_string(),
212 doc.id() as i64,
213 doc.last_updated().to_string(),
214 doc.data().to_vec(),
215 ),
216 )
217 .await?;
218 Ok(())
219 }
220
221 async fn multi_put(
222 &self,
223 keyspace: &str,
224 documents: impl Iterator<Item = Document> + Send,
225 ) -> Result<(), BulkMutationError<Self::Error>> {
226 let params = documents
227 .map(|doc| {
228 (
229 keyspace.to_string(),
230 doc.id() as i64,
231 doc.last_updated().to_string(),
232 doc.data().to_vec(),
233 )
234 })
235 .collect::<Vec<_>>();
236 self.inner
237 .execute_many(queries::INSERT, params)
238 .await .map_err(BulkMutationError::empty_with_error)?;
240 Ok(())
241 }
242
243 async fn mark_as_tombstone(
244 &self,
245 keyspace: &str,
246 doc_id: Key,
247 timestamp: HLCTimestamp,
248 ) -> Result<(), Self::Error> {
249 self.inner
250 .execute(
251 queries::SET_TOMBSTONE,
252 (keyspace.to_string(), doc_id as i64, timestamp.to_string()),
253 )
254 .await?;
255 Ok(())
256 }
257
258 async fn mark_many_as_tombstone(
259 &self,
260 keyspace: &str,
261 documents: impl Iterator<Item = DocumentMetadata> + Send,
262 ) -> Result<(), BulkMutationError<Self::Error>> {
263 let params = documents
264 .map(|doc| {
265 (
266 keyspace.to_string(),
267 doc.id as i64,
268 doc.last_updated.to_string(),
269 )
270 })
271 .collect::<Vec<_>>();
272 self.inner
273 .execute_many(queries::SET_TOMBSTONE, params)
274 .await .map_err(BulkMutationError::empty_with_error)?;
276 Ok(())
277 }
278
279 async fn get(
280 &self,
281 keyspace: &str,
282 doc_id: Key,
283 ) -> Result<Option<Document>, Self::Error> {
284 let entry = self
285 .inner
286 .fetch_one::<_, models::Doc>(
287 queries::SELECT_DOC,
288 (keyspace.to_string(), doc_id as i64),
289 )
290 .await?;
291
292 Ok(entry.map(|d| d.0))
293 }
294
295 async fn multi_get(
296 &self,
297 keyspace: &str,
298 doc_ids: impl Iterator<Item = Key> + Send,
299 ) -> Result<Self::DocsIter, Self::Error> {
300 let doc_ids = doc_ids
301 .map(|id| (keyspace.to_string(), id))
302 .collect::<Vec<_>>();
303 let docs = self
304 .inner
305 .fetch_many::<_, models::Doc>(queries::SELECT_DOC, doc_ids)
306 .await?
307 .into_iter()
308 .map(|d| d.0);
309
310 Ok(Box::new(docs))
311 }
312}
313
314mod queries {
315 pub static INSERT: &str = r#"
316 INSERT INTO state_entries (keyspace, doc_id, ts, data) VALUES (?, ?, ?, ?)
317 ON CONFLICT (keyspace, doc_id) DO UPDATE SET ts = excluded.ts, data = excluded.data;
318 "#;
319 pub static SELECT_DOC: &str = r#"
320 SELECT doc_id, ts, data FROM state_entries WHERE keyspace = ? AND doc_id = ? AND data IS NOT NULL;
321 "#;
322 pub static SELECT_KEYSPACE_LIST: &str = r#"
323 SELECT DISTINCT keyspace FROM state_entries GROUP BY keyspace;
324 "#;
325 pub static SELECT_METADATA_LIST: &str = r#"
326 SELECT doc_id, ts, (data IS NULL) as tombstone FROM state_entries WHERE keyspace = ?;
327 "#;
328 pub static SET_TOMBSTONE: &str = r#"
329 INSERT INTO state_entries (keyspace, doc_id, ts, data) VALUES (?, ?, ?, NULL)
330 ON CONFLICT (keyspace, doc_id) DO UPDATE SET ts = excluded.ts, data = NULL;
331 "#;
332 pub static DELETE_TOMBSTONE: &str = r#"
333 DELETE FROM state_entries WHERE keyspace = ? AND doc_id = ?;
334 "#;
335}
336
337mod models {
338 use std::str::FromStr;
339
340 use datacake_crdt::{HLCTimestamp, Key};
341 use datacake_eventual_consistency::Document;
342 use rusqlite::Row;
343
344 use crate::FromRow;
345
346 pub struct Doc(pub Document);
347 impl FromRow for Doc {
348 fn from_row(row: &Row) -> rusqlite::Result<Self> {
349 let id = row.get::<_, i64>(0)? as Key;
350 let ts = row.get::<_, String>(1)?;
351 let data = row.get::<_, Vec<u8>>(2)?;
352
353 let ts = HLCTimestamp::from_str(&ts)
354 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
355
356 Ok(Self(Document::new(id, ts, data)))
357 }
358 }
359
360 pub struct Metadata(pub Key, pub HLCTimestamp, pub bool);
361 impl FromRow for Metadata {
362 fn from_row(row: &Row) -> rusqlite::Result<Self> {
363 let id = row.get::<_, i64>(0)? as Key;
364 let ts = row.get::<_, String>(1)?;
365 let is_tombstone = row.get::<_, bool>(2)?;
366
367 let ts = HLCTimestamp::from_str(&ts)
368 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
369
370 Ok(Self(id, ts, is_tombstone))
371 }
372 }
373}
374
375async fn setup_db(handle: StorageHandle) -> rusqlite::Result<()> {
376 let table = r#"
377 CREATE TABLE IF NOT EXISTS state_entries (
378 keyspace TEXT,
379 doc_id BIGINT,
380 ts TEXT,
381 data BLOB,
382 PRIMARY KEY (keyspace, doc_id)
383 );
384 "#;
385 handle.execute(table, ()).await?;
386
387 Ok(())
388}
389
390#[cfg(test)]
391mod tests {
392 use datacake_eventual_consistency::test_suite;
393
394 use crate::SqliteStorage;
395
396 #[tokio::test]
397 async fn test_storage_logic() {
398 let storage = SqliteStorage::open_in_memory().await.unwrap();
399 test_suite::run_test_suite(storage).await;
400 }
401}