use crate::block_cache::BlockCache;
use crate::data_file::{DataFile, DataFileType};
use crate::error::Result;
use crate::file::FileManager;
use crate::iterator::{BucketFilterIterator, KvIterator};
use crate::parquet::ParquetIterator;
use crate::sst::{SSTIterator, SSTIteratorOptions};
use std::sync::Arc;
/// Configuration shared by every iterator produced by this factory.
///
/// `Default` yields default SST iterator options and no block cache.
#[derive(Default)]
pub struct IteratorFactoryOptions {
    /// Options forwarded to `SSTIterator` when the file is an SSTable.
    pub sst_options: SSTIteratorOptions,
    /// Optional block cache cloned into both SSTable and Parquet iterators.
    pub block_cache: Option<BlockCache>,
}
pub fn create_iterator(
file: &DataFile,
file_manager: &Arc<FileManager>,
options: &IteratorFactoryOptions,
) -> Result<Box<dyn for<'a> KvIterator<'a>>> {
match file.file_type {
DataFileType::SSTable => {
let reader = file_manager.open_data_file_reader(file.file_id)?;
let iter = SSTIterator::with_cache_and_file(
Box::new(reader),
file,
options.sst_options.clone(),
options.block_cache.clone(),
)?;
if file.needs_bucket_filter() {
Ok(Box::new(BucketFilterIterator::new(
iter,
file.effective_bucket_range.clone(),
)))
} else {
Ok(Box::new(iter))
}
}
DataFileType::Parquet => {
let reader = file_manager.open_data_file_reader(file.file_id)?;
let iter = ParquetIterator::from_data_file(
Box::new(reader),
file,
options.block_cache.clone(),
)?;
if file.needs_bucket_filter() {
Ok(Box::new(BucketFilterIterator::new(
iter,
file.effective_bucket_range.clone(),
)))
} else {
Ok(Box::new(iter))
}
}
}
}
/// Returns a closure that captures `file_manager` and `options` and builds
/// an iterator for any `DataFile` handed to it via [`create_iterator`].
pub fn make_iterator_factory(
    file_manager: Arc<FileManager>,
    options: IteratorFactoryOptions,
) -> impl Fn(&DataFile) -> Result<Box<dyn for<'a> KvIterator<'a>>> {
    move |data_file| create_iterator(data_file, &file_manager, &options)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_file::{DataFile, DataFileType};
    use crate::file::{FileManager, FileSystemRegistry, TrackedFileId};
    use crate::metrics_manager::MetricsManager;
    use crate::parquet::ParquetWriter;
    use crate::sst::row_codec::encode_value;
    use crate::r#type::{Column, Value, ValueType};

    /// Best-effort removal of a test directory; ignores "not found" errors.
    fn cleanup_test_root(path: &str) {
        let _ = std::fs::remove_dir_all(path);
    }

    #[test]
    #[serial_test::serial(file)]
    fn test_create_iterator_parquet_with_bucket_filter() {
        cleanup_test_root("/tmp/iterator_factory_parquet_filter_test");

        // Set up a file manager rooted at a scratch directory.
        let registry = FileSystemRegistry::new();
        let fs = registry
            .get_or_register("file:///tmp/iterator_factory_parquet_filter_test")
            .unwrap();
        let metrics = Arc::new(MetricsManager::new("iterator-factory-parquet-test"));
        let file_manager = Arc::new(FileManager::with_defaults(fs, metrics).unwrap());

        // Write a single-column Parquet file with one row per bucket (1 and 2).
        let (file_id, writer_file) = file_manager.create_data_file().unwrap();
        let mut writer = ParquetWriter::with_options(
            writer_file,
            crate::parquet::ParquetWriterOptions {
                num_columns: 1,
                ..crate::parquet::ParquetWriterOptions::default()
            },
        )
        .unwrap();
        let row_bucket1 = encode_value(
            &Value::new(vec![Some(Column::new(ValueType::Put, b"v1".to_vec()))]),
            1,
        );
        let row_bucket2 = encode_value(
            &Value::new(vec![Some(Column::new(ValueType::Put, b"v2".to_vec()))]),
            1,
        );
        writer.add(&[1, 0, b'a'], &row_bucket1).unwrap();
        writer.add(&[2, 0, b'b'], &row_bucket2).unwrap();
        let (start_key, end_key, file_size, meta) = writer.finish().unwrap();

        // Physical bucket range is 1..=2 but only bucket 2 is effective,
        // so the factory must wrap the iterator in a bucket filter.
        let data_file = DataFile::new(
            DataFileType::Parquet,
            start_key,
            end_key,
            file_id,
            TrackedFileId::new(&file_manager, file_id),
            0,
            file_size,
            1..=2,
            2..=2,
        );
        data_file.set_meta_bytes(meta);

        let options = IteratorFactoryOptions::default();
        let mut iter = create_iterator(&data_file, &file_manager, &options).unwrap();

        // Bucket 1's row is filtered out; the first visible key is bucket 2's.
        iter.seek_to_first().unwrap();
        assert!(iter.valid());
        let key = iter.key().unwrap().unwrap();
        assert_eq!(key, &[2, 0, b'b']);
        let decoded = iter.take_value().unwrap().unwrap().into_decoded(1).unwrap();
        assert_eq!(
            decoded.columns()[0].as_ref().unwrap().data().as_ref(),
            b"v2"
        );
        // And it is the only visible row.
        assert_eq!(iter.next().unwrap(), false);

        cleanup_test_root("/tmp/iterator_factory_parquet_filter_test");
    }
}