Skip to main content

oxirs_stream/
advanced_sampling.rs

1//! # Advanced Sampling Techniques for Stream Processing
2//!
3//! Production-grade probabilistic data structures and sampling algorithms
4//! for high-volume streaming scenarios where exact computation is too expensive.
5//!
6//! ## Features
7//!
8//! - **Reservoir Sampling**: Fixed-size random samples from unbounded streams
9//! - **Stratified Sampling**: Distribution-preserving sampling across categories
10//! - **HyperLogLog**: Approximate cardinality estimation with O(1) space
11//! - **Count-Min Sketch**: Approximate frequency counting for heavy hitters
12//! - **T-Digest**: Approximate percentile calculations for streaming data
13//! - **Bloom Filter**: Space-efficient probabilistic membership testing
14//!
15//! ## Use Cases
16//!
17//! - Real-time analytics on billion-event streams
18//! - Memory-efficient distinct counting
19//! - Top-K heavy hitter detection
20//! - Approximate quantile tracking
21//! - Duplicate detection with minimal memory
22
23use crate::StreamEvent;
24use anyhow::Result;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::hash::{Hash, Hasher};
28
29/// Configuration for advanced sampling operations
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct SamplingConfig {
32    /// Reservoir sampling size
33    pub reservoir_size: usize,
34    /// Number of hash functions for Count-Min Sketch
35    pub cms_hash_count: usize,
36    /// Width of Count-Min Sketch
37    pub cms_width: usize,
38    /// Number of registers for HyperLogLog (power of 2)
39    pub hll_precision: u8,
40    /// Compression parameter for T-Digest
41    pub tdigest_delta: f64,
42    /// Size of Bloom filter in bits
43    pub bloom_filter_bits: usize,
44    /// Number of hash functions for Bloom filter
45    pub bloom_filter_hashes: usize,
46    /// Stratified sampling categories
47    pub stratified_categories: Vec<String>,
48    /// Sample rate per category (0.0 to 1.0)
49    pub stratified_sample_rates: HashMap<String, f64>,
50}
51
52impl Default for SamplingConfig {
53    fn default() -> Self {
54        Self {
55            reservoir_size: 1000,
56            cms_hash_count: 4,
57            cms_width: 10000,
58            hll_precision: 14, // 16K registers
59            tdigest_delta: 0.01,
60            bloom_filter_bits: 100000,
61            bloom_filter_hashes: 7,
62            stratified_categories: Vec::new(),
63            stratified_sample_rates: HashMap::new(),
64        }
65    }
66}
67
68/// Reservoir Sampler - Maintains uniform random sample from unbounded stream
69///
70/// Uses Algorithm R (Vitter, 1985) for efficient single-pass sampling.
71#[derive(Debug, Clone)]
72pub struct ReservoirSampler {
73    reservoir: Vec<StreamEvent>,
74    capacity: usize,
75    count: u64,
76}
77
78impl ReservoirSampler {
79    /// Create a new reservoir sampler
80    ///
81    /// # Arguments
82    /// * `capacity` - Maximum number of samples to retain
83    pub fn new(capacity: usize) -> Self {
84        Self {
85            reservoir: Vec::with_capacity(capacity),
86            capacity,
87            count: 0,
88        }
89    }
90
91    /// Add an event to the reservoir
92    ///
93    /// Uses Algorithm R for O(1) average insertion time
94    pub fn add(&mut self, event: StreamEvent) {
95        self.count += 1;
96
97        if self.reservoir.len() < self.capacity {
98            // Reservoir not full yet, just add
99            self.reservoir.push(event);
100        } else {
101            // Randomly replace existing event with decreasing probability
102            let j = (fastrand::f64() * self.count as f64) as usize;
103            if j < self.capacity {
104                self.reservoir[j] = event;
105            }
106        }
107    }
108
109    /// Get the current sample
110    pub fn sample(&self) -> &[StreamEvent] {
111        &self.reservoir
112    }
113
114    /// Get the number of events processed
115    pub fn count(&self) -> u64 {
116        self.count
117    }
118
119    /// Clear the reservoir
120    pub fn clear(&mut self) {
121        self.reservoir.clear();
122        self.count = 0;
123    }
124
125    /// Get statistics about the sampler
126    pub fn stats(&self) -> ReservoirStats {
127        ReservoirStats {
128            capacity: self.capacity,
129            current_size: self.reservoir.len(),
130            total_events: self.count,
131            sampling_rate: if self.count > 0 {
132                self.reservoir.len() as f64 / self.count as f64
133            } else {
134                0.0
135            },
136        }
137    }
138}
139
140/// Statistics for reservoir sampler
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct ReservoirStats {
143    pub capacity: usize,
144    pub current_size: usize,
145    pub total_events: u64,
146    pub sampling_rate: f64,
147}
148
149/// Stratified Sampler - Preserves distribution across categories
150///
151/// Maintains separate reservoirs for each category to ensure
152/// representative sampling across different event types.
153#[derive(Debug, Clone)]
154pub struct StratifiedSampler {
155    samplers: HashMap<String, ReservoirSampler>,
156    sample_rates: HashMap<String, f64>,
157    default_capacity: usize,
158    category_extractor: fn(&StreamEvent) -> Option<String>,
159}
160
161impl StratifiedSampler {
162    /// Create a new stratified sampler
163    ///
164    /// # Arguments
165    /// * `default_capacity` - Default reservoir size per category
166    /// * `category_extractor` - Function to extract category from event
167    pub fn new(
168        default_capacity: usize,
169        category_extractor: fn(&StreamEvent) -> Option<String>,
170    ) -> Self {
171        Self {
172            samplers: HashMap::new(),
173            sample_rates: HashMap::new(),
174            default_capacity,
175            category_extractor,
176        }
177    }
178
179    /// Set sample rate for a specific category
180    pub fn set_category_rate(&mut self, category: String, rate: f64) {
181        assert!((0.0..=1.0).contains(&rate), "Rate must be in [0, 1]");
182        self.sample_rates.insert(category, rate);
183    }
184
185    /// Add an event to the appropriate category reservoir
186    pub fn add(&mut self, event: StreamEvent) {
187        if let Some(category) = (self.category_extractor)(&event) {
188            // Check if we should sample this category
189            let rate = self.sample_rates.get(&category).copied().unwrap_or(1.0);
190
191            if rate <= 0.0 {
192                return; // Skip this category
193            }
194
195            // Get or create sampler for this category
196            let sampler = self.samplers.entry(category.clone()).or_insert_with(|| {
197                let capacity = (self.default_capacity as f64 * rate) as usize;
198                ReservoirSampler::new(capacity.max(1))
199            });
200
201            sampler.add(event);
202        }
203    }
204
205    /// Get samples for a specific category
206    pub fn category_sample(&self, category: &str) -> Option<&[StreamEvent]> {
207        self.samplers.get(category).map(|s| s.sample())
208    }
209
210    /// Get all samples grouped by category
211    pub fn all_samples(&self) -> HashMap<String, Vec<StreamEvent>> {
212        self.samplers
213            .iter()
214            .map(|(cat, sampler)| (cat.clone(), sampler.sample().to_vec()))
215            .collect()
216    }
217
218    /// Get statistics for all categories
219    pub fn stats(&self) -> StratifiedStats {
220        let category_stats = self
221            .samplers
222            .iter()
223            .map(|(cat, sampler)| (cat.clone(), sampler.stats()))
224            .collect();
225
226        StratifiedStats {
227            category_count: self.samplers.len(),
228            category_stats,
229        }
230    }
231}
232
233/// Statistics for stratified sampler
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct StratifiedStats {
236    pub category_count: usize,
237    pub category_stats: HashMap<String, ReservoirStats>,
238}
239
240/// HyperLogLog - Approximate cardinality estimator
241///
242/// Uses probabilistic counting with 2^precision registers to estimate
243/// the number of distinct elements with ~1.04/sqrt(m) relative error.
244#[derive(Debug, Clone)]
245pub struct HyperLogLog {
246    registers: Vec<u8>,
247    precision: u8,
248    alpha: f64,
249}
250
251impl HyperLogLog {
252    /// Create a new HyperLogLog estimator
253    ///
254    /// # Arguments
255    /// * `precision` - Number of bits for register indexing (4-16)
256    ///   Higher precision = more accuracy but more memory
257    pub fn new(precision: u8) -> Self {
258        assert!(
259            (4..=16).contains(&precision),
260            "Precision must be between 4 and 16"
261        );
262
263        let m = 1 << precision; // 2^precision registers
264
265        // Alpha constant for bias correction
266        let alpha = match m {
267            16 => 0.673,
268            32 => 0.697,
269            64 => 0.709,
270            _ => 0.7213 / (1.0 + 1.079 / m as f64),
271        };
272
273        Self {
274            registers: vec![0; m],
275            precision,
276            alpha,
277        }
278    }
279
280    /// Add an element to the estimator
281    pub fn add<T: Hash>(&mut self, element: &T) {
282        let hash = self.hash(element);
283
284        // Use first 'precision' bits for register index
285        let idx = (hash >> (64 - self.precision)) as usize;
286
287        // Count leading zeros in remaining bits + 1
288        let remaining = hash << self.precision;
289        let leading_zeros = remaining.leading_zeros() as u8 + 1;
290
291        // Update register with maximum leading zeros seen
292        self.registers[idx] = self.registers[idx].max(leading_zeros);
293    }
294
295    /// Estimate the cardinality (number of distinct elements)
296    pub fn cardinality(&self) -> u64 {
297        let m = self.registers.len() as f64;
298
299        // Harmonic mean of 2^register values
300        let raw_estimate = self.alpha * m * m
301            / self
302                .registers
303                .iter()
304                .map(|&r| 2.0_f64.powi(-(r as i32)))
305                .sum::<f64>();
306
307        // Apply bias correction
308        if raw_estimate <= 5.0 * m {
309            // Small range correction
310            let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
311            if zeros > 0.0 {
312                return (m * (m / zeros).ln()) as u64;
313            }
314        }
315
316        if raw_estimate <= (1.0 / 30.0) * (1u64 << 32) as f64 {
317            // No correction
318            raw_estimate as u64
319        } else {
320            // Large range correction
321            let two_32 = (1u64 << 32) as f64;
322            (-(two_32) * ((1.0 - raw_estimate / two_32).ln())) as u64
323        }
324    }
325
326    /// Merge another HyperLogLog into this one
327    pub fn merge(&mut self, other: &HyperLogLog) {
328        assert_eq!(
329            self.precision, other.precision,
330            "Cannot merge HyperLogLogs with different precisions"
331        );
332
333        for (i, &other_val) in other.registers.iter().enumerate() {
334            self.registers[i] = self.registers[i].max(other_val);
335        }
336    }
337
338    /// Hash function for HyperLogLog
339    fn hash<T: Hash>(&self, element: &T) -> u64 {
340        use std::collections::hash_map::DefaultHasher;
341        let mut hasher = DefaultHasher::new();
342        element.hash(&mut hasher);
343        hasher.finish()
344    }
345
346    /// Get statistics about the HyperLogLog
347    pub fn stats(&self) -> HyperLogLogStats {
348        HyperLogLogStats {
349            precision: self.precision,
350            register_count: self.registers.len(),
351            estimated_cardinality: self.cardinality(),
352            memory_bytes: self.registers.len(),
353        }
354    }
355}
356
357/// Statistics for HyperLogLog
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct HyperLogLogStats {
360    pub precision: u8,
361    pub register_count: usize,
362    pub estimated_cardinality: u64,
363    pub memory_bytes: usize,
364}
365
366/// Count-Min Sketch - Approximate frequency counter
367///
368/// Space-efficient probabilistic data structure for estimating
369/// frequency of elements in a stream with guaranteed error bounds.
370#[derive(Debug, Clone)]
371pub struct CountMinSketch {
372    table: Vec<Vec<u64>>,
373    hash_count: usize,
374    width: usize,
375    total_count: u64,
376}
377
378impl CountMinSketch {
379    /// Create a new Count-Min Sketch
380    ///
381    /// # Arguments
382    /// * `hash_count` - Number of hash functions (depth)
383    /// * `width` - Number of counters per hash function
384    ///
385    /// Error bounds: ε = e / width, δ = 1 / e^hash_count
386    pub fn new(hash_count: usize, width: usize) -> Self {
387        Self {
388            table: vec![vec![0; width]; hash_count],
389            hash_count,
390            width,
391            total_count: 0,
392        }
393    }
394
395    /// Add an element with count
396    pub fn add<T: Hash>(&mut self, element: &T, count: u64) {
397        self.total_count += count;
398
399        for i in 0..self.hash_count {
400            let idx = self.hash_i(element, i) % self.width;
401            self.table[i][idx] += count;
402        }
403    }
404
405    /// Estimate the frequency of an element
406    pub fn estimate<T: Hash>(&self, element: &T) -> u64 {
407        (0..self.hash_count)
408            .map(|i| {
409                let idx = self.hash_i(element, i) % self.width;
410                self.table[i][idx]
411            })
412            .min()
413            .unwrap_or(0)
414    }
415
416    /// Get the total count of all elements
417    pub fn total_count(&self) -> u64 {
418        self.total_count
419    }
420
421    /// Hash function with index
422    fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
423        use std::collections::hash_map::DefaultHasher;
424        let mut hasher = DefaultHasher::new();
425        element.hash(&mut hasher);
426        i.hash(&mut hasher);
427        hasher.finish() as usize
428    }
429
430    /// Get statistics about the Count-Min Sketch
431    pub fn stats(&self) -> CountMinSketchStats {
432        CountMinSketchStats {
433            hash_count: self.hash_count,
434            width: self.width,
435            total_count: self.total_count,
436            memory_bytes: self.hash_count * self.width * std::mem::size_of::<u64>(),
437        }
438    }
439}
440
441/// Statistics for Count-Min Sketch
442#[derive(Debug, Clone, Serialize, Deserialize)]
443pub struct CountMinSketchStats {
444    pub hash_count: usize,
445    pub width: usize,
446    pub total_count: u64,
447    pub memory_bytes: usize,
448}
449
450/// T-Digest - Approximate quantile estimator
451///
452/// Provides accurate percentile estimates with compression
453/// for streaming data. More accurate at extremes (p0, p100).
454#[derive(Debug, Clone)]
455pub struct TDigest {
456    centroids: Vec<Centroid>,
457    delta: f64,
458    total_weight: f64,
459    max_size: usize,
460}
461
462#[derive(Debug, Clone, Copy)]
463struct Centroid {
464    mean: f64,
465    weight: f64,
466}
467
468impl TDigest {
469    /// Create a new T-Digest
470    ///
471    /// # Arguments
472    /// * `delta` - Compression parameter (0.01 = 1% accuracy)
473    pub fn new(delta: f64) -> Self {
474        Self {
475            centroids: Vec::new(),
476            delta,
477            total_weight: 0.0,
478            max_size: (1.0 / delta) as usize,
479        }
480    }
481
482    /// Add a value to the digest
483    pub fn add(&mut self, value: f64, weight: f64) {
484        self.centroids.push(Centroid {
485            mean: value,
486            weight,
487        });
488        self.total_weight += weight;
489
490        // Compress if needed
491        if self.centroids.len() > self.max_size {
492            self.compress();
493        }
494    }
495
496    /// Estimate the quantile (0.0 to 1.0)
497    pub fn quantile(&mut self, q: f64) -> Option<f64> {
498        if self.centroids.is_empty() {
499            return None;
500        }
501
502        if self.centroids.len() > 1 {
503            self.compress();
504        }
505
506        let index = q * self.total_weight;
507        let mut sum = 0.0;
508
509        for centroid in &self.centroids {
510            sum += centroid.weight;
511            if sum >= index {
512                return Some(centroid.mean);
513            }
514        }
515
516        self.centroids.last().map(|c| c.mean)
517    }
518
519    /// Compress centroids
520    fn compress(&mut self) {
521        if self.centroids.is_empty() {
522            return;
523        }
524
525        // Sort by mean
526        self.centroids.sort_by(|a, b| {
527            a.mean
528                .partial_cmp(&b.mean)
529                .unwrap_or(std::cmp::Ordering::Equal)
530        });
531
532        let mut compressed = Vec::new();
533        let mut current = self.centroids[0];
534
535        for &centroid in &self.centroids[1..] {
536            // Check if we should merge
537            let q = (current.weight + centroid.weight) / self.total_weight;
538            let k = self.k_limit(q);
539
540            if current.weight + centroid.weight <= k {
541                // Merge centroids
542                let total_weight = current.weight + centroid.weight;
543                current.mean = (current.mean * current.weight + centroid.mean * centroid.weight)
544                    / total_weight;
545                current.weight = total_weight;
546            } else {
547                compressed.push(current);
548                current = centroid;
549            }
550        }
551        compressed.push(current);
552
553        self.centroids = compressed;
554    }
555
556    /// Compute k limit for compression
557    fn k_limit(&self, q: f64) -> f64 {
558        4.0 * self.total_weight * self.delta * q * (1.0 - q)
559    }
560
561    /// Get statistics about the T-Digest
562    pub fn stats(&self) -> TDigestStats {
563        TDigestStats {
564            centroid_count: self.centroids.len(),
565            total_weight: self.total_weight,
566            delta: self.delta,
567            max_size: self.max_size,
568        }
569    }
570}
571
572/// Statistics for T-Digest
573#[derive(Debug, Clone, Serialize, Deserialize)]
574pub struct TDigestStats {
575    pub centroid_count: usize,
576    pub total_weight: f64,
577    pub delta: f64,
578    pub max_size: usize,
579}
580
581/// Bloom Filter - Probabilistic membership test
582///
583/// Space-efficient set membership test with no false negatives
584/// but possible false positives (configurable error rate).
585#[derive(Debug, Clone)]
586pub struct BloomFilter {
587    bits: Vec<bool>,
588    hash_count: usize,
589    insert_count: u64,
590}
591
592impl BloomFilter {
593    /// Create a new Bloom filter
594    ///
595    /// # Arguments
596    /// * `size` - Number of bits in the filter
597    /// * `hash_count` - Number of hash functions
598    ///
599    /// Optimal hash_count = (bits / expected_items) * ln(2)
600    pub fn new(size: usize, hash_count: usize) -> Self {
601        Self {
602            bits: vec![false; size],
603            hash_count,
604            insert_count: 0,
605        }
606    }
607
608    /// Optimal Bloom filter for expected items and false positive rate
609    pub fn optimal(expected_items: usize, false_positive_rate: f64) -> Self {
610        let bits = Self::optimal_bits(expected_items, false_positive_rate);
611        let hash_count = Self::optimal_hash_count(bits, expected_items);
612        Self::new(bits, hash_count)
613    }
614
615    /// Calculate optimal number of bits
616    fn optimal_bits(n: usize, p: f64) -> usize {
617        let numerator = -(n as f64 * p.ln());
618        let denominator = 2.0_f64.ln().powi(2);
619        (numerator / denominator).ceil() as usize
620    }
621
622    /// Calculate optimal number of hash functions
623    fn optimal_hash_count(m: usize, n: usize) -> usize {
624        ((m as f64 / n as f64) * 2.0_f64.ln()).ceil() as usize
625    }
626
627    /// Add an element to the filter
628    pub fn add<T: Hash>(&mut self, element: &T) {
629        self.insert_count += 1;
630        for i in 0..self.hash_count {
631            let idx = self.hash_i(element, i) % self.bits.len();
632            self.bits[idx] = true;
633        }
634    }
635
636    /// Check if an element might be in the set
637    pub fn contains<T: Hash>(&self, element: &T) -> bool {
638        (0..self.hash_count).all(|i| {
639            let idx = self.hash_i(element, i) % self.bits.len();
640            self.bits[idx]
641        })
642    }
643
644    /// Hash function with index
645    fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
646        use std::collections::hash_map::DefaultHasher;
647        let mut hasher = DefaultHasher::new();
648        element.hash(&mut hasher);
649        i.hash(&mut hasher);
650        hasher.finish() as usize
651    }
652
653    /// Estimate current false positive rate
654    pub fn false_positive_rate(&self) -> f64 {
655        let set_bits = self.bits.iter().filter(|&&b| b).count() as f64;
656        let p = set_bits / self.bits.len() as f64;
657        p.powi(self.hash_count as i32)
658    }
659
660    /// Get statistics about the Bloom filter
661    pub fn stats(&self) -> BloomFilterStats {
662        let set_bits = self.bits.iter().filter(|&&b| b).count();
663
664        BloomFilterStats {
665            size_bits: self.bits.len(),
666            hash_count: self.hash_count,
667            insert_count: self.insert_count,
668            set_bits,
669            estimated_fpr: self.false_positive_rate(),
670            memory_bytes: self.bits.len() / 8,
671        }
672    }
673}
674
675/// Statistics for Bloom filter
676#[derive(Debug, Clone, Serialize, Deserialize)]
677pub struct BloomFilterStats {
678    pub size_bits: usize,
679    pub hash_count: usize,
680    pub insert_count: u64,
681    pub set_bits: usize,
682    pub estimated_fpr: f64,
683    pub memory_bytes: usize,
684}
685
686/// Unified sampling manager for all sampling techniques
687pub struct AdvancedSamplingManager {
688    config: SamplingConfig,
689    reservoir: ReservoirSampler,
690    stratified: Option<StratifiedSampler>,
691    hyperloglog: HyperLogLog,
692    count_min: CountMinSketch,
693    tdigest: TDigest,
694    bloom: BloomFilter,
695    event_count: u64,
696}
697
698impl AdvancedSamplingManager {
699    /// Create a new sampling manager with the given configuration
700    pub fn new(config: SamplingConfig) -> Self {
701        let reservoir = ReservoirSampler::new(config.reservoir_size);
702        let hyperloglog = HyperLogLog::new(config.hll_precision);
703        let count_min = CountMinSketch::new(config.cms_hash_count, config.cms_width);
704        let tdigest = TDigest::new(config.tdigest_delta);
705        let bloom = BloomFilter::new(config.bloom_filter_bits, config.bloom_filter_hashes);
706
707        Self {
708            config,
709            reservoir,
710            stratified: None,
711            hyperloglog,
712            count_min,
713            tdigest,
714            bloom,
715            event_count: 0,
716        }
717    }
718
719    /// Enable stratified sampling with a category extractor function
720    pub fn enable_stratified(&mut self, extractor: fn(&StreamEvent) -> Option<String>) {
721        let mut sampler = StratifiedSampler::new(self.config.reservoir_size, extractor);
722
723        // Apply configured sample rates
724        for (category, rate) in &self.config.stratified_sample_rates {
725            sampler.set_category_rate(category.clone(), *rate);
726        }
727
728        self.stratified = Some(sampler);
729    }
730
731    /// Process an event through all sampling techniques
732    pub fn process_event(&mut self, event: StreamEvent) -> Result<()> {
733        self.event_count += 1;
734
735        // Reservoir sampling
736        self.reservoir.add(event.clone());
737
738        // Stratified sampling
739        if let Some(ref mut stratified) = self.stratified {
740            stratified.add(event.clone());
741        }
742
743        // HyperLogLog (cardinality)
744        let event_id = self.event_id(&event);
745        self.hyperloglog.add(&event_id);
746
747        // Count-Min Sketch (frequency)
748        self.count_min.add(&event_id, 1);
749
750        // T-Digest (quantiles) - using event timestamp as value
751        if let Some(value) = self.extract_numeric_value(&event) {
752            self.tdigest.add(value, 1.0);
753        }
754
755        // Bloom filter (membership)
756        self.bloom.add(&event_id);
757
758        Ok(())
759    }
760
761    /// Get reservoir sample
762    pub fn reservoir_sample(&self) -> &[StreamEvent] {
763        self.reservoir.sample()
764    }
765
766    /// Get stratified samples
767    pub fn stratified_samples(&self) -> Option<HashMap<String, Vec<StreamEvent>>> {
768        self.stratified.as_ref().map(|s| s.all_samples())
769    }
770
771    /// Estimate distinct event count
772    pub fn distinct_count(&self) -> u64 {
773        self.hyperloglog.cardinality()
774    }
775
776    /// Estimate frequency of an event
777    pub fn event_frequency(&self, event: &StreamEvent) -> u64 {
778        let event_id = self.event_id(event);
779        self.count_min.estimate(&event_id)
780    }
781
782    /// Check if an event was likely seen before
783    pub fn likely_seen(&self, event: &StreamEvent) -> bool {
784        let event_id = self.event_id(event);
785        self.bloom.contains(&event_id)
786    }
787
788    /// Estimate quantile (percentile) of numeric values
789    pub fn quantile(&mut self, q: f64) -> Option<f64> {
790        self.tdigest.quantile(q)
791    }
792
793    /// Get comprehensive statistics
794    pub fn stats(&self) -> SamplingManagerStats {
795        SamplingManagerStats {
796            event_count: self.event_count,
797            reservoir_stats: self.reservoir.stats(),
798            stratified_stats: self.stratified.as_ref().map(|s| s.stats()),
799            hyperloglog_stats: self.hyperloglog.stats(),
800            count_min_stats: self.count_min.stats(),
801            tdigest_stats: self.tdigest.stats(),
802            bloom_stats: self.bloom.stats(),
803        }
804    }
805
806    /// Extract event ID for hashing
807    fn event_id(&self, event: &StreamEvent) -> String {
808        match event {
809            StreamEvent::TripleAdded {
810                subject,
811                predicate,
812                object,
813                ..
814            } => format!("{}-{}-{}", subject, predicate, object),
815            StreamEvent::TripleRemoved {
816                subject,
817                predicate,
818                object,
819                ..
820            } => format!("{}-{}-{}", subject, predicate, object),
821            StreamEvent::GraphCreated { graph, .. } => format!("graph-{}", graph),
822            StreamEvent::GraphDeleted { graph, .. } => format!("graph-{}", graph),
823            _ => "unknown".to_string(),
824        }
825    }
826
827    /// Extract numeric value from event for quantile estimation
828    fn extract_numeric_value(&self, event: &StreamEvent) -> Option<f64> {
829        match event {
830            StreamEvent::TripleAdded { metadata, .. }
831            | StreamEvent::TripleRemoved { metadata, .. } => {
832                Some(metadata.timestamp.timestamp() as f64)
833            }
834            _ => None,
835        }
836    }
837}
838
839/// Comprehensive statistics for all sampling techniques
840#[derive(Debug, Clone, Serialize, Deserialize)]
841pub struct SamplingManagerStats {
842    pub event_count: u64,
843    pub reservoir_stats: ReservoirStats,
844    pub stratified_stats: Option<StratifiedStats>,
845    pub hyperloglog_stats: HyperLogLogStats,
846    pub count_min_stats: CountMinSketchStats,
847    pub tdigest_stats: TDigestStats,
848    pub bloom_stats: BloomFilterStats,
849}
850
851#[cfg(test)]
852mod tests {
853    use super::*;
854    use crate::EventMetadata;
855    use std::collections::HashMap;
856
857    fn create_test_event(id: &str) -> StreamEvent {
858        StreamEvent::TripleAdded {
859            subject: format!("http://example.org/{}", id),
860            predicate: "http://example.org/prop".to_string(),
861            object: "value".to_string(),
862            graph: None,
863            metadata: EventMetadata {
864                event_id: id.to_string(),
865                timestamp: chrono::Utc::now(),
866                source: "test".to_string(),
867                user: None,
868                context: None,
869                caused_by: None,
870                version: "1.0".to_string(),
871                properties: HashMap::new(),
872                checksum: None,
873            },
874        }
875    }
876
877    #[test]
878    fn test_reservoir_sampler() {
879        let mut sampler = ReservoirSampler::new(10);
880
881        // Add 100 events
882        for i in 0..100 {
883            sampler.add(create_test_event(&format!("event-{}", i)));
884        }
885
886        let stats = sampler.stats();
887        assert_eq!(stats.capacity, 10);
888        assert_eq!(stats.current_size, 10);
889        assert_eq!(stats.total_events, 100);
890        assert_eq!(stats.sampling_rate, 0.1);
891    }
892
893    #[test]
894    fn test_stratified_sampler() {
895        fn category_extractor(event: &StreamEvent) -> Option<String> {
896            match event {
897                StreamEvent::TripleAdded { metadata, .. } => Some(metadata.source.clone()),
898                _ => None,
899            }
900        }
901
902        let mut sampler = StratifiedSampler::new(10, category_extractor);
903        sampler.set_category_rate("source1".to_string(), 0.5);
904        sampler.set_category_rate("source2".to_string(), 1.0);
905
906        // Add events from different sources
907        for i in 0..50 {
908            let mut event = create_test_event(&format!("event-{}", i));
909            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
910                metadata.source = if i < 25 {
911                    "source1".to_string()
912                } else {
913                    "source2".to_string()
914                };
915            }
916            sampler.add(event);
917        }
918
919        let stats = sampler.stats();
920        assert_eq!(stats.category_count, 2);
921        assert!(stats.category_stats.contains_key("source1"));
922        assert!(stats.category_stats.contains_key("source2"));
923    }
924
925    #[test]
926    fn test_hyperloglog() {
927        let mut hll = HyperLogLog::new(14);
928
929        // Add 1000 distinct elements
930        for i in 0..1000 {
931            hll.add(&format!("element-{}", i));
932        }
933
934        let cardinality = hll.cardinality();
935
936        // HyperLogLog should estimate within ~2% error
937        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
938        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
939    }
940
941    #[test]
942    fn test_count_min_sketch() {
943        let mut cms = CountMinSketch::new(4, 1000);
944
945        // Add elements with different frequencies
946        for _ in 0..100 {
947            cms.add(&"frequent", 1);
948        }
949        for _ in 0..10 {
950            cms.add(&"rare", 1);
951        }
952
953        let freq_frequent = cms.estimate(&"frequent");
954        let freq_rare = cms.estimate(&"rare");
955
956        assert!(freq_frequent >= 100);
957        assert!(freq_rare >= 10);
958        assert!(freq_frequent > freq_rare);
959    }
960
961    #[test]
962    fn test_tdigest() {
963        let mut digest = TDigest::new(0.01);
964
965        // Add values 1..1000
966        for i in 1..=1000 {
967            digest.add(i as f64, 1.0);
968        }
969
970        // Test median (should be around 500)
971        let median = digest.quantile(0.5).unwrap();
972        assert!((median - 500.0).abs() < 50.0, "Median: {}", median);
973
974        // Test P90 (should be around 900)
975        let p90 = digest.quantile(0.9).unwrap();
976        assert!((p90 - 900.0).abs() < 100.0, "P90: {}", p90);
977    }
978
979    #[test]
980    fn test_bloom_filter() {
981        let mut bloom = BloomFilter::optimal(1000, 0.01);
982
983        // Add 500 elements
984        for i in 0..500 {
985            bloom.add(&format!("element-{}", i));
986        }
987
988        // All added elements should be found
989        for i in 0..500 {
990            assert!(bloom.contains(&format!("element-{}", i)));
991        }
992
993        // Test false positive rate
994        let mut false_positives = 0;
995        for i in 1000..2000 {
996            if bloom.contains(&format!("element-{}", i)) {
997                false_positives += 1;
998            }
999        }
1000
1001        let fpr = false_positives as f64 / 1000.0;
1002        assert!(fpr < 0.05, "False positive rate too high: {}", fpr);
1003    }
1004
1005    #[test]
1006    fn test_sampling_manager() {
1007        let config = SamplingConfig::default();
1008        let mut manager = AdvancedSamplingManager::new(config);
1009
1010        // Process 100 events
1011        for i in 0..100 {
1012            let event = create_test_event(&format!("event-{}", i));
1013            manager.process_event(event).unwrap();
1014        }
1015
1016        let stats = manager.stats();
1017        assert_eq!(stats.event_count, 100);
1018        assert!(stats.reservoir_stats.current_size > 0);
1019        assert!(stats.hyperloglog_stats.estimated_cardinality > 0);
1020        assert!(stats.count_min_stats.total_count > 0);
1021    }
1022
1023    #[test]
1024    fn test_hyperloglog_merge() {
1025        let mut hll1 = HyperLogLog::new(14);
1026        let mut hll2 = HyperLogLog::new(14);
1027
1028        // Add different elements to each
1029        for i in 0..500 {
1030            hll1.add(&format!("element-{}", i));
1031        }
1032        for i in 500..1000 {
1033            hll2.add(&format!("element-{}", i));
1034        }
1035
1036        // Merge
1037        hll1.merge(&hll2);
1038
1039        let cardinality = hll1.cardinality();
1040
1041        // Should estimate ~1000 distinct elements
1042        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
1043        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
1044    }
1045
1046    #[test]
1047    fn test_bloom_filter_optimal() {
1048        let bloom = BloomFilter::optimal(10000, 0.01);
1049        let stats = bloom.stats();
1050
1051        // Verify optimal sizing
1052        assert!(stats.size_bits > 0);
1053        assert!(stats.hash_count > 0);
1054    }
1055
1056    #[test]
1057    fn test_sampling_manager_with_stratified() {
1058        fn category_extractor(event: &StreamEvent) -> Option<String> {
1059            match event {
1060                StreamEvent::TripleAdded { subject, .. } => {
1061                    if subject.contains("type1") {
1062                        Some("type1".to_string())
1063                    } else if subject.contains("type2") {
1064                        Some("type2".to_string())
1065                    } else {
1066                        None
1067                    }
1068                }
1069                _ => None,
1070            }
1071        }
1072
1073        let config = SamplingConfig::default();
1074        let mut manager = AdvancedSamplingManager::new(config);
1075        manager.enable_stratified(category_extractor);
1076
1077        // Process events from different categories
1078        for i in 0..50 {
1079            let event = StreamEvent::TripleAdded {
1080                subject: format!("http://example.org/type1/{}", i),
1081                predicate: "http://example.org/prop".to_string(),
1082                object: "value".to_string(),
1083                graph: None,
1084                metadata: EventMetadata {
1085                    event_id: format!("event-{}", i),
1086                    timestamp: chrono::Utc::now(),
1087                    source: "test".to_string(),
1088                    user: None,
1089                    context: None,
1090                    caused_by: None,
1091                    version: "1.0".to_string(),
1092                    properties: HashMap::new(),
1093                    checksum: None,
1094                },
1095            };
1096            manager.process_event(event).unwrap();
1097        }
1098
1099        for i in 50..100 {
1100            let event = StreamEvent::TripleAdded {
1101                subject: format!("http://example.org/type2/{}", i),
1102                predicate: "http://example.org/prop".to_string(),
1103                object: "value".to_string(),
1104                graph: None,
1105                metadata: EventMetadata {
1106                    event_id: format!("event-{}", i),
1107                    timestamp: chrono::Utc::now(),
1108                    source: "test".to_string(),
1109                    user: None,
1110                    context: None,
1111                    caused_by: None,
1112                    version: "1.0".to_string(),
1113                    properties: HashMap::new(),
1114                    checksum: None,
1115                },
1116            };
1117            manager.process_event(event).unwrap();
1118        }
1119
1120        let stats = manager.stats();
1121        assert_eq!(stats.event_count, 100);
1122        assert!(stats.stratified_stats.is_some());
1123
1124        let stratified = stats.stratified_stats.unwrap();
1125        assert_eq!(stratified.category_count, 2);
1126    }
1127}