1use crate::StreamEvent;
24use anyhow::Result;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::hash::{Hash, Hasher};
28
/// Tuning knobs for every probabilistic data structure managed by
/// [`AdvancedSamplingManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingConfig {
    /// Maximum number of events kept by the uniform reservoir sampler.
    pub reservoir_size: usize,
    /// Number of hash rows in the Count-Min sketch (more rows -> lower error probability).
    pub cms_hash_count: usize,
    /// Number of counters per Count-Min row (wider -> smaller over-estimate).
    pub cms_width: usize,
    /// HyperLogLog precision `p`; the sketch uses `2^p` registers (valid range 4..=16).
    pub hll_precision: u8,
    /// t-digest compression parameter; smaller delta -> more centroids, better accuracy.
    pub tdigest_delta: f64,
    /// Total number of slots in the Bloom filter.
    pub bloom_filter_bits: usize,
    /// Number of hash functions used by the Bloom filter.
    pub bloom_filter_hashes: usize,
    /// Categories known up front for stratified sampling.
    // NOTE(review): this field is never read by the code visible here — confirm callers use it.
    pub stratified_categories: Vec<String>,
    /// Per-category sampling rates in [0, 1] applied when stratified sampling is enabled.
    pub stratified_sample_rates: HashMap<String, f64>,
}
51
52impl Default for SamplingConfig {
53 fn default() -> Self {
54 Self {
55 reservoir_size: 1000,
56 cms_hash_count: 4,
57 cms_width: 10000,
58 hll_precision: 14, tdigest_delta: 0.01,
60 bloom_filter_bits: 100000,
61 bloom_filter_hashes: 7,
62 stratified_categories: Vec::new(),
63 stratified_sample_rates: HashMap::new(),
64 }
65 }
66}
67
/// Uniform random sample over an unbounded stream (Vitter's Algorithm R):
/// keeps at most `capacity` events such that every event seen so far is
/// retained with equal probability.
#[derive(Debug, Clone)]
pub struct ReservoirSampler {
    // Currently retained events (at most `capacity`).
    reservoir: Vec<StreamEvent>,
    // Maximum reservoir size.
    capacity: usize,
    // Total number of events ever offered via `add`.
    count: u64,
}
77
78impl ReservoirSampler {
79 pub fn new(capacity: usize) -> Self {
84 Self {
85 reservoir: Vec::with_capacity(capacity),
86 capacity,
87 count: 0,
88 }
89 }
90
91 pub fn add(&mut self, event: StreamEvent) {
95 self.count += 1;
96
97 if self.reservoir.len() < self.capacity {
98 self.reservoir.push(event);
100 } else {
101 let j = (fastrand::f64() * self.count as f64) as usize;
103 if j < self.capacity {
104 self.reservoir[j] = event;
105 }
106 }
107 }
108
109 pub fn sample(&self) -> &[StreamEvent] {
111 &self.reservoir
112 }
113
114 pub fn count(&self) -> u64 {
116 self.count
117 }
118
119 pub fn clear(&mut self) {
121 self.reservoir.clear();
122 self.count = 0;
123 }
124
125 pub fn stats(&self) -> ReservoirStats {
127 ReservoirStats {
128 capacity: self.capacity,
129 current_size: self.reservoir.len(),
130 total_events: self.count,
131 sampling_rate: if self.count > 0 {
132 self.reservoir.len() as f64 / self.count as f64
133 } else {
134 0.0
135 },
136 }
137 }
138}
139
/// Point-in-time statistics reported by [`ReservoirSampler::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReservoirStats {
    /// Configured maximum sample size.
    pub capacity: usize,
    /// Number of events currently held.
    pub current_size: usize,
    /// Total events offered to the sampler.
    pub total_events: u64,
    /// `current_size / total_events` (0.0 when no events were seen).
    pub sampling_rate: f64,
}
148
/// Per-category reservoir sampling: each category extracted from an event
/// gets its own reservoir, optionally scaled by a per-category rate.
#[derive(Debug, Clone)]
pub struct StratifiedSampler {
    // One reservoir per observed category, created lazily on first event.
    samplers: HashMap<String, ReservoirSampler>,
    // Per-category rates in [0, 1]; missing categories default to 1.0.
    sample_rates: HashMap<String, f64>,
    // Base reservoir capacity before scaling by the category rate.
    default_capacity: usize,
    // Maps an event to its category; `None` means the event is skipped.
    category_extractor: fn(&StreamEvent) -> Option<String>,
}
160
161impl StratifiedSampler {
162 pub fn new(
168 default_capacity: usize,
169 category_extractor: fn(&StreamEvent) -> Option<String>,
170 ) -> Self {
171 Self {
172 samplers: HashMap::new(),
173 sample_rates: HashMap::new(),
174 default_capacity,
175 category_extractor,
176 }
177 }
178
179 pub fn set_category_rate(&mut self, category: String, rate: f64) {
181 assert!((0.0..=1.0).contains(&rate), "Rate must be in [0, 1]");
182 self.sample_rates.insert(category, rate);
183 }
184
185 pub fn add(&mut self, event: StreamEvent) {
187 if let Some(category) = (self.category_extractor)(&event) {
188 let rate = self.sample_rates.get(&category).copied().unwrap_or(1.0);
190
191 if rate <= 0.0 {
192 return; }
194
195 let sampler = self.samplers.entry(category.clone()).or_insert_with(|| {
197 let capacity = (self.default_capacity as f64 * rate) as usize;
198 ReservoirSampler::new(capacity.max(1))
199 });
200
201 sampler.add(event);
202 }
203 }
204
205 pub fn category_sample(&self, category: &str) -> Option<&[StreamEvent]> {
207 self.samplers.get(category).map(|s| s.sample())
208 }
209
210 pub fn all_samples(&self) -> HashMap<String, Vec<StreamEvent>> {
212 self.samplers
213 .iter()
214 .map(|(cat, sampler)| (cat.clone(), sampler.sample().to_vec()))
215 .collect()
216 }
217
218 pub fn stats(&self) -> StratifiedStats {
220 let category_stats = self
221 .samplers
222 .iter()
223 .map(|(cat, sampler)| (cat.clone(), sampler.stats()))
224 .collect();
225
226 StratifiedStats {
227 category_count: self.samplers.len(),
228 category_stats,
229 }
230 }
231}
232
/// Statistics reported by [`StratifiedSampler::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StratifiedStats {
    /// Number of categories that have received at least one event.
    pub category_count: usize,
    /// Reservoir statistics keyed by category name.
    pub category_stats: HashMap<String, ReservoirStats>,
}
239
/// HyperLogLog cardinality estimator (Flajolet et al., 2007).
///
/// Uses `2^precision` one-byte registers; standard error is roughly
/// `1.04 / sqrt(2^precision)`.
#[derive(Debug, Clone)]
pub struct HyperLogLog {
    // One register per bucket; stores the maximum observed rank.
    registers: Vec<u8>,
    // Number of leading hash bits used to select a register.
    precision: u8,
    // Bias-correction constant alpha_m, chosen from the register count.
    alpha: f64,
}
250
251impl HyperLogLog {
252 pub fn new(precision: u8) -> Self {
258 assert!(
259 (4..=16).contains(&precision),
260 "Precision must be between 4 and 16"
261 );
262
263 let m = 1 << precision; let alpha = match m {
267 16 => 0.673,
268 32 => 0.697,
269 64 => 0.709,
270 _ => 0.7213 / (1.0 + 1.079 / m as f64),
271 };
272
273 Self {
274 registers: vec![0; m],
275 precision,
276 alpha,
277 }
278 }
279
280 pub fn add<T: Hash>(&mut self, element: &T) {
282 let hash = self.hash(element);
283
284 let idx = (hash >> (64 - self.precision)) as usize;
286
287 let remaining = hash << self.precision;
289 let leading_zeros = remaining.leading_zeros() as u8 + 1;
290
291 self.registers[idx] = self.registers[idx].max(leading_zeros);
293 }
294
295 pub fn cardinality(&self) -> u64 {
297 let m = self.registers.len() as f64;
298
299 let raw_estimate = self.alpha * m * m
301 / self
302 .registers
303 .iter()
304 .map(|&r| 2.0_f64.powi(-(r as i32)))
305 .sum::<f64>();
306
307 if raw_estimate <= 5.0 * m {
309 let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
311 if zeros > 0.0 {
312 return (m * (m / zeros).ln()) as u64;
313 }
314 }
315
316 if raw_estimate <= (1.0 / 30.0) * (1u64 << 32) as f64 {
317 raw_estimate as u64
319 } else {
320 let two_32 = (1u64 << 32) as f64;
322 (-(two_32) * ((1.0 - raw_estimate / two_32).ln())) as u64
323 }
324 }
325
326 pub fn merge(&mut self, other: &HyperLogLog) {
328 assert_eq!(
329 self.precision, other.precision,
330 "Cannot merge HyperLogLogs with different precisions"
331 );
332
333 for (i, &other_val) in other.registers.iter().enumerate() {
334 self.registers[i] = self.registers[i].max(other_val);
335 }
336 }
337
338 fn hash<T: Hash>(&self, element: &T) -> u64 {
340 use std::collections::hash_map::DefaultHasher;
341 let mut hasher = DefaultHasher::new();
342 element.hash(&mut hasher);
343 hasher.finish()
344 }
345
346 pub fn stats(&self) -> HyperLogLogStats {
348 HyperLogLogStats {
349 precision: self.precision,
350 register_count: self.registers.len(),
351 estimated_cardinality: self.cardinality(),
352 memory_bytes: self.registers.len(),
353 }
354 }
355}
356
/// Statistics reported by [`HyperLogLog::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLogLogStats {
    /// Configured precision `p` (register count is `2^p`).
    pub precision: u8,
    /// Number of registers.
    pub register_count: usize,
    /// Current distinct-count estimate.
    pub estimated_cardinality: u64,
    /// Approximate memory footprint (one byte per register).
    pub memory_bytes: usize,
}
365
/// Count-Min sketch for approximate frequency counting.
///
/// Estimates never under-count; over-count is bounded with high probability
/// by the table dimensions.
#[derive(Debug, Clone)]
pub struct CountMinSketch {
    // `hash_count` rows of `width` counters each.
    table: Vec<Vec<u64>>,
    // Number of independent hash rows.
    hash_count: usize,
    // Counters per row.
    width: usize,
    // Sum of all counts ever added.
    total_count: u64,
}
377
378impl CountMinSketch {
379 pub fn new(hash_count: usize, width: usize) -> Self {
387 Self {
388 table: vec![vec![0; width]; hash_count],
389 hash_count,
390 width,
391 total_count: 0,
392 }
393 }
394
395 pub fn add<T: Hash>(&mut self, element: &T, count: u64) {
397 self.total_count += count;
398
399 for i in 0..self.hash_count {
400 let idx = self.hash_i(element, i) % self.width;
401 self.table[i][idx] += count;
402 }
403 }
404
405 pub fn estimate<T: Hash>(&self, element: &T) -> u64 {
407 (0..self.hash_count)
408 .map(|i| {
409 let idx = self.hash_i(element, i) % self.width;
410 self.table[i][idx]
411 })
412 .min()
413 .unwrap_or(0)
414 }
415
416 pub fn total_count(&self) -> u64 {
418 self.total_count
419 }
420
421 fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
423 use std::collections::hash_map::DefaultHasher;
424 let mut hasher = DefaultHasher::new();
425 element.hash(&mut hasher);
426 i.hash(&mut hasher);
427 hasher.finish() as usize
428 }
429
430 pub fn stats(&self) -> CountMinSketchStats {
432 CountMinSketchStats {
433 hash_count: self.hash_count,
434 width: self.width,
435 total_count: self.total_count,
436 memory_bytes: self.hash_count * self.width * std::mem::size_of::<u64>(),
437 }
438 }
439}
440
/// Statistics reported by [`CountMinSketch::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CountMinSketchStats {
    /// Number of hash rows.
    pub hash_count: usize,
    /// Counters per row.
    pub width: usize,
    /// Sum of all counts added.
    pub total_count: u64,
    /// Memory used by the counter table in bytes.
    pub memory_bytes: usize,
}
449
/// Simplified t-digest for streaming quantile estimation (Dunning-style
/// centroid merging with a size cap of `1/delta` centroids).
#[derive(Debug, Clone)]
pub struct TDigest {
    // Centroid buffer; only guaranteed sorted immediately after `compress`.
    centroids: Vec<Centroid>,
    // Compression parameter; smaller values keep more centroids.
    delta: f64,
    // Sum of all weights added.
    total_weight: f64,
    // Buffer size that triggers compression (`1/delta`).
    max_size: usize,
}
461
/// A weighted cluster of observations: `weight` samples with average `mean`.
#[derive(Debug, Clone, Copy)]
struct Centroid {
    // Weighted average of the merged values.
    mean: f64,
    // Total weight merged into this centroid.
    weight: f64,
}
467
468impl TDigest {
469 pub fn new(delta: f64) -> Self {
474 Self {
475 centroids: Vec::new(),
476 delta,
477 total_weight: 0.0,
478 max_size: (1.0 / delta) as usize,
479 }
480 }
481
482 pub fn add(&mut self, value: f64, weight: f64) {
484 self.centroids.push(Centroid {
485 mean: value,
486 weight,
487 });
488 self.total_weight += weight;
489
490 if self.centroids.len() > self.max_size {
492 self.compress();
493 }
494 }
495
496 pub fn quantile(&mut self, q: f64) -> Option<f64> {
498 if self.centroids.is_empty() {
499 return None;
500 }
501
502 if self.centroids.len() > 1 {
503 self.compress();
504 }
505
506 let index = q * self.total_weight;
507 let mut sum = 0.0;
508
509 for centroid in &self.centroids {
510 sum += centroid.weight;
511 if sum >= index {
512 return Some(centroid.mean);
513 }
514 }
515
516 self.centroids.last().map(|c| c.mean)
517 }
518
519 fn compress(&mut self) {
521 if self.centroids.is_empty() {
522 return;
523 }
524
525 self.centroids
527 .sort_by(|a, b| a.mean.partial_cmp(&b.mean).unwrap());
528
529 let mut compressed = Vec::new();
530 let mut current = self.centroids[0];
531
532 for ¢roid in &self.centroids[1..] {
533 let q = (current.weight + centroid.weight) / self.total_weight;
535 let k = self.k_limit(q);
536
537 if current.weight + centroid.weight <= k {
538 let total_weight = current.weight + centroid.weight;
540 current.mean = (current.mean * current.weight + centroid.mean * centroid.weight)
541 / total_weight;
542 current.weight = total_weight;
543 } else {
544 compressed.push(current);
545 current = centroid;
546 }
547 }
548 compressed.push(current);
549
550 self.centroids = compressed;
551 }
552
553 fn k_limit(&self, q: f64) -> f64 {
555 4.0 * self.total_weight * self.delta * q * (1.0 - q)
556 }
557
558 pub fn stats(&self) -> TDigestStats {
560 TDigestStats {
561 centroid_count: self.centroids.len(),
562 total_weight: self.total_weight,
563 delta: self.delta,
564 max_size: self.max_size,
565 }
566 }
567}
568
/// Statistics reported by [`TDigest::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TDigestStats {
    /// Number of centroids currently buffered.
    pub centroid_count: usize,
    /// Sum of all weights added to the digest.
    pub total_weight: f64,
    /// Configured compression parameter.
    pub delta: f64,
    /// Centroid-count threshold that triggers compression.
    pub max_size: usize,
}
577
/// Bloom filter for approximate set membership: `contains` may return
/// false positives but never false negatives.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    // One flag per slot. Note: `Vec<bool>` stores one byte per flag, not
    // one bit per flag.
    bits: Vec<bool>,
    // Number of hash functions applied per element.
    hash_count: usize,
    // Number of `add` calls (duplicates counted each time).
    insert_count: u64,
}
588
589impl BloomFilter {
590 pub fn new(size: usize, hash_count: usize) -> Self {
598 Self {
599 bits: vec![false; size],
600 hash_count,
601 insert_count: 0,
602 }
603 }
604
605 pub fn optimal(expected_items: usize, false_positive_rate: f64) -> Self {
607 let bits = Self::optimal_bits(expected_items, false_positive_rate);
608 let hash_count = Self::optimal_hash_count(bits, expected_items);
609 Self::new(bits, hash_count)
610 }
611
612 fn optimal_bits(n: usize, p: f64) -> usize {
614 let numerator = -(n as f64 * p.ln());
615 let denominator = 2.0_f64.ln().powi(2);
616 (numerator / denominator).ceil() as usize
617 }
618
619 fn optimal_hash_count(m: usize, n: usize) -> usize {
621 ((m as f64 / n as f64) * 2.0_f64.ln()).ceil() as usize
622 }
623
624 pub fn add<T: Hash>(&mut self, element: &T) {
626 self.insert_count += 1;
627 for i in 0..self.hash_count {
628 let idx = self.hash_i(element, i) % self.bits.len();
629 self.bits[idx] = true;
630 }
631 }
632
633 pub fn contains<T: Hash>(&self, element: &T) -> bool {
635 (0..self.hash_count).all(|i| {
636 let idx = self.hash_i(element, i) % self.bits.len();
637 self.bits[idx]
638 })
639 }
640
641 fn hash_i<T: Hash>(&self, element: &T, i: usize) -> usize {
643 use std::collections::hash_map::DefaultHasher;
644 let mut hasher = DefaultHasher::new();
645 element.hash(&mut hasher);
646 i.hash(&mut hasher);
647 hasher.finish() as usize
648 }
649
650 pub fn false_positive_rate(&self) -> f64 {
652 let set_bits = self.bits.iter().filter(|&&b| b).count() as f64;
653 let p = set_bits / self.bits.len() as f64;
654 p.powi(self.hash_count as i32)
655 }
656
657 pub fn stats(&self) -> BloomFilterStats {
659 let set_bits = self.bits.iter().filter(|&&b| b).count();
660
661 BloomFilterStats {
662 size_bits: self.bits.len(),
663 hash_count: self.hash_count,
664 insert_count: self.insert_count,
665 set_bits,
666 estimated_fpr: self.false_positive_rate(),
667 memory_bytes: self.bits.len() / 8,
668 }
669 }
670}
671
/// Statistics reported by [`BloomFilter::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BloomFilterStats {
    /// Number of slots in the filter.
    pub size_bits: usize,
    /// Number of hash functions per element.
    pub hash_count: usize,
    /// Number of `add` calls performed.
    pub insert_count: u64,
    /// Number of slots currently set.
    pub set_bits: usize,
    /// Estimated false-positive probability at the current fill level.
    pub estimated_fpr: f64,
    /// Approximate memory footprint of the slot array in bytes.
    pub memory_bytes: usize,
}
682
/// Facade that feeds every incoming event into a suite of probabilistic
/// data structures: uniform + optional stratified reservoirs, HyperLogLog,
/// Count-Min, t-digest, and a Bloom filter.
pub struct AdvancedSamplingManager {
    // Configuration used to build (and lazily enable) the sketches.
    config: SamplingConfig,
    // Uniform random sample of all events.
    reservoir: ReservoirSampler,
    // Per-category sampling; populated by `enable_stratified`.
    stratified: Option<StratifiedSampler>,
    // Distinct-event-id estimator.
    hyperloglog: HyperLogLog,
    // Per-event-id frequency estimator.
    count_min: CountMinSketch,
    // Quantile sketch over the events' numeric values (timestamps).
    tdigest: TDigest,
    // Approximate "seen before" membership on event ids.
    bloom: BloomFilter,
    // Total events processed.
    event_count: u64,
}
694
695impl AdvancedSamplingManager {
696 pub fn new(config: SamplingConfig) -> Self {
698 let reservoir = ReservoirSampler::new(config.reservoir_size);
699 let hyperloglog = HyperLogLog::new(config.hll_precision);
700 let count_min = CountMinSketch::new(config.cms_hash_count, config.cms_width);
701 let tdigest = TDigest::new(config.tdigest_delta);
702 let bloom = BloomFilter::new(config.bloom_filter_bits, config.bloom_filter_hashes);
703
704 Self {
705 config,
706 reservoir,
707 stratified: None,
708 hyperloglog,
709 count_min,
710 tdigest,
711 bloom,
712 event_count: 0,
713 }
714 }
715
716 pub fn enable_stratified(&mut self, extractor: fn(&StreamEvent) -> Option<String>) {
718 let mut sampler = StratifiedSampler::new(self.config.reservoir_size, extractor);
719
720 for (category, rate) in &self.config.stratified_sample_rates {
722 sampler.set_category_rate(category.clone(), *rate);
723 }
724
725 self.stratified = Some(sampler);
726 }
727
728 pub fn process_event(&mut self, event: StreamEvent) -> Result<()> {
730 self.event_count += 1;
731
732 self.reservoir.add(event.clone());
734
735 if let Some(ref mut stratified) = self.stratified {
737 stratified.add(event.clone());
738 }
739
740 let event_id = self.event_id(&event);
742 self.hyperloglog.add(&event_id);
743
744 self.count_min.add(&event_id, 1);
746
747 if let Some(value) = self.extract_numeric_value(&event) {
749 self.tdigest.add(value, 1.0);
750 }
751
752 self.bloom.add(&event_id);
754
755 Ok(())
756 }
757
758 pub fn reservoir_sample(&self) -> &[StreamEvent] {
760 self.reservoir.sample()
761 }
762
763 pub fn stratified_samples(&self) -> Option<HashMap<String, Vec<StreamEvent>>> {
765 self.stratified.as_ref().map(|s| s.all_samples())
766 }
767
768 pub fn distinct_count(&self) -> u64 {
770 self.hyperloglog.cardinality()
771 }
772
773 pub fn event_frequency(&self, event: &StreamEvent) -> u64 {
775 let event_id = self.event_id(event);
776 self.count_min.estimate(&event_id)
777 }
778
779 pub fn likely_seen(&self, event: &StreamEvent) -> bool {
781 let event_id = self.event_id(event);
782 self.bloom.contains(&event_id)
783 }
784
785 pub fn quantile(&mut self, q: f64) -> Option<f64> {
787 self.tdigest.quantile(q)
788 }
789
790 pub fn stats(&self) -> SamplingManagerStats {
792 SamplingManagerStats {
793 event_count: self.event_count,
794 reservoir_stats: self.reservoir.stats(),
795 stratified_stats: self.stratified.as_ref().map(|s| s.stats()),
796 hyperloglog_stats: self.hyperloglog.stats(),
797 count_min_stats: self.count_min.stats(),
798 tdigest_stats: self.tdigest.stats(),
799 bloom_stats: self.bloom.stats(),
800 }
801 }
802
803 fn event_id(&self, event: &StreamEvent) -> String {
805 match event {
806 StreamEvent::TripleAdded {
807 subject,
808 predicate,
809 object,
810 ..
811 } => format!("{}-{}-{}", subject, predicate, object),
812 StreamEvent::TripleRemoved {
813 subject,
814 predicate,
815 object,
816 ..
817 } => format!("{}-{}-{}", subject, predicate, object),
818 StreamEvent::GraphCreated { graph, .. } => format!("graph-{}", graph),
819 StreamEvent::GraphDeleted { graph, .. } => format!("graph-{}", graph),
820 _ => "unknown".to_string(),
821 }
822 }
823
824 fn extract_numeric_value(&self, event: &StreamEvent) -> Option<f64> {
826 match event {
827 StreamEvent::TripleAdded { metadata, .. }
828 | StreamEvent::TripleRemoved { metadata, .. } => {
829 Some(metadata.timestamp.timestamp() as f64)
830 }
831 _ => None,
832 }
833 }
834}
835
/// Aggregated statistics reported by [`AdvancedSamplingManager::stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingManagerStats {
    /// Total events processed by the manager.
    pub event_count: u64,
    /// Uniform reservoir statistics.
    pub reservoir_stats: ReservoirStats,
    /// Per-category statistics, present only when stratified sampling is enabled.
    pub stratified_stats: Option<StratifiedStats>,
    /// HyperLogLog statistics.
    pub hyperloglog_stats: HyperLogLogStats,
    /// Count-Min sketch statistics.
    pub count_min_stats: CountMinSketchStats,
    /// t-digest statistics.
    pub tdigest_stats: TDigestStats,
    /// Bloom filter statistics.
    pub bloom_stats: BloomFilterStats,
}
847
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventMetadata;
    use std::collections::HashMap;

    // Builds a minimal TripleAdded event whose subject embeds `id`, so each
    // id yields a distinct event_id for the sketches.
    fn create_test_event(id: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: format!("http://example.org/{}", id),
            predicate: "http://example.org/prop".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: id.to_string(),
                timestamp: chrono::Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    // A full reservoir of 10 over 100 events must report an exact 10%
    // sampling rate.
    #[test]
    fn test_reservoir_sampler() {
        let mut sampler = ReservoirSampler::new(10);

        for i in 0..100 {
            sampler.add(create_test_event(&format!("event-{}", i)));
        }

        let stats = sampler.stats();
        assert_eq!(stats.capacity, 10);
        assert_eq!(stats.current_size, 10);
        assert_eq!(stats.total_events, 100);
        assert_eq!(stats.sampling_rate, 0.1);
    }

    // Events split across two sources must produce one reservoir per source.
    #[test]
    fn test_stratified_sampler() {
        fn category_extractor(event: &StreamEvent) -> Option<String> {
            match event {
                StreamEvent::TripleAdded { metadata, .. } => Some(metadata.source.clone()),
                _ => None,
            }
        }

        let mut sampler = StratifiedSampler::new(10, category_extractor);
        sampler.set_category_rate("source1".to_string(), 0.5);
        sampler.set_category_rate("source2".to_string(), 1.0);

        for i in 0..50 {
            // First half goes to source1, second half to source2.
            let mut event = create_test_event(&format!("event-{}", i));
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.source = if i < 25 {
                    "source1".to_string()
                } else {
                    "source2".to_string()
                };
            }
            sampler.add(event);
        }

        let stats = sampler.stats();
        assert_eq!(stats.category_count, 2);
        assert!(stats.category_stats.contains_key("source1"));
        assert!(stats.category_stats.contains_key("source2"));
    }

    // Precision 14 gives ~0.8% standard error, so 5% tolerance at n = 1000
    // is comfortable.
    #[test]
    fn test_hyperloglog() {
        let mut hll = HyperLogLog::new(14);

        for i in 0..1000 {
            hll.add(&format!("element-{}", i));
        }

        let cardinality = hll.cardinality();

        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
    }

    // Count-Min never under-counts, so estimates are lower-bounded by the
    // true frequencies.
    #[test]
    fn test_count_min_sketch() {
        let mut cms = CountMinSketch::new(4, 1000);

        for _ in 0..100 {
            cms.add(&"frequent", 1);
        }
        for _ in 0..10 {
            cms.add(&"rare", 1);
        }

        let freq_frequent = cms.estimate(&"frequent");
        let freq_rare = cms.estimate(&"rare");

        assert!(freq_frequent >= 100);
        assert!(freq_rare >= 10);
        assert!(freq_frequent > freq_rare);
    }

    // For a uniform 1..=1000 stream the median and P90 should land near
    // 500 and 900 within loose tolerances.
    #[test]
    fn test_tdigest() {
        let mut digest = TDigest::new(0.01);

        for i in 1..=1000 {
            digest.add(i as f64, 1.0);
        }

        let median = digest.quantile(0.5).unwrap();
        assert!((median - 500.0).abs() < 50.0, "Median: {}", median);

        let p90 = digest.quantile(0.9).unwrap();
        assert!((p90 - 900.0).abs() < 100.0, "P90: {}", p90);
    }

    // Inserted elements must always be reported present (no false
    // negatives); unseen elements may trip a small false-positive rate.
    #[test]
    fn test_bloom_filter() {
        let mut bloom = BloomFilter::optimal(1000, 0.01);

        for i in 0..500 {
            bloom.add(&format!("element-{}", i));
        }

        for i in 0..500 {
            assert!(bloom.contains(&format!("element-{}", i)));
        }

        // Probe 1000 ids that were never inserted.
        let mut false_positives = 0;
        for i in 1000..2000 {
            if bloom.contains(&format!("element-{}", i)) {
                false_positives += 1;
            }
        }

        let fpr = false_positives as f64 / 1000.0;
        assert!(fpr < 0.05, "False positive rate too high: {}", fpr);
    }

    // End-to-end smoke test: every sketch should register the 100 events.
    #[test]
    fn test_sampling_manager() {
        let config = SamplingConfig::default();
        let mut manager = AdvancedSamplingManager::new(config);

        for i in 0..100 {
            let event = create_test_event(&format!("event-{}", i));
            manager.process_event(event).unwrap();
        }

        let stats = manager.stats();
        assert_eq!(stats.event_count, 100);
        assert!(stats.reservoir_stats.current_size > 0);
        assert!(stats.hyperloglog_stats.estimated_cardinality > 0);
        assert!(stats.count_min_stats.total_count > 0);
    }

    // Merging two disjoint 500-element sketches must estimate ~1000.
    #[test]
    fn test_hyperloglog_merge() {
        let mut hll1 = HyperLogLog::new(14);
        let mut hll2 = HyperLogLog::new(14);

        for i in 0..500 {
            hll1.add(&format!("element-{}", i));
        }
        for i in 500..1000 {
            hll2.add(&format!("element-{}", i));
        }

        hll1.merge(&hll2);

        let cardinality = hll1.cardinality();

        let error = ((cardinality as i64 - 1000).abs() as f64) / 1000.0;
        assert!(error < 0.05, "Error: {}, Estimated: {}", error, cardinality);
    }

    // The optimal-sizing constructor must yield a non-degenerate filter.
    #[test]
    fn test_bloom_filter_optimal() {
        let bloom = BloomFilter::optimal(10000, 0.01);
        let stats = bloom.stats();

        assert!(stats.size_bits > 0);
        assert!(stats.hash_count > 0);
    }

    // Stratified sampling through the manager: two subject-based categories
    // must both be tracked.
    #[test]
    fn test_sampling_manager_with_stratified() {
        fn category_extractor(event: &StreamEvent) -> Option<String> {
            match event {
                StreamEvent::TripleAdded { subject, .. } => {
                    if subject.contains("type1") {
                        Some("type1".to_string())
                    } else if subject.contains("type2") {
                        Some("type2".to_string())
                    } else {
                        None
                    }
                }
                _ => None,
            }
        }

        let config = SamplingConfig::default();
        let mut manager = AdvancedSamplingManager::new(config);
        manager.enable_stratified(category_extractor);

        for i in 0..50 {
            let event = StreamEvent::TripleAdded {
                subject: format!("http://example.org/type1/{}", i),
                predicate: "http://example.org/prop".to_string(),
                object: "value".to_string(),
                graph: None,
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: chrono::Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.process_event(event).unwrap();
        }

        for i in 50..100 {
            let event = StreamEvent::TripleAdded {
                subject: format!("http://example.org/type2/{}", i),
                predicate: "http://example.org/prop".to_string(),
                object: "value".to_string(),
                graph: None,
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: chrono::Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.process_event(event).unwrap();
        }

        let stats = manager.stats();
        assert_eq!(stats.event_count, 100);
        assert!(stats.stratified_stats.is_some());

        let stratified = stats.stratified_stats.unwrap();
        assert_eq!(stratified.category_count, 2);
    }
}