sov_schema_db/
iterator.rs

1use std::iter::FusedIterator;
2use std::marker::PhantomData;
3
4use anyhow::Result;
5
6use crate::metrics::{SCHEMADB_ITER_BYTES, SCHEMADB_ITER_LATENCY_SECONDS};
7use crate::schema::{KeyDecoder, Schema, ValueCodec};
8
9/// This defines a type that can be used to seek a [`SchemaIterator`], via
10/// interfaces like [`SchemaIterator::seek`]. Mind you, not all
11/// [`KeyEncoder`](crate::schema::KeyEncoder)s shall be [`SeekKeyEncoder`]s, and
12/// vice versa. E.g.:
13///
14/// - Some key types don't use an encoding that results in sensible
15/// seeking behavior under lexicographic ordering (what RocksDB uses by
16/// default), which means you shouldn't implement [`SeekKeyEncoder`] at all.
17/// - Other key types might maintain full lexicographic order, which means the
18/// original key type can also be [`SeekKeyEncoder`].
19/// - Other key types may be composite, and the first field alone may be
20/// a good candidate for [`SeekKeyEncoder`].
21pub trait SeekKeyEncoder<S: Schema + ?Sized>: Sized {
22    /// Converts `self` to bytes which is used to seek the underlying raw
23    /// iterator.
24    ///
25    /// If `self` is also a [`KeyEncoder`](crate::schema::KeyEncoder), then
26    /// [`SeekKeyEncoder::encode_seek_key`] MUST return the same bytes as
27    /// [`KeyEncoder::encode_key`](crate::schema::KeyEncoder::encode_key).
28    fn encode_seek_key(&self) -> crate::schema::Result<Vec<u8>>;
29}
30
/// Direction in which a [`SchemaIterator`] steps its underlying raw iterator
/// after yielding an entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ScanDirection {
    /// Step the raw iterator with `next()` after each yielded entry.
    Forward,
    /// Step the raw iterator with `prev()` after each yielded entry.
    Backward,
}
35
36/// DB Iterator parameterized on [`Schema`] that seeks with [`Schema::Key`] and yields
37/// [`Schema::Key`] and [`Schema::Value`] pairs.
38pub struct SchemaIterator<'a, S> {
39    db_iter: rocksdb::DBRawIterator<'a>,
40    direction: ScanDirection,
41    phantom: PhantomData<S>,
42}
43
44impl<'a, S> SchemaIterator<'a, S>
45where
46    S: Schema,
47{
48    pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self {
49        SchemaIterator {
50            db_iter,
51            direction,
52            phantom: PhantomData,
53        }
54    }
55
56    /// Seeks to the first key.
57    pub fn seek_to_first(&mut self) {
58        self.db_iter.seek_to_first();
59    }
60
61    /// Seeks to the last key.
62    pub fn seek_to_last(&mut self) {
63        self.db_iter.seek_to_last();
64    }
65
66    /// Seeks to the first key whose binary representation is equal to or greater than that of the
67    /// `seek_key`.
68    pub fn seek(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
69        let key = seek_key.encode_seek_key()?;
70        self.db_iter.seek(&key);
71        Ok(())
72    }
73
74    /// Seeks to the last key whose binary representation is less than or equal to that of the
75    /// `seek_key`.
76    ///
77    /// See example in [`RocksDB doc`](https://github.com/facebook/rocksdb/wiki/SeekForPrev).
78    pub fn seek_for_prev(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
79        let key = seek_key.encode_seek_key()?;
80        self.db_iter.seek_for_prev(&key);
81        Ok(())
82    }
83
84    fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
85        let _timer = SCHEMADB_ITER_LATENCY_SECONDS
86            .with_label_values(&[S::COLUMN_FAMILY_NAME])
87            .start_timer();
88
89        if !self.db_iter.valid() {
90            self.db_iter.status()?;
91            return Ok(None);
92        }
93
94        let raw_key = self.db_iter.key().expect("db_iter.key() failed.");
95        let raw_value = self.db_iter.value().expect("db_iter.value() failed.");
96        SCHEMADB_ITER_BYTES
97            .with_label_values(&[S::COLUMN_FAMILY_NAME])
98            .observe((raw_key.len() + raw_value.len()) as f64);
99
100        let key = <S::Key as KeyDecoder<S>>::decode_key(raw_key)?;
101        let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
102
103        match self.direction {
104            ScanDirection::Forward => self.db_iter.next(),
105            ScanDirection::Backward => self.db_iter.prev(),
106        }
107
108        Ok(Some((key, value)))
109    }
110}
111
112impl<'a, S> Iterator for SchemaIterator<'a, S>
113where
114    S: Schema,
115{
116    type Item = Result<(S::Key, S::Value)>;
117
118    fn next(&mut self) -> Option<Self::Item> {
119        self.next_impl().transpose()
120    }
121}
122
123impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {}