Skip to main content

reifydb_store_single/hot/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! SQLite implementation of single-version storage.
5//!
6//! Uses a single table for persistent key-value storage with thread-safe access.
7
8use std::{ops::Bound, sync::Arc};
9
10use reifydb_core::internal_error;
11use reifydb_runtime::sync::mutex::Mutex;
12use reifydb_type::{Result, util::cowvec::CowVec};
13use rusqlite::{
14	Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
15};
16use tracing::instrument;
17
18use super::{
19	SqliteConfig,
20	connection::{connect, convert_flags, resolve_db_path},
21	query::build_range_query,
22};
23use crate::tier::{RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
24
25/// Single table name for all storage
26const TABLE_NAME: &str = "entries";
27
28/// SQLite-based primitive storage implementation.
29///
30/// Uses a single table for persistent storage with a connection protected by Mutex.
31#[derive(Clone)]
32pub struct SqlitePrimitiveStorage {
33	inner: Arc<SqlitePrimitiveStorageInner>,
34}
35
36struct SqlitePrimitiveStorageInner {
37	/// Single connection protected by Mutex for thread-safe access.
38	/// Note: We use Mutex instead of RwLock because rusqlite::Connection
39	/// is Send but not Sync (contains RefCell).
40	conn: Mutex<Connection>,
41}
42
43impl SqlitePrimitiveStorage {
44	/// Create a new SQLite primitive storage with the given configuration.
45	#[instrument(name = "store::single::sqlite::new", level = "debug", skip(config), fields(
46		db_path = ?config.path,
47		page_size = config.page_size,
48		journal_mode = %config.journal_mode.as_str()
49	))]
50	pub fn new(config: SqliteConfig) -> Self {
51		let db_path = resolve_db_path(config.path);
52		let flags = convert_flags(&config.flags);
53
54		let conn = connect(&db_path, flags).expect("Failed to connect to database");
55
56		// Configure SQLite pragmas
57		conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
58		conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
59			.expect("Failed to set journal_mode");
60		conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
61			.expect("Failed to set synchronous");
62		conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
63		conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
64		conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
65		conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
66			.expect("Failed to set wal_autocheckpoint");
67		conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
68
69		Self {
70			inner: Arc::new(SqlitePrimitiveStorageInner {
71				conn: Mutex::new(conn),
72			}),
73		}
74	}
75
76	/// Create an in-memory SQLite storage for testing.
77	pub fn in_memory() -> Self {
78		Self::new(SqliteConfig::in_memory())
79	}
80}
81
82impl TierStorage for SqlitePrimitiveStorage {
83	#[instrument(name = "store::single::sqlite::get", level = "trace", skip(self, key), fields(key_len = key.len()))]
84	fn get(&self, key: &[u8]) -> Result<Option<CowVec<u8>>> {
85		let conn = self.inner.conn.lock();
86
87		let result = conn.query_row(
88			&format!("SELECT value FROM \"{}\" WHERE key = ?1", TABLE_NAME),
89			params![key],
90			|row| row.get::<_, Option<Vec<u8>>>(0),
91		);
92
93		match result {
94			Ok(Some(value)) => Ok(Some(CowVec::new(value))),
95			Ok(None) => Ok(None),
96			Err(QueryReturnedNoRows) => Ok(None),
97			Err(e) if e.to_string().contains("no such table") => Ok(None),
98			Err(e) => Err(internal_error!("Failed to get: {}", e)),
99		}
100	}
101
102	#[instrument(name = "store::single::sqlite::contains", level = "trace", skip(self, key), fields(key_len = key.len()), ret)]
103	fn contains(&self, key: &[u8]) -> Result<bool> {
104		let conn = self.inner.conn.lock();
105
106		let result = conn.query_row(
107			&format!("SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1", TABLE_NAME),
108			params![key],
109			|row| row.get::<_, bool>(0),
110		);
111
112		match result {
113			Ok(has_value) => Ok(has_value),
114			Err(QueryReturnedNoRows) => Ok(false),
115			Err(e) if e.to_string().contains("no such table") => Ok(false),
116			Err(e) => Err(internal_error!("Failed to check contains: {}", e)),
117		}
118	}
119
120	#[instrument(name = "store::single::sqlite::set", level = "debug", skip(self, entries), fields(entry_count = entries.len()))]
121	fn set(&self, entries: Vec<(CowVec<u8>, Option<CowVec<u8>>)>) -> Result<()> {
122		if entries.is_empty() {
123			return Ok(());
124		}
125
126		let conn = self.inner.conn.lock();
127		let tx = conn
128			.unchecked_transaction()
129			.map_err(|e| internal_error!("Failed to start transaction: {}", e))?;
130
131		let result = insert_entries_in_tx(&tx, TABLE_NAME, &entries);
132		if let Err(e) = result {
133			if e.to_string().contains("no such table") {
134				tx.execute(
135					&format!(
136						"CREATE TABLE IF NOT EXISTS \"{}\" (
137							key BLOB NOT NULL PRIMARY KEY,
138							value BLOB
139						) WITHOUT ROWID",
140						TABLE_NAME
141					),
142					[],
143				)
144				.map_err(|e| internal_error!("Failed to create table: {}", e))?;
145				insert_entries_in_tx(&tx, TABLE_NAME, &entries)
146					.map_err(|e| internal_error!("Failed to insert entries: {}", e))?;
147			} else {
148				return Err(internal_error!("Failed to insert entries: {}", e));
149			}
150		}
151
152		tx.commit().map_err(|e| internal_error!("Failed to commit transaction: {}", e))
153	}
154
155	#[instrument(name = "store::single::sqlite::range_next", level = "trace", skip(self, cursor))]
156	fn range_next(
157		&self,
158		cursor: &mut RangeCursor,
159		start: Bound<&[u8]>,
160		end: Bound<&[u8]>,
161		batch_size: usize,
162	) -> Result<RangeBatch> {
163		if cursor.exhausted {
164			return Ok(RangeBatch::empty());
165		}
166
167		// Determine effective start bound based on cursor state
168		let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
169			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
170			None => bound_to_owned(start),
171		};
172		let end_owned = bound_to_owned(end);
173
174		let conn = self.inner.conn.lock();
175
176		let start_ref = bound_as_ref(&effective_start);
177		let end_ref = bound_as_ref(&end_owned);
178		let (query, params) = build_range_query(TABLE_NAME, start_ref, end_ref, false, batch_size + 1);
179
180		let mut stmt = match conn.prepare(&query) {
181			Ok(stmt) => stmt,
182			Err(e) if e.to_string().contains("no such table") => {
183				cursor.exhausted = true;
184				return Ok(RangeBatch::empty());
185			}
186			Err(e) => return Err(internal_error!("Failed to prepare query: {}", e)),
187		};
188
189		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
190
191		let entries: Vec<RawEntry> = stmt
192			.query_map(params_refs.as_slice(), |row| {
193				let key: Vec<u8> = row.get(0)?;
194				let value: Option<Vec<u8>> = row.get(1)?;
195				Ok(RawEntry {
196					key: CowVec::new(key),
197					value: value.map(CowVec::new),
198				})
199			})
200			.map_err(|e| internal_error!("Failed to query range: {}", e))?
201			.filter_map(|r| r.ok())
202			.collect();
203
204		let has_more = entries.len() > batch_size;
205		let entries = if has_more {
206			entries.into_iter().take(batch_size).collect()
207		} else {
208			entries
209		};
210
211		let batch = RangeBatch {
212			entries,
213			has_more,
214		};
215
216		// Update cursor
217		if let Some(last_entry) = batch.entries.last() {
218			cursor.last_key = Some(last_entry.key.clone());
219		}
220		if !batch.has_more {
221			cursor.exhausted = true;
222		}
223
224		Ok(batch)
225	}
226
227	#[instrument(name = "store::single::sqlite::range_rev_next", level = "trace", skip(self, cursor))]
228	fn range_rev_next(
229		&self,
230		cursor: &mut RangeCursor,
231		start: Bound<&[u8]>,
232		end: Bound<&[u8]>,
233		batch_size: usize,
234	) -> Result<RangeBatch> {
235		if cursor.exhausted {
236			return Ok(RangeBatch::empty());
237		}
238
239		// For reverse iteration, effective end bound based on cursor
240		let start_owned = bound_to_owned(start);
241		let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
242			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
243			None => bound_to_owned(end),
244		};
245
246		let conn = self.inner.conn.lock();
247
248		let start_ref = bound_as_ref(&start_owned);
249		let end_ref = bound_as_ref(&effective_end);
250		let (query, params) = build_range_query(TABLE_NAME, start_ref, end_ref, true, batch_size + 1);
251
252		let mut stmt = match conn.prepare(&query) {
253			Ok(stmt) => stmt,
254			Err(e) if e.to_string().contains("no such table") => {
255				cursor.exhausted = true;
256				return Ok(RangeBatch::empty());
257			}
258			Err(e) => return Err(internal_error!("Failed to prepare query: {}", e)),
259		};
260
261		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
262
263		let entries: Vec<RawEntry> = stmt
264			.query_map(params_refs.as_slice(), |row| {
265				let key: Vec<u8> = row.get(0)?;
266				let value: Option<Vec<u8>> = row.get(1)?;
267				Ok(RawEntry {
268					key: CowVec::new(key),
269					value: value.map(CowVec::new),
270				})
271			})
272			.map_err(|e| internal_error!("Failed to query range: {}", e))?
273			.filter_map(|r| r.ok())
274			.collect();
275
276		let has_more = entries.len() > batch_size;
277		let entries = if has_more {
278			entries.into_iter().take(batch_size).collect()
279		} else {
280			entries
281		};
282
283		let batch = RangeBatch {
284			entries,
285			has_more,
286		};
287
288		// Update cursor
289		if let Some(last_entry) = batch.entries.last() {
290			cursor.last_key = Some(last_entry.key.clone());
291		}
292		if !batch.has_more {
293			cursor.exhausted = true;
294		}
295
296		Ok(batch)
297	}
298
299	#[instrument(name = "store::single::sqlite::ensure_table", level = "trace", skip(self))]
300	fn ensure_table(&self) -> Result<()> {
301		let conn = self.inner.conn.lock();
302
303		conn.execute(
304			&format!(
305				"CREATE TABLE IF NOT EXISTS \"{}\" (
306					key   BLOB NOT NULL PRIMARY KEY,
307					value BLOB
308				) WITHOUT ROWID",
309				TABLE_NAME
310			),
311			[],
312		)
313		.map(|_| ())
314		.map_err(|e| internal_error!("Failed to ensure table: {}", e))
315	}
316
317	#[instrument(name = "store::single::sqlite::clear_table", level = "debug", skip(self))]
318	fn clear_table(&self) -> Result<()> {
319		let conn = self.inner.conn.lock();
320
321		let result = conn.execute(&format!("DELETE FROM \"{}\"", TABLE_NAME), []);
322
323		match result {
324			Ok(_) => Ok(()),
325			Err(e) if e.to_string().contains("no such table") => Ok(()),
326			Err(e) => Err(internal_error!("Failed to clear table: {}", e)),
327		}
328	}
329}
330
331impl TierBackend for SqlitePrimitiveStorage {}
332
/// Borrows an owned byte-vector bound as a bound over a byte slice.
///
/// The variant (included / excluded / unbounded) is preserved; only the
/// payload changes from `Vec<u8>` to `&[u8]`.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	match *bound {
		Bound::Unbounded => Bound::Unbounded,
		Bound::Excluded(ref bytes) => Bound::Excluded(&bytes[..]),
		Bound::Included(ref bytes) => Bound::Included(&bytes[..]),
	}
}
341
/// Copies a borrowed byte-slice bound into an owned byte-vector bound.
///
/// The variant (included / excluded / unbounded) is preserved; the payload
/// bytes are copied into a fresh `Vec<u8>`.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
	match bound {
		Bound::Unbounded => Bound::Unbounded,
		Bound::Excluded(bytes) => Bound::Excluded(Vec::from(bytes)),
		Bound::Included(bytes) => Bound::Included(Vec::from(bytes)),
	}
}
350
351/// Insert entries into a table within an existing transaction
352fn insert_entries_in_tx(
353	tx: &SqliteTransaction,
354	table_name: &str,
355	entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
356) -> SqliteResult<()> {
357	for (key, value) in entries {
358		match value {
359			Some(v) => {
360				tx.execute(
361					&format!(
362						"INSERT OR REPLACE INTO \"{}\" (key, value) VALUES (?1, ?2)",
363						table_name
364					),
365					params![key.as_slice(), v.as_slice()],
366				)?;
367			}
368			None => {
369				tx.execute(
370					&format!("DELETE FROM \"{}\" WHERE key = ?1", table_name),
371					params![key.as_slice()],
372				)?;
373			}
374		}
375	}
376	Ok(())
377}
378
#[cfg(test)]
pub mod tests {
	use super::*;

