1use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{Connection, Error::QueryReturnedNoRows, params};
16use tracing::instrument;
17
18use super::{
19 SqliteConfig,
20 connection::{connect, convert_flags, resolve_db_path},
21 entry::entry_id_to_name,
22 query::{build_versioned_range_query, version_to_bytes},
23};
24use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
25
26#[derive(Clone)]
31pub struct SqlitePrimitiveStorage {
32 inner: Arc<SqlitePrimitiveStorageInner>,
33}
34
35struct SqlitePrimitiveStorageInner {
36 conn: Mutex<Connection>,
40}
41
42impl SqlitePrimitiveStorage {
43 #[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
45 db_path = ?config.path,
46 page_size = config.page_size,
47 journal_mode = %config.journal_mode.as_str()
48 ))]
49 pub fn new(config: SqliteConfig) -> Self {
50 let db_path = resolve_db_path(config.path);
51 let flags = convert_flags(&config.flags);
52
53 let conn = connect(&db_path, flags).expect("Failed to connect to database");
54
55 conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
57 conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
58 .expect("Failed to set journal_mode");
59 conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
60 .expect("Failed to set synchronous");
61 conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
62 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
63 conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
64 conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
65 .expect("Failed to set wal_autocheckpoint");
66 conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
67
68 Self {
69 inner: Arc::new(SqlitePrimitiveStorageInner {
70 conn: Mutex::new(conn),
71 }),
72 }
73 }
74
75 pub fn in_memory() -> Self {
77 Self::new(SqliteConfig::in_memory())
78 }
79
80 fn create_table_if_needed(conn: &Connection, table_name: &str) -> rusqlite::Result<()> {
82 conn.execute(
83 &format!(
84 "CREATE TABLE IF NOT EXISTS \"{}\" (
85 key BLOB NOT NULL,
86 version BLOB NOT NULL,
87 value BLOB,
88 PRIMARY KEY (key, version)
89 ) WITHOUT ROWID",
90 table_name
91 ),
92 [],
93 )?;
94 Ok(())
95 }
96}
97
98impl TierStorage for SqlitePrimitiveStorage {
99 #[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
100 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
101 let table_name = entry_id_to_name(table);
102 let conn = self.inner.conn.lock();
103
104 let result = conn.query_row(
106 &format!(
107 "SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
108 table_name
109 ),
110 params![key, version_to_bytes(version).as_slice()],
111 |row| row.get::<_, Option<Vec<u8>>>(0),
112 );
113
114 match result {
115 Ok(Some(value)) => Ok(Some(CowVec::new(value))),
116 Ok(None) => Ok(None), Err(QueryReturnedNoRows) => Ok(None),
118 Err(e) if e.to_string().contains("no such table") => Ok(None),
119 Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
120 }
121 }
122
123 #[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
124 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
125 let table_name = entry_id_to_name(table);
126 let conn = self.inner.conn.lock();
127
128 let result = conn.query_row(
130 &format!(
131 "SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
132 table_name
133 ),
134 params![key, version_to_bytes(version).as_slice()],
135 |row| row.get::<_, bool>(0),
136 );
137
138 match result {
139 Ok(has_value) => Ok(has_value),
140 Err(QueryReturnedNoRows) => Ok(false),
141 Err(e) if e.to_string().contains("no such table") => Ok(false),
142 Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
143 }
144 }
145
146 #[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
147 fn set(
148 &self,
149 version: CommitVersion,
150 batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>,
151 ) -> Result<()> {
152 if batches.is_empty() {
153 return Ok(());
154 }
155
156 let conn = self.inner.conn.lock();
157 let tx = conn
158 .unchecked_transaction()
159 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
160
161 for (table, entries) in batches {
162 let table_name = entry_id_to_name(table);
163
164 let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
166 if let Err(e) = result {
167 if e.to_string().contains("no such table") {
168 Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
169 error!(internal(format!("Failed to create table: {}", e)))
170 })?;
171 insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
172 |e| error!(internal(format!("Failed to insert entries: {}", e))),
173 )?;
174 } else {
175 return Err(error!(internal(format!("Failed to insert entries: {}", e))));
176 }
177 }
178 }
179
180 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
181 }
182
183 #[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
184 fn range_next(
185 &self,
186 table: EntryKind,
187 cursor: &mut RangeCursor,
188 start: Bound<&[u8]>,
189 end: Bound<&[u8]>,
190 version: CommitVersion,
191 batch_size: usize,
192 ) -> Result<RangeBatch> {
193 if cursor.exhausted {
194 return Ok(RangeBatch::empty());
195 }
196
197 let table_name = entry_id_to_name(table);
198
199 let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
201 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
202 None => bound_to_owned(start),
203 };
204 let end_owned = bound_to_owned(end);
205
206 let conn = self.inner.conn.lock();
207
208 let start_ref = bound_as_ref(&effective_start);
209 let end_ref = bound_as_ref(&end_owned);
210 let (query, params) =
211 build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
212
213 let mut stmt = match conn.prepare(&query) {
214 Ok(stmt) => stmt,
215 Err(e) if e.to_string().contains("no such table") => {
216 cursor.exhausted = true;
217 return Ok(RangeBatch::empty());
218 }
219 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
220 };
221
222 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
223
224 let entries: Vec<RawEntry> = stmt
225 .query_map(params_refs.as_slice(), |row| {
226 let key: Vec<u8> = row.get(0)?;
227 let version_bytes: Vec<u8> = row.get(1)?;
228 let value: Option<Vec<u8>> = row.get(2)?;
229 let version = u64::from_be_bytes(
230 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
231 );
232 Ok(RawEntry {
233 key: CowVec::new(key),
234 version: CommitVersion(version),
235 value: value.map(CowVec::new),
236 })
237 })
238 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
239 .filter_map(|r| r.ok())
240 .collect();
241
242 let has_more = entries.len() > batch_size;
243 let entries = if has_more {
244 entries.into_iter().take(batch_size).collect()
245 } else {
246 entries
247 };
248
249 let batch = RangeBatch {
250 entries,
251 has_more,
252 };
253
254 if let Some(last_entry) = batch.entries.last() {
256 cursor.last_key = Some(last_entry.key.clone());
257 }
258 if !batch.has_more {
259 cursor.exhausted = true;
260 }
261
262 Ok(batch)
263 }
264
265 #[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
266 fn range_rev_next(
267 &self,
268 table: EntryKind,
269 cursor: &mut RangeCursor,
270 start: Bound<&[u8]>,
271 end: Bound<&[u8]>,
272 version: CommitVersion,
273 batch_size: usize,
274 ) -> Result<RangeBatch> {
275 if cursor.exhausted {
276 return Ok(RangeBatch::empty());
277 }
278
279 let table_name = entry_id_to_name(table);
280
281 let start_owned = bound_to_owned(start);
283 let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
284 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
285 None => bound_to_owned(end),
286 };
287
288 let conn = self.inner.conn.lock();
289
290 let start_ref = bound_as_ref(&start_owned);
291 let end_ref = bound_as_ref(&effective_end);
292 let (query, params) =
293 build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
294
295 let mut stmt = match conn.prepare(&query) {
296 Ok(stmt) => stmt,
297 Err(e) if e.to_string().contains("no such table") => {
298 cursor.exhausted = true;
299 return Ok(RangeBatch::empty());
300 }
301 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
302 };
303
304 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p as &dyn rusqlite::ToSql).collect();
305
306 let entries: Vec<RawEntry> = stmt
307 .query_map(params_refs.as_slice(), |row| {
308 let key: Vec<u8> = row.get(0)?;
309 let version_bytes: Vec<u8> = row.get(1)?;
310 let value: Option<Vec<u8>> = row.get(2)?;
311 let version = u64::from_be_bytes(
312 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
313 );
314 Ok(RawEntry {
315 key: CowVec::new(key),
316 version: CommitVersion(version),
317 value: value.map(CowVec::new),
318 })
319 })
320 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
321 .filter_map(|r| r.ok())
322 .collect();
323
324 let has_more = entries.len() > batch_size;
325 let entries = if has_more {
326 entries.into_iter().take(batch_size).collect()
327 } else {
328 entries
329 };
330
331 let batch = RangeBatch {
332 entries,
333 has_more,
334 };
335
336 if let Some(last_entry) = batch.entries.last() {
338 cursor.last_key = Some(last_entry.key.clone());
339 }
340 if !batch.has_more {
341 cursor.exhausted = true;
342 }
343
344 Ok(batch)
345 }
346
347 fn ensure_table(&self, table: EntryKind) -> Result<()> {
348 let table_name = entry_id_to_name(table);
349 let conn = self.inner.conn.lock();
350
351 Self::create_table_if_needed(&conn, &table_name)
352 .map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
353 }
354
355 fn clear_table(&self, table: EntryKind) -> Result<()> {
356 let table_name = entry_id_to_name(table);
357 let conn = self.inner.conn.lock();
358
359 let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
360
361 match result {
362 Ok(_) => Ok(()),
363 Err(e) if e.to_string().contains("no such table") => Ok(()),
364 Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
365 }
366 }
367
368 #[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
369 fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
370 if batches.is_empty() {
371 return Ok(());
372 }
373
374 let conn = self.inner.conn.lock();
375 let tx = conn
376 .unchecked_transaction()
377 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
378
379 for (table, entries) in batches {
380 let table_name = entry_id_to_name(table);
381 for (key, version) in entries {
382 let version_bytes = version_to_bytes(version);
383 let is_latest: bool = tx
385 .query_row(
386 &format!(
387 "SELECT version = ?2 FROM \"{}\" WHERE key = ?1 ORDER BY version DESC LIMIT 1",
388 table_name
389 ),
390 params![key.as_slice(), version_bytes.as_slice()],
391 |row| row.get(0),
392 )
393 .unwrap_or(false);
394
395 if is_latest {
396 let result = tx.execute(
398 &format!("DELETE FROM \"{}\" WHERE key = ?1", table_name),
399 params![key.as_slice()],
400 );
401 if let Err(e) = result {
402 if !e.to_string().contains("no such table") {
403 return Err(error!(internal(format!(
404 "Failed to delete entry: {}",
405 e
406 ))));
407 }
408 }
409 } else {
410 let result = tx.execute(
412 &format!(
413 "DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2",
414 table_name
415 ),
416 params![key.as_slice(), version_bytes.as_slice()],
417 );
418 if let Err(e) = result {
419 if !e.to_string().contains("no such table") {
420 return Err(error!(internal(format!(
421 "Failed to delete entry: {}",
422 e
423 ))));
424 }
425 }
426 }
427 }
428 }
429
430 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
431 }
432
433 #[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
434 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
435 let table_name = entry_id_to_name(table);
436 let conn = self.inner.conn.lock();
437
438 let mut stmt = match conn.prepare(&format!(
439 "SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
440 table_name
441 )) {
442 Ok(stmt) => stmt,
443 Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
444 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
445 };
446
447 let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
448 .query_map(params![key], |row| {
449 let version_bytes: Vec<u8> = row.get(0)?;
450 let value: Option<Vec<u8>> = row.get(1)?;
451 let version = u64::from_be_bytes(
452 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
453 );
454 Ok((CommitVersion(version), value.map(CowVec::new)))
455 })
456 .map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
457 .filter_map(|r| r.ok())
458 .collect();
459
460 Ok(versions)
461 }
462}
463
464impl TierBackend for SqlitePrimitiveStorage {}
465
466fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
468 match bound {
469 Bound::Included(v) => Bound::Included(v.as_slice()),
470 Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
471 Bound::Unbounded => Bound::Unbounded,
472 }
473}
474
475fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
477 match bound {
478 Bound::Included(v) => Bound::Included(v.to_vec()),
479 Bound::Excluded(v) => Bound::Excluded(v.to_vec()),
480 Bound::Unbounded => Bound::Unbounded,
481 }
482}
483
484fn insert_versioned_entries_in_tx(
486 tx: &rusqlite::Transaction,
487 table_name: &str,
488 version: CommitVersion,
489 entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
490) -> rusqlite::Result<()> {
491 let version_bytes = version_to_bytes(version);
492 for (key, value) in entries {
493 tx.execute(
494 &format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name),
495 params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())],
496 )?;
497 }
498 Ok(())
499}
500
501#[cfg(test)]
502pub mod tests {
503 use reifydb_core::interface::catalog::{id::TableId, primitive::PrimitiveId};
504
505 use super::*;
506
507 #[test]
508 fn test_basic_operations() {
509 let storage = SqlitePrimitiveStorage::in_memory();
510
511 let key = CowVec::new(b"key1".to_vec());
512 let version = CommitVersion(1);
513
514 storage.set(
516 version,
517 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
518 )
519 .unwrap();
520 let value = storage.get(EntryKind::Multi, &key, version).unwrap();
521 assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
522
523 assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
525 assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());
526
527 let version2 = CommitVersion(2);
529 storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
530 assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
531 }
532
533 #[test]
534 fn test_source_tables() {
535 let storage = SqlitePrimitiveStorage::in_memory();
536
537 let source1 = PrimitiveId::Table(TableId(1));
538 let source2 = PrimitiveId::Table(TableId(2));
539 let key = CowVec::new(b"key".to_vec());
540 let version = CommitVersion(1);
541
542 storage.set(
543 version,
544 HashMap::from([(
545 EntryKind::Source(source1),
546 vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
547 )]),
548 )
549 .unwrap();
550 storage.set(
551 version,
552 HashMap::from([(
553 EntryKind::Source(source2),
554 vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
555 )]),
556 )
557 .unwrap();
558
559 assert_eq!(
560 storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
561 Some(b"table1".as_slice())
562 );
563 assert_eq!(
564 storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
565 Some(b"table2".as_slice())
566 );
567 }
568
569 #[test]
570 fn test_version_queries() {
571 let storage = SqlitePrimitiveStorage::in_memory();
572
573 let key = CowVec::new(b"key1".to_vec());
574
575 storage.set(
577 CommitVersion(1),
578 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
579 )
580 .unwrap();
581 storage.set(
582 CommitVersion(2),
583 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
584 )
585 .unwrap();
586 storage.set(
587 CommitVersion(3),
588 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
589 )
590 .unwrap();
591
592 assert_eq!(
594 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
595 Some(b"v3".as_slice())
596 );
597 assert_eq!(
598 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
599 Some(b"v2".as_slice())
600 );
601 assert_eq!(
602 storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
603 Some(b"v1".as_slice())
604 );
605
606 assert_eq!(
608 storage.get(EntryKind::Multi, &key, CommitVersion(10)).unwrap().as_deref(),
609 Some(b"v3".as_slice())
610 );
611 }
612
613 #[test]
614 fn test_range_next() {
615 let storage = SqlitePrimitiveStorage::in_memory();
616
617 let version = CommitVersion(1);
618 storage.set(
619 version,
620 HashMap::from([(
621 EntryKind::Multi,
622 vec![
623 (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
624 (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
625 (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
626 ],
627 )]),
628 )
629 .unwrap();
630
631 let mut cursor = RangeCursor::new();
632 let batch = storage
633 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
634 .unwrap();
635
636 assert_eq!(batch.entries.len(), 3);
637 assert!(!batch.has_more);
638 assert!(cursor.exhausted);
639 assert_eq!(&*batch.entries[0].key, b"a");
640 assert_eq!(&*batch.entries[1].key, b"b");
641 assert_eq!(&*batch.entries[2].key, b"c");
642 }
643
644 #[test]
645 fn test_range_rev_next() {
646 let storage = SqlitePrimitiveStorage::in_memory();
647
648 let version = CommitVersion(1);
649 storage.set(
650 version,
651 HashMap::from([(
652 EntryKind::Multi,
653 vec![
654 (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
655 (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
656 (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
657 ],
658 )]),
659 )
660 .unwrap();
661
662 let mut cursor = RangeCursor::new();
663 let batch = storage
664 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
665 .unwrap();
666
667 assert_eq!(batch.entries.len(), 3);
668 assert!(!batch.has_more);
669 assert!(cursor.exhausted);
670 assert_eq!(&*batch.entries[0].key, b"c");
671 assert_eq!(&*batch.entries[1].key, b"b");
672 assert_eq!(&*batch.entries[2].key, b"a");
673 }
674
675 #[test]
676 fn test_range_streaming_pagination() {
677 let storage = SqlitePrimitiveStorage::in_memory();
678
679 let version = CommitVersion(1);
680
681 let entries: Vec<_> =
683 (0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
684 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
685
686 let mut cursor = RangeCursor::new();
688
689 let batch1 = storage
691 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
692 .unwrap();
693 assert_eq!(batch1.entries.len(), 3);
694 assert!(batch1.has_more);
695 assert!(!cursor.exhausted);
696 assert_eq!(&*batch1.entries[0].key, &[0]);
697 assert_eq!(&*batch1.entries[2].key, &[2]);
698
699 let batch2 = storage
701 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
702 .unwrap();
703 assert_eq!(batch2.entries.len(), 3);
704 assert!(batch2.has_more);
705 assert!(!cursor.exhausted);
706 assert_eq!(&*batch2.entries[0].key, &[3]);
707 assert_eq!(&*batch2.entries[2].key, &[5]);
708 }
709
710 #[test]
711 fn test_range_reving_pagination() {
712 let storage = SqlitePrimitiveStorage::in_memory();
713
714 let version = CommitVersion(1);
715
716 let entries: Vec<_> =
718 (0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
719 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
720
721 let mut cursor = RangeCursor::new();
723
724 let batch1 = storage
726 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
727 .unwrap();
728 assert_eq!(batch1.entries.len(), 3);
729 assert!(batch1.has_more);
730 assert!(!cursor.exhausted);
731 assert_eq!(&*batch1.entries[0].key, &[9]);
732 assert_eq!(&*batch1.entries[2].key, &[7]);
733
734 let batch2 = storage
736 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
737 .unwrap();
738 assert_eq!(batch2.entries.len(), 3);
739 assert!(batch2.has_more);
740 assert!(!cursor.exhausted);
741 assert_eq!(&*batch2.entries[0].key, &[6]);
742 assert_eq!(&*batch2.entries[2].key, &[4]);
743 }
744
745 #[test]
746 fn test_get_nonexistent_table() {
747 let storage = SqlitePrimitiveStorage::in_memory();
748
749 let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
751 assert_eq!(value, None);
752 }
753
754 #[test]
755 fn test_range_nonexistent_table() {
756 let storage = SqlitePrimitiveStorage::in_memory();
757
758 let mut cursor = RangeCursor::new();
760 let batch = storage
761 .range_next(
762 EntryKind::Multi,
763 &mut cursor,
764 Bound::Unbounded,
765 Bound::Unbounded,
766 CommitVersion(1),
767 100,
768 )
769 .unwrap();
770 assert!(batch.entries.is_empty());
771 assert!(cursor.exhausted);
772 }
773
774 #[test]
775 fn test_drop_specific_version() {
776 let storage = SqlitePrimitiveStorage::in_memory();
777
778 let key = CowVec::new(b"key1".to_vec());
779
780 for v in 1..=3u64 {
782 storage.set(
783 CommitVersion(v),
784 HashMap::from([(
785 EntryKind::Multi,
786 vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
787 )]),
788 )
789 .unwrap();
790 }
791
792 storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();
794
795 assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());
797
798 assert_eq!(
800 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
801 Some(b"v2".as_slice())
802 );
803 assert_eq!(
804 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
805 Some(b"v3".as_slice())
806 );
807 }
808}