1use std::{collections::HashMap, ops::Bound, sync::Arc};
5
6use reifydb_core::{common::CommitVersion, error::diagnostic::internal::internal, interface::store::EntryKind};
7use reifydb_runtime::sync::mutex::Mutex;
8use reifydb_sqlite::{
9 SqliteConfig,
10 connection::{connect, convert_flags, resolve_db_path},
11 pragma,
12};
13use reifydb_type::{Result, error, util::cowvec::CowVec};
14use rusqlite::{
15 Connection, Error::QueryReturnedNoRows, Result as SqliteResult, ToSql, Transaction as SqliteTransaction, params,
16};
17use tracing::instrument;
18
19use super::{
20 entry::entry_id_to_name,
21 query::{build_versioned_range_query, version_to_bytes},
22};
23use crate::tier::{RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
24
25#[derive(Clone)]
30pub struct SqlitePrimitiveStorage {
31 inner: Arc<SqlitePrimitiveStorageInner>,
32}
33
34struct SqlitePrimitiveStorageInner {
35 conn: Mutex<Connection>,
39}
40
41impl SqlitePrimitiveStorage {
42 #[instrument(name = "store::multi::sqlite::new", level = "debug", skip(config), fields(
44 db_path = ?config.path,
45 page_size = config.page_size,
46 journal_mode = %config.journal_mode.as_str()
47 ))]
48 pub fn new(config: SqliteConfig) -> Self {
49 let db_path = resolve_db_path(config.path.clone(), "primitive.db");
50 let flags = convert_flags(&config.flags);
51
52 let conn = connect(&db_path, flags).expect("Failed to connect to database");
53 pragma::apply(&conn, &config).expect("Failed to configure SQLite pragmas");
54
55 Self {
56 inner: Arc::new(SqlitePrimitiveStorageInner {
57 conn: Mutex::new(conn),
58 }),
59 }
60 }
61
62 pub fn in_memory() -> Self {
64 Self::new(SqliteConfig::in_memory())
65 }
66
67 pub fn incremental_vacuum(&self) {
69 let _ = pragma::incremental_vacuum(&self.inner.conn.lock());
70 }
71
72 pub fn shrink_memory(&self) {
74 let _ = pragma::shrink_memory(&self.inner.conn.lock());
75 }
76
77 pub fn shutdown(&self) {
79 let _ = pragma::shutdown(&self.inner.conn.lock());
80 }
81
82 fn create_table_if_needed(conn: &Connection, table_name: &str) -> SqliteResult<()> {
84 conn.execute(
85 &format!(
86 "CREATE TABLE IF NOT EXISTS \"{}\" (
87 key BLOB NOT NULL,
88 version BLOB NOT NULL,
89 value BLOB,
90 PRIMARY KEY (key, version)
91 ) WITHOUT ROWID",
92 table_name
93 ),
94 [],
95 )?;
96 Ok(())
97 }
98}
99
100impl TierStorage for SqlitePrimitiveStorage {
101 #[instrument(name = "store::multi::sqlite::get", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0))]
102 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
103 let table_name = entry_id_to_name(table);
104 let conn = self.inner.conn.lock();
105
106 let result = conn.query_row(
108 &format!(
109 "SELECT value FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
110 table_name
111 ),
112 params![key, version_to_bytes(version).as_slice()],
113 |row| row.get::<_, Option<Vec<u8>>>(0),
114 );
115
116 match result {
117 Ok(Some(value)) => Ok(Some(CowVec::new(value))),
118 Ok(None) => Ok(None), Err(QueryReturnedNoRows) => Ok(None),
120 Err(e) if e.to_string().contains("no such table") => Ok(None),
121 Err(e) => Err(error!(internal(format!("Failed to get: {}", e)))),
122 }
123 }
124
125 #[instrument(name = "store::multi::sqlite::contains", level = "trace", skip(self), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
126 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
127 let table_name = entry_id_to_name(table);
128 let conn = self.inner.conn.lock();
129
130 let result = conn.query_row(
132 &format!(
133 "SELECT value IS NOT NULL FROM \"{}\" WHERE key = ?1 AND version <= ?2 ORDER BY version DESC LIMIT 1",
134 table_name
135 ),
136 params![key, version_to_bytes(version).as_slice()],
137 |row| row.get::<_, bool>(0),
138 );
139
140 match result {
141 Ok(has_value) => Ok(has_value),
142 Err(QueryReturnedNoRows) => Ok(false),
143 Err(e) if e.to_string().contains("no such table") => Ok(false),
144 Err(e) => Err(error!(internal(format!("Failed to check contains: {}", e)))),
145 }
146 }
147
148 #[instrument(name = "store::multi::sqlite::set", level = "debug", skip(self, batches), fields(table_count = batches.len(), version = version.0))]
149 fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
150 if batches.is_empty() {
151 return Ok(());
152 }
153
154 let conn = self.inner.conn.lock();
155 let tx = conn
156 .unchecked_transaction()
157 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
158
159 for (table, entries) in batches {
160 let table_name = entry_id_to_name(table);
161
162 let result = insert_versioned_entries_in_tx(&tx, &table_name, version, &entries);
164 if let Err(e) = result {
165 if e.to_string().contains("no such table") {
166 Self::create_table_if_needed(&tx, &table_name).map_err(|e| {
167 error!(internal(format!("Failed to create table: {}", e)))
168 })?;
169 insert_versioned_entries_in_tx(&tx, &table_name, version, &entries).map_err(
170 |e| error!(internal(format!("Failed to insert entries: {}", e))),
171 )?;
172 } else {
173 return Err(error!(internal(format!("Failed to insert entries: {}", e))));
174 }
175 }
176 }
177
178 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
179 }
180
181 #[instrument(name = "store::multi::sqlite::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
182 fn range_next(
183 &self,
184 table: EntryKind,
185 cursor: &mut RangeCursor,
186 start: Bound<&[u8]>,
187 end: Bound<&[u8]>,
188 version: CommitVersion,
189 batch_size: usize,
190 ) -> Result<RangeBatch> {
191 if cursor.exhausted {
192 return Ok(RangeBatch::empty());
193 }
194
195 let table_name = entry_id_to_name(table);
196
197 let effective_start: Bound<Vec<u8>> = match &cursor.last_key {
199 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
200 None => bound_to_owned(start),
201 };
202 let end_owned = bound_to_owned(end);
203
204 let conn = self.inner.conn.lock();
205
206 let start_ref = bound_as_ref(&effective_start);
207 let end_ref = bound_as_ref(&end_owned);
208 let (query, params) =
209 build_versioned_range_query(&table_name, start_ref, end_ref, version, false, batch_size + 1);
210
211 let mut stmt = match conn.prepare(&query) {
212 Ok(stmt) => stmt,
213 Err(e) if e.to_string().contains("no such table") => {
214 cursor.exhausted = true;
215 return Ok(RangeBatch::empty());
216 }
217 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
218 };
219
220 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
221
222 let entries: Vec<RawEntry> = stmt
223 .query_map(params_refs.as_slice(), |row| {
224 let key: Vec<u8> = row.get(0)?;
225 let version_bytes: Vec<u8> = row.get(1)?;
226 let value: Option<Vec<u8>> = row.get(2)?;
227 let version = u64::from_be_bytes(
228 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
229 );
230 Ok(RawEntry {
231 key: CowVec::new(key),
232 version: CommitVersion(version),
233 value: value.map(CowVec::new),
234 })
235 })
236 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
237 .filter_map(|r| r.ok())
238 .collect();
239
240 let has_more = entries.len() > batch_size;
241 let entries = if has_more {
242 entries.into_iter().take(batch_size).collect()
243 } else {
244 entries
245 };
246
247 let batch = RangeBatch {
248 entries,
249 has_more,
250 };
251
252 if let Some(last_entry) = batch.entries.last() {
254 cursor.last_key = Some(last_entry.key.clone());
255 }
256 if !batch.has_more {
257 cursor.exhausted = true;
258 }
259
260 Ok(batch)
261 }
262
263 #[instrument(name = "store::multi::sqlite::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
264 fn range_rev_next(
265 &self,
266 table: EntryKind,
267 cursor: &mut RangeCursor,
268 start: Bound<&[u8]>,
269 end: Bound<&[u8]>,
270 version: CommitVersion,
271 batch_size: usize,
272 ) -> Result<RangeBatch> {
273 if cursor.exhausted {
274 return Ok(RangeBatch::empty());
275 }
276
277 let table_name = entry_id_to_name(table);
278
279 let start_owned = bound_to_owned(start);
281 let effective_end: Bound<Vec<u8>> = match &cursor.last_key {
282 Some(last) => Bound::Excluded(last.as_slice().to_vec()),
283 None => bound_to_owned(end),
284 };
285
286 let conn = self.inner.conn.lock();
287
288 let start_ref = bound_as_ref(&start_owned);
289 let end_ref = bound_as_ref(&effective_end);
290 let (query, params) =
291 build_versioned_range_query(&table_name, start_ref, end_ref, version, true, batch_size + 1);
292
293 let mut stmt = match conn.prepare(&query) {
294 Ok(stmt) => stmt,
295 Err(e) if e.to_string().contains("no such table") => {
296 cursor.exhausted = true;
297 return Ok(RangeBatch::empty());
298 }
299 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
300 };
301
302 let params_refs: Vec<&dyn ToSql> = params.iter().map(|p| p as &dyn ToSql).collect();
303
304 let entries: Vec<RawEntry> = stmt
305 .query_map(params_refs.as_slice(), |row| {
306 let key: Vec<u8> = row.get(0)?;
307 let version_bytes: Vec<u8> = row.get(1)?;
308 let value: Option<Vec<u8>> = row.get(2)?;
309 let version = u64::from_be_bytes(
310 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
311 );
312 Ok(RawEntry {
313 key: CowVec::new(key),
314 version: CommitVersion(version),
315 value: value.map(CowVec::new),
316 })
317 })
318 .map_err(|e| error!(internal(format!("Failed to query range: {}", e))))?
319 .filter_map(|r| r.ok())
320 .collect();
321
322 let has_more = entries.len() > batch_size;
323 let entries = if has_more {
324 entries.into_iter().take(batch_size).collect()
325 } else {
326 entries
327 };
328
329 let batch = RangeBatch {
330 entries,
331 has_more,
332 };
333
334 if let Some(last_entry) = batch.entries.last() {
336 cursor.last_key = Some(last_entry.key.clone());
337 }
338 if !batch.has_more {
339 cursor.exhausted = true;
340 }
341
342 Ok(batch)
343 }
344
345 fn ensure_table(&self, table: EntryKind) -> Result<()> {
346 let table_name = entry_id_to_name(table);
347 let conn = self.inner.conn.lock();
348
349 Self::create_table_if_needed(&conn, &table_name)
350 .map_err(|e| error!(internal(format!("Failed to ensure table: {}", e))))
351 }
352
353 fn clear_table(&self, table: EntryKind) -> Result<()> {
354 let table_name = entry_id_to_name(table);
355 let conn = self.inner.conn.lock();
356
357 let result = conn.execute(&format!("DELETE FROM \"{}\"", table_name), []);
358
359 match result {
360 Ok(_) => Ok(()),
361 Err(e) if e.to_string().contains("no such table") => Ok(()),
362 Err(e) => Err(error!(internal(format!("Failed to clear table: {}", e)))),
363 }
364 }
365
366 #[instrument(name = "store::multi::sqlite::drop", level = "debug", skip(self, batches), fields(table_count = batches.len()))]
367 fn drop(&self, batches: HashMap<EntryKind, Vec<(CowVec<u8>, CommitVersion)>>) -> Result<()> {
368 if batches.is_empty() {
369 return Ok(());
370 }
371
372 let conn = self.inner.conn.lock();
373 let tx = conn
374 .unchecked_transaction()
375 .map_err(|e| error!(internal(format!("Failed to start transaction: {}", e))))?;
376
377 for (table, entries) in batches {
378 let table_name = entry_id_to_name(table);
379
380 let max_version_sql = format!("SELECT MAX(version) FROM \"{}\" WHERE key = ?1", table_name);
381 let delete_all_sql = format!("DELETE FROM \"{}\" WHERE key = ?1", table_name);
382 let delete_one_sql = format!("DELETE FROM \"{}\" WHERE key = ?1 AND version = ?2", table_name);
383
384 let mut max_version_stmt = match tx.prepare(&max_version_sql) {
385 Ok(s) => s,
386 Err(e) if e.to_string().contains("no such table") => continue,
387 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
388 };
389 let mut delete_all_stmt = tx
390 .prepare(&delete_all_sql)
391 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
392 let mut delete_one_stmt = tx
393 .prepare(&delete_one_sql)
394 .map_err(|e| error!(internal(format!("Failed to prepare delete: {}", e))))?;
395
396 for (key, version) in entries {
397 let version_bytes = version_to_bytes(version);
398
399 let max_version: Option<Vec<u8>> = max_version_stmt
400 .query_row(params![key.as_slice()], |row| row.get(0))
401 .unwrap_or(None);
402
403 let is_latest = max_version.as_deref() == Some(version_bytes.as_slice());
404
405 let result = if is_latest {
406 delete_all_stmt.execute(params![key.as_slice()])
407 } else {
408 delete_one_stmt.execute(params![key.as_slice(), version_bytes.as_slice()])
409 };
410
411 if let Err(e) = result
412 && !e.to_string().contains("no such table")
413 {
414 return Err(error!(internal(format!("Failed to delete entry: {}", e))));
415 }
416 }
417 }
418
419 tx.commit().map_err(|e| error!(internal(format!("Failed to commit transaction: {}", e))))
420 }
421
422 #[instrument(name = "store::multi::sqlite::get_all_versions", level = "trace", skip(self), fields(table = ?table, key_len = key.len()))]
423 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
424 let table_name = entry_id_to_name(table);
425 let conn = self.inner.conn.lock();
426
427 let mut stmt = match conn.prepare(&format!(
428 "SELECT version, value FROM \"{}\" WHERE key = ?1 ORDER BY version DESC",
429 table_name
430 )) {
431 Ok(stmt) => stmt,
432 Err(e) if e.to_string().contains("no such table") => return Ok(Vec::new()),
433 Err(e) => return Err(error!(internal(format!("Failed to prepare query: {}", e)))),
434 };
435
436 let versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = stmt
437 .query_map(params![key], |row| {
438 let version_bytes: Vec<u8> = row.get(0)?;
439 let value: Option<Vec<u8>> = row.get(1)?;
440 let version = u64::from_be_bytes(
441 version_bytes.as_slice().try_into().expect("version must be 8 bytes"),
442 );
443 Ok((CommitVersion(version), value.map(CowVec::new)))
444 })
445 .map_err(|e| error!(internal(format!("Failed to query versions: {}", e))))?
446 .filter_map(|r| r.ok())
447 .collect();
448
449 Ok(versions)
450 }
451}
452
453impl TierBackend for SqlitePrimitiveStorage {}
454
455fn bound_as_ref(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
457 match bound {
458 Bound::Included(v) => Bound::Included(v.as_slice()),
459 Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
460 Bound::Unbounded => Bound::Unbounded,
461 }
462}
463
464fn bound_to_owned(bound: Bound<&[u8]>) -> Bound<Vec<u8>> {
466 match bound {
467 Bound::Included(v) => Bound::Included(v.to_vec()),
468 Bound::Excluded(v) => Bound::Excluded(v.to_vec()),
469 Bound::Unbounded => Bound::Unbounded,
470 }
471}
472
473fn insert_versioned_entries_in_tx(
475 tx: &SqliteTransaction,
476 table_name: &str,
477 version: CommitVersion,
478 entries: &[(CowVec<u8>, Option<CowVec<u8>>)],
479) -> SqliteResult<()> {
480 let version_bytes = version_to_bytes(version);
481 let sql = format!("INSERT OR REPLACE INTO \"{}\" (key, version, value) VALUES (?1, ?2, ?3)", table_name);
482 let mut stmt = tx.prepare(&sql)?;
483 for (key, value) in entries {
484 stmt.execute(params![key.as_slice(), version_bytes.as_slice(), value.as_ref().map(|v| v.as_slice())])?;
485 }
486 Ok(())
487}
488
489#[cfg(test)]
490pub mod tests {
491 use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};
492
493 use super::*;
494
495 #[test]
496 fn test_basic_operations() {
497 let storage = SqlitePrimitiveStorage::in_memory();
498
499 let key = CowVec::new(b"key1".to_vec());
500 let version = CommitVersion(1);
501
502 storage.set(
504 version,
505 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
506 )
507 .unwrap();
508 let value = storage.get(EntryKind::Multi, &key, version).unwrap();
509 assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
510
511 assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
513 assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());
514
515 let version2 = CommitVersion(2);
517 storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
518 assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
519 }
520
521 #[test]
522 fn test_source_tables() {
523 let storage = SqlitePrimitiveStorage::in_memory();
524
525 let source1 = ShapeId::Table(TableId(1));
526 let source2 = ShapeId::Table(TableId(2));
527 let key = CowVec::new(b"key".to_vec());
528 let version = CommitVersion(1);
529
530 storage.set(
531 version,
532 HashMap::from([(
533 EntryKind::Source(source1),
534 vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
535 )]),
536 )
537 .unwrap();
538 storage.set(
539 version,
540 HashMap::from([(
541 EntryKind::Source(source2),
542 vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
543 )]),
544 )
545 .unwrap();
546
547 assert_eq!(
548 storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
549 Some(b"table1".as_slice())
550 );
551 assert_eq!(
552 storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
553 Some(b"table2".as_slice())
554 );
555 }
556
557 #[test]
558 fn test_version_queries() {
559 let storage = SqlitePrimitiveStorage::in_memory();
560
561 let key = CowVec::new(b"key1".to_vec());
562
563 storage.set(
565 CommitVersion(1),
566 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
567 )
568 .unwrap();
569 storage.set(
570 CommitVersion(2),
571 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
572 )
573 .unwrap();
574 storage.set(
575 CommitVersion(3),
576 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
577 )
578 .unwrap();
579
580 assert_eq!(
582 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
583 Some(b"v3".as_slice())
584 );
585 assert_eq!(
586 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
587 Some(b"v2".as_slice())
588 );
589 assert_eq!(
590 storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
591 Some(b"v1".as_slice())
592 );
593
594 assert_eq!(
596 storage.get(EntryKind::Multi, &key, CommitVersion(10)).unwrap().as_deref(),
597 Some(b"v3".as_slice())
598 );
599 }
600
601 #[test]
602 fn test_range_next() {
603 let storage = SqlitePrimitiveStorage::in_memory();
604
605 let version = CommitVersion(1);
606 storage.set(
607 version,
608 HashMap::from([(
609 EntryKind::Multi,
610 vec![
611 (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
612 (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
613 (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
614 ],
615 )]),
616 )
617 .unwrap();
618
619 let mut cursor = RangeCursor::new();
620 let batch = storage
621 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
622 .unwrap();
623
624 assert_eq!(batch.entries.len(), 3);
625 assert!(!batch.has_more);
626 assert!(cursor.exhausted);
627 assert_eq!(&*batch.entries[0].key, b"a");
628 assert_eq!(&*batch.entries[1].key, b"b");
629 assert_eq!(&*batch.entries[2].key, b"c");
630 }
631
632 #[test]
633 fn test_range_rev_next() {
634 let storage = SqlitePrimitiveStorage::in_memory();
635
636 let version = CommitVersion(1);
637 storage.set(
638 version,
639 HashMap::from([(
640 EntryKind::Multi,
641 vec![
642 (CowVec::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
643 (CowVec::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
644 (CowVec::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
645 ],
646 )]),
647 )
648 .unwrap();
649
650 let mut cursor = RangeCursor::new();
651 let batch = storage
652 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
653 .unwrap();
654
655 assert_eq!(batch.entries.len(), 3);
656 assert!(!batch.has_more);
657 assert!(cursor.exhausted);
658 assert_eq!(&*batch.entries[0].key, b"c");
659 assert_eq!(&*batch.entries[1].key, b"b");
660 assert_eq!(&*batch.entries[2].key, b"a");
661 }
662
663 #[test]
664 fn test_range_streaming_pagination() {
665 let storage = SqlitePrimitiveStorage::in_memory();
666
667 let version = CommitVersion(1);
668
669 let entries: Vec<_> =
671 (0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
672 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
673
674 let mut cursor = RangeCursor::new();
676
677 let batch1 = storage
679 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
680 .unwrap();
681 assert_eq!(batch1.entries.len(), 3);
682 assert!(batch1.has_more);
683 assert!(!cursor.exhausted);
684 assert_eq!(&*batch1.entries[0].key, &[0]);
685 assert_eq!(&*batch1.entries[2].key, &[2]);
686
687 let batch2 = storage
689 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
690 .unwrap();
691 assert_eq!(batch2.entries.len(), 3);
692 assert!(batch2.has_more);
693 assert!(!cursor.exhausted);
694 assert_eq!(&*batch2.entries[0].key, &[3]);
695 assert_eq!(&*batch2.entries[2].key, &[5]);
696 }
697
698 #[test]
699 fn test_range_reving_pagination() {
700 let storage = SqlitePrimitiveStorage::in_memory();
701
702 let version = CommitVersion(1);
703
704 let entries: Vec<_> =
706 (0..10u8).map(|i| (CowVec::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
707 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
708
709 let mut cursor = RangeCursor::new();
711
712 let batch1 = storage
714 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
715 .unwrap();
716 assert_eq!(batch1.entries.len(), 3);
717 assert!(batch1.has_more);
718 assert!(!cursor.exhausted);
719 assert_eq!(&*batch1.entries[0].key, &[9]);
720 assert_eq!(&*batch1.entries[2].key, &[7]);
721
722 let batch2 = storage
724 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
725 .unwrap();
726 assert_eq!(batch2.entries.len(), 3);
727 assert!(batch2.has_more);
728 assert!(!cursor.exhausted);
729 assert_eq!(&*batch2.entries[0].key, &[6]);
730 assert_eq!(&*batch2.entries[2].key, &[4]);
731 }
732
733 #[test]
734 fn test_get_nonexistent_table() {
735 let storage = SqlitePrimitiveStorage::in_memory();
736
737 let value = storage.get(EntryKind::Multi, b"key", CommitVersion(1)).unwrap();
739 assert_eq!(value, None);
740 }
741
742 #[test]
743 fn test_range_nonexistent_table() {
744 let storage = SqlitePrimitiveStorage::in_memory();
745
746 let mut cursor = RangeCursor::new();
748 let batch = storage
749 .range_next(
750 EntryKind::Multi,
751 &mut cursor,
752 Bound::Unbounded,
753 Bound::Unbounded,
754 CommitVersion(1),
755 100,
756 )
757 .unwrap();
758 assert!(batch.entries.is_empty());
759 assert!(cursor.exhausted);
760 }
761
762 #[test]
763 fn test_drop_specific_version() {
764 let storage = SqlitePrimitiveStorage::in_memory();
765
766 let key = CowVec::new(b"key1".to_vec());
767
768 for v in 1..=3u64 {
770 storage.set(
771 CommitVersion(v),
772 HashMap::from([(
773 EntryKind::Multi,
774 vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
775 )]),
776 )
777 .unwrap();
778 }
779
780 storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();
782
783 assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());
785
786 assert_eq!(
788 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
789 Some(b"v2".as_slice())
790 );
791 assert_eq!(
792 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
793 Some(b"v3".as_slice())
794 );
795 }
796}