// trine_kv/iterator.rs

1use std::{
2    cmp::Ordering as CmpOrdering, collections::BinaryHeap, ops::Bound, path::PathBuf, sync::Arc,
3};
4
5use crate::{
6    blob::ValueRef,
7    error::{Error, Result},
8    internal_key::{
9        InternalKey, ValueKind, first_internal_key_for_user, last_internal_key_for_user,
10    },
11    memtable::Memtable,
12    range_tombstone::{RangeTombstoneIndex, RangeTombstoneLike},
13    snapshot::Snapshot,
14    stats::BlobReadMetrics,
15    storage::NativeFileBackend,
16    table::TablePointCursor,
17    types::{KeyRange, KeyValue, Sequence, Value},
18};
19
/// Order in which range and prefix scans visit user keys.
///
/// `Forward` is the default direction.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Direction {
    /// Ascending byte order of user keys.
    #[default]
    Forward,
    /// Descending byte order of user keys.
    Reverse,
}
29
/// Eager iterator that returns owned key/value rows.
///
/// Rows come either from an already-built vector (`Iter::from_items`) or from
/// a merged scan over record sources, where each row's value bytes are read
/// as that row is produced.
#[derive(Debug, Clone)]
pub struct Iter {
    // Direction reported by `Iter::direction`.
    direction: Direction,
    // Where rows come from: a pre-built vector or a lazy merged scan.
    inner: IterInner,
}
36
/// Iterator that can defer reading large blob values until requested.
///
/// Each item is a [`LazyKeyValue`]. Blob-backed values are only fetched when
/// one of the `LazyValue` read methods is called on the returned row.
#[derive(Debug, Clone)]
pub struct LazyIter {
    // Direction reported by `LazyIter::direction`.
    direction: Direction,
    // Merged scan state that produces the lazy rows.
    scan: LazyScan,
}
43
/// Row returned by `LazyIter`.
///
/// The key is always materialised. The value may still refer to blob storage;
/// convert the row with `into_key_value`/`into_key_value_sync` to read it.
#[derive(Debug, Clone)]
pub struct LazyKeyValue {
    /// User key bytes.
    pub key: Vec<u8>,
    /// Value handle that may defer reading blob bytes.
    pub value: LazyValue,
}
52
/// Value returned by `LazyIter`.
///
/// Holds either the bytes themselves or what is needed to read them from
/// blob storage later (see `is_inline`, `read`, `read_sync`).
#[derive(Debug, Clone)]
pub struct LazyValue {
    // Inline bytes or a deferred blob reference.
    inner: LazyValueInner,
}
58
/// Storage behind a [`LazyValue`].
#[derive(Debug, Clone)]
enum LazyValueInner {
    /// Value bytes already held in memory.
    Inline(Vec<u8>),
    /// Value stored out of line and read on demand through `crate::blob`.
    Blob {
        /// Database directory handed to the blob readers.
        db_path: PathBuf,
        /// Backend used by the async read paths; when `None`, async reads fall
        /// back to the synchronous blob reader.
        native_storage: Option<NativeFileBackend>,
        /// Internal key of the record, passed to the blob reader
        /// (presumably to validate the blob entry — confirm in `crate::blob`).
        internal_key: InternalKey,
        /// Blob reference handed to the blob reader.
        value: ValueRef,
        /// Optional metrics sink; every successful blob read records its byte count.
        blob_reads: Option<Arc<BlobReadMetrics>>,
        /// Never read. Holding this `Arc<Snapshot>` presumably keeps the read
        /// snapshot (and the blob files it references) alive until the value
        /// is read — confirm.
        _read_pin: Arc<Snapshot>,
    },
}
71
/// Row supplier behind an [`Iter`].
#[derive(Debug, Clone)]
enum IterInner {
    /// Pre-built rows, already arranged in the iterator's direction.
    Items(std::vec::IntoIter<KeyValue>),
    /// Merged source scan; values are read as each row is produced.
    Lazy(LazyScan),
}
77
/// Everything needed to build a merged scan over record sources.
#[derive(Debug, Clone)]
pub(crate) struct ScanSourceInput {
    /// Records whose sequence is above this value are invisible to the scan.
    pub(crate) read_sequence: Sequence,
    /// Snapshot that is wrapped in an `Arc` and handed to each lazy value
    /// the scan builds.
    pub(crate) read_pin: Snapshot,
    /// Database directory used to locate blob values, if any.
    pub(crate) db_path: Option<PathBuf>,
    /// Backend for async blob reads, if available.
    pub(crate) native_storage: Option<NativeFileBackend>,
    /// Optional blob-read metrics sink.
    pub(crate) blob_reads: Option<Arc<BlobReadMetrics>>,
    /// Range deletions checked against each visible `Put`.
    pub(crate) range_tombstones: Vec<ScanRangeTombstone>,
    /// Memtable/table sources merged by the scan.
    pub(crate) sources: Vec<RecordSource>,
}
88
89impl Iter {
90    /// Creates an empty iterator with the requested direction.
91    #[must_use]
92    pub fn empty(direction: Direction) -> Self {
93        Self::from_items(Vec::new(), direction)
94    }
95
96    /// Creates an iterator from already-owned rows.
97    #[must_use]
98    pub fn from_items(mut items: Vec<KeyValue>, direction: Direction) -> Self {
99        if direction == Direction::Reverse {
100            items.reverse();
101        }
102
103        Self {
104            direction,
105            inner: IterInner::Items(items.into_iter()),
106        }
107    }
108
109    pub(crate) fn from_sources(direction: Direction, input: ScanSourceInput) -> Self {
110        Self {
111            direction,
112            inner: IterInner::Lazy(LazyScan {
113                direction,
114                read_sequence: input.read_sequence,
115                read_pin: Arc::new(input.read_pin),
116                db_path: input.db_path,
117                native_storage: input.native_storage,
118                blob_reads: input.blob_reads,
119                range_tombstones: RangeTombstoneIndex::new(input.range_tombstones),
120                sources: input.sources,
121                source_heap: BinaryHeap::new(),
122                source_heap_initialized: false,
123            }),
124        }
125    }
126
127    /// Returns this iterator's scan direction.
128    #[must_use]
129    pub const fn direction(&self) -> Direction {
130        self.direction
131    }
132}
133
134impl LazyIter {
135    pub(crate) fn from_sources(direction: Direction, input: ScanSourceInput) -> Self {
136        Self {
137            direction,
138            scan: LazyScan {
139                direction,
140                read_sequence: input.read_sequence,
141                read_pin: Arc::new(input.read_pin),
142                db_path: input.db_path,
143                native_storage: input.native_storage,
144                blob_reads: input.blob_reads,
145                range_tombstones: RangeTombstoneIndex::new(input.range_tombstones),
146                sources: input.sources,
147                source_heap: BinaryHeap::new(),
148                source_heap_initialized: false,
149            },
150        }
151    }
152
153    /// Returns this iterator's scan direction.
154    #[must_use]
155    pub const fn direction(&self) -> Direction {
156        self.direction
157    }
158}
159
160impl LazyKeyValue {
161    /// Reads any deferred value bytes synchronously and returns an owned row.
162    pub fn into_key_value_sync(self) -> Result<KeyValue> {
163        Ok(KeyValue::new(self.key, self.value.into_value_sync()?))
164    }
165
166    /// Reads any deferred value bytes asynchronously and returns an owned row.
167    pub async fn into_key_value(self) -> Result<KeyValue> {
168        let value = self.value.into_value().await?;
169        Ok(KeyValue::new(self.key, value))
170    }
171}
172
173impl LazyValue {
174    /// Returns `true` if the value bytes are already inline.
175    #[must_use]
176    pub fn is_inline(&self) -> bool {
177        matches!(self.inner, LazyValueInner::Inline(_))
178    }
179
180    /// Reads the value bytes synchronously without consuming this handle.
181    pub fn read_sync(&self) -> Result<Value> {
182        match &self.inner {
183            LazyValueInner::Inline(bytes) => Ok(bytes.clone()),
184            LazyValueInner::Blob {
185                db_path,
186                native_storage: _,
187                internal_key,
188                value,
189                blob_reads,
190                _read_pin: _,
191            } => {
192                let bytes =
193                    crate::blob::read_value_for_internal_key(db_path, value, Some(internal_key))?;
194                if let Some(blob_reads) = blob_reads {
195                    blob_reads.record(bytes.len() as u64);
196                }
197                Ok(bytes)
198            }
199        }
200    }
201
202    /// Reads the value bytes synchronously and consumes this handle.
203    pub fn into_value_sync(self) -> Result<Value> {
204        match self.inner {
205            LazyValueInner::Inline(bytes) => Ok(bytes),
206            LazyValueInner::Blob {
207                db_path,
208                native_storage: _,
209                internal_key,
210                value,
211                blob_reads,
212                _read_pin: _,
213            } => {
214                let bytes = crate::blob::read_value_for_internal_key(
215                    &db_path,
216                    &value,
217                    Some(&internal_key),
218                )?;
219                if let Some(blob_reads) = blob_reads {
220                    blob_reads.record(bytes.len() as u64);
221                }
222                Ok(bytes)
223            }
224        }
225    }
226
227    /// Reads the value bytes asynchronously without consuming this handle.
228    pub async fn read(&self) -> Result<Value> {
229        match &self.inner {
230            LazyValueInner::Inline(bytes) => Ok(bytes.clone()),
231            LazyValueInner::Blob {
232                db_path,
233                native_storage: Some(native_storage),
234                internal_key,
235                value,
236                blob_reads,
237                _read_pin: _,
238            } => {
239                let bytes = crate::blob::read_value_for_internal_key_with_backend_async(
240                    native_storage,
241                    db_path,
242                    value,
243                    Some(internal_key),
244                )
245                .await?;
246                if let Some(blob_reads) = blob_reads {
247                    blob_reads.record(bytes.len() as u64);
248                }
249                Ok(bytes)
250            }
251            LazyValueInner::Blob {
252                native_storage: None,
253                ..
254            } => self.read_sync(),
255        }
256    }
257
258    /// Reads the value bytes asynchronously and consumes this handle.
259    pub async fn into_value(self) -> Result<Value> {
260        match self.inner {
261            LazyValueInner::Inline(bytes) => Ok(bytes),
262            LazyValueInner::Blob {
263                db_path,
264                native_storage: Some(native_storage),
265                internal_key,
266                value,
267                blob_reads,
268                _read_pin: _,
269            } => {
270                let bytes = crate::blob::read_value_for_internal_key_with_backend_async(
271                    &native_storage,
272                    &db_path,
273                    &value,
274                    Some(&internal_key),
275                )
276                .await?;
277                if let Some(blob_reads) = blob_reads {
278                    blob_reads.record(bytes.len() as u64);
279                }
280                Ok(bytes)
281            }
282            LazyValueInner::Blob {
283                db_path,
284                native_storage: None,
285                internal_key,
286                value,
287                blob_reads,
288                _read_pin: _,
289            } => {
290                let bytes = crate::blob::read_value_for_internal_key(
291                    &db_path,
292                    &value,
293                    Some(&internal_key),
294                )?;
295                if let Some(blob_reads) = blob_reads {
296                    blob_reads.record(bytes.len() as u64);
297                }
298                Ok(bytes)
299            }
300        }
301    }
302}
303
304impl Iter {
305    /// Returns the next owned row, reading deferred sources asynchronously when needed.
306    pub async fn next(&mut self) -> Result<Option<KeyValue>> {
307        match &mut self.inner {
308            IterInner::Items(items) => Ok(items.next()),
309            IterInner::Lazy(scan) => scan.next_async().await,
310        }
311    }
312
313    /// Returns the next owned row using the synchronous iterator path.
314    pub fn next_sync(&mut self) -> Option<Result<KeyValue>> {
315        Iterator::next(self)
316    }
317}
318
319impl LazyIter {
320    /// Returns the next lazy row, reading deferred metadata asynchronously when needed.
321    pub async fn next(&mut self) -> Result<Option<LazyKeyValue>> {
322        self.scan.next_lazy_async().await
323    }
324
325    /// Returns the next lazy row using the synchronous iterator path.
326    pub fn next_sync(&mut self) -> Option<Result<LazyKeyValue>> {
327        Iterator::next(self)
328    }
329}
330
331impl Iterator for Iter {
332    type Item = Result<KeyValue>;
333
334    fn next(&mut self) -> Option<Self::Item> {
335        match &mut self.inner {
336            IterInner::Items(items) => items.next().map(Ok),
337            IterInner::Lazy(scan) => scan.next(),
338        }
339    }
340}
341
342impl Iterator for LazyIter {
343    type Item = Result<LazyKeyValue>;
344
345    fn next(&mut self) -> Option<Self::Item> {
346        self.scan.next_lazy()
347    }
348}
349
/// K-way merge over record sources that yields at most one row per user key.
#[derive(Debug, Clone)]
struct LazyScan {
    // Key order in which rows are produced.
    direction: Direction,
    // Records with a larger sequence are skipped.
    read_sequence: Sequence,
    // Shared with every lazy value the scan builds.
    read_pin: Arc<Snapshot>,
    // Blob location/backends/metrics forwarded to `lazy_value`.
    db_path: Option<PathBuf>,
    native_storage: Option<NativeFileBackend>,
    blob_reads: Option<Arc<BlobReadMetrics>>,
    // Range deletions applied to each visible `Put`.
    range_tombstones: RangeTombstoneIndex<ScanRangeTombstone>,
    // Inputs to the merge, addressed by index from `source_heap`.
    sources: Vec<RecordSource>,
    // One entry per non-exhausted source, keyed by its current user key.
    source_heap: BinaryHeap<SourceHeapEntry>,
    // Set once every source has been pushed into `source_heap`.
    source_heap_initialized: bool,
}
363
364impl LazyScan {
365    fn next(&mut self) -> Option<Result<KeyValue>> {
366        self.next_lazy()
367            .map(|item| item.and_then(LazyKeyValue::into_key_value_sync))
368    }
369
370    async fn next_async(&mut self) -> Result<Option<KeyValue>> {
371        let Some(item) = self.next_lazy_async().await? else {
372            return Ok(None);
373        };
374        item.into_key_value().await.map(Some)
375    }
376
377    fn next_lazy(&mut self) -> Option<Result<LazyKeyValue>> {
378        if !self.source_heap_initialized {
379            if let Err(error) = self.initialize_source_heap() {
380                return Some(Err(error));
381            }
382        }
383
384        loop {
385            let entry = self.source_heap.pop()?;
386            let user_key = entry.user_key;
387            let mut source_indices = vec![entry.source_index];
388            while self
389                .source_heap
390                .peek()
391                .is_some_and(|entry| entry.user_key == user_key)
392            {
393                let entry = self
394                    .source_heap
395                    .pop()
396                    .expect("heap peek promised another source entry");
397                source_indices.push(entry.source_index);
398            }
399
400            let mut first_record = None;
401            let mut rest_records = Vec::new();
402
403            for source_index in source_indices {
404                match self.sources[source_index].take_current_group() {
405                    Ok(Some(group)) => {
406                        push_group_records(&mut first_record, &mut rest_records, group);
407                    }
408                    Ok(None) => {}
409                    Err(error) => return Some(Err(error)),
410                }
411                if let Err(error) = self.push_source_heap_entry(source_index) {
412                    return Some(Err(error));
413                }
414            }
415
416            let Some(first_record) = first_record else {
417                continue;
418            };
419            match self.visible_lazy_item_from_records(first_record, rest_records) {
420                Ok(Some(item)) => return Some(Ok(item)),
421                Ok(None) => {}
422                Err(error) => return Some(Err(error)),
423            }
424        }
425    }
426
427    async fn next_lazy_async(&mut self) -> Result<Option<LazyKeyValue>> {
428        if !self.source_heap_initialized {
429            self.initialize_source_heap_async().await?;
430        }
431
432        loop {
433            let Some(entry) = self.source_heap.pop() else {
434                return Ok(None);
435            };
436            let user_key = entry.user_key;
437            let mut source_indices = vec![entry.source_index];
438            while self
439                .source_heap
440                .peek()
441                .is_some_and(|entry| entry.user_key == user_key)
442            {
443                let entry = self
444                    .source_heap
445                    .pop()
446                    .expect("heap peek promised another source entry");
447                source_indices.push(entry.source_index);
448            }
449
450            let mut first_record = None;
451            let mut rest_records = Vec::new();
452
453            for source_index in source_indices {
454                if let Some(group) = self.sources[source_index]
455                    .take_current_group_async()
456                    .await?
457                {
458                    push_group_records(&mut first_record, &mut rest_records, group);
459                }
460                self.push_source_heap_entry_async(source_index).await?;
461            }
462
463            let Some(first_record) = first_record else {
464                continue;
465            };
466            if let Some(item) = self.visible_lazy_item_from_records(first_record, rest_records)? {
467                return Ok(Some(item));
468            }
469        }
470    }
471
472    fn initialize_source_heap(&mut self) -> Result<()> {
473        for source_index in 0..self.sources.len() {
474            self.push_source_heap_entry(source_index)?;
475        }
476        self.source_heap_initialized = true;
477        Ok(())
478    }
479
480    async fn initialize_source_heap_async(&mut self) -> Result<()> {
481        for source_index in 0..self.sources.len() {
482            self.push_source_heap_entry_async(source_index).await?;
483        }
484        self.source_heap_initialized = true;
485        Ok(())
486    }
487
488    fn push_source_heap_entry(&mut self, source_index: usize) -> Result<()> {
489        let Some(user_key) = self.sources[source_index]
490            .current_key()?
491            .map(<[u8]>::to_vec)
492        else {
493            return Ok(());
494        };
495        self.source_heap.push(SourceHeapEntry {
496            user_key,
497            source_index,
498            direction: self.direction,
499        });
500        Ok(())
501    }
502
503    async fn push_source_heap_entry_async(&mut self, source_index: usize) -> Result<()> {
504        let Some(user_key) = self.sources[source_index].current_user_key_async().await? else {
505            return Ok(());
506        };
507        self.source_heap.push(SourceHeapEntry {
508            user_key,
509            source_index,
510            direction: self.direction,
511        });
512        Ok(())
513    }
514
515    fn visible_lazy_item_from_records(
516        &self,
517        first_record: ScanRecord,
518        mut rest_records: Vec<ScanRecord>,
519    ) -> Result<Option<LazyKeyValue>> {
520        if rest_records.is_empty() {
521            return self.visible_lazy_item_from_sorted_records(std::iter::once(first_record));
522        }
523
524        rest_records.push(first_record);
525        rest_records.sort_by(|left, right| left.0.cmp(&right.0));
526
527        self.visible_lazy_item_from_sorted_records(rest_records)
528    }
529
530    fn visible_lazy_item_from_sorted_records(
531        &self,
532        records: impl IntoIterator<Item = ScanRecord>,
533    ) -> Result<Option<LazyKeyValue>> {
534        for (internal_key, value) in records {
535            if internal_key.sequence() > self.read_sequence {
536                continue;
537            }
538
539            match internal_key.kind() {
540                ValueKind::Put => {
541                    if range_tombstones_cover(
542                        &self.range_tombstones,
543                        internal_key.user_key(),
544                        internal_key.sequence(),
545                        internal_key.batch_index(),
546                        self.read_sequence,
547                    ) {
548                        return Ok(None);
549                    }
550
551                    let key = internal_key.user_key().to_vec();
552                    let value = lazy_value(
553                        value,
554                        internal_key,
555                        self.db_path.as_deref(),
556                        self.native_storage.clone(),
557                        self.blob_reads.clone(),
558                        Arc::clone(&self.read_pin),
559                    )?;
560                    return Ok(Some(LazyKeyValue { key, value }));
561                }
562                ValueKind::PointDelete => return Ok(None),
563                ValueKind::RangeDelete => {}
564            }
565        }
566
567        Ok(None)
568    }
569}
570
/// Heap entry naming a source and the user key it is currently positioned at.
///
/// The `Ord` impl inverts the scan-direction key order so that `BinaryHeap`
/// (a max-heap) pops the key that comes first in `direction`. Ties go to the
/// lower `source_index`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SourceHeapEntry {
    // Current user key of the source.
    user_key: Vec<u8>,
    // Index into `LazyScan::sources`.
    source_index: usize,
    // Scan direction; all entries in one heap are expected to share it.
    direction: Direction,
}
577
578impl Ord for SourceHeapEntry {
579    fn cmp(&self, other: &Self) -> CmpOrdering {
580        debug_assert_eq!(self.direction, other.direction);
581        match compare_scan_keys(&self.user_key, &other.user_key, self.direction) {
582            CmpOrdering::Less => CmpOrdering::Greater,
583            CmpOrdering::Equal => other.source_index.cmp(&self.source_index),
584            CmpOrdering::Greater => CmpOrdering::Less,
585        }
586    }
587}
588
589impl PartialOrd for SourceHeapEntry {
590    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
591        Some(self.cmp(other))
592    }
593}
594
595fn push_group_records(
596    first_record: &mut Option<ScanRecord>,
597    rest_records: &mut Vec<ScanRecord>,
598    group: RecordGroup,
599) {
600    if let Some(previous_first) = first_record.take() {
601        rest_records.push(previous_first);
602    }
603    *first_record = Some(group.first);
604    rest_records.extend(group.rest);
605}
606
607fn compare_scan_keys(left: &[u8], right: &[u8], direction: Direction) -> CmpOrdering {
608    match direction {
609        Direction::Forward => left.cmp(right),
610        Direction::Reverse => right.cmp(left),
611    }
612}
613
/// One raw record read from a source: its internal key and optional value
/// reference (`None` presumably for records without a value, such as deletes —
/// confirm).
pub(crate) type ScanRecord = (InternalKey, Option<ValueRef>);
615
/// All records one source holds for a single user key.
#[derive(Debug, Clone)]
pub(crate) struct RecordGroup {
    /// User key shared by every record in the group.
    pub(crate) user_key: Vec<u8>,
    /// Leading record. Memtable-built groups put the smallest `InternalKey`
    /// here (via `sort_group_records`).
    pub(crate) first: ScanRecord,
    /// Remaining records. Memtable-built groups keep them in ascending
    /// `InternalKey` order.
    pub(crate) rest: Vec<ScanRecord>,
}
622
/// One input to the merged scan: a cursor plus a one-group look-ahead.
#[derive(Debug, Clone)]
pub(crate) struct RecordSource {
    // Produces successive record groups in scan order.
    cursor: SourceCursor,
    // Group the source is positioned at; `None` until loaded or after it is taken.
    current: Option<RecordGroup>,
}
628
629impl RecordSource {
630    pub(crate) fn memtable(
631        memtable: Arc<Memtable>,
632        selector: ScanSelector,
633        direction: Direction,
634    ) -> Self {
635        Self {
636            cursor: SourceCursor::Memtable(MemtableCursor::new(memtable, selector, direction)),
637            current: None,
638        }
639    }
640
641    pub(crate) fn table(cursor: TablePointCursor) -> Self {
642        Self {
643            cursor: SourceCursor::Table(cursor),
644            current: None,
645        }
646    }
647
648    fn current_key(&mut self) -> Result<Option<&[u8]>> {
649        self.ensure_current()?;
650        Ok(self.current.as_ref().map(|group| group.user_key.as_slice()))
651    }
652
653    fn take_current_group(&mut self) -> Result<Option<RecordGroup>> {
654        self.ensure_current()?;
655        Ok(self.current.take())
656    }
657
658    async fn current_user_key_async(&mut self) -> Result<Option<Vec<u8>>> {
659        self.ensure_current_async().await?;
660        Ok(self.current.as_ref().map(|group| group.user_key.clone()))
661    }
662
663    async fn take_current_group_async(&mut self) -> Result<Option<RecordGroup>> {
664        self.ensure_current_async().await?;
665        Ok(self.current.take())
666    }
667
668    fn ensure_current(&mut self) -> Result<()> {
669        if self.current.is_none() {
670            self.current = self.cursor.next_group()?;
671        }
672        Ok(())
673    }
674
675    async fn ensure_current_async(&mut self) -> Result<()> {
676        if self.current.is_none() {
677            self.current = self.cursor.next_group_async().await?;
678        }
679        Ok(())
680    }
681}
682
/// Concrete cursor kinds that can feed a `RecordSource`.
#[derive(Debug, Clone)]
enum SourceCursor {
    /// Walks a memtable snapshot held by `Arc`.
    Memtable(MemtableCursor),
    /// Walks an on-disk table.
    Table(TablePointCursor),
}
688
689impl SourceCursor {
690    fn next_group(&mut self) -> Result<Option<RecordGroup>> {
691        match self {
692            Self::Memtable(cursor) => cursor.next_group(),
693            Self::Table(cursor) => cursor.next_group(),
694        }
695    }
696
697    async fn next_group_async(&mut self) -> Result<Option<RecordGroup>> {
698        match self {
699            Self::Memtable(cursor) => cursor.next_group_async().await,
700            Self::Table(cursor) => cursor.next_group_async().await,
701        }
702    }
703}
704
/// Group-at-a-time cursor over one memtable.
#[derive(Debug, Clone)]
struct MemtableCursor {
    // The cursor keeps the memtable handle that was active when the scan was
    // created. A later flush can swap in a fresh active memtable without
    // changing what this iterator is allowed to see.
    memtable: Arc<Memtable>,
    // Range or prefix restriction applied to every user key.
    selector: ScanSelector,
    // Order in which groups are produced.
    direction: Direction,
    // Internal-key window still to scan; the forward path narrows
    // `lower_bound` and the reverse path narrows `upper_bound` after each group.
    lower_bound: Bound<InternalKey>,
    upper_bound: Bound<InternalKey>,
    // Once set, `next_group` always returns `Ok(None)`.
    exhausted: bool,
}
717
718impl MemtableCursor {
719    fn new(memtable: Arc<Memtable>, selector: ScanSelector, direction: Direction) -> Self {
720        let (lower_bound, upper_bound) = memtable_scan_bounds(&selector);
721
722        Self {
723            memtable,
724            selector,
725            direction,
726            lower_bound,
727            upper_bound,
728            exhausted: false,
729        }
730    }
731
732    fn next_group(&mut self) -> Result<Option<RecordGroup>> {
733        match self.direction {
734            Direction::Forward => self.next_group_forward(),
735            Direction::Reverse => self.next_group_reverse(),
736        }
737    }
738
739    // Memtable advancement has no I/O, but it participates in the async scan
740    // chain so mixed memtable/table sources share one awaitable shape.
741    #[allow(clippy::unused_async)]
742    async fn next_group_async(&mut self) -> Result<Option<RecordGroup>> {
743        self.next_group()
744    }
745
746    fn next_group_forward(&mut self) -> Result<Option<RecordGroup>> {
747        if self.exhausted {
748            return Ok(None);
749        }
750
751        let entries = self
752            .memtable
753            .read_entries()
754            .map_err(|_| lock_poisoned("memtable entries"))?;
755        let mut records = Vec::new();
756        let mut group_user_key = None;
757
758        for (internal_key, value) in
759            entries.range((self.lower_bound.clone(), self.upper_bound.clone()))
760        {
761            match self.selector.forward_key_state(internal_key.user_key()) {
762                ForwardKeyState::Before => {}
763                ForwardKeyState::Match => {
764                    let user_key =
765                        group_user_key.get_or_insert_with(|| internal_key.user_key().to_vec());
766                    if internal_key.user_key() == user_key.as_slice() {
767                        records.push((internal_key.clone(), value.clone()));
768                    } else {
769                        break;
770                    }
771                }
772                ForwardKeyState::After => {
773                    self.exhausted = true;
774                    return Ok(None);
775                }
776            }
777        }
778        drop(entries);
779
780        let Some(user_key) = group_user_key else {
781            self.exhausted = true;
782            return Ok(None);
783        };
784        self.lower_bound = Bound::Excluded(last_internal_key_for_user(&user_key));
785        Ok(Some(record_group_from_records(user_key, records)))
786    }
787
788    fn next_group_reverse(&mut self) -> Result<Option<RecordGroup>> {
789        if self.exhausted {
790            return Ok(None);
791        }
792
793        let entries = self
794            .memtable
795            .read_entries()
796            .map_err(|_| lock_poisoned("memtable entries"))?;
797        let mut records = Vec::new();
798        let mut group_user_key = None;
799
800        for (internal_key, value) in entries
801            .range((self.lower_bound.clone(), self.upper_bound.clone()))
802            .rev()
803        {
804            match self.selector.reverse_key_state(internal_key.user_key()) {
805                ReverseKeyState::Above => {}
806                ReverseKeyState::Match => {
807                    let user_key =
808                        group_user_key.get_or_insert_with(|| internal_key.user_key().to_vec());
809                    if internal_key.user_key() == user_key.as_slice() {
810                        records.push((internal_key.clone(), value.clone()));
811                    } else {
812                        break;
813                    }
814                }
815                ReverseKeyState::Below => {
816                    self.exhausted = true;
817                    return Ok(None);
818                }
819            }
820        }
821        drop(entries);
822
823        let Some(user_key) = group_user_key else {
824            self.exhausted = true;
825            return Ok(None);
826        };
827        self.upper_bound = Bound::Excluded(first_internal_key_for_user(&user_key));
828        Ok(Some(record_group_from_records(user_key, records)))
829    }
830}
831
832fn record_group_from_records(user_key: Vec<u8>, mut records: Vec<ScanRecord>) -> RecordGroup {
833    let first = records
834        .pop()
835        .expect("memtable cursor only builds groups after finding a record");
836    let (first, rest) = sort_group_records(first, records);
837    RecordGroup {
838        user_key,
839        first,
840        rest,
841    }
842}
843
844pub(crate) fn sort_group_records(
845    first: ScanRecord,
846    mut rest: Vec<ScanRecord>,
847) -> (ScanRecord, Vec<ScanRecord>) {
848    if rest.is_empty() {
849        return (first, rest);
850    }
851
852    rest.push(first);
853    rest.sort_by(|left, right| left.0.cmp(&right.0));
854    let mut records = rest.into_iter();
855    let first = records
856        .next()
857        .expect("non-empty record group must keep a first record");
858    let rest = records.collect();
859    (first, rest)
860}
861
862fn memtable_scan_bounds(selector: &ScanSelector) -> (Bound<InternalKey>, Bound<InternalKey>) {
863    match selector {
864        ScanSelector::Range(range) => (
865            memtable_start_bound(&range.start),
866            memtable_end_bound(&range.end),
867        ),
868        ScanSelector::Prefix(prefix) => {
869            let start = Bound::Included(first_internal_key_for_user(prefix));
870            let end = prefix_successor(prefix).map_or(Bound::Unbounded, |end| {
871                Bound::Excluded(first_internal_key_for_user(&end))
872            });
873            (start, end)
874        }
875    }
876}
877
878fn memtable_start_bound(start: &Bound<Vec<u8>>) -> Bound<InternalKey> {
879    match start {
880        Bound::Included(key) => Bound::Included(first_internal_key_for_user(key)),
881        Bound::Excluded(key) => Bound::Excluded(last_internal_key_for_user(key)),
882        Bound::Unbounded => Bound::Unbounded,
883    }
884}
885
886fn memtable_end_bound(end: &Bound<Vec<u8>>) -> Bound<InternalKey> {
887    match end {
888        Bound::Included(key) => Bound::Included(last_internal_key_for_user(key)),
889        Bound::Excluded(key) => Bound::Excluded(first_internal_key_for_user(key)),
890        Bound::Unbounded => Bound::Unbounded,
891    }
892}
893
/// Which user keys a scan selects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ScanSelector {
    /// Keys within a `KeyRange` (start/end bounds).
    Range(KeyRange),
    /// Keys that start with the given bytes.
    Prefix(Vec<u8>),
}
899
900impl ScanSelector {
901    pub(crate) fn forward_key_state(&self, key: &[u8]) -> ForwardKeyState {
902        match self {
903            Self::Range(range) => {
904                if key_is_before_start(key, &range.start) {
905                    ForwardKeyState::Before
906                } else if key_is_after_end(key, &range.end) {
907                    ForwardKeyState::After
908                } else {
909                    ForwardKeyState::Match
910                }
911            }
912            Self::Prefix(prefix) => {
913                if key < prefix.as_slice() {
914                    ForwardKeyState::Before
915                } else if key.starts_with(prefix) {
916                    ForwardKeyState::Match
917                } else {
918                    ForwardKeyState::After
919                }
920            }
921        }
922    }
923
924    pub(crate) fn reverse_key_state(&self, key: &[u8]) -> ReverseKeyState {
925        match self {
926            Self::Range(range) => {
927                if key_is_after_end(key, &range.end) {
928                    ReverseKeyState::Above
929                } else if key_is_before_start(key, &range.start) {
930                    ReverseKeyState::Below
931                } else {
932                    ReverseKeyState::Match
933                }
934            }
935            Self::Prefix(prefix) => {
936                if key.starts_with(prefix) {
937                    ReverseKeyState::Match
938                } else if key < prefix.as_slice() {
939                    ReverseKeyState::Below
940                } else {
941                    ReverseKeyState::Above
942                }
943            }
944        }
945    }
946
947    pub(crate) fn prefix(&self) -> Option<&[u8]> {
948        match self {
949            Self::Range(_) => None,
950            Self::Prefix(prefix) => Some(prefix),
951        }
952    }
953}
954
/// Where a key sits relative to a scan selector during an ascending scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ForwardKeyState {
    /// The key sorts ahead of the selected keys.
    Before,
    /// The key is selected.
    Match,
    /// The key sorts past the selected keys.
    After,
}

/// Where a key sits relative to a scan selector during a descending scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ReverseKeyState {
    /// The key sorts past (above) the selected keys.
    Above,
    /// The key is selected.
    Match,
    /// The key sorts ahead of (below) the selected keys.
    Below,
}
968
969#[derive(Debug, Clone, PartialEq, Eq)]
970pub(crate) struct ScanRangeTombstone {
971    range: KeyRange,
972    sequence: Sequence,
973    batch_index: u32,
974}
975
976impl ScanRangeTombstone {
977    #[must_use]
978    pub(crate) fn new(range: KeyRange, sequence: Sequence, batch_index: u32) -> Self {
979        Self {
980            range,
981            sequence,
982            batch_index,
983        }
984    }
985
986    fn covers_visible_point(
987        &self,
988        key: &[u8],
989        point_sequence: Sequence,
990        point_batch_index: u32,
991        read_sequence: Sequence,
992    ) -> bool {
993        if self.sequence > read_sequence || !key_is_in_range(key, &self.range) {
994            return false;
995        }
996
997        self.sequence > point_sequence
998            || (self.sequence == point_sequence && self.batch_index > point_batch_index)
999    }
1000}
1001
1002impl RangeTombstoneLike for ScanRangeTombstone {
1003    fn range(&self) -> &KeyRange {
1004        &self.range
1005    }
1006}
1007
1008fn range_tombstones_cover(
1009    range_tombstones: &RangeTombstoneIndex<ScanRangeTombstone>,
1010    key: &[u8],
1011    point_sequence: Sequence,
1012    point_batch_index: u32,
1013    read_sequence: Sequence,
1014) -> bool {
1015    range_tombstones.covering_key(key).any(|tombstone| {
1016        tombstone.covers_visible_point(key, point_sequence, point_batch_index, read_sequence)
1017    })
1018}
1019
1020fn lock_poisoned(lock_name: &'static str) -> Error {
1021    Error::Corruption {
1022        message: format!("{lock_name} lock poisoned"),
1023    }
1024}
1025
1026fn lazy_value(
1027    value: Option<ValueRef>,
1028    internal_key: InternalKey,
1029    db_path: Option<&std::path::Path>,
1030    native_storage: Option<NativeFileBackend>,
1031    blob_reads: Option<Arc<BlobReadMetrics>>,
1032    read_pin: Arc<Snapshot>,
1033) -> Result<LazyValue> {
1034    let value = value.ok_or_else(|| Error::Corruption {
1035        message: "put record is missing value bytes".to_owned(),
1036    })?;
1037
1038    match value {
1039        ValueRef::Inline(bytes) => Ok(LazyValue {
1040            inner: LazyValueInner::Inline(bytes),
1041        }),
1042        ValueRef::BlobIndex(_) | ValueRef::Blob { .. } => {
1043            let db_path = db_path.ok_or_else(|| Error::Corruption {
1044                message: "in-memory database cannot read blob value references".to_owned(),
1045            })?;
1046            Ok(LazyValue {
1047                inner: LazyValueInner::Blob {
1048                    db_path: db_path.to_path_buf(),
1049                    native_storage,
1050                    internal_key,
1051                    value,
1052                    blob_reads,
1053                    _read_pin: read_pin,
1054                },
1055            })
1056        }
1057    }
1058}
1059
/// Returns the smallest byte string that sorts after every key starting with
/// `prefix`, or `None` if no such string exists.
///
/// `None` is returned when `prefix` is empty or consists only of `0xFF` bytes.
pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Trailing 0xFF bytes cannot be bumped without a carry, so truncate
    // after the last byte that still has headroom and increment it.
    let bump_at = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut successor = prefix[..=bump_at].to_vec();
    successor[bump_at] += 1;
    Some(successor)
}
1073
/// Returns `true` if `key` sorts before the lower bound `start`, i.e. it is
/// excluded by the bound. An unbounded start excludes nothing.
fn key_is_before_start(key: &[u8], start: &Bound<Vec<u8>>) -> bool {
    match start {
        Bound::Unbounded => false,
        Bound::Included(boundary) => key.cmp(boundary.as_slice()).is_lt(),
        Bound::Excluded(boundary) => key.cmp(boundary.as_slice()).is_le(),
    }
}
1081
/// Returns `true` if `key` sorts past the upper bound `end`, i.e. it is
/// excluded by the bound. An unbounded end excludes nothing.
fn key_is_after_end(key: &[u8], end: &Bound<Vec<u8>>) -> bool {
    match end {
        Bound::Unbounded => false,
        Bound::Included(boundary) => key.cmp(boundary.as_slice()).is_gt(),
        Bound::Excluded(boundary) => key.cmp(boundary.as_slice()).is_ge(),
    }
}
1089
1090fn key_is_in_range(key: &[u8], range: &KeyRange) -> bool {
1091    !key_is_before_start(key, &range.start) && !key_is_after_end(key, &range.end)
1092}
1093
#[cfg(test)]
mod tests {
    use std::{collections::BinaryHeap, sync::Arc};

    use super::{Direction, Iter, RecordSource, ScanSelector, ScanSourceInput, SourceHeapEntry};
    use crate::{
        blob::ValueRef,
        internal_key::{InternalKey, ValueKind},
        memtable::Memtable,
        snapshot::Snapshot,
        types::{KeyRange, Sequence},
    };

    #[test]
    fn source_heap_orders_forward_and_reverse_keys() {
        let forward = unordered_heap(Direction::Forward);
        assert_eq!(
            pop_keys(forward, 3),
            vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]
        );

        let reverse = unordered_heap(Direction::Reverse);
        assert_eq!(
            pop_keys(reverse, 3),
            vec![b"c".to_vec(), b"b".to_vec(), b"a".to_vec()]
        );
    }

    #[test]
    fn lazy_scan_heap_merge_preserves_forward_and_reverse_order() {
        let left = memtable_with(&[(b"a", b"a1"), (b"c", b"c1")]);
        let right = memtable_with(&[(b"b", b"b1"), (b"d", b"d1")]);

        let forward = merged_scan(Direction::Forward, Arc::clone(&left), Arc::clone(&right));
        assert_eq!(
            collect_keys(forward),
            vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec(), b"d".to_vec()]
        );

        let reverse = merged_scan(Direction::Reverse, left, right);
        assert_eq!(
            collect_keys(reverse),
            vec![b"d".to_vec(), b"c".to_vec(), b"b".to_vec(), b"a".to_vec()]
        );
    }

    /// Builds a heap by pushing keys `c`, `a`, `b` (source indices 0, 1, 2)
    /// in that order for `direction`.
    fn unordered_heap(direction: Direction) -> BinaryHeap<SourceHeapEntry> {
        let mut heap = BinaryHeap::new();
        for (source_index, user_key) in [b"c", b"a", b"b"].iter().enumerate() {
            heap.push(heap_entry(*user_key, source_index, direction));
        }
        heap
    }

    /// Pops `count` entries and returns their user keys in pop order.
    fn pop_keys(mut heap: BinaryHeap<SourceHeapEntry>, count: usize) -> Vec<Vec<u8>> {
        (0..count)
            .map(|_| heap.pop().expect("entry").user_key)
            .collect()
    }

    /// Merges the `left` and `right` memtables over the full key range in
    /// `direction`, reading at sequence 4 with no range tombstones.
    fn merged_scan(direction: Direction, left: Arc<Memtable>, right: Arc<Memtable>) -> Iter {
        Iter::from_sources(
            direction,
            ScanSourceInput {
                read_sequence: Sequence::new(4),
                read_pin: Snapshot::new(Sequence::new(4)),
                db_path: None,
                native_storage: None,
                blob_reads: None,
                range_tombstones: Vec::new(),
                sources: vec![
                    RecordSource::memtable(left, ScanSelector::Range(KeyRange::all()), direction),
                    RecordSource::memtable(right, ScanSelector::Range(KeyRange::all()), direction),
                ],
            },
        )
    }

    fn heap_entry(user_key: &[u8], source_index: usize, direction: Direction) -> SourceHeapEntry {
        let user_key = user_key.to_vec();
        SourceHeapEntry {
            direction,
            source_index,
            user_key,
        }
    }

    /// Builds a memtable holding one `Put` per record, with sequences 1, 2, …
    /// assigned in slice order.
    fn memtable_with(records: &[(&[u8], &[u8])]) -> Arc<Memtable> {
        let memtable = Arc::new(Memtable::default());
        let mut entries = memtable.write_entries().expect("memtable lock");
        for (offset, &(key, value)) in records.iter().enumerate() {
            let sequence = Sequence::new(u64::try_from(offset + 1).expect("test sequence fits"));
            entries.insert(
                InternalKey::new(key, sequence, ValueKind::Put, 0),
                Some(ValueRef::Inline(value.to_vec())),
            );
        }
        // Release the write guard before handing the memtable back.
        drop(entries);
        memtable
    }

    fn collect_keys(iter: Iter) -> Vec<Vec<u8>> {
        iter.map(|item| item.expect("iterator item"))
            .map(|row| row.key)
            .collect()
    }
}