1use std::{
5 cmp::{Ordering, Reverse},
6 collections::{HashMap, HashSet},
7 ops::Bound,
8 sync::Arc,
9};
10
11use reifydb_core::{
12 common::CommitVersion,
13 encoded::key::EncodedKey,
14 interface::{
15 catalog::{flow::FlowNodeId, id::TableId, shape::ShapeId},
16 store::EntryKind,
17 },
18};
19use reifydb_type::{Result, util::cowvec::CowVec};
20use tracing::{Span, field, instrument};
21
22use super::entry::{CurrentMap, Entries, Entry, HistoricalMap, entry_id_to_key};
23use crate::tier::{HistoricalCursor, RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
24
25#[derive(Clone)]
26pub struct MemoryPrimitiveStorage {
27 inner: Arc<MemoryPrimitiveStorageInner>,
28}
29
30struct MemoryPrimitiveStorageInner {
31 entries: Entries,
32}
33
34impl Default for MemoryPrimitiveStorage {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl MemoryPrimitiveStorage {
41 #[instrument(name = "store::multi::memory::new", level = "debug")]
42 pub fn new() -> Self {
43 Self {
44 inner: Arc::new(MemoryPrimitiveStorageInner {
45 entries: Entries::default(),
46 }),
47 }
48 }
49
50 pub fn count_current(&self, table: EntryKind) -> Result<u64> {
51 let table_key = entry_id_to_key(table);
52 Ok(self.inner.entries.data.get(&table_key).map(|e| e.current.read().len() as u64).unwrap_or(0))
53 }
54
55 pub fn list_all_entry_kinds(&self) -> Result<Vec<EntryKind>> {
56 let mut out = Vec::new();
57 for key in self.inner.entries.data.keys() {
58 if key == "multi" {
59 out.push(EntryKind::Multi);
60 } else if let Some(rest) = key.strip_prefix("source:")
61 && let Ok(id) = rest.parse::<u64>()
62 {
63 out.push(EntryKind::Source(ShapeId::Table(TableId(id))));
64 } else if let Some(rest) = key.strip_prefix("operator:")
65 && let Ok(id) = rest.parse::<u64>()
66 {
67 out.push(EntryKind::Operator(FlowNodeId(id)));
68 }
69 }
70 Ok(out)
71 }
72
73 pub fn count_historical(&self, table: EntryKind) -> Result<u64> {
74 let table_key = entry_id_to_key(table);
75 Ok(self.inner
76 .entries
77 .data
78 .get(&table_key)
79 .map(|e| {
80 let hist = e.historical.read();
81 hist.values().map(|m| m.len() as u64).sum()
82 })
83 .unwrap_or(0))
84 }
85
86 #[inline]
87 #[instrument(name = "store::multi::memory::get_or_create_table", level = "trace", skip(self), fields(table = ?table))]
88 fn get_or_create_table(&self, table: EntryKind) -> Entry {
89 let table_key = entry_id_to_key(table);
90 self.inner.entries.data.get_or_insert_with(table_key, Entry::new)
91 }
92
93 #[inline]
94 #[instrument(name = "store::multi::memory::set::table", level = "trace", skip(self, entries), fields(
95 table = ?table,
96 entry_count = entries.len(),
97 ))]
98 fn process_table(
99 &self,
100 table: EntryKind,
101 version: CommitVersion,
102 entries: Vec<(EncodedKey, Option<CowVec<u8>>)>,
103 ) {
104 let table_entry = self.get_or_create_table(table);
105 let mut current = table_entry.current.write();
106 let mut historical = table_entry.historical.write();
107
108 for (key, value) in entries {
109 if let Some((pre_version, pre_value)) = current.get(&key) {
110 if *pre_version < version {
111 let pre_version = *pre_version;
112 let pre_value = pre_value.clone();
113 historical
114 .entry(key.clone())
115 .or_default()
116 .insert(Reverse(pre_version), pre_value);
117
118 current.insert(key, (version, value));
119 } else {
120 historical.entry(key).or_default().insert(Reverse(version), value);
121 }
122 } else {
123 current.insert(key, (version, value));
124 }
125 }
126 }
127}
128
129impl TierStorage for MemoryPrimitiveStorage {
130 #[instrument(name = "store::multi::memory::get", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len(), version = version.0))]
131 fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
132 let table_key = entry_id_to_key(table);
133 let entry = match self.inner.entries.data.get(&table_key) {
134 Some(e) => e,
135 None => return Ok(None),
136 };
137
138 let current = entry.current.read();
139 if let Some((cur_version, value)) = current.get(key)
140 && *cur_version <= version
141 {
142 return Ok(value.clone());
143 }
144 drop(current);
145
146 let historical = entry.historical.read();
147 if let Some(versions) = historical.get(key) {
148 for (Reverse(v), value) in versions.range(Reverse(version)..) {
149 if *v <= version {
150 return Ok(value.clone());
151 }
152 }
153 }
154
155 Ok(None)
156 }
157
158 #[instrument(name = "store::multi::memory::contains", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
159 fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
160 let table_key = entry_id_to_key(table);
161 let entry = match self.inner.entries.data.get(&table_key) {
162 Some(e) => e,
163 None => return Ok(false),
164 };
165
166 let current = entry.current.read();
167 if let Some((cur_version, value)) = current.get(key)
168 && *cur_version <= version
169 {
170 return Ok(value.is_some());
171 }
172 drop(current);
173
174 let historical = entry.historical.read();
175 if let Some(versions) = historical.get(key) {
176 for (Reverse(v), value) in versions.range(Reverse(version)..) {
177 if *v <= version {
178 return Ok(value.is_some());
179 }
180 }
181 }
182
183 Ok(false)
184 }
185
186 #[instrument(name = "store::multi::memory::set", level = "trace", skip(self, batches), fields(
187 table_count = batches.len(),
188 total_entry_count = field::Empty,
189 version = version.0
190 ))]
191 fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
192 let total_entries: usize = batches.values().map(|v| v.len()).sum();
193
194 batches.into_iter().for_each(|(table, entries)| {
195 self.process_table(table, version, entries);
196 });
197
198 Span::current().record("total_entry_count", total_entries);
199 Ok(())
200 }
201
202 #[instrument(name = "store::multi::memory::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
203 fn range_next(
204 &self,
205 table: EntryKind,
206 cursor: &mut RangeCursor,
207 start: Bound<&[u8]>,
208 end: Bound<&[u8]>,
209 version: CommitVersion,
210 batch_size: usize,
211 ) -> Result<RangeBatch> {
212 if cursor.exhausted {
213 return Ok(RangeBatch::empty());
214 }
215
216 let table_key = entry_id_to_key(table);
217 let entry = match self.inner.entries.data.get(&table_key) {
218 Some(e) => e,
219 None => {
220 cursor.exhausted = true;
221 return Ok(RangeBatch::empty());
222 }
223 };
224
225 let cursor_key = cursor.last_key.clone();
226
227 let current = entry.current.read();
228 let historical = entry.historical.read();
229
230 let mut entries: Vec<RawEntry> = Vec::with_capacity(batch_size + 1);
231
232 let iter_start: Bound<&[u8]> = match &cursor_key {
233 Some(last) => Bound::Excluded(last.as_slice()),
234 None => start,
235 };
236
237 let iter_end: Bound<&[u8]> = end;
238
239 let mut cur_iter = current.range::<[u8], _>((iter_start, iter_end)).peekable();
240 let mut hist_iter = historical.range::<[u8], _>((iter_start, iter_end)).peekable();
241
242 while entries.len() <= batch_size {
243 let (take_cur, take_hist) = match (cur_iter.peek(), hist_iter.peek()) {
244 (None, None) => break,
245 (Some(_), None) => (true, false),
246 (None, Some(_)) => (false, true),
247 (Some((kc, _)), Some((kh, _))) => match kc.cmp(kh) {
248 Ordering::Less => (true, false),
249 Ordering::Greater => (false, true),
250 Ordering::Equal => (true, true),
251 },
252 };
253
254 if take_cur && take_hist {
255 let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
256 let (_, versions) = hist_iter.next().unwrap();
257 if *cur_version <= version {
258 entries.push(RawEntry {
259 key: key.clone(),
260 version: *cur_version,
261 value: cur_value.clone(),
262 });
263 } else if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
264 entries.push(RawEntry {
265 key: key.clone(),
266 version: *v,
267 value: value.clone(),
268 });
269 }
270 } else if take_cur {
271 let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
272 if *cur_version <= version {
273 entries.push(RawEntry {
274 key: key.clone(),
275 version: *cur_version,
276 value: cur_value.clone(),
277 });
278 }
279 } else {
280 let (key, versions) = hist_iter.next().unwrap();
281 if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
282 entries.push(RawEntry {
283 key: key.clone(),
284 version: *v,
285 value: value.clone(),
286 });
287 }
288 }
289 }
290
291 let has_more = entries.len() > batch_size;
292 if has_more {
293 entries.truncate(batch_size);
294 }
295
296 if let Some(last_entry) = entries.last() {
297 cursor.last_key = Some(last_entry.key.clone());
298 }
299 if !has_more {
300 cursor.exhausted = true;
301 }
302
303 Ok(RangeBatch {
304 entries,
305 has_more,
306 })
307 }
308
309 #[instrument(name = "store::multi::memory::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
310 fn range_rev_next(
311 &self,
312 table: EntryKind,
313 cursor: &mut RangeCursor,
314 start: Bound<&[u8]>,
315 end: Bound<&[u8]>,
316 version: CommitVersion,
317 batch_size: usize,
318 ) -> Result<RangeBatch> {
319 if cursor.exhausted {
320 return Ok(RangeBatch::empty());
321 }
322
323 let table_key = entry_id_to_key(table);
324 let entry = match self.inner.entries.data.get(&table_key) {
325 Some(e) => e,
326 None => {
327 cursor.exhausted = true;
328 return Ok(RangeBatch::empty());
329 }
330 };
331
332 let cursor_key = cursor.last_key.clone();
333
334 let current = entry.current.read();
335 let historical = entry.historical.read();
336
337 let mut entries: Vec<RawEntry> = Vec::with_capacity(batch_size + 1);
338
339 let iter_start: Bound<&[u8]> = start;
340
341 let iter_end: Bound<&[u8]> = match &cursor_key {
342 Some(last) => Bound::Excluded(last.as_slice()),
343 None => end,
344 };
345
346 let mut cur_iter = current.range::<[u8], _>((iter_start, iter_end)).rev().peekable();
347 let mut hist_iter = historical.range::<[u8], _>((iter_start, iter_end)).rev().peekable();
348
349 while entries.len() <= batch_size {
350 let (take_cur, take_hist) = match (cur_iter.peek(), hist_iter.peek()) {
351 (None, None) => break,
352 (Some(_), None) => (true, false),
353 (None, Some(_)) => (false, true),
354 (Some((kc, _)), Some((kh, _))) => match kc.cmp(kh) {
355 Ordering::Greater => (true, false),
356 Ordering::Less => (false, true),
357 Ordering::Equal => (true, true),
358 },
359 };
360
361 if take_cur && take_hist {
362 let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
363 let (_, versions) = hist_iter.next().unwrap();
364 if *cur_version <= version {
365 entries.push(RawEntry {
366 key: key.clone(),
367 version: *cur_version,
368 value: cur_value.clone(),
369 });
370 } else if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
371 entries.push(RawEntry {
372 key: key.clone(),
373 version: *v,
374 value: value.clone(),
375 });
376 }
377 } else if take_cur {
378 let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
379 if *cur_version <= version {
380 entries.push(RawEntry {
381 key: key.clone(),
382 version: *cur_version,
383 value: cur_value.clone(),
384 });
385 }
386 } else {
387 let (key, versions) = hist_iter.next().unwrap();
388 if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
389 entries.push(RawEntry {
390 key: key.clone(),
391 version: *v,
392 value: value.clone(),
393 });
394 }
395 }
396 }
397
398 let has_more = entries.len() > batch_size;
399 if has_more {
400 entries.truncate(batch_size);
401 }
402
403 if let Some(last_entry) = entries.last() {
404 cursor.last_key = Some(last_entry.key.clone());
405 }
406 if !has_more {
407 cursor.exhausted = true;
408 }
409
410 Ok(RangeBatch {
411 entries,
412 has_more,
413 })
414 }
415
416 #[instrument(name = "store::multi::memory::ensure_table", level = "trace", skip(self), fields(table = ?table))]
417 fn ensure_table(&self, table: EntryKind) -> Result<()> {
418 let _ = self.get_or_create_table(table);
419 Ok(())
420 }
421
422 #[instrument(name = "store::multi::memory::clear_table", level = "debug", skip(self), fields(table = ?table))]
423 fn clear_table(&self, table: EntryKind) -> Result<()> {
424 let table_key = entry_id_to_key(table);
425 if let Some(entry) = self.inner.entries.data.get(&table_key) {
426 *entry.current.write() = CurrentMap::new();
427 *entry.historical.write() = HistoricalMap::new();
428 }
429 Ok(())
430 }
431
432 #[instrument(name = "store::multi::memory::drop", level = "debug", skip(self, batches), fields(
433 table_count = batches.len(),
434 total_entry_count = field::Empty
435 ))]
436 fn drop(&self, batches: HashMap<EntryKind, Vec<(EncodedKey, CommitVersion)>>) -> Result<()> {
437 let total_entries: usize = batches.values().map(|v| v.len()).sum();
438
439 for (table, entries) in batches {
440 let table_entry = self.get_or_create_table(table);
441 let mut current = table_entry.current.write();
442 let mut historical = table_entry.historical.write();
443
444 let mut by_key: HashMap<EncodedKey, Vec<CommitVersion>> = HashMap::new();
445 for (key, version) in entries {
446 by_key.entry(key).or_default().push(version);
447 }
448
449 for (key, dropped_versions) in by_key {
450 let dropped_set: HashSet<CommitVersion> = dropped_versions.iter().copied().collect();
451
452 let cur_version = current.get(&key).map(|(v, _)| *v);
453 let stored_hist_covered = historical
454 .get(&key)
455 .map(|m| m.keys().all(|Reverse(v)| dropped_set.contains(v)))
456 .unwrap_or(true);
457 let stored_cur_covered = cur_version.is_none_or(|v| dropped_set.contains(&v));
458
459 if stored_cur_covered && stored_hist_covered {
460 current.remove(&key);
461 historical.remove(&key);
462 continue;
463 }
464
465 for version in dropped_versions {
466 let cur_matches = current.get(&key).map(|(v, _)| *v) == Some(version);
467 if cur_matches {
468 let popped = historical.get_mut(&key).and_then(|v| v.pop_first());
469 let now_empty = historical.get(&key).is_some_and(|v| v.is_empty());
470 if now_empty {
471 historical.remove(&key);
472 }
473 match popped {
474 Some((Reverse(promoted_v), promoted_value)) => {
475 current.insert(
476 key.clone(),
477 (promoted_v, promoted_value),
478 );
479 }
480 None => {
481 current.remove(&key);
482 }
483 }
484 } else {
485 let now_empty = if let Some(versions) = historical.get_mut(&key) {
486 versions.remove(&Reverse(version));
487 versions.is_empty()
488 } else {
489 false
490 };
491 if now_empty {
492 historical.remove(&key);
493 }
494 }
495 }
496 }
497 }
498
499 Span::current().record("total_entry_count", total_entries);
500 Ok(())
501 }
502
503 #[instrument(name = "store::multi::memory::get_all_versions", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len()))]
504 fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
505 let table_key = entry_id_to_key(table);
506 let entry = match self.inner.entries.data.get(&table_key) {
507 Some(e) => e,
508 None => return Ok(Vec::new()),
509 };
510
511 let mut versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = Vec::new();
512
513 let current = entry.current.read();
514 if let Some((cur_version, value)) = current.get(key) {
515 versions.push((*cur_version, value.clone()));
516 }
517 drop(current);
518
519 let historical = entry.historical.read();
520 if let Some(hist_versions) = historical.get(key) {
521 for (Reverse(v), value) in hist_versions.iter() {
522 versions.push((*v, value.clone()));
523 }
524 }
525
526 versions.sort_by(|a, b| b.0.cmp(&a.0));
527
528 Ok(versions)
529 }
530
531 #[instrument(name = "store::multi::memory::scan_historical_below", level = "trace", skip(self, cursor), fields(table = ?table, cutoff = cutoff.0, batch_size = batch_size))]
532 fn scan_historical_below(
533 &self,
534 table: EntryKind,
535 cutoff: CommitVersion,
536 cursor: &mut HistoricalCursor,
537 batch_size: usize,
538 ) -> Result<Vec<(EncodedKey, CommitVersion)>> {
539 if cursor.exhausted || batch_size == 0 {
540 return Ok(Vec::new());
541 }
542
543 let table_key = entry_id_to_key(table);
544 let entry = match self.inner.entries.data.get(&table_key) {
545 Some(e) => e,
546 None => {
547 cursor.exhausted = true;
548 return Ok(Vec::new());
549 }
550 };
551
552 let historical = entry.historical.read();
553
554 let mut collected: Vec<(EncodedKey, CommitVersion)> = Vec::new();
555 let mut over_limit = false;
556
557 for (key, versions) in historical.iter() {
558 match (cursor.last_key.as_ref(), cursor.last_version) {
559 (Some(lk), _) if key < lk => continue,
560 (Some(lk), Some(lv)) if key == lk => {
561 for (Reverse(v), _value) in versions.iter().rev() {
562 if *v <= lv {
563 continue;
564 }
565 if *v >= cutoff {
566 continue;
567 }
568 collected.push((key.clone(), *v));
569 if collected.len() > batch_size {
570 over_limit = true;
571 break;
572 }
573 }
574 }
575 _ => {
576 for (Reverse(v), _value) in versions.iter().rev() {
577 if *v >= cutoff {
578 continue;
579 }
580 collected.push((key.clone(), *v));
581 if collected.len() > batch_size {
582 over_limit = true;
583 break;
584 }
585 }
586 }
587 }
588
589 if over_limit {
590 break;
591 }
592 }
593
594 collected.sort_by(|a, b| a.0.as_slice().cmp(b.0.as_slice()).then(a.1.0.cmp(&b.1.0)));
595
596 let has_more = collected.len() > batch_size;
597 if has_more {
598 collected.truncate(batch_size);
599 }
600
601 if let Some(last) = collected.last() {
602 cursor.last_key = Some(last.0.clone());
603 cursor.last_version = Some(last.1);
604 }
605 if !has_more {
606 cursor.exhausted = true;
607 }
608
609 Ok(collected)
610 }
611}
612
613impl TierBackend for MemoryPrimitiveStorage {}
614
615#[cfg(test)]
616pub mod tests {
617 use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};
618
619 use super::*;
620
621 #[test]
622 fn test_basic_operations() {
623 let storage = MemoryPrimitiveStorage::new();
624
625 let key = EncodedKey::new(b"key1".to_vec());
626 let version = CommitVersion(1);
627
628 storage.set(
630 version,
631 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
632 )
633 .unwrap();
634
635 let value = storage.get(EntryKind::Multi, &key, version).unwrap();
636 assert_eq!(value.as_deref(), Some(b"value1".as_slice()));
637
638 assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());
640
641 assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());
642
643 let version2 = CommitVersion(2);
645 storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
646 assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
647 }
648
649 #[test]
650 fn test_source_tables() {
651 let storage = MemoryPrimitiveStorage::new();
652
653 let source1 = ShapeId::Table(TableId(1));
654 let source2 = ShapeId::Table(TableId(2));
655
656 let key = EncodedKey::new(b"key".to_vec());
657 let version = CommitVersion(1);
658
659 storage.set(
660 version,
661 HashMap::from([(
662 EntryKind::Source(source1),
663 vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
664 )]),
665 )
666 .unwrap();
667 storage.set(
668 version,
669 HashMap::from([(
670 EntryKind::Source(source2),
671 vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
672 )]),
673 )
674 .unwrap();
675
676 assert_eq!(
677 storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
678 Some(b"table1".as_slice())
679 );
680 assert_eq!(
681 storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
682 Some(b"table2".as_slice())
683 );
684 }
685
686 #[test]
687 fn test_version_promotion_to_historical() {
688 let storage = MemoryPrimitiveStorage::new();
689
690 let key = EncodedKey::new(b"key1".to_vec());
691
692 storage.set(
694 CommitVersion(1),
695 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
696 )
697 .unwrap();
698
699 storage.set(
701 CommitVersion(2),
702 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
703 )
704 .unwrap();
705
706 storage.set(
708 CommitVersion(3),
709 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
710 )
711 .unwrap();
712
713 assert_eq!(
715 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
716 Some(b"v3".as_slice())
717 );
718
719 assert_eq!(
721 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
722 Some(b"v2".as_slice())
723 );
724
725 assert_eq!(
727 storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
728 Some(b"v1".as_slice())
729 );
730 }
731
732 #[test]
733 fn test_insert_older_version() {
734 let storage = MemoryPrimitiveStorage::new();
735
736 let key = EncodedKey::new(b"key1".to_vec());
737
738 storage.set(
740 CommitVersion(3),
741 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
742 )
743 .unwrap();
744
745 storage.set(
747 CommitVersion(1),
748 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
749 )
750 .unwrap();
751
752 assert_eq!(
754 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
755 Some(b"v3".as_slice())
756 );
757
758 assert_eq!(
760 storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
761 Some(b"v1".as_slice())
762 );
763
764 assert_eq!(
766 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
767 Some(b"v1".as_slice())
768 );
769 }
770
771 #[test]
772 fn test_range_next() {
773 let storage = MemoryPrimitiveStorage::new();
774
775 let version = CommitVersion(1);
776 storage.set(
777 version,
778 HashMap::from([(
779 EntryKind::Multi,
780 vec![
781 (EncodedKey::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
782 (EncodedKey::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
783 (EncodedKey::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
784 ],
785 )]),
786 )
787 .unwrap();
788
789 let mut cursor = RangeCursor::new();
790 let batch = storage
791 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
792 .unwrap();
793
794 assert_eq!(batch.entries.len(), 3);
795 assert!(!batch.has_more);
796 assert!(cursor.exhausted);
797
798 assert_eq!(&*batch.entries[0].key, b"a");
800 assert_eq!(&*batch.entries[1].key, b"b");
801 assert_eq!(&*batch.entries[2].key, b"c");
802 }
803
804 #[test]
805 fn test_range_rev_next() {
806 let storage = MemoryPrimitiveStorage::new();
807
808 let version = CommitVersion(1);
809 storage.set(
810 version,
811 HashMap::from([(
812 EntryKind::Multi,
813 vec![
814 (EncodedKey::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
815 (EncodedKey::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
816 (EncodedKey::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
817 ],
818 )]),
819 )
820 .unwrap();
821
822 let mut cursor = RangeCursor::new();
823 let batch = storage
824 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
825 .unwrap();
826
827 assert_eq!(batch.entries.len(), 3);
828 assert!(!batch.has_more);
829 assert!(cursor.exhausted);
830
831 assert_eq!(&*batch.entries[0].key, b"c");
833 assert_eq!(&*batch.entries[1].key, b"b");
834 assert_eq!(&*batch.entries[2].key, b"a");
835 }
836
837 #[test]
838 fn test_range_streaming_pagination() {
839 let storage = MemoryPrimitiveStorage::new();
840
841 let version = CommitVersion(1);
842
843 let entries: Vec<_> =
845 (0..10u8).map(|i| (EncodedKey::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
846 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
847
848 let mut cursor = RangeCursor::new();
850
851 let batch1 = storage
853 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
854 .unwrap();
855 assert_eq!(batch1.entries.len(), 3);
856 assert!(batch1.has_more);
857 assert!(!cursor.exhausted);
858
859 assert_eq!(&*batch1.entries[0].key, &[0]);
860 assert_eq!(&*batch1.entries[2].key, &[2]);
861
862 let batch2 = storage
864 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
865 .unwrap();
866 assert_eq!(batch2.entries.len(), 3);
867 assert!(batch2.has_more);
868 assert!(!cursor.exhausted);
869
870 assert_eq!(&*batch2.entries[0].key, &[3]);
871 assert_eq!(&*batch2.entries[2].key, &[5]);
872
873 let batch3 = storage
875 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
876 .unwrap();
877 assert_eq!(batch3.entries.len(), 3);
878 assert!(batch3.has_more);
879 assert!(!cursor.exhausted);
880
881 assert_eq!(&*batch3.entries[0].key, &[6]);
882 assert_eq!(&*batch3.entries[2].key, &[8]);
883
884 let batch4 = storage
886 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
887 .unwrap();
888 assert_eq!(batch4.entries.len(), 1);
889 assert!(!batch4.has_more);
890 assert!(cursor.exhausted);
891
892 assert_eq!(&*batch4.entries[0].key, &[9]);
893
894 let batch5 = storage
896 .range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
897 .unwrap();
898 assert!(batch5.entries.is_empty());
899 }
900
901 #[test]
902 fn test_range_reving_pagination() {
903 let storage = MemoryPrimitiveStorage::new();
904
905 let version = CommitVersion(1);
906
907 let entries: Vec<_> =
909 (0..10u8).map(|i| (EncodedKey::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
910 storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();
911
912 let mut cursor = RangeCursor::new();
914
915 let batch1 = storage
917 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
918 .unwrap();
919 assert_eq!(batch1.entries.len(), 3);
920 assert!(batch1.has_more);
921 assert!(!cursor.exhausted);
922
923 assert_eq!(&*batch1.entries[0].key, &[9]);
924 assert_eq!(&*batch1.entries[2].key, &[7]);
925
926 let batch2 = storage
928 .range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
929 .unwrap();
930 assert_eq!(batch2.entries.len(), 3);
931 assert!(batch2.has_more);
932 assert!(!cursor.exhausted);
933
934 assert_eq!(&*batch2.entries[0].key, &[6]);
935 assert_eq!(&*batch2.entries[2].key, &[4]);
936 }
937
938 #[test]
939 fn test_drop_from_historical() {
940 let storage = MemoryPrimitiveStorage::new();
941
942 let key = EncodedKey::new(b"key1".to_vec());
943
944 for v in 1..=3u64 {
946 storage.set(
947 CommitVersion(v),
948 HashMap::from([(
949 EntryKind::Multi,
950 vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
951 )]),
952 )
953 .unwrap();
954 }
955
956 storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();
959
960 assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());
962
963 assert_eq!(
965 storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
966 Some(b"v2".as_slice())
967 );
968 assert_eq!(
969 storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
970 Some(b"v3".as_slice())
971 );
972 }
973
974 #[test]
975 fn test_tombstones() {
976 let storage = MemoryPrimitiveStorage::new();
977
978 let key = EncodedKey::new(b"key1".to_vec());
979
980 storage.set(
982 CommitVersion(1),
983 HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value".to_vec())))])]),
984 )
985 .unwrap();
986
987 storage.set(CommitVersion(2), HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
989
990 assert!(storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().is_none());
992 assert!(!storage.contains(EntryKind::Multi, &key, CommitVersion(2)).unwrap());
993
994 assert_eq!(
996 storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
997 Some(b"value".as_slice())
998 );
999 }
1000}