1use std::collections::HashMap;
52use std::sync::Arc;
53
54use async_trait::async_trait;
55use chrono::{DateTime, Duration, Utc};
56use serde::{Deserialize, Serialize};
57use tracing::{debug, info, instrument, warn};
58
59use crate::analyzers::{AnalyzerContext, AnalyzerError, AnalyzerResult, MetricValue};
60
/// A detected anomaly in a metric's time series, produced by an
/// [`AnomalyDetector`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
    /// Name of the metric the anomaly was detected on.
    pub metric_name: String,

    /// The value that triggered the detection.
    pub current_value: MetricValue,

    /// The value the detector expected (e.g. previous value or historical
    /// mean), if the detector provides one.
    pub expected_value: Option<MetricValue>,

    /// Identifier of the detector that produced this anomaly
    /// (see `AnomalyDetector::name`).
    pub detection_strategy: String,

    /// Detector-specific confidence score. NOTE(review): detectors in this
    /// file are inconsistent — `ZScoreDetector` clamps to <= 1.0, while the
    /// change detectors report an unclamped ratio that may exceed 1.0.
    pub confidence: f64,

    /// Human-readable explanation of why the value was flagged.
    pub description: String,

    /// When the anomaly was detected (set to `Utc::now()` in `Anomaly::new`).
    pub detected_at: DateTime<Utc>,

    /// Extra detector-specific key/value details (e.g. computed rate,
    /// mean, std-dev).
    pub metadata: HashMap<String, String>,
}
88
89impl Anomaly {
90 pub fn new(
92 metric_name: String,
93 current_value: MetricValue,
94 detection_strategy: String,
95 confidence: f64,
96 description: String,
97 ) -> Self {
98 Self {
99 metric_name,
100 current_value,
101 expected_value: None,
102 detection_strategy,
103 confidence,
104 description,
105 detected_at: Utc::now(),
106 metadata: HashMap::new(),
107 }
108 }
109
110 pub fn with_expected_value(mut self, value: MetricValue) -> Self {
112 self.expected_value = Some(value);
113 self
114 }
115
116 pub fn with_metadata(mut self, key: String, value: String) -> Self {
118 self.metadata.insert(key, value);
119 self
120 }
121}
122
/// A single timestamped observation of a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
    /// Observed value.
    pub value: MetricValue,

    /// When the value was recorded.
    pub timestamp: DateTime<Utc>,

    /// Optional key/value context attached to this observation.
    pub metadata: HashMap<String, String>,
}
135
/// Strategy interface for detecting anomalies in a metric's history.
#[async_trait]
pub trait AnomalyDetector: Send + Sync {
    /// Inspects `current_value` against `history` and returns an anomaly
    /// when the detector's criterion is met, `Ok(None)` otherwise.
    ///
    /// NOTE(review): the implementations in this file assume `history` is
    /// sorted oldest-to-newest (they treat `last()` as the most recent
    /// point) — confirm that callers uphold this ordering.
    async fn detect(
        &self,
        metric_name: &str,
        current_value: &MetricValue,
        history: &[MetricDataPoint],
    ) -> AnalyzerResult<Option<Anomaly>>;

    /// Short identifier recorded in `Anomaly::detection_strategy`.
    fn name(&self) -> &str;

    /// Human-readable description of the detection strategy.
    fn description(&self) -> &str;
}
161
/// Persistence abstraction for metric time series.
#[async_trait]
pub trait MetricsRepository: Send + Sync {
    /// Stores one data point for `metric_name` at `timestamp`.
    async fn store_metric(
        &self,
        metric_name: &str,
        value: MetricValue,
        timestamp: DateTime<Utc>,
    ) -> AnalyzerResult<()>;

    /// Returns data points for `metric_name`, optionally bounded by
    /// `since`/`until` and truncated to `limit` entries.
    ///
    /// NOTE(review): whether `limit` keeps the oldest or the newest points
    /// is implementation-defined; the in-memory implementation in this file
    /// keeps the first (oldest) `limit` matching points.
    async fn get_metric_history(
        &self,
        metric_name: &str,
        since: Option<DateTime<Utc>>,
        until: Option<DateTime<Utc>>,
        limit: Option<usize>,
    ) -> AnalyzerResult<Vec<MetricDataPoint>>;

