use crate::merge::Merge;
use serde::{Deserialize, Serialize};
use std::fmt;
/// A Bloom filter whose replicas can be combined through the `Merge` trait.
///
/// Membership answers are probabilistic: `contains` may report an item that
/// was never inserted, but an inserted item is always reported as present.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct BloomCRDT {
/// Bit array packed into 64-bit words; bit `p` lives in word `p / 64` at offset `p % 64`.
bits: Vec<u64>,
/// Number of bit positions probed (and set) per item.
k: usize,
/// Number of `insert` calls recorded; merging keeps the larger of the two counts.
count: usize,
/// Number of addressable bits; every probe position is reduced modulo `m`.
m: usize,
}
impl BloomCRDT {
    /// Creates an empty filter sized for `expected_items` insertions at a target
    /// false-positive rate of `fp_rate`.
    ///
    /// Out-of-range inputs are sanitised instead of producing an unusable filter:
    /// `expected_items == 0` is treated as 1, and `fp_rate` is clamped into the
    /// open interval (0, 1). A NaN `fp_rate` yields the smallest possible filter
    /// (one bit, one probe).
    pub fn new(expected_items: usize, fp_rate: f64) -> Self {
        // Zero items would make the sizing formulas divide by zero.
        let n = expected_items.max(1);
        // ln(p) is only finite and negative for 0 < p < 1.
        let p = fp_rate.clamp(f64::MIN_POSITIVE, 1.0 - f64::EPSILON);
        let m = optimal_m(n, p);
        // `m` can still be 0 when `p` is NaN; size `k` for at least one bit.
        let k = optimal_k(m.max(1), n);
        Self::with_params(m, k)
    }

    /// Creates an empty filter with exactly `m` bits and `k` probes per item.
    ///
    /// A value of zero for either parameter is raised to 1: zero bits would make
    /// every probe divide by zero, and zero probes would make `contains` answer
    /// `true` for everything.
    pub fn with_params(m: usize, k: usize) -> Self {
        let m = m.max(1);
        let k = k.max(1);
        // Ceiling division written so that it cannot overflow for large `m`.
        let words = m / 64 + usize::from(m % 64 != 0);
        Self {
            bits: vec![0u64; words],
            k,
            count: 0,
            m,
        }
    }

    /// Records `item` in the filter and bumps the insertion counter.
    ///
    /// The counter counts calls, so inserting the same item twice counts twice.
    pub fn insert(&mut self, item: &str) {
        for pos in self.hash(item) {
            let (word, bit) = self.index(pos);
            self.bits[word] |= 1u64 << bit;
        }
        self.count += 1;
    }

    /// Returns `false` if `item` was definitely never inserted, and `true` if it
    /// may have been (all of its probe bits are set).
    pub fn contains(&self, item: &str) -> bool {
        self.hash(item).into_iter().all(|pos| {
            let (word, bit) = self.index(pos);
            self.bits[word] & (1u64 << bit) != 0
        })
    }

    /// Estimates the current false-positive rate as `fill_ratio ^ k`.
    pub fn estimated_fpr(&self) -> f64 {
        // `powi` takes an i32; saturate rather than wrap for absurdly large `k`.
        let exponent = self.k.min(i32::MAX as usize) as i32;
        self.fill_ratio().powi(exponent)
    }

    /// Counts the bits currently set across all words.
    fn set_bit_count(&self) -> usize {
        self.bits.iter().map(|w| w.count_ones() as usize).sum()
    }

    /// Fraction of the `m` addressable bits that are currently set.
    pub fn fill_ratio(&self) -> f64 {
        self.set_bit_count() as f64 / self.m as f64
    }

    /// Size of the bit array in bytes (8 bytes per 64-bit word).
    pub fn space_bytes(&self) -> usize {
        self.bits.len() * 8
    }

    /// Size reported for transmission; currently identical to `space_bytes`.
    pub fn wire_size(&self) -> usize {
        self.space_bytes()
    }

    /// Number of insertions recorded by this filter.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Computes the `k` probe positions for `item` as `(h1 + i * h2) mod m`.
    ///
    /// The arithmetic is done in `u64` so that 32-bit and 64-bit targets map an
    /// item to the same positions; replicas must agree on this to be mergeable.
    fn hash(&self, item: &str) -> Vec<usize> {
        let (h1, h2) = double_hash(item);
        // Widening casts: `usize` is at most 64 bits on supported targets.
        let modulus = self.m as u64;
        let probes = self.k as u64;
        (0..probes)
            // The remainder is below `m`, so narrowing back to `usize` is lossless.
            .map(|i| (h1.wrapping_add(i.wrapping_mul(h2)) % modulus) as usize)
            .collect()
    }

