1use std::{collections::HashMap, ops::Bound, sync::Arc};
11
12use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal};
13use reifydb_runtime::sync::mutex::Mutex;
14use reifydb_type::{Result, error, util::cowvec::CowVec};
15use rusqlite::{
16 Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
17};
18use tracing::instrument;
19
20use super::{
21 SqliteConfig,
22 connection::{connect, convert_flags, resolve_db_path},
23 entry::entry_id_to_name,
24 query::{build_versioned_range_query, version_to_bytes},
25};
26use crate::tier::{EntryKind, RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
27
/// SQLite-backed storage handle.
///
/// The handle is cheap to clone: every clone points at the same inner state
/// (and therefore the same SQLite connection) through an [`Arc`].
#[derive(Clone)]
pub struct SqlitePrimitiveStorage {
    /// Shared state behind this handle.
    inner: Arc<SqlitePrimitiveStorageInner>,
}
36
/// State shared by all clones of [`SqlitePrimitiveStorage`].
struct SqlitePrimitiveStorageInner {
    /// The SQLite connection; every operation locks this mutex before touching it.
    conn: Mutex<Connection>,
}
43
44impl SqlitePrimitiveStorage {
45 #[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
47 db_path = ?config.path,
48 page_size = config.page_size,
49 journal_mode = %config.journal_mode.as_str()
50 ))]
51 pub fn new(config: SqliteConfig) -> Self {
52 let db_path = resolve_db_path(config.path);
53 let flags = convert_flags(&config.flags);
54
55 let conn = connect(&db_path, flags).expect("Failed to connect to database");
56
57 conn.pragma_update(None, "page_size", config.page_size).expect("Failed to set page_size");
59 conn.pragma_update(None, "journal_mode", config.journal_mode.as_str())
60 .expect("Failed to set journal_mode");
61 conn.pragma_update(None, "synchronous", config.synchronous_mode.as_str())
62 .expect("Failed to set synchronous");
63 conn.pragma_update(None, "temp_store", config.temp_store.as_str()).expect("Failed to set temp_store");
64 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL").expect("Failed to set auto_vacuum");
65 conn.pragma_update(None, "cache_size", -(config.cache_size as i32)).expect("Failed to set cache_size");
66 conn.pragma_update(None, "wal_autocheckpoint", config.wal_autocheckpoint)
67 .expect("Failed to set wal_autocheckpoint");
68 conn.pragma_update(None, "mmap_size", config.mmap_size as i64).expect("Failed to set mmap_size");
69
70 Self {
71 inner: Arc::new(SqlitePrimitiveStorageInner {
72 conn: Mutex::new(conn),
73 }),
74 }
75 }
76
77 pub fn in_memory() -> Self {
79 Self::new(SqliteConfig::in_memory())
80 }
81
82 pub fn incremental_vacuum(&self) {
84 let conn = self.inner.conn.lock();
85 let _ = conn.execute("PRAGMA incremental_vacuum", []);
86 }
87
88 pub fn shrink_memory(&self) {
90 let conn = self.inner.conn.lock();
91 let _ = conn.pragma_update(None, "shrink_memory", 0);
92 }
93
94 pub fn shutdown(&self) {
96 let conn = self.inner.conn.lock();
97 let _ = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE");
98 let _ = conn.pragma_update(None, "cache_size", 0);
99 }
100
101 fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
103 conn.execute(
104 &format!(
105 "CREATE TABLE IF NOT EXISTS \"{}\" (
106 key BLOB NOT NULL,
107 version BLOB NOT NULL,
108 value BLOB,
109 PRIMARY KEY (key, version)
110 ) WITHOUT ROWID",
111 table_name
112 ),
113 [],
114 )?;
115 Ok(())
116 }
117}
118
119impl TierStorage for SqlitePrimitiveStorage {
120 #[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
121 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
122 let table_name = entry_id_to_name(table);
123 let conn = self.inner.conn.lock();
124
125 let result = conn.query_row(
127 &format!(
128 "SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
129 table_name
130 ),
131 params![key, version_to_bytes(version).as_slice()],
132 |row| row.get::<_, Option<Vec<u8>>>(0),
133 );
134
135 match result {
136 Ok(Some(value)) => Ok(Some(CowVec::new(value))),
137 Ok(None) => Ok(None), Err(QueryReturnedNoRows) => Ok(None),
139 Err(e) if e.to_string().contains("no such table") => Ok(None),
140 Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
141 }
142 }
143
144 #[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
145 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
146 let table_name = entry_id_to_name(table);
147 let conn = self.inner.conn.lock();
148
149 let result = conn.query_row(
151 &format!(
152 "SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
153 table_name
154 ),
155 params![key, version_to_bytes(version).as_slice()],
156 |row| row.get::<_, bool>(0),
157 );
158
159 match result {
160 Ok(has_value) => Ok(has_value),
161 Err(QueryReturnedNoRows) => Ok(false),
162 Err(e) if e.to_string().contains("no such table") => Ok(false),
163 Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
164 }
165 }
166
167 #[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
168 fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
169 if batches.is_empty() {
170 return Ok(());
171 }
172
173 let conn = self.inner.conn.lock();
174 let tx = conn
175 .unchecked_transaction()
176 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
177
178 for (table, entries) in batches {
179 let table_name = entry_id_to_name(table);
180
181 let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
183 if let Err(e) = result {
184 if e.to_string().contains("no such table") {
185 Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
186 error!(internal(format!("Failed to create table: {}", e)))
187 })?;
188 insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
189 |e| error!(internal(format!("Failed to insert entries: {}", e))),
190 )?;
191 } else {
192 return Err(error!(internal(format!("Failed to insert entries: {}", e))));
193 }
194 }
195 }
196
197 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
198 }
199
200 #[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
201 fn range_next(
202 &self,
203 table: EntryKind,
204 cursor: &mut RangeCursor,
205 start: Bound<&[u8]>,
206 end: Bound<&[u8]>,
207 version: CommitVersion,
208 batch_size: usize,
209 ) -> Result<RangeBatch> {
210 if cursor.exhausted {
211 return Ok(RangeBatch::empty());
212 }
213
214 let table_name = entry_id_to_name(table);
215
216 let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
218 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
219 None => bound_to_owned(start),
220 };
221 let end_owned = bound_to_owned(end);
222
223 let conn = self.inner.conn.lock();
224
225 let start_ref = bound_as_ref(&effective_start);
226 let end_ref = bound_as_ref(&end_owned);
227 let (query, params) =
228 build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
229
230 let mut stmt = match conn.prepare(&query) {
231 Ok(stmt) => stmt,
232 Err(e) if e.to_string().contains("no such table") => {
233 cursor.exhausted = true;
234 return Ok(RangeBatch::empty());
235 }
236 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
237 };
238
239 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
240
241 let entries: Vec<RawEntry> = stmt
242 .query_map(params_refs.as_slice(), |row| {
243 let key: Vec<u8> = row.get(0)?;
244 let version_bytes: Vec<u8> = row.get(1)?;
245 let value: Option<Vec<u8>> = row.get(2)?;
246 let version = u64::from_be_bytes(
247 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
248 );
249 Ok(RawEntry {
250 key: CowVec::new(key),
251 version: CommitVersion(version),
252 value: value.map(CowVec::new),
253 })
254 })
255 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
256 .filter_map(|r| r.ok())
257 .collect();
258
259 let has_more = entries.len() > batch_size;
260 let entries = if has_more {
261 entries.into_iter().take(batch_size).collect()
262 } else {
263 entries
264 };
265
266 let batch = RangeBatch {
267 entries,
268 has_more,
269 };
270
271 if let Some(last_entry) = batch.entries.last() {
273 cursor.last_key = Some(last_entry.key.clone());
274 }
275 if !batch.has_more {
276 cursor.exhausted = true;
277 }
278
279 Ok(batch)
280 }
281
282 #[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
283 fn range_rev_next(
284 &self,
285 table: EntryKind,
286 cursor: &mut RangeCursor,
287 start: Bound<&[u8]>,
288 end: Bound<&[u8]>,
289 version: CommitVersion,
290 batch_size: usize,
291 ) -> Result<RangeBatch> {
292 if cursor.exhausted {
293 return Ok(RangeBatch::empty());
294 }
295
296 let table_name = entry_id_to_name(table);
297
298 let start_owned = bound_to_owned(start);
300 let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
301 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
302 None => bound_to_owned(end),
303 };
304
305 let conn = self.inner.conn.lock();
306
307 let start_ref = bound_as_ref(&start_owned);
308 let end_ref = bound_as_ref(&effective_end);
309 let (query, params) =
310 build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
311
312 let mut stmt = match conn.prepare(&query) {
313 Ok(stmt) => stmt,
314 Err(e) if e.to_string().contains("no such table") => {
315 cursor.exhausted = true;
316 return Ok(RangeBatch::empty());
317 }
318 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
319 };
320
321 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
322
323 let entries: Vec<RawEntry> = stmt
324 .query_map(params_refs.as_slice(), |row| {
325 let key: Vec<u8> = row.get(0)?;
326 let version_bytes: Vec<u8> = row.get(1)?;
327 let value: Option<Vec<u8>> = row.get(2)?;
328 let version = u64::from_be_bytes(
329 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
330 );
331 Ok(RawEntry {
332 key: CowVec::new(key),
333 version: CommitVersion(version),
334 value: value.map(CowVec::new),
335 })
336 })
337 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
338 .filter_map(|r| r.ok())
339 .collect();
340
341 let has_more = entries.len() > batch_size;
342 let entries = if has_more {
343 entries.into_iter().take(batch_size).collect()
344 } else {
345 entries
346 };
347
348 let batch = RangeBatch {
349 entries,
350 has_more,
351 };
352
353 if let Some(last_entry) = batch.entries.last() {
355 cursor.last_key = Some(last_entry.key.clone());
356 }
357 if !batch.has_more {
358 cursor.exhausted = true;
359 }
360
361 Ok(batch)
362 }
363
364 fn ensure_table(&self, table: EntryKind) -> Result<()> {
365 let table_name = entry_id_to_name(table);
366 let conn = self.inner.conn.lock();
367
368 Self::create_table_if_needed(&conn, &table_name)
369 .map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
370 }
371
372 fn clear_table(&self, table: EntryKind) -> Result<()> {
373 let table_name = entry_id_to_name(table);
374 let conn = self.inner.conn.lock();
375
376 let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
377
378 match result {
379 Ok(_) => Ok(()),
380 Err(e) if e.to_string().contains("no such table") => Ok(()),
381 Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
382 }
383 }
384
385 #[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
386 fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
387 if batches.is_empty() {
388 return Ok(());
389 }
390
391 let conn = self.inner.conn.lock();
392 let tx = conn
393 .unchecked_transaction()
394 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
395
396 for (table, entries) in batches {
397 let table_name = entry_id_to_name(table);
398
399 let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
400 let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
401 let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
402
403 let mut max_version_stmt = match tx.prepare(&max_version_sql) {
404 Ok(s) => s,
405 Err(e) if e.to_string().contains("no such table") => continue,
406 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
407 };
408 let mut delete_all_stmt = tx
409 .prepare(&delete_all_sql)
410 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
411 let mut delete_one_stmt = tx
412 .prepare(&delete_one_sql)
413 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
414
415 for (key, version) in entries {
416 let version_bytes = version_to_bytes(version);
417
418 let max_version: Option<Vec<u8>> = max_version_stmt
419 .query_row(params![key.as_slice()], |row| row.get(0))
420 .unwrap_or(None);
421
422 let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
423
424 let result = if is_latest {
425 delete_all_stmt.execute(params![key.as_slice()])
426 } else {
427 delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
428 };
429
430 if let Err(e) = result
431 && !e.to_string().contains("no such table")
432 {
433 return Err(error!(internal(format!("Failed to delete entry: {}", e))));
434 }
435 }
436 }
437
438 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
439 }
440
441 #[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
442 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
443 let table_name = entry_id_to_name(table);
444 let conn = self.inner.conn.lock();
445
446 let mut stmt = match conn.prepare(&format!(
447 "SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
448 table_name
449 )) {
450 Ok(stmt) => stmt,
451 Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
452 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
453 };
454
455 let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
456 .query_map(params![key], |row| {
457 let version_bytes: Vec<u8> = row.get(0)?;
458 let value: Option<Vec<u8>> = row.get(1)?;
459 let version = u64::from_be_bytes(
460 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
461 );
462 Ok((CommitVersion(version), value.map(CowVec::new)))
463 })
464 .map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
465 .filter_map(|r| r.ok())
466 .collect();
467
468 Ok(versions)
469 }
470}
471
// No items are overridden here; the empty body relies on whatever
// `TierBackend` provides on its own.
impl TierBackend for SqlitePrimitiveStorage {}
473
/// Borrows an owned byte-vector bound as a bound over a byte slice.
fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
    bound.as_ref().map(Vec::as_slice)
}
482
/// Copies a byte-slice bound into a bound that owns its bytes.
fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
    bound.map(|bytes| bytes.to_vec())
}
491
492fn insert_versioned_entries_in_tx(
494 tx: &SqliteTransaction,
495 table_name: &str,
496 version: CommitVersion,
497 entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
498) -> SqliteResult<()> {
499 let version_bytes = version_to_bytes(version);
500 let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
501 let mut stmt = tx.prepare(&sql)?;
502 for (key, value) in entries {
503 stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
504 }
505 Ok(())
506}
507
508#[cfg(test)]
509pub mod tests {
510 use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};
511
512 use super::*;
513
/// Round-trips a value, checks `contains`, and verifies that a later `None`
/// write hides the key.
#[test]
fn test_basic_operations() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let key = CowVec::new(b"key1".to_vec());
    let first = CommitVersion(1);

    // Write a value and read it back at the same version.
    let payload = Some(CowVec::new(b"value1".to_vec()));
    let batch = HashMap::from([(EntryKind::Multi, vec![(key.clone(), payload)])]);
    storage.set(first, batch).unwrap();

    let fetched = storage.get(EntryKind::Multi, &key, first).unwrap();
    assert_eq!(fetched.as_deref(), Some(b"value1".as_slice()));

    assert!(storage.contains(EntryKind::Multi, &key, first).unwrap());
    assert!(!storage.contains(EntryKind::Multi, b"nonexistent", first).unwrap());

    // A `None` value written at a newer version makes the key disappear.
    let second = CommitVersion(2);
    let tombstone = HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])]);
    storage.set(second, tombstone).unwrap();
    assert!(!storage.contains(EntryKind::Multi, &key, second).unwrap());
}
539
/// The same key written to two source tables keeps independent values.
#[test]
fn test_source_tables() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let key = CowVec::new(b"key".to_vec());
    let version = CommitVersion(1);

    let sources = [(ShapeId::Table(TableId(1)), b"table1"), (ShapeId::Table(TableId(2)), b"table2")];

    // Write both tables first, then read both back.
    for (source, payload) in sources {
        let row = (key.clone(), Some(CowVec::new(payload.to_vec())));
        storage.set(version, HashMap::from([(EntryKind::Source(source), vec![row])])).unwrap();
    }

    for (source, payload) in sources {
        assert_eq!(
            storage.get(EntryKind::Source(source), &key, version).unwrap().as_deref(),
            Some(payload.as_slice())
        );
    }
}
575
/// Reads at a given version see the newest write at or below that version.
#[test]
fn test_version_queries() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let key = CowVec::new(b"key1".to_vec());

    let writes = [(1u64, b"v1"), (2, b"v2"), (3, b"v3")];
    for (version, payload) in writes {
        let row = (key.clone(), Some(CowVec::new(payload.to_vec())));
        storage.set(CommitVersion(version), HashMap::from([(EntryKind::Multi, vec![row])])).unwrap();
    }

    // Exact versions, checked newest to oldest.
    for (version, payload) in writes.into_iter().rev() {
        assert_eq!(
            storage.get(EntryKind::Multi, &key, CommitVersion(version)).unwrap().as_deref(),
            Some(payload.as_slice())
        );
    }

    // A version beyond the newest write resolves to the newest write.
    assert_eq!(
        storage.get(EntryKind::Multi, &key, CommitVersion(10)).unwrap().as_deref(),
        Some(b"v3".as_slice())
    );
}
619
/// A forward scan that fits in one batch returns the keys in ascending order.
#[test]
fn test_range_next() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let version = CommitVersion(1);

    let rows = vec![
        (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
        (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
        (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
    ];
    storage.set(version, HashMap::from([(EntryKind::Multi, rows)])).unwrap();

    let mut cursor = RangeCursor::new();
    let batch = storage
        .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
        .unwrap();

    assert_eq!(batch.entries.len(), 3);
    assert!(!batch.has_more);
    assert!(cursor.exhausted);
    for (entry, expected) in batch.entries.iter().zip([b"a", b"b", b"c"]) {
        assert_eq!(&*entry.key, expected);
    }
}
650
/// A reverse scan that fits in one batch returns the keys in descending order.
#[test]
fn test_range_rev_next() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let version = CommitVersion(1);

    let rows = vec![
        (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
        (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
        (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
    ];
    storage.set(version, HashMap::from([(EntryKind::Multi, rows)])).unwrap();

    let mut cursor = RangeCursor::new();
    let batch = storage
        .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
        .unwrap();

    assert_eq!(batch.entries.len(), 3);
    assert!(!batch.has_more);
    assert!(cursor.exhausted);
    for (entry, expected) in batch.entries.iter().zip([b"c", b"b", b"a"]) {
        assert_eq!(&*entry.key, expected);
    }
}
681
/// Forward pagination: consecutive batches of three continue where the
/// previous one stopped.
#[test]
fn test_range_streaming_pagination() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let version = CommitVersion(1);

    // Ten single-byte keys 0..=9, each mapped to ten times its key.
    let mut rows = Vec::new();
    for i in 0..10u8 {
        rows.push((CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10]))));
    }
    storage.set(version, HashMap::from([(EntryKind::Multi, rows)])).unwrap();

    let mut cursor = RangeCursor::new();
    let page = |cursor: &mut RangeCursor| {
        storage.range_next(EntryKind::Multi, cursor, Bound::Unbounded, Bound::Unbounded, version, 3).unwrap()
    };

    let first = page(&mut cursor);
    assert_eq!(first.entries.len(), 3);
    assert!(first.has_more);
    assert!(!cursor.exhausted);
    assert_eq!(&*first.entries[0].key, &[0]);
    assert_eq!(&*first.entries[2].key, &[2]);

    let second = page(&mut cursor);
    assert_eq!(second.entries.len(), 3);
    assert!(second.has_more);
    assert!(!cursor.exhausted);
    assert_eq!(&*second.entries[0].key, &[3]);
    assert_eq!(&*second.entries[2].key, &[5]);
}
716
/// Reverse pagination: consecutive batches of three walk downwards from the
/// largest key.
#[test]
fn test_range_reving_pagination() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let version = CommitVersion(1);

    // Ten single-byte keys 0..=9, each mapped to ten times its key.
    let mut rows = Vec::new();
    for i in 0..10u8 {
        rows.push((CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10]))));
    }
    storage.set(version, HashMap::from([(EntryKind::Multi, rows)])).unwrap();

    let mut cursor = RangeCursor::new();
    let page = |cursor: &mut RangeCursor| {
        storage
            .range_rev_next(EntryKind::Multi, cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
            .unwrap()
    };

    let first = page(&mut cursor);
    assert_eq!(first.entries.len(), 3);
    assert!(first.has_more);
    assert!(!cursor.exhausted);
    assert_eq!(&*first.entries[0].key, &[9]);
    assert_eq!(&*first.entries[2].key, &[7]);

    let second = page(&mut cursor);
    assert_eq!(second.entries.len(), 3);
    assert!(second.has_more);
    assert!(!cursor.exhausted);
    assert_eq!(&*second.entries[0].key, &[6]);
    assert_eq!(&*second.entries[2].key, &[4]);
}
751
/// Reading from a table that was never created yields `None`, not an error.
#[test]
fn test_get_nonexistent_table() {
    let storage = SqlitePrimitiveStorage::in_memory();

    assert_eq!(storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap(), None);
}
760
/// Scanning a table that was never created yields an empty batch and
/// exhausts the cursor.
#[test]
fn test_range_nonexistent_table() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let mut cursor = RangeCursor::new();
    let version = CommitVersion(1);

    let scanned =
        storage.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100);
    let batch = scanned.unwrap();

    assert!(batch.entries.is_empty());
    assert!(cursor.exhausted);
}
780
/// Dropping an old version removes only that version; newer ones stay readable.
#[test]
fn test_drop_specific_version() {
    let storage = SqlitePrimitiveStorage::in_memory();
    let key = CowVec::new(b"key1".to_vec());

    // Versions 1..=3 hold "v1", "v2", "v3".
    (1..=3u64).for_each(|v| {
        let payload = CowVec::new(format!("v{}", v).into_bytes());
        let batch = HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(payload))])]);
        storage.set(CommitVersion(v), batch).unwrap();
    });

    let doomed = HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])]);
    storage.drop(doomed).unwrap();

    assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());

    let at_two = storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap();
    assert_eq!(at_two.as_deref(), Some(b"v2".as_slice()));

    let at_three = storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap();
    assert_eq!(at_three.as_deref(), Some(b"v3".as_slice()));
}
815}