    /// Stores every metric from `context` under a single shared timestamp,
    /// failing fast on the first storage error.
    async fn store_context(&self, context: &AnalyzerContext) -> AnalyzerResult<()> {
        let timestamp = Utc::now();
        for (metric_name, value) in context.all_metrics() {
            self.store_metric(metric_name, value.clone(), timestamp)
                .await?;
        }
        Ok(())
    }
}
198
/// Memory limits enforced by [`InMemoryMetricsRepository`].
#[derive(Debug, Clone)]
pub struct InMemoryMetricsConfig {
    /// Cap on stored data points per metric; the oldest points beyond this
    /// are trimmed during cleanup.
    pub max_points_per_metric: usize,
    /// Cap on distinct metrics; `store_metric` rejects new metrics beyond
    /// this, and cleanup evicts the least recently updated ones.
    pub max_metrics: usize,
    /// Retention window in seconds; points older than this are dropped
    /// during cleanup.
    pub max_age_seconds: i64,
}
209
210impl Default for InMemoryMetricsConfig {
211 fn default() -> Self {
212 Self {
213 max_points_per_metric: 10_000,
214 max_metrics: 1_000,
215 max_age_seconds: 30 * 24 * 60 * 60, }
217 }
218}
219
/// In-memory [`MetricsRepository`] backed by an `RwLock`-guarded map of
/// metric name to time-sorted data points. Cloning is cheap: clones share
/// the same underlying storage through the `Arc`.
#[derive(Clone)]
pub struct InMemoryMetricsRepository {
    /// metric name -> data points, kept sorted oldest-to-newest by
    /// `store_metric`.
    data: Arc<tokio::sync::RwLock<HashMap<String, Vec<MetricDataPoint>>>>,
    /// Memory limits enforced on store/cleanup.
    config: InMemoryMetricsConfig,
}
229
230impl InMemoryMetricsRepository {
231 pub fn new() -> Self {
233 Self::with_config(InMemoryMetricsConfig::default())
234 }
235
236 pub fn with_config(config: InMemoryMetricsConfig) -> Self {
238 Self {
239 data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
240 config,
241 }
242 }
243
244 pub async fn memory_stats(&self) -> MemoryStats {
246 let data = self.data.read().await;
247 let mut total_points = 0;
248 let mut oldest_timestamp = None;
249 let mut newest_timestamp = None;
250
251 for history in data.values() {
252 total_points += history.len();
253 for point in history {
254 match oldest_timestamp {
255 None => oldest_timestamp = Some(point.timestamp),
256 Some(oldest) if point.timestamp < oldest => {
257 oldest_timestamp = Some(point.timestamp)
258 }
259 _ => {}
260 }
261 match newest_timestamp {
262 None => newest_timestamp = Some(point.timestamp),
263 Some(newest) if point.timestamp > newest => {
264 newest_timestamp = Some(point.timestamp)
265 }
266 _ => {}
267 }
268 }
269 }
270
271 MemoryStats {
272 total_metrics: data.len(),
273 total_data_points: total_points,
274 oldest_data_point: oldest_timestamp,
275 newest_data_point: newest_timestamp,
276 estimated_memory_bytes: Self::estimate_memory_usage(&data),
277 }
278 }
279
280 fn estimate_memory_usage(data: &HashMap<String, Vec<MetricDataPoint>>) -> usize {
282 let mut size = std::mem::size_of::<HashMap<String, Vec<MetricDataPoint>>>();
283
284 for (key, values) in data {
285 size += std::mem::size_of::<String>() + key.len();
286 size += std::mem::size_of::<Vec<MetricDataPoint>>();
287 size += values.len() * std::mem::size_of::<MetricDataPoint>();
288
289 for point in values {
291 for (k, v) in &point.metadata {
292 size += std::mem::size_of::<String>() * 2 + k.len() + v.len();
293 }
294 }
295 }
296
297 size
298 }
299
300 async fn cleanup_if_needed(&self) {
302 let mut data = self.data.write().await;
303
304 if data.len() > self.config.max_metrics {
306 warn!(
307 current_metrics = data.len(),
308 max_metrics = self.config.max_metrics,
309 "Metrics limit exceeded, removing oldest metrics"
310 );
311
312 let mut metrics_by_latest: Vec<_> = data
314 .iter()
315 .map(|(name, points)| {
316 let latest = points.iter().map(|p| p.timestamp).max().unwrap_or_default();
317 (name.clone(), latest)
318 })
319 .collect();
320
321 metrics_by_latest.sort_by(|a, b| b.1.cmp(&a.1)); let to_remove = metrics_by_latest.len() - self.config.max_metrics;
325 for (metric_name, _) in metrics_by_latest.iter().skip(self.config.max_metrics) {
326 data.remove(metric_name);
327 }
328
329 info!(
330 removed_metrics = to_remove,
331 remaining_metrics = data.len(),
332 "Cleaned up old metrics"
333 );
334 }
335
336 let cutoff_time = Utc::now() - Duration::seconds(self.config.max_age_seconds);
338 let mut total_points_removed = 0;
339
340 for (metric_name, points) in data.iter_mut() {
341 let original_len = points.len();
342
343 points.retain(|p| p.timestamp >= cutoff_time);
345
346 if points.len() > self.config.max_points_per_metric {
348 points.sort_by_key(|p| p.timestamp);
349 let to_keep = points.len() - self.config.max_points_per_metric;
350 points.drain(0..to_keep);
351 }
352
353 let removed = original_len - points.len();
354 if removed > 0 {
355 total_points_removed += removed;
356 debug!(
357 metric = metric_name,
358 removed_points = removed,
359 remaining_points = points.len(),
360 "Cleaned up old data points"
361 );
362 }
363 }
364
365 if total_points_removed > 0 {
366 info!(
367 total_removed = total_points_removed,
368 "Completed data point cleanup"
369 );
370 }
371 }
372}
373
/// Snapshot of the in-memory repository's usage, returned by
/// `InMemoryMetricsRepository::memory_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryStats {
    /// Number of distinct metrics currently stored.
    pub total_metrics: usize,
    /// Total data points across all metrics.
    pub total_data_points: usize,
    /// Timestamp of the oldest stored point, if any exist.
    pub oldest_data_point: Option<DateTime<Utc>>,
    /// Timestamp of the newest stored point, if any exist.
    pub newest_data_point: Option<DateTime<Utc>>,
    /// Rough lower-bound estimate of heap usage in bytes.
    pub estimated_memory_bytes: usize,
}
388
389impl Default for InMemoryMetricsRepository {
390 fn default() -> Self {
391 Self::new()
392 }
393}
394
395#[async_trait]
396impl MetricsRepository for InMemoryMetricsRepository {
397 async fn store_metric(
398 &self,
399 metric_name: &str,
400 value: MetricValue,
401 timestamp: DateTime<Utc>,
402 ) -> AnalyzerResult<()> {
403 {
405 let data = self.data.read().await;
406 if data.len() >= self.config.max_metrics && !data.contains_key(metric_name) {
407 return Err(AnalyzerError::Custom(format!(
408 "Maximum metrics limit ({}) exceeded",
409 self.config.max_metrics
410 )));
411 }
412
413 if let Some(existing) = data.get(metric_name) {
414 if existing.len() >= self.config.max_points_per_metric {
415 warn!(
417 metric = metric_name,
418 current_points = existing.len(),
419 max_points = self.config.max_points_per_metric,
420 "Metric approaching memory limit"
421 );
422 }
423 }
424 }
425
426 {
428 let mut data = self.data.write().await;
429 let entry = data.entry(metric_name.to_string()).or_insert_with(Vec::new);
430 entry.push(MetricDataPoint {
431 value,
432 timestamp,
433 metadata: HashMap::new(),
434 });
435 entry.sort_by_key(|dp| dp.timestamp);
437 }
438
439 self.cleanup_if_needed().await;
441
442 Ok(())
443 }
444
445 async fn get_metric_history(
446 &self,
447 metric_name: &str,
448 since: Option<DateTime<Utc>>,
449 until: Option<DateTime<Utc>>,
450 limit: Option<usize>,
451 ) -> AnalyzerResult<Vec<MetricDataPoint>> {
452 let data = self.data.read().await;
453
454 if let Some(history) = data.get(metric_name) {
455 let mut filtered: Vec<_> = history
456 .iter()
457 .filter(|dp| {
458 let after_since = since.map_or(true, |s| dp.timestamp >= s);
459 let before_until = until.map_or(true, |u| dp.timestamp <= u);
460 after_since && before_until
461 })
462 .cloned()
463 .collect();
464
465 if let Some(limit) = limit {
467 filtered.truncate(limit);
468 }
469
470 Ok(filtered)
471 } else {
472 Ok(Vec::new())
473 }
474 }
475}
476
/// Detects anomalies when a value changes by more than a configured
/// fraction relative to the most recent historical value.
pub struct RelativeRateOfChangeDetector {
    /// Maximum allowed relative change (e.g. `0.1` = 10%) before a value
    /// is flagged.
    pub max_rate_of_change: f64,

