Skip to main content

reifydb_store_multi/hot/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! SQLite implementation of PrimitiveStorage with MVCC versioning.
5//!
6//! Uses SQLite tables with (key, version) composite primary key for persistent
7//! multi-version storage. All operations use a single connection protected by
8//! Mutex for thread safety.
9
10use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{
16	Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
17};
18use tracing::instrument;
19
20use super::{
21	SqliteConfig,
22	connection::{connect, convert_flags, resolve_db_path},
23	entry::entry_id_to_name,
24	query::{build_versioned_range_query, version_to_bytes},
25};
26use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
27
28/// SQLite-based primitive storage implementation with MVCC versioning.
29///
30/// Uses SQLite for persistent storage with a single connection protected by Mutex.
31/// Tables use (key, version) composite primary key for multi-version support.
32#[derive(Clone)]
33pub struct SqlitePrimitiveStorage {
34	inner: Arc<SqlitePrimitiveStorageInner>,
35}
36
37struct SqlitePrimitiveStorageInner {
38	/// Single connection protected by Mutex for thread-safe access.
39	/// Note: We use Mutex instead of RwLock because rusqlite::Connection
40	/// is Send but not Sync.
41	conn: Mutex<Connection>,
42}
43
44impl SqlitePrimitiveStorage {
45	/// Create a new SQLite primitive storage with the given configuration.
46	#[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
47		db_path = ?config.path,
48		page_size = config.page_size,
49		journal_mode = %config.journal_mode.as_str()
50	))]
51	pub fn new(config: SqliteConfig) -> Self {
52		let db_path = resolve_db_path(config.path);
53		let flags = convert_flags(&config.flags);
54
55		let conn = connect(&db_path, flags).expect("Failed to connect to database");
56
57		// Configure SQLite pragmas
58		conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
59		conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
60			.expect("Failed to set journal_mode");
61		conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
62			.expect("Failed to set synchronous");
63		conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
64		conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
65		conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
66		conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
67			.expect("Failed to set wal_autocheckpoint");
68		conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
69
70		Self {
71			inner: Arc::new(SqlitePrimitiveStorageInner {
72				conn: Mutex::new(conn),
73			}),
74		}
75	}
76
77	/// Create an in-memory SQLite storage for testing.
78	pub fn in_memory() -> Self {
79		Self::new(SqliteConfig::in_memory())
80	}
81
82	/// Run incremental vacuum to return freed pages to the OS.
83	pub fn incremental_vacuum(&self) {
84		let conn = self.inner.conn.lock();
85		let _ = conn.execute("PRAGMA incremental_vacuum", []);
86	}
87
88	/// Release unused memory back to the allocator.
89	pub fn shrink_memory(&self) {
90		let conn = self.inner.conn.lock();
91		let _ = conn.pragma_update(None, "shrink_memory", 0);
92	}
93
94	/// Explicitly checkpoint WAL and shrink the page cache before shutdown.
95	pub fn shutdown(&self) {
96		let conn = self.inner.conn.lock();
97		let _ = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE");
98		let _ = conn.pragma_update(None, "cache_size", 0);
99	}
100
101	/// Create a table with the versioned schema if it doesn't exist.
102	fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
103		conn.execute(
104			&format!(
105				"CREATE TABLE IF NOT EXISTS \"{}\" (
106					key BLOB NOT NULL,
107					version BLOB NOT NULL,
108					value BLOB,
109					PRIMARY KEY (key, version)
110				) WITHOUT ROWID",
111				table_name
112			),
113			[],
114		)?;
115		Ok(())
116	}
117}
118
119impl TierStorage for SqlitePrimitiveStorage {
120	#[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
121	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
122		let table_name = entry_id_to_name(table);
123		let conn = self.inner.conn.lock();
124
125		// Get the latest version <= requested version for this key
126		let result = conn.query_row(
127			&format!(
128				"SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
129				table_name
130			),
131			params![key, version_to_bytes(version).as_slice()],
132			|row| row.get::<_, Option<Vec<u8>>>(0),
133		);
134
135		match result {
136			Ok(Some(value)) => Ok(Some(CowVec::new(value))),
137			Ok(None) => Ok(None), // Tombstone
138			Err(QueryReturnedNoRows) => Ok(None),
139			Err(e) if e.to_string().contains("no such table") => Ok(None),
140			Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
141		}
142	}
143
144	#[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
145	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
146		let table_name = entry_id_to_name(table);
147		let conn = self.inner.conn.lock();
148
149		// Check if value exists and is not a tombstone
150		let result = conn.query_row(
151			&format!(
152				"SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
153				table_name
154			),
155			params![key, version_to_bytes(version).as_slice()],
156			|row| row.get::<_, bool>(0),
157		);
158
159		match result {
160			Ok(has_value) => Ok(has_value),
161			Err(QueryReturnedNoRows) => Ok(false),
162			Err(e) if e.to_string().contains("no such table") => Ok(false),
163			Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
164		}
165	}
166
167	#[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
168	fn set(
169		&self,
170		version: CommitVersion,
171		batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>,
172	) -> Result<()> {
173		if batches.is_empty() {
174			return Ok(());
175		}
176
177		let conn = self.inner.conn.lock();
178		let tx = conn
179			.unchecked_transaction()
180			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
181
182		for (table, entries) in batches {
183			let table_name = entry_id_to_name(table);
184
185			// Try to insert entries, creating table if needed
186			let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
187			if let Err(e) = result {
188				if e.to_string().contains("no such table") {
189					Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
190						error!(internal(format!("Failed to create table: {}", e)))
191					})?;
192					insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
193						|e| error!(internal(format!("Failed to insert entries: {}", e))),
194					)?;
195				} else {
196					return Err(error!(internal(format!("Failed to insert entries: {}", e))));
197				}
198			}
199		}
200
201		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
202	}
203
204	#[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
205	fn range_next(
206		&self,
207		table: EntryKind,
208		cursor: &mut RangeCursor,
209		start: Bound<&[u8]>,
210		end: Bound<&[u8]>,
211		version: CommitVersion,
212		batch_size: usize,
213	) -> Result<RangeBatch> {
214		if cursor.exhausted {
215			return Ok(RangeBatch::empty());
216		}
217
218		let table_name = entry_id_to_name(table);
219
220		// Determine effective start bound based on cursor state
221		let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
222			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
223			None => bound_to_owned(start),
224		};
225		let end_owned = bound_to_owned(end);
226
227		let conn = self.inner.conn.lock();
228
229		let start_ref = bound_as_ref(&effective_start);
230		let end_ref = bound_as_ref(&end_owned);
231		let (query, params) =
232			build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
233
234		let mut stmt = match conn.prepare(&query) {
235			Ok(stmt) => stmt,
236			Err(e) if e.to_string().contains("no such table") => {
237				cursor.exhausted = true;
238				return Ok(RangeBatch::empty());
239			}
240			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
241		};
242
243		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
244
245		let entries: Vec<RawEntry> = stmt
246			.query_map(params_refs.as_slice(), |row| {
247				let key: Vec<u8> = row.get(0)?;
248				let version_bytes: Vec<u8> = row.get(1)?;
249				let value: Option<Vec<u8>> = row.get(2)?;
250				let version = u64::from_be_bytes(
251					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
252				);
253				Ok(RawEntry {
254					key: CowVec::new(key),
255					version: CommitVersion(version),
256					value: value.map(CowVec::new),
257				})
258			})
259			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
260			.filter_map(|r| r.ok())
261			.collect();
262
263		let has_more = entries.len() > batch_size;
264		let entries = if has_more {
265			entries.into_iter().take(batch_size).collect()
266		} else {
267			entries
268		};
269
270		let batch = RangeBatch {
271			entries,
272			has_more,
273		};
274
275		// Update cursor
276		if let Some(last_entry) = batch.entries.last() {
277			cursor.last_key = Some(last_entry.key.clone());
278		}
279		if !batch.has_more {
280			cursor.exhausted = true;
281		}
282
283		Ok(batch)
284	}
285
286	#[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
287	fn range_rev_next(
288		&self,
289		table: EntryKind,
290		cursor: &mut RangeCursor,
291		start: Bound<&[u8]>,
292		end: Bound<&[u8]>,
293		version: CommitVersion,
294		batch_size: usize,
295	) -> Result<RangeBatch> {
296		if cursor.exhausted {
297			return Ok(RangeBatch::empty());
298		}
299
300		let table_name = entry_id_to_name(table);
301
302		// For reverse iteration, effective end bound based on cursor
303		let start_owned = bound_to_owned(start);
304		let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
305			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
306			None => bound_to_owned(end),
307		};
308
309		let conn = self.inner.conn.lock();
310
311		let start_ref = bound_as_ref(&start_owned);
312		let end_ref = bound_as_ref(&effective_end);
313		let (query, params) =
314			build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
315
316		let mut stmt = match conn.prepare(&query) {
317			Ok(stmt) => stmt,
318			Err(e) if e.to_string().contains("no such table") => {
319				cursor.exhausted = true;
320				return Ok(RangeBatch::empty());
321			}
322			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
323		};
324
325		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
326
327		let entries: Vec<RawEntry> = stmt
328			.query_map(params_refs.as_slice(), |row| {
329				let key: Vec<u8> = row.get(0)?;
330				let version_bytes: Vec<u8> = row.get(1)?;
331				let value: Option<Vec<u8>> = row.get(2)?;
332				let version = u64::from_be_bytes(
333					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
334				);
335				Ok(RawEntry {
336					key: CowVec::new(key),
337					version: CommitVersion(version),
338					value: value.map(CowVec::new),
339				})
340			})
341			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
342			.filter_map(|r| r.ok())
343			.collect();
344
345		let has_more = entries.len() > batch_size;
346		let entries = if has_more {
347			entries.into_iter().take(batch_size).collect()
348		} else {
349			entries
350		};
351
352		let batch = RangeBatch {
353			entries,
354			has_more,
355		};
356
357		// Update cursor
358		if let Some(last_entry) = batch.entries.last() {
359			cursor.last_key = Some(last_entry.key.clone());
360		}
361		if !batch.has_more {
362			cursor.exhausted = true;
363		}
364
365		Ok(batch)
366	}
367
368	fn ensure_table(&self, table: EntryKind) -> Result<()> {
369		let table_name = entry_id_to_name(table);
370		let conn = self.inner.conn.lock();
371
372		Self::create_table_if_needed(&conn, &table_name)
373			.map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
374	}
375
376	fn clear_table(&self, table: EntryKind) -> Result<()> {
377		let table_name = entry_id_to_name(table);
378		let conn = self.inner.conn.lock();
379
380		let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
381
382		match result {
383			Ok(_) => Ok(()),
384			Err(e) if e.to_string().contains("no such table") => Ok(()),
385			Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
386		}
387	}
388
389	#[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
390	fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
391		if batches.is_empty() {
392			return Ok(());
393		}
394
395		let conn = self.inner.conn.lock();
396		let tx = conn
397			.unchecked_transaction()
398			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
399
400		for (table, entries) in batches {
401			let table_name = entry_id_to_name(table);
402
403			let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
404			let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
405			let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
406
407			let mut max_version_stmt = match tx.prepare(&max_version_sql) {
408				Ok(s) => s,
409				Err(e) if e.to_string().contains("no such table") => continue,
410				Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
411			};
412			let mut delete_all_stmt = tx
413				.prepare(&delete_all_sql)
414				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
415			let mut delete_one_stmt = tx
416				.prepare(&delete_one_sql)
417				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
418
419			for (key, version) in entries {
420				let version_bytes = version_to_bytes(version);
421
422				let max_version: Option<Vec<u8>> = max_version_stmt
423					.query_row(params![key.as_slice()], |row| row.get(0))
424					.unwrap_or(None);
425
426				let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
427
428				let result = if is_latest {
429					delete_all_stmt.execute(params![key.as_slice()])
430				} else {
431					delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
432				};
433
434				if let Err(e) = result {
435					if !e.to_string().contains("no such table") {
436						return Err(error!(internal(format!("Failed to delete entry: {}", e))));
437					}
438				}
439			}
440		}
441
442		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
443	}
444
445	#[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
446	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
447		let table_name = entry_id_to_name(table);
448		let conn = self.inner.conn.lock();
449
450		let mut stmt = match conn.prepare(&format!(
451			"SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
452			table_name
453		)) {
454			Ok(stmt) => stmt,
455			Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
456			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
457		};
458
459		let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
460			.query_map(params![key], |row| {
461				let version_bytes: Vec<u8> = row.get(0)?;
462				let value: Option<Vec<u8>> = row.get(1)?;
463				let version = u64::from_be_bytes(
464					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
465				);
466				Ok((CommitVersion(version), value.map(CowVec::new)))
467			})
468			.map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
469			.filter_map(|r| r.ok())
470			.collect();
471
472		Ok(versions)
473	}
474}
475
476impl TierBackend for SqlitePrimitiveStorage {}
477
478/// Convert owned Bound to Bound<&[u8]>
479fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
480	match bound {
481		Bound::Included(v) => Bound::Included(v.as_slice()),
482		Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
483		Bound::Unbounded => Bound::Unbounded,
484	}
485}
486
487/// Convert Bound<&[u8]> to Bound<Vec<u8>>
488fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
489	match bound {
490		Bound::Included(v) => Bound::Included(v.to_vec()),
491		Bound::Excluded(v) => Bound::Excluded(v.to_vec()),
492		Bound::Unbounded => Bound::Unbounded,
493	}
494}
495
496/// Insert versioned entries into a table within an existing transaction
497fn insert_versioned_entries_in_tx(
498	tx: &SqliteTransaction,
499	table_name: &str,
500	version: CommitVersion,
501	entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
502) -> SqliteResult<()> {
503	let version_bytes = version_to_bytes(version);
504	let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
505	let mut stmt = tx.prepare(&sql)?;
506	for (key, value) in entries {
507		stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
508	}
509	Ok(())
510}
511
512#[cfg(test)]
513pub mod tests {
514	use reifydb_core::interface::catalog::{id::TableId, primitive::PrimitiveId};
515
516	use super::*;
517
518	#[test]
519	fn test_basic_operations() {
520		let storage = SqlitePrimitiveStorage::in_memory();
521
522		let key = CowVec::new(b"key1".to_vec());
523		let version = CommitVersion(1);
524
525		// Put and get
526		storage.set(
527			version,
528			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
529		)
530		.unwrap();
531		let value = storage.get(EntryKind::Multi, &key, version).unwrap();
532		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
533
534		// Contains
535		assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
536		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());
537
538		// Delete (tombstone)
539		let version2 = CommitVersion(2);
540		storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
541		assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
542	}
543
544	#[test]
545	fn test_source_tables() {
546		let storage = SqlitePrimitiveStorage::in_memory();
547
548		let source1 = PrimitiveId::Table(TableId(1));
549		let source2 = PrimitiveId::Table(TableId(2));
550		let key = CowVec::new(b"key".to_vec());
551		let version = CommitVersion(1);
552
553		storage.set(
554			version,
555			HashMap::from([(
556				EntryKind::Source(source1),
557				vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
558			)]),
559		)
560		.unwrap();
561		storage.set(
562			version,
563			HashMap::from([(
564				EntryKind::Source(source2),
565				vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
566			)]),
567		)
568		.unwrap();
569
570		assert_eq!(
571			storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
572			Some(b"table1".as_slice())
573		);
574		assert_eq!(
575			storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
576			Some(b"table2".as_slice())
577		);
578	}
579
580	#[test]
581	fn test_version_queries() {
582		let storage = SqlitePrimitiveStorage::in_memory();
583
584		let key = CowVec::new(b"key1".to_vec());
585
586		// Insert multiple versions
587		storage.set(
588			CommitVersion(1),
589			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
590		)
591		.unwrap();
592		storage.set(
593			CommitVersion(2),
594			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
595		)
596		.unwrap();
597		storage.set(
598			CommitVersion(3),
599			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
600		)
601		.unwrap();
602
603		// Get at specific versions
604		assert_eq!(
605			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
606			Some(b"v3".as_slice())
607		);
608		assert_eq!(
609			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
610			Some(b"v2".as_slice())
611		);
612		assert_eq!(
613			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
614			Some(b"v1".as_slice())
615		);
616
617		// Get at intermediate version returns closest <= version
618		assert_eq!(
619			storage.get(EntryKind::Multi, &key, CommitVersion(10)).unwrap().as_deref(),
620			Some(b"v3".as_slice())
621		);
622	}
623
624	#[test]
625	fn test_range_next() {
626		let storage = SqlitePrimitiveStorage::in_memory();
627
628		let version = CommitVersion(1);
629		storage.set(
630			version,
631			HashMap::from([(
632				EntryKind::Multi,
633				vec![
634					(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
635					(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
636					(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
637				],
638			)]),
639		)
640		.unwrap();
641
642		let mut cursor = RangeCursor::new();
643		let batch = storage
644			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
645			.unwrap();
646
647		assert_eq!(batch.entries.len(), 3);
648		assert!(!batch.has_more);
649		assert!(cursor.exhausted);
650		assert_eq!(&*batch.entries[0].key, b"a");
651		assert_eq!(&*batch.entries[1].key, b"b");
652		assert_eq!(&*batch.entries[2].key, b"c");
653	}
654
655	#[test]
656	fn test_range_rev_next() {
657		let storage = SqlitePrimitiveStorage::in_memory();
658
659		let version = CommitVersion(1);
660		storage.set(
661			version,
662			HashMap::from([(
663				EntryKind::Multi,
664				vec![
665					(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
666					(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
667					(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
668				],
669			)]),
670		)
671		.unwrap();
672
673		let mut cursor = RangeCursor::new();
674		let batch = storage
675			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
676			.unwrap();
677
678		assert_eq!(batch.entries.len(), 3);
679		assert!(!batch.has_more);
680		assert!(cursor.exhausted);
681		assert_eq!(&*batch.entries[0].key, b"c");
682		assert_eq!(&*batch.entries[1].key, b"b");
683		assert_eq!(&*batch.entries[2].key, b"a");
684	}
685
686	#[test]
687	fn test_range_streaming_pagination() {
688		let storage = SqlitePrimitiveStorage::in_memory();
689
690		let version = CommitVersion(1);
691
692		// Insert 10 entries
693		let entries: Vec<_> =
694			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
695		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
696
697		// Use a single cursor to stream through all entries
698		let mut cursor = RangeCursor::new();
699
700		// First batch of 3
701		let batch1 = storage
702			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
703			.unwrap();
704		assert_eq!(batch1.entries.len(), 3);
705		assert!(batch1.has_more);
706		assert!(!cursor.exhausted);
707		assert_eq!(&*batch1.entries[0].key, &[0]);
708		assert_eq!(&*batch1.entries[2].key, &[2]);
709
710		// Second batch of 3 - cursor automatically continues
711		let batch2 = storage
712			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
713			.unwrap();
714		assert_eq!(batch2.entries.len(), 3);
715		assert!(batch2.has_more);
716		assert!(!cursor.exhausted);
717		assert_eq!(&*batch2.entries[0].key, &[3]);
718		assert_eq!(&*batch2.entries[2].key, &[5]);
719	}
720
721	#[test]
722	fn test_range_reving_pagination() {
723		let storage = SqlitePrimitiveStorage::in_memory();
724
725		let version = CommitVersion(1);
726
727		// Insert 10 entries
728		let entries: Vec<_> =
729			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
730		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
731
732		// Use a single cursor to stream in reverse
733		let mut cursor = RangeCursor::new();
734
735		// First batch of 3 (reverse)
736		let batch1 = storage
737			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
738			.unwrap();
739		assert_eq!(batch1.entries.len(), 3);
740		assert!(batch1.has_more);
741		assert!(!cursor.exhausted);
742		assert_eq!(&*batch1.entries[0].key, &[9]);
743		assert_eq!(&*batch1.entries[2].key, &[7]);
744
745		// Second batch
746		let batch2 = storage
747			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
748			.unwrap();
749		assert_eq!(batch2.entries.len(), 3);
750		assert!(batch2.has_more);
751		assert!(!cursor.exhausted);
752		assert_eq!(&*batch2.entries[0].key, &[6]);
753		assert_eq!(&*batch2.entries[2].key, &[4]);
754	}
755
756	#[test]
757	fn test_get_nonexistent_table() {
758		let storage = SqlitePrimitiveStorage::in_memory();
759
760		// Should return None for non-existent table, not error
761		let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
762		assert_eq!(value, None);
763	}
764
765	#[test]
766	fn test_range_nonexistent_table() {
767		let storage = SqlitePrimitiveStorage::in_memory();
768
769		// Should return empty batch for non-existent table, not error
770		let mut cursor = RangeCursor::new();
771		let batch = storage
772			.range_next(
773				EntryKind::Multi,
774				&mut cursor,
775				Bound::Unbounded,
776				Bound::Unbounded,
777				CommitVersion(1),
778				100,
779			)
780			.unwrap();
781		assert!(batch.entries.is_empty());
782		assert!(cursor.exhausted);
783	}
784
785	#[test]
786	fn test_drop_specific_version() {
787		let storage = SqlitePrimitiveStorage::in_memory();
788
789		let key = CowVec::new(b"key1".to_vec());
790
791		// Insert versions 1, 2, 3
792		for v in 1..=3u64 {
793			storage.set(
794				CommitVersion(v),
795				HashMap::from([(
796					EntryKind::Multi,
797					vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
798				)]),
799			)
800			.unwrap();
801		}
802
803		// Drop version 1
804		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();
805
806		// Version 1 should no longer be accessible
807		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());
808
809		// Versions 2 and 3 should still work
810		assert_eq!(
811			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
812			Some(b"v2".as_slice())
813		);
814		assert_eq!(
815			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
816			Some(b"v3".as_slice())
817		);
818	}
819}