1use crate::StreamEvent;
24use anyhow::Result;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::hash::{Hash, Hasher};
28
/// Configuration for every probabilistic sampling / sketching structure
/// owned by `AdvancedSamplingManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingConfig {
    /// Capacity of the uniform reservoir sampler (events retained).
    pub reservoir_size: usize,
    /// Number of hash rows in the Count-Min sketch.
    pub cms_hash_count: usize,
    /// Number of counters per Count-Min row.
    pub cms_width: usize,
    /// HyperLogLog precision `p`; the sketch allocates `2^p` registers (4..=16).
    pub hll_precision: u8,
    /// t-digest compression parameter; smaller delta => more centroids.
    pub tdigest_delta: f64,
    /// Total number of slots in the Bloom filter.
    pub bloom_filter_bits: usize,
    /// Number of hash functions used by the Bloom filter.
    pub bloom_filter_hashes: usize,
    /// Known stratification categories.
    /// NOTE(review): not read anywhere in this file — confirm intended use.
    pub stratified_categories: Vec<String>,
    /// Per-category rates in [0, 1] applied by the stratified sampler
    /// (they scale each category's reservoir capacity).
    pub stratified_sample_rates: HashMap<String, f64>,
}
51
52impl Default for SamplingConfig {
53 fn default() -> Self {
54 Self {
55 reservoir_size: 1000,
56 cms_hash_count: 4,
57 cms_width: 10000,
58 hll_precision: 14, tdigest_delta: 0.01,
60 bloom_filter_bits: 100000,
61 bloom_filter_hashes: 7,
62 stratified_categories: Vec::new(),
63 stratified_sample_rates: HashMap::new(),
64 }
65 }
66}
67
/// Uniform random sample of a stream maintained with reservoir sampling
/// (Algorithm R): every event offered so far has equal probability of
/// being in the reservoir.
#[derive(Debug, Clone)]
pub struct ReservoirSampler {
    // The retained sample; never grows beyond `capacity`.
    reservoir: Vec<StreamEvent>,
    // Maximum number of events kept.
    capacity: usize,
    // Total number of events ever offered via `add`.
    count: u64,
}
77
78impl ReservoirSampler {
79 pub fn new(capacity: usize) -> Self {
84 Self {
85 reservoir: Vec::with_capacity(capacity),
86 capacity,
87 count: 0,
88 }
89 }
90
91 pub fn add(&mut self, event: StreamEvent) {
95 self.count += 1;
96
97 if self.reservoir.len() < self.capacity {
98 self.reservoir.push(event);
100 } else {
101 let j = (fastrand::f64() * self.count as f64) as usize;
103 if j < self.capacity {
104 self.reservoir[j] = event;
105 }
106 }
107 }
108
109 pub fn sample(&self) -> &[StreamEvent] {
111 &self.reservoir
112 }
113
114 pub fn count(&self) -> u64 {
116 self.count
117 }
118
119 pub fn clear(&mut self) {
121 self.reservoir.clear();
122 self.count = 0;
123 }
124
125 pub fn stats(&self) -> ReservoirStats {
127 ReservoirStats {
128 capacity: self.capacity,
129 current_size: self.reservoir.len(),
130 total_events: self.count,
131 sampling_rate: if self.count > 0 {
132 self.reservoir.len() as f64 / self.count as f64
133 } else {
134 0.0
135 },
136 }
137 }
138}
139
/// Snapshot of a `ReservoirSampler`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReservoirStats {
    /// Configured maximum sample size.
    pub capacity: usize,
    /// Events currently held in the reservoir.
    pub current_size: usize,
    /// Total events ever offered to the sampler.
    pub total_events: u64,
    /// `current_size / total_events`, or 0.0 when nothing was seen.
    pub sampling_rate: f64,
}
148
/// Per-category reservoir sampling: each category gets an independent
/// `ReservoirSampler`, so rare categories are not drowned out by frequent
/// ones.
#[derive(Debug, Clone)]
pub struct StratifiedSampler {
    // One reservoir per observed category, created lazily on first event.
    samplers: HashMap<String, ReservoirSampler>,
    // Per-category rate in [0, 1]; scales that category's reservoir
    // capacity (it is not applied as a per-event drop probability).
    sample_rates: HashMap<String, f64>,
    // Base reservoir capacity before the per-category rate is applied.
    default_capacity: usize,
    // Maps an event to its category; `None` means the event is skipped.
    category_extractor: fn(&StreamEvent) -> Option<String>,
}
160
161impl StratifiedSampler {
162 pub fn new(
168 default_capacity: usize,
169 category_extractor: fn(&StreamEvent) -> Option<String>,
170 ) -> Self {
171 Self {
172 samplers: HashMap::new(),
173 sample_rates: HashMap::new(),
174 default_capacity,
175 category_extractor,
176 }
177 }
178
179 pub fn set_category_rate(&mut self, category: String, rate: f64) {
181 assert!((0.0..=1.0).contains(&rate), "Rate must be in [0, 1]");
182 self.sample_rates.insert(category, rate);
183 }
184
185 pub fn add(&mut self, event: StreamEvent) {
187 if let Some(category) = (self.category_extractor)(&event) {
188 let rate = self.sample_rates.get(&category).copied().unwrap_or(1.0);
190
191 if rate <= 0.0 {
192 return; }
194
195 let sampler = self.samplers.entry(category.clone()).or_insert_with(|| {
197 let capacity = (self.default_capacity as f64 * rate) as usize;
198 ReservoirSampler::new(capacity.max(1))
199 });
200
201 sampler.add(event);
202 }
203 }
204
205 pub fn category_sample(&self, category: &str) -> Option<&[StreamEvent]> {
207 self.samplers.get(category).map(|s| s.sample())
208 }
209
210 pub fn all_samples(&self) -> HashMap<String, Vec<StreamEvent>> {
212 self.samplers
213 .iter()
214 .map(|(cat, sampler)| (cat.clone(), sampler.sample().to_vec()))
215 .collect()
216 }
217
218 pub fn stats(&self) -> StratifiedStats {
220 let category_stats = self
221 .samplers
222 .iter()
223 .map(|(cat, sampler)| (cat.clone(), sampler.stats()))
224 .collect();
225
226 StratifiedStats {
227 category_count: self.samplers.len(),
228 category_stats,
229 }
230 }
231}
232
/// Snapshot of a `StratifiedSampler`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StratifiedStats {
    /// Number of categories that have received at least one event.
    pub category_count: usize,
    /// Reservoir statistics keyed by category name.
    pub category_stats: HashMap<String, ReservoirStats>,
}
239
/// HyperLogLog distinct-count estimator.
///
/// Uses `2^precision` one-byte registers; typical relative error is about
/// `1.04 / sqrt(2^precision)`.
#[derive(Debug, Clone)]
pub struct HyperLogLog {
    // Register i holds the maximum leading-zero rank observed for bucket i.
    registers: Vec<u8>,
    // Number of hash bits used to select a register (4..=16).
    precision: u8,
    // Bias-correction constant alpha_m, fixed at construction.
    alpha: f64,
}
250
251impl HyperLogLog {
252 pub fn new(precision: u8) -> Self {
258 assert!(
259 (4..=16).contains(&precision),
260 "Precision must be between 4 and 16"
261 );
262
263 let m = 1 << precision; let alpha = match m {
267 16 => 0.673,
268 32 => 0.697,
269 64 => 0.709,
270 _ => 0.7213 / (1.0 + 1.079 / m as f64),
271 };
272
273 Self {
274 registers: vec![0; m],
275 precision,
276 alpha,
277 }
278 }
279
280 pub fn add<T: Hash>(&mut self, element: &T) {
282 let hash = self.hash(element);
283
284 let idx = (hash >> (64 - self.precision)) as usize;
286
287 let remaining = hash << self.precision;
289 let leading_zeros = remaining.leading_zeros() as u8 + 1;
290
291 self.registers[idx] = self.registers[idx].max(leading_zeros);
293 }
294
295 pub fn cardinality(&self) -> u64 {
297 let m = self.registers.len() as f64;
298
299 let raw_estimate = self.alpha * m * m
301 / self
302 .registers
303 .iter()
304 .map(|&r| 2.0_f64.powi(-(r as i32)))
305 .sum::<f64>();
306
307 if raw_estimate <= 5.0 * m {
309 let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
311 if zeros > 0.0 {
312 return (m * (m / zeros).ln()) as u64;
313 }
314 }
315
316 if raw_estimate <= (1.0 / 30.0) * (1u64 << 32) as f64 {
317 raw_estimate as u64
319 } else {
320 let two_32 = (1u64 << 32) as f64;
322 (-(two_32) * ((1.0 - raw_estimate / two_32).ln())) as u64
323 }
324 }
325
326 pub fn merge(&mut self, other: &HyperLogLog) {
328 assert_eq!(
329 self.precision, other.precision,
330 "Cannot merge HyperLogLogs with different precisions"
331 );
332
333 for (i, &other_val) in other.registers.iter().enumerate() {
334 self.registers[i] = self.registers[i].max(other_val);
335 }
336 }
337
338 fn hash<T: Hash>(&self, element: &T) -> u64 {
340 use std::collections::hash_map::DefaultHasher;
341 let mut hasher = DefaultHasher::new();
342 element.hash(&mut hasher);
343 hasher.finish()
344 }
345
346 pub fn stats(&self) -> HyperLogLogStats {
348 HyperLogLogStats {
349 precision: self.precision,
350 register_count: self.registers.len(),
351 estimated_cardinality: self.cardinality(),
352 memory_bytes: self.registers.len(),
353 }
354 }
355}
356
/// Snapshot of a `HyperLogLog`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLogLogStats {
    /// Configured precision `p`.
    pub precision: u8,
    /// Number of registers (`2^p`).
    pub register_count: usize,
    /// Current distinct-count estimate.
    pub estimated_cardinality: u64,
    /// Register memory in bytes (one byte per register).
    pub memory_bytes: usize,
}
365
/// Count-Min sketch: fixed-memory frequency estimator whose estimates can
/// only over-count, never under-count, an element.
#[derive(Debug, Clone)]
pub struct CountMinSketch {
    // `hash_count` rows of `width` counters each.
    table: Vec<Vec<u64>>,
    // Number of hash functions / rows.
    hash_count: usize,
    // Counters per row.
    width: usize,
    // Sum of all counts ever added.
    total_count: u64,
}
377
378impl CountMinSketch {
379 pub fn new(hash_count: usize, width: usize) -> Self {
387 Self {
388 table: vec![vec![0; width]; hash_count],
389 hash_count,
390 width,
391 total_count: 0,
392 }
393 }
394
395 pub fn add<T: Hash>(&mut self, element: &T, count: u64) {
397 self.total_count += count;
398
399 for i in 0..self.hash_count {
400 let idx = self.hash_i(element, i) % self.width;
401 self.table[i][idx] += count;
402 }
403 }
404
405 pub fn estimate<T: Hash>(&self, element: &T) -> u64 {
407 (0..self.hash_count)
408 .map(|i| {
409 let idx = self.hash_i(element, i) % self.width;
410 self.table[i][idx]
411 })
412 .min()
413 .unwrap_or(0)
414 }
415
416 pub fn total_count(&self) -> u64 {
418 self.total_count
419 }
420
421 fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
423 use std::collections::hash_map::DefaultHasher;
424 let mut hasher = DefaultHasher::new();
425 element.hash(&mut hasher);
426 i.hash(&mut hasher);
427 hasher.finish() as usize
428 }
429
430 pub fn stats(&self) -> CountMinSketchStats {
432 CountMinSketchStats {
433 hash_count: self.hash_count,
434 width: self.width,
435 total_count: self.total_count,
436 memory_bytes: self.hash_count * self.width * std::mem::size_of::<u64>(),
437 }
438 }
439}
440
/// Snapshot of a `CountMinSketch`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CountMinSketchStats {
    /// Number of hash rows.
    pub hash_count: usize,
    /// Counters per row.
    pub width: usize,
    /// Sum of all counts ever added.
    pub total_count: u64,
    /// Counter-table size in bytes.
    pub memory_bytes: usize,
}
449
/// t-digest streaming quantile estimator: keeps a bounded set of weighted
/// centroids whose allowed size shrinks near the distribution tails.
#[derive(Debug, Clone)]
pub struct TDigest {
    // Weighted cluster centers; sorted by mean after each compression.
    centroids: Vec<Centroid>,
    // Compression parameter; smaller delta => more centroids, more accuracy.
    delta: f64,
    // Sum of all weights added so far.
    total_weight: f64,
    // Compression triggers once `centroids` exceeds this (≈ 1/delta).
    max_size: usize,
}
461
/// A weighted cluster center inside the digest.
#[derive(Debug, Clone, Copy)]
struct Centroid {
    // Weighted mean of the values merged into this centroid.
    mean: f64,
    // Total weight merged into this centroid.
    weight: f64,
}
467
468impl TDigest {
469 pub fn new(delta: f64) -> Self {
474 Self {
475 centroids: Vec::new(),
476 delta,
477 total_weight: 0.0,
478 max_size: (1.0 / delta) as usize,
479 }
480 }
481
482 pub fn add(&mut self, value: f64, weight: f64) {
484 self.centroids.push(Centroid {
485 mean: value,
486 weight,
487 });
488 self.total_weight += weight;
489
490 if self.centroids.len() > self.max_size {
492 self.compress();
493 }
494 }
495
496 pub fn quantile(&mut self, q: f64) -> Option<f64> {
498 if self.centroids.is_empty() {
499 return None;
500 }
501
502 if self.centroids.len() > 1 {
503 self.compress();
504 }
505
506 let index = q * self.total_weight;
507 let mut sum = 0.0;
508
509 for centroid in &self.centroids {
510 sum += centroid.weight;
511 if sum >= index {
512 return Some(centroid.mean);
513 }
514 }
515
516 self.centroids.last().map(|c| c.mean)
517 }
518
519 fn compress(&mut self) {
521 if self.centroids.is_empty() {
522 return;
523 }
524
525 self.centroids.sort_by(|a, b| {
527 a.mean
528 .partial_cmp(&b.mean)
529 .unwrap_or(std::cmp::Ordering::Equal)
530 });
531
532 let mut compressed = Vec::new();
533 let mut current = self.centroids[0];
534
535 for ¢roid in &self.centroids[1..] {
536 let q = (current.weight + centroid.weight) / self.total_weight;
538 let k = self.k_limit(q);
539
540 if current.weight + centroid.weight <= k {
541 let total_weight = current.weight + centroid.weight;
543 current.mean = (current.mean * current.weight + centroid.mean * centroid.weight)
544 / total_weight;
545 current.weight = total_weight;
546 } else {
547 compressed.push(current);
548 current = centroid;
549 }
550 }
551 compressed.push(current);
552
553 self.centroids = compressed;
554 }
555
556 fn k_limit(&self, q: f64) -> f64 {
558 4.0 * self.total_weight * self.delta * q * (1.0 - q)
559 }
560
561 pub fn stats(&self) -> TDigestStats {
563 TDigestStats {
564 centroid_count: self.centroids.len(),
565 total_weight: self.total_weight,
566 delta: self.delta,
567 max_size: self.max_size,
568 }
569 }
570}
571
/// Snapshot of a `TDigest`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TDigestStats {
    /// Number of centroids currently held.
    pub centroid_count: usize,
    /// Sum of all weights added.
    pub total_weight: f64,
    /// Configured compression parameter.
    pub delta: f64,
    /// Centroid budget that triggers compression.
    pub max_size: usize,
}
580
/// Bloom filter for approximate set membership: no false negatives,
/// tunable false-positive rate.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    // One flag per slot. NOTE(review): Vec<bool> stores one byte per flag,
    // not one bit — a packed bitset would be 8x smaller.
    bits: Vec<bool>,
    // Number of hash functions applied per element.
    hash_count: usize,
    // Number of `add` calls (not distinct elements).
    insert_count: u64,
}
591
592impl BloomFilter {
593 pub fn new(size: usize, hash_count: usize) -> Self {
601 Self {
602 bits: vec![false; size],
603 hash_count,
604 insert_count: 0,
605 }
606 }
607
608 pub fn optimal(expected_items: usize, false_positive_rate: f64) -> Self {
610 let bits = Self::optimal_bits(expected_items, false_positive_rate);
611 let hash_count = Self::optimal_hash_count(bits, expected_items);
612 Self::new(bits, hash_count)
613 }
614
615 fn optimal_bits(n: usize, p: f64) -> usize {
617 let numerator = -(n as f64 * p.ln());
618 let denominator = 2.0_f64.ln().powi(2);
619 (numerator / denominator).ceil() as usize
620 }
621
622 fn optimal_hash_count(m: usize, n: usize) -> usize {
624 ((m as f64 / n as f64) * 2.0_f64.ln()).ceil() as usize
625 }
626
627 pub fn add<T: Hash>(&mut self, element: &T) {
629 self.insert_count += 1;
630 for i in 0..self.hash_count {
631 let idx = self.hash_i(element, i) % self.bits.len();
632 self.bits[idx] = true;
633 }
634 }
635
636 pub fn contains<T: Hash>(&self, element: &T) -> bool {
638 (0..self.hash_count).all(|i| {
639 let idx = self.hash_i(element, i) % self.bits.len();
640 self.bits[idx]
641 })
642 }
643
644 fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
646 use std::collections::hash_map::DefaultHasher;
647 let mut hasher = DefaultHasher::new();
648 element.hash(&mut hasher);
649 i.hash(&mut hasher);
650 hasher.finish() as usize
651 }
652
653 pub fn false_positive_rate(&self) -> f64 {
655 let set_bits = self.bits.iter().filter(|&&b| b).count() as f64;
656 let p = set_bits / self.bits.len() as f64;
657 p.powi(self.hash_count as i32)
658 }
659
660 pub fn stats(&self) -> BloomFilterStats {
662 let set_bits = self.bits.iter().filter(|&&b| b).count();
663
664 BloomFilterStats {
665 size_bits: self.bits.len(),
666 hash_count: self.hash_count,
667 insert_count: self.insert_count,
668 set_bits,
669 estimated_fpr: self.false_positive_rate(),
670 memory_bytes: self.bits.len() / 8,
671 }
672 }
673}
674
/// Snapshot of a `BloomFilter`'s state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomFilterStats {
    /// Number of slots in the filter.
    pub size_bits: usize,
    /// Number of hash functions per element.
    pub hash_count: usize,
    /// Number of `add` calls performed.
    pub insert_count: u64,
    /// Slots currently set.
    pub set_bits: usize,
    /// False-positive estimate from the current fill fraction.
    pub estimated_fpr: f64,
    /// Approximate memory used by the slot vector.
    pub memory_bytes: usize,
}
685
/// Facade that feeds every incoming event into all configured sketches:
/// a uniform reservoir, an optional stratified sampler, HyperLogLog
/// (distinct count), Count-Min (frequency), t-digest (quantiles) and a
/// Bloom filter (membership).
pub struct AdvancedSamplingManager {
    // Retained configuration; re-read when stratified sampling is enabled.
    config: SamplingConfig,
    // Uniform sample over all processed events.
    reservoir: ReservoirSampler,
    // Per-category sampler; `None` until `enable_stratified` is called.
    stratified: Option<StratifiedSampler>,
    // Distinct-count estimator over derived event ids.
    hyperloglog: HyperLogLog,
    // Frequency estimator over derived event ids.
    count_min: CountMinSketch,
    // Quantile estimator over the events' numeric values.
    tdigest: TDigest,
    // Membership filter over derived event ids.
    bloom: BloomFilter,
    // Total events passed through `process_event`.
    event_count: u64,
}
697
698impl AdvancedSamplingManager {
699 pub fn new(config: SamplingConfig) -> Self {
701 let reservoir = ReservoirSampler::new(config.reservoir_size);
702 let hyperloglog = HyperLogLog::new(config.hll_precision);
703 let count_min = CountMinSketch::new(config.cms_hash_count, config.cms_width);
704 let tdigest = TDigest::new(config.tdigest_delta);
705 let bloom = BloomFilter::new(config.bloom_filter_bits, config.bloom_filter_hashes);
706
707 Self {
708 config,
709 reservoir,
710 stratified: None,
711 hyperloglog,
712 count_min,
713 tdigest,
714 bloom,
715 event_count: 0,
716 }
717 }
718
719 pub fn enable_stratified(&mut self, extractor: fn(&StreamEvent) -> Option<String>) {
721 let mut sampler = StratifiedSampler::new(self.config.reservoir_size, extractor);
722
723 for (category, rate) in &self.config.stratified_sample_rates {
725 sampler.set_category_rate(category.clone(), *rate);
726 }
727
728 self.stratified = Some(sampler);
729 }
730
731 pub fn process_event(&mut self, event: StreamEvent) -> Result<()> {
733 self.event_count += 1;
734
735 self.reservoir.add(event.clone());
737
738 if let Some(ref mut stratified) = self.stratified {
740 stratified.add(event.clone());
741 }
742
743 let event_id = self.event_id(&event);
745 self.hyperloglog.add(&event_id);
746
747 self.count_min.add(&event_id, 1);
749
750 if let Some(value) = self.extract_numeric_value(&event) {
752 self.tdigest.add(value, 1.0);
753 }
754
755 self.bloom.add(&event_id);
757
758 Ok(())
759 }
760
761 pub fn reservoir_sample(&self) -> &[StreamEvent] {
763 self.reservoir.sample()
764 }
765
766 pub fn stratified_samples(&self) -> Option<HashMap<String, Vec<StreamEvent>>> {
768 self.stratified.as_ref().map(|s| s.all_samples())
769 }
770
771 pub fn distinct_count(&self) -> u64 {
773 self.hyperloglog.cardinality()
774 }
775
776 pub fn event_frequency(&self, event: &StreamEvent) -> u64 {
778 let event_id = self.event_id(event);
779 self.count_min.estimate(&event_id)
780 }
781
782 pub fn likely_seen(&self, event: &StreamEvent) -> bool {
784 let event_id = self.event_id(event);
785 self.bloom.contains(&event_id)
786 }
787
788 pub fn quantile(&mut self, q: f64) -> Option<f64> {
790 self.tdigest.quantile(q)
791 }
792
793 pub fn stats(&self) -> SamplingManagerStats {
795 SamplingManagerStats {
796 event_count: self.event_count,
797 reservoir_stats: self.reservoir.stats(),
798 stratified_stats: self.stratified.as_ref().map(|s| s.stats()),
799 hyperloglog_stats: self.hyperloglog.stats(),
800 count_min_stats: self.count_min.stats(),
801 tdigest_stats: self.tdigest.stats(),
802 bloom_stats: self.bloom.stats(),
803 }
804 }
805
806 fn event_id(&self, event: &StreamEvent) -> String {
808 match event {
809 StreamEvent::TripleAdded {
810 subject,
811 predicate,
812 object,
813 ..
814 } => format!("{}-{}-{}", subject, predicate, object),
815 StreamEvent::TripleRemoved {
816 subject,
817 predicate,
818 object,
819 ..
820 } => format!("{}-{}-{}", subject, predicate, object),
821 StreamEvent::GraphCreated { graph, .. } => format!("graph-{}", graph),
822 StreamEvent::GraphDeleted { graph, .. } => format!("graph-{}", graph),
823 _ => "unknown".to_string(),
824 }
825 }
826
827 fn extract_numeric_value(&self, event: &StreamEvent) -> Option<f64> {
829 match event {
830 StreamEvent::TripleAdded { metadata, .. }
831 | StreamEvent::TripleRemoved { metadata, .. } => {
832 Some(metadata.timestamp.timestamp() as f64)
833 }
834 _ => None,
835 }
836 }
837}
838
/// Aggregated snapshot across all sketches in `AdvancedSamplingManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingManagerStats {
    /// Total events processed.
    pub event_count: u64,
    /// Uniform reservoir state.
    pub reservoir_stats: ReservoirStats,
    /// Per-category sampler state; `None` when stratification is disabled.
    pub stratified_stats: Option<StratifiedStats>,
    /// Distinct-count sketch state.
    pub hyperloglog_stats: HyperLogLogStats,
    /// Frequency sketch state.
    pub count_min_stats: CountMinSketchStats,
    /// Quantile digest state.
    pub tdigest_stats: TDigestStats,
    /// Membership filter state.
    pub bloom_stats: BloomFilterStats,
}
850
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventMetadata;
    use std::collections::HashMap;

    /// Builds a minimal `TripleAdded` event whose subject and event_id
    /// embed `id`, with a current timestamp and empty metadata.
    fn create_test_event(id: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: format!("http://example.org/{}", id),
            predicate: "http://example.org/prop".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: id.to_string(),
                timestamp: chrono::Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    /// 100 events into a 10-slot reservoir: the reservoir must fill to
    /// exactly capacity and report a 10% sampling rate.
    #[test]
    fn test_reservoir_sampler() {
        let mut sampler = ReservoirSampler::new(10);

        for i in 0..100 {
            sampler.add(create_test_event(&format!("event-{}", i)));
        }

        let stats = sampler.stats();
        assert_eq!(stats.capacity, 10);
        assert_eq!(stats.current_size, 10);
        assert_eq!(stats.total_events, 100);
        assert_eq!(stats.sampling_rate, 0.1);
    }

    /// Two metadata-source categories must each get their own reservoir.
    #[test]
    fn test_stratified_sampler() {
        fn category_extractor(event: &StreamEvent) -> Option<String> {
            match event {
                StreamEvent::TripleAdded { metadata, .. } => Some(metadata.source.clone()),
                _ => None,
            }
        }

        let mut sampler = StratifiedSampler::new(10, category_extractor);
        sampler.set_category_rate("source1".to_string(), 0.5);
        sampler.set_category_rate("source2".to_string(), 1.0);

        // First 25 events go to source1, the rest to source2.
        for i in 0..50 {
            let mut event = create_test_event(&format!("event-{}", i));
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.source = if i < 25 {
                    "source1".to_string()
                } else {
                    "source2".to_string()
                };
            }
            sampler.add(event);
        }

        let stats = sampler.stats();
        assert_eq!(stats.category_count, 2);
        assert!(stats.category_stats.contains_key("source1"));
        assert!(stats.category_stats.contains_key("source2"));
    }

    /// 1000 distinct elements at precision 14 should estimate within 5%.
    #[test]
    fn test_hyperloglog() {
        let mut hll = HyperLogLog::new(14);

        for i in 0..1000 {
            hll.add(&format!("element-{}", i));
        }

        let cardinality = hll.cardinality();

        // Relative error against the true cardinality of 1000.
        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
    }

    /// Count-Min never under-counts, and a 100x-more-frequent element must
    /// estimate strictly higher than a rare one.
    #[test]
    fn test_count_min_sketch() {
        let mut cms = CountMinSketch::new(4, 1000);

        for _ in 0..100 {
            cms.add(&"frequent", 1);
        }
        for _ in 0..10 {
            cms.add(&"rare", 1);
        }

        let freq_frequent = cms.estimate(&"frequent");
        let freq_rare = cms.estimate(&"rare");

        assert!(freq_frequent >= 100);
        assert!(freq_rare >= 10);
        assert!(freq_frequent > freq_rare);
    }

    /// Median and p90 of the uniform 1..=1000 sequence should land near
    /// 500 and 900 within loose tolerances (t-digest is approximate).
    #[test]
    fn test_tdigest() {
        let mut digest = TDigest::new(0.01);

        for i in 1..=1000 {
            digest.add(i as f64, 1.0);
        }

        let median = digest.quantile(0.5).unwrap();
        assert!((median - 500.0).abs() < 50.0, "Median: {}", median);

        let p90 = digest.quantile(0.9).unwrap();
        assert!((p90 - 900.0).abs() < 100.0, "P90: {}", p90);
    }

    /// No false negatives for inserted elements; false-positive rate on
    /// 1000 never-inserted elements stays under 5% (filter sized for 1%).
    #[test]
    fn test_bloom_filter() {
        let mut bloom = BloomFilter::optimal(1000, 0.01);

        for i in 0..500 {
            bloom.add(&format!("element-{}", i));
        }

        for i in 0..500 {
            assert!(bloom.contains(&format!("element-{}", i)));
        }

        let mut false_positives = 0;
        for i in 1000..2000 {
            if bloom.contains(&format!("element-{}", i)) {
                false_positives += 1;
            }
        }

        let fpr = false_positives as f64 / 1000.0;
        assert!(fpr < 0.05, "False positive rate too high: {}", fpr);
    }

    /// End-to-end: 100 events through the manager should populate every
    /// sketch's statistics.
    #[test]
    fn test_sampling_manager() {
        let config = SamplingConfig::default();
        let mut manager = AdvancedSamplingManager::new(config);

        for i in 0..100 {
            let event = create_test_event(&format!("event-{}", i));
            manager.process_event(event).unwrap();
        }

        let stats = manager.stats();
        assert_eq!(stats.event_count, 100);
        assert!(stats.reservoir_stats.current_size > 0);
        assert!(stats.hyperloglog_stats.estimated_cardinality > 0);
        assert!(stats.count_min_stats.total_count > 0);
    }

    /// Merging two disjoint 500-element sketches should estimate their
    /// union (1000) within 5%.
    #[test]
    fn test_hyperloglog_merge() {
        let mut hll1 = HyperLogLog::new(14);
        let mut hll2 = HyperLogLog::new(14);

        for i in 0..500 {
            hll1.add(&format!("element-{}", i));
        }
        for i in 500..1000 {
            hll2.add(&format!("element-{}", i));
        }

        hll1.merge(&hll2);

        let cardinality = hll1.cardinality();

        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
    }

    /// `optimal` must produce a non-degenerate filter configuration.
    #[test]
    fn test_bloom_filter_optimal() {
        let bloom = BloomFilter::optimal(10000, 0.01);
        let stats = bloom.stats();

        assert!(stats.size_bits > 0);
        assert!(stats.hash_count > 0);
    }

    /// Stratified path end-to-end: 50 events per subject-derived category
    /// should yield exactly two categories in the manager's stats.
    #[test]
    fn test_sampling_manager_with_stratified() {
        fn category_extractor(event: &StreamEvent) -> Option<String> {
            match event {
                StreamEvent::TripleAdded { subject, .. } => {
                    if subject.contains("type1") {
                        Some("type1".to_string())
                    } else if subject.contains("type2") {
                        Some("type2".to_string())
                    } else {
                        None
                    }
                }
                _ => None,
            }
        }

        let config = SamplingConfig::default();
        let mut manager = AdvancedSamplingManager::new(config);
        manager.enable_stratified(category_extractor);

        for i in 0..50 {
            let event = StreamEvent::TripleAdded {
                subject: format!("http://example.org/type1/{}", i),
                predicate: "http://example.org/prop".to_string(),
                object: "value".to_string(),
                graph: None,
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: chrono::Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.process_event(event).unwrap();
        }

        for i in 50..100 {
            let event = StreamEvent::TripleAdded {
                subject: format!("http://example.org/type2/{}", i),
                predicate: "http://example.org/prop".to_string(),
                object: "value".to_string(),
                graph: None,
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: chrono::Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.process_event(event).unwrap();
        }

        let stats = manager.stats();
        assert_eq!(stats.event_count, 100);
        assert!(stats.stratified_stats.is_some());

        let stratified = stats.stratified_stats.unwrap();
        assert_eq!(stratified.category_count, 2);
    }
}
1127}