    /// Minimum number of history points required before detection runs;
    /// defaults to 2 in `new`.
    pub min_history_size: usize,
}
485
486impl RelativeRateOfChangeDetector {
487 pub fn new(max_rate_of_change: f64) -> Self {
492 Self {
493 max_rate_of_change,
494 min_history_size: 2,
495 }
496 }
497
498 pub fn with_min_history_size(mut self, size: usize) -> Self {
500 self.min_history_size = size;
501 self
502 }
503}
504
505#[async_trait]
506impl AnomalyDetector for RelativeRateOfChangeDetector {
507 async fn detect(
508 &self,
509 metric_name: &str,
510 current_value: &MetricValue,
511 history: &[MetricDataPoint],
512 ) -> AnalyzerResult<Option<Anomaly>> {
513 if history.len() < self.min_history_size {
514 debug!(
515 metric = metric_name,
516 history_size = history.len(),
517 required = self.min_history_size,
518 "Insufficient history for rate of change detection"
519 );
520 return Ok(None);
521 }
522
523 let previous = history.last().unwrap();
525
526 debug!(
527 metric = metric_name,
528 current = ?current_value,
529 previous = ?previous.value,
530 "Comparing values for rate of change"
531 );
532
533 match (current_value, &previous.value) {
535 (MetricValue::Long(current), MetricValue::Long(previous)) => {
536 if *previous == 0 {
537 return Ok(None); }
539
540 let rate_of_change = ((*current - *previous) as f64).abs() / (*previous as f64);
541
542 debug!(
543 metric = metric_name,
544 rate_of_change = rate_of_change,
545 threshold = self.max_rate_of_change,
546 "Calculated rate of change"
547 );
548
549 if rate_of_change > self.max_rate_of_change {
550 let anomaly = Anomaly::new(
551 metric_name.to_string(),
552 current_value.clone(),
553 self.name().to_string(),
554 rate_of_change / self.max_rate_of_change, format!(
556 "Relative change of {:.1}% exceeds threshold of {:.1}%",
557 rate_of_change * 100.0,
558 self.max_rate_of_change * 100.0
559 ),
560 )
561 .with_expected_value(MetricValue::Long(*previous))
562 .with_metadata("rate_of_change".to_string(), format!("{rate_of_change:.4}"));
563
564 return Ok(Some(anomaly));
565 }
566 }
567 (MetricValue::Double(current), MetricValue::Double(previous)) => {
568 if *previous == 0.0 {
569 return Ok(None); }
571
572 let rate_of_change = ((current - previous).abs()) / previous.abs();
573
574 if rate_of_change > self.max_rate_of_change {
575 let anomaly = Anomaly::new(
576 metric_name.to_string(),
577 current_value.clone(),
578 self.name().to_string(),
579 rate_of_change / self.max_rate_of_change, format!(
581 "Relative change of {:.1}% exceeds threshold of {:.1}%",
582 rate_of_change * 100.0,
583 self.max_rate_of_change * 100.0
584 ),
585 )
586 .with_expected_value(MetricValue::Double(*previous))
587 .with_metadata("rate_of_change".to_string(), format!("{rate_of_change:.4}"));
588
589 return Ok(Some(anomaly));
590 }
591 }
592 _ => {
593 return Ok(None);
595 }
596 }
597
598 Ok(None)
599 }
600
601 fn name(&self) -> &str {
602 "RelativeRateOfChange"
603 }
604
605 fn description(&self) -> &str {
606 "Detects anomalies when the relative rate of change exceeds a threshold"
607 }
608}
609
/// Detects anomalies when a value changes by more than a fixed absolute
/// amount relative to the most recent historical value.
pub struct AbsoluteChangeDetector {
    /// Maximum allowed absolute change before a value is flagged.
    pub max_absolute_change: f64,

