use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
/// A thread-safe Bloom filter backed by a flat array of `AtomicU64` words.
///
/// `insert` and `contains` may be called concurrently without external
/// locking. The filter can report false positives but never false negatives.
pub struct BloomFilter {
    /// Bit array, packed 64 bits per word.
    bits: Vec<AtomicU64>,
    /// Total number of bits; always a positive multiple of 64.
    num_bits: usize,
    /// Number of hash probes (k) per item.
    num_hashes: usize,
    /// Number of `insert` calls (duplicates included).
    count: AtomicU64,
}

impl BloomFilter {
    /// Creates a filter sized for `expected_items` at approximately
    /// `false_positive_rate`, using the textbook formulas
    /// `m = -n * ln(p) / ln(2)^2` and `k = (m / n) * ln(2)`.
    ///
    /// Degenerate inputs are clamped so the filter is always usable:
    /// `expected_items == 0` is treated as 1 and the rate is clamped into
    /// (0, 1). Without these guards `num_bits` could end up 0 (making the
    /// `% num_bits` in `insert`/`contains` panic with a division by zero)
    /// or overflow when the rate is exactly 0.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let expected = expected_items.max(1);
        // Keep ln(rate) finite and strictly negative.
        let rate = false_positive_rate.clamp(1e-12, 0.999);
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_bits = (-(expected as f64) * rate.ln() / ln2_squared).ceil() as usize;
        // Round up to a whole number of 64-bit words, never fewer than one.
        let num_bits = num_bits.max(1).div_ceil(64) * 64;
        let num_hashes =
            ((num_bits as f64 / expected as f64) * std::f64::consts::LN_2).ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16);
        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();
        Self {
            bits,
            num_bits,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }

    /// Creates a filter with explicit sizing. `num_bits` is rounded up to a
    /// positive multiple of 64 (a request of 0 yields 64 bits rather than a
    /// filter that panics on first use); `num_hashes` is raised to at least 1.
    pub fn with_params(num_bits: usize, num_hashes: usize) -> Self {
        let num_bits = num_bits.max(1).div_ceil(64) * 64;
        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();
        Self {
            bits,
            num_bits,
            num_hashes: num_hashes.max(1),
            count: AtomicU64::new(0),
        }
    }

    /// Inserts `item` by setting its `num_hashes` probe bits.
    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);
        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            self.set_bit(bit_index);
        }
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `true` if `item` may have been inserted (false positives are
    /// possible), `false` if it definitely has not been.
    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);
        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.get_bit(bit_index) {
                return false;
            }
        }
        true
    }

    /// Inserts `item` and reports whether every probe bit was already set,
    /// i.e. whether `contains` would have returned `true` beforehand.
    pub fn insert_and_check<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);
        let mut was_present = true;
        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.set_bit(bit_index) {
                was_present = false;
            }
        }
        self.count.fetch_add(1, Ordering::Relaxed);
        was_present
    }

    /// Number of `insert` calls so far (not distinct items).
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Estimated false-positive probability from the current fill ratio
    /// (`fill^k`).
    pub fn estimated_fp_rate(&self) -> f64 {
        let bits_set = self.count_bits_set();
        let fill_ratio = bits_set as f64 / self.num_bits as f64;
        fill_ratio.powi(self.num_hashes as i32)
    }

    /// Fraction of bits currently set, in `[0, 1]`.
    pub fn fill_ratio(&self) -> f64 {
        self.count_bits_set() as f64 / self.num_bits as f64
    }

    /// Size of the bit array in bytes.
    pub fn memory_usage(&self) -> usize {
        self.bits.len() * 8
    }

    /// Resets every bit and the insert counter.
    ///
    /// NOTE(review): not atomic with respect to concurrent inserts; an
    /// insert racing with `clear` may be partially erased.
    pub fn clear(&self) {
        for word in &self.bits {
            word.store(0, Ordering::Relaxed);
        }
        self.count.store(0, Ordering::Relaxed);
    }

    /// Derives two hash values: `h1` from the item itself and `h2` by
    /// re-hashing `h1`. Together they seed the double-hashing probes.
    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();
        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();
        (hash1, hash2)
    }

    /// Kirsch–Mitzenmacher double hashing: probe_i = h1 + i * h2.
    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    /// Sets the bit and returns `true` if it was already set.
    fn set_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;
        let old = self.bits[word_index].fetch_or(mask, Ordering::AcqRel);
        (old & mask) != 0
    }

    /// Reads a single bit.
    fn get_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;
        (self.bits[word_index].load(Ordering::Acquire) & mask) != 0
    }

    /// Population count across the whole bit array.
    fn count_bits_set(&self) -> usize {
        self.bits
            .iter()
            .map(|w| w.load(Ordering::Relaxed).count_ones() as usize)
            .sum()
    }
}
/// A counting Bloom filter with 4-bit saturating counters (two per byte),
/// supporting `remove` in addition to `insert`.
pub struct CountingBloomFilter {
    /// Packed 4-bit counters: even indices live in a byte's low nibble,
    /// odd indices in the high nibble.
    counters: Vec<AtomicU8>,
    /// Number of logical 4-bit counters.
    num_counters: usize,
    /// Number of hash probes per item.
    num_hashes: usize,
    /// Net item count (inserts minus removes, floored at zero).
    count: AtomicU64,
}

impl CountingBloomFilter {
    /// Creates a filter sized for `expected_items` at approximately
    /// `false_positive_rate` (same sizing formulas as `BloomFilter`).
    ///
    /// Degenerate inputs are clamped (`expected_items == 0` acts as 1, the
    /// rate is kept inside (0, 1)) so the sizing math never divides by zero
    /// or overflows.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let expected = expected_items.max(1);
        // Keep ln(rate) finite and strictly negative.
        let rate = false_positive_rate.clamp(1e-12, 0.999);
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_counters = (-(expected as f64) * rate.ln() / ln2_squared).ceil() as usize;
        let num_counters = num_counters.max(64);
        let num_hashes =
            ((num_counters as f64 / expected as f64) * std::f64::consts::LN_2).ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16);
        // Two 4-bit counters per byte.
        let num_bytes = num_counters.div_ceil(2);
        let counters = (0..num_bytes).map(|_| AtomicU8::new(0)).collect();
        Self {
            counters,
            num_counters,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }

    /// Inserts `item`, incrementing its `num_hashes` counters (each
    /// saturates at 15).
    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);
        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.increment_counter(counter_index);
        }
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes one occurrence of `item` by decrementing its counters.
    ///
    /// Removing an item that was never inserted can corrupt counters shared
    /// with other items (standard counting-Bloom caveat).
    pub fn remove<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);
        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.decrement_counter(counter_index);
        }
        // Saturating decrement of the net item count.
        let _ = self
            .count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
                if c > 0 {
                    Some(c - 1)
                } else {
                    None
                }
            });
    }

    /// Returns `true` if `item` may be present (false positives possible),
    /// `false` if it definitely is not.
    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);
        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            if self.get_counter(counter_index) == 0 {
                return false;
            }
        }
        true
    }

    /// Net number of items (inserts minus removes).
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Same double-hashing seed derivation as `BloomFilter::hash_pair`.
    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();
        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();
        (hash1, hash2)
    }

    /// Kirsch–Mitzenmacher double hashing: probe_i = h1 + i * h2.
    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    /// CAS-increments the 4-bit counter, saturating at 15.
    fn increment_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;
        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };
            if old_counter >= 15 {
                // Saturated: stop counting rather than wrap the nibble.
                return;
            }
            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter + 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter + 1)
            };
            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    /// CAS-decrements the 4-bit counter.
    ///
    /// A counter at 0 is left alone (nothing to remove). A counter at 15 is
    /// also left alone: once it has saturated its true count is unknown, and
    /// decrementing it could reach zero while items hashing here are still
    /// present, producing false negatives. Pinning saturated counters trades
    /// that for (safe) false positives.
    fn decrement_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;
        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };
            if old_counter == 0 || old_counter == 15 {
                return;
            }
            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter - 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter - 1)
            };
            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    /// Reads the 4-bit counter value.
    fn get_counter(&self, counter_index: usize) -> u8 {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;
        let byte = self.counters[byte_index].load(Ordering::Acquire);
        if is_high_nibble {
            (byte >> 4) & 0x0F
        } else {
            byte & 0x0F
        }
    }
}
/// A concurrent HyperLogLog cardinality estimator with 8-bit registers.
pub struct HyperLogLog {
    /// One rank register per bucket.
    registers: Vec<AtomicU8>,
    /// Number of registers, `2^precision`.
    num_registers: usize,
    /// Number of leading hash bits used to select a register (4..=18).
    precision: u8,
    /// Bias-correction constant from the HyperLogLog paper.
    alpha: f64,
}

impl HyperLogLog {
    /// Creates an estimator with `2^precision` one-byte registers.
    ///
    /// `precision` is clamped to `4..=18`; higher precision lowers the
    /// relative error (about `1.04 / sqrt(2^precision)`) at the cost of
    /// memory.
    pub fn new(precision: u8) -> Self {
        let precision = precision.clamp(4, 18);
        let num_registers = 1 << precision;
        // Alpha constants per Flajolet et al.; the smallest register counts
        // use empirically tuned values.
        let alpha = match precision {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / num_registers as f64),
        };
        let registers = (0..num_registers).map(|_| AtomicU8::new(0)).collect();
        Self {
            registers,
            num_registers,
            precision,
            alpha,
        }
    }

    /// Observes `item`: the top `precision` hash bits select a register,
    /// which keeps the maximum "rank" (position of the first 1-bit in the
    /// remaining hash bits) seen so far.
    pub fn add<T: Hash>(&self, item: &T) {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        let hash = hasher.finish();
        let register_idx = (hash >> (64 - self.precision)) as usize;
        // Shift the register-index bits out; OR in a sentinel bit so the
        // rank stays bounded even when the remaining bits are all zero.
        let remaining = (hash << self.precision) | (1 << (self.precision - 1));
        let leading_zeros = (remaining.leading_zeros() + 1) as u8;
        // Lock-free running max: retry until our rank is not larger or the
        // CAS lands.
        loop {
            let current = self.registers[register_idx].load(Ordering::Acquire);
            if leading_zeros <= current {
                break;
            }
            if self.registers[register_idx]
                .compare_exchange_weak(current, leading_zeros, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break;
            }
        }
    }

    /// Estimates the number of distinct items observed so far.
    pub fn estimate(&self) -> u64 {
        let mut sum = 0.0;
        let mut zeros = 0;
        for reg in &self.registers {
            let val = reg.load(Ordering::Relaxed);
            sum += 1.0 / (1u64 << val) as f64;
            if val == 0 {
                zeros += 1;
            }
        }
        let m = self.num_registers as f64;
        let raw_estimate = self.alpha * m * m / sum;
        // Small-range correction: while empty registers remain and the raw
        // estimate is small, linear counting is more accurate.
        //
        // The classic large-range correction (-2^32 * ln(1 - E/2^32)) is
        // deliberately omitted: it assumes a 32-bit hash, whereas this
        // implementation hashes to 64 bits, where hash collisions are
        // negligible. Applying it would distort large estimates and produce
        // NaN once the raw estimate reaches 2^32.
        let estimate = if raw_estimate <= 2.5 * m && zeros > 0 {
            m * (m / zeros as f64).ln()
        } else {
            raw_estimate
        };
        estimate.round() as u64
    }

    /// Merges `other` into `self` by taking the per-register maximum; the
    /// result estimates the cardinality of the union of both streams.
    ///
    /// # Panics
    /// Panics if the two sketches were built with different precisions.
    pub fn merge(&self, other: &HyperLogLog) {
        assert_eq!(self.num_registers, other.num_registers);
        for i in 0..self.num_registers {
            loop {
                let current = self.registers[i].load(Ordering::Acquire);
                let other_val = other.registers[i].load(Ordering::Relaxed);
                let new_val = current.max(other_val);
                if new_val == current {
                    break;
                }
                if self.registers[i]
                    .compare_exchange_weak(current, new_val, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    break;
                }
            }
        }
    }

    /// Register memory in bytes (one byte per register).
    pub fn memory_usage(&self) -> usize {
        self.num_registers
    }

    /// Expected relative standard error of `estimate`.
    pub fn relative_error(&self) -> f64 {
        1.04 / (self.num_registers as f64).sqrt()
    }
}
/// A Bloom filter specialized for `u64` offsets that also tracks the min and
/// max inserted offset, letting out-of-range queries short-circuit without
/// probing the filter.
pub struct OffsetBloomFilter {
    /// Underlying membership filter (1% target false-positive rate).
    filter: BloomFilter,
    /// Smallest inserted offset; `u64::MAX` while empty.
    min_offset: AtomicU64,
    /// Largest inserted offset; `0` while empty.
    max_offset: AtomicU64,
    /// Bucket granularity for the range-probe heuristic; always >= 1.
    bucket_size: u64,
}

impl OffsetBloomFilter {
    /// Creates a filter sized for `expected_offsets`.
    ///
    /// `bucket_size` is raised to at least 1 so the bucket arithmetic in
    /// `contains_range` can never divide by zero.
    pub fn new(expected_offsets: usize, bucket_size: u64) -> Self {
        Self {
            filter: BloomFilter::new(expected_offsets, 0.01),
            min_offset: AtomicU64::new(u64::MAX),
            max_offset: AtomicU64::new(0),
            bucket_size: bucket_size.max(1),
        }
    }

    /// Records `offset` and widens the tracked [min, max] range.
    pub fn insert(&self, offset: u64) {
        self.filter.insert(&offset);
        self.min_offset.fetch_min(offset, Ordering::AcqRel);
        self.max_offset.fetch_max(offset, Ordering::AcqRel);
    }

    /// Returns `true` if `offset` may have been inserted. Offsets outside
    /// the observed [min, max] range are rejected without probing the filter.
    pub fn contains(&self, offset: u64) -> bool {
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);
        if offset < min || offset > max {
            return false;
        }
        self.filter.contains(&offset)
    }

    /// Heuristically tests whether any offset in `[start, end]` (inclusive)
    /// may be present.
    ///
    /// Ranges of at most 11 offsets are checked exhaustively and therefore
    /// inherit the filter's no-false-negative guarantee. Wider ranges are
    /// only *sampled* (bucket edges and midpoints, then a coarse stride), so
    /// for a wide range a `false` result means "probably absent", not proof.
    pub fn contains_range(&self, start: u64, end: u64) -> bool {
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);
        if end < min || start > max {
            return false;
        }
        // Small range: test every offset exhaustively.
        if end - start <= 10 {
            for offset in start..=end {
                if self.filter.contains(&offset) {
                    return true;
                }
            }
            return false;
        }
        let start_bucket = start / self.bucket_size;
        let end_bucket = end / self.bucket_size;
        // Probe each bucket's edges and midpoint, but cap the scan so a very
        // wide range cannot degenerate into millions of filter probes; such
        // ranges fall through to the stride sampling below.
        if end_bucket - start_bucket <= 1024 {
            for bucket in start_bucket..=end_bucket {
                let bucket_start = bucket.saturating_mul(self.bucket_size);
                // Saturating arithmetic guards buckets near u64::MAX.
                let bucket_end = bucket_start.saturating_add(self.bucket_size - 1);
                for &offset in &[
                    bucket_start,
                    bucket_start.saturating_add(self.bucket_size / 2),
                    bucket_end,
                ] {
                    if offset >= start && offset <= end && self.filter.contains(&offset) {
                        return true;
                    }
                }
            }
        }
        // Coarse fallback: roughly 10 evenly spaced probes across the range.
        let step = ((end - start) / 10).max(1);
        let mut offset = start;
        loop {
            if self.filter.contains(&offset) {
                return true;
            }
            // checked_add terminates cleanly even when `end` is close to
            // u64::MAX (the previous `offset += step` could overflow).
            match offset.checked_add(step) {
                Some(next) if next <= end => offset = next,
                _ => break,
            }
        }
        false
    }

    /// Observed (min, max) inserted offsets; `(u64::MAX, 0)` while empty.
    pub fn offset_range(&self) -> (u64, u64) {
        (
            self.min_offset.load(Ordering::Acquire),
            self.max_offset.load(Ordering::Acquire),
        )
    }

    /// Number of inserts recorded by the underlying filter.
    pub fn count(&self) -> u64 {
        self.filter.count()
    }
}
/// Tuning knobs for the adaptive batcher.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Lower bound for the adaptive batch-size threshold.
    pub min_batch_size: usize,
    /// Upper bound for the adaptive batch-size threshold; also a hard flush
    /// limit.
    pub max_batch_size: usize,
    /// Longest time (microseconds) a non-empty batch may linger unflushed.
    pub max_linger_us: u64,
    /// Flush latency (microseconds) the adaptive controller steers towards.
    pub target_latency_us: u64,
    /// Whether the batch-size threshold adapts to observed latency.
    pub adaptive: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfig {
            min_batch_size: 16,
            max_batch_size: 1024,
            max_linger_us: 5000,
            target_latency_us: 1000,
            adaptive: true,
        }
    }
}
/// Groups items into batches, adapting the flush threshold so observed batch
/// latency tracks `config.target_latency_us`.
pub struct AdaptiveBatcher<T> {
    /// Batching parameters (see `BatchConfig`).
    config: BatchConfig,
    /// Items accumulated for the in-progress batch.
    batch: parking_lot::Mutex<Vec<T>>,
    /// Current flush threshold; kept within the configured bounds and
    /// never below 1.
    current_batch_size: AtomicU32,
    /// `now_us()` timestamp at which the current batch got its first item.
    batch_start_us: AtomicU64,
    /// Ring buffer of the last 8 flush latencies in µs (0 = slot unused).
    recent_latencies: [AtomicU64; 8],
    /// Next write slot in `recent_latencies` (mod 8).
    latency_index: AtomicU32,
    /// Total batches flushed.
    batches_flushed: AtomicU64,
    /// Total items ever added.
    items_batched: AtomicU64,
}

impl<T> AdaptiveBatcher<T> {
    /// Creates a batcher whose threshold starts halfway between the
    /// configured min and max sizes — but never below 1 item, so a
    /// zero-valued config cannot force a flush on every add.
    pub fn new(config: BatchConfig) -> Self {
        let initial_size = ((config.min_batch_size + config.max_batch_size) / 2).max(1);
        Self {
            config,
            batch: parking_lot::Mutex::new(Vec::with_capacity(initial_size)),
            current_batch_size: AtomicU32::new(initial_size as u32),
            batch_start_us: AtomicU64::new(0),
            recent_latencies: std::array::from_fn(|_| AtomicU64::new(0)),
            latency_index: AtomicU32::new(0),
            batches_flushed: AtomicU64::new(0),
            items_batched: AtomicU64::new(0),
        }
    }

    /// Adds an item. Returns `Some(batch)` when this add completed a batch
    /// (threshold reached or the batch lingered past `max_linger_us`); the
    /// caller is responsible for processing the returned items.
    pub fn add(&self, item: T) -> Option<Vec<T>> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();
        if batch.is_empty() {
            // First item of a new batch: start the linger clock.
            self.batch_start_us.store(now, Ordering::Release);
        }
        batch.push(item);
        self.items_batched.fetch_add(1, Ordering::Relaxed);
        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
        let should_flush = batch.len() >= batch_size
            || batch_age >= self.config.max_linger_us
            || batch.len() >= self.config.max_batch_size;
        if should_flush {
            let flushed = std::mem::take(&mut *batch);
            // `take` leaves a zero-capacity Vec; pre-size it for the next
            // batch.
            batch.reserve(batch_size);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);
            self.record_latency(batch_age);
            Some(flushed)
        } else {
            None
        }
    }

    /// Flushes whatever is pending (possibly an empty Vec).
    pub fn flush(&self) -> Vec<T> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();
        if !batch.is_empty() {
            let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
            self.record_latency(batch_age);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);
        }
        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let flushed = std::mem::take(&mut *batch);
        batch.reserve(batch_size);
        flushed
    }

    /// `true` if a non-empty batch has lingered past `max_linger_us` and a
    /// caller should invoke `flush`.
    pub fn needs_flush(&self) -> bool {
        let batch = self.batch.lock();
        if batch.is_empty() {
            return false;
        }
        let now = Self::now_us();
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
        batch_age >= self.config.max_linger_us
    }

    /// Current adaptive flush threshold.
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed) as usize
    }

    /// Number of items waiting in the in-progress batch.
    pub fn pending_count(&self) -> usize {
        self.batch.lock().len()
    }

    /// Point-in-time snapshot of the activity counters.
    pub fn stats(&self) -> BatcherStats {
        BatcherStats {
            batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
            items_batched: self.items_batched.load(Ordering::Relaxed),
            current_batch_size: self.current_batch_size.load(Ordering::Relaxed) as usize,
            avg_latency_us: self.average_latency(),
        }
    }

    /// Records a flush latency sample and (when adaptive) retunes the batch
    /// size: shrink 25% when latency runs above 2x target, grow 25% when it
    /// runs below half the target.
    fn record_latency(&self, latency_us: u64) {
        if !self.config.adaptive {
            return;
        }
        let idx = self.latency_index.fetch_add(1, Ordering::Relaxed) as usize % 8;
        self.recent_latencies[idx].store(latency_us, Ordering::Relaxed);
        let avg_latency = self.average_latency();
        let current_size = self.current_batch_size.load(Ordering::Relaxed);
        let new_size = if avg_latency > self.config.target_latency_us * 2 {
            (current_size * 3 / 4).max(self.config.min_batch_size as u32)
        } else if avg_latency < self.config.target_latency_us / 2 {
            // `.max(current_size + 1)` guarantees progress: for sizes < 4,
            // `current_size * 5 / 4` truncates back to `current_size` and
            // growth would otherwise stall permanently.
            (current_size * 5 / 4)
                .max(current_size + 1)
                .min(self.config.max_batch_size as u32)
        } else {
            current_size
        };
        // A threshold of 0 would flush on every add; keep it >= 1.
        self.current_batch_size
            .store(new_size.max(1), Ordering::Relaxed);
    }

    /// Mean of the non-zero latency samples, or 0 if none recorded yet.
    fn average_latency(&self) -> u64 {
        let mut sum = 0u64;
        let mut count = 0u64;
        for lat in &self.recent_latencies {
            let l = lat.load(Ordering::Relaxed);
            if l > 0 {
                sum += l;
                count += 1;
            }
        }
        if count > 0 {
            sum / count
        } else {
            0
        }
    }

    /// Microseconds elapsed since a process-local epoch.
    ///
    /// Anchored to `Instant` (monotonic) instead of `SystemTime`, so NTP or
    /// manual wall-clock adjustments can no longer make batch ages jump
    /// forward or collapse to zero.
    fn now_us() -> u64 {
        use std::sync::OnceLock;
        use std::time::Instant;
        static EPOCH: OnceLock<Instant> = OnceLock::new();
        EPOCH.get_or_init(Instant::now).elapsed().as_micros() as u64
    }
}
/// Point-in-time snapshot of `AdaptiveBatcher` activity, as returned by
/// `AdaptiveBatcher::stats`.
#[derive(Debug, Clone)]
pub struct BatcherStats {
    /// Total number of batches flushed so far.
    pub batches_flushed: u64,
    /// Total number of items ever added to the batcher.
    pub items_batched: u64,
    /// The adaptive flush threshold at snapshot time.
    pub current_batch_size: usize,
    /// Average of the recent flush latencies in microseconds (0 if none
    /// have been recorded yet).
    pub avg_latency_us: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter() {
        let bloom = BloomFilter::new(1000, 0.01);
        (0..1000).for_each(|item| bloom.insert(&item));
        for item in 0..1000 {
            assert!(bloom.contains(&item), "Item {} should be present", item);
        }
        // Probe 1000 values that were never inserted and count collisions.
        let false_positives = (1000..2000).filter(|probe| bloom.contains(probe)).count();
        assert!(
            false_positives < 30,
            "Too many false positives: {}",
            false_positives
        );
    }

