Skip to main content

reifydb_store_multi/store/
multi.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{BTreeMap, HashMap, HashSet},
6	ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10	actors::drop::{DropMessage, DropRequest},
11	common::CommitVersion,
12	delta::Delta,
13	encoded::{
14		key::{EncodedKey, EncodedKeyRange},
15		row::EncodedRow,
16	},
17	event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18	interface::store::{
19		EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20		MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21	},
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27	StandardMultiStore,
28	router::{classify_key, classify_range, is_single_version_semantics_key},
29	version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32	Result,
33	buffer::tier::MultiBufferTier,
34	persistent::MultiPersistentTier,
35	tier::{RangeBatch, RangeCursor, TierBatch, TierStorage},
36};
37
/// Value passed as the final argument of every tier `range_next` / `range_rev_next` call made by
/// `scan_tier_chunk` and `scan_tier_chunk_rev` (presumably the per-call entry limit - confirm
/// against `TierStorage`).
const TIER_SCAN_CHUNK_SIZE: usize = 32;
39
40impl MultiVersionGet for StandardMultiStore {
41	#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42	fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43		let table = classify_key(key);
44
45		if let Some(buffer) = &self.buffer {
46			match get_at_version(buffer, table, key.as_ref(), version)? {
47				VersionedGetResult::Value {
48					value,
49					version: v,
50				} => {
51					return Ok(Some(MultiVersionRow {
52						key: key.clone(),
53						row: EncodedRow(value),
54						version: v,
55					}));
56				}
57				VersionedGetResult::Tombstone => return Ok(None),
58				VersionedGetResult::NotFound => {}
59			}
60		}
61
62		if let Some(persistent) = &self.persistent {
63			match get_at_version(persistent, table, key.as_ref(), version)? {
64				VersionedGetResult::Value {
65					value,
66					version: v,
67				} => {
68					return Ok(Some(MultiVersionRow {
69						key: key.clone(),
70						row: EncodedRow(value),
71						version: v,
72					}));
73				}
74				VersionedGetResult::Tombstone => return Ok(None),
75				VersionedGetResult::NotFound => {}
76			}
77		}
78
79		Ok(None)
80	}
81}
82
83impl MultiVersionContains for StandardMultiStore {
84	#[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
85	fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
86		Ok(MultiVersionGet::get(self, key, version)?.is_some())
87	}
88}
89
90impl MultiVersionCommit for StandardMultiStore {
91	#[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
92	fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
93		let classified = classify_deltas(&deltas);
94		let drop_batch = build_drop_batch(classified.explicit_drops, &classified.pending_set_keys, version);
95		self.dispatch_drops(drop_batch);
96
97		if let Some(buffer) = &self.buffer {
98			buffer.set(version, classified.batches)?;
99		} else if let Some(persistent) = &self.persistent {
100			persistent.set(version, classified.batches)?;
101		} else {
102			return Ok(());
103		}
104
105		self.emit_commit_metrics(classified.writes, classified.deletes, version);
106		Ok(())
107	}
108}
109
/// Result of sorting one commit's deltas by what has to happen to them; produced by
/// `classify_deltas`.
struct ClassifiedDeltas {
	/// Keys of `Delta::Set` entries for which `is_single_version_semantics_key` returned true.
	pending_set_keys: HashSet<EncodedKey>,
	/// One metric record per `Delta::Set`.
	writes: Vec<MultiWrite>,
	/// One metric record per `Delta::Unset` / `Delta::Remove`.
	deletes: Vec<MultiDelete>,
	/// Per-table write batches: `Some(bytes)` for sets, `None` for unsets and removes.
	batches: TierBatch,
	/// Table and key of every `Delta::Drop`.
	explicit_drops: Vec<(EntryKind, EncodedKey)>,
}
117
118#[inline]
119fn classify_deltas(deltas: &CowVec<Delta>) -> ClassifiedDeltas {
120	let mut pending_set_keys: HashSet<EncodedKey> = HashSet::new();
121	let mut writes: Vec<MultiWrite> = Vec::new();
122	let mut deletes: Vec<MultiDelete> = Vec::new();
123	let mut batches: TierBatch = HashMap::new();
124	let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
125
126	for delta in deltas.iter() {
127		let key = delta.key();
128		let table = classify_key(key);
129		let is_single_version = is_single_version_semantics_key(key);
130
131		match delta {
132			Delta::Set {
133				key,
134				row,
135			} => {
136				if is_single_version {
137					pending_set_keys.insert(key.clone());
138				}
139				writes.push(MultiWrite {
140					key: key.clone(),
141					value_bytes: row.len() as u64,
142				});
143				batches.entry(table).or_default().push((key.clone(), Some(row.0.clone())));
144			}
145			Delta::Unset {
146				key,
147				row,
148			} => {
149				deletes.push(MultiDelete {
150					key: key.clone(),
151					value_bytes: row.len() as u64,
152				});
153				batches.entry(table).or_default().push((key.clone(), None));
154			}
155			Delta::Remove {
156				key,
157			} => {
158				deletes.push(MultiDelete {
159					key: key.clone(),
160					value_bytes: 0,
161				});
162				batches.entry(table).or_default().push((key.clone(), None));
163			}
164			Delta::Drop {
165				key,
166			} => {
167				explicit_drops.push((table, key.clone()));
168			}
169		}
170	}
171
172	ClassifiedDeltas {
173		pending_set_keys,
174		writes,
175		deletes,
176		batches,
177		explicit_drops,
178	}
179}
180
181#[inline]
182fn build_drop_batch(
183	explicit_drops: Vec<(EntryKind, EncodedKey)>,
184	pending_set_keys: &HashSet<EncodedKey>,
185	version: CommitVersion,
186) -> Vec<DropRequest> {
187	let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
188	for (table, key) in explicit_drops {
189		let pending_version = if pending_set_keys.contains(key.as_ref()) {
190			Some(version)
191		} else {
192			None
193		};
194		drop_batch.push(DropRequest {
195			table,
196			key,
197			commit_version: version,
198			pending_version,
199		});
200	}
201	for key in pending_set_keys.iter() {
202		let encoded = EncodedKey::new(key.to_vec());
203		let table = classify_key(&encoded);
204		drop_batch.push(DropRequest {
205			table,
206			key: encoded,
207			commit_version: version,
208			pending_version: Some(version),
209		});
210	}
211	drop_batch
212}
213
214impl StandardMultiStore {
215	#[inline]
216	fn dispatch_drops(&self, drop_batch: Vec<DropRequest>) {
217		if drop_batch.is_empty() {
218			return;
219		}
220		if let Some(actor) = &self.drop_actor
221			&& actor.send_blocking(DropMessage::Batch(drop_batch)).is_err()
222		{
223			warn!("Failed to send drop batch");
224		}
225	}
226
227	#[inline]
228	fn emit_commit_metrics(&self, writes: Vec<MultiWrite>, deletes: Vec<MultiDelete>, version: CommitVersion) {
229		if writes.is_empty() && deletes.is_empty() {
230			return;
231		}
232		self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
233	}
234}
235
/// Resumable position of a range scan across both storage tiers.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
	/// Scan position within the buffer tier.
	pub buffer: RangeCursor,

	/// Scan position within the persistent tier.
	pub persistent: RangeCursor,

	/// Set once a scan step made no progress in any tier; later calls return an empty batch.
	pub exhausted: bool,
}
244
245impl MultiVersionRangeCursor {
246	pub fn new() -> Self {
247		Self::default()
248	}
249
250	pub fn is_exhausted(&self) -> bool {
251		self.exhausted
252	}
253}
254
/// Parameters shared by every tier call of a single range scan.
pub struct TierScanQuery<'a> {
	/// Table the scan is issued against.
	pub table: EntryKind,
	/// Lower key bound, passed to the tier as `Bound::Included`.
	pub start: &'a [u8],
	/// Upper key bound, passed to the tier as `Bound::Included`.
	pub end: &'a [u8],
	/// Version passed to the tier scan.
	pub version: CommitVersion,
	/// The caller's original range; used to filter entries returned by the tier.
	pub range: &'a EncodedKeyRange,
}
262
263pub fn scan_tier_chunk<S: TierStorage>(
264	storage: &S,
265	cursor: &mut RangeCursor,
266	scan: &TierScanQuery,
267	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
268) -> Result<bool> {
269	let batch = storage.range_next(
270		scan.table,
271		cursor,
272		Bound::Included(scan.start),
273		Bound::Included(scan.end),
274		scan.version,
275		TIER_SCAN_CHUNK_SIZE,
276	)?;
277	merge_tier_batch(batch, scan.range, collected)
278}
279
280pub fn scan_tier_chunk_rev<S: TierStorage>(
281	storage: &S,
282	cursor: &mut RangeCursor,
283	scan: &TierScanQuery,
284	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
285) -> Result<bool> {
286	let batch = storage.range_rev_next(
287		scan.table,
288		cursor,
289		Bound::Included(scan.start),
290		Bound::Included(scan.end),
291		scan.version,
292		TIER_SCAN_CHUNK_SIZE,
293	)?;
294	merge_tier_batch(batch, scan.range, collected)
295}
296
297#[inline]
298fn merge_tier_batch(
299	batch: RangeBatch,
300	range: &EncodedKeyRange,
301	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
302) -> Result<bool> {
303	if batch.entries.is_empty() {
304		return Ok(false);
305	}
306
307	for entry in batch.entries {
308		let original_key = entry.key.as_slice().to_vec();
309		let entry_version = entry.version;
310
311		let original_key_encoded = EncodedKey::new(original_key.clone());
312		if !range.contains(&original_key_encoded) {
313			continue;
314		}
315
316		let should_update = match collected.get(&original_key) {
317			None => true,
318			Some((existing_version, _)) => entry_version > *existing_version,
319		};
320
321		if should_update {
322			collected.insert(original_key, (entry_version, entry.value));
323		}
324	}
325
326	Ok(true)
327}
328
329#[inline]
330pub fn collected_to_batch(
331	collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
332	has_more: bool,
333) -> MultiVersionBatch {
334	let items: Vec<MultiVersionRow> = collected
335		.into_iter()
336		.filter_map(|(key_bytes, (v, value))| {
337			value.map(|val| MultiVersionRow {
338				key: EncodedKey::new(key_bytes),
339				row: EncodedRow(val),
340				version: v,
341			})
342		})
343		.collect();
344
345	MultiVersionBatch {
346		items,
347		has_more,
348	}
349}
350
351#[inline]
352fn step_all_tiers(
353	buffer: Option<&MultiBufferTier>,
354	buffer_cursor: &mut RangeCursor,
355	persistent: Option<&MultiPersistentTier>,
356	persistent_cursor: &mut RangeCursor,
357	scan: &TierScanQuery,
358	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
359) -> Result<bool> {
360	let mut any_progress = false;
361	if let Some(s) = buffer
362		&& !buffer_cursor.exhausted
363	{
364		any_progress |= scan_tier_chunk(s, buffer_cursor, scan, collected)?;
365	}
366	if let Some(s) = persistent
367		&& !persistent_cursor.exhausted
368	{
369		any_progress |= scan_tier_chunk(s, persistent_cursor, scan, collected)?;
370	}
371	Ok(any_progress)
372}
373
374pub fn scan_tiers_latest(
375	buffer: Option<&MultiBufferTier>,
376	persistent: Option<&MultiPersistentTier>,
377	range: EncodedKeyRange,
378	version: CommitVersion,
379	max_keys: usize,
380) -> Result<MultiVersionBatch> {
381	let table = classify_key_range(&range);
382	let (start, end) = make_range_bounds(&range);
383	let scan = TierScanQuery {
384		table,
385		start: &start,
386		end: &end,
387		version,
388		range: &range,
389	};
390
391	let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
392	let mut buffer_cursor = RangeCursor::default();
393	let mut persistent_cursor = RangeCursor::default();
394	let mut exhausted = false;
395
396	while collected.len() < max_keys {
397		let progress = step_all_tiers(
398			buffer,
399			&mut buffer_cursor,
400			persistent,
401			&mut persistent_cursor,
402			&scan,
403			&mut collected,
404		)?;
405		if !progress {
406			exhausted = true;
407			break;
408		}
409	}
410
411	Ok(collected_to_batch(collected, !exhausted))
412}
413
414impl StandardMultiStore {
415	pub fn range_next(
416		&self,
417		cursor: &mut MultiVersionRangeCursor,
418		range: EncodedKeyRange,
419		version: CommitVersion,
420		batch_size: u64,
421	) -> Result<MultiVersionBatch> {
422		if cursor.exhausted {
423			return Ok(MultiVersionBatch {
424				items: Vec::new(),
425				has_more: false,
426			});
427		}
428
429		mark_unconfigured_exhausted(self, cursor);
430
431		let table = classify_key_range(&range);
432		let (start, end) = make_range_bounds(&range);
433		let batch_size = batch_size as usize;
434		let scan = TierScanQuery {
435			table,
436			start: &start,
437			end: &end,
438			version,
439			range: &range,
440		};
441
442		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
443
444		while collected.len() < batch_size {
445			let progress = step_all_tiers(
446				self.buffer.as_ref(),
447				&mut cursor.buffer,
448				self.persistent.as_ref(),
449				&mut cursor.persistent,
450				&scan,
451				&mut collected,
452			)?;
453			if !progress {
454				cursor.exhausted = true;
455				break;
456			}
457		}
458
459		apply_forward_horizon(cursor, &mut collected);
460
461		let items: Vec<MultiVersionRow> = collected
462			.into_iter()
463			.filter_map(|(key_bytes, (v, value))| {
464				value.map(|val| MultiVersionRow {
465					key: EncodedKey::new(key_bytes),
466					row: EncodedRow(val),
467					version: v,
468				})
469			})
470			.collect();
471
472		let has_more = !cursor.exhausted;
473
474		Ok(MultiVersionBatch {
475			items,
476			has_more,
477		})
478	}
479
480	pub fn range(
481		&self,
482		range: EncodedKeyRange,
483		version: CommitVersion,
484		batch_size: usize,
485	) -> MultiVersionRangeIter {
486		MultiVersionRangeIter {
487			store: self.clone(),
488			cursor: MultiVersionRangeCursor::new(),
489			range,
490			version,
491			batch_size,
492			current_batch: Vec::new(),
493			current_index: 0,
494		}
495	}
496
497	pub fn range_rev(
498		&self,
499		range: EncodedKeyRange,
500		version: CommitVersion,
501		batch_size: usize,
502	) -> MultiVersionRangeRevIter {
503		MultiVersionRangeRevIter {
504			store: self.clone(),
505			cursor: MultiVersionRangeCursor::new(),
506			range,
507			version,
508			batch_size,
509			current_batch: Vec::new(),
510			current_index: 0,
511		}
512	}
513
514	fn range_rev_next(
515		&self,
516		cursor: &mut MultiVersionRangeCursor,
517		range: EncodedKeyRange,
518		version: CommitVersion,
519		batch_size: u64,
520	) -> Result<MultiVersionBatch> {
521		if cursor.exhausted {
522			return Ok(MultiVersionBatch {
523				items: Vec::new(),
524				has_more: false,
525			});
526		}
527
528		mark_unconfigured_exhausted(self, cursor);
529
530		let table = classify_key_range(&range);
531		let (start, end) = make_range_bounds(&range);
532		let batch_size = batch_size as usize;
533		let scan = TierScanQuery {
534			table,
535			start: &start,
536			end: &end,
537			version,
538			range: &range,
539		};
540
541		let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
542
543		while collected.len() < batch_size {
544			let mut any_progress = false;
545
546			if let Some(buffer) = &self.buffer
547				&& !cursor.buffer.exhausted
548			{
549				any_progress |= scan_tier_chunk_rev(buffer, &mut cursor.buffer, &scan, &mut collected)?;
550			}
551
552			if let Some(persistent) = &self.persistent
553				&& !cursor.persistent.exhausted
554			{
555				any_progress |=
556					scan_tier_chunk_rev(persistent, &mut cursor.persistent, &scan, &mut collected)?;
557			}
558
559			if !any_progress {
560				cursor.exhausted = true;
561				break;
562			}
563		}
564
565		apply_reverse_horizon(cursor, &mut collected);
566
567		let items: Vec<MultiVersionRow> = collected
568			.into_iter()
569			.rev()
570			.filter_map(|(key_bytes, (v, value))| {
571				value.map(|val| MultiVersionRow {
572					key: EncodedKey::new(key_bytes),
573					row: EncodedRow(val),
574					version: v,
575				})
576			})
577			.collect();
578
579		let has_more = !cursor.exhausted;
580
581		Ok(MultiVersionBatch {
582			items,
583			has_more,
584		})
585	}
586}
587
588fn mark_unconfigured_exhausted(store: &StandardMultiStore, cursor: &mut MultiVersionRangeCursor) {
589	if store.buffer.is_none() {
590		cursor.buffer.exhausted = true;
591	}
592	if store.persistent.is_none() {
593		cursor.persistent.exhausted = true;
594	}
595}
596
597fn apply_forward_horizon(
598	cursor: &mut MultiVersionRangeCursor,
599	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
600) {
601	let horizon = forward_horizon(cursor);
602	if let Some(h) = horizon {
603		collected.retain(|k, _| k.as_slice() <= h.as_slice());
604		rewind_over_advanced_forward(cursor, &h);
605	}
606}
607
608fn apply_reverse_horizon(
609	cursor: &mut MultiVersionRangeCursor,
610	collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
611) {
612	let horizon = reverse_horizon(cursor);
613	if let Some(h) = horizon {
614		collected.retain(|k, _| k.as_slice() >= h.as_slice());
615		rewind_over_advanced_reverse(cursor, &h);
616	}
617}
618
619fn forward_horizon(cursor: &MultiVersionRangeCursor) -> Option<EncodedKey> {
620	let mut horizon: Option<EncodedKey> = None;
621	for tier in [&cursor.buffer, &cursor.persistent] {
622		if tier.exhausted {
623			continue;
624		}
625		let last = match &tier.last_key {
626			Some(k) => k.clone(),
627
628			None => return None,
629		};
630		horizon = Some(match horizon {
631			None => last,
632			Some(prev) => {
633				if last.as_slice() < prev.as_slice() {
634					last
635				} else {
636					prev
637				}
638			}
639		});
640	}
641	horizon
642}
643
644fn reverse_horizon(cursor: &MultiVersionRangeCursor) -> Option<EncodedKey> {
645	let mut horizon: Option<EncodedKey> = None;
646	for tier in [&cursor.buffer, &cursor.persistent] {
647		if tier.exhausted {
648			continue;
649		}
650		let last = match &tier.last_key {
651			Some(k) => k.clone(),
652			None => return None,
653		};
654		horizon = Some(match horizon {
655			None => last,
656			Some(prev) => {
657				if last.as_slice() > prev.as_slice() {
658					last
659				} else {
660					prev
661				}
662			}
663		});
664	}
665	horizon
666}
667
668fn rewind_over_advanced_forward(cursor: &mut MultiVersionRangeCursor, horizon: &EncodedKey) {
669	for tier in [&mut cursor.buffer, &mut cursor.persistent] {
670		if tier.exhausted {
671			continue;
672		}
673		if let Some(last) = &tier.last_key
674			&& last.as_slice() > horizon.as_slice()
675		{
676			tier.last_key = Some(horizon.clone());
677		}
678	}
679}
680
681fn rewind_over_advanced_reverse(cursor: &mut MultiVersionRangeCursor, horizon: &EncodedKey) {
682	for tier in [&mut cursor.buffer, &mut cursor.persistent] {
683		if tier.exhausted {
684			continue;
685		}
686		if let Some(last) = &tier.last_key
687			&& last.as_slice() < horizon.as_slice()
688		{
689			tier.last_key = Some(horizon.clone());
690		}
691	}
692}
693
694impl MultiVersionGetPrevious for StandardMultiStore {
695	fn get_previous_version(
696		&self,
697		key: &EncodedKey,
698		before_version: CommitVersion,
699	) -> Result<Option<MultiVersionRow>> {
700		if before_version.0 == 0 {
701			return Ok(None);
702		}
703
704		let table = classify_key(key);
705		let prev_version = CommitVersion(before_version.0 - 1);
706
707		if let Some(buffer) = &self.buffer {
708			match get_at_version(buffer, table, key.as_ref(), prev_version)? {
709				VersionedGetResult::Value {
710					value,
711					version,
712				} => {
713					return Ok(Some(MultiVersionRow {
714						key: key.clone(),
715						row: EncodedRow(CowVec::new(value.to_vec())),
716						version,
717					}));
718				}
719				VersionedGetResult::Tombstone => return Ok(None),
720				VersionedGetResult::NotFound => {}
721			}
722		}
723
724		if let Some(persistent) = &self.persistent {
725			match get_at_version(persistent, table, key.as_ref(), prev_version)? {
726				VersionedGetResult::Value {
727					value,
728					version,
729				} => {
730					return Ok(Some(MultiVersionRow {
731						key: key.clone(),
732						row: EncodedRow(CowVec::new(value.to_vec())),
733						version,
734					}));
735				}
736				VersionedGetResult::Tombstone => return Ok(None),
737				VersionedGetResult::NotFound => {}
738			}
739		}
740
741		Ok(None)
742	}
743}
744
// Empty impl: no items are defined here. Presumably the trait's requirements are covered by the
// other `MultiVersion*` impls for this type - confirm against the trait definition.
impl MultiVersionStore for StandardMultiStore {}
746
/// Forward range iterator that pulls rows from the store batch by batch via `range_next`.
pub struct MultiVersionRangeIter {
	/// Store the batches are fetched from.
	store: StandardMultiStore,
	/// Scan position carried between batch fetches.
	cursor: MultiVersionRangeCursor,
	/// Key range being scanned.
	range: EncodedKeyRange,
	/// Version passed to every batch fetch.
	version: CommitVersion,
	/// Batch size passed to every batch fetch.
	batch_size: usize,
	/// Rows of the most recently fetched batch.
	current_batch: Vec<MultiVersionRow>,
	/// Index of the next row of `current_batch` to yield.
	current_index: usize,
}
756
757impl Iterator for MultiVersionRangeIter {
758	type Item = Result<MultiVersionRow>;
759
760	fn next(&mut self) -> Option<Self::Item> {
761		if self.current_index < self.current_batch.len() {
762			let item = self.current_batch[self.current_index].clone();
763			self.current_index += 1;
764			return Some(Ok(item));
765		}
766
767		if self.cursor.exhausted {
768			return None;
769		}
770
771		match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
772		{
773			Ok(batch) => {
774				if batch.items.is_empty() {
775					if self.cursor.exhausted {
776						return None;
777					}
778					return self.next();
779				}
780				self.current_batch = batch.items;
781				self.current_index = 0;
782				self.next()
783			}
784			Err(e) => Some(Err(e)),
785		}
786	}
787}
788
/// Reverse range iterator that pulls rows from the store batch by batch via `range_rev_next`.
pub struct MultiVersionRangeRevIter {
	/// Store the batches are fetched from.
	store: StandardMultiStore,
	/// Scan position carried between batch fetches.
	cursor: MultiVersionRangeCursor,
	/// Key range being scanned.
	range: EncodedKeyRange,
	/// Version passed to every batch fetch.
	version: CommitVersion,
	/// Batch size passed to every batch fetch.
	batch_size: usize,
	/// Rows of the most recently fetched batch.
	current_batch: Vec<MultiVersionRow>,
	/// Index of the next row of `current_batch` to yield.
	current_index: usize,
}
798
799impl Iterator for MultiVersionRangeRevIter {
800	type Item = Result<MultiVersionRow>;
801
802	fn next(&mut self) -> Option<Self::Item> {
803		if self.current_index < self.current_batch.len() {
804			let item = self.current_batch[self.current_index].clone();
805			self.current_index += 1;
806			return Some(Ok(item));
807		}
808
809		if self.cursor.exhausted {
810			return None;
811		}
812
813		match self.store.range_rev_next(
814			&mut self.cursor,
815			self.range.clone(),
816			self.version,
817			self.batch_size as u64,
818		) {
819			Ok(batch) => {
820				if batch.items.is_empty() {
821					if self.cursor.exhausted {
822						return None;
823					}
824					return self.next();
825				}
826				self.current_batch = batch.items;
827				self.current_index = 0;
828				self.next()
829			}
830			Err(e) => Some(Err(e)),
831		}
832	}
833}
834
835fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
836	classify_range(range).unwrap_or(EntryKind::Multi)
837}
838
839fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
840	let start = match &range.start {
841		Bound::Included(key) => key.as_ref().to_vec(),
842		Bound::Excluded(key) => key.as_ref().to_vec(),
843		Bound::Unbounded => vec![],
844	};
845
846	let end = match &range.end {
847		Bound::Included(key) => key.as_ref().to_vec(),
848		Bound::Excluded(key) => key.as_ref().to_vec(),
849		Bound::Unbounded => vec![0xFFu8; 256],
850	};
851
852	(start, end)
853}