#![allow(clippy::uninlined_format_args)]
use probabilistic_rs::bloom::{
BloomFilter, BloomFilterConfigBuilder, BloomFilterOps, BloomFilterStats,
BulkBloomFilterOps, PersistenceConfigBuilder,
};
use probabilistic_rs::common::bits2hr;
use std::collections::HashSet;
use std::path::PathBuf;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Silence the noisy storage-backend crates while keeping warnings visible.
    let env_filter = tracing_subscriber::EnvFilter::new("warn")
        .add_directive("fjall=off".parse()?)
        .add_directive("lsm_tree=off".parse()?);
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::WARN)
        .with_env_filter(env_filter)
        .with_target(false)
        .init();

    println!("🌸 Core Bloom Filter Example");
    println!("==============================");

    // Run each demo scenario in sequence; any error aborts the whole example.
    basic_workflow_example().await?;
    false_positive_rate_example().await?;
    capacity_limits_example().await?;
    persistence_example().await?;
    bulk_operations_example().await?;

    Ok(())
}
/// Walks through the core workflow: configure a filter, insert a handful of
/// items, then query both inserted and never-inserted keys.
async fn basic_workflow_example() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📝 Basic Workflow Example");
    println!("---------------------------");

    // Build the config inline and hand it straight to `create`.
    let filter = BloomFilter::create(
        BloomFilterConfigBuilder::default()
            .capacity(1_000_000)
            .false_positive_rate(0.01)
            .build()?,
    )
    .await?;

    println!("Created bloom filter:");
    println!(" Capacity: {}", filter.capacity());
    println!(" Target FPR: {:.2}%", filter.false_positive_rate() * 100.0);
    println!(" Bit vector size: {} bits", filter.bit_vector_size);
    println!(" Bit vector size in memory: {}", filter.approx_memory_bits());
    println!(" Bits per item: {:.2}", filter.bits_per_item());
    println!(" Bit vector size: {}", bits2hr(filter.bit_vector_size));
    println!(" Hash functions: {}", filter.num_hashes);

    let fruits = ["apple", "banana", "cherry", "date", "elderberry"];
    for fruit in &fruits {
        filter.insert(fruit.as_bytes())?;
        println!(" ✅ Inserted: {}", fruit);
    }

    // Everything we inserted must report as present (no false negatives).
    println!("\nQuerying items:");
    for fruit in &fruits {
        let hit = filter.contains(fruit.as_bytes())?;
        println!(" {} exists: {}", fruit, if hit { "✅" } else { "❌" });
    }

    // Keys that were never inserted; a hit here would be a false positive.
    for candidate in ["grape", "kiwi", "mango"] {
        let hit = filter.contains(candidate.as_bytes())?;
        let marker = if hit { "🟡 (false positive?)" } else { "❌" };
        println!(" {} exists: {}", candidate, marker);
    }

    println!(" Inserted elements inserted: {}", filter.insert_count());
    Ok(())
}
/// Half-fills a small filter, then probes it with 1000 never-inserted keys to
/// compare the measured false-positive rate against the configured target.
async fn false_positive_rate_example() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📊 False Positive Rate Measurement");
    println!("-----------------------------------");

    let filter = BloomFilter::create(
        BloomFilterConfigBuilder::default()
            .capacity(1_000)
            .false_positive_rate(0.05)
            .build()?,
    )
    .await?;

    // Insert 500 keys (50% fill) and remember them so probes can be checked.
    let mut inserted_items = HashSet::new();
    for idx in 0..500 {
        let key = format!("item_{:04}", idx);
        filter.insert(key.as_bytes())?;
        inserted_items.insert(key);
    }
    println!("Inserted {} items into filter", inserted_items.len());

    // Probe with a disjoint key range; any filter hit on a key we never
    // inserted counts as a false positive.
    let test_count = 1000;
    let mut false_positives = 0;
    for idx in 1000..1000 + test_count {
        let probe = format!("test_{:04}", idx);
        if filter.contains(probe.as_bytes())? && !inserted_items.contains(&probe) {
            false_positives += 1;
        }
    }

    let measured_fpr = false_positives as f64 / test_count as f64;
    let target_fpr = filter.false_positive_rate();
    println!("False positive rate analysis:");
    println!(" Target FPR: {:.4}% ({:.4})", target_fpr * 100.0, target_fpr);
    println!(" Measured FPR: {:.4}% ({:.4})", measured_fpr * 100.0, measured_fpr);
    println!(" Ratio (measured/target): {:.2}x", measured_fpr / target_fpr);
    println!(" False positives found: {}/{}", false_positives, test_count);
    Ok(())
}
/// Shows how the false-positive rate degrades as a small filter is filled past
/// its configured capacity (25% up to 200% fill).
async fn capacity_limits_example() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n⚠️ Capacity Limits & Performance");
    println!("----------------------------------");
    // NOTE: a throwaway filter used to be created here and immediately
    // discarded; removed, since the loop below builds its own filters.
    println!("Testing with small capacity filter (100 items):");

    for fill_percentage in [25, 50, 75, 100, 150, 200] {
        // Fresh filter per run so earlier fills don't skew later measurements.
        let test_filter = BloomFilter::create(
            BloomFilterConfigBuilder::default()
                .capacity(100)
                .false_positive_rate(0.01)
                .build()?,
        )
        .await?;

        // With capacity 100, the fill percentage doubles as the item count.
        let items_to_insert = fill_percentage;
        for i in 0..items_to_insert {
            let item = format!("load_test_{:03}", i);
            test_filter.insert(item.as_bytes())?;
        }

        // Probe with keys that were never inserted; every hit is a false positive.
        let tests = 100;
        let mut fps = 0;
        for i in 1000..(1000 + tests) {
            let test_item = format!("fp_test_{:03}", i);
            if test_filter.contains(test_item.as_bytes())? {
                fps += 1;
            }
        }
        let measured_fpr = fps as f64 / tests as f64;
        println!(
            " {}% fill ({} items): FPR = {:.4}%",
            fill_percentage,
            items_to_insert,
            measured_fpr * 100.0
        );
    }
    println!(
        "\n💡 Observation: FPR increases significantly when exceeding capacity!"
    );
    Ok(())
}
/// Persists a filter snapshot to an on-disk database, inspects what was
/// written, reloads the filter, and verifies both the inserted items and the
/// configuration round-trip intact. Also exercises `create_or_load` on both
/// the load path (DB present) and the create path (DB removed).
async fn persistence_example() -> Result<(), Box<dyn std::error::Error>> {
    use std::fs;
    println!("\n🗄️ Database Persistence Example");
    println!("----------------------------------");
    // Keep all artifacts under ./tmp so the example is self-contained.
    fs::create_dir_all("tmp")?;
    let db_path = PathBuf::from("tmp/test_bloom_db.fjall");
    println!("Step 1: Creating new database at {:?}", db_path);
    // Small chunk size (1 KiB) — presumably forces the snapshot to be split
    // across multiple DB entries; confirm against the persistence layer.
    let persistence_config = PersistenceConfigBuilder::default()
        .db_path(db_path.clone())
        .chunk_size_bytes(1024)
        .build()?;
    let original_config = BloomFilterConfigBuilder::default()
        .capacity(50000)
        .false_positive_rate(0.02)
        .persistence(Some(persistence_config))
        .build()?;
    println!(
        " Original config - capacity: {}, FPR: {:.3}%",
        original_config.capacity,
        original_config.false_positive_rate * 100.0
    );
    // Cloned because the same config is reused for create_or_load in Step 4.
    let filter = BloomFilter::create(original_config.clone()).await?;
    let test_items = ["apple", "banana", "cherry", "date", "elderberry"];
    for item in &test_items {
        filter.insert(item.as_bytes())?;
    }
    println!(" Inserted {} test items", test_items.len());
    println!(" Saving snapshot to database...");
    // Write the snapshot, then drop the handle so the load below opens the
    // database fresh rather than sharing state with this instance.
    filter.save_snapshot().await?;
    drop(filter);
    println!("\nStep 2: Analyzing database folder");
    // Report what the storage backend actually wrote to disk: file count,
    // total size, and per-file sizes.
    if db_path.exists() {
        let metadata = fs::metadata(&db_path)?;
        if metadata.is_dir() {
            let entries: Vec<_> =
                fs::read_dir(&db_path)?.collect::<Result<Vec<_>, _>>()?;
            // Files whose metadata cannot be read are skipped in the total.
            let total_size: u64 = entries
                .iter()
                .filter_map(|entry| entry.metadata().ok().map(|m| m.len()))
                .sum();
            println!(" Database folder: {:?}", db_path);
            println!(" File count: {}", entries.len());
            println!(
                " Total size: {} bytes ({:.2} KB)",
                total_size,
                total_size as f64 / 1024.0
            );
            for entry in entries {
                let metadata = entry.metadata()?;
                println!(
                    " - {}: {} bytes",
                    entry.file_name().to_string_lossy(),
                    metadata.len()
                );
            }
        }
    } else {
        println!(
            " ⚠️ Database folder not found (expected for dummy implementation)"
        );
    }
    println!("\nStep 3: Loading from database and verifying config");
    let loaded_filter = BloomFilter::load(db_path.clone()).await?;
    let loaded_config = loaded_filter.config();
    println!(
        " Loaded config - capacity: {}, FPR: {:.3}%",
        loaded_config.capacity,
        loaded_config.false_positive_rate * 100.0
    );
    println!(" Verifying persisted data:");
    // Every item inserted before the snapshot must still report as present.
    for item in &test_items {
        let exists = loaded_filter.contains(item.as_bytes())?;
        println!(" {} exists: {}", item, if exists { "✅" } else { "❌" });
    }
    // FPR is a float, so compare within EPSILON rather than with `==`.
    let capacity_match = original_config.capacity == loaded_config.capacity;
    let fpr_match = (original_config.false_positive_rate
        - loaded_config.false_positive_rate)
        .abs()
        < f64::EPSILON;
    println!(" Config verification:");
    println!(
        " Capacity match: {} ({} == {})",
        if capacity_match { "✅" } else { "❌" },
        original_config.capacity,
        loaded_config.capacity
    );
    println!(
        " FPR match: {} ({:.4} == {:.4})",
        if fpr_match { "✅" } else { "❌" },
        original_config.false_positive_rate,
        loaded_config.false_positive_rate
    );
    if capacity_match && fpr_match {
        println!(" 🎉 All parameters match successfully!");
    } else {
        println!(" ❌ Parameter mismatch detected!");
    }
    println!("\nStep 4: Testing create_or_load behavior");
    println!(" Testing create_or_load with existing DB...");
    // With the database present this should take the load path.
    let reloaded_filter =
        BloomFilter::create_or_load(original_config.clone()).await?;
    println!(
        " Loaded existing - capacity: {}",
        reloaded_filter.capacity()
    );
    // Remove the database so the next call exercises the create path instead.
    if db_path.exists() {
        fs::remove_dir_all(&db_path)?;
        println!(" Removed database for create test");
    }
    println!(" Testing create_or_load with missing DB...");
    let new_filter = BloomFilter::create_or_load(original_config.clone()).await?;
    println!(" Created new - capacity: {}", new_filter.capacity());
    println!("\n✅ Persistence example completed!");
    Ok(())
}
/// Ops-per-millisecond throughput. Falls back to microsecond precision when
/// the elapsed time rounds down to 0 ms, matching the original inline logic
/// (which this helper replaces — it was copy-pasted four times).
fn ops_per_ms(count: usize, duration: std::time::Duration) -> f64 {
    if duration.as_millis() > 0 {
        count as f64 / duration.as_millis() as f64
    } else {
        count as f64 / (duration.as_micros() as f64 / 1000.0)
    }
}

