1use std::{
2 cmp::Ordering as CmpOrdering, collections::BinaryHeap, ops::Bound, path::PathBuf, sync::Arc,
3};
4
5use crate::{
6 blob::ValueRef,
7 error::{Error, Result},
8 internal_key::{
9 InternalKey, ValueKind, first_internal_key_for_user, last_internal_key_for_user,
10 },
11 memtable::Memtable,
12 range_tombstone::{RangeTombstoneIndex, RangeTombstoneLike},
13 snapshot::Snapshot,
14 stats::BlobReadMetrics,
15 storage::NativeFileBackend,
16 table::TablePointCursor,
17 types::{KeyRange, KeyValue, Sequence, Value},
18};
19
/// Order in which a scan yields user keys.
///
/// `Forward` walks keys in ascending byte order and `Reverse` in descending byte
/// order (see `compare_scan_keys`). `Forward` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Direction {
    /// Ascending byte order; the default.
    #[default]
    Forward,
    /// Descending byte order.
    Reverse,
}
29
/// Iterator over fully materialized key/value pairs.
///
/// Items come either from a pre-collected list (`Iter::from_items`) or from a
/// merge over record sources (`Iter::from_sources`). In the latter case,
/// blob-backed values are read as each item is produced.
#[derive(Debug, Clone)]
pub struct Iter {
    /// Direction the iterator was built for; reported by `Iter::direction`.
    direction: Direction,
    /// Backing item source.
    inner: IterInner,
}
36
/// Iterator that yields keys whose values are loaded only on request.
///
/// Unlike `Iter`, blob-backed values are not read while scanning. Each yielded
/// `LazyKeyValue` carries a `LazyValue` that the caller reads later, if at all.
#[derive(Debug, Clone)]
pub struct LazyIter {
    /// Direction the iterator was built for; reported by `LazyIter::direction`.
    direction: Direction,
    /// Merge state over the record sources.
    scan: LazyScan,
}
43
/// A user key paired with a value that may not have been loaded yet.
#[derive(Debug, Clone)]
pub struct LazyKeyValue {
    /// User key, without the internal sequence/kind metadata.
    pub key: Vec<u8>,
    /// Value for `key`: either inline bytes or a blob reference read on demand.
    pub value: LazyValue,
}
52
/// A value that is either already in memory or a blob reference read on demand.
///
/// Use `LazyValue::is_inline` to check whether reading it goes through the blob
/// reader.
#[derive(Debug, Clone)]
pub struct LazyValue {
    /// Inline bytes or blob reference plus everything needed to resolve it.
    inner: LazyValueInner,
}
58
/// Storage representation behind `LazyValue`.
#[derive(Debug, Clone)]
enum LazyValueInner {
    /// Value bytes stored directly in the record.
    Inline(Vec<u8>),
    /// Value stored out of line; resolved through `crate::blob` when read.
    Blob {
        /// Database directory handed to the blob reader.
        db_path: PathBuf,
        /// Backend for async reads; when `None`, async reads fall back to the sync path.
        native_storage: Option<NativeFileBackend>,
        /// Internal key of the owning record, passed to the blob reader.
        internal_key: InternalKey,
        /// The blob reference (`ValueRef::BlobIndex` or `ValueRef::Blob`).
        value: ValueRef,
        /// Optional metrics sink; receives the byte length of every successful read.
        blob_reads: Option<Arc<BlobReadMetrics>>,
        /// Never read. Presumably it keeps the scan's snapshot alive so the
        /// referenced blob stays readable until this value is dropped — TODO confirm.
        _read_pin: Arc<Snapshot>,
    },
}
71
/// Backing item source for `Iter`.
#[derive(Debug, Clone)]
enum IterInner {
    /// Pre-collected items, already arranged in iteration order.
    Items(std::vec::IntoIter<KeyValue>),
    /// Merge over record sources; values are materialized as items are produced.
    Lazy(LazyScan),
}
77
/// Everything needed to build a merged scan over record sources.
#[derive(Debug, Clone)]
pub(crate) struct ScanSourceInput {
    /// Records and range tombstones with a higher sequence are invisible to the scan.
    pub(crate) read_sequence: Sequence,
    /// Snapshot shared (via `Arc`) with every blob-backed value the scan yields.
    pub(crate) read_pin: Snapshot,
    /// Database directory. With `None`, yielding a blob reference is reported as
    /// corruption ("in-memory database cannot read blob value references").
    pub(crate) db_path: Option<PathBuf>,
    /// Optional backend for async blob reads.
    pub(crate) native_storage: Option<NativeFileBackend>,
    /// Optional metrics sink for blob reads.
    pub(crate) blob_reads: Option<Arc<BlobReadMetrics>>,
    /// Range tombstones checked against every visible `Put`.
    pub(crate) range_tombstones: Vec<ScanRangeTombstone>,
    /// Sources to merge; each is expected to yield key groups in the scan's direction.
    pub(crate) sources: Vec<RecordSource>,
}
88
89impl Iter {
90 #[must_use]
92 pub fn empty(direction: Direction) -> Self {
93 Self::from_items(Vec::new(), direction)
94 }
95
96 #[must_use]
98 pub fn from_items(mut items: Vec<KeyValue>, direction: Direction) -> Self {
99 if direction == Direction::Reverse {
100 items.reverse();
101 }
102
103 Self {
104 direction,
105 inner: IterInner::Items(items.into_iter()),
106 }
107 }
108
109 pub(crate) fn from_sources(direction: Direction, input: ScanSourceInput) -> Self {
110 Self {
111 direction,
112 inner: IterInner::Lazy(LazyScan {
113 direction,
114 read_sequence: input.read_sequence,
115 read_pin: Arc::new(input.read_pin),
116 db_path: input.db_path,
117 native_storage: input.native_storage,
118 blob_reads: input.blob_reads,
119 range_tombstones: RangeTombstoneIndex::new(input.range_tombstones),
120 sources: input.sources,
121 source_heap: BinaryHeap::new(),
122 source_heap_initialized: false,
123 }),
124 }
125 }
126
127 #[must_use]
129 pub const fn direction(&self) -> Direction {
130 self.direction
131 }
132}
133
134impl LazyIter {
135 pub(crate) fn from_sources(direction: Direction, input: ScanSourceInput) -> Self {
136 Self {
137 direction,
138 scan: LazyScan {
139 direction,
140 read_sequence: input.read_sequence,
141 read_pin: Arc::new(input.read_pin),
142 db_path: input.db_path,
143 native_storage: input.native_storage,
144 blob_reads: input.blob_reads,
145 range_tombstones: RangeTombstoneIndex::new(input.range_tombstones),
146 sources: input.sources,
147 source_heap: BinaryHeap::new(),
148 source_heap_initialized: false,
149 },
150 }
151 }
152
153 #[must_use]
155 pub const fn direction(&self) -> Direction {
156 self.direction
157 }
158}
159
160impl LazyKeyValue {
161 pub fn into_key_value_sync(self) -> Result<KeyValue> {
163 Ok(KeyValue::new(self.key, self.value.into_value_sync()?))
164 }
165
166 pub async fn into_key_value(self) -> Result<KeyValue> {
168 let value = self.value.into_value().await?;
169 Ok(KeyValue::new(self.key, value))
170 }
171}
172
173impl LazyValue {
174 #[must_use]
176 pub fn is_inline(&self) -> bool {
177 matches!(self.inner, LazyValueInner::Inline(_))
178 }
179
180 pub fn read_sync(&self) -> Result<Value> {
182 match &self.inner {
183 LazyValueInner::Inline(bytes) => Ok(bytes.clone()),
184 LazyValueInner::Blob {
185 db_path,
186 native_storage: _,
187 internal_key,
188 value,
189 blob_reads,
190 _read_pin: _,
191 } => {
192 let bytes =
193 crate::blob::read_value_for_internal_key(db_path, value, Some(internal_key))?;
194 if let Some(blob_reads) = blob_reads {
195 blob_reads.record(bytes.len() as u64);
196 }
197 Ok(bytes)
198 }
199 }
200 }
201
202 pub fn into_value_sync(self) -> Result<Value> {
204 match self.inner {
205 LazyValueInner::Inline(bytes) => Ok(bytes),
206 LazyValueInner::Blob {
207 db_path,
208 native_storage: _,
209 internal_key,
210 value,
211 blob_reads,
212 _read_pin: _,
213 } => {
214 let bytes = crate::blob::read_value_for_internal_key(
215 &db_path,
216 &value,
217 Some(&internal_key),
218 )?;
219 if let Some(blob_reads) = blob_reads {
220 blob_reads.record(bytes.len() as u64);
221 }
222 Ok(bytes)
223 }
224 }
225 }
226
227 pub async fn read(&self) -> Result<Value> {
229 match &self.inner {
230 LazyValueInner::Inline(bytes) => Ok(bytes.clone()),
231 LazyValueInner::Blob {
232 db_path,
233 native_storage: Some(native_storage),
234 internal_key,
235 value,
236 blob_reads,
237 _read_pin: _,
238 } => {
239 let bytes = crate::blob::read_value_for_internal_key_with_backend_async(
240 native_storage,
241 db_path,
242 value,
243 Some(internal_key),
244 )
245 .await?;
246 if let Some(blob_reads) = blob_reads {
247 blob_reads.record(bytes.len() as u64);
248 }
249 Ok(bytes)
250 }
251 LazyValueInner::Blob {
252 native_storage: None,
253 ..
254 } => self.read_sync(),
255 }
256 }
257
258 pub async fn into_value(self) -> Result<Value> {
260 match self.inner {
261 LazyValueInner::Inline(bytes) => Ok(bytes),
262 LazyValueInner::Blob {
263 db_path,
264 native_storage: Some(native_storage),
265 internal_key,
266 value,
267 blob_reads,
268 _read_pin: _,
269 } => {
270 let bytes = crate::blob::read_value_for_internal_key_with_backend_async(
271 &native_storage,
272 &db_path,
273 &value,
274 Some(&internal_key),
275 )
276 .await?;
277 if let Some(blob_reads) = blob_reads {
278 blob_reads.record(bytes.len() as u64);
279 }
280 Ok(bytes)
281 }
282 LazyValueInner::Blob {
283 db_path,
284 native_storage: None,
285 internal_key,
286 value,
287 blob_reads,
288 _read_pin: _,
289 } => {
290 let bytes = crate::blob::read_value_for_internal_key(
291 &db_path,
292 &value,
293 Some(&internal_key),
294 )?;
295 if let Some(blob_reads) = blob_reads {
296 blob_reads.record(bytes.len() as u64);
297 }
298 Ok(bytes)
299 }
300 }
301 }
302}
303
304impl Iter {
305 pub async fn next(&mut self) -> Result<Option<KeyValue>> {
307 match &mut self.inner {
308 IterInner::Items(items) => Ok(items.next()),
309 IterInner::Lazy(scan) => scan.next_async().await,
310 }
311 }
312
313 pub fn next_sync(&mut self) -> Option<Result<KeyValue>> {
315 Iterator::next(self)
316 }
317}
318
319impl LazyIter {
320 pub async fn next(&mut self) -> Result<Option<LazyKeyValue>> {
322 self.scan.next_lazy_async().await
323 }
324
325 pub fn next_sync(&mut self) -> Option<Result<LazyKeyValue>> {
327 Iterator::next(self)
328 }
329}
330
331impl Iterator for Iter {
332 type Item = Result<KeyValue>;
333
334 fn next(&mut self) -> Option<Self::Item> {
335 match &mut self.inner {
336 IterInner::Items(items) => items.next().map(Ok),
337 IterInner::Lazy(scan) => scan.next(),
338 }
339 }
340}
341
342impl Iterator for LazyIter {
343 type Item = Result<LazyKeyValue>;
344
345 fn next(&mut self) -> Option<Self::Item> {
346 self.scan.next_lazy()
347 }
348}
349
/// State of a heap-based k-way merge across record sources, shared by `Iter` and
/// `LazyIter`.
#[derive(Debug, Clone)]
struct LazyScan {
    /// Order in which user keys are produced.
    direction: Direction,
    /// Visibility horizon: records and range tombstones with a higher sequence are ignored.
    read_sequence: Sequence,
    /// Snapshot handed (as a clone of this `Arc`) to every blob-backed `LazyValue`.
    read_pin: Arc<Snapshot>,
    /// Directory used to resolve blob references; `None` turns them into a corruption error.
    db_path: Option<PathBuf>,
    /// Optional backend for async blob reads, copied into each blob-backed value.
    native_storage: Option<NativeFileBackend>,
    /// Optional blob-read metrics, copied into each blob-backed value.
    blob_reads: Option<Arc<BlobReadMetrics>>,
    /// Range tombstones consulted before yielding a `Put`.
    range_tombstones: RangeTombstoneIndex<ScanRangeTombstone>,
    /// Merge inputs; heap entries refer to them by index.
    sources: Vec<RecordSource>,
    /// Holds an entry for each source that currently has a buffered group.
    source_heap: BinaryHeap<SourceHeapEntry>,
    /// Whether `source_heap` has been seeded; seeding is deferred to the first `next` call.
    source_heap_initialized: bool,
}
363
364impl LazyScan {
365 fn next(&mut self) -> Option<Result<KeyValue>> {
366 self.next_lazy()
367 .map(|item| item.and_then(LazyKeyValue::into_key_value_sync))
368 }
369
370 async fn next_async(&mut self) -> Result<Option<KeyValue>> {
371 let Some(item) = self.next_lazy_async().await? else {
372 return Ok(None);
373 };
374 item.into_key_value().await.map(Some)
375 }
376
377 fn next_lazy(&mut self) -> Option<Result<LazyKeyValue>> {
378 if !self.source_heap_initialized {
379 if let Err(error) = self.initialize_source_heap() {
380 return Some(Err(error));
381 }
382 }
383
384 loop {
385 let entry = self.source_heap.pop()?;
386 let user_key = entry.user_key;
387 let mut source_indices = vec![entry.source_index];
388 while self
389 .source_heap
390 .peek()
391 .is_some_and(|entry| entry.user_key == user_key)
392 {
393 let entry = self
394 .source_heap
395 .pop()
396 .expect("heap peek promised another source entry");
397 source_indices.push(entry.source_index);
398 }
399
400 let mut first_record = None;
401 let mut rest_records = Vec::new();
402
403 for source_index in source_indices {
404 match self.sources[source_index].take_current_group() {
405 Ok(Some(group)) => {
406 push_group_records(&mut first_record, &mut rest_records, group);
407 }
408 Ok(None) => {}
409 Err(error) => return Some(Err(error)),
410 }
411 if let Err(error) = self.push_source_heap_entry(source_index) {
412 return Some(Err(error));
413 }
414 }
415
416 let Some(first_record) = first_record else {
417 continue;
418 };
419 match self.visible_lazy_item_from_records(first_record, rest_records) {
420 Ok(Some(item)) => return Some(Ok(item)),
421 Ok(None) => {}
422 Err(error) => return Some(Err(error)),
423 }
424 }
425 }
426
427 async fn next_lazy_async(&mut self) -> Result<Option<LazyKeyValue>> {
428 if !self.source_heap_initialized {
429 self.initialize_source_heap_async().await?;
430 }
431
432 loop {
433 let Some(entry) = self.source_heap.pop() else {
434 return Ok(None);
435 };
436 let user_key = entry.user_key;
437 let mut source_indices = vec![entry.source_index];
438 while self
439 .source_heap
440 .peek()
441 .is_some_and(|entry| entry.user_key == user_key)
442 {
443 let entry = self
444 .source_heap
445 .pop()
446 .expect("heap peek promised another source entry");
447 source_indices.push(entry.source_index);
448 }
449
450 let mut first_record = None;
451 let mut rest_records = Vec::new();
452
453 for source_index in source_indices {
454 if let Some(group) = self.sources[source_index]
455 .take_current_group_async()
456 .await?
457 {
458 push_group_records(&mut first_record, &mut rest_records, group);
459 }
460 self.push_source_heap_entry_async(source_index).await?;
461 }
462
463 let Some(first_record) = first_record else {
464 continue;
465 };
466 if let Some(item) = self.visible_lazy_item_from_records(first_record, rest_records)? {
467 return Ok(Some(item));
468 }
469 }
470 }
471
472 fn initialize_source_heap(&mut self) -> Result<()> {
473 for source_index in 0..self.sources.len() {
474 self.push_source_heap_entry(source_index)?;
475 }
476 self.source_heap_initialized = true;
477 Ok(())
478 }
479
480 async fn initialize_source_heap_async(&mut self) -> Result<()> {
481 for source_index in 0..self.sources.len() {
482 self.push_source_heap_entry_async(source_index).await?;
483 }
484 self.source_heap_initialized = true;
485 Ok(())
486 }
487
488 fn push_source_heap_entry(&mut self, source_index: usize) -> Result<()> {
489 let Some(user_key) = self.sources[source_index]
490 .current_key()?
491 .map(<[u8]>::to_vec)
492 else {
493 return Ok(());
494 };
495 self.source_heap.push(SourceHeapEntry {
496 user_key,
497 source_index,
498 direction: self.direction,
499 });
500 Ok(())
501 }
502
503 async fn push_source_heap_entry_async(&mut self, source_index: usize) -> Result<()> {
504 let Some(user_key) = self.sources[source_index].current_user_key_async().await? else {
505 return Ok(());
506 };
507 self.source_heap.push(SourceHeapEntry {
508 user_key,
509 source_index,
510 direction: self.direction,
511 });
512 Ok(())
513 }
514
515 fn visible_lazy_item_from_records(
516 &self,
517 first_record: ScanRecord,
518 mut rest_records: Vec<ScanRecord>,
519 ) -> Result<Option<LazyKeyValue>> {
520 if rest_records.is_empty() {
521 return self.visible_lazy_item_from_sorted_records(std::iter::once(first_record));
522 }
523
524 rest_records.push(first_record);
525 rest_records.sort_by(|left, right| left.0.cmp(&right.0));
526
527 self.visible_lazy_item_from_sorted_records(rest_records)
528 }
529
530 fn visible_lazy_item_from_sorted_records(
531 &self,
532 records: impl IntoIterator<Item = ScanRecord>,
533 ) -> Result<Option<LazyKeyValue>> {
534 for (internal_key, value) in records {
535 if internal_key.sequence() > self.read_sequence {
536 continue;
537 }
538
539 match internal_key.kind() {
540 ValueKind::Put => {
541 if range_tombstones_cover(
542 &self.range_tombstones,
543 internal_key.user_key(),
544 internal_key.sequence(),
545 internal_key.batch_index(),
546 self.read_sequence,
547 ) {
548 return Ok(None);
549 }
550
551 let key = internal_key.user_key().to_vec();
552 let value = lazy_value(
553 value,
554 internal_key,
555 self.db_path.as_deref(),
556 self.native_storage.clone(),
557 self.blob_reads.clone(),
558 Arc::clone(&self.read_pin),
559 )?;
560 return Ok(Some(LazyKeyValue { key, value }));
561 }
562 ValueKind::PointDelete => return Ok(None),
563 ValueKind::RangeDelete => {}
564 }
565 }
566
567 Ok(None)
568 }
569}
570
/// Merge-heap entry: a source index tagged with that source's current user key.
///
/// `direction` is carried so `Ord` can order keys without access to the scan. All
/// entries in one heap are expected to share it (checked with `debug_assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
struct SourceHeapEntry {
    /// User key of the group the source currently buffers.
    user_key: Vec<u8>,
    /// Index into `LazyScan::sources`.
    source_index: usize,
    /// Scan direction, used to orient the key comparison.
    direction: Direction,
}
577
578impl Ord for SourceHeapEntry {
579 fn cmp(&self, other: &Self) -> CmpOrdering {
580 debug_assert_eq!(self.direction, other.direction);
581 match compare_scan_keys(&self.user_key, &other.user_key, self.direction) {
582 CmpOrdering::Less => CmpOrdering::Greater,
583 CmpOrdering::Equal => other.source_index.cmp(&self.source_index),
584 CmpOrdering::Greater => CmpOrdering::Less,
585 }
586 }
587}
588
589impl PartialOrd for SourceHeapEntry {
590 fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
591 Some(self.cmp(other))
592 }
593}
594
595fn push_group_records(
596 first_record: &mut Option<ScanRecord>,
597 rest_records: &mut Vec<ScanRecord>,
598 group: RecordGroup,
599) {
600 if let Some(previous_first) = first_record.take() {
601 rest_records.push(previous_first);
602 }
603 *first_record = Some(group.first);
604 rest_records.extend(group.rest);
605}
606
607fn compare_scan_keys(left: &[u8], right: &[u8], direction: Direction) -> CmpOrdering {
608 match direction {
609 Direction::Forward => left.cmp(right),
610 Direction::Reverse => right.cmp(left),
611 }
612}
613
/// One raw record seen by a scan: its internal key and the stored value, if any.
///
/// A visible `Put` whose value is `None` is reported as corruption by `lazy_value`.
/// Presumably deletion records carry `None` — confirm with the record writers.
pub(crate) type ScanRecord = (InternalKey, Option<ValueRef>);
615
/// All records a source holds for a single user key.
#[derive(Debug, Clone)]
pub(crate) struct RecordGroup {
    /// User key shared by every record in the group.
    pub(crate) user_key: Vec<u8>,
    /// One record of the group, split out so a group is never empty. For groups
    /// built by `sort_group_records`, this is the smallest internal key.
    pub(crate) first: ScanRecord,
    /// Remaining records of the group.
    pub(crate) rest: Vec<ScanRecord>,
}
622
/// A cursor plus a one-group lookahead buffer; one input of the scan merge.
#[derive(Debug, Clone)]
pub(crate) struct RecordSource {
    /// Underlying memtable or table cursor.
    cursor: SourceCursor,
    /// Group fetched from `cursor` but not yet consumed by the merge.
    current: Option<RecordGroup>,
}
628
629impl RecordSource {
630 pub(crate) fn memtable(
631 memtable: Arc<Memtable>,
632 selector: ScanSelector,
633 direction: Direction,
634 ) -> Self {
635 Self {
636 cursor: SourceCursor::Memtable(MemtableCursor::new(memtable, selector, direction)),
637 current: None,
638 }
639 }
640
641 pub(crate) fn table(cursor: TablePointCursor) -> Self {
642 Self {
643 cursor: SourceCursor::Table(cursor),
644 current: None,
645 }
646 }
647
648 fn current_key(&mut self) -> Result<Option<&[u8]>> {
649 self.ensure_current()?;
650 Ok(self.current.as_ref().map(|group| group.user_key.as_slice()))
651 }
652
653 fn take_current_group(&mut self) -> Result<Option<RecordGroup>> {
654 self.ensure_current()?;
655 Ok(self.current.take())
656 }
657
658 async fn current_user_key_async(&mut self) -> Result<Option<Vec<u8>>> {
659 self.ensure_current_async().await?;
660 Ok(self.current.as_ref().map(|group| group.user_key.clone()))
661 }
662
663 async fn take_current_group_async(&mut self) -> Result<Option<RecordGroup>> {
664 self.ensure_current_async().await?;
665 Ok(self.current.take())
666 }
667
668 fn ensure_current(&mut self) -> Result<()> {
669 if self.current.is_none() {
670 self.current = self.cursor.next_group()?;
671 }
672 Ok(())
673 }
674
675 async fn ensure_current_async(&mut self) -> Result<()> {
676 if self.current.is_none() {
677 self.current = self.cursor.next_group_async().await?;
678 }
679 Ok(())
680 }
681}
682
/// The concrete cursor behind a `RecordSource`.
#[derive(Debug, Clone)]
enum SourceCursor {
    /// Walks a memtable group by group.
    Memtable(MemtableCursor),
    /// Walks a table through its point cursor.
    Table(TablePointCursor),
}
688
689impl SourceCursor {
690 fn next_group(&mut self) -> Result<Option<RecordGroup>> {
691 match self {
692 Self::Memtable(cursor) => cursor.next_group(),
693 Self::Table(cursor) => cursor.next_group(),
694 }
695 }
696
697 async fn next_group_async(&mut self) -> Result<Option<RecordGroup>> {
698 match self {
699 Self::Memtable(cursor) => cursor.next_group_async().await,
700 Self::Table(cursor) => cursor.next_group_async().await,
701 }
702 }
703}
704
/// Group-at-a-time cursor over a memtable's entries.
///
/// Each call re-acquires the memtable read lock and resumes from the bounds left
/// by the previous call, so no lock is held between groups.
#[derive(Debug, Clone)]
struct MemtableCursor {
    /// Memtable being scanned.
    memtable: Arc<Memtable>,
    /// Selection of user keys to return.
    selector: ScanSelector,
    /// Walk direction.
    direction: Direction,
    /// Lower edge of the internal-key window still to scan; advanced past each
    /// group in forward scans.
    lower_bound: Bound<InternalKey>,
    /// Upper edge of the internal-key window still to scan; lowered below each
    /// group in reverse scans.
    upper_bound: Bound<InternalKey>,
    /// Set once no further group can be produced.
    exhausted: bool,
}
717
718impl MemtableCursor {
719 fn new(memtable: Arc<Memtable>, selector: ScanSelector, direction: Direction) -> Self {
720 let (lower_bound, upper_bound) = memtable_scan_bounds(&selector);
721
722 Self {
723 memtable,
724 selector,
725 direction,
726 lower_bound,
727 upper_bound,
728 exhausted: false,
729 }
730 }
731
732 fn next_group(&mut self) -> Result<Option<RecordGroup>> {
733 match self.direction {
734 Direction::Forward => self.next_group_forward(),
735 Direction::Reverse => self.next_group_reverse(),
736 }
737 }
738
739 #[allow(clippy::unused_async)]
742 async fn next_group_async(&mut self) -> Result<Option<RecordGroup>> {
743 self.next_group()
744 }
745
746 fn next_group_forward(&mut self) -> Result<Option<RecordGroup>> {
747 if self.exhausted {
748 return Ok(None);
749 }
750
751 let entries = self
752 .memtable
753 .read_entries()
754 .map_err(|_| lock_poisoned("memtable entries"))?;
755 let mut records = Vec::new();
756 let mut group_user_key = None;
757
758 for (internal_key, value) in
759 entries.range((self.lower_bound.clone(), self.upper_bound.clone()))
760 {
761 match self.selector.forward_key_state(internal_key.user_key()) {
762 ForwardKeyState::Before => {}
763 ForwardKeyState::Match => {
764 let user_key =
765 group_user_key.get_or_insert_with(|| internal_key.user_key().to_vec());
766 if internal_key.user_key() == user_key.as_slice() {
767 records.push((internal_key.clone(), value.clone()));
768 } else {
769 break;
770 }
771 }
772 ForwardKeyState::After => {
773 self.exhausted = true;
774 return Ok(None);
775 }
776 }
777 }
778 drop(entries);
779
780 let Some(user_key) = group_user_key else {
781 self.exhausted = true;
782 return Ok(None);
783 };
784 self.lower_bound = Bound::Excluded(last_internal_key_for_user(&user_key));
785 Ok(Some(record_group_from_records(user_key, records)))
786 }
787
788 fn next_group_reverse(&mut self) -> Result<Option<RecordGroup>> {
789 if self.exhausted {
790 return Ok(None);
791 }
792
793 let entries = self
794 .memtable
795 .read_entries()
796 .map_err(|_| lock_poisoned("memtable entries"))?;
797 let mut records = Vec::new();
798 let mut group_user_key = None;
799
800 for (internal_key, value) in entries
801 .range((self.lower_bound.clone(), self.upper_bound.clone()))
802 .rev()
803 {
804 match self.selector.reverse_key_state(internal_key.user_key()) {
805 ReverseKeyState::Above => {}
806 ReverseKeyState::Match => {
807 let user_key =
808 group_user_key.get_or_insert_with(|| internal_key.user_key().to_vec());
809 if internal_key.user_key() == user_key.as_slice() {
810 records.push((internal_key.clone(), value.clone()));
811 } else {
812 break;
813 }
814 }
815 ReverseKeyState::Below => {
816 self.exhausted = true;
817 return Ok(None);
818 }
819 }
820 }
821 drop(entries);
822
823 let Some(user_key) = group_user_key else {
824 self.exhausted = true;
825 return Ok(None);
826 };
827 self.upper_bound = Bound::Excluded(first_internal_key_for_user(&user_key));
828 Ok(Some(record_group_from_records(user_key, records)))
829 }
830}
831
832fn record_group_from_records(user_key: Vec<u8>, mut records: Vec<ScanRecord>) -> RecordGroup {
833 let first = records
834 .pop()
835 .expect("memtable cursor only builds groups after finding a record");
836 let (first, rest) = sort_group_records(first, records);
837 RecordGroup {
838 user_key,
839 first,
840 rest,
841 }
842}
843
844pub(crate) fn sort_group_records(
845 first: ScanRecord,
846 mut rest: Vec<ScanRecord>,
847) -> (ScanRecord, Vec<ScanRecord>) {
848 if rest.is_empty() {
849 return (first, rest);
850 }
851
852 rest.push(first);
853 rest.sort_by(|left, right| left.0.cmp(&right.0));
854 let mut records = rest.into_iter();
855 let first = records
856 .next()
857 .expect("non-empty record group must keep a first record");
858 let rest = records.collect();
859 (first, rest)
860}
861
862fn memtable_scan_bounds(selector: &ScanSelector) -> (Bound<InternalKey>, Bound<InternalKey>) {
863 match selector {
864 ScanSelector::Range(range) => (
865 memtable_start_bound(&range.start),
866 memtable_end_bound(&range.end),
867 ),
868 ScanSelector::Prefix(prefix) => {
869 let start = Bound::Included(first_internal_key_for_user(prefix));
870 let end = prefix_successor(prefix).map_or(Bound::Unbounded, |end| {
871 Bound::Excluded(first_internal_key_for_user(&end))
872 });
873 (start, end)
874 }
875 }
876}
877
878fn memtable_start_bound(start: &Bound<Vec<u8>>) -> Bound<InternalKey> {
879 match start {
880 Bound::Included(key) => Bound::Included(first_internal_key_for_user(key)),
881 Bound::Excluded(key) => Bound::Excluded(last_internal_key_for_user(key)),
882 Bound::Unbounded => Bound::Unbounded,
883 }
884}
885
886fn memtable_end_bound(end: &Bound<Vec<u8>>) -> Bound<InternalKey> {
887 match end {
888 Bound::Included(key) => Bound::Included(last_internal_key_for_user(key)),
889 Bound::Excluded(key) => Bound::Excluded(first_internal_key_for_user(key)),
890 Bound::Unbounded => Bound::Unbounded,
891 }
892}
893
/// Which user keys a scan covers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ScanSelector {
    /// Keys inside a `KeyRange` (each end included, excluded, or unbounded).
    Range(KeyRange),
    /// Keys that start with the given byte prefix.
    Prefix(Vec<u8>),
}
899
900impl ScanSelector {
901 pub(crate) fn forward_key_state(&self, key: &[u8]) -> ForwardKeyState {
902 match self {
903 Self::Range(range) => {
904 if key_is_before_start(key, &range.start) {
905 ForwardKeyState::Before
906 } else if key_is_after_end(key, &range.end) {
907 ForwardKeyState::After
908 } else {
909 ForwardKeyState::Match
910 }
911 }
912 Self::Prefix(prefix) => {
913 if key < prefix.as_slice() {
914 ForwardKeyState::Before
915 } else if key.starts_with(prefix) {
916 ForwardKeyState::Match
917 } else {
918 ForwardKeyState::After
919 }
920 }
921 }
922 }
923
924 pub(crate) fn reverse_key_state(&self, key: &[u8]) -> ReverseKeyState {
925 match self {
926 Self::Range(range) => {
927 if key_is_after_end(key, &range.end) {
928 ReverseKeyState::Above
929 } else if key_is_before_start(key, &range.start) {
930 ReverseKeyState::Below
931 } else {
932 ReverseKeyState::Match
933 }
934 }
935 Self::Prefix(prefix) => {
936 if key.starts_with(prefix) {
937 ReverseKeyState::Match
938 } else if key < prefix.as_slice() {
939 ReverseKeyState::Below
940 } else {
941 ReverseKeyState::Above
942 }
943 }
944 }
945 }
946
947 pub(crate) fn prefix(&self) -> Option<&[u8]> {
948 match self {
949 Self::Range(_) => None,
950 Self::Prefix(prefix) => Some(prefix),
951 }
952 }
953}
954
/// Position of a key relative to a `ScanSelector` during an ascending walk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ForwardKeyState {
    /// The key sorts before the selection; keep walking.
    Before,
    /// The key is selected.
    Match,
    /// The key sorts after the selection; the walk can stop.
    After,
}
961
/// Position of a key relative to a `ScanSelector` during a descending walk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReverseKeyState {
    /// The key sorts after the selection; keep walking down.
    Above,
    /// The key is selected.
    Match,
    /// The key sorts before the selection; the walk can stop.
    Below,
}
968
/// A range deletion as seen by a scan. It hides visible point records in `range`
/// that it is strictly newer than.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ScanRangeTombstone {
    /// User keys covered by the tombstone.
    range: KeyRange,
    /// Sequence of the tombstone; it is ignored by reads below this sequence.
    sequence: Sequence,
    /// Tie-breaker against points with the same sequence (presumably the
    /// tombstone's position within its write batch — confirm with the writer).
    batch_index: u32,
}
975
976impl ScanRangeTombstone {
977 #[must_use]
978 pub(crate) fn new(range: KeyRange, sequence: Sequence, batch_index: u32) -> Self {
979 Self {
980 range,
981 sequence,
982 batch_index,
983 }
984 }
985
986 fn covers_visible_point(
987 &self,
988 key: &[u8],
989 point_sequence: Sequence,
990 point_batch_index: u32,
991 read_sequence: Sequence,
992 ) -> bool {
993 if self.sequence > read_sequence || !key_is_in_range(key, &self.range) {
994 return false;
995 }
996
997 self.sequence > point_sequence
998 || (self.sequence == point_sequence && self.batch_index > point_batch_index)
999 }
1000}
1001
/// Exposes the covered key range. Presumably `RangeTombstoneIndex` uses it to
/// answer `covering_key` lookups — confirm in `range_tombstone`.
impl RangeTombstoneLike for ScanRangeTombstone {
    fn range(&self) -> &KeyRange {
        &self.range
    }
}
1007
1008fn range_tombstones_cover(
1009 range_tombstones: &RangeTombstoneIndex<ScanRangeTombstone>,
1010 key: &[u8],
1011 point_sequence: Sequence,
1012 point_batch_index: u32,
1013 read_sequence: Sequence,
1014) -> bool {
1015 range_tombstones.covering_key(key).any(|tombstone| {
1016 tombstone.covers_visible_point(key, point_sequence, point_batch_index, read_sequence)
1017 })
1018}
1019
1020fn lock_poisoned(lock_name: &'static str) -> Error {
1021 Error::Corruption {
1022 message: format!("{lock_name} lock poisoned"),
1023 }
1024}
1025
1026fn lazy_value(
1027 value: Option<ValueRef>,
1028 internal_key: InternalKey,
1029 db_path: Option<&std::path::Path>,
1030 native_storage: Option<NativeFileBackend>,
1031 blob_reads: Option<Arc<BlobReadMetrics>>,
1032 read_pin: Arc<Snapshot>,
1033) -> Result<LazyValue> {
1034 let value = value.ok_or_else(|| Error::Corruption {
1035 message: "put record is missing value bytes".to_owned(),
1036 })?;
1037
1038 match value {
1039 ValueRef::Inline(bytes) => Ok(LazyValue {
1040 inner: LazyValueInner::Inline(bytes),
1041 }),
1042 ValueRef::BlobIndex(_) | ValueRef::Blob { .. } => {
1043 let db_path = db_path.ok_or_else(|| Error::Corruption {
1044 message: "in-memory database cannot read blob value references".to_owned(),
1045 })?;
1046 Ok(LazyValue {
1047 inner: LazyValueInner::Blob {
1048 db_path: db_path.to_path_buf(),
1049 native_storage,
1050 internal_key,
1051 value,
1052 blob_reads,
1053 _read_pin: read_pin,
1054 },
1055 })
1056 }
1057 }
1058}
1059
/// Returns the smallest byte string greater than every string that starts with `prefix`.
///
/// Trailing `0xFF` bytes are dropped and the last remaining byte is incremented.
/// Returns `None` when no such string exists: `prefix` is empty or consists only
/// of `0xFF`.
pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    let last_bumpable = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut successor = prefix[..=last_bumpable].to_vec();
    successor[last_bumpable] += 1;
    Some(successor)
}
1073
/// Returns `true` when `key` lies below the range start `start`.
///
/// An included start admits the boundary key itself; an excluded start rejects it.
fn key_is_before_start(key: &[u8], start: &Bound<Vec<u8>>) -> bool {
    match start {
        Bound::Included(boundary) => key.cmp(boundary.as_slice()).is_lt(),
        Bound::Excluded(boundary) => key.cmp(boundary.as_slice()).is_le(),
        Bound::Unbounded => false,
    }
}
1081
/// Returns `true` when `key` lies above the range end `end`.
///
/// An included end admits the boundary key itself; an excluded end rejects it.
fn key_is_after_end(key: &[u8], end: &Bound<Vec<u8>>) -> bool {
    match end {
        Bound::Included(boundary) => key.cmp(boundary.as_slice()).is_gt(),
        Bound::Excluded(boundary) => key.cmp(boundary.as_slice()).is_ge(),
        Bound::Unbounded => false,
    }
}
1089
1090fn key_is_in_range(key: &[u8], range: &KeyRange) -> bool {
1091 !key_is_before_start(key, &range.start) && !key_is_after_end(key, &range.end)
1092}
1093
/// Unit tests for the merge heap ordering and the lazy multi-source merge.
#[cfg(test)]
mod tests {
    use std::{collections::BinaryHeap, sync::Arc};

    use super::{Direction, Iter, RecordSource, ScanSelector, ScanSourceInput, SourceHeapEntry};
    use crate::{
        blob::ValueRef,
        internal_key::{InternalKey, ValueKind},
        memtable::Memtable,
        snapshot::Snapshot,
        types::{KeyRange, Sequence},
    };

    /// The merge heap pops keys in ascending order for forward scans and in
    /// descending order for reverse scans, regardless of push order.
    #[test]
    fn source_heap_orders_forward_and_reverse_keys() {
        let mut forward = BinaryHeap::new();
        forward.push(heap_entry(b"c", 0, Direction::Forward));
        forward.push(heap_entry(b"a", 1, Direction::Forward));
        forward.push(heap_entry(b"b", 2, Direction::Forward));

        assert_eq!(forward.pop().expect("entry").user_key, b"a");
        assert_eq!(forward.pop().expect("entry").user_key, b"b");
        assert_eq!(forward.pop().expect("entry").user_key, b"c");

        let mut reverse = BinaryHeap::new();
        reverse.push(heap_entry(b"c", 0, Direction::Reverse));
        reverse.push(heap_entry(b"a", 1, Direction::Reverse));
        reverse.push(heap_entry(b"b", 2, Direction::Reverse));

        assert_eq!(reverse.pop().expect("entry").user_key, b"c");
        assert_eq!(reverse.pop().expect("entry").user_key, b"b");
        assert_eq!(reverse.pop().expect("entry").user_key, b"a");
    }

