Skip to main content

reifydb_store_multi/hot/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! SQLite implementation of PrimitiveStorage with MVCC versioning.
5//!
6//! Uses SQLite tables with (key, version) composite primary key for persistent
7//! multi-version storage. All operations use a single connection protected by
8//! Mutex for thread safety.
9
10use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{
16	Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
17};
18use tracing::instrument;
19
20use super::{
21	SqliteConfig,
22	connection::{connect, convert_flags, resolve_db_path},
23	entry::entry_id_to_name,
24	query::{build_versioned_range_query, version_to_bytes},
25};
26use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
27
/// SQLite-based primitive storage implementation with MVCC versioning.
///
/// Uses SQLite for persistent storage with a single connection protected by Mutex.
/// Tables use (key, version) composite primary key for multi-version support.
///
/// The handle is `Clone`; clones share the same inner state through an `Arc`,
/// so every clone operates on the same underlying connection.
#[derive(Clone)]
pub struct SqlitePrimitiveStorage {
	/// Shared state (the mutex-guarded connection).
	inner: Arc<SqlitePrimitiveStorageInner>,
}
36
/// State shared by all clones of [`SqlitePrimitiveStorage`].
struct SqlitePrimitiveStorageInner {
	/// Single connection protected by Mutex for thread-safe access.
	/// Note: We use Mutex instead of RwLock because rusqlite::Connection
	/// is Send but not Sync.
	conn: Mutex<Connection>,
}
43
44impl SqlitePrimitiveStorage {
45	/// Create a new SQLite primitive storage with the given configuration.
46	#[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
47		db_path = ?config.path,
48		page_size = config.page_size,
49		journal_mode = %config.journal_mode.as_str()
50	))]
51	pub fn new(config: SqliteConfig) -> Self {
52		let db_path = resolve_db_path(config.path);
53		let flags = convert_flags(&config.flags);
54
55		let conn = connect(&db_path, flags).expect("Failed to connect to database");
56
57		// Configure SQLite pragmas
58		conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
59		conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
60			.expect("Failed to set journal_mode");
61		conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
62			.expect("Failed to set synchronous");
63		conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
64		conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
65		conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
66		conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
67			.expect("Failed to set wal_autocheckpoint");
68		conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
69
70		Self {
71			inner: Arc::new(SqlitePrimitiveStorageInner {
72				conn: Mutex::new(conn),
73			}),
74		}
75	}
76
77	/// Create an in-memory SQLite storage for testing.
78	pub fn in_memory() -> Self {
79		Self::new(SqliteConfig::in_memory())
80	}
81
82	/// Run incremental vacuum to return freed pages to the OS.
83	pub fn incremental_vacuum(&self) {
84		let conn = self.inner.conn.lock();
85		let _ = conn.execute("PRAGMA incremental_vacuum", []);
86	}
87
88	/// Release unused memory back to the allocator.
89	pub fn shrink_memory(&self) {
90		let conn = self.inner.conn.lock();
91		let _ = conn.pragma_update(None, "shrink_memory", 0);
92	}
93
94	/// Explicitly checkpoint WAL and shrink the page cache before shutdown.
95	pub fn shutdown(&self) {
96		let conn = self.inner.conn.lock();
97		let _ = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE");
98		let _ = conn.pragma_update(None, "cache_size", 0);
99	}
100
101	/// Create a table with the versioned shape if it doesn't exist.
102	fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
103		conn.execute(
104			&format!(
105				"CREATE TABLE IF NOT EXISTS \"{}\" (
106					key BLOB NOT NULL,
107					version BLOB NOT NULL,
108					value BLOB,
109					PRIMARY KEY (key, version)
110				) WITHOUT ROWID",
111				table_name
112			),
113			[],
114		)?;
115		Ok(())
116	}
117}
118
119impl TierStorage for SqlitePrimitiveStorage {
120	#[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
121	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
122		let table_name = entry_id_to_name(table);
123		let conn = self.inner.conn.lock();
124
125		// Get the latest version <= requested version for this key
126		let result = conn.query_row(
127			&format!(
128				"SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
129				table_name
130			),
131			params![key, version_to_bytes(version).as_slice()],
132			|row| row.get::<_, Option<Vec<u8>>>(0),
133		);
134
135		match result {
136			Ok(Some(value)) => Ok(Some(CowVec::new(value))),
137			Ok(None) => Ok(None), // Tombstone
138			Err(QueryReturnedNoRows) => Ok(None),
139			Err(e) if e.to_string().contains("no such table") => Ok(None),
140			Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
141		}
142	}
143
144	#[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
145	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
146		let table_name = entry_id_to_name(table);
147		let conn = self.inner.conn.lock();
148
149		// Check if value exists and is not a tombstone
150		let result = conn.query_row(
151			&format!(
152				"SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
153				table_name
154			),
155			params![key, version_to_bytes(version).as_slice()],
156			|row| row.get::<_, bool>(0),
157		);
158
159		match result {
160			Ok(has_value) => Ok(has_value),
161			Err(QueryReturnedNoRows) => Ok(false),
162			Err(e) if e.to_string().contains("no such table") => Ok(false),
163			Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
164		}
165	}
166
167	#[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
168	fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
169		if batches.is_empty() {
170			return Ok(());
171		}
172
173		let conn = self.inner.conn.lock();
174		let tx = conn
175			.unchecked_transaction()
176			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
177
178		for (table, entries) in batches {
179			let table_name = entry_id_to_name(table);
180
181			// Try to insert entries, creating table if needed
182			let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
183			if let Err(e) = result {
184				if e.to_string().contains("no such table") {
185					Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
186						error!(internal(format!("Failed to create table: {}", e)))
187					})?;
188					insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
189						|e| error!(internal(format!("Failed to insert entries: {}", e))),
190					)?;
191				} else {
192					return Err(error!(internal(format!("Failed to insert entries: {}", e))));
193				}
194			}
195		}
196
197		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
198	}
199
200	#[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
201	fn range_next(
202		&self,
203		table: EntryKind,
204		cursor: &mut RangeCursor,
205		start: Bound<&[u8]>,
206		end: Bound<&[u8]>,
207		version: CommitVersion,
208		batch_size: usize,
209	) -> Result<RangeBatch> {
210		if cursor.exhausted {
211			return Ok(RangeBatch::empty());
212		}
213
214		let table_name = entry_id_to_name(table);
215
216		// Determine effective start bound based on cursor state
217		let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
218			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
219			None => bound_to_owned(start),
220		};
221		let end_owned = bound_to_owned(end);
222
223		let conn = self.inner.conn.lock();
224
225		let start_ref = bound_as_ref(&effective_start);
226		let end_ref = bound_as_ref(&end_owned);
227		let (query, params) =
228			build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
229
230		let mut stmt = match conn.prepare(&query) {
231			Ok(stmt) => stmt,
232			Err(e) if e.to_string().contains("no such table") => {
233				cursor.exhausted = true;
234				return Ok(RangeBatch::empty());
235			}
236			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
237		};
238
239		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
240
241		let entries: Vec<RawEntry> = stmt
242			.query_map(params_refs.as_slice(), |row| {
243				let key: Vec<u8> = row.get(0)?;
244				let version_bytes: Vec<u8> = row.get(1)?;
245				let value: Option<Vec<u8>> = row.get(2)?;
246				let version = u64::from_be_bytes(
247					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
248				);
249				Ok(RawEntry {
250					key: CowVec::new(key),
251					version: CommitVersion(version),
252					value: value.map(CowVec::new),
253				})
254			})
255			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
256			.filter_map(|r| r.ok())
257			.collect();
258
259		let has_more = entries.len() > batch_size;
260		let entries = if has_more {
261			entries.into_iter().take(batch_size).collect()
262		} else {
263			entries
264		};
265
266		let batch = RangeBatch {
267			entries,
268			has_more,
269		};
270
271		// Update cursor
272		if let Some(last_entry) = batch.entries.last() {
273			cursor.last_key = Some(last_entry.key.clone());
274		}
275		if !batch.has_more {
276			cursor.exhausted = true;
277		}
278
279		Ok(batch)
280	}
281
282	#[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
283	fn range_rev_next(
284		&self,
285		table: EntryKind,
286		cursor: &mut RangeCursor,
287		start: Bound<&[u8]>,
288		end: Bound<&[u8]>,
289		version: CommitVersion,
290		batch_size: usize,
291	) -> Result<RangeBatch> {
292		if cursor.exhausted {
293			return Ok(RangeBatch::empty());
294		}
295
296		let table_name = entry_id_to_name(table);
297
298		// For reverse iteration, effective end bound based on cursor
299		let start_owned = bound_to_owned(start);
300		let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
301			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
302			None => bound_to_owned(end),
303		};
304
305		let conn = self.inner.conn.lock();
306
307		let start_ref = bound_as_ref(&start_owned);
308		let end_ref = bound_as_ref(&effective_end);
309		let (query, params) =
310			build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
311
312		let mut stmt = match conn.prepare(&query) {
313			Ok(stmt) => stmt,
314			Err(e) if e.to_string().contains("no such table") => {
315				cursor.exhausted = true;
316				return Ok(RangeBatch::empty());
317			}
318			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
319		};
320
321		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
322
323		let entries: Vec<RawEntry> = stmt
324			.query_map(params_refs.as_slice(), |row| {
325				let key: Vec<u8> = row.get(0)?;
326				let version_bytes: Vec<u8> = row.get(1)?;
327				let value: Option<Vec<u8>> = row.get(2)?;
328				let version = u64::from_be_bytes(
329					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
330				);
331				Ok(RawEntry {
332					key: CowVec::new(key),
333					version: CommitVersion(version),
334					value: value.map(CowVec::new),
335				})
336			})
337			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
338			.filter_map(|r| r.ok())
339			.collect();
340
341		let has_more = entries.len() > batch_size;
342		let entries = if has_more {
343			entries.into_iter().take(batch_size).collect()
344		} else {
345			entries
346		};
347
348		let batch = RangeBatch {
349			entries,
350			has_more,
351		};
352
353		// Update cursor
354		if let Some(last_entry) = batch.entries.last() {
355			cursor.last_key = Some(last_entry.key.clone());
356		}
357		if !batch.has_more {
358			cursor.exhausted = true;
359		}
360
361		Ok(batch)
362	}
363
364	fn ensure_table(&self, table: EntryKind) -> Result<()> {
365		let table_name = entry_id_to_name(table);
366		let conn = self.inner.conn.lock();
367
368		Self::create_table_if_needed(&conn, &table_name)
369			.map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
370	}
371
372	fn clear_table(&self, table: EntryKind) -> Result<()> {
373		let table_name = entry_id_to_name(table);
374		let conn = self.inner.conn.lock();
375
376		let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
377
378		match result {
379			Ok(_) => Ok(()),
380			Err(e) if e.to_string().contains("no such table") => Ok(()),
381			Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
382		}
383	}
384
385	#[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
386	fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
387		if batches.is_empty() {
388			return Ok(());
389		}
390
391		let conn = self.inner.conn.lock();
392		let tx = conn
393			.unchecked_transaction()
394			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
395
396		for (table, entries) in batches {
397			let table_name = entry_id_to_name(table);
398
399			let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
400			let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
401			let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
402
403			let mut max_version_stmt = match tx.prepare(&max_version_sql) {
404				Ok(s) => s,
405				Err(e) if e.to_string().contains("no such table") => continue,
406				Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
407			};
408			let mut delete_all_stmt = tx
409				.prepare(&delete_all_sql)
410				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
411			let mut delete_one_stmt = tx
412				.prepare(&delete_one_sql)
413				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
414
415			for (key, version) in entries {
416				let version_bytes = version_to_bytes(version);
417
418				let max_version: Option<Vec<u8>> = max_version_stmt
419					.query_row(params![key.as_slice()], |row| row.get(0))
420					.unwrap_or(None);
421
422				let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
423
424				let result = if is_latest {
425					delete_all_stmt.execute(params![key.as_slice()])
426				} else {
427					delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
428				};
429
430				if let Err(e) = result
431					&& !e.to_string().contains("no such table")
432				{
433					return Err(error!(internal(format!("Failed to delete entry: {}", e))));
434				}
435			}
436		}
437
438		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
439	}
440
441	#[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
442	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
443		let table_name = entry_id_to_name(table);
444		let conn = self.inner.conn.lock();
445
446		let mut stmt = match conn.prepare(&format!(
447			"SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
448			table_name
449		)) {
450			Ok(stmt) => stmt,
451			Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
452			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
453		};
454
455		let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
456			.query_map(params![key], |row| {
457				let version_bytes: Vec<u8> = row.get(0)?;
458				let value: Option<Vec<u8>> = row.get(1)?;
459				let version = u64::from_be_bytes(
460					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
461				);
462				Ok((CommitVersion(version), value.map(CowVec::new)))
463			})
464			.map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
465			.filter_map(|r| r.ok())
466			.collect();
467
468		Ok(versions)
469	}
470}
471
// Empty impl body: no `TierBackend` items are defined or overridden here.
impl TierBackend for SqlitePrimitiveStorage {}
473
/// Borrow an owned byte bound as a slice bound, keeping the bound kind
/// (included / excluded / unbounded) unchanged.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	bound.as_ref().map(|bytes| bytes.as_slice())
}
482
/// Copy a borrowed byte bound into an owned one, keeping the bound kind
/// (included / excluded / unbounded) unchanged.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
	bound.map(|bytes| bytes.to_vec())
}
491
492/// Insert versioned entries into a table within an existing transaction
493fn insert_versioned_entries_in_tx(
494	tx: &SqliteTransaction,
495	table_name: &str,
496	version: CommitVersion,
497	entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
498) -> SqliteResult<()> {
499	let version_bytes = version_to_bytes(version);
500	let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
501	let mut stmt = tx.prepare(&sql)?;
502	for (key, value) in entries {
503		stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
504	}
505	Ok(())
506}
507
508#[cfg(test)]
509pub mod tests {
510	use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};
511
512	use super::*;
513
514	#[test]
515	fn test_basic_operations() {
516		let storage = SqlitePrimitiveStorage::in_memory();
517
518		let key = CowVec::new(b"key1".to_vec());
519		let version = CommitVersion(1);
520
521		// Put and get
522		storage.set(
523			version,
524			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
525		)
526		.unwrap();
527		let value = storage.get(EntryKind::Multi, &key, version).unwrap();
528		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
529
530		// Contains
531		assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
532		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());
533
534		// Delete (tombstone)
535		let version2 = CommitVersion(2);
536		storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
537		assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
538	}
539
540	#[test]
541	fn test_source_tables() {
542		let storage = SqlitePrimitiveStorage::in_memory();
543
544		let source1 = ShapeId::Table(TableId(1));
545		let source2 = ShapeId::Table(TableId(2));
546		let key = CowVec::new(b"key".to_vec());
547		let version = CommitVersion(1);
548
549		storage.set(
550			version,
551			HashMap::from([(
552				EntryKind::Source(source1),
553				vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
554			)]),
555		)
556		.unwrap();
557		storage.set(
558			version,
559			HashMap::from([(
560				EntryKind::Source(source2),
561				vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
562			)]),
563		)
564		.unwrap();
565
566		assert_eq!(
567			storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
568			Some(b"table1".as_slice())
569		);
570		assert_eq!(
571			storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
572			Some(b"table2".as_slice())
573		);
574	}
575
576	#[test]
577	fn test_version_queries() {
578		let storage = SqlitePrimitiveStorage::in_memory();
579
580		let key = CowVec::new(b"key1".to_vec());
581
582		// Insert multiple versions
583		storage.set(
584			CommitVersion(1),
585			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
586		)
587		.unwrap();
588		storage.set(
589			CommitVersion(2),
590			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
591		)
592		.unwrap();
593		storage.set(
594			CommitVersion(3),
595			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
596		)
597		.unwrap();
598
599		// Get at specific versions
600		assert_eq!(
601			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
602			Some(b"v3".as_slice())
603		);
604		assert_eq!(
605			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
606			Some(b"v2".as_slice())
607		);
608		assert_eq!(
609			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
610			Some(b"v1".as_slice())
611		);
612
613		// Get at intermediate version returns closest <= version
614		assert_eq!(
615			storage.get(EntryKind::Multi, &key, CommitVersion(10)).unwrap().as_deref(),
616			Some(b"v3".as_slice())
617		);
618	}
619
620	#[test]
621	fn test_range_next() {
622		let storage = SqlitePrimitiveStorage::in_memory();
623
624		let version = CommitVersion(1);
625		storage.set(
626			version,
627			HashMap::from([(
628				EntryKind::Multi,
629				vec![
630					(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
631					(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
632					(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
633				],
634			)]),
635		)
636		.unwrap();
637
638		let mut cursor = RangeCursor::new();
639		let batch = storage
640			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
641			.unwrap();
642
643		assert_eq!(batch.entries.len(), 3);
644		assert!(!batch.has_more);
645		assert!(cursor.exhausted);
646		assert_eq!(&*batch.entries[0].key, b"a");
647		assert_eq!(&*batch.entries[1].key, b"b");
648		assert_eq!(&*batch.entries[2].key, b"c");
649	}
650
651	#[test]
652	fn test_range_rev_next() {
653		let storage = SqlitePrimitiveStorage::in_memory();
654
655		let version = CommitVersion(1);
656		storage.set(
657			version,
658			HashMap::from([(
659				EntryKind::Multi,
660				vec![
661					(CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
662					(CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
663					(CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
664				],
665			)]),
666		)
667		.unwrap();
668
669		let mut cursor = RangeCursor::new();
670		let batch = storage
671			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
672			.unwrap();
673
674		assert_eq!(batch.entries.len(), 3);
675		assert!(!batch.has_more);
676		assert!(cursor.exhausted);
677		assert_eq!(&*batch.entries[0].key, b"c");
678		assert_eq!(&*batch.entries[1].key, b"b");
679		assert_eq!(&*batch.entries[2].key, b"a");
680	}
681
682	#[test]
683	fn test_range_streaming_pagination() {
684		let storage = SqlitePrimitiveStorage::in_memory();
685
686		let version = CommitVersion(1);
687
688		// Insert 10 entries
689		let entries: Vec<_> =
690			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
691		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
692
693		// Use a single cursor to stream through all entries
694		let mut cursor = RangeCursor::new();
695
696		// First batch of 3
697		let batch1 = storage
698			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
699			.unwrap();
700		assert_eq!(batch1.entries.len(), 3);
701		assert!(batch1.has_more);
702		assert!(!cursor.exhausted);
703		assert_eq!(&*batch1.entries[0].key, &[0]);
704		assert_eq!(&*batch1.entries[2].key, &[2]);
705
706		// Second batch of 3 - cursor automatically continues
707		let batch2 = storage
708			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
709			.unwrap();
710		assert_eq!(batch2.entries.len(), 3);
711		assert!(batch2.has_more);
712		assert!(!cursor.exhausted);
713		assert_eq!(&*batch2.entries[0].key, &[3]);
714		assert_eq!(&*batch2.entries[2].key, &[5]);
715	}
716
717	#[test]
718	fn test_range_reving_pagination() {
719		let storage = SqlitePrimitiveStorage::in_memory();
720
721		let version = CommitVersion(1);
722
723		// Insert 10 entries
724		let entries: Vec<_> =
725			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
726		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
727
728		// Use a single cursor to stream in reverse
729		let mut cursor = RangeCursor::new();
730
731		// First batch of 3 (reverse)
732		let batch1 = storage
733			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
734			.unwrap();
735		assert_eq!(batch1.entries.len(), 3);
736		assert!(batch1.has_more);
737		assert!(!cursor.exhausted);
738		assert_eq!(&*batch1.entries[0].key, &[9]);
739		assert_eq!(&*batch1.entries[2].key, &[7]);
740
741		// Second batch
742		let batch2 = storage
743			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
744			.unwrap();
745		assert_eq!(batch2.entries.len(), 3);
746		assert!(batch2.has_more);
747		assert!(!cursor.exhausted);
748		assert_eq!(&*batch2.entries[0].key, &[6]);
749		assert_eq!(&*batch2.entries[2].key, &[4]);
750	}
751
752	#[test]
753	fn test_get_nonexistent_table() {
754		let storage = SqlitePrimitiveStorage::in_memory();
755
756		// Should return None for non-existent table, not error
757		let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
758		assert_eq!(value, None);
759	}
760
761	#[test]
762	fn test_range_nonexistent_table() {
763		let storage = SqlitePrimitiveStorage::in_memory();
764
765		// Should return empty batch for non-existent table, not error
766		let mut cursor = RangeCursor::new();
767		let batch = storage
768			.range_next(
769				EntryKind::Multi,
770				&mut cursor,
771				Bound::Unbounded,
772				Bound::Unbounded,
773				CommitVersion(1),
774				100,
775			)
776			.unwrap();
777		assert!(batch.entries.is_empty());
778		assert!(cursor.exhausted);
779	}
780
781	#[test]
782	fn test_drop_specific_version() {
783		let storage = SqlitePrimitiveStorage::in_memory();
784
785		let key = CowVec::new(b"key1".to_vec());
786
787		// Insert versions 1, 2, 3
788		for v in 1..=3u64 {
789			storage.set(
790				CommitVersion(v),
791				HashMap::from([(
792					EntryKind::Multi,
793					vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
794				)]),
795			)
796			.unwrap();
797		}
798
799		// Drop version 1
800		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();
801
802		// Version 1 should no longer be accessible
803		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());
804
805		// Versions 2 and 3 should still work
806		assert_eq!(
807			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
808			Some(b"v2".as_slice())
809		);
810		assert_eq!(
811			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
812			Some(b"v3".as_slice())
813		);
814	}
815}