use super::{Block, DataBlock};
use crate::{
CompressionType, InternalValue, SeqNo,
comparator::SharedComparator,
encryption::EncryptionProvider,
table::{block::BlockType, iter::OwnedDataBlockIter},
};
use std::{fs::File, io::BufReader, path::Path, sync::Arc};
/// Reads blocks one after another from a file and yields the items of each
/// block through its [`Iterator`] implementation.
///
/// The first block is loaded by [`Scanner::new`]; every further block is
/// loaded on demand once the current block's iterator is exhausted.
pub struct Scanner {
/// Buffered handle to the file the blocks are read from.
reader: BufReader<File>,
/// Iterator over the block that is currently loaded.
iter: OwnedDataBlockIter,
/// Compression setting forwarded to `Block::from_reader` for every block.
compression: CompressionType,
/// Total number of blocks to read; iteration ends once `read_count` reaches it.
block_count: usize,
/// Number of blocks loaded so far (starts at 1, as `new` loads the first block).
read_count: usize,
/// Offset added to the sequence number of every yielded item.
global_seqno: SeqNo,
/// Optional encryption provider forwarded to `Block::from_reader`.
encryption: Option<Arc<dyn EncryptionProvider>>,
/// Comparator that is cloned and handed to `try_iter` for each loaded block.
comparator: SharedComparator,
/// Optional zstd dictionary forwarded to `Block::from_reader`
/// (only present when `zstd_any` is enabled).
#[cfg(zstd_any)]
zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
}
impl Scanner {
pub fn new(
path: &Path,
block_count: usize,
compression: CompressionType,
global_seqno: SeqNo,
encryption: Option<Arc<dyn EncryptionProvider>>,
#[cfg(zstd_any)] zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
comparator: SharedComparator,
) -> crate::Result<Self> {
let mut reader = BufReader::with_capacity(8 * 4_096, File::open(path)?);
let block = Self::fetch_next_block(
&mut reader,
compression,
encryption.as_deref(),
#[cfg(zstd_any)]
zstd_dictionary.as_deref(),
)?;
let cmp = comparator.clone();
let iter = OwnedDataBlockIter::try_new(block, |b| b.try_iter(cmp))?;
Ok(Self {
reader,
iter,
compression,
block_count,
read_count: 1,
global_seqno,
encryption,
comparator,
#[cfg(zstd_any)]
zstd_dictionary,
})
}
fn fetch_next_block(
reader: &mut BufReader<File>,
compression: CompressionType,
encryption: Option<&dyn EncryptionProvider>,
#[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
) -> crate::Result<DataBlock> {
let block = Block::from_reader(
reader,
compression,
encryption,
#[cfg(zstd_any)]
zstd_dict,
);
match block {
Ok(block) => {
if block.header.block_type != BlockType::Data {
return Err(crate::Error::InvalidTag((
"BlockType",
block.header.block_type.into(),
)));
}
Ok(DataBlock::new(block))
}
Err(e) => Err(e),
}
}
}
/// Yields the items of every block in turn, loading the next block from the
/// file whenever the current one is exhausted.
///
/// If loading a block or building its iterator fails, the error is yielded
/// once and the scanner is marked as finished, so later calls return `None`
/// once the current block iterator has nothing left.
impl Iterator for Scanner {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Drain the currently loaded block before touching the file again.
            match self.iter.next() {
                Some(mut entry) => {
                    entry.key.seqno += self.global_seqno;
                    return Some(Ok(entry));
                }
                // Current block is empty and every block has been read.
                None if self.read_count >= self.block_count => return None,
                None => {}
            }

            let fetched = Self::fetch_next_block(
                &mut self.reader,
                self.compression,
                self.encryption.as_deref(),
                #[cfg(zstd_any)]
                self.zstd_dictionary.as_deref(),
            );

            // The comparator is only cloned once a block was actually fetched.
            let next_iter = fetched.and_then(|block| {
                let cmp = self.comparator.clone();
                OwnedDataBlockIter::try_new(block, |b| b.try_iter(cmp))
            });

            match next_iter {
                Ok(block_iter) => {
                    self.iter = block_iter;
                    self.read_count += 1;
                }
                Err(err) => {
                    // Stop any further block reads after a failure.
                    self.read_count = self.block_count;
                    return Some(Err(err));
                }
            }
        }
    }
}