use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::collections::hash_map::DefaultHasher;
use serde::{Serialize, Deserialize};
use parking_lot::RwLock;
use crate::{debug_log, trace_log, warn_log, error_log, info_log};
/// Space-efficient probabilistic membership set: `contains` may report
/// false positives but never false negatives.
///
/// NOTE(review): `#[derive(Clone)]` deep-copies `bitmap` but only clones the
/// `Arc`, so a clone shares the original's element counter while its bits
/// diverge independently — confirm that is intended.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    /// Bit array packed into 64-bit words.
    bitmap: Vec<u64>,
    /// Logical number of bits (m).
    bit_count: usize,
    /// Number of hash functions applied per key (k).
    hash_count: usize,
    /// Number of insertions performed (shared across clones via `Arc`).
    element_count: Arc<AtomicU64>,
    /// False-positive probability (p) the filter was sized for.
    target_fpp: f64,
}
impl BloomFilter {
    /// Builds a filter sized for `expected_elements` at the target
    /// false-positive probability `false_positive_rate`.
    ///
    /// # Panics
    /// Panics if `false_positive_rate` is outside (0, 1) or
    /// `expected_elements` is 0.
    pub fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
        assert!(false_positive_rate > 0.0 && false_positive_rate < 1.0);
        assert!(expected_elements > 0);
        let bit_count = Self::optimal_bit_count(expected_elements, false_positive_rate);
        let hash_count = Self::optimal_hash_count(bit_count, expected_elements);
        // Round up to whole 64-bit words.
        let word_count = (bit_count + 63) / 64;
        let bitmap = vec![0; word_count];
        Self {
            bitmap,
            bit_count,
            hash_count,
            element_count: Arc::new(AtomicU64::new(0)),
            target_fpp: false_positive_rate,
        }
    }

    /// Optimal bit count m = -n * ln(p) / (ln 2)^2, rounded up and clamped
    /// to at least 1 so the bitmap can never be empty (the previous
    /// truncating cast could round down).
    fn optimal_bit_count(n: usize, p: f64) -> usize {
        let ln_p = p.ln();
        let ln_2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let m = (n as f64) * (-ln_p) / ln_2_squared;
        (m.ceil() as usize).max(1)
    }

    /// Optimal hash count k = (m / n) * ln 2, rounded up and clamped to at
    /// least 1.
    ///
    /// BUG FIX: the previous truncating `as usize` cast yielded k = 0 for
    /// loose targets (any p >= ~0.5 gives k < 1), in which case
    /// `compute_hashes` produced no hashes, `insert` set no bits, and
    /// `contains` vacuously returned `true` for every key.
    fn optimal_hash_count(m: usize, n: usize) -> usize {
        if n == 0 {
            return 1;
        }
        let k = (m as f64) / (n as f64) * std::f64::consts::LN_2;
        (k.ceil() as usize).max(1)
    }

    /// Inserts `data` by setting `hash_count` bits and bumping the element
    /// counter.
    pub fn insert(&mut self, data: &[u8]) {
        let hashes = self.compute_hashes(data);
        for hash in hashes {
            let bit_index = (hash % self.bit_count as u64) as usize;
            let word_index = bit_index / 64;
            let bit_offset = bit_index % 64;
            // word_index is always in range because the bitmap holds
            // ceil(bit_count / 64) words; the guard is purely defensive.
            if word_index < self.bitmap.len() {
                let mask = 1u64 << bit_offset;
                self.bitmap[word_index] |= mask;
            }
        }
        self.element_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `true` if `data` *may* have been inserted (false positives
    /// are possible) and `false` if it definitely has not been.
    pub fn contains(&self, data: &[u8]) -> bool {
        let hashes = self.compute_hashes(data);
        for hash in hashes {
            let bit_index = (hash % self.bit_count as u64) as usize;
            let word_index = bit_index / 64;
            let bit_offset = bit_index % 64;
            if word_index >= self.bitmap.len() {
                return false;
            }
            let mask = 1u64 << bit_offset;
            // Any clear bit proves the key was never inserted.
            if (self.bitmap[word_index] & mask) == 0 {
                return false;
            }
        }
        true
    }

    /// Derives `hash_count` hash values via double hashing
    /// (Kirsch–Mitzenmacher): h_i = h1 + i * h2.
    fn compute_hashes(&self, data: &[u8]) -> Vec<u64> {
        let mut hashes = Vec::with_capacity(self.hash_count);
        let hash1 = self.hash(data, 0);
        let hash2 = self.hash(data, hash1);
        for i in 0..self.hash_count {
            let combined_hash = hash1.wrapping_add((i as u64).wrapping_mul(hash2));
            hashes.push(combined_hash);
        }
        hashes
    }

    /// Seeded 64-bit hash of `data` using the std `DefaultHasher`.
    fn hash(&self, data: &[u8], seed: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Number of `insert` calls performed so far.
    pub fn len(&self) -> u64 {
        self.element_count.load(Ordering::Relaxed)
    }

    /// True when nothing has been inserted yet.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Estimated false-positive rate at the current fill level:
    /// (1 - e^(-k*n/m))^k.
    pub fn current_false_positive_rate(&self) -> f64 {
        let n = self.len() as f64;
        let m = self.bit_count as f64;
        let k = self.hash_count as f64;
        let exp = (-k * n / m).exp();
        (1.0 - exp).powf(k)
    }

    /// True once the estimated false-positive rate exceeds the target by
    /// more than 50%.
    pub fn needs_resize(&self) -> bool {
        let current_fpp = self.current_false_positive_rate();
        current_fpp > self.target_fpp * 1.5
    }

    /// Replaces this filter with a fresh one sized for twice the current
    /// element count (minimum 1024).
    ///
    /// NOTE: a Bloom filter cannot be grown in place — all previously
    /// inserted elements are forgotten; the caller must re-insert them.
    pub fn resize(&mut self) {
        let new_element_count = (self.len() as usize * 2).max(1024);
        let new_filter = Self::new(new_element_count, self.target_fpp);
        warn_log!("布隆过滤器扩容从 {} 到 {} 元素", self.len(), new_element_count);
        *self = new_filter;
    }

    /// Clears every bit and resets the element counter to 0, keeping the
    /// current capacity.
    pub fn clear(&mut self) {
        self.bitmap.fill(0);
        self.element_count.store(0, Ordering::Relaxed);
    }

    /// Bitmap footprint in bytes (word count * 8).
    pub fn size_in_bytes(&self) -> usize {
        self.bitmap.len() * 8
    }

    /// Point-in-time snapshot of the filter's configuration and load.
    pub fn stats(&self) -> BloomFilterStats {
        BloomFilterStats {
            bit_count: self.bit_count,
            hash_count: self.hash_count,
            element_count: self.len(),
            size_in_bytes: self.size_in_bytes(),
            current_fpp: self.current_false_positive_rate(),
            target_fpp: self.target_fpp,
        }
    }
}
#[doc(hidden)]
/// Point-in-time statistics snapshot produced by `BloomFilter::stats`.
#[derive(Debug, Clone)]
pub struct BloomFilterStats {
/// Total number of logical bits in the bitmap (m).
pub bit_count: usize,
/// Number of hash functions applied per key (k).
pub hash_count: usize,
/// Count of `insert` calls observed so far.
pub element_count: u64,
/// Bitmap footprint in bytes (word count * 8).
pub size_in_bytes: usize,
/// Estimated false-positive rate at the current fill level.
pub current_fpp: f64,
/// False-positive rate the filter was originally sized for.
pub target_fpp: f64,
}
/// Thread-safe wrapper around `BloomFilter`: every operation goes through
/// an `Arc<RwLock<_>>`, so reads may run concurrently and writes are
/// exclusive. Cloning this struct clones the `Arc` — clones share the same
/// underlying filter.
#[derive(Debug, Clone)]
pub struct ConcurrentBloomFilter {
// Shared, lock-protected filter (parking_lot::RwLock).
inner: Arc<RwLock<BloomFilter>>,
}
impl ConcurrentBloomFilter {
    /// Wraps a freshly constructed `BloomFilter` in an `Arc<RwLock<_>>`.
    ///
    /// # Panics
    /// Same preconditions as `BloomFilter::new`: `false_positive_rate` in
    /// (0, 1) and `expected_elements` > 0.
    pub fn new(expected_elements: usize, false_positive_rate: f64) -> Self {
        let filter = BloomFilter::new(expected_elements, false_positive_rate);
        Self {
            inner: Arc::new(RwLock::new(filter)),
        }
    }

    /// Inserts `data` under an exclusive write lock.
    pub fn insert(&self, data: &[u8]) {
        let mut guard = self.inner.write();
        guard.insert(data);
    }

    /// Membership probe under a shared read lock; false positives are
    /// possible, false negatives are not.
    pub fn contains(&self, data: &[u8]) -> bool {
        let guard = self.inner.read();
        guard.contains(data)
    }

    /// Number of insertions performed so far.
    pub fn len(&self) -> u64 {
        let guard = self.inner.read();
        guard.len()
    }

    /// True when nothing has been inserted yet.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Snapshot of the underlying filter's statistics.
    pub fn stats(&self) -> BloomFilterStats {
        let guard = self.inner.read();
        guard.stats()
    }
}
/// Three-tier Bloom filter: lookups probe the hot tier first, then warm,
/// then cold, so frequently accessed keys answer from the tier with the
/// tightest false-positive rate.
#[derive(Debug, Clone)]
pub struct TieredBloomFilter {
    /// Smallest, tightest tier (sized in `new` at a 1% target FPP).
    hot: ConcurrentBloomFilter,
    /// Mid-sized tier (5% target FPP).
    warm: ConcurrentBloomFilter,
    /// Full-population tier (10% target FPP).
    cold: ConcurrentBloomFilter,
}
impl TieredBloomFilter {
    /// Builds the three tiers as fractions of `expected_elements`:
    /// hot = 1/10 at 1% FPP, warm = 1/3 at 5% FPP, cold = all at 10% FPP.
    ///
    /// BUG FIX: every tier size is clamped to at least 1. The previous
    /// integer division produced a zero-sized hot/warm tier for
    /// `expected_elements < 10` (resp. < 3), which tripped the
    /// `expected_elements > 0` assertion inside `BloomFilter::new` and
    /// panicked.
    pub fn new(expected_elements: usize) -> Self {
        let hot_size = (expected_elements / 10).max(1);
        let hot = ConcurrentBloomFilter::new(hot_size, 0.01);
        let warm_size = (expected_elements / 3).max(1);
        let warm = ConcurrentBloomFilter::new(warm_size, 0.05);
        let cold_size = expected_elements.max(1);
        let cold = ConcurrentBloomFilter::new(cold_size, 0.1);
        Self { hot, warm, cold }
    }

    /// Records `data` in the filter for the given tier only.
    pub fn insert(&self, data: &[u8], tier: FilterTier) {
        match tier {
            FilterTier::Hot => self.hot.insert(data),
            FilterTier::Warm => self.warm.insert(data),
            FilterTier::Cold => self.cold.insert(data),
        }
    }

    /// Probes the tiers hot → warm → cold and reports the first (possible)
    /// hit. `DefinitelyNotExist` is exact; the `MayExist*` variants can be
    /// false positives.
    pub fn contains(&self, data: &[u8]) -> FilterResult {
        if self.hot.contains(data) {
            return FilterResult::MayExistHot;
        }
        if self.warm.contains(data) {
            return FilterResult::MayExistWarm;
        }
        if self.cold.contains(data) {
            return FilterResult::MayExistCold;
        }
        FilterResult::DefinitelyNotExist
    }

    /// Snapshot of all three tiers' statistics.
    pub fn stats(&self) -> TieredBloomFilterStats {
        TieredBloomFilterStats {
            hot: self.hot.stats(),
            warm: self.warm.stats(),
            cold: self.cold.stats(),
        }
    }
}
/// Storage tier a key is inserted into; selects which underlying filter
/// records it in `TieredBloomFilter::insert`.
#[derive(Debug, Clone, Copy)]
pub enum FilterTier {
Hot, Warm, Cold, }
/// Outcome of a tiered membership probe. The `MayExist*` variants can be
/// false positives; `DefinitelyNotExist` is exact.
#[derive(Debug, Clone, PartialEq)]
pub enum FilterResult {
/// No tier reported a hit — the key was never inserted.
DefinitelyNotExist,
/// The hot tier reported a (possible) hit.
MayExistHot,
/// The warm tier reported a (possible) hit.
MayExistWarm,
/// The cold tier reported a (possible) hit.
MayExistCold,
}
#[doc(hidden)]
/// Per-tier statistics snapshot produced by `TieredBloomFilter::stats`.
#[derive(Debug, Clone)]
pub struct TieredBloomFilterStats {
/// Stats for the hot tier.
pub hot: BloomFilterStats,
/// Stats for the warm tier.
pub warm: BloomFilterStats,
/// Stats for the cold tier.
pub cold: BloomFilterStats,
}
#[cfg(test)]
mod tests {
use super::*;
// Inserted keys must report present; a never-inserted key should (with
// overwhelming probability at this sizing) report absent.
#[test]
fn test_bloom_filter_basic() {
let mut filter = BloomFilter::new(1000, 0.01);
assert!(!filter.contains(b"hello"));
filter.insert(b"hello");
assert!(filter.contains(b"hello"));
assert!(!filter.contains(b"world"));
}
// Fill the filter to its design capacity, then probe 1000 unseen keys and
// check the measured false-positive rate stays near the 0.1 target
// (0.2 bound leaves statistical slack).
#[test]
fn test_bloom_filter_false_positives() {
let mut filter = BloomFilter::new(100, 0.1);
for i in 0..100 {
let key = format!("key_{}", i);
filter.insert(key.as_bytes());
}
let mut false_positives = 0;
let test_count = 1000;
for i in 100..(100 + test_count) {
let key = format!("key_{}", i);
if filter.contains(key.as_bytes()) {
false_positives += 1;
}
}
let actual_fpp = false_positives as f64 / test_count as f64;
// Prints "actual false-positive rate" (runtime string left untranslated).
println!("实际误判率: {:.4}", actual_fpp);
assert!(actual_fpp < 0.2); }
// Smoke test for the lock-wrapped filter: insert/contains round-trip.
#[test]
fn test_concurrent_bloom_filter() {
let filter = ConcurrentBloomFilter::new(100, 0.01);
filter.insert(b"test");
assert!(filter.contains(b"test"));
assert!(!filter.contains(b"not_exist"));
}
// A key inserted into a tier should surface as a hit attributed to that
// tier (lookup order is hot, then warm, then cold).
#[test]
fn test_tiered_bloom_filter() {
let tiered = TieredBloomFilter::new(100);
tiered.insert(b"hot_key", FilterTier::Hot);
tiered.insert(b"warm_key", FilterTier::Warm);
tiered.insert(b"cold_key", FilterTier::Cold);
assert_eq!(tiered.contains(b"hot_key"), FilterResult::MayExistHot);
assert_eq!(tiered.contains(b"warm_key"), FilterResult::MayExistWarm);
assert_eq!(tiered.contains(b"cold_key"), FilterResult::MayExistCold);
assert_eq!(tiered.contains(b"no_key"), FilterResult::DefinitelyNotExist);
}
// Stats must reflect the number of inserts, and at 10% fill the estimated
// false-positive rate should still be close to the 0.01 target.
#[test]
fn test_bloom_filter_stats() {
let mut filter = BloomFilter::new(1000, 0.01);
for i in 0..100 {
filter.insert(format!("key_{}", i).as_bytes());
}
let stats = filter.stats();
assert_eq!(stats.element_count, 100);
assert!(stats.current_fpp < 0.02); assert!(stats.size_in_bytes > 0);
}
}