/// Benchmarks bulk insert/contains against per-item calls, reports throughput
/// and speedups, then runs a quick false-positive spot check.
async fn bulk_operations_example() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n⚡ Bulk Operations Example");
    println!("-------------------------");
    let config = BloomFilterConfigBuilder::default()
        .capacity(10_000)
        .false_positive_rate(0.01)
        .build()?;
    let filter = BloomFilter::create(config).await?;
    // Keep the owned strings alive; the bulk APIs take byte slices.
    let bulk_items: Vec<String> =
        (0..100).map(|i| format!("bulk_item_{:04}", i)).collect();
    let bulk_refs: Vec<&[u8]> = bulk_items.iter().map(|s| s.as_bytes()).collect();
    println!("Prepared {} items for bulk operations", bulk_items.len());

    println!("\n📝 Testing bulk insert...");
    let start = std::time::Instant::now();
    filter.insert_bulk(&bulk_refs)?;
    let bulk_insert_duration = start.elapsed();

    println!("Testing bulk contains...");
    let start = std::time::Instant::now();
    let results = filter.contains_bulk(&bulk_refs)?;
    let bulk_query_duration = start.elapsed();
    let found_count = results.iter().filter(|&&exists| exists).count();

    println!("\n📊 Performance comparison:");
    // Fresh filter for the per-item baseline so both paths start empty.
    let test_filter = BloomFilter::create(
        BloomFilterConfigBuilder::default()
            .capacity(10_000)
            .false_positive_rate(0.01)
            .build()?,
    )
    .await?;
    let start = std::time::Instant::now();
    for item_bytes in &bulk_refs {
        test_filter.insert(item_bytes)?;
    }
    let individual_insert_duration = start.elapsed();
    let start = std::time::Instant::now();
    let mut individual_found_count = 0;
    for item_bytes in &bulk_refs {
        if test_filter.contains(item_bytes)? {
            individual_found_count += 1;
        }
    }
    let individual_query_duration = start.elapsed();

    // One helper call per measurement replaces four duplicated if/else blocks.
    let bulk_insert_rate = ops_per_ms(bulk_items.len(), bulk_insert_duration);
    let bulk_query_rate = ops_per_ms(bulk_items.len(), bulk_query_duration);
    let individual_insert_rate =
        ops_per_ms(bulk_items.len(), individual_insert_duration);
    let individual_query_rate =
        ops_per_ms(bulk_items.len(), individual_query_duration);

    println!("\n🔧 Bulk Operations:");
    println!(
        " Insert time: {:?} ({:.1} ops/ms)",
        bulk_insert_duration, bulk_insert_rate
    );
    println!(
        " Query time: {:?} ({:.1} ops/ms)",
        bulk_query_duration, bulk_query_rate
    );
    println!(" Items found: {}/{}", found_count, bulk_items.len());
    println!("\n🔧 Individual Operations:");
    println!(
        " Insert time: {:?} ({:.1} ops/ms)",
        individual_insert_duration, individual_insert_rate
    );
    println!(
        " Query time: {:?} ({:.1} ops/ms)",
        individual_query_duration, individual_query_rate
    );
    println!(
        " Items found: {}/{}",
        individual_found_count,
        bulk_items.len()
    );
    println!("\n📈 Performance Improvement:");
    let insert_speedup = individual_insert_duration.as_nanos() as f64
        / bulk_insert_duration.as_nanos() as f64;
    let query_speedup = individual_query_duration.as_nanos() as f64
        / bulk_query_duration.as_nanos() as f64;
    println!(" Insert speedup: {:.2}x", insert_speedup);
    println!(" Query speedup: {:.2}x", query_speedup);

    // Probe with never-inserted keys; any hit is a false positive.
    let test_items: Vec<String> = (1000..1010)
        .map(|i| format!("test_item_{:04}", i))
        .collect();
    let test_refs: Vec<&[u8]> = test_items.iter().map(|s| s.as_bytes()).collect();
    let test_results = filter.contains_bulk(&test_refs)?;
    let false_positives = test_results.iter().filter(|&&exists| exists).count();
    println!(
        "\n🎯 False positives: {}/{} test items",
        false_positives,
        test_items.len()
    );
    println!("\n✅ Bulk operations implemented and working!");
    Ok(())
}