sov_schema_db/
iterator.rs1use std::iter::FusedIterator;
2use std::marker::PhantomData;
3
4use anyhow::Result;
5
6use crate::metrics::{SCHEMADB_ITER_BYTES, SCHEMADB_ITER_LATENCY_SECONDS};
7use crate::schema::{KeyDecoder, Schema, ValueCodec};
8
9pub trait SeekKeyEncoder<S: Schema + ?Sized>: Sized {
22 fn encode_seek_key(&self) -> crate::schema::Result<Vec<u8>>;
29}
30
/// Which way a [`SchemaIterator`] moves its cursor after yielding an entry.
pub(crate) enum ScanDirection {
    /// Step the cursor with the raw iterator's `next()`.
    Forward,
    /// Step the cursor with the raw iterator's `prev()`.
    Backward,
}
35
36pub struct SchemaIterator<'a, S> {
39 db_iter: rocksdb::DBRawIterator<'a>,
40 direction: ScanDirection,
41 phantom: PhantomData<S>,
42}
43
44impl<'a, S> SchemaIterator<'a, S>
45where
46 S: Schema,
47{
48 pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self {
49 SchemaIterator {
50 db_iter,
51 direction,
52 phantom: PhantomData,
53 }
54 }
55
56 pub fn seek_to_first(&mut self) {
58 self.db_iter.seek_to_first();
59 }
60
61 pub fn seek_to_last(&mut self) {
63 self.db_iter.seek_to_last();
64 }
65
66 pub fn seek(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
69 let key = seek_key.encode_seek_key()?;
70 self.db_iter.seek(&key);
71 Ok(())
72 }
73
74 pub fn seek_for_prev(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
79 let key = seek_key.encode_seek_key()?;
80 self.db_iter.seek_for_prev(&key);
81 Ok(())
82 }
83
84 fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
85 let _timer = SCHEMADB_ITER_LATENCY_SECONDS
86 .with_label_values(&[S::COLUMN_FAMILY_NAME])
87 .start_timer();
88
89 if !self.db_iter.valid() {
90 self.db_iter.status()?;
91 return Ok(None);
92 }
93
94 let raw_key = self.db_iter.key().expect("db_iter.key() failed.");
95 let raw_value = self.db_iter.value().expect("db_iter.value() failed.");
96 SCHEMADB_ITER_BYTES
97 .with_label_values(&[S::COLUMN_FAMILY_NAME])
98 .observe((raw_key.len() + raw_value.len()) as f64);
99
100 let key = <S::Key as KeyDecoder<S>>::decode_key(raw_key)?;
101 let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
102
103 match self.direction {
104 ScanDirection::Forward => self.db_iter.next(),
105 ScanDirection::Backward => self.db_iter.prev(),
106 }
107
108 Ok(Some((key, value)))
109 }
110}
111
112impl<'a, S> Iterator for SchemaIterator<'a, S>
113where
114 S: Schema,
115{
116 type Item = Result<(S::Key, S::Value)>;
117
118 fn next(&mut self) -> Option<Self::Item> {
119 self.next_impl().transpose()
120 }
121}
122
123impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {}