use std::sync::Arc;
use integer_encoding::FixedInt;
use crate::FilterPolicy;
/// log2 of the number of data-block bytes covered by a single filter.
pub(crate) const FILTER_BASE_LOG2: u32 = 11;
/// Number of data-block bytes covered by a single filter (2^`FILTER_BASE_LOG2`).
const FILTER_BASE: u32 = 2u32.pow(FILTER_BASE_LOG2);
/// Size of the filter block trailer: a fixed-width `u32` followed by the one-byte base log2.
const FILTER_META_LENGTH: usize = std::mem::size_of::<u32>() + 1;
pub(crate) struct FilterBlockWriter {
policy: Arc<dyn FilterPolicy>, keys: Vec<Vec<u8>>, filters: Vec<u8>, filter_offsets: Vec<u32>, }
impl FilterBlockWriter {
pub(crate) fn new(policy: Arc<dyn FilterPolicy>) -> Self {
Self {
policy,
keys: vec![],
filter_offsets: vec![],
filters: vec![],
}
}
pub(crate) fn add_key(&mut self, key: &[u8]) {
let key = Vec::from(key);
self.keys.push(key);
}
pub(crate) fn start_block(&mut self, block_offset: usize) {
let filter_index = block_offset / FILTER_BASE as usize; let filters_len = self.filter_offsets.len();
assert!(filter_index >= filters_len); while filter_index > self.filter_offsets.len() {
self.generate_filter(); }
}
fn generate_filter(&mut self) {
self.filter_offsets.push(self.filters.len() as u32); if self.keys.is_empty() {
return; };
let filter = self.policy.create_filter(&self.keys); self.filters.extend(filter);
self.keys.clear(); }
pub(crate) fn filter_name(&self) -> &str {
self.policy.name()
}
pub(crate) fn finish(mut self) -> Vec<u8> {
if !self.keys.is_empty() {
self.generate_filter(); };
let mut result = self.filters;
let offsets_offset = self.filter_offsets.len();
let mut ix = result.len();
result.resize(ix + 4 * self.filter_offsets.len() + 5, 0);
for offset in self.filter_offsets {
offset.encode_fixed(&mut result[ix..ix + 4]);
ix += 4;
}
(offsets_offset as u32).encode_fixed(&mut result[ix..ix + 4]);
ix += 4;
result[ix] = FILTER_BASE_LOG2 as u8;
result
}
}
/// Answers "may this key be present?" queries against an encoded filter block.
///
/// `data` must have the layout produced by `FilterBlockWriter::finish`, in order:
/// 1. The concatenated filters.
/// 2. One fixed-width `u32` start offset per filter.
/// 3. A fixed-width `u32` holding the number of filters.
/// 4. A final byte holding the base log2.
#[derive(Clone)]
pub(crate) struct FilterBlockReader {
    /// Policy used to probe individual filters.
    policy: Arc<dyn FilterPolicy>,
    /// The complete encoded filter block.
    data: Vec<u8>,
    /// Start offset of each filter within `data`.
    filter_offsets: Vec<u32>,
    /// log2 of the data-block byte range covered by one filter, read from the trailer.
    base_lg: u32,
}

impl FilterBlockReader {
    /// Parses an encoded filter block.
    ///
    /// # Panics
    ///
    /// Panics with `"invalid filter block data"` in either of these cases:
    /// - `data` is shorter than the trailer.
    /// - The filter count declared in the trailer does not fit in `data`.
    ///
    /// The count is validated before anything is allocated, so a corrupt count cannot
    /// trigger an oversized allocation.
    pub(crate) fn new(data: Vec<u8>, policy: Arc<dyn FilterPolicy>) -> Self {
        let n = data.len();
        if n < FILTER_META_LENGTH {
            panic!("invalid filter block data");
        }
        // Index where the trailer (count + base log2) begins.
        let trailer = n - FILTER_META_LENGTH;
        let base_lg = data[n - 1] as u32;
        // The slice is exactly 4 bytes, so decoding cannot fail.
        let num_offset = u32::decode_fixed(&data[trailer..n - 1]).unwrap() as usize;
        // Checked arithmetic: a corrupt count must neither overflow (e.g. on 32-bit targets)
        // nor point before the start of `data`.
        let offsets_offset = num_offset
            .checked_mul(4)
            .and_then(|array_len| trailer.checked_sub(array_len))
            .unwrap_or_else(|| panic!("invalid filter block data"));
        let filter_offsets: Vec<u32> = data[offsets_offset..trailer]
            .chunks_exact(4)
            .map(|chunk| u32::decode_fixed(chunk).unwrap())
            .collect();
        Self {
            policy,
            data,
            filter_offsets,
            base_lg,
        }
    }

    /// Returns whether `key` may be present in the data block starting at `block_offset`.
    ///
    /// Returns `false` only when the policy rejects `key` for the filter covering
    /// `block_offset`. The following cases all return `true`, so the caller falls back to
    /// reading the block:
    /// - offsets beyond the last filter;
    /// - a corrupt base log2;
    /// - corrupt filter offsets.
    pub(crate) fn may_contain(&self, key: &[u8], block_offset: usize) -> bool {
        let block_index = match block_offset.checked_shr(self.base_lg) {
            Some(index) => index,
            // A corrupt trailer can declare a shift of at least `usize::BITS`.
            None => return true,
        };
        if block_index >= self.filter_offsets.len() {
            return true;
        }
        // Filters occupy everything before the offset array and trailer. `new` checked that
        // these fit in `data`, so this subtraction cannot underflow.
        let filters_end =
            self.data.len() - (4 * self.filter_offsets.len() + FILTER_META_LENGTH);
        let start = self.filter_offsets[block_index] as usize;
        let limit = self
            .filter_offsets
            .get(block_index + 1)
            .map_or(filters_end, |&next| next as usize);
        if start > limit || limit > filters_end {
            // Corrupt offsets: treat as a potential match instead of panicking on the slice.
            return true;
        }
        self.policy.may_contain(&self.data[start..limit], key)
    }
}
#[cfg(test)]
mod tests {
    use test_log::test;

    use super::*;
    use crate::sstable::bloom::LevelDBBloomFilter;
    use crate::{InternalKey, InternalKeyKind};

    /// Fresh bloom policy with 10 bits per key, as used throughout these tests.
    fn bloom() -> Arc<LevelDBBloomFilter> {
        Arc::new(LevelDBBloomFilter::new(10))
    }

    /// An empty writer emits only the trailer, and its reader matches every key.
    #[test]
    fn test_empty() {
        let block = FilterBlockWriter::new(bloom()).finish();
        assert_eq!(&[0, 0, 0, 0, FILTER_BASE_LOG2 as u8][..], &*block);
        let reader = FilterBlockReader::new(block, bloom());
        for offset in [0, 10000] {
            assert!(reader.may_contain(b"foo", offset));
        }
    }

    /// Offsets 100..=300 all fall in the first range, so every key shares one filter.
    #[test]
    fn test_single_filter() {
        let mut writer = FilterBlockWriter::new(bloom());
        writer.start_block(100);
        for key in ["foo", "bar", "box"] {
            writer.add_key(key.as_bytes());
        }
        writer.start_block(200);
        writer.add_key(b"box");
        writer.start_block(300);
        writer.add_key(b"hello");
        let reader = FilterBlockReader::new(writer.finish(), bloom());
        for key in ["foo", "bar", "box", "hello", "foo"] {
            assert!(reader.may_contain(key.as_bytes(), 100));
        }
        for key in ["missing", "other"] {
            assert!(!reader.may_contain(key.as_bytes(), 100));
        }
    }

    /// Keys spread over several ranges, including ranges that received no keys.
    #[test]
    fn test_multiple_filters() {
        let mut writer = FilterBlockWriter::new(bloom());
        writer.start_block(0);
        writer.add_key(b"foo");
        writer.start_block(2000);
        writer.add_key(b"bar");
        writer.start_block(3100);
        writer.add_key(b"box");
        writer.start_block(9000);
        writer.add_key(b"box");
        writer.add_key(b"hello");
        let reader = FilterBlockReader::new(writer.finish(), bloom());

        // (key, block offset, expected answer)
        let cases: &[(&str, usize, bool)] = &[
            ("foo", 0, true),
            ("bar", 2000, true),
            ("box", 0, false),
            ("hello", 0, false),
            ("box", 3100, true),
            ("foo", 3100, false),
            ("bar", 3100, false),
            ("hello", 3100, false),
            ("box", 4100, false),
            ("foo", 4100, false),
            ("bar", 4100, false),
            ("hello", 4100, false),
            ("box", 9000, true),
            ("foo", 9000, false),
            ("bar", 9000, false),
            ("hello", 9000, true),
        ];
        for &(key, offset, expected) in cases {
            assert_eq!(
                reader.may_contain(key.as_bytes(), offset),
                expected,
                "key {key:?} at offset {offset}"
            );
        }
    }

    /// Every inserted key must match, and the false-positive rate must stay under 2%.
    #[test]
    fn test_filter_block_many() {
        let bits_per_key = 10;
        let policy = Arc::new(LevelDBBloomFilter::new(bits_per_key));
        let mut writer = FilterBlockWriter::new(Arc::clone(&policy) as Arc<dyn FilterPolicy>);
        writer.start_block(0);

        let num_items: usize = 10001;
        let keys: Vec<_> = (0..num_items)
            .map(|i| {
                InternalKey::new(
                    format!("key_{i:05}").into_bytes(),
                    (i + 1) as u64,
                    InternalKeyKind::Set,
                    0,
                )
                .encode()
            })
            .collect();
        for key in &keys {
            writer.add_key(key);
        }

        let reader = FilterBlockReader::new(writer.finish(), policy);
        for key in &keys {
            assert!(reader.may_contain(key, 0), "Key should be found in the filter");
        }

        let num_samples: usize = 1000;
        let false_positives = (0..num_samples)
            .filter(|&i| {
                let probe = InternalKey::new(
                    format!("nonexistent_{:05}", i + num_items).into_bytes(),
                    i as u64,
                    InternalKeyKind::Set,
                    0,
                )
                .encode();
                reader.may_contain(&probe, 0)
            })
            .count();

        let false_positive_rate = (false_positives as f64 / num_samples as f64) * 100.0;
        println!(
            "False positive rate: {false_positive_rate:.2}% ({false_positives} out of {num_samples})"
        );
        assert!(
            false_positive_rate < 2.0,
            "False positive rate too high: {false_positive_rate:.2}%"
        );
    }

    /// Three keys in one range: each matches, and two absent keys are rejected.
    #[test]
    fn test_basic_bloom_filter() {
        let policy = Arc::new(LevelDBBloomFilter::new(10));
        let mut writer = FilterBlockWriter::new(Arc::clone(&policy) as Arc<dyn FilterPolicy>);
        writer.start_block(0);
        let present = ["foo", "bar", "baz"];
        for key in present {
            writer.add_key(key.as_bytes());
        }
        let reader = FilterBlockReader::new(writer.finish(), policy);
        for key in present {
            assert!(reader.may_contain(key.as_bytes(), 0));
        }
        for key in ["missing", "nothere"] {
            assert!(!reader.may_contain(key.as_bytes(), 0));
        }
    }
}