1use crate::RragResult;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
/// Top-level configuration for metrics collection, health monitoring,
/// alerting and metric export.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Collect performance data points and aggregate statistics.
    pub enable_performance_metrics: bool,

    /// Run periodic health checks in the background.
    pub enable_health_monitoring: bool,

    /// Emit alerts when configured thresholds are exceeded.
    pub enable_alerting: bool,

    /// Seconds between metrics-collection passes.
    pub metrics_interval_secs: u64,

    /// Seconds between health checks.
    pub health_check_interval_secs: u64,

    /// How many days of metrics history to keep.
    pub metrics_retention_days: u32,

    /// Alert channels, thresholds and rate limits.
    pub alert_config: AlertConfig,

    /// Export destinations, format and cadence.
    pub export_config: ExportConfig,
}
40
/// Alert delivery channels, trigger thresholds and rate limiting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Deliver alerts via email.
    pub enable_email_alerts: bool,

    /// Deliver alerts via webhook.
    pub enable_webhook_alerts: bool,

    /// Write alerts to the application log.
    pub enable_log_alerts: bool,

    /// Threshold values that trigger alerts.
    pub thresholds: AlertThresholds,

    /// Minimum seconds between repeated alerts.
    pub cooldown_period_secs: u64,

    /// Hard cap on the number of alerts emitted per hour.
    pub max_alerts_per_hour: u32,
}
62
/// Threshold values that trigger alerts when crossed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Maximum tolerated error rate (fraction; default 0.05 = 5%).
    pub error_rate_threshold: f64,

    /// Maximum tolerated response time in milliseconds.
    pub response_time_threshold_ms: u64,

    /// Maximum tolerated queue depth (number of queued items).
    pub queue_depth_threshold: usize,

    /// Maximum memory usage as a fraction (default 0.8 = 80%).
    pub memory_usage_threshold: f64,

    /// Maximum storage usage as a fraction (default 0.9 = 90%).
    pub storage_usage_threshold: f64,

    /// Throughput threshold in operations per second.
    pub throughput_threshold_ops: f64,
}
84
/// Configuration for exporting metrics to external systems.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
    /// Expose metrics in Prometheus format.
    pub enable_prometheus: bool,

    /// Export metrics as JSON.
    pub enable_json_export: bool,

    /// Optional remote endpoint to push exports to.
    pub export_endpoint: Option<String>,

    /// Seconds between export passes.
    pub export_interval_secs: u64,

    /// Wire format used when exporting.
    pub export_format: ExportFormat,
}
103
/// Supported metric export formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    /// Prometheus text exposition format.
    Prometheus,
    /// Plain JSON.
    Json,
    /// InfluxDB line protocol.
    InfluxDB,
    /// StatsD wire format.
    StatsD,
    /// A caller-defined format, identified by name.
    Custom(String),
}
113
114impl Default for MonitoringConfig {
115 fn default() -> Self {
116 Self {
117 enable_performance_metrics: true,
118 enable_health_monitoring: true,
119 enable_alerting: true,
120 metrics_interval_secs: 30,
121 health_check_interval_secs: 60,
122 metrics_retention_days: 30,
123 alert_config: AlertConfig::default(),
124 export_config: ExportConfig::default(),
125 }
126 }
127}
128
129impl Default for AlertConfig {
130 fn default() -> Self {
131 Self {
132 enable_email_alerts: false,
133 enable_webhook_alerts: true,
134 enable_log_alerts: true,
135 thresholds: AlertThresholds::default(),
136 cooldown_period_secs: 300, max_alerts_per_hour: 10,
138 }
139 }
140}
141
142impl Default for AlertThresholds {
143 fn default() -> Self {
144 Self {
145 error_rate_threshold: 0.05, response_time_threshold_ms: 10000, queue_depth_threshold: 1000,
148 memory_usage_threshold: 0.8, storage_usage_threshold: 0.9, throughput_threshold_ops: 10.0, }
152 }
153}
154
155impl Default for ExportConfig {
156 fn default() -> Self {
157 Self {
158 enable_prometheus: false,
159 enable_json_export: true,
160 export_endpoint: None,
161 export_interval_secs: 300, export_format: ExportFormat::Json,
163 }
164 }
165}
166
/// A full metrics snapshot for one incremental-indexing system at a point
/// in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalMetrics {
    /// Identifier of the system this snapshot belongs to (UUID string).
    pub system_id: String,

    /// When this snapshot was produced or last updated.
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Indexing throughput and efficiency figures.
    pub indexing_metrics: IndexingMetrics,

    /// Host resource usage (CPU, memory, storage, I/O).
    pub system_metrics: SystemMetrics,

    /// Operation counts, latencies and queue depths.
    pub operation_metrics: OperationMetrics,

    /// Health scores per component and overall.
    pub health_metrics: HealthMetrics,

    /// Error counts and rates.
    pub error_metrics: ErrorMetrics,

    /// Free-form, caller-defined gauges keyed by name.
    pub custom_metrics: HashMap<String, f64>,
}
194
/// Throughput and efficiency figures for the indexing pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingMetrics {
    /// Documents indexed per second.
    pub documents_per_second: f64,

    /// Chunks processed per second.
    pub chunks_per_second: f64,

    /// Embeddings generated per second.
    pub embeddings_per_second: f64,

    /// Average time to index one item, in milliseconds.
    pub avg_indexing_time_ms: f64,

    /// Index growth rate in bytes per second.
    pub index_growth_rate_bps: f64,

    /// Batch-processing efficiency score (1.0 = ideal).
    pub batch_efficiency: f64,

    /// Change-detection accuracy score (1.0 = ideal).
    pub change_detection_accuracy: f64,

    /// Vector-update efficiency score (1.0 = ideal).
    pub vector_update_efficiency: f64,
}
222
/// Host-level resource usage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    /// CPU usage in percent.
    pub cpu_usage_percent: f64,

    /// Memory currently used, in bytes.
    pub memory_usage_bytes: u64,

    /// Memory still available, in bytes.
    pub available_memory_bytes: u64,

    /// Storage currently used, in bytes.
    pub storage_usage_bytes: u64,

    /// Storage still available, in bytes.
    pub available_storage_bytes: u64,

    /// Network I/O rate in bytes per second.
    pub network_io_bps: f64,

    /// Disk I/O operations per second.
    pub disk_io_ops: f64,

    /// Number of currently active connections.
    pub active_connections: usize,
}
250
/// Counts, latencies and queue state for processed operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// Total operations processed.
    pub total_operations: u64,

    /// Operation counts keyed by operation type name.
    pub operations_by_type: HashMap<String, u64>,

    /// Fraction of operations that succeeded (1.0 = all).
    pub success_rate: f64,

    /// Mean operation duration in milliseconds.
    pub avg_operation_time_ms: f64,

    /// 95th-percentile operation duration in milliseconds.
    pub p95_operation_time_ms: f64,

    /// 99th-percentile operation duration in milliseconds.
    pub p99_operation_time_ms: f64,

    /// Current depth of each named queue.
    pub queue_depths: HashMap<String, usize>,

    /// Retry counters for failed operations.
    pub retry_stats: RetryMetrics,
}
278
/// Retry counters for failed operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryMetrics {
    /// Total retry attempts made.
    pub total_retries: u64,

    /// Retries that eventually succeeded.
    pub successful_retries: u64,

    /// Operations that failed even after all retries.
    pub exhausted_retries: u64,

    /// Mean number of retries per operation.
    pub avg_retries_per_operation: f64,
}
294
/// Health scores for the system and its components (1.0 = fully healthy).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Aggregate health score across all components.
    pub overall_health_score: f64,

    /// Per-component health scores keyed by component name.
    pub component_health: HashMap<String, f64>,

    /// Fraction of time the service was available.
    pub service_availability: f64,

    /// Data-consistency score.
    pub data_consistency_score: f64,

    /// Performance score.
    pub performance_score: f64,

    /// When the last health check ran.
    pub last_health_check: chrono::DateTime<chrono::Utc>,
}
316
/// Error counts, classification and resolution timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// Total errors observed.
    pub total_errors: u64,

    /// Error counts keyed by error type name.
    pub errors_by_type: HashMap<String, u64>,

    /// Error counts keyed by the component that produced them.
    pub errors_by_component: HashMap<String, u64>,

    /// Error rate (fraction of operations that errored).
    pub error_rate: f64,

    /// Errors classified as critical.
    pub critical_errors: u64,

    /// Errors the system recovered from.
    pub recoverable_errors: u64,

    /// Mean time to resolve an error, in milliseconds.
    pub avg_resolution_time_ms: f64,
}
341
/// Tracks raw performance samples and maintains derived statistics.
pub struct PerformanceTracker {
    /// Bounded ring of raw samples (oldest evicted first).
    data_points: Arc<RwLock<VecDeque<PerformanceDataPoint>>>,

    /// Statistics derived from the current samples.
    statistics: Arc<RwLock<PerformanceStatistics>>,

    /// Monitoring configuration this tracker was built with.
    config: MonitoringConfig,

    /// Maximum number of samples kept in `data_points`.
    max_data_points: usize,
}
356
/// One raw performance sample for a single operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    /// When the operation ran.
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Operation type name used to group statistics.
    pub operation_type: String,

    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,

    /// Memory usage observed during the operation, in megabytes.
    pub memory_usage_mb: f64,

    /// Whether the operation succeeded.
    pub success: bool,

    /// Free-form extra context for this sample.
    pub metadata: HashMap<String, serde_json::Value>,
}
378
/// Aggregate statistics derived from recorded performance samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStatistics {
    /// Statistics broken down by operation type.
    pub by_operation_type: HashMap<String, OperationStatistics>,

    /// Statistics over all samples regardless of type.
    pub overall: OperationStatistics,

    /// Trend directions over the analysis period.
    pub trends: TrendAnalysis,

    /// When these statistics were last recomputed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
394
/// Summary statistics for one group of operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationStatistics {
    /// Number of samples in the group.
    pub total_count: u64,

    /// Number of successful samples.
    pub success_count: u64,

    /// Mean duration in milliseconds.
    pub avg_duration_ms: f64,

    /// Median duration in milliseconds.
    pub median_duration_ms: f64,

    /// 95th-percentile duration in milliseconds.
    pub p95_duration_ms: f64,

    /// 99th-percentile duration in milliseconds.
    pub p99_duration_ms: f64,

    /// Standard deviation of durations in milliseconds.
    pub std_deviation_ms: f64,

    /// Observed throughput in operations per second.
    pub operations_per_second: f64,
}
422
/// Direction of change for key metrics over the analysis window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Trend of overall performance.
    pub performance_trend: TrendDirection,

    /// Trend of the error rate.
    pub error_rate_trend: TrendDirection,

    /// Trend of throughput.
    pub throughput_trend: TrendDirection,

    /// Trend of memory usage.
    pub memory_trend: TrendDirection,

    /// Length of the analysis window in hours.
    pub analysis_period_hours: u32,
}
441
/// Qualitative direction of a metric trend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// The metric is getting better.
    Improving,
    /// No significant change.
    Stable,
    /// The metric is getting worse.
    Degrading,
    /// Not enough data to tell.
    Unknown,
}
450
/// Cumulative statistics about indexing work performed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStats {
    /// Total documents indexed.
    pub documents_indexed: u64,

    /// Total chunks processed.
    pub chunks_processed: u64,

    /// Total embeddings generated.
    pub embeddings_generated: u64,

    /// Total index update operations performed.
    pub index_updates: u64,

    /// Average per-document processing time in milliseconds.
    pub avg_document_processing_ms: f64,

    /// Indexing throughput ("dps" — presumably documents per second; confirm).
    pub indexing_throughput_dps: f64,
    /// Storage efficiency score.
    pub storage_efficiency: f64,

    /// Overall index quality score.
    pub index_quality_score: f64,
}
478
/// Owns the live metrics snapshot, its history, and the background tasks
/// that collect, monitor and export metrics.
pub struct MetricsCollector {
    /// The live, most-recent metrics snapshot.
    current_metrics: Arc<RwLock<IncrementalMetrics>>,

    /// Bounded history of past snapshots (oldest evicted first).
    metrics_history: Arc<RwLock<VecDeque<IncrementalMetrics>>>,

    /// Tracker for raw performance samples and derived statistics.
    performance_tracker: Arc<PerformanceTracker>,

    /// Monitoring configuration this collector was built with.
    config: MonitoringConfig,

    /// Bookkeeping about the collection loop itself.
    collection_stats: Arc<RwLock<CollectionStatistics>>,

    /// Handles of spawned background tasks (used by `health_check`).
    task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
499
/// Bookkeeping about metrics-collection passes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionStatistics {
    /// Total collection passes attempted.
    pub total_collections: u64,

    /// Collection passes that failed.
    pub failed_collections: u64,

    /// Running average of collection duration in milliseconds.
    pub avg_collection_time_ms: f64,

    /// When the last collection pass ran.
    pub last_collection: chrono::DateTime<chrono::Utc>,

    /// Fraction of collection passes that succeeded.
    pub collection_success_rate: f64,
}
518
519impl IncrementalMetrics {
520 pub fn new() -> Self {
522 Self {
523 system_id: Uuid::new_v4().to_string(),
524 timestamp: chrono::Utc::now(),
525 indexing_metrics: IndexingMetrics {
526 documents_per_second: 0.0,
527 chunks_per_second: 0.0,
528 embeddings_per_second: 0.0,
529 avg_indexing_time_ms: 0.0,
530 index_growth_rate_bps: 0.0,
531 batch_efficiency: 1.0,
532 change_detection_accuracy: 1.0,
533 vector_update_efficiency: 1.0,
534 },
535 system_metrics: SystemMetrics {
536 cpu_usage_percent: 0.0,
537 memory_usage_bytes: 0,
538 available_memory_bytes: 0,
539 storage_usage_bytes: 0,
540 available_storage_bytes: 0,
541 network_io_bps: 0.0,
542 disk_io_ops: 0.0,
543 active_connections: 0,
544 },
545 operation_metrics: OperationMetrics {
546 total_operations: 0,
547 operations_by_type: HashMap::new(),
548 success_rate: 1.0,
549 avg_operation_time_ms: 0.0,
550 p95_operation_time_ms: 0.0,
551 p99_operation_time_ms: 0.0,
552 queue_depths: HashMap::new(),
553 retry_stats: RetryMetrics {
554 total_retries: 0,
555 successful_retries: 0,
556 exhausted_retries: 0,
557 avg_retries_per_operation: 0.0,
558 },
559 },
560 health_metrics: HealthMetrics {
561 overall_health_score: 1.0,
562 component_health: HashMap::new(),
563 service_availability: 1.0,
564 data_consistency_score: 1.0,
565 performance_score: 1.0,
566 last_health_check: chrono::Utc::now(),
567 },
568 error_metrics: ErrorMetrics {
569 total_errors: 0,
570 errors_by_type: HashMap::new(),
571 errors_by_component: HashMap::new(),
572 error_rate: 0.0,
573 critical_errors: 0,
574 recoverable_errors: 0,
575 avg_resolution_time_ms: 0.0,
576 },
577 custom_metrics: HashMap::new(),
578 }
579 }
580
581 pub fn update(&mut self, update_data: MetricsUpdate) {
583 self.timestamp = chrono::Utc::now();
584
585 if let Some(indexing) = update_data.indexing_metrics {
587 self.indexing_metrics = indexing;
588 }
589
590 if let Some(system) = update_data.system_metrics {
592 self.system_metrics = system;
593 }
594
595 if let Some(operations) = update_data.operation_metrics {
597 self.operation_metrics = operations;
598 }
599
600 if let Some(health) = update_data.health_metrics {
602 self.health_metrics = health;
603 }
604
605 if let Some(errors) = update_data.error_metrics {
607 self.error_metrics = errors;
608 }
609
610 for (key, value) in update_data.custom_metrics {
612 self.custom_metrics.insert(key, value);
613 }
614 }
615
616 pub fn calculate_system_score(&self) -> f64 {
618 let health_weight = 0.4;
619 let performance_weight = 0.3;
620 let reliability_weight = 0.3;
621
622 let health_score = self.health_metrics.overall_health_score;
623 let performance_score = self.health_metrics.performance_score;
624 let reliability_score = 1.0 - self.error_metrics.error_rate.min(1.0);
625
626 (health_score * health_weight)
627 + (performance_score * performance_weight)
628 + (reliability_score * reliability_weight)
629 }
630}
631
/// A partial metrics update: each `Some` section replaces the corresponding
/// section of the current snapshot; custom metrics are merged in.
#[derive(Debug, Clone)]
pub struct MetricsUpdate {
    /// Replacement indexing metrics, if any.
    pub indexing_metrics: Option<IndexingMetrics>,
    /// Replacement system metrics, if any.
    pub system_metrics: Option<SystemMetrics>,
    /// Replacement operation metrics, if any.
    pub operation_metrics: Option<OperationMetrics>,
    /// Replacement health metrics, if any.
    pub health_metrics: Option<HealthMetrics>,
    /// Replacement error metrics, if any.
    pub error_metrics: Option<ErrorMetrics>,
    /// Custom gauges to merge into the snapshot, keyed by name.
    pub custom_metrics: HashMap<String, f64>,
}
642
643impl PerformanceTracker {
644 pub fn new(config: MonitoringConfig, max_data_points: usize) -> Self {
646 Self {
647 data_points: Arc::new(RwLock::new(VecDeque::new())),
648 statistics: Arc::new(RwLock::new(PerformanceStatistics {
649 by_operation_type: HashMap::new(),
650 overall: OperationStatistics {
651 total_count: 0,
652 success_count: 0,
653 avg_duration_ms: 0.0,
654 median_duration_ms: 0.0,
655 p95_duration_ms: 0.0,
656 p99_duration_ms: 0.0,
657 std_deviation_ms: 0.0,
658 operations_per_second: 0.0,
659 },
660 trends: TrendAnalysis {
661 performance_trend: TrendDirection::Stable,
662 error_rate_trend: TrendDirection::Stable,
663 throughput_trend: TrendDirection::Stable,
664 memory_trend: TrendDirection::Stable,
665 analysis_period_hours: 24,
666 },
667 last_updated: chrono::Utc::now(),
668 })),
669 config,
670 max_data_points,
671 }
672 }
673
674 pub async fn record_data_point(&self, data_point: PerformanceDataPoint) {
676 let mut data_points = self.data_points.write().await;
677 data_points.push_back(data_point);
678
679 while data_points.len() > self.max_data_points {
681 data_points.pop_front();
682 }
683
684 self.update_statistics().await;
686 }
687
688 pub async fn get_statistics(&self) -> PerformanceStatistics {
690 self.statistics.read().await.clone()
691 }
692
693 async fn update_statistics(&self) {
695 let data_points = self.data_points.read().await;
696
697 if data_points.is_empty() {
698 return;
699 }
700
701 let mut by_operation_type: HashMap<String, Vec<&PerformanceDataPoint>> = HashMap::new();
702 let mut all_points = Vec::new();
703
704 for point in data_points.iter() {
706 by_operation_type
707 .entry(point.operation_type.clone())
708 .or_insert_with(Vec::new)
709 .push(point);
710 all_points.push(point);
711 }
712
713 let mut statistics = self.statistics.write().await;
714
715 for (op_type, points) in by_operation_type {
717 let stats = self.calculate_operation_statistics(&points);
718 statistics.by_operation_type.insert(op_type, stats);
719 }
720
721 statistics.overall = self.calculate_operation_statistics(&all_points);
723 statistics.last_updated = chrono::Utc::now();
724 }
725
726 fn calculate_operation_statistics(
728 &self,
729 points: &[&PerformanceDataPoint],
730 ) -> OperationStatistics {
731 if points.is_empty() {
732 return OperationStatistics {
733 total_count: 0,
734 success_count: 0,
735 avg_duration_ms: 0.0,
736 median_duration_ms: 0.0,
737 p95_duration_ms: 0.0,
738 p99_duration_ms: 0.0,
739 std_deviation_ms: 0.0,
740 operations_per_second: 0.0,
741 };
742 }
743
744 let total_count = points.len() as u64;
745 let success_count = points.iter().filter(|p| p.success).count() as u64;
746
747 let mut durations: Vec<u64> = points.iter().map(|p| p.duration_ms).collect();
748 durations.sort();
749
750 let avg_duration_ms = durations.iter().sum::<u64>() as f64 / durations.len() as f64;
751 let median_duration_ms = if durations.len() % 2 == 0 {
752 (durations[durations.len() / 2 - 1] + durations[durations.len() / 2]) as f64 / 2.0
753 } else {
754 durations[durations.len() / 2] as f64
755 };
756
757 let p95_index = ((durations.len() as f64) * 0.95) as usize;
758 let p99_index = ((durations.len() as f64) * 0.99) as usize;
759
760 let p95_duration_ms = durations
761 .get(p95_index.min(durations.len() - 1))
762 .unwrap_or(&0) as &u64;
763 let p99_duration_ms = durations
764 .get(p99_index.min(durations.len() - 1))
765 .unwrap_or(&0) as &u64;
766
767 let variance = durations
769 .iter()
770 .map(|d| (*d as f64 - avg_duration_ms).powi(2))
771 .sum::<f64>()
772 / durations.len() as f64;
773 let std_deviation_ms = variance.sqrt();
774
775 let time_span_secs = if points.len() > 1 {
777 let first = points.first().unwrap().timestamp;
778 let last = points.last().unwrap().timestamp;
779 last.signed_duration_since(first).num_seconds().max(1) as f64
780 } else {
781 1.0
782 };
783 let operations_per_second = total_count as f64 / time_span_secs;
784
785 OperationStatistics {
786 total_count,
787 success_count,
788 avg_duration_ms,
789 median_duration_ms,
790 p95_duration_ms: *p95_duration_ms as f64,
791 p99_duration_ms: *p99_duration_ms as f64,
792 std_deviation_ms,
793 operations_per_second,
794 }
795 }
796}
797
798impl MetricsCollector {
799 pub async fn new(config: MonitoringConfig) -> RragResult<Self> {
801 let collector = Self {
802 current_metrics: Arc::new(RwLock::new(IncrementalMetrics::new())),
803 metrics_history: Arc::new(RwLock::new(VecDeque::new())),
804 performance_tracker: Arc::new(PerformanceTracker::new(config.clone(), 10000)),
805 config: config.clone(),
806 collection_stats: Arc::new(RwLock::new(CollectionStatistics {
807 total_collections: 0,
808 failed_collections: 0,
809 avg_collection_time_ms: 0.0,
810 last_collection: chrono::Utc::now(),
811 collection_success_rate: 1.0,
812 })),
813 task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
814 };
815
816 if config.enable_performance_metrics {
817 collector.start_collection_tasks().await?;
818 }
819
820 Ok(collector)
821 }
822
823 pub async fn get_current_metrics(&self) -> IncrementalMetrics {
825 self.current_metrics.read().await.clone()
826 }
827
828 pub async fn get_metrics_history(&self, limit: Option<usize>) -> Vec<IncrementalMetrics> {
830 let history = self.metrics_history.read().await;
831 let limit = limit.unwrap_or(history.len());
832 history.iter().rev().take(limit).cloned().collect()
833 }
834
835 pub async fn update_metrics(&self, update: MetricsUpdate) -> RragResult<()> {
837 let mut current = self.current_metrics.write().await;
838 current.update(update);
839
840 let mut history = self.metrics_history.write().await;
842 history.push_back(current.clone());
843
844 let max_history_size = (self.config.metrics_retention_days as usize) * 24 * 60 * 60
846 / (self.config.metrics_interval_secs as usize);
847 while history.len() > max_history_size {
848 history.pop_front();
849 }
850
851 Ok(())
852 }
853
854 pub async fn record_performance(&self, data_point: PerformanceDataPoint) -> RragResult<()> {
856 self.performance_tracker.record_data_point(data_point).await;
857 Ok(())
858 }
859
860 pub async fn get_performance_stats(&self) -> PerformanceStatistics {
862 self.performance_tracker.get_statistics().await
863 }
864
865 pub async fn health_check(&self) -> RragResult<bool> {
867 let handles = self.task_handles.lock().await;
868 let all_running = handles.iter().all(|handle| !handle.is_finished());
869
870 let stats = self.collection_stats.read().await;
871 let healthy_collection = stats.collection_success_rate > 0.8;
872
873 Ok(all_running && healthy_collection)
874 }
875
876 async fn start_collection_tasks(&self) -> RragResult<()> {
878 let mut handles = self.task_handles.lock().await;
879
880 handles.push(self.start_metrics_collection_task().await);
882
883 if self.config.enable_health_monitoring {
885 handles.push(self.start_health_monitoring_task().await);
886 }
887
888 if self.config.export_config.enable_json_export
890 || self.config.export_config.enable_prometheus
891 {
892 handles.push(self.start_export_task().await);
893 }
894
895 Ok(())
896 }
897
898 async fn start_metrics_collection_task(&self) -> tokio::task::JoinHandle<()> {
900 let current_metrics = Arc::clone(&self.current_metrics);
901 let collection_stats = Arc::clone(&self.collection_stats);
902 let interval = self.config.metrics_interval_secs;
903
904 tokio::spawn(async move {
905 let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
906
907 loop {
908 timer.tick().await;
909
910 let start_time = std::time::Instant::now();
911 let collection_successful = {
912 let update = MetricsUpdate {
914 indexing_metrics: Some(IndexingMetrics {
915 documents_per_second: 10.0, chunks_per_second: 50.0,
917 embeddings_per_second: 50.0,
918 avg_indexing_time_ms: 100.0,
919 index_growth_rate_bps: 1024.0,
920 batch_efficiency: 0.95,
921 change_detection_accuracy: 0.98,
922 vector_update_efficiency: 0.92,
923 }),
924 system_metrics: Some(SystemMetrics {
925 cpu_usage_percent: 45.0, memory_usage_bytes: 512 * 1024 * 1024, available_memory_bytes: 1024 * 1024 * 1024, storage_usage_bytes: 10 * 1024 * 1024 * 1024, available_storage_bytes: 90 * 1024 * 1024 * 1024, network_io_bps: 1024.0 * 100.0, disk_io_ops: 50.0,
932 active_connections: 10,
933 }),
934 operation_metrics: None,
935 health_metrics: None,
936 error_metrics: None,
937 custom_metrics: HashMap::new(),
938 };
939
940 let mut metrics = current_metrics.write().await;
942 metrics.update(update);
943 true
944 };
945
946 let collection_time = start_time.elapsed().as_millis() as f64;
947
948 let mut stats = collection_stats.write().await;
950 stats.total_collections += 1;
951 if !collection_successful {
952 stats.failed_collections += 1;
953 }
954 stats.avg_collection_time_ms =
955 (stats.avg_collection_time_ms + collection_time) / 2.0;
956 stats.last_collection = chrono::Utc::now();
957 stats.collection_success_rate = (stats.total_collections - stats.failed_collections)
958 as f64
959 / stats.total_collections as f64;
960 }
961 })
962 }
963
964 async fn start_health_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
966 let current_metrics = Arc::clone(&self.current_metrics);
967 let interval = self.config.health_check_interval_secs;
968
969 tokio::spawn(async move {
970 let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
971
972 loop {
973 timer.tick().await;
974
975 let health_update = HealthMetrics {
977 overall_health_score: 0.95, component_health: HashMap::new(), service_availability: 0.99,
980 data_consistency_score: 0.98,
981 performance_score: 0.92,
982 last_health_check: chrono::Utc::now(),
983 };
984
985 let mut metrics = current_metrics.write().await;
986 metrics.health_metrics = health_update;
987 }
988 })
989 }
990
991 async fn start_export_task(&self) -> tokio::task::JoinHandle<()> {
993 let current_metrics = Arc::clone(&self.current_metrics);
994 let export_config = self.config.export_config.clone();
995
996 tokio::spawn(async move {
997 let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(
998 export_config.export_interval_secs,
999 ));
1000
1001 loop {
1002 timer.tick().await;
1003
1004 if export_config.enable_json_export {
1005 let metrics = current_metrics.read().await;
1006 match serde_json::to_string_pretty(&*metrics) {
1008 Ok(json) => {
1009 tracing::debug!("Exported metrics: {} chars", json.len());
1011 }
1012 Err(e) => {
1013 tracing::debug!("Failed to serialize metrics: {}", e);
1014 }
1015 }
1016 }
1017 }
1018 })
1019 }
1020}
1021
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh snapshot has a non-empty system id and perfect health.
    #[tokio::test]
    async fn test_metrics_creation() {
        let metrics = IncrementalMetrics::new();
        assert!(!metrics.system_id.is_empty());
        assert_eq!(metrics.health_metrics.overall_health_score, 1.0);
    }

    /// Recording one successful sample is reflected in the overall counts.
    #[tokio::test]
    async fn test_performance_tracker() {
        let config = MonitoringConfig::default();
        let tracker = PerformanceTracker::new(config, 100);

        let data_point = PerformanceDataPoint {
            timestamp: chrono::Utc::now(),
            operation_type: "indexing".to_string(),
            duration_ms: 100,
            memory_usage_mb: 50.0,
            success: true,
            metadata: HashMap::new(),
        };

        tracker.record_data_point(data_point).await;

        let stats = tracker.get_statistics().await;
        assert_eq!(stats.overall.total_count, 1);
        assert_eq!(stats.overall.success_count, 1);
    }

    /// A collector without background tasks starts healthy and carries a
    /// valid initial snapshot.
    #[tokio::test]
    async fn test_metrics_collector() {
        // Background tasks disabled so the test does not spawn loops.
        let config = MonitoringConfig {
            enable_performance_metrics: false, ..MonitoringConfig::default()
        };

        let collector = MetricsCollector::new(config).await.unwrap();
        assert!(collector.health_check().await.unwrap());

        let metrics = collector.get_current_metrics().await;
        assert!(!metrics.system_id.is_empty());
    }

    /// Applying an update replaces the provided section in the live snapshot.
    #[tokio::test]
    async fn test_metrics_update() {
        // Background tasks disabled so the test does not spawn loops.
        let config = MonitoringConfig {
            enable_performance_metrics: false,
            ..MonitoringConfig::default()
        };

        let collector = MetricsCollector::new(config).await.unwrap();

        let update = MetricsUpdate {
            indexing_metrics: Some(IndexingMetrics {
                documents_per_second: 20.0,
                chunks_per_second: 100.0,
                embeddings_per_second: 100.0,
                avg_indexing_time_ms: 50.0,
                index_growth_rate_bps: 2048.0,
                batch_efficiency: 0.98,
                change_detection_accuracy: 0.99,
                vector_update_efficiency: 0.95,
            }),
            system_metrics: None,
            operation_metrics: None,
            health_metrics: None,
            error_metrics: None,
            custom_metrics: HashMap::new(),
        };

        collector.update_metrics(update).await.unwrap();

        let metrics = collector.get_current_metrics().await;
        assert_eq!(metrics.indexing_metrics.documents_per_second, 20.0);
    }

    /// Every `TrendDirection` variant has a distinct `Debug` rendering.
    #[test]
    fn test_trend_directions() {
        let directions = vec![
            TrendDirection::Improving,
            TrendDirection::Stable,
            TrendDirection::Degrading,
            TrendDirection::Unknown,
        ];

        for (i, dir1) in directions.iter().enumerate() {
            for (j, dir2) in directions.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", dir1), format!("{:?}", dir2));
                }
            }
        }
    }

    /// Every `ExportFormat` variant has a distinct `Debug` rendering.
    #[test]
    fn test_export_formats() {
        let formats = vec![
            ExportFormat::Prometheus,
            ExportFormat::Json,
            ExportFormat::InfluxDB,
            ExportFormat::StatsD,
            ExportFormat::Custom("custom".to_string()),
        ];

        for (i, fmt1) in formats.iter().enumerate() {
            for (j, fmt2) in formats.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", fmt1), format!("{:?}", fmt2));
                }
            }
        }
    }
}