Skip to main content

reifydb_store_multi/buffer/memory/
storage.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	cmp::{Ordering, Reverse},
6	collections::{HashMap, HashSet},
7	ops::Bound,
8	sync::Arc,
9};
10
11use reifydb_core::{
12	common::CommitVersion,
13	encoded::key::EncodedKey,
14	interface::{
15		catalog::{flow::FlowNodeId, id::TableId, shape::ShapeId},
16		store::EntryKind,
17	},
18};
19use reifydb_type::{Result, util::cowvec::CowVec};
20use tracing::{Span, field, instrument};
21
22use super::entry::{CurrentMap, Entries, Entry, HistoricalMap, entry_id_to_key};
23use crate::tier::{HistoricalCursor, RangeBatch, RangeCursor, RawEntry, TierBackend, TierBatch, TierStorage};
24
/// In-memory, multi-version implementation of the tier storage traits.
///
/// Cloning is cheap: every clone shares the same underlying tables through an `Arc`.
#[derive(Clone)]
pub struct MemoryPrimitiveStorage {
	inner: Arc<MemoryPrimitiveStorageInner>,
}
29
/// Shared state behind [`MemoryPrimitiveStorage`].
struct MemoryPrimitiveStorageInner {
	/// Per-table storage; `entries.data` is looked up by `entry_id_to_key(table)`.
	entries: Entries,
}
33
34impl Default for MemoryPrimitiveStorage {
35	fn default() -> Self {
36		Self::new()
37	}
38}
39
40impl MemoryPrimitiveStorage {
41	#[instrument(name = "store::multi::memory::new", level = "debug")]
42	pub fn new() -> Self {
43		Self {
44			inner: Arc::new(MemoryPrimitiveStorageInner {
45				entries: Entries::default(),
46			}),
47		}
48	}
49
50	pub fn count_current(&self, table: EntryKind) -> Result<u64> {
51		let table_key = entry_id_to_key(table);
52		Ok(self.inner.entries.data.get(&table_key).map(|e| e.current.read().len() as u64).unwrap_or(0))
53	}
54
55	pub fn list_all_entry_kinds(&self) -> Result<Vec<EntryKind>> {
56		let mut out = Vec::new();
57		for key in self.inner.entries.data.keys() {
58			if key == "multi" {
59				out.push(EntryKind::Multi);
60			} else if let Some(rest) = key.strip_prefix("source:")
61				&& let Ok(id) = rest.parse::<u64>()
62			{
63				out.push(EntryKind::Source(ShapeId::Table(TableId(id))));
64			} else if let Some(rest) = key.strip_prefix("operator:")
65				&& let Ok(id) = rest.parse::<u64>()
66			{
67				out.push(EntryKind::Operator(FlowNodeId(id)));
68			}
69		}
70		Ok(out)
71	}
72
73	pub fn count_historical(&self, table: EntryKind) -> Result<u64> {
74		let table_key = entry_id_to_key(table);
75		Ok(self.inner
76			.entries
77			.data
78			.get(&table_key)
79			.map(|e| {
80				let hist = e.historical.read();
81				hist.values().map(|m| m.len() as u64).sum()
82			})
83			.unwrap_or(0))
84	}
85
86	#[inline]
87	#[instrument(name = "store::multi::memory::get_or_create_table", level = "trace", skip(self), fields(table = ?table))]
88	fn get_or_create_table(&self, table: EntryKind) -> Entry {
89		let table_key = entry_id_to_key(table);
90		self.inner.entries.data.get_or_insert_with(table_key, Entry::new)
91	}
92
93	#[inline]
94	#[instrument(name = "store::multi::memory::set::table", level = "trace", skip(self, entries), fields(
95		table = ?table,
96		entry_count = entries.len(),
97	))]
98	fn process_table(
99		&self,
100		table: EntryKind,
101		version: CommitVersion,
102		entries: Vec<(EncodedKey, Option<CowVec<u8>>)>,
103	) {
104		let table_entry = self.get_or_create_table(table);
105		let mut current = table_entry.current.write();
106		let mut historical = table_entry.historical.write();
107
108		for (key, value) in entries {
109			if let Some((pre_version, pre_value)) = current.get(&key) {
110				if *pre_version < version {
111					let pre_version = *pre_version;
112					let pre_value = pre_value.clone();
113					historical
114						.entry(key.clone())
115						.or_default()
116						.insert(Reverse(pre_version), pre_value);
117
118					current.insert(key, (version, value));
119				} else {
120					historical.entry(key).or_default().insert(Reverse(version), value);
121				}
122			} else {
123				current.insert(key, (version, value));
124			}
125		}
126	}
127}
128
129impl TierStorage for MemoryPrimitiveStorage {
130	#[instrument(name = "store::multi::memory::get", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len(), version = version.0))]
131	fn get(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<Option<CowVec<u8>>> {
132		let table_key = entry_id_to_key(table);
133		let entry = match self.inner.entries.data.get(&table_key) {
134			Some(e) => e,
135			None => return Ok(None),
136		};
137
138		let current = entry.current.read();
139		if let Some((cur_version, value)) = current.get(key)
140			&& *cur_version <= version
141		{
142			return Ok(value.clone());
143		}
144		drop(current);
145
146		let historical = entry.historical.read();
147		if let Some(versions) = historical.get(key) {
148			for (Reverse(v), value) in versions.range(Reverse(version)..) {
149				if *v <= version {
150					return Ok(value.clone());
151				}
152			}
153		}
154
155		Ok(None)
156	}
157
158	#[instrument(name = "store::multi::memory::contains", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len(), version = version.0), ret)]
159	fn contains(&self, table: EntryKind, key: &[u8], version: CommitVersion) -> Result<bool> {
160		let table_key = entry_id_to_key(table);
161		let entry = match self.inner.entries.data.get(&table_key) {
162			Some(e) => e,
163			None => return Ok(false),
164		};
165
166		let current = entry.current.read();
167		if let Some((cur_version, value)) = current.get(key)
168			&& *cur_version <= version
169		{
170			return Ok(value.is_some());
171		}
172		drop(current);
173
174		let historical = entry.historical.read();
175		if let Some(versions) = historical.get(key) {
176			for (Reverse(v), value) in versions.range(Reverse(version)..) {
177				if *v <= version {
178					return Ok(value.is_some());
179				}
180			}
181		}
182
183		Ok(false)
184	}
185
186	#[instrument(name = "store::multi::memory::set", level = "trace", skip(self, batches), fields(
187		table_count = batches.len(),
188		total_entry_count = field::Empty,
189		version = version.0
190	))]
191	fn set(&self, version: CommitVersion, batches: TierBatch) -> Result<()> {
192		let total_entries: usize = batches.values().map(|v| v.len()).sum();
193
194		batches.into_iter().for_each(|(table, entries)| {
195			self.process_table(table, version, entries);
196		});
197
198		Span::current().record("total_entry_count", total_entries);
199		Ok(())
200	}
201
202	#[instrument(name = "store::multi::memory::range_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
203	fn range_next(
204		&self,
205		table: EntryKind,
206		cursor: &mut RangeCursor,
207		start: Bound<&[u8]>,
208		end: Bound<&[u8]>,
209		version: CommitVersion,
210		batch_size: usize,
211	) -> Result<RangeBatch> {
212		if cursor.exhausted {
213			return Ok(RangeBatch::empty());
214		}
215
216		let table_key = entry_id_to_key(table);
217		let entry = match self.inner.entries.data.get(&table_key) {
218			Some(e) => e,
219			None => {
220				cursor.exhausted = true;
221				return Ok(RangeBatch::empty());
222			}
223		};
224
225		let cursor_key = cursor.last_key.clone();
226
227		let current = entry.current.read();
228		let historical = entry.historical.read();
229
230		let mut entries: Vec<RawEntry> = Vec::with_capacity(batch_size + 1);
231
232		let iter_start: Bound<&[u8]> = match &cursor_key {
233			Some(last) => Bound::Excluded(last.as_slice()),
234			None => start,
235		};
236
237		let iter_end: Bound<&[u8]> = end;
238
239		let mut cur_iter = current.range::<[u8], _>((iter_start, iter_end)).peekable();
240		let mut hist_iter = historical.range::<[u8], _>((iter_start, iter_end)).peekable();
241
242		while entries.len() <= batch_size {
243			let (take_cur, take_hist) = match (cur_iter.peek(), hist_iter.peek()) {
244				(None, None) => break,
245				(Some(_), None) => (true, false),
246				(None, Some(_)) => (false, true),
247				(Some((kc, _)), Some((kh, _))) => match kc.cmp(kh) {
248					Ordering::Less => (true, false),
249					Ordering::Greater => (false, true),
250					Ordering::Equal => (true, true),
251				},
252			};
253
254			if take_cur && take_hist {
255				let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
256				let (_, versions) = hist_iter.next().unwrap();
257				if *cur_version <= version {
258					entries.push(RawEntry {
259						key: key.clone(),
260						version: *cur_version,
261						value: cur_value.clone(),
262					});
263				} else if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
264					entries.push(RawEntry {
265						key: key.clone(),
266						version: *v,
267						value: value.clone(),
268					});
269				}
270			} else if take_cur {
271				let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
272				if *cur_version <= version {
273					entries.push(RawEntry {
274						key: key.clone(),
275						version: *cur_version,
276						value: cur_value.clone(),
277					});
278				}
279			} else {
280				let (key, versions) = hist_iter.next().unwrap();
281				if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
282					entries.push(RawEntry {
283						key: key.clone(),
284						version: *v,
285						value: value.clone(),
286					});
287				}
288			}
289		}
290
291		let has_more = entries.len() > batch_size;
292		if has_more {
293			entries.truncate(batch_size);
294		}
295
296		if let Some(last_entry) = entries.last() {
297			cursor.last_key = Some(last_entry.key.clone());
298		}
299		if !has_more {
300			cursor.exhausted = true;
301		}
302
303		Ok(RangeBatch {
304			entries,
305			has_more,
306		})
307	}
308
309	#[instrument(name = "store::multi::memory::range_rev_next", level = "trace", skip(self, cursor, start, end), fields(table = ?table, batch_size = batch_size, version = version.0))]
310	fn range_rev_next(
311		&self,
312		table: EntryKind,
313		cursor: &mut RangeCursor,
314		start: Bound<&[u8]>,
315		end: Bound<&[u8]>,
316		version: CommitVersion,
317		batch_size: usize,
318	) -> Result<RangeBatch> {
319		if cursor.exhausted {
320			return Ok(RangeBatch::empty());
321		}
322
323		let table_key = entry_id_to_key(table);
324		let entry = match self.inner.entries.data.get(&table_key) {
325			Some(e) => e,
326			None => {
327				cursor.exhausted = true;
328				return Ok(RangeBatch::empty());
329			}
330		};
331
332		let cursor_key = cursor.last_key.clone();
333
334		let current = entry.current.read();
335		let historical = entry.historical.read();
336
337		let mut entries: Vec<RawEntry> = Vec::with_capacity(batch_size + 1);
338
339		let iter_start: Bound<&[u8]> = start;
340
341		let iter_end: Bound<&[u8]> = match &cursor_key {
342			Some(last) => Bound::Excluded(last.as_slice()),
343			None => end,
344		};
345
346		let mut cur_iter = current.range::<[u8], _>((iter_start, iter_end)).rev().peekable();
347		let mut hist_iter = historical.range::<[u8], _>((iter_start, iter_end)).rev().peekable();
348
349		while entries.len() <= batch_size {
350			let (take_cur, take_hist) = match (cur_iter.peek(), hist_iter.peek()) {
351				(None, None) => break,
352				(Some(_), None) => (true, false),
353				(None, Some(_)) => (false, true),
354				(Some((kc, _)), Some((kh, _))) => match kc.cmp(kh) {
355					Ordering::Greater => (true, false),
356					Ordering::Less => (false, true),
357					Ordering::Equal => (true, true),
358				},
359			};
360
361			if take_cur && take_hist {
362				let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
363				let (_, versions) = hist_iter.next().unwrap();
364				if *cur_version <= version {
365					entries.push(RawEntry {
366						key: key.clone(),
367						version: *cur_version,
368						value: cur_value.clone(),
369					});
370				} else if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
371					entries.push(RawEntry {
372						key: key.clone(),
373						version: *v,
374						value: value.clone(),
375					});
376				}
377			} else if take_cur {
378				let (key, (cur_version, cur_value)) = cur_iter.next().unwrap();
379				if *cur_version <= version {
380					entries.push(RawEntry {
381						key: key.clone(),
382						version: *cur_version,
383						value: cur_value.clone(),
384					});
385				}
386			} else {
387				let (key, versions) = hist_iter.next().unwrap();
388				if let Some((Reverse(v), value)) = versions.range(Reverse(version)..).next() {
389					entries.push(RawEntry {
390						key: key.clone(),
391						version: *v,
392						value: value.clone(),
393					});
394				}
395			}
396		}
397
398		let has_more = entries.len() > batch_size;
399		if has_more {
400			entries.truncate(batch_size);
401		}
402
403		if let Some(last_entry) = entries.last() {
404			cursor.last_key = Some(last_entry.key.clone());
405		}
406		if !has_more {
407			cursor.exhausted = true;
408		}
409
410		Ok(RangeBatch {
411			entries,
412			has_more,
413		})
414	}
415
416	#[instrument(name = "store::multi::memory::ensure_table", level = "trace", skip(self), fields(table = ?table))]
417	fn ensure_table(&self, table: EntryKind) -> Result<()> {
418		let _ = self.get_or_create_table(table);
419		Ok(())
420	}
421
422	#[instrument(name = "store::multi::memory::clear_table", level = "debug", skip(self), fields(table = ?table))]
423	fn clear_table(&self, table: EntryKind) -> Result<()> {
424		let table_key = entry_id_to_key(table);
425		if let Some(entry) = self.inner.entries.data.get(&table_key) {
426			*entry.current.write() = CurrentMap::new();
427			*entry.historical.write() = HistoricalMap::new();
428		}
429		Ok(())
430	}
431
432	#[instrument(name = "store::multi::memory::drop", level = "debug", skip(self, batches), fields(
433		table_count = batches.len(),
434		total_entry_count = field::Empty
435	))]
436	fn drop(&self, batches: HashMap<EntryKind, Vec<(EncodedKey, CommitVersion)>>) -> Result<()> {
437		let total_entries: usize = batches.values().map(|v| v.len()).sum();
438
439		for (table, entries) in batches {
440			let table_entry = self.get_or_create_table(table);
441			let mut current = table_entry.current.write();
442			let mut historical = table_entry.historical.write();
443
444			let mut by_key: HashMap<EncodedKey, Vec<CommitVersion>> = HashMap::new();
445			for (key, version) in entries {
446				by_key.entry(key).or_default().push(version);
447			}
448
449			for (key, dropped_versions) in by_key {
450				let dropped_set: HashSet<CommitVersion> = dropped_versions.iter().copied().collect();
451
452				let cur_version = current.get(&key).map(|(v, _)| *v);
453				let stored_hist_covered = historical
454					.get(&key)
455					.map(|m| m.keys().all(|Reverse(v)| dropped_set.contains(v)))
456					.unwrap_or(true);
457				let stored_cur_covered = cur_version.is_none_or(|v| dropped_set.contains(&v));
458
459				if stored_cur_covered && stored_hist_covered {
460					current.remove(&key);
461					historical.remove(&key);
462					continue;
463				}
464
465				for version in dropped_versions {
466					let cur_matches = current.get(&key).map(|(v, _)| *v) == Some(version);
467					if cur_matches {
468						let popped = historical.get_mut(&key).and_then(|v| v.pop_first());
469						let now_empty = historical.get(&key).is_some_and(|v| v.is_empty());
470						if now_empty {
471							historical.remove(&key);
472						}
473						match popped {
474							Some((Reverse(promoted_v), promoted_value)) => {
475								current.insert(
476									key.clone(),
477									(promoted_v, promoted_value),
478								);
479							}
480							None => {
481								current.remove(&key);
482							}
483						}
484					} else {
485						let now_empty = if let Some(versions) = historical.get_mut(&key) {
486							versions.remove(&Reverse(version));
487							versions.is_empty()
488						} else {
489							false
490						};
491						if now_empty {
492							historical.remove(&key);
493						}
494					}
495				}
496			}
497		}
498
499		Span::current().record("total_entry_count", total_entries);
500		Ok(())
501	}
502
503	#[instrument(name = "store::multi::memory::get_all_versions", level = "trace", skip(self, key), fields(table = ?table, key_len = key.len()))]
504	fn get_all_versions(&self, table: EntryKind, key: &[u8]) -> Result<Vec<(CommitVersion, Option<CowVec<u8>>)>> {
505		let table_key = entry_id_to_key(table);
506		let entry = match self.inner.entries.data.get(&table_key) {
507			Some(e) => e,
508			None => return Ok(Vec::new()),
509		};
510
511		let mut versions: Vec<(CommitVersion, Option<CowVec<u8>>)> = Vec::new();
512
513		let current = entry.current.read();
514		if let Some((cur_version, value)) = current.get(key) {
515			versions.push((*cur_version, value.clone()));
516		}
517		drop(current);
518
519		let historical = entry.historical.read();
520		if let Some(hist_versions) = historical.get(key) {
521			for (Reverse(v), value) in hist_versions.iter() {
522				versions.push((*v, value.clone()));
523			}
524		}
525
526		versions.sort_by(|a, b| b.0.cmp(&a.0));
527
528		Ok(versions)
529	}
530
531	#[instrument(name = "store::multi::memory::scan_historical_below", level = "trace", skip(self, cursor), fields(table = ?table, cutoff = cutoff.0, batch_size = batch_size))]
532	fn scan_historical_below(
533		&self,
534		table: EntryKind,
535		cutoff: CommitVersion,
536		cursor: &mut HistoricalCursor,
537		batch_size: usize,
538	) -> Result<Vec<(EncodedKey, CommitVersion)>> {
539		if cursor.exhausted || batch_size == 0 {
540			return Ok(Vec::new());
541		}
542
543		let table_key = entry_id_to_key(table);
544		let entry = match self.inner.entries.data.get(&table_key) {
545			Some(e) => e,
546			None => {
547				cursor.exhausted = true;
548				return Ok(Vec::new());
549			}
550		};
551
552		let historical = entry.historical.read();
553
554		let mut collected: Vec<(EncodedKey, CommitVersion)> = Vec::new();
555		let mut over_limit = false;
556
557		for (key, versions) in historical.iter() {
558			match (cursor.last_key.as_ref(), cursor.last_version) {
559				(Some(lk), _) if key < lk => continue,
560				(Some(lk), Some(lv)) if key == lk => {
561					for (Reverse(v), _value) in versions.iter().rev() {
562						if *v <= lv {
563							continue;
564						}
565						if *v >= cutoff {
566							continue;
567						}
568						collected.push((key.clone(), *v));
569						if collected.len() > batch_size {
570							over_limit = true;
571							break;
572						}
573					}
574				}
575				_ => {
576					for (Reverse(v), _value) in versions.iter().rev() {
577						if *v >= cutoff {
578							continue;
579						}
580						collected.push((key.clone(), *v));
581						if collected.len() > batch_size {
582							over_limit = true;
583							break;
584						}
585					}
586				}
587			}
588
589			if over_limit {
590				break;
591			}
592		}
593
594		collected.sort_by(|a, b| a.0.as_slice().cmp(b.0.as_slice()).then(a.1.0.cmp(&b.1.0)));
595
596		let has_more = collected.len() > batch_size;
597		if has_more {
598			collected.truncate(batch_size);
599		}
600
601		if let Some(last) = collected.last() {
602			cursor.last_key = Some(last.0.clone());
603			cursor.last_version = Some(last.1);
604		}
605		if !has_more {
606			cursor.exhausted = true;
607		}
608
609		Ok(collected)
610	}
611}
612
// Marker impl: `MemoryPrimitiveStorage` overrides no `TierBackend` items, so any provided
// methods use the trait's defaults.
impl TierBackend for MemoryPrimitiveStorage {}
614
#[cfg(test)]
pub mod tests {
	//! Unit tests for the in-memory multi-version tier storage.

	use reifydb_core::interface::catalog::{id::TableId, shape::ShapeId};

	use super::*;

	#[test]
	fn test_basic_operations() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());
		let version = CommitVersion(1);

		// Put and get
		storage.set(
			version,
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value1".to_vec())))])]),
		)
		.unwrap();

		let value = storage.get(EntryKind::Multi, &key, version).unwrap();
		assert_eq!(value.as_deref(), Some(b"value1".as_slice()));

		// Contains
		assert!(storage.contains(EntryKind::Multi, &key, version).unwrap());

		assert!(!storage.contains(EntryKind::Multi, b"nonexistent", version).unwrap());

		// Delete (tombstone)
		let version2 = CommitVersion(2);
		storage.set(version2, HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();
		assert!(!storage.contains(EntryKind::Multi, &key, version2).unwrap());
	}

	#[test]
	fn test_source_tables() {
		let storage = MemoryPrimitiveStorage::new();

		let source1 = ShapeId::Table(TableId(1));
		let source2 = ShapeId::Table(TableId(2));

		let key = EncodedKey::new(b"key".to_vec());
		let version = CommitVersion(1);

		storage.set(
			version,
			HashMap::from([(
				EntryKind::Source(source1),
				vec![(key.clone(), Some(CowVec::new(b"table1".to_vec())))],
			)]),
		)
		.unwrap();
		storage.set(
			version,
			HashMap::from([(
				EntryKind::Source(source2),
				vec![(key.clone(), Some(CowVec::new(b"table2".to_vec())))],
			)]),
		)
		.unwrap();

		assert_eq!(
			storage.get(EntryKind::Source(source1), &key, version).unwrap().as_deref(),
			Some(b"table1".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Source(source2), &key, version).unwrap().as_deref(),
			Some(b"table2".as_slice())
		);
	}

	#[test]
	fn test_version_promotion_to_historical() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());

		// Insert version 1
		storage.set(
			CommitVersion(1),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
		)
		.unwrap();

		// Insert version 2 (v1 should be promoted to historical)
		storage.set(
			CommitVersion(2),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v2".to_vec())))])]),
		)
		.unwrap();

		// Insert version 3 (v2 should be promoted to historical)
		storage.set(
			CommitVersion(3),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
		)
		.unwrap();

		// Get at version 3 should return v3 (from current)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);

		// Get at version 2 should return v2 (from historical)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);

		// Get at version 1 should return v1 (from historical)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
			Some(b"v1".as_slice())
		);
	}

	#[test]
	fn test_insert_older_version() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());

		// Insert version 3 first
		storage.set(
			CommitVersion(3),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v3".to_vec())))])]),
		)
		.unwrap();

		// Insert version 1 (older - should go directly to historical)
		storage.set(
			CommitVersion(1),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"v1".to_vec())))])]),
		)
		.unwrap();

		// Get at version 3 should return v3 (current)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);

		// Get at version 1 should return v1 (historical)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
			Some(b"v1".as_slice())
		);

		// Get at version 2 should return v1 (largest version <= 2)
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v1".as_slice())
		);
	}

	#[test]
	fn test_range_next() {
		let storage = MemoryPrimitiveStorage::new();

		let version = CommitVersion(1);
		storage.set(
			version,
			HashMap::from([(
				EntryKind::Multi,
				vec![
					(EncodedKey::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
					(EncodedKey::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
					(EncodedKey::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
				],
			)]),
		)
		.unwrap();

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);

		// Verify order
		assert_eq!(&*batch.entries[0].key, b"a");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"c");
	}

	#[test]
	fn test_range_rev_next() {
		let storage = MemoryPrimitiveStorage::new();

		let version = CommitVersion(1);
		storage.set(
			version,
			HashMap::from([(
				EntryKind::Multi,
				vec![
					(EncodedKey::new(b"a".to_vec()), Some(CowVec::new(b"1".to_vec()))),
					(EncodedKey::new(b"b".to_vec()), Some(CowVec::new(b"2".to_vec()))),
					(EncodedKey::new(b"c".to_vec()), Some(CowVec::new(b"3".to_vec()))),
				],
			)]),
		)
		.unwrap();

		let mut cursor = RangeCursor::new();
		let batch = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 100)
			.unwrap();

		assert_eq!(batch.entries.len(), 3);
		assert!(!batch.has_more);
		assert!(cursor.exhausted);

		// Verify reverse order
		assert_eq!(&*batch.entries[0].key, b"c");
		assert_eq!(&*batch.entries[1].key, b"b");
		assert_eq!(&*batch.entries[2].key, b"a");
	}

	#[test]
	fn test_range_streaming_pagination() {
		let storage = MemoryPrimitiveStorage::new();

		let version = CommitVersion(1);

		// Insert 10 entries
		let entries: Vec<_> =
			(0..10u8).map(|i| (EncodedKey::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();

		// Use a single cursor to stream through all entries
		let mut cursor = RangeCursor::new();

		// First batch of 3
		let batch1 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch1.entries[0].key, &[0]);
		assert_eq!(&*batch1.entries[2].key, &[2]);

		// Second batch of 3
		let batch2 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch2.entries[0].key, &[3]);
		assert_eq!(&*batch2.entries[2].key, &[5]);

		// Third batch of 3
		let batch3 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch3.entries.len(), 3);
		assert!(batch3.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch3.entries[0].key, &[6]);
		assert_eq!(&*batch3.entries[2].key, &[8]);

		// Fourth batch - only 1 entry remaining
		let batch4 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch4.entries.len(), 1);
		assert!(!batch4.has_more);
		assert!(cursor.exhausted);

		assert_eq!(&*batch4.entries[0].key, &[9]);

		// Fifth call - exhausted
		let batch5 = storage
			.range_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert!(batch5.entries.is_empty());
	}

	#[test]
	fn test_range_rev_pagination() {
		let storage = MemoryPrimitiveStorage::new();

		let version = CommitVersion(1);

		// Insert 10 entries
		let entries: Vec<_> =
			(0..10u8).map(|i| (EncodedKey::new(vec![i]), Some(CowVec::new(vec![i * 10])))).collect();
		storage.set(version, HashMap::from([(EntryKind::Multi, entries)])).unwrap();

		// Use a single cursor to stream in reverse
		let mut cursor = RangeCursor::new();

		// First batch of 3 (reverse)
		let batch1 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch1.entries.len(), 3);
		assert!(batch1.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch1.entries[0].key, &[9]);
		assert_eq!(&*batch1.entries[2].key, &[7]);

		// Second batch
		let batch2 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch2.entries.len(), 3);
		assert!(batch2.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch2.entries[0].key, &[6]);
		assert_eq!(&*batch2.entries[2].key, &[4]);

		// Third batch
		let batch3 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch3.entries.len(), 3);
		assert!(batch3.has_more);
		assert!(!cursor.exhausted);

		assert_eq!(&*batch3.entries[0].key, &[3]);
		assert_eq!(&*batch3.entries[2].key, &[1]);

		// Fourth batch - only key 0 remains
		let batch4 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert_eq!(batch4.entries.len(), 1);
		assert!(!batch4.has_more);
		assert!(cursor.exhausted);

		assert_eq!(&*batch4.entries[0].key, &[0]);

		// Fifth call - exhausted
		let batch5 = storage
			.range_rev_next(EntryKind::Multi, &mut cursor, Bound::Unbounded, Bound::Unbounded, version, 3)
			.unwrap();
		assert!(batch5.entries.is_empty());
	}

	#[test]
	fn test_drop_from_historical() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());

		// Insert versions 1, 2, 3
		for v in 1..=3u64 {
			storage.set(
				CommitVersion(v),
				HashMap::from([(
					EntryKind::Multi,
					vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
				)]),
			)
			.unwrap();
		}

		// Version 3 is in current, versions 1 and 2 are in historical
		// Drop version 1 (from historical)
		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(1))])])).unwrap();

		// Version 1 should no longer be accessible
		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().is_none());

		// Versions 2 and 3 should still work
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v3".as_slice())
		);
	}

	#[test]
	fn test_drop_current_promotes_historical() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());

		// Insert versions 1, 2, 3
		for v in 1..=3u64 {
			storage.set(
				CommitVersion(v),
				HashMap::from([(
					EntryKind::Multi,
					vec![(key.clone(), Some(CowVec::new(format!("v{}", v).into_bytes())))],
				)]),
			)
			.unwrap();
		}

		// Version 3 is current; dropping it promotes version 2 (newest historical) to current.
		storage.drop(HashMap::from([(EntryKind::Multi, vec![(key.clone(), CommitVersion(3))])])).unwrap();

		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(3)).unwrap().as_deref(),
			Some(b"v2".as_slice())
		);
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
			Some(b"v1".as_slice())
		);
		assert_eq!(storage.count_current(EntryKind::Multi).unwrap(), 1);
		assert_eq!(storage.count_historical(EntryKind::Multi).unwrap(), 1);

		let versions: Vec<u64> =
			storage.get_all_versions(EntryKind::Multi, &key).unwrap().iter().map(|(v, _)| v.0).collect();
		assert_eq!(versions, vec![2, 1]);
	}

	#[test]
	fn test_tombstones() {
		let storage = MemoryPrimitiveStorage::new();

		let key = EncodedKey::new(b"key1".to_vec());

		// Insert version 1 with value
		storage.set(
			CommitVersion(1),
			HashMap::from([(EntryKind::Multi, vec![(key.clone(), Some(CowVec::new(b"value".to_vec())))])]),
		)
		.unwrap();

		// Insert version 2 with tombstone
		storage.set(CommitVersion(2), HashMap::from([(EntryKind::Multi, vec![(key.clone(), None)])])).unwrap();

		// Get at version 2 should return None (tombstone)
		assert!(storage.get(EntryKind::Multi, &key, CommitVersion(2)).unwrap().is_none());
		assert!(!storage.contains(EntryKind::Multi, &key, CommitVersion(2)).unwrap());

		// Get at version 1 should return value
		assert_eq!(
			storage.get(EntryKind::Multi, &key, CommitVersion(1)).unwrap().as_deref(),
			Some(b"value".as_slice())
		);
	}
}