use integer_encoding::FixedInt;
use std::sync::Arc;
use crate::filter_policy::FilterPolicy;
use super::errors::{ReadError, TableReadResult};
pub struct FilterBlockReader {
filters: Vec<Vec<u8>>,
filter_policy: Arc<dyn FilterPolicy>,
offsets: Vec<u32>,
encoded_range_size_exponent: u8,
}
impl FilterBlockReader {
pub fn new(
filter_policy: Arc<dyn FilterPolicy>,
mut filter_data: Vec<u8>,
) -> TableReadResult<Self> {
if filter_data.len() < 5 {
return Err(ReadError::FailedToParse(
"Failed to parse the filter block.".to_string(),
));
}
let encoded_range_size_exponent = filter_data.pop().unwrap();
let raw_offsets_start = &filter_data[filter_data.len() - 4..];
let offsets_start_index = u32::decode_fixed(raw_offsets_start) as usize;
let offsets = FilterBlockReader::deserialize_offsets(
&filter_data[offsets_start_index..(filter_data.len() - 4)],
)?;
let raw_filters = &filter_data[..offsets_start_index].to_vec();
let filters = FilterBlockReader::split_filters_with_offset(&offsets, raw_filters);
Ok(Self {
filter_policy: Arc::clone(&filter_policy),
filters,
encoded_range_size_exponent,
offsets,
})
}
pub fn key_may_match(&self, block_offset: u64, key: &[u8]) -> bool {
if self.filters.is_empty() {
return true;
}
let range_size: u64 = 1 << self.encoded_range_size_exponent;
let filter_index = (block_offset / range_size) as usize;
if filter_index >= self.filters.len() {
log::warn!(
"The provided block offset ({}) could not be used to index the filter array. \
Ignoring the error and returning true so that a disk seek is done.",
block_offset
);
} else {
if self.filters[filter_index].is_empty() {
return false;
}
match (*self.filter_policy).key_may_match(key, &self.filters[filter_index]) {
Err(error) => {
log::warn!(
"There was an error checking the filter for a match. Ignoring the error \
and forcing a disk seek. Original error: {}",
error
);
}
Ok(filter_result) => return filter_result,
}
}
true
}
}
impl FilterBlockReader {
fn deserialize_offsets(raw_offsets: &[u8]) -> TableReadResult<Vec<u32>> {
if raw_offsets.len() % 4 != 0 {
return Err(ReadError::FailedToParse(
"Failed to parse the filter block. The offset array was malformed.".to_string(),
));
}
let offsets = raw_offsets.chunks(4).map(u32::decode_fixed).collect();
Ok(offsets)
}
fn split_filters_with_offset(offsets: &[u32], raw_filters: &[u8]) -> Vec<Vec<u8>> {
let mut filters: Vec<Vec<u8>> = vec![];
for offset_index in 0..offsets.len() {
let next_offset_index = offset_index + 1;
let filter_start_index = offsets[offset_index] as usize;
if next_offset_index >= offsets.len() {
filters.push(raw_filters[filter_start_index..].to_vec());
} else {
let next_filter_start_index = offsets[next_offset_index] as usize;
filters.push(raw_filters[filter_start_index..next_filter_start_index].to_vec())
}
}
filters
}
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use crate::key::{InternalKey, RainDbKeyType};
use crate::tables::filter_block_builder::FilterBlockBuilder;
use crate::{BloomFilterPolicy, Operation};
use super::*;
#[test]
fn filter_block_reader_can_be_created_from_serialized_filter_block() {
let filter_policy: Arc<dyn FilterPolicy> = Arc::new(BloomFilterPolicy::new(10));
let mut filter_block =
create_filter_builder_with_num_data_blocks(Arc::clone(&filter_policy), 3);
let filters = filter_block.finalize();
let filter_reader = FilterBlockReader::new(filter_policy, filters).unwrap();
assert_eq!(
filter_reader.filters.len(),
6,
"With default options there should be twice as many filter blocks as data blocks."
);
}
#[test]
fn filter_block_reader_forces_disk_seeks_for_filter_blocks_without_filters() {
let filter_policy: Arc<dyn FilterPolicy> = Arc::new(BloomFilterPolicy::new(10));
let mut filter_block = FilterBlockBuilder::new(Arc::clone(&filter_policy));
let filters = filter_block.finalize();
let filter_reader = FilterBlockReader::new(filter_policy, filters).unwrap();
assert!(
filter_reader.key_may_match(
0,
&InternalKey::new(b"something".to_vec(), 1, Operation::Put).as_bytes()
),
"Should force for all queries if there are no filters."
);
assert!(
filter_reader.key_may_match(
100_000,
&InternalKey::new(b"something".to_vec(), 1, Operation::Put).as_bytes()
),
"Should force for all queries if there are no filters."
);
}
#[test]
fn filter_block_reader_correctly_detects_keys_when_a_single_filter_covers_keys_for_multiple_data_blocks(
) {
let filter_policy: Arc<dyn FilterPolicy> = Arc::new(BloomFilterPolicy::new(10));
let mut filter_block = FilterBlockBuilder::new(Arc::clone(&filter_policy));
filter_block.notify_new_data_block(100);
filter_block.add_key(InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes());
filter_block.add_key(InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes());
filter_block.add_key(InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes());
filter_block.notify_new_data_block(200);
filter_block.add_key(InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes());
filter_block.notify_new_data_block(300);
filter_block.add_key(InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes());
let filters = filter_block.finalize();
let filter_reader = FilterBlockReader::new(filter_policy, filters).unwrap();
assert!(filter_reader.key_may_match(
100,
&InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
100,
&InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
100,
&InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
100,
&InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
100,
&InternalKey::new(b"missing".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
100,
&InternalKey::new(b"another missing".to_vec(), 1, Operation::Put).as_bytes()
));
}
#[test]
fn filter_block_reader_correctly_detects_keys_when_there_are_multiple_filters_covering_multiple_data_blocks(
) {
let filter_policy: Arc<dyn FilterPolicy> = Arc::new(BloomFilterPolicy::new(10));
let mut filter_block = FilterBlockBuilder::new(Arc::clone(&filter_policy));
filter_block.notify_new_data_block(0);
filter_block.add_key(InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes());
filter_block.notify_new_data_block(2000);
filter_block.add_key(InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes());
filter_block.notify_new_data_block(3100);
filter_block.add_key(InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes());
filter_block.notify_new_data_block(9000);
filter_block.add_key(InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes());
filter_block.add_key(InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes());
let filters = filter_block.finalize();
let filter_reader = FilterBlockReader::new(filter_policy, filters).unwrap();
assert!(filter_reader.key_may_match(
0,
&InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
2000,
&InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
0,
&InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
0,
&InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
3100,
&InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
3100,
&InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
3100,
&InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
3100,
&InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
4100,
&InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
4100,
&InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
4100,
&InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
4100,
&InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
9000,
&InternalKey::new(b"box".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(filter_reader.key_may_match(
9000,
&InternalKey::new(b"hello".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
9000,
&InternalKey::new(b"foo".to_vec(), 1, Operation::Put).as_bytes()
));
assert!(!filter_reader.key_may_match(
9000,
&InternalKey::new(b"bar".to_vec(), 1, Operation::Put).as_bytes()
));
}
fn create_filter_builder_with_num_data_blocks(
filter_policy: Arc<dyn FilterPolicy>,
num_data_blocks: usize,
) -> FilterBlockBuilder {
let mut filter_block = FilterBlockBuilder::new(filter_policy);
for idx in 0..num_data_blocks {
let keys = generate_num_keys(30);
for key in keys {
filter_block.add_key(key.as_bytes());
}
filter_block.notify_new_data_block((idx + 1) * 4096);
}
filter_block
}
fn generate_num_keys(num_keys_to_generate: usize) -> Vec<InternalKey> {
let mut keys = Vec::with_capacity(num_keys_to_generate);
for idx in 0..num_keys_to_generate {
let num = idx + 100_000;
let key = InternalKey::new(
num.to_string().as_bytes().to_vec(),
idx as u64,
Operation::Put,
);
keys.push(key);
}
keys
}
}