oxirs_stream/
advanced_sampling.rs

1//! # Advanced Sampling Techniques for Stream Processing
2//!
3//! Production-grade probabilistic data structures and sampling algorithms
4//! for high-volume streaming scenarios where exact computation is too expensive.
5//!
6//! ## Features
7//!
8//! - **Reservoir Sampling**: Fixed-size random samples from unbounded streams
9//! - **Stratified Sampling**: Distribution-preserving sampling across categories
10//! - **HyperLogLog**: Approximate cardinality estimation with O(1) space
11//! - **Count-Min Sketch**: Approximate frequency counting for heavy hitters
12//! - **T-Digest**: Approximate percentile calculations for streaming data
13//! - **Bloom Filter**: Space-efficient probabilistic membership testing
14//!
15//! ## Use Cases
16//!
17//! - Real-time analytics on billion-event streams
18//! - Memory-efficient distinct counting
19//! - Top-K heavy hitter detection
20//! - Approximate quantile tracking
21//! - Duplicate detection with minimal memory
22
23use crate::StreamEvent;
24use anyhow::Result;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::hash::{Hash, Hasher};
28
/// Configuration for advanced sampling operations
///
/// Bundles the tuning parameters for every sampler and sketch in this module.
/// `AdvancedSamplingManager::new` forwards each value to the matching
/// constructor, and `AdvancedSamplingManager::enable_stratified` applies
/// `stratified_sample_rates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingConfig {
    /// Reservoir sampling size
    ///
    /// Maximum number of events kept by the reservoir sampler; also used as
    /// the default per-category capacity when stratified sampling is enabled.
    pub reservoir_size: usize,
    /// Number of hash functions for Count-Min Sketch
    pub cms_hash_count: usize,
    /// Width of Count-Min Sketch
    pub cms_width: usize,
    /// Precision for HyperLogLog
    ///
    /// The estimator allocates `2^hll_precision` registers, and
    /// `HyperLogLog::new` panics unless the value lies in `4..=16`.
    pub hll_precision: u8,
    /// Compression parameter for T-Digest
    pub tdigest_delta: f64,
    /// Size of Bloom filter in bits
    pub bloom_filter_bits: usize,
    /// Number of hash functions for Bloom filter
    pub bloom_filter_hashes: usize,
    /// Stratified sampling categories
    // NOTE(review): nothing in the visible part of this file reads this
    // field — confirm whether it is consumed elsewhere.
    pub stratified_categories: Vec<String>,
    /// Sample rate per category (0.0 to 1.0)
    ///
    /// Applied by `AdvancedSamplingManager::enable_stratified`; a rate outside
    /// `[0, 1]` makes `StratifiedSampler::set_category_rate` panic.
    pub stratified_sample_rates: HashMap<String, f64>,
}
51
52impl Default for SamplingConfig {
53    fn default() -> Self {
54        Self {
55            reservoir_size: 1000,
56            cms_hash_count: 4,
57            cms_width: 10000,
58            hll_precision: 14, // 16K registers
59            tdigest_delta: 0.01,
60            bloom_filter_bits: 100000,
61            bloom_filter_hashes: 7,
62            stratified_categories: Vec::new(),
63            stratified_sample_rates: HashMap::new(),
64        }
65    }
66}
67
/// Reservoir Sampler - Maintains uniform random sample from unbounded stream
///
/// Uses Algorithm R (Vitter, 1985) for efficient single-pass sampling.
#[derive(Debug, Clone)]
pub struct ReservoirSampler {
    /// Events currently held in the sample; `add` only pushes while the
    /// length is below `capacity`.
    reservoir: Vec<StreamEvent>,
    /// Maximum number of events retained.
    capacity: usize,
    /// Number of events offered through `add` since creation or the last
    /// `clear`.
    count: u64,
}
77
78impl ReservoirSampler {
79    /// Create a new reservoir sampler
80    ///
81    /// # Arguments
82    /// * `capacity` - Maximum number of samples to retain
83    pub fn new(capacity: usize) -> Self {
84        Self {
85            reservoir: Vec::with_capacity(capacity),
86            capacity,
87            count: 0,
88        }
89    }
90
91    /// Add an event to the reservoir
92    ///
93    /// Uses Algorithm R for O(1) average insertion time
94    pub fn add(&mut self, event: StreamEvent) {
95        self.count += 1;
96
97        if self.reservoir.len() < self.capacity {
98            // Reservoir not full yet, just add
99            self.reservoir.push(event);
100        } else {
101            // Randomly replace existing event with decreasing probability
102            let j = (fastrand::f64() * self.count as f64) as usize;
103            if j < self.capacity {
104                self.reservoir[j] = event;
105            }
106        }
107    }
108
109    /// Get the current sample
110    pub fn sample(&self) -> &[StreamEvent] {
111        &self.reservoir
112    }
113
114    /// Get the number of events processed
115    pub fn count(&self) -> u64 {
116        self.count
117    }
118
119    /// Clear the reservoir
120    pub fn clear(&mut self) {
121        self.reservoir.clear();
122        self.count = 0;
123    }
124
125    /// Get statistics about the sampler
126    pub fn stats(&self) -> ReservoirStats {
127        ReservoirStats {
128            capacity: self.capacity,
129            current_size: self.reservoir.len(),
130            total_events: self.count,
131            sampling_rate: if self.count > 0 {
132                self.reservoir.len() as f64 / self.count as f64
133            } else {
134                0.0
135            },
136        }
137    }
138}
139
/// Statistics for reservoir sampler
///
/// Produced by `ReservoirSampler::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReservoirStats {
    /// Maximum number of events the reservoir retains.
    pub capacity: usize,
    /// Number of events currently held in the reservoir.
    pub current_size: usize,
    /// Number of events offered to the sampler.
    pub total_events: u64,
    /// `current_size / total_events`, or `0.0` when no event has been seen.
    pub sampling_rate: f64,
}
148
/// Stratified Sampler - Preserves distribution across categories
///
/// Maintains separate reservoirs for each category to ensure
/// representative sampling across different event types.
#[derive(Debug, Clone)]
pub struct StratifiedSampler {
    /// One reservoir per category, created the first time an event of that
    /// category is added.
    samplers: HashMap<String, ReservoirSampler>,
    /// Configured rate per category; categories without an entry use `1.0`.
    sample_rates: HashMap<String, f64>,
    /// Reservoir size for a category at rate `1.0`; scaled by the category's
    /// rate when its reservoir is created.
    default_capacity: usize,
    /// Maps an event to its category; events mapped to `None` are ignored.
    category_extractor: fn(&StreamEvent) -> Option<String>,
}
160
161impl StratifiedSampler {
162    /// Create a new stratified sampler
163    ///
164    /// # Arguments
165    /// * `default_capacity` - Default reservoir size per category
166    /// * `category_extractor` - Function to extract category from event
167    pub fn new(
168        default_capacity: usize,
169        category_extractor: fn(&StreamEvent) -> Option<String>,
170    ) -> Self {
171        Self {
172            samplers: HashMap::new(),
173            sample_rates: HashMap::new(),
174            default_capacity,
175            category_extractor,
176        }
177    }
178
179    /// Set sample rate for a specific category
180    pub fn set_category_rate(&mut self, category: String, rate: f64) {
181        assert!((0.0..=1.0).contains(&rate), "Rate must be in [0, 1]");
182        self.sample_rates.insert(category, rate);
183    }
184
185    /// Add an event to the appropriate category reservoir
186    pub fn add(&mut self, event: StreamEvent) {
187        if let Some(category) = (self.category_extractor)(&event) {
188            // Check if we should sample this category
189            let rate = self.sample_rates.get(&category).copied().unwrap_or(1.0);
190
191            if rate <= 0.0 {
192                return; // Skip this category
193            }
194
195            // Get or create sampler for this category
196            let sampler = self.samplers.entry(category.clone()).or_insert_with(|| {
197                let capacity = (self.default_capacity as f64 * rate) as usize;
198                ReservoirSampler::new(capacity.max(1))
199            });
200
201            sampler.add(event);
202        }
203    }
204
205    /// Get samples for a specific category
206    pub fn category_sample(&self, category: &str) -> Option<&[StreamEvent]> {
207        self.samplers.get(category).map(|s| s.sample())
208    }
209
210    /// Get all samples grouped by category
211    pub fn all_samples(&self) -> HashMap<String, Vec<StreamEvent>> {
212        self.samplers
213            .iter()
214            .map(|(cat, sampler)| (cat.clone(), sampler.sample().to_vec()))
215            .collect()
216    }
217
218    /// Get statistics for all categories
219    pub fn stats(&self) -> StratifiedStats {
220        let category_stats = self
221            .samplers
222            .iter()
223            .map(|(cat, sampler)| (cat.clone(), sampler.stats()))
224            .collect();
225
226        StratifiedStats {
227            category_count: self.samplers.len(),
228            category_stats,
229        }
230    }
231}
232
/// Statistics for stratified sampler
///
/// Produced by `StratifiedSampler::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StratifiedStats {
    /// Number of categories that currently have a reservoir.
    pub category_count: usize,
    /// Reservoir statistics keyed by category name.
    pub category_stats: HashMap<String, ReservoirStats>,
}
239
240/// HyperLogLog - Approximate cardinality estimator
241///
242/// Uses probabilistic counting with 2^precision registers to estimate
243/// the number of distinct elements with ~1.04/sqrt(m) relative error.
244#[derive(Debug, Clone)]
245pub struct HyperLogLog {
246    registers: Vec<u8>,
247    precision: u8,
248    alpha: f64,
249}
250
251impl HyperLogLog {
252    /// Create a new HyperLogLog estimator
253    ///
254    /// # Arguments
255    /// * `precision` - Number of bits for register indexing (4-16)
256    ///   Higher precision = more accuracy but more memory
257    pub fn new(precision: u8) -> Self {
258        assert!(
259            (4..=16).contains(&precision),
260            "Precision must be between 4 and 16"
261        );
262
263        let m = 1 << precision; // 2^precision registers
264
265        // Alpha constant for bias correction
266        let alpha = match m {
267            16 => 0.673,
268            32 => 0.697,
269            64 => 0.709,
270            _ => 0.7213 / (1.0 + 1.079 / m as f64),
271        };
272
273        Self {
274            registers: vec![0; m],
275            precision,
276            alpha,
277        }
278    }
279
280    /// Add an element to the estimator
281    pub fn add<T: Hash>(&mut self, element: &T) {
282        let hash = self.hash(element);
283
284        // Use first 'precision' bits for register index
285        let idx = (hash >> (64 - self.precision)) as usize;
286
287        // Count leading zeros in remaining bits + 1
288        let remaining = hash << self.precision;
289        let leading_zeros = remaining.leading_zeros() as u8 + 1;
290
291        // Update register with maximum leading zeros seen
292        self.registers[idx] = self.registers[idx].max(leading_zeros);
293    }
294
295    /// Estimate the cardinality (number of distinct elements)
296    pub fn cardinality(&self) -> u64 {
297        let m = self.registers.len() as f64;
298
299        // Harmonic mean of 2^register values
300        let raw_estimate = self.alpha * m * m
301            / self
302                .registers
303                .iter()
304                .map(|&r| 2.0_f64.powi(-(r as i32)))
305                .sum::<f64>();
306
307        // Apply bias correction
308        if raw_estimate <= 5.0 * m {
309            // Small range correction
310            let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
311            if zeros > 0.0 {
312                return (m * (m / zeros).ln()) as u64;
313            }
314        }
315
316        if raw_estimate <= (1.0 / 30.0) * (1u64 << 32) as f64 {
317            // No correction
318            raw_estimate as u64
319        } else {
320            // Large range correction
321            let two_32 = (1u64 << 32) as f64;
322            (-(two_32) * ((1.0 - raw_estimate / two_32).ln())) as u64
323        }
324    }
325
326    /// Merge another HyperLogLog into this one
327    pub fn merge(&mut self, other: &HyperLogLog) {
328        assert_eq!(
329            self.precision, other.precision,
330            "Cannot merge HyperLogLogs with different precisions"
331        );
332
333        for (i, &other_val) in other.registers.iter().enumerate() {
334            self.registers[i] = self.registers[i].max(other_val);
335        }
336    }
337
338    /// Hash function for HyperLogLog
339    fn hash<T: Hash>(&self, element: &T) -> u64 {
340        use std::collections::hash_map::DefaultHasher;
341        let mut hasher = DefaultHasher::new();
342        element.hash(&mut hasher);
343        hasher.finish()
344    }
345
346    /// Get statistics about the HyperLogLog
347    pub fn stats(&self) -> HyperLogLogStats {
348        HyperLogLogStats {
349            precision: self.precision,
350            register_count: self.registers.len(),
351            estimated_cardinality: self.cardinality(),
352            memory_bytes: self.registers.len(),
353        }
354    }
355}
356
/// Statistics for HyperLogLog
///
/// Produced by `HyperLogLog::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLogLogStats {
    /// Number of hash bits used to select a register.
    pub precision: u8,
    /// Number of registers held by the estimator.
    pub register_count: usize,
    /// Cardinality estimate at the time the statistics were taken.
    pub estimated_cardinality: u64,
    /// Size of the register array in bytes (one byte per register).
    pub memory_bytes: usize,
}
365
366/// Count-Min Sketch - Approximate frequency counter
367///
368/// Space-efficient probabilistic data structure for estimating
369/// frequency of elements in a stream with guaranteed error bounds.
370#[derive(Debug, Clone)]
371pub struct CountMinSketch {
372    table: Vec<Vec<u64>>,
373    hash_count: usize,
374    width: usize,
375    total_count: u64,
376}
377
378impl CountMinSketch {
379    /// Create a new Count-Min Sketch
380    ///
381    /// # Arguments
382    /// * `hash_count` - Number of hash functions (depth)
383    /// * `width` - Number of counters per hash function
384    ///
385    /// Error bounds: ε = e / width, δ = 1 / e^hash_count
386    pub fn new(hash_count: usize, width: usize) -> Self {
387        Self {
388            table: vec![vec![0; width]; hash_count],
389            hash_count,
390            width,
391            total_count: 0,
392        }
393    }
394
395    /// Add an element with count
396    pub fn add<T: Hash>(&mut self, element: &T, count: u64) {
397        self.total_count += count;
398
399        for i in 0..self.hash_count {
400            let idx = self.hash_i(element, i) % self.width;
401            self.table[i][idx] += count;
402        }
403    }
404
405    /// Estimate the frequency of an element
406    pub fn estimate<T: Hash>(&self, element: &T) -> u64 {
407        (0..self.hash_count)
408            .map(|i| {
409                let idx = self.hash_i(element, i) % self.width;
410                self.table[i][idx]
411            })
412            .min()
413            .unwrap_or(0)
414    }
415
416    /// Get the total count of all elements
417    pub fn total_count(&self) -> u64 {
418        self.total_count
419    }
420
421    /// Hash function with index
422    fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
423        use std::collections::hash_map::DefaultHasher;
424        let mut hasher = DefaultHasher::new();
425        element.hash(&mut hasher);
426        i.hash(&mut hasher);
427        hasher.finish() as usize
428    }
429
430    /// Get statistics about the Count-Min Sketch
431    pub fn stats(&self) -> CountMinSketchStats {
432        CountMinSketchStats {
433            hash_count: self.hash_count,
434            width: self.width,
435            total_count: self.total_count,
436            memory_bytes: self.hash_count * self.width * std::mem::size_of::<u64>(),
437        }
438    }
439}
440
/// Statistics for Count-Min Sketch
///
/// Produced by `CountMinSketch::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CountMinSketchStats {
    /// Number of hash functions (rows) in the sketch.
    pub hash_count: usize,
    /// Number of counters per row.
    pub width: usize,
    /// Sum of all counts added to the sketch.
    pub total_count: u64,
    /// Size of the counter table: `hash_count * width` 8-byte counters.
    pub memory_bytes: usize,
}
449
450/// T-Digest - Approximate quantile estimator
451///
452/// Provides accurate percentile estimates with compression
453/// for streaming data. More accurate at extremes (p0, p100).
454#[derive(Debug, Clone)]
455pub struct TDigest {
456    centroids: Vec<Centroid>,
457    delta: f64,
458    total_weight: f64,
459    max_size: usize,
460}
461
462#[derive(Debug, Clone, Copy)]
463struct Centroid {
464    mean: f64,
465    weight: f64,
466}
467
468impl TDigest {
469    /// Create a new T-Digest
470    ///
471    /// # Arguments
472    /// * `delta` - Compression parameter (0.01 = 1% accuracy)
473    pub fn new(delta: f64) -> Self {
474        Self {
475            centroids: Vec::new(),
476            delta,
477            total_weight: 0.0,
478            max_size: (1.0 / delta) as usize,
479        }
480    }
481
482    /// Add a value to the digest
483    pub fn add(&mut self, value: f64, weight: f64) {
484        self.centroids.push(Centroid {
485            mean: value,
486            weight,
487        });
488        self.total_weight += weight;
489
490        // Compress if needed
491        if self.centroids.len() > self.max_size {
492            self.compress();
493        }
494    }
495
496    /// Estimate the quantile (0.0 to 1.0)
497    pub fn quantile(&mut self, q: f64) -> Option<f64> {
498        if self.centroids.is_empty() {
499            return None;
500        }
501
502        if self.centroids.len() > 1 {
503            self.compress();
504        }
505
506        let index = q * self.total_weight;
507        let mut sum = 0.0;
508
509        for centroid in &self.centroids {
510            sum += centroid.weight;
511            if sum >= index {
512                return Some(centroid.mean);
513            }
514        }
515
516        self.centroids.last().map(|c| c.mean)
517    }
518
519    /// Compress centroids
520    fn compress(&mut self) {
521        if self.centroids.is_empty() {
522            return;
523        }
524
525        // Sort by mean
526        self.centroids
527            .sort_by(|a, b| a.mean.partial_cmp(&b.mean).unwrap());
528
529        let mut compressed = Vec::new();
530        let mut current = self.centroids[0];
531
532        for &centroid in &self.centroids[1..] {
533            // Check if we should merge
534            let q = (current.weight + centroid.weight) / self.total_weight;
535            let k = self.k_limit(q);
536
537            if current.weight + centroid.weight <= k {
538                // Merge centroids
539                let total_weight = current.weight + centroid.weight;
540                current.mean = (current.mean * current.weight + centroid.mean * centroid.weight)
541                    / total_weight;
542                current.weight = total_weight;
543            } else {
544                compressed.push(current);
545                current = centroid;
546            }
547        }
548        compressed.push(current);
549
550        self.centroids = compressed;
551    }
552
553    /// Compute k limit for compression
554    fn k_limit(&self, q: f64) -> f64 {
555        4.0 * self.total_weight * self.delta * q * (1.0 - q)
556    }
557
558    /// Get statistics about the T-Digest
559    pub fn stats(&self) -> TDigestStats {
560        TDigestStats {
561            centroid_count: self.centroids.len(),
562            total_weight: self.total_weight,
563            delta: self.delta,
564            max_size: self.max_size,
565        }
566    }
567}
568
/// Statistics for T-Digest
///
/// Produced by `TDigest::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TDigestStats {
    /// Number of centroids currently stored.
    pub centroid_count: usize,
    /// Sum of the weights of the samples held by the digest.
    pub total_weight: f64,
    /// Compression parameter the digest was created with.
    pub delta: f64,
    /// Size threshold derived from `delta` (`1 / delta`, truncated) that
    /// governs when the digest compresses itself.
    pub max_size: usize,
}
577
578/// Bloom Filter - Probabilistic membership test
579///
580/// Space-efficient set membership test with no false negatives
581/// but possible false positives (configurable error rate).
582#[derive(Debug, Clone)]
583pub struct BloomFilter {
584    bits: Vec<bool>,
585    hash_count: usize,
586    insert_count: u64,
587}
588
589impl BloomFilter {
590    /// Create a new Bloom filter
591    ///
592    /// # Arguments
593    /// * `size` - Number of bits in the filter
594    /// * `hash_count` - Number of hash functions
595    ///
596    /// Optimal hash_count = (bits / expected_items) * ln(2)
597    pub fn new(size: usize, hash_count: usize) -> Self {
598        Self {
599            bits: vec![false; size],
600            hash_count,
601            insert_count: 0,
602        }
603    }
604
605    /// Optimal Bloom filter for expected items and false positive rate
606    pub fn optimal(expected_items: usize, false_positive_rate: f64) -> Self {
607        let bits = Self::optimal_bits(expected_items, false_positive_rate);
608        let hash_count = Self::optimal_hash_count(bits, expected_items);
609        Self::new(bits, hash_count)
610    }
611
612    /// Calculate optimal number of bits
613    fn optimal_bits(n: usize, p: f64) -> usize {
614        let numerator = -(n as f64 * p.ln());
615        let denominator = 2.0_f64.ln().powi(2);
616        (numerator / denominator).ceil() as usize
617    }
618
619    /// Calculate optimal number of hash functions
620    fn optimal_hash_count(m: usize, n: usize) -> usize {
621        ((m as f64 / n as f64) * 2.0_f64.ln()).ceil() as usize
622    }
623
624    /// Add an element to the filter
625    pub fn add<T: Hash>(&mut self, element: &T) {
626        self.insert_count += 1;
627        for i in 0..self.hash_count {
628            let idx = self.hash_i(element, i) % self.bits.len();
629            self.bits[idx] = true;
630        }
631    }
632
633    /// Check if an element might be in the set
634    pub fn contains<T: Hash>(&self, element: &T) -> bool {
635        (0..self.hash_count).all(|i| {
636            let idx = self.hash_i(element, i) % self.bits.len();
637            self.bits[idx]
638        })
639    }
640
641    /// Hash function with index
642    fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
643        use std::collections::hash_map::DefaultHasher;
644        let mut hasher = DefaultHasher::new();
645        element.hash(&mut hasher);
646        i.hash(&mut hasher);
647        hasher.finish() as usize
648    }
649
650    /// Estimate current false positive rate
651    pub fn false_positive_rate(&self) -> f64 {
652        let set_bits = self.bits.iter().filter(|&&b| b).count() as f64;
653        let p = set_bits / self.bits.len() as f64;
654        p.powi(self.hash_count as i32)
655    }
656
657    /// Get statistics about the Bloom filter
658    pub fn stats(&self) -> BloomFilterStats {
659        let set_bits = self.bits.iter().filter(|&&b| b).count();
660
661        BloomFilterStats {
662            size_bits: self.bits.len(),
663            hash_count: self.hash_count,
664            insert_count: self.insert_count,
665            set_bits,
666            estimated_fpr: self.false_positive_rate(),
667            memory_bytes: self.bits.len() / 8,
668        }
669    }
670}
671
/// Statistics for Bloom filter
///
/// Produced by `BloomFilter::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomFilterStats {
    /// Number of bits in the filter.
    pub size_bits: usize,
    /// Number of hash functions applied per element.
    pub hash_count: usize,
    /// Number of `add` calls made on the filter.
    pub insert_count: u64,
    /// Number of bits currently set.
    pub set_bits: usize,
    /// Estimated false positive rate: `(set_bits / size_bits) ^ hash_count`.
    pub estimated_fpr: f64,
    /// Reported size of the bit array in bytes.
    pub memory_bytes: usize,
}
682
/// Unified sampling manager for all sampling techniques
///
/// Owns one instance of every sampler and sketch in this module and feeds
/// each processed event to all of them.
pub struct AdvancedSamplingManager {
    /// Configuration the manager was built from.
    config: SamplingConfig,
    /// Uniform sample over all processed events.
    reservoir: ReservoirSampler,
    /// Per-category sampler; `None` until `enable_stratified` is called.
    stratified: Option<StratifiedSampler>,
    /// Distinct-count estimator over event ids.
    hyperloglog: HyperLogLog,
    /// Frequency estimator over event ids.
    count_min: CountMinSketch,
    /// Quantile estimator over the numeric value extracted from events.
    tdigest: TDigest,
    /// Membership filter over event ids.
    bloom: BloomFilter,
    /// Number of events passed to `process_event`.
    event_count: u64,
}
694
695impl AdvancedSamplingManager {
696    /// Create a new sampling manager with the given configuration
697    pub fn new(config: SamplingConfig) -> Self {
698        let reservoir = ReservoirSampler::new(config.reservoir_size);
699        let hyperloglog = HyperLogLog::new(config.hll_precision);
700        let count_min = CountMinSketch::new(config.cms_hash_count, config.cms_width);
701        let tdigest = TDigest::new(config.tdigest_delta);
702        let bloom = BloomFilter::new(config.bloom_filter_bits, config.bloom_filter_hashes);
703
704        Self {
705            config,
706            reservoir,
707            stratified: None,
708            hyperloglog,
709            count_min,
710            tdigest,
711            bloom,
712            event_count: 0,
713        }
714    }
715
716    /// Enable stratified sampling with a category extractor function
717    pub fn enable_stratified(&mut self, extractor: fn(&StreamEvent) -> Option<String>) {
718        let mut sampler = StratifiedSampler::new(self.config.reservoir_size, extractor);
719
720        // Apply configured sample rates
721        for (category, rate) in &self.config.stratified_sample_rates {
722            sampler.set_category_rate(category.clone(), *rate);
723        }
724
725        self.stratified = Some(sampler);
726    }
727
728    /// Process an event through all sampling techniques
729    pub fn process_event(&mut self, event: StreamEvent) -> Result<()> {
730        self.event_count += 1;
731
732        // Reservoir sampling
733        self.reservoir.add(event.clone());
734
735        // Stratified sampling
736        if let Some(ref mut stratified) = self.stratified {
737            stratified.add(event.clone());
738        }
739
740        // HyperLogLog (cardinality)
741        let event_id = self.event_id(&event);
742        self.hyperloglog.add(&event_id);
743
744        // Count-Min Sketch (frequency)
745        self.count_min.add(&event_id, 1);
746
747        // T-Digest (quantiles) - using event timestamp as value
748        if let Some(value) = self.extract_numeric_value(&event) {
749            self.tdigest.add(value, 1.0);
750        }
751
752        // Bloom filter (membership)
753        self.bloom.add(&event_id);
754
755        Ok(())
756    }
757
758    /// Get reservoir sample
759    pub fn reservoir_sample(&self) -> &[StreamEvent] {
760        self.reservoir.sample()
761    }
762
763    /// Get stratified samples
764    pub fn stratified_samples(&self) -> Option<HashMap<String, Vec<StreamEvent>>> {
765        self.stratified.as_ref().map(|s| s.all_samples())
766    }
767
768    /// Estimate distinct event count
769    pub fn distinct_count(&self) -> u64 {
770        self.hyperloglog.cardinality()
771    }
772
773    /// Estimate frequency of an event
774    pub fn event_frequency(&self, event: &StreamEvent) -> u64 {
775        let event_id = self.event_id(event);
776        self.count_min.estimate(&event_id)
777    }
778
779    /// Check if an event was likely seen before
780    pub fn likely_seen(&self, event: &StreamEvent) -> bool {
781        let event_id = self.event_id(event);
782        self.bloom.contains(&event_id)
783    }
784
785    /// Estimate quantile (percentile) of numeric values
786    pub fn quantile(&mut self, q: f64) -> Option<f64> {
787        self.tdigest.quantile(q)
788    }
789
790    /// Get comprehensive statistics
791    pub fn stats(&self) -> SamplingManagerStats {
792        SamplingManagerStats {
793            event_count: self.event_count,
794            reservoir_stats: self.reservoir.stats(),
795            stratified_stats: self.stratified.as_ref().map(|s| s.stats()),
796            hyperloglog_stats: self.hyperloglog.stats(),
797            count_min_stats: self.count_min.stats(),
798            tdigest_stats: self.tdigest.stats(),
799            bloom_stats: self.bloom.stats(),
800        }
801    }
802
803    /// Extract event ID for hashing
804    fn event_id(&self, event: &StreamEvent) -> String {
805        match event {
806            StreamEvent::TripleAdded {
807                subject,
808                predicate,
809                object,
810                ..
811            } => format!("{}-{}-{}", subject, predicate, object),
812            StreamEvent::TripleRemoved {
813                subject,
814                predicate,
815                object,
816                ..
817            } => format!("{}-{}-{}", subject, predicate, object),
818            StreamEvent::GraphCreated { graph, .. } => format!("graph-{}", graph),
819            StreamEvent::GraphDeleted { graph, .. } => format!("graph-{}", graph),
820            _ => "unknown".to_string(),
821        }
822    }
823
824    /// Extract numeric value from event for quantile estimation
825    fn extract_numeric_value(&self, event: &StreamEvent) -> Option<f64> {
826        match event {
827            StreamEvent::TripleAdded { metadata, .. }
828            | StreamEvent::TripleRemoved { metadata, .. } => {
829                Some(metadata.timestamp.timestamp() as f64)
830            }
831            _ => None,
832        }
833    }
834}
835
/// Comprehensive statistics for all sampling techniques
///
/// Produced by `AdvancedSamplingManager::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingManagerStats {
    /// Number of events passed to `process_event`.
    pub event_count: u64,
    /// Statistics of the reservoir sampler.
    pub reservoir_stats: ReservoirStats,
    /// Statistics of the stratified sampler; `None` while it is disabled.
    pub stratified_stats: Option<StratifiedStats>,
    /// Statistics of the HyperLogLog estimator.
    pub hyperloglog_stats: HyperLogLogStats,
    /// Statistics of the Count-Min Sketch.
    pub count_min_stats: CountMinSketchStats,
    /// Statistics of the T-Digest.
    pub tdigest_stats: TDigestStats,
    /// Statistics of the Bloom filter.
    pub bloom_stats: BloomFilterStats,
}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use crate::EventMetadata;
852    use std::collections::HashMap;
853
854    fn create_test_event(id: &str) -> StreamEvent {
855        StreamEvent::TripleAdded {
856            subject: format!("http://example.org/{}", id),
857            predicate: "http://example.org/prop".to_string(),
858            object: "value".to_string(),
859            graph: None,
860            metadata: EventMetadata {
861                event_id: id.to_string(),
862                timestamp: chrono::Utc::now(),
863                source: "test".to_string(),
864                user: None,
865                context: None,
866                caused_by: None,
867                version: "1.0".to_string(),
868                properties: HashMap::new(),
869                checksum: None,
870            },
871        }
872    }
873
874    #[test]
875    fn test_reservoir_sampler() {
876        let mut sampler = ReservoirSampler::new(10);
877
878        // Add 100 events
879        for i in 0..100 {
880            sampler.add(create_test_event(&format!("event-{}", i)));
881        }
882
883        let stats = sampler.stats();
884        assert_eq!(stats.capacity, 10);
885        assert_eq!(stats.current_size, 10);
886        assert_eq!(stats.total_events, 100);
887        assert_eq!(stats.sampling_rate, 0.1);
888    }
889
890    #[test]
891    fn test_stratified_sampler() {
892        fn category_extractor(event: &StreamEvent) -> Option<String> {
893            match event {
894                StreamEvent::TripleAdded { metadata, .. } => Some(metadata.source.clone()),
895                _ => None,
896            }
897        }
898
899        let mut sampler = StratifiedSampler::new(10, category_extractor);
900        sampler.set_category_rate("source1".to_string(), 0.5);
901        sampler.set_category_rate("source2".to_string(), 1.0);
902
903        // Add events from different sources
904        for i in 0..50 {
905            let mut event = create_test_event(&format!("event-{}", i));
906            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
907                metadata.source = if i < 25 {
908                    "source1".to_string()
909                } else {
910                    "source2".to_string()
911                };
912            }
913            sampler.add(event);
914        }
915
916        let stats = sampler.stats();
917        assert_eq!(stats.category_count, 2);
918        assert!(stats.category_stats.contains_key("source1"));
919        assert!(stats.category_stats.contains_key("source2"));
920    }
921
922    #[test]
923    fn test_hyperloglog() {
924        let mut hll = HyperLogLog::new(14);
925
926        // Add 1000 distinct elements
927        for i in 0..1000 {
928            hll.add(&format!("element-{}", i));
929        }
930
931        let cardinality = hll.cardinality();
932
933        // HyperLogLog should estimate within ~2% error
934        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
935        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
936    }
937
938    #[test]
939    fn test_count_min_sketch() {
940        let mut cms = CountMinSketch::new(4, 1000);
941
942        // Add elements with different frequencies
943        for _ in 0..100 {
944            cms.add(&"frequent", 1);
945        }
946        for _ in 0..10 {
947            cms.add(&"rare", 1);
948        }
949
950        let freq_frequent = cms.estimate(&"frequent");
951        let freq_rare = cms.estimate(&"rare");
952
953        assert!(freq_frequent >= 100);
954        assert!(freq_rare >= 10);
955        assert!(freq_frequent > freq_rare);
956    }
957
958    #[test]
959    fn test_tdigest() {
960        let mut digest = TDigest::new(0.01);
961
962        // Add values 1..1000
963        for i in 1..=1000 {
964            digest.add(i as f64, 1.0);
965        }
966
967        // Test median (should be around 500)
968        let median = digest.quantile(0.5).unwrap();
969        assert!((median - 500.0).abs() < 50.0, "Median: {}", median);
970
971        // Test P90 (should be around 900)
972        let p90 = digest.quantile(0.9).unwrap();
973        assert!((p90 - 900.0).abs() < 100.0, "P90: {}", p90);
974    }
975
976    #[test]
977    fn test_bloom_filter() {
978        let mut bloom = BloomFilter::optimal(1000, 0.01);
979
980        // Add 500 elements
981        for i in 0..500 {
982            bloom.add(&format!("element-{}", i));
983        }
984
985        // All added elements should be found
986        for i in 0..500 {
987            assert!(bloom.contains(&format!("element-{}", i)));
988        }
989
990        // Test false positive rate
991        let mut false_positives = 0;
992        for i in 1000..2000 {
993            if bloom.contains(&format!("element-{}", i)) {
994                false_positives += 1;
995            }
996        }
997
998        let fpr = false_positives as f64 / 1000.0;
999        assert!(fpr < 0.05, "False positive rate too high: {}", fpr);
1000    }
1001
1002    #[test]
1003    fn test_sampling_manager() {
1004        let config = SamplingConfig::default();
1005        let mut manager = AdvancedSamplingManager::new(config);
1006
1007        // Process 100 events
1008        for i in 0..100 {
1009            let event = create_test_event(&format!("event-{}", i));
1010            manager.process_event(event).unwrap();
1011        }
1012
1013        let stats = manager.stats();
1014        assert_eq!(stats.event_count, 100);
1015        assert!(stats.reservoir_stats.current_size > 0);
1016        assert!(stats.hyperloglog_stats.estimated_cardinality > 0);
1017        assert!(stats.count_min_stats.total_count > 0);
1018    }
1019
1020    #[test]
1021    fn test_hyperloglog_merge() {
1022        let mut hll1 = HyperLogLog::new(14);
1023        let mut hll2 = HyperLogLog::new(14);
1024
1025        // Add different elements to each
1026        for i in 0..500 {
1027            hll1.add(&format!("element-{}", i));
1028        }
1029        for i in 500..1000 {
1030            hll2.add(&format!("element-{}", i));
1031        }
1032
1033        // Merge
1034        hll1.merge(&hll2);
1035
1036        let cardinality = hll1.cardinality();
1037
1038        // Should estimate ~1000 distinct elements
1039        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
1040        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
1041    }
1042
1043    #[test]
1044    fn test_bloom_filter_optimal() {
1045        let bloom = BloomFilter::optimal(10000, 0.01);
1046        let stats = bloom.stats();
1047
1048        // Verify optimal sizing
1049        assert!(stats.size_bits > 0);
1050        assert!(stats.hash_count > 0);
1051    }
1052
1053    #[test]
1054    fn test_sampling_manager_with_stratified() {
1055        fn category_extractor(event: &StreamEvent) -> Option<String> {
1056            match event {
1057                StreamEvent::TripleAdded { subject, .. } => {
1058                    if subject.contains("type1") {
1059                        Some("type1".to_string())
1060                    } else if subject.contains("type2") {
1061                        Some("type2".to_string())
1062                    } else {
1063                        None
1064                    }
1065                }
1066                _ => None,
1067            }
1068        }
1069
1070        let config = SamplingConfig::default();
1071        let mut manager = AdvancedSamplingManager::new(config);
1072        manager.enable_stratified(category_extractor);
1073
1074        // Process events from different categories
1075        for i in 0..50 {
1076            let event = StreamEvent::TripleAdded {
1077                subject: format!("http://example.org/type1/{}", i),
1078                predicate: "http://example.org/prop".to_string(),
1079                object: "value".to_string(),
1080                graph: None,
1081                metadata: EventMetadata {
1082                    event_id: format!("event-{}", i),
1083                    timestamp: chrono::Utc::now(),
1084                    source: "test".to_string(),
1085                    user: None,
1086                    context: None,
1087                    caused_by: None,
1088                    version: "1.0".to_string(),
1089                    properties: HashMap::new(),
1090                    checksum: None,
1091                },
1092            };
1093            manager.process_event(event).unwrap();
1094        }
1095
1096        for i in 50..100 {
1097            let event = StreamEvent::TripleAdded {
1098                subject: format!("http://example.org/type2/{}", i),
1099                predicate: "http://example.org/prop".to_string(),
1100                object: "value".to_string(),
1101                graph: None,
1102                metadata: EventMetadata {
1103                    event_id: format!("event-{}", i),
1104                    timestamp: chrono::Utc::now(),
1105                    source: "test".to_string(),
1106                    user: None,
1107                    context: None,
1108                    caused_by: None,
1109                    version: "1.0".to_string(),
1110                    properties: HashMap::new(),
1111                    checksum: None,
1112                },
1113            };
1114            manager.process_event(event).unwrap();
1115        }
1116
1117        let stats = manager.stats();
1118        assert_eq!(stats.event_count, 100);
1119        assert!(stats.stratified_stats.is_some());
1120
1121        let stratified = stats.stratified_stats.unwrap();
1122        assert_eq!(stratified.category_count, 2);
1123    }
1124}