    #[test]
    fn test_counting_bloom_filter() {
        let counting = CountingBloomFilter::new(1000, 0.01);
        counting.insert(&42);
        counting.insert(&43);
        assert!(counting.contains(&42));
        assert!(counting.contains(&43));
        // Removing one item must not disturb the other.
        counting.remove(&42);
        assert!(!counting.contains(&42));
        assert!(counting.contains(&43));
    }

    #[test]
    fn test_hyperloglog() {
        let hll = HyperLogLog::new(14);
        (0..10000).for_each(|item| hll.add(&item));
        let estimate = hll.estimate();
        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.1,
            "Estimate {} too far from 10000 (error: {}%)",
            estimate,
            error * 100.0
        );
    }

    #[test]
    fn test_hyperloglog_merge() {
        let left = HyperLogLog::new(10);
        let right = HyperLogLog::new(10);
        (0..5000).for_each(|item| left.add(&item));
        (5000..10000).for_each(|item| right.add(&item));
        left.merge(&right);
        let estimate = left.estimate();
        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.15,
            "Merged estimate {} too far from 10000",
            estimate
        );
    }

    #[test]
    fn test_offset_bloom_filter() {
        let offsets = OffsetBloomFilter::new(1000, 100);
        // Insert 0, 10, 20, ..., 990.
        for offset in (0..100).map(|k| k * 10) {
            offsets.insert(offset);
        }
        assert!(offsets.contains(0));
        assert!(offsets.contains(100));
        assert!(!offsets.contains(5));
        assert!(offsets.contains_range(0, 50));
        assert!(offsets.contains_range(90, 110));
        let (lo, hi) = offsets.offset_range();
        assert_eq!(lo, 0);
        assert_eq!(hi, 990);
    }

    #[test]
    fn test_adaptive_batcher() {
        let batcher = AdaptiveBatcher::new(BatchConfig {
            min_batch_size: 4,
            max_batch_size: 16,
            max_linger_us: 10000,
            target_latency_us: 1000,
            adaptive: true,
        });
        // Keep adding until the batcher hands back a completed batch.
        let flushed = (0..20).find_map(|item| batcher.add(item));
        assert!(flushed.is_some());
        let batch = flushed.unwrap();
        assert!(!batch.is_empty());
    }
}