	/// Stores a single `key -> value` pair, panicking if the write fails.
	fn put(storage: &SqlitePrimitiveStorage, key: &[u8], value: &[u8]) {
		storage.set(vec![(CowVec::new(key.to_vec()), Some(CowVec::new(value.to_vec())))]).unwrap();
	}

	/// Builds an in-memory store holding the ten single-byte keys `0..10`,
	/// each mapped to the single byte `key * 10`.
	fn storage_with_ten_entries() -> SqlitePrimitiveStorage {
		let storage = SqlitePrimitiveStorage::in_memory();
		for i in 0..10u8 {
			put(&storage, &[i], &[i * 10]);
		}
		storage
	}

	#[test]
	fn test_basic_operations() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// Put and get
		put(&storage, b"key1", b"value1");
		let value = storage.get(b"key1").unwrap();
		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));

		// Contains
		assert!(storage.contains(b"key1").unwrap());
		assert!(!storage.contains(b"nonexistent").unwrap());

		// Delete (tombstone)
		storage.set(vec![(CowVec::new(b"key1".to_vec()), None)]).unwrap();
		assert!(!storage.contains(b"key1").unwrap());
		assert_eq!(storage.get(b"key1").unwrap(), None);
	}

	#[test]
	fn test_range_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		put(&storage, b"a", b"1");
		put(&storage, b"b", b"2");
		put(&storage, b"c", b"3");

		let mut cursor = RangeCursor::new();
		let batch = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"a");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"c");
	}

	#[test]
	fn test_range_rev_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		put(&storage, b"a", b"1");
		put(&storage, b"b", b"2");
		put(&storage, b"c", b"3");

		let mut cursor = RangeCursor::new();
		let batch = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"c");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"a");
	}

	#[test]
	fn test_range_streaming_pagination() {
		let storage = storage_with_ten_entries();

		// Use a single cursor to stream through all entries
		let mut cursor = RangeCursor::new();

		// First batch of 3
		let batch1 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch1.entries[0].key, &[0]);
		assert_eq!(&*batch1.entries[2].key, &[2]);

		// Second batch of 3 - cursor automatically continues
		let batch2 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch2.entries[0].key, &[3]);
		assert_eq!(&*batch2.entries[2].key, &[5]);

		// Third batch is still full, one entry is left behind it
		let batch3 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch3.entries.len(), 3);
		assert!(batch3.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch3.entries[0].key, &[6]);
		assert_eq!(&*batch3.entries[2].key, &[8]);

		// Final partial batch ends the scan
		let batch4 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch4.entries.len(), 1);
		assert!(!batch4.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch4.entries[0].key, &[9]);

		// An exhausted cursor yields nothing further
		let batch5 = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert!(batch5.entries.is_empty());
	}

	#[test]
	fn test_range_rev_streaming_pagination() {
		let storage = storage_with_ten_entries();

		// Use a single cursor to stream in reverse
		let mut cursor = RangeCursor::new();

		// First batch of 3 (reverse)
		let batch1 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch1.entries[0].key, &[9]);
		assert_eq!(&*batch1.entries[2].key, &[7]);

		// Second batch
		let batch2 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch2.entries[0].key, &[6]);
		assert_eq!(&*batch2.entries[2].key, &[4]);

		// Third batch is still full, one entry is left behind it
		let batch3 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch3.entries.len(), 3);
		assert!(batch3.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch3.entries[0].key, &[3]);
		assert_eq!(&*batch3.entries[2].key, &[1]);

		// Final partial batch ends the scan
		let batch4 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert_eq!(batch4.entries.len(), 1);
		assert!(!batch4.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch4.entries[0].key, &[0]);

		// An exhausted cursor yields nothing further
		let batch5 = storage.range_rev_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 3).unwrap();
		assert!(batch5.entries.is_empty());
	}

	#[test]
	fn test_get_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// Should return None for non-existent table, not error
		let value = storage.get(b"key").unwrap();
		assert_eq!(value, None);
	}

	#[test]
	fn test_range_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// Should return empty batch for non-existent table, not error
		let mut cursor = RangeCursor::new();
		let batch = storage.range_next(&mut cursor, Bound::Unbounded, Bound::Unbounded, 100).unwrap();
		assert!(batch.entries.is_empty());
		assert!(cursor.exhausted);
	}
}