1use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{
16 Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
17};
18use tracing::instrument;
19
20use super::{
21 SqliteConfig,
22 connection::{connect, convert_flags, resolve_db_path},
23 entry::entry_id_to_name,
24 query::{build_versioned_range_query, version_to_bytes},
25};
26use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};
27
/// SQLite-backed storage handle.
///
/// The handle only holds an `Arc`, so a clone is another handle to the same
/// underlying connection rather than a new connection.
#[derive(Clone)]
pub struct SqlitePrimitiveStorage {
	/// State shared by every clone of this handle.
	inner: Arc<SqlitePrimitiveStorageInner>,
}
36
/// State shared between all clones of [`SqlitePrimitiveStorage`].
struct SqlitePrimitiveStorageInner {
	/// The SQLite connection; every operation in this file locks this mutex
	/// before touching the database.
	conn: Mutex<Connection>,
}
43
44impl SqlitePrimitiveStorage {
45 #[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
47 db_path = ?config.path,
48 page_size = config.page_size,
49 journal_mode = %config.journal_mode.as_str()
50 ))]
51 pub fn new(config: SqliteConfig) -> Self {
52 let db_path = resolve_db_path(config.path);
53 let flags = convert_flags(&config.flags);
54
55 let conn = connect(&db_path, flags).expect("Failed to connect to database");
56
57 conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
59 conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
60 .expect("Failed to set journal_mode");
61 conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
62 .expect("Failed to set synchronous");
63 conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
64 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
65 conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
66 conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
67 .expect("Failed to set wal_autocheckpoint");
68 conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
69
70 Self {
71 inner: Arc::new(SqlitePrimitiveStorageInner {
72 conn: Mutex::new(conn),
73 }),
74 }
75 }
76
77 pub fn in_memory() -> Self {
79 Self::new(SqliteConfig::in_memory())
80 }
81
82 pub fn incremental_vacuum(&self) {
84 let conn = self.inner.conn.lock();
85 let _ = conn.execute("PRAGMA incremental_vacuum", []);
86 }
87
88 pub fn shrink_memory(&self) {
90 let conn = self.inner.conn.lock();
91 let _ = conn.pragma_update(None, "shrink_memory", 0);
92 }
93
94 pub fn shutdown(&self) {
96 let conn = self.inner.conn.lock();
97 let _ = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE");
98 let _ = conn.pragma_update(None, "cache_size", 0);
99 }
100
101 fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
103 conn.execute(
104 &format!(
105 "CREATE TABLE IF NOT EXISTS \"{}\" (
106 key BLOB NOT NULL,
107 version BLOB NOT NULL,
108 value BLOB,
109 PRIMARY KEY (key, version)
110 ) WITHOUT ROWID",
111 table_name
112 ),
113 [],
114 )?;
115 Ok(())
116 }
117}
118
119impl TierStorage for SqlitePrimitiveStorage {
120 #[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
121 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
122 let table_name = entry_id_to_name(table);
123 let conn = self.inner.conn.lock();
124
125 let result = conn.query_row(
127 &format!(
128 "SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
129 table_name
130 ),
131 params![key, version_to_bytes(version).as_slice()],
132 |row| row.get::<_, Option<Vec<u8>>>(0),
133 );
134
135 match result {
136 Ok(Some(value)) => Ok(Some(CowVec::new(value))),
137 Ok(None) => Ok(None), Err(QueryReturnedNoRows) => Ok(None),
139 Err(e) if e.to_string().contains("no such table") => Ok(None),
140 Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
141 }
142 }
143
144 #[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
145 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
146 let table_name = entry_id_to_name(table);
147 let conn = self.inner.conn.lock();
148
149 let result = conn.query_row(
151 &format!(
152 "SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
153 table_name
154 ),
155 params![key, version_to_bytes(version).as_slice()],
156 |row| row.get::<_, bool>(0),
157 );
158
159 match result {
160 Ok(has_value) => Ok(has_value),
161 Err(QueryReturnedNoRows) => Ok(false),
162 Err(e) if e.to_string().contains("no such table") => Ok(false),
163 Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
164 }
165 }
166
167 #[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
168 fn set(
169 &self,
170 version: CommitVersion,
171 batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>>,
172 ) -> Result<()> {
173 if batches.is_empty() {
174 return Ok(());
175 }
176
177 let conn = self.inner.conn.lock();
178 let tx = conn
179 .unchecked_transaction()
180 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
181
182 for (table, entries) in batches {
183 let table_name = entry_id_to_name(table);
184
185 let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
187 if let Err(e) = result {
188 if e.to_string().contains("no such table") {
189 Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
190 error!(internal(format!("Failed to create table: {}", e)))
191 })?;
192 insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
193 |e| error!(internal(format!("Failed to insert entries: {}", e))),
194 )?;
195 } else {
196 return Err(error!(internal(format!("Failed to insert entries: {}", e))));
197 }
198 }
199 }
200
201 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
202 }
203
204 #[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
205 fn range_next(
206 &self,
207 table: EntryKind,
208 cursor: &mut RangeCursor,
209 start: Bound<&[u8]>,
210 end: Bound<&[u8]>,
211 version: CommitVersion,
212 batch_size: usize,
213 ) -> Result<RangeBatch> {
214 if cursor.exhausted {
215 return Ok(RangeBatch::empty());
216 }
217
218 let table_name = entry_id_to_name(table);
219
220 let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
222 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
223 None => bound_to_owned(start),
224 };
225 let end_owned = bound_to_owned(end);
226
227 let conn = self.inner.conn.lock();
228
229 let start_ref = bound_as_ref(&effective_start);
230 let end_ref = bound_as_ref(&end_owned);
231 let (query, params) =
232 build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
233
234 let mut stmt = match conn.prepare(&query) {
235 Ok(stmt) => stmt,
236 Err(e) if e.to_string().contains("no such table") => {
237 cursor.exhausted = true;
238 return Ok(RangeBatch::empty());
239 }
240 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
241 };
242
243 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
244
245 let entries: Vec<RawEntry> = stmt
246 .query_map(params_refs.as_slice(), |row| {
247 let key: Vec<u8> = row.get(0)?;
248 let version_bytes: Vec<u8> = row.get(1)?;
249 let value: Option<Vec<u8>> = row.get(2)?;
250 let version = u64::from_be_bytes(
251 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
252 );
253 Ok(RawEntry {
254 key: CowVec::new(key),
255 version: CommitVersion(version),
256 value: value.map(CowVec::new),
257 })
258 })
259 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
260 .filter_map(|r| r.ok())
261 .collect();
262
263 let has_more = entries.len() > batch_size;
264 let entries = if has_more {
265 entries.into_iter().take(batch_size).collect()
266 } else {
267 entries
268 };
269
270 let batch = RangeBatch {
271 entries,
272 has_more,
273 };
274
275 if let Some(last_entry) = batch.entries.last() {
277 cursor.last_key = Some(last_entry.key.clone());
278 }
279 if !batch.has_more {
280 cursor.exhausted = true;
281 }
282
283 Ok(batch)
284 }
285
286 #[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
287 fn range_rev_next(
288 &self,
289 table: EntryKind,
290 cursor: &mut RangeCursor,
291 start: Bound<&[u8]>,
292 end: Bound<&[u8]>,
293 version: CommitVersion,
294 batch_size: usize,
295 ) -> Result<RangeBatch> {
296 if cursor.exhausted {
297 return Ok(RangeBatch::empty());
298 }
299
300 let table_name = entry_id_to_name(table);
301
302 let start_owned = bound_to_owned(start);
304 let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
305 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
306 None => bound_to_owned(end),
307 };
308
309 let conn = self.inner.conn.lock();
310
311 let start_ref = bound_as_ref(&start_owned);
312 let end_ref = bound_as_ref(&effective_end);
313 let (query, params) =
314 build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
315
316 let mut stmt = match conn.prepare(&query) {
317 Ok(stmt) => stmt,
318 Err(e) if e.to_string().contains("no such table") => {
319 cursor.exhausted = true;
320 return Ok(RangeBatch::empty());
321 }
322 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
323 };
324
325 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
326
327 let entries: Vec<RawEntry> = stmt
328 .query_map(params_refs.as_slice(), |row| {
329 let key: Vec<u8> = row.get(0)?;
330 let version_bytes: Vec<u8> = row.get(1)?;
331 let value: Option<Vec<u8>> = row.get(2)?;
332 let version = u64::from_be_bytes(
333 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
334 );
335 Ok(RawEntry {
336 key: CowVec::new(key),
337 version: CommitVersion(version),
338 value: value.map(CowVec::new),
339 })
340 })
341 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
342 .filter_map(|r| r.ok())
343 .collect();
344
345 let has_more = entries.len() > batch_size;
346 let entries = if has_more {
347 entries.into_iter().take(batch_size).collect()
348 } else {
349 entries
350 };
351
352 let batch = RangeBatch {
353 entries,
354 has_more,
355 };
356
357 if let Some(last_entry) = batch.entries.last() {
359 cursor.last_key = Some(last_entry.key.clone());
360 }
361 if !batch.has_more {
362 cursor.exhausted = true;
363 }
364
365 Ok(batch)
366 }
367
368 fn ensure_table(&self, table: EntryKind) -> Result<()> {
369 let table_name = entry_id_to_name(table);
370 let conn = self.inner.conn.lock();
371
372 Self::create_table_if_needed(&conn, &table_name)
373 .map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
374 }
375
376 fn clear_table(&self, table: EntryKind) -> Result<()> {
377 let table_name = entry_id_to_name(table);
378 let conn = self.inner.conn.lock();
379
380 let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
381
382 match result {
383 Ok(_) => Ok(()),
384 Err(e) if e.to_string().contains("no such table") => Ok(()),
385 Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
386 }
387 }
388
389 #[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
390 fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
391 if batches.is_empty() {
392 return Ok(());
393 }
394
395 let conn = self.inner.conn.lock();
396 let tx = conn
397 .unchecked_transaction()
398 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
399
400 for (table, entries) in batches {
401 let table_name = entry_id_to_name(table);
402
403 let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
404 let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
405 let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
406
407 let mut max_version_stmt = match tx.prepare(&max_version_sql) {
408 Ok(s) => s,
409 Err(e) if e.to_string().contains("no such table") => continue,
410 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
411 };
412 let mut delete_all_stmt = tx
413 .prepare(&delete_all_sql)
414 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
415 let mut delete_one_stmt = tx
416 .prepare(&delete_one_sql)
417 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
418
419 for (key, version) in entries {
420 let version_bytes = version_to_bytes(version);
421
422 let max_version: Option<Vec<u8>> = max_version_stmt
423 .query_row(params![key.as_slice()], |row| row.get(0))
424 .unwrap_or(None);
425
426 let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
427
428 let result = if is_latest {
429 delete_all_stmt.execute(params![key.as_slice()])
430 } else {
431 delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
432 };
433
434 if let Err(e) = result {
435 if !e.to_string().contains("no such table") {
436 return Err(error!(internal(format!("Failed to delete entry: {}", e))));
437 }
438 }
439 }
440 }
441
442 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
443 }
444
445 #[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
446 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
447 let table_name = entry_id_to_name(table);
448 let conn = self.inner.conn.lock();
449
450 let mut stmt = match conn.prepare(&format!(
451 "SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
452 table_name
453 )) {
454 Ok(stmt) => stmt,
455 Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
456 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
457 };
458
459 let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
460 .query_map(params![key], |row| {
461 let version_bytes: Vec<u8> = row.get(0)?;
462 let value: Option<Vec<u8>> = row.get(1)?;
463 let version = u64::from_be_bytes(
464 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
465 );
466 Ok((CommitVersion(version), value.map(CowVec::new)))
467 })
468 .map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
469 .filter_map(|r| r.ok())
470 .collect();
471
472 Ok(versions)
473 }
474}
475
// Empty impl: no `TierBackend` items are defined or overridden for this type here.
impl TierBackend for SqlitePrimitiveStorage {}
477
/// Borrows an owned byte bound as a bound over a byte slice, keeping the variant.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
	if let Bound::Included(bytes) = bound {
		return Bound::Included(&bytes[..]);
	}
	if let Bound::Excluded(bytes) = bound {
		return Bound::Excluded(&bytes[..]);
	}
	Bound::Unbounded
}
486
/// Copies a borrowed byte bound into an owned one, keeping the variant.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
	let copy = |bytes: &[u8]| -> Vec<u8> { Vec::from(bytes) };
	match bound {
		Bound::Unbounded => Bound::Unbounded,
		Bound::Excluded(bytes) => Bound::Excluded(copy(bytes)),
		Bound::Included(bytes) => Bound::Included(copy(bytes)),
	}
}
495
496fn insert_versioned_entries_in_tx(
498 tx: &SqliteTransaction,
499 table_name: &str,
500 version: CommitVersion,
501 entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
502) -> SqliteResult<()> {
503 let version_bytes = version_to_bytes(version);
504 let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
505 let mut stmt = tx.prepare(&sql)?;
506 for (key, value) in entries {
507 stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
508 }
509 Ok(())
510}
511
#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{id::TableId, primitive::PrimitiveId};

	use super::*;

	/// Copies `bytes` into an owned `CowVec`.
	fn cow(bytes: &[u8]) -> CowVec<u8> {
		CowVec::new(bytes.to_vec())
	}

	/// Commits `rows` to `kind` at `version`, panicking if the write fails.
	fn put(
		storage: &SqlitePrimitiveStorage,
		kind: EntryKind,
		version: CommitVersion,
		rows: Vec<(CowVec<u8>, Option<CowVec<u8>>)>,
	) {
		storage.set(version, HashMap::from([(kind, rows)])).unwrap();
	}

	/// Stores keys `a`, `b`, `c` with values `1`, `2`, `3` at `version`.
	fn seed_letters(storage: &SqlitePrimitiveStorage, version: CommitVersion) {
		put(
			storage,
			EntryKind::Multi,
			version,
			vec![
				(cow(b"a"), Some(cow(b"1"))),
				(cow(b"b"), Some(cow(b"2"))),
				(cow(b"c"), Some(cow(b"3"))),
			],
		);
	}

	/// Stores the ten single-byte keys `0..10`, each mapped to `key * 10`, at `version`.
	fn seed_numbers(storage: &SqlitePrimitiveStorage, version: CommitVersion) {
		let rows: Vec<_> = (0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
		put(storage, EntryKind::Multi, version, rows);
	}

	#[test]
	fn test_basic_operations() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = cow(b"key1");
		let first = CommitVersion(1);

		// A written value can be read back and is reported as present.
		put(&storage, EntryKind::Multi, first, vec![(key.clone(), Some(cow(b"value1")))]);
		let stored = storage.get(EntryKind::Multi, &key, first).unwrap();
		assert_eq!(stored.as_deref(), Some(b"value1".as_slice()));

		assert!(storage.contains(EntryKind::Multi, &key, first).unwrap());
		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", first).unwrap());

		// Writing `None` at a later version hides the key from that version on.
		let second = CommitVersion(2);
		put(&storage, EntryKind::Multi, second, vec![(key.clone(), None)]);
		assert!(!storage.contains(EntryKind::Multi, &key, second).unwrap());
	}

	#[test]
	fn test_source_tables() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let first_source = PrimitiveId::Table(TableId(1));
		let second_source = PrimitiveId::Table(TableId(2));
		let key = cow(b"key");
		let version = CommitVersion(1);

		// The same key is written to two different sources.
		put(&storage, EntryKind::Source(first_source), version, vec![(key.clone(), Some(cow(b"table1")))]);
		put(&storage, EntryKind::Source(second_source), version, vec![(key.clone(), Some(cow(b"table2")))]);

		let from_first = storage.get(EntryKind::Source(first_source), &key, version).unwrap();
		assert_eq!(from_first.as_deref(), Some(b"table1".as_slice()));

		let from_second = storage.get(EntryKind::Source(second_source), &key, version).unwrap();
		assert_eq!(from_second.as_deref(), Some(b"table2".as_slice()));
	}

	#[test]
	fn test_version_queries() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = cow(b"key1");

		for (number, payload) in [(1u64, b"v1"), (2, b"v2"), (3, b"v3")] {
			put(
				&storage,
				EntryKind::Multi,
				CommitVersion(number),
				vec![(key.clone(), Some(cow(payload.as_slice())))],
			);
		}

		// Reading at an exact version returns that version; reading beyond the
		// newest one (10) falls back to the newest stored value.
		for (read_at, expected) in [(3u64, b"v3"), (2, b"v2"), (1, b"v1"), (10, b"v3")] {
			assert_eq!(
				storage.get(EntryKind::Multi, &key, CommitVersion(read_at)).unwrap().as_deref(),
				Some(expected.as_slice())
			);
		}
	}

	#[test]
	fn test_range_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		seed_letters(&storage, version);

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"a");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"c");
	}

	#[test]
	fn test_range_rev_next() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		seed_letters(&storage, version);

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);
		assert_eq!(&*batch.entries[0].key, b"c");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"a");
	}

	#[test]
	fn test_range_streaming_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		seed_numbers(&storage, version);

		// One cursor is shared by both calls so the second resumes after the first.
		let mut cursor = RangeCursor::new();

		let first_page = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(first_page.entries.len(), 3);
		assert!(first_page.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*first_page.entries[0].key, &[0]);
		assert_eq!(&*first_page.entries[2].key, &[2]);

		let second_page = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(second_page.entries.len(), 3);
		assert!(second_page.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*second_page.entries[0].key, &[3]);
		assert_eq!(&*second_page.entries[2].key, &[5]);
	}

	#[test]
	fn test_range_reving_pagination() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let version = CommitVersion(1);
		seed_numbers(&storage, version);

		// One cursor is shared by both calls so the second resumes before the first.
		let mut cursor = RangeCursor::new();

		let first_page = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(first_page.entries.len(), 3);
		assert!(first_page.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*first_page.entries[0].key, &[9]);
		assert_eq!(&*first_page.entries[2].key, &[7]);

		let second_page = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(second_page.entries.len(), 3);
		assert!(second_page.has_more);
		assert!(!cursor.exhausted);
		assert_eq!(&*second_page.entries[0].key, &[6]);
		assert_eq!(&*second_page.entries[2].key, &[4]);
	}

	#[test]
	fn test_get_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// Nothing has been written, so the table does not exist yet.
		let missing = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
		assert_eq!(missing, None);
	}

	#[test]
	fn test_range_nonexistent_table() {
		let storage = SqlitePrimitiveStorage::in_memory();

		// Nothing has been written, so the table does not exist yet.
		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(
				EntryKind::Multi,
				&mut cursor,
				Bound::Unbounded,
				Bound::Unbounded,
				CommitVersion(1),
				100,
			)
			.unwrap();
		assert!(batch.entries.is_empty());
		assert!(cursor.exhausted);
	}

	#[test]
	fn test_drop_specific_version() {
		let storage = SqlitePrimitiveStorage::in_memory();

		let key = cow(b"key1");

		// Three versions of the same key: "v1", "v2", "v3".
		for number in 1..=3u64 {
			let payload = CowVec::new(format!("v{}", number).into_bytes());
			put(&storage, EntryKind::Multi, CommitVersion(number), vec![(key.clone(), Some(payload))]);
		}

		// Drop only the oldest version.
		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();

		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());

		// The newer versions are untouched.
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);
	}
}