    /// Splits a bit position into `(word index, bit offset within the word)`.
    fn index(&self, pos: usize) -> (usize, usize) {
        (pos / 64, pos % 64)
    }
}
impl Merge for BloomCRDT {
    /// Folds `other` into `self`: bit words are OR-ed pairwise and the insertion
    /// counter becomes the larger of the two counters.
    ///
    /// Taking the maximum (rather than the sum) keeps merging a replica with
    /// itself a no-op; the resulting count is therefore only a lower bound on
    /// the number of insertions seen across both replicas.
    ///
    /// NOTE(review): `m` and `k` are not compared. When the word vectors differ
    /// in length only the overlapping prefix is combined — presumably callers
    /// only merge filters built with identical parameters; confirm.
    fn merge(&mut self, other: &Self) {
        // `zip` stops at the shorter of the two word vectors.
        self.bits
            .iter_mut()
            .zip(other.bits.iter())
            .for_each(|(mine, theirs)| *mine |= *theirs);

        if other.count > self.count {
            self.count = other.count;
        }
    }
}
/// Returns the bit count `ceil(-n * ln(p) / ln(2)^2)` for `n` items at
/// false-positive rate `p`.
///
/// The float-to-integer cast saturates, so a non-positive or NaN intermediate
/// result comes back as 0 and an infinite one as `usize::MAX`.
fn optimal_m(n: usize, p: f64) -> usize {
    let numerator = -(n as f64) * p.ln();
    let ln2_squared = 2.0_f64.ln().powi(2);
    (numerator / ln2_squared).ceil() as usize
}
/// Returns the probe count `ceil((m / n) * ln(2))` for `m` bits and `n` items.
///
/// The float-to-integer cast saturates: `n == 0` with `m > 0` yields
/// `usize::MAX`, and `m == n == 0` (a NaN ratio) yields 0.
fn optimal_k(m: usize, n: usize) -> usize {
    let bits_per_item = m as f64 / n as f64;
    let probes = bits_per_item * 2.0_f64.ln();
    probes.ceil() as usize
}
/// Hashes the UTF-8 bytes of `item` into two 64-bit values.
///
/// Both values use the same xor-then-multiply step per byte (the 64-bit FNV-1a
/// round); the first starts from the FNV offset basis, the second from a
/// different seed. An empty `item` returns the two seeds unchanged.
fn double_hash(item: &str) -> (u64, u64) {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const SECOND_SEED: u64 = 0x9e3779b97f4a7c15;
    const FNV_PRIME: u64 = 0x100000001b3;

    let round = |state: u64, byte: u8| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME);

    item.bytes()
        .fold((FNV_OFFSET_BASIS, SECOND_SEED), |(first, second), byte| {
            (round(first, byte), round(second, byte))
        })
}
impl fmt::Display for BloomCRDT {
    /// Writes a one-line summary: insertion count, bit count, probe count,
    /// estimated false-positive rate (four decimals) and size in bytes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fpr = self.estimated_fpr();
        let bytes = self.space_bytes();
        f.write_fmt(format_args!(
            "BloomCRDT({} items, {} bits, k={}, FPR={:.4}, {} bytes)",
            self.count, self.m, self.k, fpr, bytes
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::merge::laws;

    /// Builds a filter sized for 100 items at a 1% target rate and inserts `items`.
    fn small_filter(items: &[&str]) -> BloomCRDT {
        let mut filter = BloomCRDT::new(100, 0.01);
        for item in items {
            filter.insert(item);
        }
        filter
    }

    /// Every inserted item must be reported as present.
    #[test]
    fn test_no_false_negatives() {
        let mut filter = BloomCRDT::new(1000, 0.01);
        let names: Vec<String> = (0..1000).map(|i| format!("constraint_{}", i)).collect();
        names.iter().for_each(|name| filter.insert(name));
        for name in &names {
            assert!(filter.contains(name), "False negative for {}", name);
        }
    }

    /// Probes 100k never-inserted keys and checks the observed rate stays below 5%.
    #[test]
    fn test_measured_false_positive_rate() {
        let mut filter = BloomCRDT::new(1000, 0.01);
        (0..1000).for_each(|i| filter.insert(&format!("item_{}", i)));

        let trials = 100_000;
        let false_positives = (1000..1000 + trials)
            .filter(|i| filter.contains(&format!("item_{}", i)))
            .count();

        let measured_fpr = false_positives as f64 / trials as f64;
        println!("Target FPR: 0.01, Measured FPR: {:.6}", measured_fpr);
        assert!(measured_fpr < 0.05, "FPR too high: {:.4}", measured_fpr);
    }

    /// A merged filter answers positively for items from both inputs.
    #[test]
    fn test_merge_preserves_membership() {
        let left = small_filter(&["constraint_a"]);
        let right = small_filter(&["constraint_b"]);
        assert!(left.contains("constraint_a"));
        assert!(!left.contains("constraint_b"));

        let merged = left.merged(&right);
        assert!(merged.contains("constraint_a"));
        assert!(merged.contains("constraint_b"));
    }

    #[test]
    fn test_merge_commutative() {
        let left = small_filter(&["a1", "a2"]);
        let right = small_filter(&["b1", "b2"]);
        assert!(laws::check_commutative(&left, &right));
    }

    #[test]
    fn test_merge_idempotent() {
        let filter = small_filter(&["a1", "a2"]);
        assert!(laws::check_idempotent(&filter));
    }

    #[test]
    fn test_merge_associative() {
        let first = small_filter(&["a1"]);
        let second = small_filter(&["b1"]);
        let third = small_filter(&["c1"]);
        assert!(laws::check_associative(&first, &second, &third));
    }

    /// Sizing for 10k items at 1% must stay under 12 bits per item.
    #[test]
    fn test_space_efficiency() {
        let filter = BloomCRDT::new(10_000, 0.01);
        let bits_per_item = filter.m as f64 / 10_000.0;
        println!("Bits per item: {:.1} (optimal ~9.6)", bits_per_item);
        assert!(bits_per_item < 12.0, "Too many bits per item");
    }

    /// Compares the filter's wire size against 32 bytes per item for an exact set.
    #[test]
    fn test_wire_size_comparison() {
        let filter = BloomCRDT::new(10_000, 0.01);
        let bloom_bytes = filter.wire_size();
        let exact_bytes = 10_000 * 32;
        let ratio = bloom_bytes as f64 / exact_bytes as f64;
        println!(
            "Bloom: {} bytes, Exact: {} bytes, Ratio: {:.2}x",
            bloom_bytes, exact_bytes, ratio
        );
        assert!(ratio < 0.15, "Bloom should be much smaller");
    }
}