// Source file: reifydb_store_multi/hot/sqlite/storage.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2025 ReifyDB

//! SQLite implementation of PrimitiveStorage with MVCC versioning.
//!
//! Uses SQLite tables with a (key, version) composite primary key for
//! persistent multi-version storage. All operations go through a single
//! connection protected by a Mutex for thread safety.
9
10use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{Connection, Error::QueryReturnedNoRows, params};
16use tracing::instrument;
17
18use super::{
19	SqliteConfig,
20	connection::{connect, convert_flags, resolve_db_path},
21	entry::entry_id_to_name,
22	query::{build_versioned_range_query, version_to_bytes},
23};
24use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
25
26/// SQLite-based primitive storage implementation with MVCC versioning.
27///
28/// Uses SQLite for persistent storage with a single connection protected by Mutex.
29/// Tables use (key, version) composite primary key for multi-version support.
30#[derive(Clone)]
31pub struct SqlitePrimitiveStorage {
32	inner: Arc<SqlitePrimitiveStorageInner>,
33}
34
35struct SqlitePrimitiveStorageInner {
36	/// Single connection protected by Mutex for thread-safe access.
37	/// Note: We use Mutex instead of RwLock because rusqlite::Connection
38	/// is Send but not Sync.
39	conn: Mutex<Connection>,
40}
41
42impl SqlitePrimitiveStorage {
43	/// Create a new SQLite primitive storage with the given configuration.
44	#[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
45		db_path = ?config.path,
46		page_size = config.page_size,
47		journal_mode = %config.journal_mode.as_str()
48	))]
49	pub fn new(config: SqliteConfig) -> Self {
50		let db_path = resolve_db_path(config.path);
51		let flags = convert_flags(&config.flags);
52
53		let conn = connect(&db_path, flags).expect("Failed to connect to database");
54
55		// Configure SQLite pragmas
56		conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
57		conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
58			.expect("Failed to set journal_mode");
59		conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
60			.expect("Failed to set synchronous");
61		conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
62		conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
63		conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
64		conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
65			.expect("Failed to set wal_autocheckpoint");
66		conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
67
68		Self {
69			inner: Arc::new(SqlitePrimitiveStorageInner {
70				conn: Mutex::new(conn),
71			}),
72		}
73	}
74
75	/// Create an in-memory SQLite storage for testing.
76	pub fn in_memory() -> Self {
77		Self::new(SqliteConfig::in_memory())
78	}
79
80	/// Create a table with the versioned schema if it doesn't exist.
81	fn create_table_if_needed(conn: &Connection, table_name: &str) -> rusqlite::Result<()> {
82		conn.execute(
83			&format!(
84				"CREATE TABLE IF NOT EXISTS \"{}\" (
85					key BLOB NOT NULL,
86					version BLOB NOT NULL,
87					value BLOB,
88					PRIMARY KEY (key, version)
89				) WITHOUT ROWID",
90				table_name
91			),
92			[],
93		)?;
94		Ok(())
95	}
96}
97
98impl TierStorage for SqlitePrimitiveStorage {
99	#[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
100	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
101		let table_name = entry_id_to_name(table);
102		let conn = self.inner.conn.lock();
103
104		// Get the latest version <= requested version for this key
105		let result = conn.query_row(
106			&format!(
107				"SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
108				table_name
109			),
110			params![key, version_to_bytes(version).as_slice()],
111			|row| row.get::<_, Option<Vec<u8>>>(0),
112		);
113
114		match result {
115			Ok(Some(value)) => Ok(Some(CowVec::new(value))),
116			Ok(None) => Ok(None), // Tombstone
117			Err(QueryReturnedNoRows) => Ok(None),
118			Err(e) if e.to_string().contains("no such table") => Ok(None),
119			Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
120		}
121	}
122
123	#[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
124	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
125		let table_name = entry_id_to_name(table);
126		let conn = self.inner.conn.lock();
127
128		// Check if value exists and is not a tombstone
129		let result = conn.query_row(
130			&format!(
131				"SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
132				table_name
133			),
134			params![key, version_to_bytes(version).as_slice()],
135			|row| row.get::<_, bool>(0),
136		);
137
138		match result {
139			Ok(has_value) => Ok(has_value),
140			Err(QueryReturnedNoRows) => Ok(false),
141			Err(e) if e.to_string().contains("no such table") => Ok(false),
142			Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
143		}
144	}
145
146	#[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
147	fn set(
148		&self,
149		version: CommitVersion,
150		batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>,
151	) -> Result<()> {
152		if batches.is_empty() {
153			return Ok(());
154		}
155
156		let conn = self.inner.conn.lock();
157		let tx = conn
158			.unchecked_transaction()
159			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
160
161		for (table, entries) in batches {
162			let table_name = entry_id_to_name(table);
163
164			// Try to insert entries, creating table if needed
165			let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
166			if let Err(e) = result {
167				if e.to_string().contains("no such table") {
168					Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
169						error!(internal(format!("Failed to create table: {}", e)))
170					})?;
171					insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
172						|e| error!(internal(format!("Failed to insert entries: {}", e))),
173					)?;
174				} else {
175					return Err(error!(internal(format!("Failed to insert entries: {}", e))));
176				}
177			}
178		}
179
180		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
181	}
182
183	#[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
184	fn range_next(
185		&self,
186		table: EntryKind,
187		cursor: &mut RangeCursor,
188		start: Bound<&[u8]>,
189		end: Bound<&[u8]>,
190		version: CommitVersion,
191		batch_size: usize,
192	) -> Result<RangeBatch> {
193		if cursor.exhausted {
194			return Ok(RangeBatch::empty());
195		}
196
197		let table_name = entry_id_to_name(table);
198
199		// Determine effective start bound based on cursor state
200		let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
201			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
202			None => bound_to_owned(start),
203		};
204		let end_owned = bound_to_owned(end);
205
206		let conn = self.inner.conn.lock();
207
208		let start_ref = bound_as_ref(&effective_start);
209		let end_ref = bound_as_ref(&end_owned);
210		let (query, params) =
211			build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
212
213		let mut stmt = match conn.prepare(&query) {
214			Ok(stmt) => stmt,
215			Err(e) if e.to_string().contains("no such table") => {
216				cursor.exhausted = true;
217				return Ok(RangeBatch::empty());
218			}
219			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
220		};
221
222		let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
223
224		let entries: Vec<RawEntry> = stmt
225			.query_map(params_refs.as_slice(), |row| {
226				let key: Vec<u8> = row.get(0)?;
227				let version_bytes: Vec<u8> = row.get(1)?;
228				let value: Option<Vec<u8>> = row.get(2)?;
229				let version = u64::from_be_bytes(
230					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
231				);
232				Ok(RawEntry {
233					key: CowVec::new(key),
234					version: CommitVersion(version),
235					value: value.map(CowVec::new),
236				})
237			})
238			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
239			.filter_map(|r| r.ok())
240			.collect();
241
242		let has_more = entries.len() > batch_size;
243		let entries = if has_more {
244			entries.into_iter().take(batch_size).collect()
245		} else {
246			entries
247		};
248
249		let batch = RangeBatch {
250			entries,
251			has_more,
252		};
253
254		// Update cursor
255		if let Some(last_entry) = batch.entries.last() {
256			cursor.last_key = Some(last_entry.key.clone());
257		}
258		if !batch.has_more {
259			cursor.exhausted = true;
260		}
261
262		Ok(batch)
263	}
264
265	#[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
266	fn range_rev_next(
267		&self,
268		table: EntryKind,
269		cursor: &mut RangeCursor,
270		start: Bound<&[u8]>,
271		end: Bound<&[u8]>,
272		version: CommitVersion,
273		batch_size: usize,
274	) -> Result<RangeBatch> {
275		if cursor.exhausted {
276			return Ok(RangeBatch::empty());
277		}
278
279		let table_name = entry_id_to_name(table);
280
281		// For reverse iteration, effective end bound based on cursor
282		let start_owned = bound_to_owned(start);
283		let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
284			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
285			None => bound_to_owned(end),
286		};
287
288		let conn = self.inner.conn.lock();
289
290		let start_ref = bound_as_ref(&start_owned);
291		let end_ref = bound_as_ref(&effective_end);
292		let (query, params) =
293			build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
294
295		let mut stmt = match conn.prepare(&query) {
296			Ok(stmt) => stmt,
297			Err(e) if e.to_string().contains("no such table") => {
298				cursor.exhausted = true;
299				return Ok(RangeBatch::empty());
300			}
301			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
302		};
303
304		let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
305
306		let entries: Vec<RawEntry> = stmt
307			.query_map(params_refs.as_slice(), |row| {
308				let key: Vec<u8> = row.get(0)?;
309				let version_bytes: Vec<u8> = row.get(1)?;
310				let value: Option<Vec<u8>> = row.get(2)?;
311				let version = u64::from_be_bytes(
312					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
313				);
314				Ok(RawEntry {
315					key: CowVec::new(key),
316					version: CommitVersion(version),
317					value: value.map(CowVec::new),
318				})
319			})
320			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
321			.filter_map(|r| r.ok())
322			.collect();
323
324		let has_more = entries.len() > batch_size;
325		let entries = if has_more {
326			entries.into_iter().take(batch_size).collect()
327		} else {
328			entries
329		};
330
331		let batch = RangeBatch {
332			entries,
333			has_more,
334		};
335
336		// Update cursor
337		if let Some(last_entry) = batch.entries.last() {
338			cursor.last_key = Some(last_entry.key.clone());
339		}
340		if !batch.has_more {
341			cursor.exhausted = true;
342		}
343
344		Ok(batch)
345	}
346
347	fn ensure_table(&self, table: EntryKind) -> Result<()> {
348		let table_name = entry_id_to_name(table);
349		let conn = self.inner.conn.lock();
350
351		Self::create_table_if_needed(&conn, &table_name)
352			.map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
353	}
354
355	fn clear_table(&self, table: EntryKind) -> Result<()> {
356		let table_name = entry_id_to_name(table);
357		let conn = self.inner.conn.lock();
358
359		let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
360
361		match result {
362			Ok(_) => Ok(()),
363			Err(e) if e.to_string().contains("no such table") => Ok(()),
364			Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
365		}
366	}
367
368	#[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
369	fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
370		if batches.is_empty() {
371			return Ok(());
372		}
373
374		let conn = self.inner.conn.lock();
375		let tx = conn
376			.unchecked_transaction()
377			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
378
379		for (table, entries) in batches {
380			let table_name = entry_id_to_name(table);
381			for (key, version) in entries {
382				let version_bytes = version_to_bytes(version);
383				// First check if this is the latest version for this key
384				let is_latest: bool = tx
385					.query_row(
386						&format!(
387							"SELECT version = ?2 FROM \"{}\" WHERE key = ?1 ORDER BY version DESC LIMIT 1",
388							table_name
389						),
390						params![key.as_slice(), version_bytes.as_slice()],
391						|row| row.get(0),
392					)
393					.unwrap_or(false);
394
395				if is_latest {
396					// Dropping the latest version - remove ALL versions of this key
397					let result = tx.execute(
398						&format!("DELETE FROM \"{}\" WHERE key = ?1", table_name),
399						params![key.as_slice()],
400					);
401					if let Err(e) = result {
402						if !e.to_string().contains("no such table") {
403							return Err(error!(internal(format!(
404								"Failed to delete entry: {}",
405								e
406							))));
407						}
408					}
409				} else {
410					// Dropping a non-latest version - just remove that specific version
411					let result = tx.execute(
412						&format!(
413							"DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2",
414							table_name
415						),
416						params![key.as_slice(), version_bytes.as_slice()],
417					);
418					if let Err(e) = result {
419						if !e.to_string().contains("no such table") {
420							return Err(error!(internal(format!(
421								"Failed to delete entry: {}",
422								e
423							))));
424						}
425					}
426				}
427			}
428		}
429
430		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
431	}
432
433	#[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
434	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
435		let table_name = entry_id_to_name(table);
436		let conn = self.inner.conn.lock();
437
438		let mut stmt = match conn.prepare(&format!(
439			"SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
440			table_name
441		)) {
442			Ok(stmt) => stmt,
443			Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
444			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
445		};
446
447		let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
448			.query_map(params![key], |row| {
449				let version_bytes: Vec<u8> = row.get(0)?;
450				let value: Option<Vec<u8>> = row.get(1)?;
451				let version = u64::from_be_bytes(
452					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
453				);
454				Ok((CommitVersion(version), value.map(CowVec::new)))
455			})
456			.map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
457			.filter_map(|r| r.ok())
458			.collect();
459
460		Ok(versions)
461	}
462}
463
464impl TierBackend for SqlitePrimitiveStorage {}
465
/// Borrow an owned byte-vector bound as a bound over a byte slice.
///
/// The variant (`Included`, `Excluded`, `Unbounded`) is preserved; only the
/// payload changes from `Vec<u8>` to `&[u8]`.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	match bound {
		Bound::Included(bytes) => Bound::Included(&bytes[..]),
		Bound::Excluded(bytes) => Bound::Excluded(&bytes[..]),
		Bound::Unbounded => Bound::Unbounded,
	}
}
474
/// Copy a borrowed byte-slice bound into an owned byte-vector bound.
///
/// The variant (`Included`, `Excluded`, `Unbounded`) is preserved; the
/// slice payload, if any, is copied into a new `Vec<u8>`.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
	match bound {
		Bound::Included(bytes) => Bound::Included(Vec::from(bytes)),
		Bound::Excluded(bytes) => Bound::Excluded(Vec::from(bytes)),
		Bound::Unbounded => Bound::Unbounded,
	}
}
483
484/// Insert versioned entries into a table within an existing transaction
485fn insert_versioned_entries_in_tx(
486	tx: &rusqlite::Transaction,
487	table_name: &str,
488	version: CommitVersion,
489	entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
490) -> rusqlite::Result<()> {
491	let version_bytes = version_to_bytes(version);
492	for (key, value) in entries {
493		tx.execute(
494			&format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name),
495			params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())],
496		)?;
497	}
498	Ok(())
499}
500
#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{id::TableId, primitive::PrimitiveId};

	use super::*;

	/// Copy a byte slice into an owned `CowVec`.
	fn bytes(raw: &[u8]) -> CowVec<u8> {
		CowVec::new(raw.to_vec())
	}

	/// Write `entries` to `table` at `version`, panicking if the write fails.
	fn commit(
		storage: &SqlitePrimitiveStorage,
		table: EntryKind,
		version: CommitVersion,
		entries: Vec<(CowVec<u8>, Option<CowVec<u8>>)>,
	) {
		storage.set(version, HashMap::from([(table, entries)])).unwrap();
	}

	#[test]
	fn test_basic_operations() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");
		let version = CommitVersion(1);

		// Write a value and read it back.
		commit(&storage, EntryKind::Multi, version, vec![(key.clone(), Some(bytes(b"value1")))]);
		let value = storage.get(EntryKind::Multi, &key, version).unwrap();
		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));

		// Presence checks for a stored and an unknown key.
		assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());

		// A tombstone at a later version hides the key.
		let version2 = CommitVersion(2);
		commit(&storage, EntryKind::Multi, version2, vec![(key.clone(), None)]);
		assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
	}

	#[test]
	fn test_source_tables() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let source1 = PrimitiveId::Table(TableId(1));
		let source2 = PrimitiveId::Table(TableId(2));
		let key = bytes(b"key");
		let version = CommitVersion(1);

		// The same key is written to two different source tables.
		commit(&storage, EntryKind::Source(source1), version, vec![(key.clone(), Some(bytes(b"table1")))]);
		commit(&storage, EntryKind::Source(source2), version, vec![(key.clone(), Some(bytes(b"table2")))]);

		assert_eq!(
			storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
			Some(b"table1".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
			Some(b"table2".as_slice())
		);
	}

	#[test]
	fn test_version_queries() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");

		// Insert versions 1, 2 and 3 in ascending order.
		for (v, payload) in [(1u64, b"v1"), (2, b"v2"), (3, b"v3")] {
			commit(&storage, EntryKind::Multi, CommitVersion(v), vec![(key.clone(), Some(bytes(payload)))]);
		}

		// Exact versions first, then a version above the newest, which
		// resolves to the closest stored version below it.
		for (at, expected) in [(3u64, b"v3"), (2, b"v2"), (1, b"v1"), (10, b"v3")] {
			assert_eq!(
				storage.get(EntryKind::Multi, &key, CommitVersion(at)).unwrap().as_deref(),
				Some(expected.as_slice())
			);
		}
	}

	#[test]
	fn test_range_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		commit(
			&storage,
			EntryKind::Multi,
			version,
			vec![
				(bytes(b"a"), Some(bytes(b"1"))),
				(bytes(b"b"), Some(bytes(b"2"))),
				(bytes(b"c"), Some(bytes(b"3"))),
			],
		);

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"a");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"c");
	}

	#[test]
	fn test_range_rev_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		commit(
			&storage,
			EntryKind::Multi,
			version,
			vec![
				(bytes(b"a"), Some(bytes(b"1"))),
				(bytes(b"b"), Some(bytes(b"2"))),
				(bytes(b"c"), Some(bytes(b"3"))),
			],
		);

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"c");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"a");
	}

	#[test]
	fn test_range_streaming_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);

		// Ten single-byte keys, 0 through 9.
		let entries: Vec<_> =
			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
		commit(&storage, EntryKind::Multi, version, entries);

		// One cursor is reused across calls so each batch resumes after the last.
		let mut cursor = RangeCursor::new();

		// First batch: keys 0..=2.
		let batch1 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch1.entries[0].key, &[0]);
		assert_eq!(&*batch1.entries[2].key, &[2]);

		// Second batch: keys 3..=5.
		let batch2 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch2.entries[0].key, &[3]);
		assert_eq!(&*batch2.entries[2].key, &[5]);
	}

	#[test]
	fn test_range_reving_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);

		// Ten single-byte keys, 0 through 9.
		let entries: Vec<_> =
			(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
		commit(&storage, EntryKind::Multi, version, entries);

		// One cursor is reused across calls while walking backwards.
		let mut cursor = RangeCursor::new();

		// First reverse batch: keys 9, 8, 7.
		let batch1 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch1.entries[0].key, &[9]);
		assert_eq!(&*batch1.entries[2].key, &[7]);

		// Second reverse batch: keys 6, 5, 4.
		let batch2 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*batch2.entries[0].key, &[6]);
		assert_eq!(&*batch2.entries[2].key, &[4]);
	}

	#[test]
	fn test_get_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// A table that was never written reads as `None`, not as an error.
		let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
		assert_eq!(value, None);
	}

	#[test]
	fn test_range_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// A table that was never written scans as an empty, exhausted batch.
		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(
				EntryKind::Multi,
				&mut cursor,
				Bound::Unbounded,
				Bound::Unbounded,
				CommitVersion(1),
				100,
			)
			.unwrap();
		assert!(batch.entries.is_empty());
		assert!(cursor.exhausted);
	}

	#[test]
	fn test_drop_specific_version() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");

		// Store versions 1, 2 and 3 with values "v1", "v2", "v3".
		for v in 1..=3u64 {
			let payload = CowVec::new(format!("v{}", v).into_bytes());
			commit(&storage, EntryKind::Multi, CommitVersion(v), vec![(key.clone(), Some(payload))]);
		}

		// Drop the oldest version only.
		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();

		// Nothing is visible at version 1 any more.
		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());

		// The newer versions are untouched.
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);
	}
}