Skip to main content

reifydb_store_multi/hot/sqlite/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, ops::Bound, sync::Arc};
5
6use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal, interface::store::EntryKind};
7use reifydb_runtime::sync::mutex::Mutex;
8use reifydb_sqlite::{
9	SqliteConfig,
10	connection::{connect, convert_flags, resolve_db_path},
11	pragma,
12};
13use reifydb_type::{Result, error, util::cowvec::CowVec};
14use rusqlite::{
15	Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
16};
17use tracing::instrument;
18
19use super::{
20	entry::entry_id_to_name,
21	query::{build_versioned_range_query, version_to_bytes},
22};
23use crate::tier::{RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
24
25/// SQLite-based primitive storage implementation with MVCC versioning.
26///
27/// Uses SQLite for persistent storage with a single connection protected by Mutex.
28/// Tables use (key, version) composite primary key for multi-version support.
29#[derive(Clone)]
30pub struct SqlitePrimitiveStorage {
31	inner: Arc<SqlitePrimitiveStorageInner>,
32}
33
34struct SqlitePrimitiveStorageInner {
35	/// Single connection protected by Mutex for thread-safe access.
36	/// Note: We use Mutex instead of RwLock because rusqlite::Connection
37	/// is Send but not Sync.
38	conn: Mutex<Connection>,
39}
40
41impl SqlitePrimitiveStorage {
42	/// Create a new SQLite primitive storage with the given configuration.
43	#[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
44		db_path = ?config.path,
45		page_size = config.page_size,
46		journal_mode = %config.journal_mode.as_str()
47	))]
48	pub fn new(config: SqliteConfig) -> Self {
49		let db_path = resolve_db_path(config.path.clone(), "primitive.db");
50		let flags = convert_flags(&config.flags);
51
52		let conn = connect(&db_path, flags).expect("Failed to connect to database");
53		pragma::apply(&conn, &config).expect("Failed to configure SQLite pragmas");
54
55		Self {
56			inner: Arc::new(SqlitePrimitiveStorageInner {
57				conn: Mutex::new(conn),
58			}),
59		}
60	}
61
62	/// Create an in-memory SQLite storage for testing.
63	pub fn in_memory() -> Self {
64		Self::new(SqliteConfig::in_memory())
65	}
66
67	/// Run incremental vacuum to return freed pages to the OS.
68	pub fn incremental_vacuum(&self) {
69		let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
70	}
71
72	/// Release unused memory back to the allocator.
73	pub fn shrink_memory(&self) {
74		let _ = pragma::shrink_memory(&self.inner.conn.lock());
75	}
76
77	/// Explicitly checkpoint WAL and shrink the page cache before shutdown.
78	pub fn shutdown(&self) {
79		let _ = pragma::shutdown(&self.inner.conn.lock());
80	}
81
82	/// Create a table with the versioned shape if it doesn't exist.
83	fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
84		conn.execute(
85			&format!(
86				"CREATE TABLE IF NOT EXISTS \"{}\" (
87					key BLOB NOT NULL,
88					version BLOB NOT NULL,
89					value BLOB,
90					PRIMARY KEY (key, version)
91				) WITHOUT ROWID",
92				table_name
93			),
94			[],
95		)?;
96		Ok(())
97	}
98}
99
100impl TierStorage for SqlitePrimitiveStorage {
101	#[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
102	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
103		let table_name = entry_id_to_name(table);
104		let conn = self.inner.conn.lock();
105
106		// Get the latest version <= requested version for this key
107		let result = conn.query_row(
108			&format!(
109				"SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
110				table_name
111			),
112			params![key, version_to_bytes(version).as_slice()],
113			|row| row.get::<_, Option<Vec<u8>>>(0),
114		);
115
116		match result {
117			Ok(Some(value)) => Ok(Some(CowVec::new(value))),
118			Ok(None) => Ok(None), // Tombstone
119			Err(QueryReturnedNoRows) => Ok(None),
120			Err(e) if e.to_string().contains("no such table") => Ok(None),
121			Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
122		}
123	}
124
125	#[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
126	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
127		let table_name = entry_id_to_name(table);
128		let conn = self.inner.conn.lock();
129
130		// Check if value exists and is not a tombstone
131		let result = conn.query_row(
132			&format!(
133				"SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
134				table_name
135			),
136			params![key, version_to_bytes(version).as_slice()],
137			|row| row.get::<_, bool>(0),
138		);
139
140		match result {
141			Ok(has_value) => Ok(has_value),
142			Err(QueryReturnedNoRows) => Ok(false),
143			Err(e) if e.to_string().contains("no such table") => Ok(false),
144			Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
145		}
146	}
147
148	#[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
149	fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
150		if batches.is_empty() {
151			return Ok(());
152		}
153
154		let conn = self.inner.conn.lock();
155		let tx = conn
156			.unchecked_transaction()
157			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
158
159		for (table, entries) in batches {
160			let table_name = entry_id_to_name(table);
161
162			// Try to insert entries, creating table if needed
163			let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
164			if let Err(e) = result {
165				if e.to_string().contains("no such table") {
166					Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
167						error!(internal(format!("Failed to create table: {}", e)))
168					})?;
169					insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
170						|e| error!(internal(format!("Failed to insert entries: {}", e))),
171					)?;
172				} else {
173					return Err(error!(internal(format!("Failed to insert entries: {}", e))));
174				}
175			}
176		}
177
178		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
179	}
180
181	#[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
182	fn range_next(
183		&self,
184		table: EntryKind,
185		cursor: &mut RangeCursor,
186		start: Bound<&[u8]>,
187		end: Bound<&[u8]>,
188		version: CommitVersion,
189		batch_size: usize,
190	) -> Result<RangeBatch> {
191		if cursor.exhausted {
192			return Ok(RangeBatch::empty());
193		}
194
195		let table_name = entry_id_to_name(table);
196
197		// Determine effective start bound based on cursor state
198		let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
199			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
200			None => bound_to_owned(start),
201		};
202		let end_owned = bound_to_owned(end);
203
204		let conn = self.inner.conn.lock();
205
206		let start_ref = bound_as_ref(&effective_start);
207		let end_ref = bound_as_ref(&end_owned);
208		let (query, params) =
209			build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
210
211		let mut stmt = match conn.prepare(&query) {
212			Ok(stmt) => stmt,
213			Err(e) if e.to_string().contains("no such table") => {
214				cursor.exhausted = true;
215				return Ok(RangeBatch::empty());
216			}
217			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
218		};
219
220		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
221
222		let entries: Vec<RawEntry> = stmt
223			.query_map(params_refs.as_slice(), |row| {
224				let key: Vec<u8> = row.get(0)?;
225				let version_bytes: Vec<u8> = row.get(1)?;
226				let value: Option<Vec<u8>> = row.get(2)?;
227				let version = u64::from_be_bytes(
228					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
229				);
230				Ok(RawEntry {
231					key: CowVec::new(key),
232					version: CommitVersion(version),
233					value: value.map(CowVec::new),
234				})
235			})
236			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
237			.filter_map(|r| r.ok())
238			.collect();
239
240		let has_more = entries.len() > batch_size;
241		let entries = if has_more {
242			entries.into_iter().take(batch_size).collect()
243		} else {
244			entries
245		};
246
247		let batch = RangeBatch {
248			entries,
249			has_more,
250		};
251
252		// Update cursor
253		if let Some(last_entry) = batch.entries.last() {
254			cursor.last_key = Some(last_entry.key.clone());
255		}
256		if !batch.has_more {
257			cursor.exhausted = true;
258		}
259
260		Ok(batch)
261	}
262
263	#[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
264	fn range_rev_next(
265		&self,
266		table: EntryKind,
267		cursor: &mut RangeCursor,
268		start: Bound<&[u8]>,
269		end: Bound<&[u8]>,
270		version: CommitVersion,
271		batch_size: usize,
272	) -> Result<RangeBatch> {
273		if cursor.exhausted {
274			return Ok(RangeBatch::empty());
275		}
276
277		let table_name = entry_id_to_name(table);
278
279		// For reverse iteration, effective end bound based on cursor
280		let start_owned = bound_to_owned(start);
281		let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
282			Some(last) => Bound::Excluded(last.as_slice().to_vec()),
283			None => bound_to_owned(end),
284		};
285
286		let conn = self.inner.conn.lock();
287
288		let start_ref = bound_as_ref(&start_owned);
289		let end_ref = bound_as_ref(&effective_end);
290		let (query, params) =
291			build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
292
293		let mut stmt = match conn.prepare(&query) {
294			Ok(stmt) => stmt,
295			Err(e) if e.to_string().contains("no such table") => {
296				cursor.exhausted = true;
297				return Ok(RangeBatch::empty());
298			}
299			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
300		};
301
302		let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
303
304		let entries: Vec<RawEntry> = stmt
305			.query_map(params_refs.as_slice(), |row| {
306				let key: Vec<u8> = row.get(0)?;
307				let version_bytes: Vec<u8> = row.get(1)?;
308				let value: Option<Vec<u8>> = row.get(2)?;
309				let version = u64::from_be_bytes(
310					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
311				);
312				Ok(RawEntry {
313					key: CowVec::new(key),
314					version: CommitVersion(version),
315					value: value.map(CowVec::new),
316				})
317			})
318			.map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
319			.filter_map(|r| r.ok())
320			.collect();
321
322		let has_more = entries.len() > batch_size;
323		let entries = if has_more {
324			entries.into_iter().take(batch_size).collect()
325		} else {
326			entries
327		};
328
329		let batch = RangeBatch {
330			entries,
331			has_more,
332		};
333
334		// Update cursor
335		if let Some(last_entry) = batch.entries.last() {
336			cursor.last_key = Some(last_entry.key.clone());
337		}
338		if !batch.has_more {
339			cursor.exhausted = true;
340		}
341
342		Ok(batch)
343	}
344
345	fn ensure_table(&self, table: EntryKind) -> Result<()> {
346		let table_name = entry_id_to_name(table);
347		let conn = self.inner.conn.lock();
348
349		Self::create_table_if_needed(&conn, &table_name)
350			.map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
351	}
352
353	fn clear_table(&self, table: EntryKind) -> Result<()> {
354		let table_name = entry_id_to_name(table);
355		let conn = self.inner.conn.lock();
356
357		let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
358
359		match result {
360			Ok(_) => Ok(()),
361			Err(e) if e.to_string().contains("no such table") => Ok(()),
362			Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
363		}
364	}
365
366	#[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
367	fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
368		if batches.is_empty() {
369			return Ok(());
370		}
371
372		let conn = self.inner.conn.lock();
373		let tx = conn
374			.unchecked_transaction()
375			.map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
376
377		for (table, entries) in batches {
378			let table_name = entry_id_to_name(table);
379
380			let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
381			let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
382			let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
383
384			let mut max_version_stmt = match tx.prepare(&max_version_sql) {
385				Ok(s) => s,
386				Err(e) if e.to_string().contains("no such table") => continue,
387				Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
388			};
389			let mut delete_all_stmt = tx
390				.prepare(&delete_all_sql)
391				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
392			let mut delete_one_stmt = tx
393				.prepare(&delete_one_sql)
394				.map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
395
396			for (key, version) in entries {
397				let version_bytes = version_to_bytes(version);
398
399				let max_version: Option<Vec<u8>> = max_version_stmt
400					.query_row(params![key.as_slice()], |row| row.get(0))
401					.unwrap_or(None);
402
403				let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
404
405				let result = if is_latest {
406					delete_all_stmt.execute(params![key.as_slice()])
407				} else {
408					delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
409				};
410
411				if let Err(e) = result
412					&& !e.to_string().contains("no such table")
413				{
414					return Err(error!(internal(format!("Failed to delete entry: {}", e))));
415				}
416			}
417		}
418
419		tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
420	}
421
422	#[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
423	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
424		let table_name = entry_id_to_name(table);
425		let conn = self.inner.conn.lock();
426
427		let mut stmt = match conn.prepare(&format!(
428			"SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
429			table_name
430		)) {
431			Ok(stmt) => stmt,
432			Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
433			Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
434		};
435
436		let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
437			.query_map(params![key], |row| {
438				let version_bytes: Vec<u8> = row.get(0)?;
439				let value: Option<Vec<u8>> = row.get(1)?;
440				let version = u64::from_be_bytes(
441					version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
442				);
443				Ok((CommitVersion(version), value.map(CowVec::new)))
444			})
445			.map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
446			.filter_map(|r| r.ok())
447			.collect();
448
449		Ok(versions)
450	}
451}
452
453impl TierBackend for SqlitePrimitiveStorage {}
454
/// Borrow an owned `Bound<Vec<u8>>` as a `Bound<&[u8]>` without copying the bytes.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	bound.as_ref().map(Vec::as_slice)
}
463
/// Copy a borrowed `Bound<&[u8]>` into an owned `Bound<Vec<u8>>`.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
	bound.map(|bytes| bytes.to_vec())
}
472
473/// Insert versioned entries into a table within an existing transaction
474fn insert_versioned_entries_in_tx(
475	tx: &SqliteTransaction,
476	table_name: &str,
477	version: CommitVersion,
478	entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
479) -> SqliteResult<()> {
480	let version_bytes = version_to_bytes(version);
481	let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
482	let mut stmt = tx.prepare(&sql)?;
483	for (key, value) in entries {
484		stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
485	}
486	Ok(())
487}
488
#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};

	use super::*;

	/// Key/value pairs as accepted by `set` for a single table.
	type Entries = Vec<(CowVec<u8>, Option<CowVec<u8>>)>;

	/// Copy raw bytes into a `CowVec`.
	fn bytes(raw: &[u8]) -> CowVec<u8> {
		CowVec::new(raw.to_vec())
	}

	/// Build a batch that targets exactly one table.
	fn batch_for(table: EntryKind, entries: Entries) -> TierBatch {
		HashMap::from([(table, entries)])
	}

	/// The three entries `a -> 1`, `b -> 2`, `c -> 3` used by the range tests.
	fn abc_entries() -> Entries {
		vec![
			(bytes(b"a"), Some(bytes(b"1"))),
			(bytes(b"b"), Some(bytes(b"2"))),
			(bytes(b"c"), Some(bytes(b"3"))),
		]
	}

	/// Ten single-byte keys `0..10`, each mapped to the single byte `key * 10`.
	fn ten_entries() -> Entries {
		(0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect()
	}

	#[test]
	fn test_basic_operations() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");
		let first = CommitVersion(1);

		// Write a value, then read it back.
		storage.set(first, batch_for(EntryKind::Multi, vec![(key.clone(), Some(bytes(b"value1")))])).unwrap();
		let value = storage.get(EntryKind::Multi, &key, first).unwrap();
		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));

		// Presence checks for a known and an unknown key.
		assert!(storage.contains(EntryKind::Multi, &key, first).unwrap());
		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", first).unwrap());

		// A tombstone at a later version hides the key.
		let second = CommitVersion(2);
		storage.set(second, batch_for(EntryKind::Multi, vec![(key.clone(), None)])).unwrap();
		assert!(!storage.contains(EntryKind::Multi, &key, second).unwrap());
	}

	#[test]
	fn test_source_tables() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let source1 = ShapeId::Table(TableId(1));
		let source2 = ShapeId::Table(TableId(2));
		let key = bytes(b"key");
		let version = CommitVersion(1);

		// The same key is written to two different source tables.
		storage.set(version, batch_for(EntryKind::Source(source1), vec![(key.clone(), Some(bytes(b"table1")))]))
			.unwrap();
		storage.set(version, batch_for(EntryKind::Source(source2), vec![(key.clone(), Some(bytes(b"table2")))]))
			.unwrap();

		// Each table keeps its own value.
		assert_eq!(
			storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
			Some(b"table1".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
			Some(b"table2".as_slice())
		);
	}

	#[test]
	fn test_version_queries() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");

		// Insert multiple versions
		for (raw_version, payload) in [(1u64, b"v1"), (2, b"v2"), (3, b"v3")] {
			storage.set(
				CommitVersion(raw_version),
				batch_for(EntryKind::Multi, vec![(key.clone(), Some(bytes(payload)))]),
			)
			.unwrap();
		}

		// Exact versions return their own value; a version past the newest one
		// (10) falls back to the closest version at or below it.
		for (raw_version, expected) in [(3u64, b"v3"), (2, b"v2"), (1, b"v1"), (10, b"v3")] {
			assert_eq!(
				storage.get(EntryKind::Multi, &key, CommitVersion(raw_version)).unwrap().as_deref(),
				Some(expected.as_slice()),
				"unexpected value at version {}",
				raw_version
			);
		}
	}

	#[test]
	fn test_range_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		storage.set(version, batch_for(EntryKind::Multi, abc_entries())).unwrap();

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"a");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"c");
	}

	#[test]
	fn test_range_rev_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		storage.set(version, batch_for(EntryKind::Multi, abc_entries())).unwrap();

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"c");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"a");
	}

	#[test]
	fn test_range_streaming_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);

		// Insert 10 entries
		storage.set(version, batch_for(EntryKind::Multi, ten_entries())).unwrap();

		// A single cursor is reused across calls.
		let mut cursor = RangeCursor::new();

		// First page of 3.
		let page1 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(page1.entries.len(), 3);
		assert!(page1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*page1.entries[0].key, &[0]);
		assert_eq!(&*page1.entries[2].key, &[2]);

		// Second page of 3: the cursor picks up where the first page stopped.
		let page2 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(page2.entries.len(), 3);
		assert!(page2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*page2.entries[0].key, &[3]);
		assert_eq!(&*page2.entries[2].key, &[5]);
	}

	#[test]
	fn test_range_reving_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);

		// Insert 10 entries
		storage.set(version, batch_for(EntryKind::Multi, ten_entries())).unwrap();

		// A single cursor is reused across calls, walking backwards.
		let mut cursor = RangeCursor::new();

		// First page of 3, from the top.
		let page1 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(page1.entries.len(), 3);
		assert!(page1.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*page1.entries[0].key, &[9]);
		assert_eq!(&*page1.entries[2].key, &[7]);

		// Second page continues below the first.
		let page2 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(page2.entries.len(), 3);
		assert!(page2.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*page2.entries[0].key, &[6]);
		assert_eq!(&*page2.entries[2].key, &[4]);
	}

	#[test]
	fn test_get_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// A table that was never created reads as "no value", not as an error.
		let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
		assert_eq!(value, None);
	}

	#[test]
	fn test_range_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// A table that was never created scans as empty and exhausts the cursor.
		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(
				EntryKind::Multi,
				&mut cursor,
				Bound::Unbounded,
				Bound::Unbounded,
				CommitVersion(1),
				100,
			)
			.unwrap();
		assert!(batch.entries.is_empty());
		assert!(cursor.exhausted);
	}

	#[test]
	fn test_drop_specific_version() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = bytes(b"key1");

		// Insert versions 1, 2, 3
		for v in 1..=3u64 {
			let payload = CowVec::new(format!("v{}", v).into_bytes());
			storage.set(CommitVersion(v), batch_for(EntryKind::Multi, vec![(key.clone(), Some(payload))]))
				.unwrap();
		}

		// Drop version 1
		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();

		// Version 1 should no longer be accessible
		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());

		// Versions 2 and 3 should still work
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);
	}
}