1use std::{ops::Bound, sync::Arc};
9
10use reifydb_core::internal_error;
11use reifydb_runtime::sync::mutex::Mutex;
12use reifydb_type::{Result, util::cowvec::CowVec};
13use rusqlite::{
14 Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
15};
16use tracing::instrument;
17
18use super::{
19 SqliteConfig,
20 connection::{connect, convert_flags, resolve_db_path},
21 query::build_range_query,
22};
23use crate::tier::{RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
24
25const TABLE_NAME: &str = "entries";
27
28#[derive(Clone)]
32pub struct SqlitePrimitiveStorage {
33 inner: Arc<SqlitePrimitiveStorageInner>,
34}
35
36struct SqlitePrimitiveStorageInner {
37 conn: Mutex<Connection>,
41}
42
43impl SqlitePrimitiveStorage {
44 #[instrument(name = "store::single::sqlite::new", level = "debug", skip(config), fields(
46 db_path = ?config.path,
47 page_size = config.page_size,
48 journal_mode = %config.journal_mode.as_str()
49 ))]
50 pub fn new(config: SqliteConfig) -> Self {
51 let db_path = resolve_db_path(config.path);
52 let flags = convert_flags(&config.flags);
53
54 let conn = connect(&db_path, flags).expect("Failed to connect to database");
55
56 conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
58 conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
59 .expect("Failed to set journal_mode");
60 conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
61 .expect("Failed to set synchronous");
62 conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
63 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
64 conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
65 conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
66 .expect("Failed to set wal_autocheckpoint");
67 conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
68
69 Self {
70 inner: Arc::new(SqlitePrimitiveStorageInner {
71 conn: Mutex::new(conn),
72 }),
73 }
74 }
75
76 pub fn in_memory() -> Self {
78 Self::new(SqliteConfig::in_memory())
79 }
80}
81
82impl TierStorage for SqlitePrimitiveStorage {
83 #[instrument(name = "store::single::sqlite::get", level = "trace", skip(self, key), fields(key_len = key.len()))]
84 fn get(&self, key: &[u8]) -> Result<Option<CowVec<u8>>> {
85 let conn = self.inner.conn.lock();
86
87 let result = conn.query_row(
88 &format!("SELECT value FROM \"{}\" WHERE key = ?1", TABLE_NAME),
89 params![key],
90 |row| row.get::<_, Option<Vec<u8>>>(0),
91 );
92
93 match result {
94 Ok(Some(value)) => Ok(Some(CowVec::new(value))),
95 Ok(None) => Ok(None),
96 Err(QueryReturnedNoRows) => Ok(None),
97 Err(e) if e.to_string().contains("no such table") => Ok(None),
98 Err(e) => Err(internal_error!("Failed to get: {}", e)),
99 }
100 }
101
102 #[instrument(name = "store::single::sqlite::contains", level = "trace", skip(self, key), fields(key_len = key.len()), ret)]
103 fn contains(&self, key: &[u8]) -> Result<bool> {
104 let conn = self.inner.conn.lock();
105
106 let result = conn.query_row(
107 &format!("SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1", TABLE_NAME),
108 params![key],
109 |row| row.get::<_, bool>(0),
110 );
111
112 match result {
113 Ok(has_value) => Ok(has_value),
114 Err(QueryReturnedNoRows) => Ok(false),
115 Err(e) if e.to_string().contains("no such table") => Ok(false),
116 Err(e) => Err(internal_error!("Failed to check contains: {}", e)),
117 }
118 }
119
120 #[instrument(name = "store::single::sqlite::set", level = "debug", skip(self, entries), fields(entry_count = entries.len()))]
121 fn set(&self, entries: Vec<(CowVec<u8>, Option<CowVec<u8>>)>) -> Result<()> {
122 if entries.is_empty() {
123 return Ok(());
124 }
125
126 let conn = self.inner.conn.lock();
127 let tx = conn
128 .unchecked_transaction()
129 .map_err(|e| internal_error!("Failed to start transaction: {}", e))?;
130
131 let result = insert_entries_in_tx(&tx, TABLE_NAME, &entries);
132 if let Err(e) = result {
133 if e.to_string().contains("no such table") {
134 tx.execute(
135 &format!(
136 "CREATE TABLE IF NOT EXISTS \"{}\" (
137 key BLOB NOT NULL PRIMARY KEY,
138 value BLOB
139 ) WITHOUT ROWID",
140 TABLE_NAME
141 ),
142 [],
143 )
144 .map_err(|e| internal_error!("Failed to create table: {}", e))?;
145 insert_entries_in_tx(&tx, TABLE_NAME, &entries)
146 .map_err(|e| internal_error!("Failed to insert entries: {}", e))?;
147 } else {
148 return Err(internal_error!("Failed to insert entries: {}", e));
149 }
150 }
151
152 tx.commit().map_err(|e| internal_error!("Failed to commit transaction: {}", e))
153 }
154
155 #[instrument(name = "store::single::sqlite::range_next", level = "trace", skip(self, cursor))]
156 fn range_next(
157 &self,
158 cursor: &mut RangeCursor,
159 start: Bound<&[u8]>,
160 end: Bound<&[u8]>,
161 batch_size: usize,
162 ) -> Result<RangeBatch> {
163 if cursor.exhausted {
164 return Ok(RangeBatch::empty());
165 }
166
167 let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
169 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
170 None => bound_to_owned(start),
171 };
172 let end_owned = bound_to_owned(end);
173
174 let conn = self.inner.conn.lock();
175
176 let start_ref = bound_as_ref(&effective_start);
177 let end_ref = bound_as_ref(&end_owned);
178 let (query, params) = build_range_query(TABLE_NAME, start_ref, end_ref, false, batch_size + 1);
179
180 let mut stmt = match conn.prepare(&query) {
181 Ok(stmt) => stmt,
182 Err(e) if e.to_string().contains("no such table") => {
183 cursor.exhausted = true;
184 return Ok(RangeBatch::empty());
185 }
186 Err(e) => return Err(internal_error!("Failed to prepare query: {}", e)),
187 };
188
189 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
190
191 let entries: Vec<RawEntry> = stmt
192 .query_map(params_refs.as_slice(), |row| {
193 let key: Vec<u8> = row.get(0)?;
194 let value: Option<Vec<u8>> = row.get(1)?;
195 Ok(RawEntry {
196 key: CowVec::new(key),
197 value: value.map(CowVec::new),
198 })
199 })
200 .map_err(|e| internal_error!("Failed to query range: {}", e))?
201 .filter_map(|r| r.ok())
202 .collect();
203
204 let has_more = entries.len() > batch_size;
205 let entries = if has_more {
206 entries.into_iter().take(batch_size).collect()
207 } else {
208 entries
209 };
210
211 let batch = RangeBatch {
212 entries,
213 has_more,
214 };
215
216 if let Some(last_entry) = batch.entries.last() {
218 cursor.last_key = Some(last_entry.key.clone());
219 }
220 if !batch.has_more {
221 cursor.exhausted = true;
222 }
223
224 Ok(batch)
225 }
226
227 #[instrument(name = "store::single::sqlite::range_rev_next", level = "trace", skip(self, cursor))]
228 fn range_rev_next(
229 &self,
230 cursor: &mut RangeCursor,
231 start: Bound<&[u8]>,
232 end: Bound<&[u8]>,
233 batch_size: usize,
234 ) -> Result<RangeBatch> {
235 if cursor.exhausted {
236 return Ok(RangeBatch::empty());
237 }
238
239 let start_owned = bound_to_owned(start);
241 let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
242 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
243 None => bound_to_owned(end),
244 };
245
246 let conn = self.inner.conn.lock();
247
248 let start_ref = bound_as_ref(&start_owned);
249 let end_ref = bound_as_ref(&effective_end);
250 let (query, params) = build_range_query(TABLE_NAME, start_ref, end_ref, true, batch_size + 1);
251
252 let mut stmt = match conn.prepare(&query) {
253 Ok(stmt) => stmt,
254 Err(e) if e.to_string().contains("no such table") => {
255 cursor.exhausted = true;
256 return Ok(RangeBatch::empty());
257 }
258 Err(e) => return Err(internal_error!("Failed to prepare query: {}", e)),
259 };
260
261 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
262
263 let entries: Vec<RawEntry> = stmt
264 .query_map(params_refs.as_slice(), |row| {
265 let key: Vec<u8> = row.get(0)?;
266 let value: Option<Vec<u8>> = row.get(1)?;
267 Ok(RawEntry {
268 key: CowVec::new(key),
269 value: value.map(CowVec::new),
270 })
271 })
272 .map_err(|e| internal_error!("Failed to query range: {}", e))?
273 .filter_map(|r| r.ok())
274 .collect();
275
276 let has_more = entries.len() > batch_size;
277 let entries = if has_more {
278 entries.into_iter().take(batch_size).collect()
279 } else {
280 entries
281 };
282
283 let batch = RangeBatch {
284 entries,
285 has_more,
286 };
287
288 if let Some(last_entry) = batch.entries.last() {
290 cursor.last_key = Some(last_entry.key.clone());
291 }
292 if !batch.has_more {
293 cursor.exhausted = true;
294 }
295
296 Ok(batch)
297 }
298
299 #[instrument(name = "store::single::sqlite::ensure_table", level = "trace", skip(self))]
300 fn ensure_table(&self) -> Result<()> {
301 let conn = self.inner.conn.lock();
302
303 conn.execute(
304 &format!(
305 "CREATE TABLE IF NOT EXISTS \"{}\" (
306 key BLOB NOT NULL PRIMARY KEY,
307 value BLOB
308 ) WITHOUT ROWID",
309 TABLE_NAME
310 ),
311 [],
312 )
313 .map(|_| ())
314 .map_err(|e| internal_error!("Failed to ensure table: {}", e))
315 }
316
317 #[instrument(name = "store::single::sqlite::clear_table", level = "debug", skip(self))]
318 fn clear_table(&self) -> Result<()> {
319 let conn = self.inner.conn.lock();
320
321 let result = conn.execute(&format!("DELETE FROM \"{}\"", TABLE_NAME), []);
322
323 match result {
324 Ok(_) => Ok(()),
325 Err(e) if e.to_string().contains("no such table") => Ok(()),
326 Err(e) => Err(internal_error!("Failed to clear table: {}", e)),
327 }
328 }
329}
330
331impl TierBackend for SqlitePrimitiveStorage {}
332
333fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
335 match bound {
336 Bound::Included(v) => Bound::Included(v.as_slice()),
337 Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
338 Bound::Unbounded => Bound::Unbounded,
339 }
340}
341
342fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
344 match bound {
345 Bound::Included(v) => Bound::Included(v.to_vec()),
346 Bound::Excluded(v) => Bound::Excluded(v.to_vec()),
347 Bound::Unbounded => Bound::Unbounded,
348 }
349}
350
351fn insert_entries_in_tx(
353 tx: &SqliteTransaction,
354 table_name: &str,
355 entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
356) -> SqliteResult<()> {
357 for (key, value) in entries {
358 match value {
359 Some(v) => {
360 tx.execute(
361 &format!(
362 "INSERT OR REPLACE INTO \"{}\" (key, value) VALUES (?1, ?2)",
363 table_name
364 ),
365 params![key.as_slice(), v.as_slice()],
366 )?;
367 }
368 None => {
369 tx.execute(
370 &format!("DELETE FROM \"{}\" WHERE key = ?1", table_name),
371 params![key.as_slice()],
372 )?;
373 }
374 }
375 }
376 Ok(())
377}
378
379#[cfg(test)]
380pub mod tests {
381 use super::*;
382
383 #[test]
384 fn test_basic_operations() {
385 let storage = SqlitePrimitiveStorage::in_memory();
386
387 storage.set(vec![(CowVec::new(b"key1".to_vec()), Some(CowVec::new(b"value1".to_vec())))]).unwrap();
389 let value = storage.get(b"key1").unwrap();
390 assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
391
392 assert!(storage.contains(b"key1").unwrap());
394 assert!(!storage.contains(b"nonexistent").unwrap());
395
396 storage.set(vec![(CowVec::new(b"key1".to_vec()), None)]).unwrap();
398 assert!(!storage.contains(b"key1").unwrap());
399 }
400
401 #[test]
402 fn test_range_next() {
403 let storage = SqlitePrimitiveStorage::in_memory();
404
405 storage.set(vec![(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec())))]).unwrap();
406 storage.set(vec![(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec())))]).unwrap();
407 storage.set(vec![(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec())))]).unwrap();
408
409 let mut cursor = RangeCursor::new();
410 let batch = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();
411
412 assert_eq!(batch.entries.len(), 3);
413 assert!(!batch.has_more);
414 assert!(cursor.exhausted);
415 assert_eq!(&*batch.entries[0].key, b"a");
416 assert_eq!(&*batch.entries[1].key, b"b");
417 assert_eq!(&*batch.entries[2].key, b"c");
418 }
419
420 #[test]
421 fn test_range_rev_next() {
422 let storage = SqlitePrimitiveStorage::in_memory();
423
424 storage.set(vec![(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec())))]).unwrap();
425 storage.set(vec![(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec())))]).unwrap();
426 storage.set(vec![(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec())))]).unwrap();
427
428 let mut cursor = RangeCursor::new();
429 let batch = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();
430
431 assert_eq!(batch.entries.len(), 3);
432 assert!(!batch.has_more);
433 assert!(cursor.exhausted);
434 assert_eq!(&*batch.entries[0].key, b"c");
435 assert_eq!(&*batch.entries[1].key, b"b");
436 assert_eq!(&*batch.entries[2].key, b"a");
437 }
438
439 #[test]
440 fn test_range_streaming_pagination() {
441 let storage = SqlitePrimitiveStorage::in_memory();
442
443 for i in 0..10u8 {
445 storage.set(vec![(CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))]).unwrap();
446 }
447
448 let mut cursor = RangeCursor::new();
450
451 let batch1 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
453 assert_eq!(batch1.entries.len(), 3);
454 assert!(batch1.has_more);
455 assert!(!cursor.exhausted);
456 assert_eq!(&*batch1.entries[0].key, &[0]);
457 assert_eq!(&*batch1.entries[2].key, &[2]);
458
459 let batch2 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
461 assert_eq!(batch2.entries.len(), 3);
462 assert!(batch2.has_more);
463 assert!(!cursor.exhausted);
464 assert_eq!(&*batch2.entries[0].key, &[3]);
465 assert_eq!(&*batch2.entries[2].key, &[5]);
466 }
467
468 #[test]
469 fn test_range_reving_pagination() {
470 let storage = SqlitePrimitiveStorage::in_memory();
471
472 for i in 0..10u8 {
474 storage.set(vec![(CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))]).unwrap();
475 }
476
477 let mut cursor = RangeCursor::new();
479
480 let batch1 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
482 assert_eq!(batch1.entries.len(), 3);
483 assert!(batch1.has_more);
484 assert!(!cursor.exhausted);
485 assert_eq!(&*batch1.entries[0].key, &[9]);
486 assert_eq!(&*batch1.entries[2].key, &[7]);
487
488 let batch2 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
490 assert_eq!(batch2.entries.len(), 3);
491 assert!(batch2.has_more);
492 assert!(!cursor.exhausted);
493 assert_eq!(&*batch2.entries[0].key, &[6]);
494 assert_eq!(&*batch2.entries[2].key, &[4]);
495 }
496
497 #[test]
498 fn test_get_nonexistent_table() {
499 let storage = SqlitePrimitiveStorage::in_memory();
500
501 let value = storage.get(b"key").unwrap();
503 assert_eq!(value, None);
504 }
505
506 #[test]
507 fn test_range_nonexistent_table() {
508 let storage = SqlitePrimitiveStorage::in_memory();
509
510 let mut cursor = RangeCursor::new();
512 let batch = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();
513 assert!(batch.entries.is_empty());
514 assert!(cursor.exhausted);
515 }
516}