// rivven_core/bloom.rs

//! High-Performance Bloom Filters and Probabilistic Data Structures
//!
//! Optimized for streaming workloads with:
//! - **Bloom Filter**: Fast set membership testing for offset ranges
//! - **Counting Bloom Filter**: Supports deletions for dynamic sets
//! - **Cuckoo Filter**: Better space efficiency with deletions
//! - **HyperLogLog**: Cardinality estimation for metrics
//!
//! # Use Cases in Rivven
//!
//! 1. **Offset Range Queries**: Quickly determine if an offset exists in a segment
//! 2. **Consumer Group Tracking**: Track which offsets have been consumed
//! 3. **Deduplication**: Detect duplicate messages in idempotent producers
//! 4. **Cardinality Estimation**: Estimate unique keys/consumers without memory explosion
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────────┐
//! │                    Probabilistic Data Structures                         │
//! ├─────────────────────────────────────────────────────────────────────────┤
//! │                                                                          │
//! │  ┌────────────────────┐     ┌────────────────────┐                      │
//! │  │    Bloom Filter     │     │  Counting Bloom    │                      │
//! │  │  ┌──┬──┬──┬──┬──┐  │     │  ┌──┬──┬──┬──┬──┐  │                      │
//! │  │  │0 │1 │0 │1 │0 │  │     │  │0 │2 │0 │1 │0 │  │                      │
//! │  │  └──┴──┴──┴──┴──┘  │     │  └──┴──┴──┴──┴──┘  │                      │
//! │  │  O(k) lookup       │     │  Supports delete   │                      │
//! │  │  False positive    │     │  Counter overflow  │                      │
//! │  └────────────────────┘     └────────────────────┘                      │
//! │                                                                          │
//! │  ┌────────────────────┐     ┌────────────────────┐                      │
//! │  │   Cuckoo Filter    │     │    HyperLogLog     │                      │
//! │  │  ┌──────┐ ┌──────┐ │     │  ┌──┬──┬──┬──┬──┐  │                      │
//! │  │  │ fp₁  │ │ fp₂  │ │     │  │3 │5 │2 │7 │4 │  │                      │
//! │  │  └──────┘ └──────┘ │     │  └──┴──┴──┴──┴──┘  │                      │
//! │  │  Better deletion   │     │  Cardinality est.  │                      │
//! │  │  Lower false pos   │     │  1.04/√m accuracy  │                      │
//! │  └────────────────────┘     └────────────────────┘                      │
//! │                                                                          │
//! └─────────────────────────────────────────────────────────────────────────┘
//! ```
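//!
//! # Example
//!
//! A minimal usage sketch; the `rivven_core::bloom` path is assumed from this file's
//! location and may need adjusting to the actual module tree:
//!
//! ```ignore
//! use rivven_core::bloom::{BloomFilter, HyperLogLog};
//!
//! // Membership testing: ~1% false positives, never false negatives.
//! let filter = BloomFilter::new(10_000, 0.01);
//! filter.insert(&42u64);
//! assert!(filter.contains(&42u64));
//!
//! // Cardinality estimation: precision 14 => 16384 registers, ~0.8% relative error.
//! let hll = HyperLogLog::new(14);
//! for key in 0..1_000u64 {
//!     hll.add(&key);
//! }
//! let approx_unique = hll.estimate();
//! ```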

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};

// ============================================================================
// Bloom Filter
// ============================================================================

/// High-performance Bloom filter with SIMD-friendly layout
///
/// Uses multiple hash functions derived from two base hashes (Kirsch-Mitzenmacher optimization)
/// to provide O(k) lookup where k is the number of hash functions.
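///
/// # Example
///
/// A point-lookup sketch (crate path assumed):
///
/// ```ignore
/// use rivven_core::bloom::BloomFilter;
///
/// let filter = BloomFilter::new(1_000, 0.01);
/// filter.insert(&"segment-0001");
/// assert!(filter.contains(&"segment-0001")); // inserted items are always found
/// // A miss is definitive; a hit may be a false positive (~1% here).
/// let maybe_present = filter.contains(&"segment-9999");
/// ```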
pub struct BloomFilter {
    /// Bit array stored as 64-bit words for cache efficiency
    bits: Vec<AtomicU64>,
    /// Number of bits
    num_bits: usize,
    /// Number of hash functions
    num_hashes: usize,
    /// Number of items inserted
    count: AtomicU64,
}

impl BloomFilter {
    /// Create a new Bloom filter with optimal parameters
    ///
    /// # Arguments
    /// * `expected_items` - Expected number of items to insert
    /// * `false_positive_rate` - Desired false positive rate (0.0 - 1.0)
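    ///
    /// As a rough worked example (values approximate): `expected_items = 1000` and
    /// `false_positive_rate = 0.01` give m = -1000 * ln(0.01) / ln(2)^2 ≈ 9586 bits,
    /// rounded up to 9600 (a multiple of 64), and k = (9600 / 1000) * ln(2) ≈ 6.65,
    /// so 7 hash functions are used.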
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        // Optimal number of bits: m = -n*ln(p) / (ln(2)^2)
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_bits =
            (-(expected_items as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;

        // Round up to a multiple of 64 for word alignment (and keep at least one
        // word, so degenerate inputs such as `expected_items == 0` cannot produce an
        // empty filter that would panic on the modulo in insert/contains)
        let num_bits = num_bits.div_ceil(64).max(1) * 64;

        // Optimal number of hash functions: k = (m/n) * ln(2)
        let num_hashes =
            ((num_bits as f64 / expected_items as f64) * std::f64::consts::LN_2).ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16); // Clamp to reasonable range

        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();

        Self {
            bits,
            num_bits,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }

    /// Create a Bloom filter with explicit parameters
    pub fn with_params(num_bits: usize, num_hashes: usize) -> Self {
        let num_bits = num_bits.div_ceil(64) * 64;
        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();

        Self {
            bits,
            num_bits,
            num_hashes: num_hashes.max(1),
            count: AtomicU64::new(0),
        }
    }

    /// Insert an item into the filter
    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            self.set_bit(bit_index);
        }

        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Check if an item might be in the filter
    ///
    /// Returns `true` if the item might be present (possible false positive)
    /// Returns `false` if the item is definitely not present
    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.get_bit(bit_index) {
                return false;
            }
        }

        true
    }

    /// Insert an item and check if it was already present
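    ///
    /// Handy for best-effort deduplication in idempotent producers; a sketch (note
    /// that "already present" can itself be a false positive):
    ///
    /// ```ignore
    /// let dedup = BloomFilter::new(100_000, 0.001);
    /// let message_id = 123_456u64;
    /// if dedup.insert_and_check(&message_id) {
    ///     // Probably a duplicate; skip only if an occasional wrong skip is acceptable.
    /// } else {
    ///     // Definitely the first time this filter has seen the id.
    /// }
    /// ```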
    pub fn insert_and_check<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);
        let mut was_present = true;

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.set_bit(bit_index) {
                was_present = false;
            }
        }

        self.count.fetch_add(1, Ordering::Relaxed);
        was_present
    }

    /// Get the number of items inserted
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Estimate current false positive rate
    pub fn estimated_fp_rate(&self) -> f64 {
        let bits_set = self.count_bits_set();
        let fill_ratio = bits_set as f64 / self.num_bits as f64;
        fill_ratio.powi(self.num_hashes as i32)
    }

    /// Get fill ratio (0.0 - 1.0)
    pub fn fill_ratio(&self) -> f64 {
        self.count_bits_set() as f64 / self.num_bits as f64
    }

    /// Get memory usage in bytes
    pub fn memory_usage(&self) -> usize {
        self.bits.len() * 8
    }

    /// Clear the filter
    pub fn clear(&self) {
        for word in &self.bits {
            word.store(0, Ordering::Relaxed);
        }
        self.count.store(0, Ordering::Relaxed);
    }

    /// Compute two independent hashes for an item
    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();

        // Derive a second hash by re-hashing the first (double hashing)
        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();

        (hash1, hash2)
    }

    /// Combine two hashes using Kirsch-Mitzenmacher optimization
    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    /// Set a bit and return whether it was already set
    fn set_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;

        let old = self.bits[word_index].fetch_or(mask, Ordering::AcqRel);
        (old & mask) != 0
    }

    /// Get a bit value
    fn get_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;

        (self.bits[word_index].load(Ordering::Acquire) & mask) != 0
    }

    /// Count total bits set
    fn count_bits_set(&self) -> usize {
        self.bits
            .iter()
            .map(|w| w.load(Ordering::Relaxed).count_ones() as usize)
            .sum()
    }
}

// ============================================================================
// Counting Bloom Filter
// ============================================================================

/// Counting Bloom filter that supports deletions
///
/// Uses 4-bit counters per bucket, allowing items to be removed.
/// Counter overflow is handled by saturation (max 15).
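///
/// # Example
///
/// A sketch of the insert/remove semantics (crate path assumed):
///
/// ```ignore
/// use rivven_core::bloom::CountingBloomFilter;
///
/// let filter = CountingBloomFilter::new(10_000, 0.01);
/// filter.insert(&"consumer-42");
/// assert!(filter.contains(&"consumer-42"));
/// filter.remove(&"consumer-42");
/// // Unlike a plain Bloom filter, the entry can be taken back out.
/// assert!(!filter.contains(&"consumer-42"));
/// ```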
pub struct CountingBloomFilter {
    /// 4-bit counters packed into bytes (2 counters per byte)
    counters: Vec<AtomicU8>,
    /// Number of counters
    num_counters: usize,
    /// Number of hash functions
    num_hashes: usize,
    /// Number of items inserted (approximate)
    count: AtomicU64,
}

impl CountingBloomFilter {
    /// Create a new counting Bloom filter
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_counters =
            (-(expected_items as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;
        let num_counters = num_counters.max(64);

        let num_hashes = ((num_counters as f64 / expected_items as f64) * std::f64::consts::LN_2)
            .ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16);

        // Each byte holds 2 counters
        let num_bytes = num_counters.div_ceil(2);
        let counters = (0..num_bytes).map(|_| AtomicU8::new(0)).collect();

        Self {
            counters,
            num_counters,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }

    /// Insert an item
    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.increment_counter(counter_index);
        }

        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Remove an item (decrement counters)
    pub fn remove<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.decrement_counter(counter_index);
        }

        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Check if an item might be present
    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            if self.get_counter(counter_index) == 0 {
                return false;
            }
        }

        true
    }

    /// Get approximate count of items
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();

        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();

        (hash1, hash2)
    }

    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    fn increment_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };

            // Saturate at 15
            if old_counter >= 15 {
                return;
            }

            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter + 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter + 1)
            };

            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    fn decrement_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };

            // Don't go below 0
            if old_counter == 0 {
                return;
            }

            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter - 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter - 1)
            };

            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    fn get_counter(&self, counter_index: usize) -> u8 {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        let byte = self.counters[byte_index].load(Ordering::Acquire);
        if is_high_nibble {
            (byte >> 4) & 0x0F
        } else {
            byte & 0x0F
        }
    }
}

// ============================================================================
// HyperLogLog for Cardinality Estimation
// ============================================================================

/// HyperLogLog for cardinality estimation
///
/// Estimates the number of unique elements with ~1.04/√m relative error
/// where m is the number of registers.
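///
/// For example, precision 14 gives m = 2^14 = 16384 registers (16 KiB) and a
/// theoretical relative error of roughly 1.04 / sqrt(16384) ≈ 0.8%.
///
/// # Example
///
/// A cardinality-estimation sketch (crate path assumed):
///
/// ```ignore
/// use rivven_core::bloom::HyperLogLog;
///
/// let hll = HyperLogLog::new(14);
/// for key in 0..100_000u64 {
///     hll.add(&key);
/// }
/// // Expect a value close to 100_000, typically within ~1%.
/// let estimate = hll.estimate();
/// ```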
pub struct HyperLogLog {
    /// Registers (each stores max leading zeros seen)
    registers: Vec<AtomicU8>,
    /// Number of registers (2^p)
    num_registers: usize,
    /// Precision parameter (log2 of num_registers)
    precision: u8,
    /// Alpha correction factor
    alpha: f64,
}

impl HyperLogLog {
    /// Create a new HyperLogLog with given precision
    ///
    /// # Arguments
    /// * `precision` - Number of bits for register indexing (4-18, higher = more accurate)
    pub fn new(precision: u8) -> Self {
        let precision = precision.clamp(4, 18);
        let num_registers = 1 << precision;

        // Alpha correction factor
        let alpha = match precision {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / num_registers as f64),
        };

        let registers = (0..num_registers).map(|_| AtomicU8::new(0)).collect();

        Self {
            registers,
            num_registers,
            precision,
            alpha,
        }
    }

    /// Add an item to the estimator
    pub fn add<T: Hash>(&self, item: &T) {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        let hash = hasher.finish();

        // Use first p bits for register index
        let register_idx = (hash >> (64 - self.precision)) as usize;

        // Rank = leading zeros in the remaining bits + 1; the OR'd sentinel bit
        // guarantees a set bit and caps the rank at 64 - precision + 1
        let remaining = (hash << self.precision) | (1 << (self.precision - 1));
        let leading_zeros = (remaining.leading_zeros() + 1) as u8;

        // Update register with max
        loop {
            let current = self.registers[register_idx].load(Ordering::Acquire);
            if leading_zeros <= current {
                break;
            }
            if self.registers[register_idx]
                .compare_exchange_weak(current, leading_zeros, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break;
            }
        }
    }

    /// Estimate the cardinality
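    ///
    /// A sketch of the standard estimator implemented below: the raw estimate is
    /// E = α_m · m² / Σ_j 2^(-M[j]), where m is the register count and M[j] are the
    /// register values. Linear counting replaces E when E ≤ 2.5·m and empty
    /// registers remain, and a large-range correction kicks in near 2^32.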
    pub fn estimate(&self) -> u64 {
        // Harmonic mean of 2^register values
        let mut sum = 0.0;
        let mut zeros = 0;

        for reg in &self.registers {
            let val = reg.load(Ordering::Relaxed);
            sum += 1.0 / (1u64 << val) as f64;
            if val == 0 {
                zeros += 1;
            }
        }

        let m = self.num_registers as f64;
        let raw_estimate = self.alpha * m * m / sum;

        // Apply corrections for small and large cardinalities
        let estimate = if raw_estimate <= 2.5 * m && zeros > 0 {
            // Small range correction (linear counting)
            m * (m / zeros as f64).ln()
        } else if raw_estimate > (1u64 << 32) as f64 / 30.0 {
            // Large range correction
            -((1u64 << 32) as f64) * (1.0 - raw_estimate / (1u64 << 32) as f64).ln()
        } else {
            raw_estimate
        };

        estimate.round() as u64
    }

    /// Merge another HyperLogLog into this one
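    ///
    /// Both sketches must use the same precision (the method asserts equal register
    /// counts). Useful for combining per-partition sketches into a topic-wide
    /// estimate; a sketch:
    ///
    /// ```ignore
    /// let topic_wide = HyperLogLog::new(14);
    /// let partition_a = HyperLogLog::new(14);
    /// topic_wide.merge(&partition_a);
    /// let total_unique = topic_wide.estimate();
    /// ```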
    pub fn merge(&self, other: &HyperLogLog) {
        assert_eq!(self.num_registers, other.num_registers);

        for i in 0..self.num_registers {
            loop {
                let current = self.registers[i].load(Ordering::Acquire);
                let other_val = other.registers[i].load(Ordering::Relaxed);
                let new_val = current.max(other_val);

                if new_val == current {
                    break;
                }

                if self.registers[i]
                    .compare_exchange_weak(current, new_val, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    break;
                }
            }
        }
    }

    /// Get memory usage in bytes
    pub fn memory_usage(&self) -> usize {
        self.num_registers
    }

    /// Get the relative error (theoretical)
    pub fn relative_error(&self) -> f64 {
        1.04 / (self.num_registers as f64).sqrt()
    }
}

// ============================================================================
// Offset Range Bloom Filter (Specialized for Rivven)
// ============================================================================

/// Specialized Bloom filter for offset range queries
///
/// Optimized for the common pattern of "does this segment contain offset X?"
/// Uses a bucketed approach for range-based queries.
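///
/// # Example
///
/// A sketch of segment-level offset lookups (crate path assumed):
///
/// ```ignore
/// use rivven_core::bloom::OffsetBloomFilter;
///
/// let filter = OffsetBloomFilter::new(100_000, 1000);
/// filter.insert(42);
/// filter.insert(4200);
/// assert!(filter.contains(42));
/// assert!(!filter.contains(10_000_000)); // outside [min, max], rejected immediately
/// let overlaps = filter.contains_range(0, 100);
/// ```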
pub struct OffsetBloomFilter {
    /// Main filter for individual offsets
    filter: BloomFilter,
    /// Minimum offset in the filter
    min_offset: AtomicU64,
    /// Maximum offset in the filter
    max_offset: AtomicU64,
    /// Bucket size for range queries (e.g., 1000 offsets per bucket)
    bucket_size: u64,
}

impl OffsetBloomFilter {
    /// Create a new offset Bloom filter
    ///
    /// # Arguments
    /// * `expected_offsets` - Expected number of offsets
    /// * `bucket_size` - Size of offset buckets for range queries
    pub fn new(expected_offsets: usize, bucket_size: u64) -> Self {
        Self {
            filter: BloomFilter::new(expected_offsets, 0.01),
            min_offset: AtomicU64::new(u64::MAX),
            max_offset: AtomicU64::new(0),
            bucket_size,
        }
    }

    /// Insert an offset
    pub fn insert(&self, offset: u64) {
        self.filter.insert(&offset);

        // Update min/max
        self.min_offset.fetch_min(offset, Ordering::AcqRel);
        self.max_offset.fetch_max(offset, Ordering::AcqRel);
    }

    /// Check if an offset might exist
    pub fn contains(&self, offset: u64) -> bool {
        // Quick range check first
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);

        if offset < min || offset > max {
            return false;
        }

        self.filter.contains(&offset)
    }

    /// Check if any offset in `[start, end]` might exist
    ///
    /// Point lookups never produce false negatives, but for large ranges this method
    /// only probes bucket boundaries and a sampled set of offsets, so sparse matches
    /// can be missed.
    pub fn contains_range(&self, start: u64, end: u64) -> bool {
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);

        // Quick range overlap check
        if end < min || start > max {
            return false;
        }

        // For small ranges, check each offset
        if end - start <= 10 {
            for offset in start..=end {
                if self.filter.contains(&offset) {
                    return true;
                }
            }
            return false;
        }

        // For larger ranges, check bucket boundaries
        let start_bucket = start / self.bucket_size;
        let end_bucket = end / self.bucket_size;

        for bucket in start_bucket..=end_bucket {
            let bucket_start = bucket * self.bucket_size;
            let bucket_end = bucket_start + self.bucket_size - 1;

            // Check bucket boundaries and midpoint
            for &offset in &[
                bucket_start,
                bucket_start + self.bucket_size / 2,
                bucket_end,
            ] {
                if offset >= start && offset <= end && self.filter.contains(&offset) {
                    return true;
                }
            }
        }

        // Fallback: check some offsets in the range
        let step = ((end - start) / 10).max(1);
        let mut offset = start;
        while offset <= end {
            if self.filter.contains(&offset) {
                return true;
            }
            offset += step;
        }

        false
    }

    /// Get the offset range
    pub fn offset_range(&self) -> (u64, u64) {
        (
            self.min_offset.load(Ordering::Acquire),
            self.max_offset.load(Ordering::Acquire),
        )
    }

    /// Get count of inserted offsets
    pub fn count(&self) -> u64 {
        self.filter.count()
    }
}

// ============================================================================
// Adaptive Batch Accumulator
// ============================================================================

/// Configuration for adaptive batching
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Minimum batch size before flushing
    pub min_batch_size: usize,
    /// Maximum batch size
    pub max_batch_size: usize,
    /// Maximum time to wait for a batch (microseconds)
    pub max_linger_us: u64,
    /// Target batch latency (microseconds)
    pub target_latency_us: u64,
    /// Adaptive sizing enabled
    pub adaptive: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            min_batch_size: 16,
            max_batch_size: 1024,
            max_linger_us: 5000,     // 5ms
            target_latency_us: 1000, // 1ms
            adaptive: true,
        }
    }
}

/// Adaptive batch accumulator for high-throughput ingestion
///
/// Automatically adjusts batch size based on throughput and latency.
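///
/// # Example
///
/// A sketch of the intended producer-side loop (crate path assumed; `write_batch`
/// is a hypothetical sink):
///
/// ```ignore
/// use rivven_core::bloom::{AdaptiveBatcher, BatchConfig};
///
/// let batcher = AdaptiveBatcher::new(BatchConfig::default());
/// for record in 0..10_000u64 {
///     if let Some(batch) = batcher.add(record) {
///         // The batch hit its adaptive size or lingered too long.
///         write_batch(batch);
///     }
/// }
/// // Flush whatever is left, e.g. on shutdown or from a linger timer.
/// let remainder = batcher.flush();
/// ```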
pub struct AdaptiveBatcher<T> {
    config: BatchConfig,
    /// Current batch
    batch: parking_lot::Mutex<Vec<T>>,
    /// Current adaptive batch size
    current_batch_size: AtomicU32,
    /// Timestamp of first item in batch
    batch_start_us: AtomicU64,
    /// Recent latencies for adaptation (microseconds)
    recent_latencies: [AtomicU64; 8],
    latency_index: AtomicU32,
    /// Total batches flushed
    batches_flushed: AtomicU64,
    /// Total items batched
    items_batched: AtomicU64,
}

impl<T> AdaptiveBatcher<T> {
    /// Create a new adaptive batcher
    pub fn new(config: BatchConfig) -> Self {
        let initial_size = (config.min_batch_size + config.max_batch_size) / 2;

        Self {
            config,
            batch: parking_lot::Mutex::new(Vec::with_capacity(initial_size)),
            current_batch_size: AtomicU32::new(initial_size as u32),
            batch_start_us: AtomicU64::new(0),
            recent_latencies: std::array::from_fn(|_| AtomicU64::new(0)),
            latency_index: AtomicU32::new(0),
            batches_flushed: AtomicU64::new(0),
            items_batched: AtomicU64::new(0),
        }
    }

    /// Add an item to the batch
    /// Returns Some(batch) if the batch should be flushed
    pub fn add(&self, item: T) -> Option<Vec<T>> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();

        // Set batch start time on first item
        if batch.is_empty() {
            self.batch_start_us.store(now, Ordering::Release);
        }

        batch.push(item);
        self.items_batched.fetch_add(1, Ordering::Relaxed);

        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));

        // Check if we should flush
        let should_flush = batch.len() >= batch_size
            || batch_age >= self.config.max_linger_us
            || batch.len() >= self.config.max_batch_size;

        if should_flush {
            let flushed = std::mem::take(&mut *batch);
            batch.reserve(batch_size);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);

            // Record latency for adaptation
            self.record_latency(batch_age);

            Some(flushed)
        } else {
            None
        }
    }

    /// Force flush the current batch
    pub fn flush(&self) -> Vec<T> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();

        if !batch.is_empty() {
            let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
            self.record_latency(batch_age);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);
        }

        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let flushed = std::mem::take(&mut *batch);
        batch.reserve(batch_size);

        flushed
    }

    /// Check if batch needs flushing due to time
    pub fn needs_flush(&self) -> bool {
        let batch = self.batch.lock();
        if batch.is_empty() {
            return false;
        }

        let now = Self::now_us();
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
        batch_age >= self.config.max_linger_us
    }

    /// Get current batch size setting
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed) as usize
    }

    /// Get number of items currently in batch
    pub fn pending_count(&self) -> usize {
        self.batch.lock().len()
    }

    /// Get statistics
    pub fn stats(&self) -> BatcherStats {
        BatcherStats {
            batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
            items_batched: self.items_batched.load(Ordering::Relaxed),
            current_batch_size: self.current_batch_size.load(Ordering::Relaxed) as usize,
            avg_latency_us: self.average_latency(),
        }
    }

    fn record_latency(&self, latency_us: u64) {
        if !self.config.adaptive {
            return;
        }

        let idx = self.latency_index.fetch_add(1, Ordering::Relaxed) as usize % 8;
        self.recent_latencies[idx].store(latency_us, Ordering::Relaxed);

        // Adapt batch size based on latency
        let avg_latency = self.average_latency();
        let current_size = self.current_batch_size.load(Ordering::Relaxed);

        let new_size = if avg_latency > self.config.target_latency_us * 2 {
            // Latency too high, reduce batch size
            (current_size * 3 / 4).max(self.config.min_batch_size as u32)
        } else if avg_latency < self.config.target_latency_us / 2 {
            // Latency low, can increase batch size
            (current_size * 5 / 4).min(self.config.max_batch_size as u32)
        } else {
            current_size
        };

        self.current_batch_size.store(new_size, Ordering::Relaxed);
    }

    fn average_latency(&self) -> u64 {
        let mut sum = 0u64;
        let mut count = 0u64;

        for lat in &self.recent_latencies {
            let l = lat.load(Ordering::Relaxed);
            if l > 0 {
                sum += l;
                count += 1;
            }
        }

        if count > 0 {
            sum / count
        } else {
            0
        }
    }

    fn now_us() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64
    }
}

/// Batcher statistics
#[derive(Debug, Clone)]
pub struct BatcherStats {
    pub batches_flushed: u64,
    pub items_batched: u64,
    pub current_batch_size: usize,
    pub avg_latency_us: u64,
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter() {
        let filter = BloomFilter::new(1000, 0.01);

        // Insert items
        for i in 0..1000 {
            filter.insert(&i);
        }

        // Check presence (a Bloom filter has no false negatives, so every inserted item must be found)
        for i in 0..1000 {
            assert!(filter.contains(&i), "Item {} should be present", i);
        }

        // Check false positives (should be ~1%)
        let mut false_positives = 0;
        for i in 1000..2000 {
            if filter.contains(&i) {
                false_positives += 1;
            }
        }

        // Allow up to 3% FP rate due to randomness
        assert!(
            false_positives < 30,
            "Too many false positives: {}",
            false_positives
        );
    }

    #[test]
    fn test_counting_bloom_filter() {
        let filter = CountingBloomFilter::new(1000, 0.01);

        // Insert
        filter.insert(&42);
        filter.insert(&43);
        assert!(filter.contains(&42));
        assert!(filter.contains(&43));

        // Remove
        filter.remove(&42);
        assert!(!filter.contains(&42));
        assert!(filter.contains(&43));
    }

    #[test]
    fn test_hyperloglog() {
        let hll = HyperLogLog::new(14); // 2^14 = 16384 registers

        // Add unique items
        for i in 0..10000 {
            hll.add(&i);
        }

        let estimate = hll.estimate();

        // Should be within 10% of actual
        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.1,
            "Estimate {} too far from 10000 (error: {}%)",
            estimate,
            error * 100.0
        );
    }

    #[test]
    fn test_hyperloglog_merge() {
        let hll1 = HyperLogLog::new(10);
        let hll2 = HyperLogLog::new(10);

        for i in 0..5000 {
            hll1.add(&i);
        }
        for i in 5000..10000 {
            hll2.add(&i);
        }

        hll1.merge(&hll2);

        let estimate = hll1.estimate();
        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.15,
            "Merged estimate {} too far from 10000",
            estimate
        );
    }

    #[test]
    fn test_offset_bloom_filter() {
        let filter = OffsetBloomFilter::new(1000, 100);

        // Insert offsets
        for offset in (0..1000).step_by(10) {
            filter.insert(offset);
        }

        // Check contains
        assert!(filter.contains(0));
        assert!(filter.contains(100));
        assert!(!filter.contains(5)); // Not inserted

        // Check range
        assert!(filter.contains_range(0, 50));
        assert!(filter.contains_range(90, 110));

        // Range outside
        let (min, max) = filter.offset_range();
        assert_eq!(min, 0);
        assert_eq!(max, 990);
    }

    #[test]
    fn test_adaptive_batcher() {
        let config = BatchConfig {
            min_batch_size: 4,
            max_batch_size: 16,
            max_linger_us: 10000,
            target_latency_us: 1000,
            adaptive: true,
        };

        let batcher = AdaptiveBatcher::new(config);

        // Add items until batch flushes
        let mut flushed = None;
        for i in 0..20 {
            if let Some(batch) = batcher.add(i) {
                flushed = Some(batch);
                break;
            }
        }

        assert!(flushed.is_some());
        let batch = flushed.unwrap();
        assert!(!batch.is_empty());
    }
}