1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 actors::drop::{DropMessage, DropRequest},
11 common::CommitVersion,
12 delta::Delta,
13 encoded::{
14 key::{EncodedKey, EncodedKeyRange},
15 row::EncodedRow,
16 },
17 event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18 interface::store::{
19 EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20 MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21 },
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27 StandardMultiStore,
28 router::{classify_key, classify_range, is_single_version_semantics_key},
29 version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32 Result,
33 buffer::tier::MultiBufferTier,
34 persistent::MultiPersistentTier,
35 tier::{RangeBatch, RangeCursor, TierBatch, TierStorage},
36};
37
38const TIER_SCAN_CHUNK_SIZE: usize = 32;
39
40impl MultiVersionGet for StandardMultiStore {
41 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42 fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43 let table = classify_key(key);
44
45 if let Some(buffer) = &self.buffer {
46 match get_at_version(buffer, table, key.as_ref(), version)? {
47 VersionedGetResult::Value {
48 value,
49 version: v,
50 } => {
51 return Ok(Some(MultiVersionRow {
52 key: key.clone(),
53 row: EncodedRow(value),
54 version: v,
55 }));
56 }
57 VersionedGetResult::Tombstone => return Ok(None),
58 VersionedGetResult::NotFound => {}
59 }
60 }
61
62 if let Some(persistent) = &self.persistent {
63 match get_at_version(persistent, table, key.as_ref(), version)? {
64 VersionedGetResult::Value {
65 value,
66 version: v,
67 } => {
68 return Ok(Some(MultiVersionRow {
69 key: key.clone(),
70 row: EncodedRow(value),
71 version: v,
72 }));
73 }
74 VersionedGetResult::Tombstone => return Ok(None),
75 VersionedGetResult::NotFound => {}
76 }
77 }
78
79 Ok(None)
80 }
81}
82
83impl MultiVersionContains for StandardMultiStore {
84 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
85 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
86 Ok(MultiVersionGet::get(self, key, version)?.is_some())
87 }
88}
89
90impl MultiVersionCommit for StandardMultiStore {
91 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
92 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
93 let classified = classify_deltas(&deltas);
94 let drop_batch = build_drop_batch(classified.explicit_drops, &classified.pending_set_keys, version);
95 self.dispatch_drops(drop_batch);
96
97 if let Some(buffer) = &self.buffer {
98 buffer.set(version, classified.batches)?;
99 } else if let Some(persistent) = &self.persistent {
100 persistent.set(version, classified.batches)?;
101 } else {
102 return Ok(());
103 }
104
105 self.emit_commit_metrics(classified.writes, classified.deletes, version);
106 Ok(())
107 }
108}
109
110struct ClassifiedDeltas {
111 pending_set_keys: HashSet<EncodedKey>,
112 writes: Vec<MultiWrite>,
113 deletes: Vec<MultiDelete>,
114 batches: TierBatch,
115 explicit_drops: Vec<(EntryKind, EncodedKey)>,
116}
117
118#[inline]
119fn classify_deltas(deltas: &CowVec<Delta>) -> ClassifiedDeltas {
120 let mut pending_set_keys: HashSet<EncodedKey> = HashSet::new();
121 let mut writes: Vec<MultiWrite> = Vec::new();
122 let mut deletes: Vec<MultiDelete> = Vec::new();
123 let mut batches: TierBatch = HashMap::new();
124 let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
125
126 for delta in deltas.iter() {
127 let key = delta.key();
128 let table = classify_key(key);
129 let is_single_version = is_single_version_semantics_key(key);
130
131 match delta {
132 Delta::Set {
133 key,
134 row,
135 } => {
136 if is_single_version {
137 pending_set_keys.insert(key.clone());
138 }
139 writes.push(MultiWrite {
140 key: key.clone(),
141 value_bytes: row.len() as u64,
142 });
143 batches.entry(table).or_default().push((key.clone(), Some(row.0.clone())));
144 }
145 Delta::Unset {
146 key,
147 row,
148 } => {
149 deletes.push(MultiDelete {
150 key: key.clone(),
151 value_bytes: row.len() as u64,
152 });
153 batches.entry(table).or_default().push((key.clone(), None));
154 }
155 Delta::Remove {
156 key,
157 } => {
158 deletes.push(MultiDelete {
159 key: key.clone(),
160 value_bytes: 0,
161 });
162 batches.entry(table).or_default().push((key.clone(), None));
163 }
164 Delta::Drop {
165 key,
166 } => {
167 explicit_drops.push((table, key.clone()));
168 }
169 }
170 }
171
172 ClassifiedDeltas {
173 pending_set_keys,
174 writes,
175 deletes,
176 batches,
177 explicit_drops,
178 }
179}
180
181#[inline]
182fn build_drop_batch(
183 explicit_drops: Vec<(EntryKind, EncodedKey)>,
184 pending_set_keys: &HashSet<EncodedKey>,
185 version: CommitVersion,
186) -> Vec<DropRequest> {
187 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
188 for (table, key) in explicit_drops {
189 let pending_version = if pending_set_keys.contains(key.as_ref()) {
190 Some(version)
191 } else {
192 None
193 };
194 drop_batch.push(DropRequest {
195 table,
196 key,
197 commit_version: version,
198 pending_version,
199 });
200 }
201 for key in pending_set_keys.iter() {
202 let encoded = EncodedKey::new(key.to_vec());
203 let table = classify_key(&encoded);
204 drop_batch.push(DropRequest {
205 table,
206 key: encoded,
207 commit_version: version,
208 pending_version: Some(version),
209 });
210 }
211 drop_batch
212}
213
214impl StandardMultiStore {
215 #[inline]
216 fn dispatch_drops(&self, drop_batch: Vec<DropRequest>) {
217 if drop_batch.is_empty() {
218 return;
219 }
220 if let Some(actor) = &self.drop_actor
221 && actor.send_blocking(DropMessage::Batch(drop_batch)).is_err()
222 {
223 warn!("Failed to send drop batch");
224 }
225 }
226
227 #[inline]
228 fn emit_commit_metrics(&self, writes: Vec<MultiWrite>, deletes: Vec<MultiDelete>, version: CommitVersion) {
229 if writes.is_empty() && deletes.is_empty() {
230 return;
231 }
232 self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
233 }
234}
235
236#[derive(Debug, Clone, Default)]
237pub struct MultiVersionRangeCursor {
238 pub buffer: RangeCursor,
239
240 pub persistent: RangeCursor,
241
242 pub exhausted: bool,
243}
244
245impl MultiVersionRangeCursor {
246 pub fn new() -> Self {
247 Self::default()
248 }
249
250 pub fn is_exhausted(&self) -> bool {
251 self.exhausted
252 }
253}
254
255pub struct TierScanQuery<'a> {
256 pub table: EntryKind,
257 pub start: &'a [u8],
258 pub end: &'a [u8],
259 pub version: CommitVersion,
260 pub range: &'a EncodedKeyRange,
261}
262
263pub fn scan_tier_chunk<S: TierStorage>(
264 storage: &S,
265 cursor: &mut RangeCursor,
266 scan: &TierScanQuery,
267 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
268) -> Result<bool> {
269 let batch = storage.range_next(
270 scan.table,
271 cursor,
272 Bound::Included(scan.start),
273 Bound::Included(scan.end),
274 scan.version,
275 TIER_SCAN_CHUNK_SIZE,
276 )?;
277 merge_tier_batch(batch, scan.range, collected)
278}
279
280pub fn scan_tier_chunk_rev<S: TierStorage>(
281 storage: &S,
282 cursor: &mut RangeCursor,
283 scan: &TierScanQuery,
284 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
285) -> Result<bool> {
286 let batch = storage.range_rev_next(
287 scan.table,
288 cursor,
289 Bound::Included(scan.start),
290 Bound::Included(scan.end),
291 scan.version,
292 TIER_SCAN_CHUNK_SIZE,
293 )?;
294 merge_tier_batch(batch, scan.range, collected)
295}
296
297#[inline]
298fn merge_tier_batch(
299 batch: RangeBatch,
300 range: &EncodedKeyRange,
301 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
302) -> Result<bool> {
303 if batch.entries.is_empty() {
304 return Ok(false);
305 }
306
307 for entry in batch.entries {
308 let original_key = entry.key.as_slice().to_vec();
309 let entry_version = entry.version;
310
311 let original_key_encoded = EncodedKey::new(original_key.clone());
312 if !range.contains(&original_key_encoded) {
313 continue;
314 }
315
316 let should_update = match collected.get(&original_key) {
317 None => true,
318 Some((existing_version, _)) => entry_version > *existing_version,
319 };
320
321 if should_update {
322 collected.insert(original_key, (entry_version, entry.value));
323 }
324 }
325
326 Ok(true)
327}
328
329#[inline]
330pub fn collected_to_batch(
331 collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
332 has_more: bool,
333) -> MultiVersionBatch {
334 let items: Vec<MultiVersionRow> = collected
335 .into_iter()
336 .filter_map(|(key_bytes, (v, value))| {
337 value.map(|val| MultiVersionRow {
338 key: EncodedKey::new(key_bytes),
339 row: EncodedRow(val),
340 version: v,
341 })
342 })
343 .collect();
344
345 MultiVersionBatch {
346 items,
347 has_more,
348 }
349}
350
351#[inline]
352fn step_all_tiers(
353 buffer: Option<&MultiBufferTier>,
354 buffer_cursor: &mut RangeCursor,
355 persistent: Option<&MultiPersistentTier>,
356 persistent_cursor: &mut RangeCursor,
357 scan: &TierScanQuery,
358 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
359) -> Result<bool> {
360 let mut any_progress = false;
361 if let Some(s) = buffer
362 && !buffer_cursor.exhausted
363 {
364 any_progress |= scan_tier_chunk(s, buffer_cursor, scan, collected)?;
365 }
366 if let Some(s) = persistent
367 && !persistent_cursor.exhausted
368 {
369 any_progress |= scan_tier_chunk(s, persistent_cursor, scan, collected)?;
370 }
371 Ok(any_progress)
372}
373
374pub fn scan_tiers_latest(
375 buffer: Option<&MultiBufferTier>,
376 persistent: Option<&MultiPersistentTier>,
377 range: EncodedKeyRange,
378 version: CommitVersion,
379 max_keys: usize,
380) -> Result<MultiVersionBatch> {
381 let table = classify_key_range(&range);
382 let (start, end) = make_range_bounds(&range);
383 let scan = TierScanQuery {
384 table,
385 start: &start,
386 end: &end,
387 version,
388 range: &range,
389 };
390
391 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
392 let mut buffer_cursor = RangeCursor::default();
393 let mut persistent_cursor = RangeCursor::default();
394 let mut exhausted = false;
395
396 while collected.len() < max_keys {
397 let progress = step_all_tiers(
398 buffer,
399 &mut buffer_cursor,
400 persistent,
401 &mut persistent_cursor,
402 &scan,
403 &mut collected,
404 )?;
405 if !progress {
406 exhausted = true;
407 break;
408 }
409 }
410
411 Ok(collected_to_batch(collected, !exhausted))
412}
413
414impl StandardMultiStore {
415 pub fn range_next(
416 &self,
417 cursor: &mut MultiVersionRangeCursor,
418 range: EncodedKeyRange,
419 version: CommitVersion,
420 batch_size: u64,
421 ) -> Result<MultiVersionBatch> {
422 if cursor.exhausted {
423 return Ok(MultiVersionBatch {
424 items: Vec::new(),
425 has_more: false,
426 });
427 }
428
429 mark_unconfigured_exhausted(self, cursor);
430
431 let table = classify_key_range(&range);
432 let (start, end) = make_range_bounds(&range);
433 let batch_size = batch_size as usize;
434 let scan = TierScanQuery {
435 table,
436 start: &start,
437 end: &end,
438 version,
439 range: &range,
440 };
441
442 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
443
444 while collected.len() < batch_size {
445 let progress = step_all_tiers(
446 self.buffer.as_ref(),
447 &mut cursor.buffer,
448 self.persistent.as_ref(),
449 &mut cursor.persistent,
450 &scan,
451 &mut collected,
452 )?;
453 if !progress {
454 cursor.exhausted = true;
455 break;
456 }
457 }
458
459 apply_forward_horizon(cursor, &mut collected);
460
461 let items: Vec<MultiVersionRow> = collected
462 .into_iter()
463 .filter_map(|(key_bytes, (v, value))| {
464 value.map(|val| MultiVersionRow {
465 key: EncodedKey::new(key_bytes),
466 row: EncodedRow(val),
467 version: v,
468 })
469 })
470 .collect();
471
472 let has_more = !cursor.exhausted;
473
474 Ok(MultiVersionBatch {
475 items,
476 has_more,
477 })
478 }
479
480 pub fn range(
481 &self,
482 range: EncodedKeyRange,
483 version: CommitVersion,
484 batch_size: usize,
485 ) -> MultiVersionRangeIter {
486 MultiVersionRangeIter {
487 store: self.clone(),
488 cursor: MultiVersionRangeCursor::new(),
489 range,
490 version,
491 batch_size,
492 current_batch: Vec::new(),
493 current_index: 0,
494 }
495 }
496
497 pub fn range_rev(
498 &self,
499 range: EncodedKeyRange,
500 version: CommitVersion,
501 batch_size: usize,
502 ) -> MultiVersionRangeRevIter {
503 MultiVersionRangeRevIter {
504 store: self.clone(),
505 cursor: MultiVersionRangeCursor::new(),
506 range,
507 version,
508 batch_size,
509 current_batch: Vec::new(),
510 current_index: 0,
511 }
512 }
513
514 fn range_rev_next(
515 &self,
516 cursor: &mut MultiVersionRangeCursor,
517 range: EncodedKeyRange,
518 version: CommitVersion,
519 batch_size: u64,
520 ) -> Result<MultiVersionBatch> {
521 if cursor.exhausted {
522 return Ok(MultiVersionBatch {
523 items: Vec::new(),
524 has_more: false,
525 });
526 }
527
528 mark_unconfigured_exhausted(self, cursor);
529
530 let table = classify_key_range(&range);
531 let (start, end) = make_range_bounds(&range);
532 let batch_size = batch_size as usize;
533 let scan = TierScanQuery {
534 table,
535 start: &start,
536 end: &end,
537 version,
538 range: &range,
539 };
540
541 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
542
543 while collected.len() < batch_size {
544 let mut any_progress = false;
545
546 if let Some(buffer) = &self.buffer
547 && !cursor.buffer.exhausted
548 {
549 any_progress |= scan_tier_chunk_rev(buffer, &mut cursor.buffer, &scan, &mut collected)?;
550 }
551
552 if let Some(persistent) = &self.persistent
553 && !cursor.persistent.exhausted
554 {
555 any_progress |=
556 scan_tier_chunk_rev(persistent, &mut cursor.persistent, &scan, &mut collected)?;
557 }
558
559 if !any_progress {
560 cursor.exhausted = true;
561 break;
562 }
563 }
564
565 apply_reverse_horizon(cursor, &mut collected);
566
567 let items: Vec<MultiVersionRow> = collected
568 .into_iter()
569 .rev()
570 .filter_map(|(key_bytes, (v, value))| {
571 value.map(|val| MultiVersionRow {
572 key: EncodedKey::new(key_bytes),
573 row: EncodedRow(val),
574 version: v,
575 })
576 })
577 .collect();
578
579 let has_more = !cursor.exhausted;
580
581 Ok(MultiVersionBatch {
582 items,
583 has_more,
584 })
585 }
586}
587
588fn mark_unconfigured_exhausted(store: &StandardMultiStore, cursor: &mut MultiVersionRangeCursor) {
589 if store.buffer.is_none() {
590 cursor.buffer.exhausted = true;
591 }
592 if store.persistent.is_none() {
593 cursor.persistent.exhausted = true;
594 }
595}
596
597fn apply_forward_horizon(
598 cursor: &mut MultiVersionRangeCursor,
599 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
600) {
601 let horizon = forward_horizon(cursor);
602 if let Some(h) = horizon {
603 collected.retain(|k, _| k.as_slice() <= h.as_slice());
604 rewind_over_advanced_forward(cursor, &h);
605 }
606}
607
608fn apply_reverse_horizon(
609 cursor: &mut MultiVersionRangeCursor,
610 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
611) {
612 let horizon = reverse_horizon(cursor);
613 if let Some(h) = horizon {
614 collected.retain(|k, _| k.as_slice() >= h.as_slice());
615 rewind_over_advanced_reverse(cursor, &h);
616 }
617}
618
619fn forward_horizon(cursor: &MultiVersionRangeCursor) -> Option<EncodedKey> {
620 let mut horizon: Option<EncodedKey> = None;
621 for tier in [&cursor.buffer, &cursor.persistent] {
622 if tier.exhausted {
623 continue;
624 }
625 let last = match &tier.last_key {
626 Some(k) => k.clone(),
627
628 None => return None,
629 };
630 horizon = Some(match horizon {
631 None => last,
632 Some(prev) => {
633 if last.as_slice() < prev.as_slice() {
634 last
635 } else {
636 prev
637 }
638 }
639 });
640 }
641 horizon
642}
643
644fn reverse_horizon(cursor: &MultiVersionRangeCursor) -> Option<EncodedKey> {
645 let mut horizon: Option<EncodedKey> = None;
646 for tier in [&cursor.buffer, &cursor.persistent] {
647 if tier.exhausted {
648 continue;
649 }
650 let last = match &tier.last_key {
651 Some(k) => k.clone(),
652 None => return None,
653 };
654 horizon = Some(match horizon {
655 None => last,
656 Some(prev) => {
657 if last.as_slice() > prev.as_slice() {
658 last
659 } else {
660 prev
661 }
662 }
663 });
664 }
665 horizon
666}
667
668fn rewind_over_advanced_forward(cursor: &mut MultiVersionRangeCursor, horizon: &EncodedKey) {
669 for tier in [&mut cursor.buffer, &mut cursor.persistent] {
670 if tier.exhausted {
671 continue;
672 }
673 if let Some(last) = &tier.last_key
674 && last.as_slice() > horizon.as_slice()
675 {
676 tier.last_key = Some(horizon.clone());
677 }
678 }
679}
680
681fn rewind_over_advanced_reverse(cursor: &mut MultiVersionRangeCursor, horizon: &EncodedKey) {
682 for tier in [&mut cursor.buffer, &mut cursor.persistent] {
683 if tier.exhausted {
684 continue;
685 }
686 if let Some(last) = &tier.last_key
687 && last.as_slice() < horizon.as_slice()
688 {
689 tier.last_key = Some(horizon.clone());
690 }
691 }
692}
693
694impl MultiVersionGetPrevious for StandardMultiStore {
695 fn get_previous_version(
696 &self,
697 key: &EncodedKey,
698 before_version: CommitVersion,
699 ) -> Result<Option<MultiVersionRow>> {
700 if before_version.0 == 0 {
701 return Ok(None);
702 }
703
704 let table = classify_key(key);
705 let prev_version = CommitVersion(before_version.0 - 1);
706
707 if let Some(buffer) = &self.buffer {
708 match get_at_version(buffer, table, key.as_ref(), prev_version)? {
709 VersionedGetResult::Value {
710 value,
711 version,
712 } => {
713 return Ok(Some(MultiVersionRow {
714 key: key.clone(),
715 row: EncodedRow(CowVec::new(value.to_vec())),
716 version,
717 }));
718 }
719 VersionedGetResult::Tombstone => return Ok(None),
720 VersionedGetResult::NotFound => {}
721 }
722 }
723
724 if let Some(persistent) = &self.persistent {
725 match get_at_version(persistent, table, key.as_ref(), prev_version)? {
726 VersionedGetResult::Value {
727 value,
728 version,
729 } => {
730 return Ok(Some(MultiVersionRow {
731 key: key.clone(),
732 row: EncodedRow(CowVec::new(value.to_vec())),
733 version,
734 }));
735 }
736 VersionedGetResult::Tombstone => return Ok(None),
737 VersionedGetResult::NotFound => {}
738 }
739 }
740
741 Ok(None)
742 }
743}
744
745impl MultiVersionStore for StandardMultiStore {}
746
747pub struct MultiVersionRangeIter {
748 store: StandardMultiStore,
749 cursor: MultiVersionRangeCursor,
750 range: EncodedKeyRange,
751 version: CommitVersion,
752 batch_size: usize,
753 current_batch: Vec<MultiVersionRow>,
754 current_index: usize,
755}
756
757impl Iterator for MultiVersionRangeIter {
758 type Item = Result<MultiVersionRow>;
759
760 fn next(&mut self) -> Option<Self::Item> {
761 if self.current_index < self.current_batch.len() {
762 let item = self.current_batch[self.current_index].clone();
763 self.current_index += 1;
764 return Some(Ok(item));
765 }
766
767 if self.cursor.exhausted {
768 return None;
769 }
770
771 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
772 {
773 Ok(batch) => {
774 if batch.items.is_empty() {
775 if self.cursor.exhausted {
776 return None;
777 }
778 return self.next();
779 }
780 self.current_batch = batch.items;
781 self.current_index = 0;
782 self.next()
783 }
784 Err(e) => Some(Err(e)),
785 }
786 }
787}
788
789pub struct MultiVersionRangeRevIter {
790 store: StandardMultiStore,
791 cursor: MultiVersionRangeCursor,
792 range: EncodedKeyRange,
793 version: CommitVersion,
794 batch_size: usize,
795 current_batch: Vec<MultiVersionRow>,
796 current_index: usize,
797}
798
799impl Iterator for MultiVersionRangeRevIter {
800 type Item = Result<MultiVersionRow>;
801
802 fn next(&mut self) -> Option<Self::Item> {
803 if self.current_index < self.current_batch.len() {
804 let item = self.current_batch[self.current_index].clone();
805 self.current_index += 1;
806 return Some(Ok(item));
807 }
808
809 if self.cursor.exhausted {
810 return None;
811 }
812
813 match self.store.range_rev_next(
814 &mut self.cursor,
815 self.range.clone(),
816 self.version,
817 self.batch_size as u64,
818 ) {
819 Ok(batch) => {
820 if batch.items.is_empty() {
821 if self.cursor.exhausted {
822 return None;
823 }
824 return self.next();
825 }
826 self.current_batch = batch.items;
827 self.current_index = 0;
828 self.next()
829 }
830 Err(e) => Some(Err(e)),
831 }
832 }
833}
834
835fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
836 classify_range(range).unwrap_or(EntryKind::Multi)
837}
838
839fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
840 let start = match &range.start {
841 Bound::Included(key) => key.as_ref().to_vec(),
842 Bound::Excluded(key) => key.as_ref().to_vec(),
843 Bound::Unbounded => vec![],
844 };
845
846 let end = match &range.end {
847 Bound::Included(key) => key.as_ref().to_vec(),
848 Bound::Excluded(key) => key.as_ref().to_vec(),
849 Bound::Unbounded => vec![0xFFu8; 256],
850 };
851
852 (start, end)
853}