    /// Two memtables with interleaved keys merge into one ordered stream in both
    /// directions.
    #[test]
    fn lazy_scan_heap_merge_preserves_forward_and_reverse_order() {
        // `left` holds a/c and `right` holds b/d, so the merge alternates sources.
        let left = memtable_with(&[(b"a", b"a1"), (b"c", b"c1")]);
        let right = memtable_with(&[(b"b", b"b1"), (b"d", b"d1")]);

        // Read sequence 4 is above every record's sequence (at most 2), so all
        // records are visible.
        let forward = Iter::from_sources(
            Direction::Forward,
            ScanSourceInput {
                read_sequence: Sequence::new(4),
                read_pin: Snapshot::new(Sequence::new(4)),
                db_path: None,
                native_storage: None,
                blob_reads: None,
                range_tombstones: Vec::new(),
                sources: vec![
                    RecordSource::memtable(
                        Arc::clone(&left),
                        ScanSelector::Range(KeyRange::all()),
                        Direction::Forward,
                    ),
                    RecordSource::memtable(
                        Arc::clone(&right),
                        ScanSelector::Range(KeyRange::all()),
                        Direction::Forward,
                    ),
                ],
            },
        );
        assert_eq!(
            collect_keys(forward),
            vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec(), b"d".to_vec()]
        );

