use std::iter::FusedIterator;
use std::marker::PhantomData;
use anyhow::Result;
use crate::metrics::{SCHEMADB_ITER_BYTES, SCHEMADB_ITER_LATENCY_SECONDS};
use crate::schema::{KeyDecoder, Schema, ValueCodec};
pub trait SeekKeyEncoder<S: Schema + ?Sized>: Sized {
fn encode_seek_key(&self) -> crate::schema::Result<Vec<u8>>;
}
pub(crate) enum ScanDirection {
Forward,
Backward,
}
pub struct SchemaIterator<'a, S> {
db_iter: rocksdb::DBRawIterator<'a>,
direction: ScanDirection,
phantom: PhantomData<S>,
}
impl<'a, S> SchemaIterator<'a, S>
where
S: Schema,
{
pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self {
SchemaIterator {
db_iter,
direction,
phantom: PhantomData,
}
}
pub fn seek_to_first(&mut self) {
self.db_iter.seek_to_first();
}
pub fn seek_to_last(&mut self) {
self.db_iter.seek_to_last();
}
pub fn seek(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
let key = seek_key.encode_seek_key()?;
self.db_iter.seek(&key);
Ok(())
}
pub fn seek_for_prev(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
let key = seek_key.encode_seek_key()?;
self.db_iter.seek_for_prev(&key);
Ok(())
}
fn next_impl(&mut self) -> Result<Option<(S::Key, S::Value)>> {
let _timer = SCHEMADB_ITER_LATENCY_SECONDS
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.start_timer();
if !self.db_iter.valid() {
self.db_iter.status()?;
return Ok(None);
}
let raw_key = self.db_iter.key().expect("db_iter.key() failed.");
let raw_value = self.db_iter.value().expect("db_iter.value() failed.");
SCHEMADB_ITER_BYTES
.with_label_values(&[S::COLUMN_FAMILY_NAME])
.observe((raw_key.len() + raw_value.len()) as f64);
let key = <S::Key as KeyDecoder<S>>::decode_key(raw_key)?;
let value = <S::Value as ValueCodec<S>>::decode_value(raw_value)?;
match self.direction {
ScanDirection::Forward => self.db_iter.next(),
ScanDirection::Backward => self.db_iter.prev(),
}
Ok(Some((key, value)))
}
}
impl<'a, S> Iterator for SchemaIterator<'a, S>
where
S: Schema,
{
type Item = Result<(S::Key, S::Value)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_impl().transpose()
}
}
impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {}