use std::hash::{Hash, Hasher};
use ahash::AHasher;
use crate::core::Value;
// The two `DEFAULT_*` constants below are not referenced by the visible code in
// this file, hence `allow(dead_code)`. NOTE(review): presumably kept as reference
// defaults — confirm whether they are still needed.
#[allow(dead_code)]
const DEFAULT_BITS_PER_ELEMENT: usize = 10;
#[allow(dead_code)]
const DEFAULT_NUM_HASHES: usize = 7;
/// Lower bound on a filter's size in bits (a single 64-bit word).
const MIN_FILTER_BITS: usize = 64;
/// Upper bound on a filter's size in bits (8,000,000 bits = 1,000,000 bytes).
const MAX_FILTER_BITS: usize = 8_000_000;
/// A fixed-size Bloom filter over [`Value`]s or caller-supplied 64-bit hashes.
///
/// Lookups can report false positives, but a value that was inserted is always
/// reported as possibly present.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit array packed into 64-bit words; its length is `num_bits / 64`.
    bits: Vec<u64>,
    /// Number of addressable bits; always a multiple of 64.
    num_bits: usize,
    /// Number of probe positions set on insert and checked on lookup.
    num_hashes: usize,
    /// Number of insert calls made (duplicates are counted each time).
    element_count: u64,
}
impl BloomFilter {
    /// Creates a filter sized for `expected_elements` insertions at roughly
    /// `false_positive_rate`.
    ///
    /// The rate is clamped to `[0.0001, 0.5]`; a NaN rate falls back to `0.01`.
    /// The bit count follows `m = ceil(-n·ln(p) / ln(2)²)`, is clamped to
    /// `[MIN_FILTER_BITS, MAX_FILTER_BITS]`, and is rounded up to whole 64-bit
    /// words. The probe count `k = ceil(m / max(n, 1) · ln 2)` is clamped to
    /// `[1, 15]`.
    pub fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
        // `f64::clamp` passes NaN through, and `NaN as usize` is 0, which would
        // silently shrink the filter to the 64-bit minimum regardless of the
        // expected size; use the same 1% target as `with_capacity` instead.
        let fp_rate = if false_positive_rate.is_nan() {
            0.01
        } else {
            false_positive_rate.clamp(0.0001, 0.5)
        };
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let optimal_bits =
            (-(expected_elements as f64) * fp_rate.ln() / ln2_squared).ceil() as usize;
        // Round up to whole words so every bit index maps into `bits`.
        let num_bits = optimal_bits
            .clamp(MIN_FILTER_BITS, MAX_FILTER_BITS)
            .div_ceil(64)
            * 64;
        let optimal_hashes = ((num_bits as f64 / expected_elements.max(1) as f64)
            * std::f64::consts::LN_2)
            .ceil() as usize;
        let num_hashes = optimal_hashes.clamp(1, 15);
        Self {
            bits: vec![0u64; num_bits / 64],
            num_bits,
            num_hashes,
            element_count: 0,
        }
    }

    /// Creates a filter for `expected_elements` insertions with a 1% target
    /// false-positive rate.
    pub fn with_capacity(expected_elements: usize) -> Self {
        Self::new(expected_elements, 0.01)
    }

    /// Creates a filter for `expected_elements` insertions with a 5% target
    /// false-positive rate. This uses fewer bits than [`BloomFilter::with_capacity`].
    pub fn for_edge_computing(expected_elements: usize) -> Self {
        Self::new(expected_elements, 0.05)
    }

    /// Adds `value` to the filter.
    pub fn insert(&mut self, value: &Value) {
        let hash = self.hash_value(value);
        self.insert_raw_hash(hash);
    }

    /// Computes the `(word_index, bit_mask)` for each of `num_hashes` probes of
    /// `hash`.
    ///
    /// Probe `i` lands on bit `(h1 + i·h2 + i²) mod num_bits`, where `h1` is
    /// `hash` cast to `usize` and `h2` is its upper 32 bits. Insertion and lookup
    /// must use exactly this sequence.
    ///
    /// This takes plain values rather than `&self`, so the returned iterator does
    /// not borrow the filter and can drive mutation of `bits`.
    #[inline]
    fn probe_positions(
        hash: u64,
        num_bits: usize,
        num_hashes: usize,
    ) -> impl Iterator<Item = (usize, u64)> {
        let h1 = hash as usize;
        let h2 = (hash >> 32) as usize;
        (0..num_hashes).map(move |i| {
            let bit_idx = h1.wrapping_add(i.wrapping_mul(h2)).wrapping_add(i * i) % num_bits;
            (bit_idx / 64, 1u64 << (bit_idx % 64))
        })
    }

    /// Sets every probe bit for `hash`. This does not touch `element_count`.
    #[inline]
    fn insert_hash(&mut self, hash: u64) {
        // `bit_idx < num_bits == bits.len() * 64`, so `word_idx` is always in
        // bounds; safe indexing costs one predictable check per probe.
        for (word_idx, mask) in Self::probe_positions(hash, self.num_bits, self.num_hashes) {
            self.bits[word_idx] |= mask;
        }
    }

    /// Returns `false` if `value` was definitely never inserted. A `true` result
    /// means it may have been inserted (subject to false positives).
    pub fn might_contain(&self, value: &Value) -> bool {
        let hash = self.hash_value(value);
        self.might_contain_hash(hash)
    }

    /// Returns `true` only if every probe bit for `hash` is set. It stops at the
    /// first clear bit.
    #[inline]
    fn might_contain_hash(&self, hash: u64) -> bool {
        Self::probe_positions(hash, self.num_bits, self.num_hashes)
            .all(|(word_idx, mask)| (self.bits[word_idx] & mask) != 0)
    }

    /// Inserts a caller-computed 64-bit hash and counts it as one element.
    ///
    /// Query it with [`BloomFilter::might_contain_raw_hash`] using a hash
    /// computed the same way. NOTE(review): such hashes are presumably unrelated
    /// to the internal `Value` hashing used by `insert`, unless the caller
    /// reproduces it exactly.
    pub fn insert_raw_hash(&mut self, hash: u64) {
        self.insert_hash(hash);
        self.element_count += 1;
    }

    /// Raw-hash counterpart of [`BloomFilter::might_contain`].
    pub fn might_contain_raw_hash(&self, hash: u64) -> bool {
        self.might_contain_hash(hash)
    }

    /// Hashes `value` together with its variant discriminant, so equal payloads
    /// of different variants (e.g. `Integer(42)` vs `Text("42")`) hash apart.
    fn hash_value(&self, value: &Value) -> u64 {
        let mut hasher = AHasher::default();
        std::mem::discriminant(value).hash(&mut hasher);
        match value {
            Value::Null(_) => {}
            Value::Boolean(b) => b.hash(&mut hasher),
            Value::Integer(i) => i.hash(&mut hasher),
            Value::Float(f) => {
                // IEEE-754 treats -0.0 == 0.0, but their bit patterns differ.
                // Canonicalize the zero sign so both hash identically. Otherwise
                // probing -0.0 against a filter holding 0.0 is a false negative.
                let canonical = if *f == 0.0 { 0.0 } else { *f };
                canonical.to_bits().hash(&mut hasher);
            }
            Value::Text(s) => s.hash(&mut hasher),
            Value::Timestamp(t) => t.timestamp_nanos_opt().hash(&mut hasher),
            Value::Extension(data) => {
                data.hash(&mut hasher);
            }
        }
        hasher.finish()
    }

    /// Theoretical false-positive probability `(1 - e^(-k·n/m))^k`.
    ///
    /// Here `n` is the number of insert calls, so repeated values make this an
    /// over-estimate. Returns `0.0` for an empty filter.
    pub fn estimated_false_positive_rate(&self) -> f64 {
        if self.element_count == 0 {
            return 0.0;
        }
        let k = self.num_hashes as f64;
        let n = self.element_count as f64;
        let m = self.num_bits as f64;
        (1.0 - (-k * n / m).exp()).powf(k)
    }

    /// Size of the bit array in bytes.
    pub fn memory_bytes(&self) -> usize {
        self.bits.len() * 8
    }

    /// Number of insert calls made (duplicates included).
    pub fn len(&self) -> u64 {
        self.element_count
    }

    /// `true` if nothing has been inserted since creation or the last `clear`.
    pub fn is_empty(&self) -> bool {
        self.element_count == 0
    }

    /// ORs `other` into `self`.
    ///
    /// # Errors
    /// Fails if the two filters differ in bit count or probe count. Element counts
    /// are summed, so values present in both filters are counted twice.
    pub fn merge(&mut self, other: &BloomFilter) -> Result<(), &'static str> {
        if self.num_bits != other.num_bits || self.num_hashes != other.num_hashes {
            return Err("Cannot merge bloom filters with different configurations");
        }
        for (word, other_word) in self.bits.iter_mut().zip(other.bits.iter()) {
            *word |= *other_word;
        }
        self.element_count += other.element_count;
        Ok(())
    }

    /// Clears all bits and resets the element count, keeping the allocation.
    pub fn clear(&mut self) {
        self.bits.fill(0);
        self.element_count = 0;
    }

    /// Fraction of bits currently set, in `[0.0, 1.0]`.
    pub fn fill_ratio(&self) -> f64 {
        let set_bits: usize = self.bits.iter().map(|w| w.count_ones() as usize).sum();
        set_bits as f64 / self.num_bits as f64
    }
}
impl Default for BloomFilter {
fn default() -> Self {
Self::with_capacity(1000)
}
}
/// Incrementally populates a [`BloomFilter`] and turns it into a
/// [`RuntimeBloomFilter`] via [`BloomFilterBuilder::build`].
pub struct BloomFilterBuilder {
    filter: BloomFilter,
    /// Column label, copied unchanged into the built [`RuntimeBloomFilter`].
    pub column_name: String,
    /// Source-table label, copied unchanged into the built [`RuntimeBloomFilter`].
    pub source_table: String,
}
impl BloomFilterBuilder {
pub fn new(column_name: String, source_table: String, expected_rows: usize) -> Self {
Self {
filter: BloomFilter::with_capacity(expected_rows),
column_name,
source_table,
}
}
pub fn for_edge(column_name: String, source_table: String, expected_rows: usize) -> Self {
Self {
filter: BloomFilter::for_edge_computing(expected_rows),
column_name,
source_table,
}
}
pub fn insert(&mut self, value: &Value) {
self.filter.insert(value);
}
#[inline]
pub fn insert_raw_hash(&mut self, hash: u64) {
self.filter.insert_raw_hash(hash);
}
pub fn build(self) -> RuntimeBloomFilter {
RuntimeBloomFilter {
filter: self.filter,
column_name: self.column_name,
source_table: self.source_table,
}
}
}
/// A populated [`BloomFilter`] together with the column/table labels it was
/// built for; produced by [`BloomFilterBuilder::build`].
#[derive(Debug, Clone)]
pub struct RuntimeBloomFilter {
    /// The underlying filter queried by `might_match` / `might_match_raw_hash`.
    pub filter: BloomFilter,
    /// Column label carried over from the builder.
    pub column_name: String,
    /// Source-table label carried over from the builder.
    pub source_table: String,
}
impl RuntimeBloomFilter {
    /// `false` means `value` was never inserted; `true` means it may have been.
    pub fn might_match(&self, value: &Value) -> bool {
        self.filter.might_contain(value)
    }

    /// Raw-hash counterpart of [`RuntimeBloomFilter::might_match`].
    #[inline]
    pub fn might_match_raw_hash(&self, hash: u64) -> bool {
        self.filter.might_contain_raw_hash(hash)
    }

    /// Heuristic pass-through estimate. It is the fill ratio scaled by
    /// `1 + estimated false-positive rate`, capped at `1.0`.
    pub fn estimated_selectivity(&self) -> f64 {
        let filter = &self.filter;
        let inflation = 1.0 + filter.estimated_false_positive_rate();
        let fill = filter.fill_ratio();
        f64::min(fill * inflation, 1.0)
    }

    /// Whether the filter is worth applying. It must be non-empty, have an
    /// estimated false-positive rate of at most 0.5, and a fill ratio of at most
    /// 0.9. The conditions are checked in that order with short-circuiting.
    pub fn is_effective(&self) -> bool {
        let filter = &self.filter;
        let rejected = filter.is_empty()
            || filter.estimated_false_positive_rate() > 0.5
            || filter.fill_ratio() > 0.9;
        !rejected
    }

    /// Captures a diagnostic snapshot of the filter and its labels.
    pub fn stats(&self) -> BloomFilterStats {
        let filter = &self.filter;
        let element_count = filter.len();
        let memory_bytes = filter.memory_bytes();
        let false_positive_rate = filter.estimated_false_positive_rate();
        let fill_ratio = filter.fill_ratio();
        let is_effective = self.is_effective();
        BloomFilterStats {
            column_name: self.column_name.clone(),
            source_table: self.source_table.clone(),
            element_count,
            memory_bytes,
            false_positive_rate,
            fill_ratio,
            is_effective,
        }
    }
}
/// Point-in-time diagnostic snapshot produced by [`RuntimeBloomFilter::stats`].
#[derive(Debug, Clone)]
pub struct BloomFilterStats {
    /// Column label copied from the runtime filter.
    pub column_name: String,
    /// Source-table label copied from the runtime filter.
    pub source_table: String,
    /// Number of insert calls made on the filter (duplicates included).
    pub element_count: u64,
    /// Size of the filter's bit array in bytes.
    pub memory_bytes: usize,
    /// Theoretical false-positive estimate `(1 - e^(-k·n/m))^k`.
    pub false_positive_rate: f64,
    /// Fraction of filter bits currently set.
    pub fill_ratio: f64,
    /// Result of [`RuntimeBloomFilter::is_effective`] when the snapshot was taken.
    pub is_effective: bool,
}
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
/// Process-wide tracker, lazily created by [`BloomEffectivenessTracker::global`].
static BLOOM_EFFECTIVENESS: OnceLock<BloomEffectivenessTracker> = OnceLock::new();

/// Lock-free counters recording how often Bloom filter checks rejected or
/// passed values.
///
/// Each counter is updated independently with `Ordering::Relaxed`. A reader
/// racing with writers may therefore see counters from slightly different
/// moments. Derived ratios are computed so they stay within `[0.0, 1.0]`.
#[derive(Debug)]
pub struct BloomEffectivenessTracker {
    total_checks: AtomicU64,
    true_negatives: AtomicU64,
    false_positives: AtomicU64,
    true_positives: AtomicU64,
}

impl BloomEffectivenessTracker {
    /// Creates a tracker with all counters at zero.
    fn new() -> Self {
        Self {
            total_checks: AtomicU64::new(0),
            true_negatives: AtomicU64::new(0),
            false_positives: AtomicU64::new(0),
            true_positives: AtomicU64::new(0),
        }
    }

    /// Returns the shared process-wide tracker, creating it on first use.
    pub fn global() -> &'static Self {
        BLOOM_EFFECTIVENESS.get_or_init(Self::new)
    }

    /// Records one check.
    ///
    /// A rejected value counts as a true negative. A passed value counts as a
    /// true positive if it `actually_matched`, and as a false positive otherwise.
    pub fn record_check(&self, passed_filter: bool, actually_matched: bool) {
        self.total_checks.fetch_add(1, Ordering::Relaxed);
        let bucket = match (passed_filter, actually_matched) {
            (false, _) => &self.true_negatives,
            (true, true) => &self.true_positives,
            (true, false) => &self.false_positives,
        };
        bucket.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a check the filter rejected.
    pub fn record_true_negative(&self) {
        self.total_checks.fetch_add(1, Ordering::Relaxed);
        self.true_negatives.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a check that passed the filter, counting it as a true positive.
    /// No false-positive accounting is done on this path.
    pub fn record_filter_passed(&self) {
        self.total_checks.fetch_add(1, Ordering::Relaxed);
        self.true_positives.fetch_add(1, Ordering::Relaxed);
    }

    /// Fraction of checks that were rejected, or `0.0` before any check.
    pub fn true_negative_rate(&self) -> f64 {
        let total = self.total_checks.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let tn = self.true_negatives.load(Ordering::Relaxed);
        // The two counters are read separately, so concurrent `record_*` calls
        // can make `tn` exceed the `total` snapshot; keep the result a valid rate.
        (tn as f64 / total as f64).min(1.0)
    }

    /// Fraction of passed checks that were false positives, or `0.0` if none
    /// passed.
    pub fn estimated_false_positive_rate(&self) -> f64 {
        // Load each counter exactly once so the numerator can never exceed the
        // denominator, even while other threads are recording checks.
        let fp = self.false_positives.load(Ordering::Relaxed);
        let tp = self.true_positives.load(Ordering::Relaxed);
        let passed = fp.saturating_add(tp);
        if passed == 0 {
            return 0.0;
        }
        fp as f64 / passed as f64
    }

    /// Total number of recorded checks.
    pub fn total_checks(&self) -> u64 {
        self.total_checks.load(Ordering::Relaxed)
    }

    /// Number of checks the filter rejected.
    pub fn true_negatives(&self) -> u64 {
        self.true_negatives.load(Ordering::Relaxed)
    }

    /// Suggests a target false-positive rate for future filters based on the
    /// observed rejection rate.
    ///
    /// Returns `0.01` until at least 1000 checks have been recorded. After that:
    /// `0.005` if more than 70% of checks were rejected, `0.01` if more than 30%,
    /// and `0.05` otherwise.
    pub fn recommend_false_positive_rate(&self) -> f64 {
        if self.total_checks.load(Ordering::Relaxed) < 1000 {
            return 0.01;
        }
        let tn_rate = self.true_negative_rate();
        if tn_rate > 0.7 {
            0.005
        } else if tn_rate > 0.3 {
            0.01
        } else {
            0.05
        }
    }

    /// Zeroes every counter. Each store is independent, so this is not atomic
    /// with respect to concurrent `record_*` calls.
    pub fn reset(&self) {
        self.total_checks.store(0, Ordering::Relaxed);
        self.true_negatives.store(0, Ordering::Relaxed);
        self.false_positives.store(0, Ordering::Relaxed);
        self.true_positives.store(0, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let members = [
            Value::Integer(42),
            Value::Integer(100),
            Value::Text("hello".into()),
        ];
        let mut bf = BloomFilter::with_capacity(100);
        for member in &members {
            bf.insert(member);
        }
        for member in &members {
            assert!(bf.might_contain(member));
        }

        // None of these were inserted, so every hit is a false positive.
        let false_positives = (1000..1100)
            .filter(|&i| bf.might_contain(&Value::Integer(i)))
            .count();
        assert!(
            false_positives < 10,
            "Too many false positives: {}",
            false_positives
        );
    }

    #[test]
    fn test_bloom_filter_no_false_negatives() {
        let mut bf = BloomFilter::with_capacity(1000);
        (0..1000).for_each(|i| bf.insert(&Value::Integer(i)));
        for i in 0..1000 {
            assert!(
                bf.might_contain(&Value::Integer(i)),
                "False negative for {}",
                i
            );
        }
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let mut bf = BloomFilter::new(1000, 0.01);
        (0..1000).for_each(|i| bf.insert(&Value::Integer(i)));

        // Probe a disjoint range; the observed hit rate approximates the FP rate.
        let false_positives = (10000..20000)
            .filter(|&i| bf.might_contain(&Value::Integer(i)))
            .count();
        let actual_fp_rate = false_positives as f64 / 10000.0;
        assert!(
            actual_fp_rate < 0.05,
            "FP rate {} too high (target: 0.01)",
            actual_fp_rate
        );
    }

    #[test]
    fn test_bloom_filter_different_types() {
        let members = [
            Value::Integer(42),
            Value::Float(42.0),
            Value::Text("42".into()),
        ];
        let mut bf = BloomFilter::with_capacity(100);
        for member in &members {
            bf.insert(member);
        }
        for member in &members {
            assert!(bf.might_contain(member));
        }
    }

    #[test]
    fn test_bloom_filter_merge() {
        let mut left = BloomFilter::with_capacity(100);
        let mut right = BloomFilter::with_capacity(100);
        for i in [1, 2] {
            left.insert(&Value::Integer(i));
        }
        for i in [3, 4] {
            right.insert(&Value::Integer(i));
        }

        left.merge(&right).unwrap();

        for i in 1..=4 {
            assert!(left.might_contain(&Value::Integer(i)));
        }
    }

    #[test]
    fn test_runtime_bloom_filter() {
        let mut builder =
            BloomFilterBuilder::new("customer_id".to_string(), "customers".to_string(), 100);
        (0..100).for_each(|i| builder.insert(&Value::Integer(i)));
        let runtime_filter = builder.build();

        assert!(runtime_filter.might_match(&Value::Integer(50)));
        assert!(runtime_filter.is_effective());

        let stats = runtime_filter.stats();
        assert_eq!(stats.element_count, 100);
        assert!(stats.memory_bytes > 0);
    }

    #[test]
    fn test_bloom_filter_edge_computing() {
        let standard_bytes = BloomFilter::with_capacity(10000).memory_bytes();
        let edge_bytes = BloomFilter::for_edge_computing(10000).memory_bytes();
        assert!(
            edge_bytes < standard_bytes,
            "Edge filter should use less memory: {} vs {}",
            edge_bytes,
            standard_bytes
        );
    }

    #[test]
    fn test_fill_ratio() {
        let mut bf = BloomFilter::with_capacity(100);
        assert!(bf.fill_ratio() < 0.01, "Empty filter should have low fill");

        (0..100).for_each(|i| bf.insert(&Value::Integer(i)));

        let fill = bf.fill_ratio();
        assert!(
            fill > 0.1 && fill < 0.9,
            "Fill ratio should be moderate: {}",
            fill
        );
    }

    #[test]
    fn test_effectiveness_check() {
        // A filter with no insertions is never considered effective.
        let empty = BloomFilterBuilder::new("col".to_string(), "table".to_string(), 100).build();
        assert!(!empty.is_effective());

        // A tiny filter (sized for 10 elements) overloaded with 1000 insertions.
        let mut bf = BloomFilter::new(10, 0.5);
        for i in 0..1000 {
            bf.insert(&Value::Integer(i));
        }
        let overloaded = RuntimeBloomFilter {
            filter: bf,
            column_name: "col".to_string(),
            source_table: "table".to_string(),
        };
        assert!(!overloaded.is_effective());
    }
}