        let reverse = Iter::from_sources(
            Direction::Reverse,
            ScanSourceInput {
                read_sequence: Sequence::new(4),
                read_pin: Snapshot::new(Sequence::new(4)),
                db_path: None,
                native_storage: None,
                blob_reads: None,
                range_tombstones: Vec::new(),
                sources: vec![
                    RecordSource::memtable(
                        left,
                        ScanSelector::Range(KeyRange::all()),
                        Direction::Reverse,
                    ),
                    RecordSource::memtable(
                        right,
                        ScanSelector::Range(KeyRange::all()),
                        Direction::Reverse,
                    ),
                ],
            },
        );
        assert_eq!(
            collect_keys(reverse),
            vec![b"d".to_vec(), b"c".to_vec(), b"b".to_vec(), b"a".to_vec()]
        );
    }

    /// Builds a heap entry directly, without a backing source.
    fn heap_entry(user_key: &[u8], source_index: usize, direction: Direction) -> SourceHeapEntry {
        SourceHeapEntry {
            user_key: user_key.to_vec(),
            source_index,
            direction,
        }
    }

    /// Builds a memtable holding one inline `Put` per `(key, value)` pair; the
    /// record at position `i` gets sequence `i + 1`.
    fn memtable_with(records: &[(&[u8], &[u8])]) -> Arc<Memtable> {
        let memtable = Arc::new(Memtable::default());
        {
            let mut entries = memtable.write_entries().expect("memtable lock");
            for (index, (key, value)) in records.iter().enumerate() {
                entries.insert(
                    InternalKey::new(
                        *key,
                        Sequence::new(u64::try_from(index + 1).expect("test sequence fits")),
                        ValueKind::Put,
                        0,
                    ),
                    Some(ValueRef::Inline((*value).to_vec())),
                );
            }
        }
        memtable
    }

    /// Drains `iter`, panicking on any error, and returns the yielded keys in order.
    fn collect_keys(iter: Iter) -> Vec<Vec<u8>> {
        iter.map(|item| item.expect("iterator item").key)
            .collect::<Vec<_>>()
    }
}