    /// Minimum number of history points required before detection runs;
    /// defaults to 1 in `new`.
    pub min_history_size: usize,
}
618
619impl AbsoluteChangeDetector {
620 pub fn new(max_absolute_change: f64) -> Self {
625 Self {
626 max_absolute_change,
627 min_history_size: 1,
628 }
629 }
630
631 pub fn with_min_history_size(mut self, size: usize) -> Self {
633 self.min_history_size = size;
634 self
635 }
636}
637
638#[async_trait]
639impl AnomalyDetector for AbsoluteChangeDetector {
640 async fn detect(
641 &self,
642 metric_name: &str,
643 current_value: &MetricValue,
644 history: &[MetricDataPoint],
645 ) -> AnalyzerResult<Option<Anomaly>> {
646 if history.len() < self.min_history_size {
647 return Ok(None);
648 }
649
650 let previous = history.last().unwrap();
651
652 match (current_value, &previous.value) {
653 (MetricValue::Long(current), MetricValue::Long(previous)) => {
654 let change = (*current - *previous).abs() as f64;
655
656 if change > self.max_absolute_change {
657 let anomaly = Anomaly::new(
658 metric_name.to_string(),
659 current_value.clone(),
660 self.name().to_string(),
661 change / self.max_absolute_change,
662 format!(
663 "Absolute change of {change} exceeds threshold of {}",
664 self.max_absolute_change
665 ),
666 )
667 .with_expected_value(MetricValue::Long(*previous))
668 .with_metadata("absolute_change".to_string(), format!("{change}"));
669
670 return Ok(Some(anomaly));
671 }
672 }
673 (MetricValue::Double(current), MetricValue::Double(previous)) => {
674 let change = (current - previous).abs();
675
676 if change > self.max_absolute_change {
677 let anomaly = Anomaly::new(
678 metric_name.to_string(),
679 current_value.clone(),
680 self.name().to_string(),
681 change / self.max_absolute_change,
682 format!(
683 "Absolute change of {change:.4} exceeds threshold of {:.4}",
684 self.max_absolute_change
685 ),
686 )
687 .with_expected_value(MetricValue::Double(*previous))
688 .with_metadata("absolute_change".to_string(), format!("{change:.4}"));
689
690 return Ok(Some(anomaly));
691 }
692 }
693 _ => return Ok(None),
694 }
695
696 Ok(None)
697 }
698
699 fn name(&self) -> &str {
700 "AbsoluteChange"
701 }
702
703 fn description(&self) -> &str {
704 "Detects anomalies when the absolute change exceeds a threshold"
705 }
706}
707
/// Detects anomalies using a population Z-score over the numeric portion
/// of the metric's history.
pub struct ZScoreDetector {
    /// Number of standard deviations from the mean beyond which a value
    /// is flagged.
    pub z_score_threshold: f64,

    /// Minimum number of numeric history points required; defaults to 10
    /// in `new` so the mean/std-dev are reasonably stable.
    pub min_history_size: usize,
}
716
717impl ZScoreDetector {
718 pub fn new(z_score_threshold: f64) -> Self {
723 Self {
724 z_score_threshold,
725 min_history_size: 10,
726 }
727 }
728
729 pub fn with_min_history_size(mut self, size: usize) -> Self {
731 self.min_history_size = size;
732 self
733 }
734}
735
736#[async_trait]
737impl AnomalyDetector for ZScoreDetector {
738 async fn detect(
739 &self,
740 metric_name: &str,
741 current_value: &MetricValue,
742 history: &[MetricDataPoint],
743 ) -> AnalyzerResult<Option<Anomaly>> {
744 if history.len() < self.min_history_size {
745 return Ok(None);
746 }
747
748 let numeric_values: Vec<f64> = history
750 .iter()
751 .filter_map(|dp| match &dp.value {
752 MetricValue::Long(v) => Some(*v as f64),
753 MetricValue::Double(v) => Some(*v),
754 _ => None,
755 })
756 .collect();
757
758 if numeric_values.len() < self.min_history_size {
759 return Ok(None);
760 }
761
762 let mean = numeric_values.iter().sum::<f64>() / numeric_values.len() as f64;
764 let variance = numeric_values
765 .iter()
766 .map(|v| (v - mean).powi(2))
767 .sum::<f64>()
768 / numeric_values.len() as f64;
769 let std_dev = variance.sqrt();
770
771 if std_dev == 0.0 {
773 return Ok(None);
774 }
775
776 let current_numeric = match current_value {
778 MetricValue::Long(v) => *v as f64,
779 MetricValue::Double(v) => *v,
780 _ => return Ok(None),
781 };
782
783 let z_score = (current_numeric - mean).abs() / std_dev;
784
785 if z_score > self.z_score_threshold {
786 let anomaly = Anomaly::new(
787 metric_name.to_string(),
788 current_value.clone(),
789 self.name().to_string(),
790 (z_score / self.z_score_threshold).min(1.0),
791 format!(
792 "Value is {z_score:.1} standard deviations from mean (threshold: {:.1})",
793 self.z_score_threshold
794 ),
795 )
796 .with_expected_value(MetricValue::Double(mean))
797 .with_metadata("z_score".to_string(), format!("{z_score:.2}"))
798 .with_metadata("mean".to_string(), format!("{mean:.4}"))
799 .with_metadata("std_dev".to_string(), format!("{std_dev:.4}"));
800
801 return Ok(Some(anomaly));
802 }
803
804 Ok(None)
805 }
806
807 fn name(&self) -> &str {
808 "ZScore"
809 }
810
811 fn description(&self) -> &str {
812 "Detects anomalies using statistical Z-score analysis"
813 }
814}
815
/// Tuning knobs for [`AnomalyDetectionRunner`].
#[derive(Debug, Clone)]
pub struct AnomalyDetectionConfig {
    /// Anomalies below this confidence are discarded by the runner.
    pub min_confidence: f64,

    /// Whether to persist the context's metrics before running detection.
    pub store_current_metrics: bool,

    /// How far back to fetch history for each detector run.
    pub default_history_window: Duration,
}
828
829impl Default for AnomalyDetectionConfig {
830 fn default() -> Self {
831 Self {
832 min_confidence: 0.7,
833 store_current_metrics: true,
834 default_history_window: Duration::days(30),
835 }
836 }
837}
838
/// Orchestrates anomaly detection: optionally stores incoming metrics,
/// then runs each registered detector against the matching metrics'
/// histories.
pub struct AnomalyDetectionRunner {
    /// Backing store for metric history.
    repository: Box<dyn MetricsRepository>,
    /// (metric-name pattern, detector) pairs; see `matches_pattern` for
    /// the pattern syntax.
    detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
    /// Runtime tuning; see [`AnomalyDetectionConfig`].
    config: AnomalyDetectionConfig,
}
845
impl AnomalyDetectionRunner {
    /// Creates a builder for configuring a runner.
    pub fn builder() -> AnomalyDetectionRunnerBuilder {
        AnomalyDetectionRunnerBuilder::default()
    }

    /// Runs every registered detector against each metric in `context`
    /// whose name matches the detector's pattern, returning only anomalies
    /// with confidence >= `config.min_confidence`.
    ///
    /// Detector errors are logged and skipped (one failing detector does
    /// not abort the run); repository errors propagate.
    #[instrument(skip(self, context))]
    pub async fn detect_anomalies(
        &self,
        context: &AnalyzerContext,
    ) -> AnalyzerResult<Vec<Anomaly>> {
        let mut anomalies = Vec::new();

        // Order matters: this timestamp is captured (minus 1ms) BEFORE
        // `store_context` runs, which stamps the stored values with a later
        // `Utc::now()`. Querying history with `until = detection_time`
        // therefore excludes the values currently being evaluated from
        // their own baseline.
        let detection_time = Utc::now() - chrono::Duration::milliseconds(1);

        if self.config.store_current_metrics {
            self.repository.store_context(context).await?;
        }

        for (metric_name, metric_value) in context.all_metrics() {
            for (pattern, detector) in &self.detectors {
                if self.matches_pattern(metric_name, pattern) {
                    // History window: from `now - default_history_window`
                    // up to (but not including) the just-stored values.
                    let since = Utc::now() - self.config.default_history_window;
                    let history = self
                        .repository
                        .get_metric_history(metric_name, Some(since), Some(detection_time), None)
                        .await?;

                    debug!(
                        metric = metric_name,
                        history_size = history.len(),
                        current_value = ?metric_value,
                        "Running anomaly detection"
                    );

                    match detector.detect(metric_name, metric_value, &history).await {
                        Ok(Some(anomaly)) => {
                            // Keep only sufficiently confident findings.
                            if anomaly.confidence >= self.config.min_confidence {
                                info!(
                                    metric = metric_name,
                                    strategy = anomaly.detection_strategy,
                                    confidence = anomaly.confidence,
                                    "Anomaly detected"
                                );
                                anomalies.push(anomaly);
                            }
                        }
                        Ok(None) => {
                            // No anomaly for this metric/detector pair.
                        }
                        Err(e) => {
                            // Log and continue: a single detector failure
                            // should not abort the whole run.
                            warn!(
                                metric = metric_name,
                                detector = detector.name(),
                                error = %e,
                                "Error during anomaly detection"
                            );
                        }
                    }
                }
            }
        }

        Ok(anomalies)
    }

    /// Glob-lite pattern matching: `"*"` matches everything, a trailing
    /// `*` matches by prefix, anything else must match exactly.
    fn matches_pattern(&self, metric_name: &str, pattern: &str) -> bool {
        if pattern == "*" {
            return true;
        }

        if let Some(prefix) = pattern.strip_suffix('*') {
            return metric_name.starts_with(prefix);
        }

        metric_name == pattern
    }
}
933
/// Builder for [`AnomalyDetectionRunner`]; a repository must be provided
/// before `build` succeeds.
#[derive(Default)]
pub struct AnomalyDetectionRunnerBuilder {
    /// Required backing store; `build` fails if left unset.
    repository: Option<Box<dyn MetricsRepository>>,
    /// (pattern, detector) registrations, evaluated in insertion order.
    detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
    /// Defaults to `AnomalyDetectionConfig::default()`.
    config: AnomalyDetectionConfig,
}
941
942impl AnomalyDetectionRunnerBuilder {
943 pub fn repository(mut self, repository: Box<dyn MetricsRepository>) -> Self {
945 self.repository = Some(repository);
946 self
947 }
948
949 pub fn add_detector(mut self, pattern: &str, detector: Box<dyn AnomalyDetector>) -> Self {
955 self.detectors.push((pattern.to_string(), detector));
956 self
957 }
958
959 pub fn config(mut self, config: AnomalyDetectionConfig) -> Self {
961 self.config = config;
962 self
963 }
964
965 pub fn build(self) -> AnalyzerResult<AnomalyDetectionRunner> {
967 let repository = self
968 .repository
969 .ok_or_else(|| AnalyzerError::Custom("Metrics repository is required".to_string()))?;
970
971 Ok(AnomalyDetectionRunner {
972 repository,
973 detectors: self.detectors,
974 config: self.config,
975 })
976 }
977}
978
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_relative_rate_of_change_detector() {
        // 10% threshold; a single history point is enough for this test.
        let detector = RelativeRateOfChangeDetector::new(0.1).with_min_history_size(1);
        let history = vec![MetricDataPoint {
            value: MetricValue::Long(100),
            timestamp: Utc::now() - Duration::hours(1),
            metadata: HashMap::new(),
        }];

        // 100 -> 105 is a 5% change: below the threshold, no anomaly.
        let current = MetricValue::Long(105);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_none());

        // 100 -> 120 is a 20% change: above the threshold, anomaly expected.
        let current = MetricValue::Long(120);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_some());
        let anomaly = result.unwrap();
        assert_eq!(anomaly.detection_strategy, "RelativeRateOfChange");
        // Confidence is the unclamped rate/threshold ratio (2.0 here).
        assert!(anomaly.confidence > 1.0);
    }

    #[tokio::test]
    async fn test_z_score_detector() {
        let detector = ZScoreDetector::new(2.0);

        // 20 points oscillating in [95, 104]: a tight, stable baseline.
        let mut history = Vec::new();
        for i in 0..20 {
            history.push(MetricDataPoint {
                value: MetricValue::Long(95 + (i % 10)),
                timestamp: Utc::now() - Duration::hours(i),
                metadata: HashMap::new(),
            });
        }

        // 102 sits inside the normal band: no anomaly.
        let current = MetricValue::Long(102);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_none());

        // 150 is far outside the band: anomaly expected.
        let current = MetricValue::Long(150);
        let result = detector
            .detect("test_metric", &current, &history)
            .await
            .unwrap();
        assert!(result.is_some());
        let anomaly = result.unwrap();
        assert_eq!(anomaly.detection_strategy, "ZScore");
    }

    #[tokio::test]
    async fn test_in_memory_repository_memory_limits() {
        // Tiny limits so the caps are easy to hit.
        let config = InMemoryMetricsConfig {
            max_metrics: 2,
            max_points_per_metric: 3,
            max_age_seconds: 60,
        };
        let repo = InMemoryMetricsRepository::with_config(config);

        let now = Utc::now();

        // Fill up to the metric-count cap.
        repo.store_metric("metric1", MetricValue::Long(100), now)
            .await
            .unwrap();
        repo.store_metric("metric2", MetricValue::Long(200), now)
            .await
            .unwrap();

        // A third distinct metric must be rejected.
        let result = repo
            .store_metric("metric3", MetricValue::Long(300), now)
            .await;
        assert!(result.is_err());

        let stats = repo.memory_stats().await;
        assert_eq!(stats.total_metrics, 2);
        assert_eq!(stats.total_data_points, 2);

        // Push metric1 past max_points_per_metric; cleanup trims to the cap.
        repo.store_metric(
            "metric1",
            MetricValue::Long(101),
            now + Duration::seconds(1),
        )
        .await
        .unwrap();
        repo.store_metric(
            "metric1",
            MetricValue::Long(102),
            now + Duration::seconds(2),
        )
        .await
        .unwrap();
        repo.store_metric(
            "metric1",
            MetricValue::Long(103),
            now + Duration::seconds(3),
        )
        .await
        .unwrap();

        let history = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        assert!(history.len() <= 3);

        let final_stats = repo.memory_stats().await;
        assert!(final_stats.estimated_memory_bytes > 0);
    }

    #[tokio::test]
    async fn test_in_memory_repository() {
        let repo = InMemoryMetricsRepository::new();

        let now = Utc::now();
        repo.store_metric("metric1", MetricValue::Long(100), now)
            .await
            .unwrap();
        repo.store_metric("metric1", MetricValue::Long(110), now + Duration::hours(1))
            .await
            .unwrap();
        repo.store_metric("metric2", MetricValue::Double(0.95), now)
            .await
            .unwrap();

        // Full history comes back sorted oldest-first.
        let history = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].value, MetricValue::Long(100));
        assert_eq!(history[1].value, MetricValue::Long(110));

        // `since` filters out points before the bound.
        let history = repo
            .get_metric_history("metric1", Some(now + Duration::minutes(30)), None, None)
            .await
            .unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].value, MetricValue::Long(110));
    }
}