rexis_rag/incremental/
monitoring.rs

//! # Incremental Indexing Monitoring
//!
//! Comprehensive monitoring and alerting system for incremental indexing operations.
//! Provides performance tracking, health monitoring, and automated alerting.

6use crate::RragResult;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13/// Monitoring configuration
/// Monitoring configuration.
///
/// Top-level knobs for the incremental-indexing monitoring subsystem.
/// Nested settings live in [`AlertConfig`] and [`ExportConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Enable performance metrics collection
    pub enable_performance_metrics: bool,

    /// Enable health monitoring
    pub enable_health_monitoring: bool,

    /// Enable alerting system
    pub enable_alerting: bool,

    /// Metrics collection interval in seconds
    pub metrics_interval_secs: u64,

    /// Health check interval in seconds
    pub health_check_interval_secs: u64,

    /// Metrics retention period in days
    /// (together with `metrics_interval_secs`, bounds the in-memory history)
    pub metrics_retention_days: u32,

    /// Alert configuration
    pub alert_config: AlertConfig,

    /// Export configuration
    pub export_config: ExportConfig,
}
40
41/// Alert system configuration
/// Alert system configuration.
///
/// Selects delivery channels, trigger thresholds, and rate limiting
/// (cooldown + hourly cap) for alerts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Enable email alerts
    pub enable_email_alerts: bool,

    /// Enable webhook alerts
    pub enable_webhook_alerts: bool,

    /// Enable log alerts
    pub enable_log_alerts: bool,

    /// Alert thresholds
    pub thresholds: AlertThresholds,

    /// Alert cooldown period in seconds (minimum spacing between repeats)
    pub cooldown_period_secs: u64,

    /// Maximum alerts per hour (rate limit across all channels)
    pub max_alerts_per_hour: u32,
}
62
63/// Alert thresholds
/// Alert thresholds.
///
/// Ratio-valued thresholds are expressed as fractions in `[0.0, 1.0]`.
/// Crossing a threshold is intended to trigger an alert; the comparison
/// direction (above vs. below) is enforced by the alerting logic, not here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Error rate threshold (0.0 to 1.0)
    pub error_rate_threshold: f64,

    /// Response time threshold in milliseconds
    pub response_time_threshold_ms: u64,

    /// Queue depth threshold (number of pending items)
    pub queue_depth_threshold: usize,

    /// Memory usage threshold (0.0 to 1.0)
    pub memory_usage_threshold: f64,

    /// Storage usage threshold (0.0 to 1.0)
    pub storage_usage_threshold: f64,

    /// Throughput threshold (operations per second; a floor, per the default's comment)
    pub throughput_threshold_ops: f64,
}
84
85/// Export configuration for metrics
/// Export configuration for metrics.
///
/// Controls where, how often, and in which format collected metrics
/// are exported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
    /// Enable Prometheus export
    pub enable_prometheus: bool,

    /// Enable JSON export
    pub enable_json_export: bool,

    /// Export endpoint (None means no remote destination is configured)
    pub export_endpoint: Option<String>,

    /// Export interval in seconds
    pub export_interval_secs: u64,

    /// Export format
    pub export_format: ExportFormat,
}
103
104/// Export formats
/// Export formats.
///
/// Target representation for metric export; `Custom` carries a
/// caller-defined format identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    /// Prometheus-compatible output
    Prometheus,
    /// Plain JSON
    Json,
    /// InfluxDB-compatible output
    InfluxDB,
    /// StatsD-compatible output
    StatsD,
    /// Caller-defined format identified by name
    Custom(String),
}
113
114impl Default for MonitoringConfig {
115    fn default() -> Self {
116        Self {
117            enable_performance_metrics: true,
118            enable_health_monitoring: true,
119            enable_alerting: true,
120            metrics_interval_secs: 30,
121            health_check_interval_secs: 60,
122            metrics_retention_days: 30,
123            alert_config: AlertConfig::default(),
124            export_config: ExportConfig::default(),
125        }
126    }
127}
128
129impl Default for AlertConfig {
130    fn default() -> Self {
131        Self {
132            enable_email_alerts: false,
133            enable_webhook_alerts: true,
134            enable_log_alerts: true,
135            thresholds: AlertThresholds::default(),
136            cooldown_period_secs: 300, // 5 minutes
137            max_alerts_per_hour: 10,
138        }
139    }
140}
141
142impl Default for AlertThresholds {
143    fn default() -> Self {
144        Self {
145            error_rate_threshold: 0.05,        // 5%
146            response_time_threshold_ms: 10000, // 10 seconds
147            queue_depth_threshold: 1000,
148            memory_usage_threshold: 0.8,    // 80%
149            storage_usage_threshold: 0.9,   // 90%
150            throughput_threshold_ops: 10.0, // 10 ops/sec minimum
151        }
152    }
153}
154
155impl Default for ExportConfig {
156    fn default() -> Self {
157        Self {
158            enable_prometheus: false,
159            enable_json_export: true,
160            export_endpoint: None,
161            export_interval_secs: 300, // 5 minutes
162            export_format: ExportFormat::Json,
163        }
164    }
165}
166
167/// Comprehensive metrics for incremental indexing
/// Comprehensive metrics for incremental indexing.
///
/// A single timestamped snapshot grouping all metric families for one
/// system instance, plus a free-form `custom_metrics` bag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalMetrics {
    /// System identification (a UUID string generated at construction)
    pub system_id: String,

    /// Metrics timestamp (refreshed on every update)
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Indexing performance metrics
    pub indexing_metrics: IndexingMetrics,

    /// System performance metrics
    pub system_metrics: SystemMetrics,

    /// Operation metrics
    pub operation_metrics: OperationMetrics,

    /// Health metrics
    pub health_metrics: HealthMetrics,

    /// Error metrics
    pub error_metrics: ErrorMetrics,

    /// Custom metrics (arbitrary named gauges, merged on update)
    pub custom_metrics: HashMap<String, f64>,
}
194
195/// Indexing-specific metrics
/// Indexing-specific metrics.
///
/// Throughput and efficiency figures for the indexing pipeline.
/// Efficiency/accuracy fields are ratios in `[0.0, 1.0]`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingMetrics {
    /// Documents processed per second
    pub documents_per_second: f64,

    /// Chunks processed per second
    pub chunks_per_second: f64,

    /// Embeddings processed per second
    pub embeddings_per_second: f64,

    /// Average indexing time per document, in milliseconds
    pub avg_indexing_time_ms: f64,

    /// Index size growth rate (bytes per second)
    pub index_growth_rate_bps: f64,

    /// Batch processing efficiency (0.0 to 1.0)
    pub batch_efficiency: f64,

    /// Change detection accuracy (0.0 to 1.0)
    pub change_detection_accuracy: f64,

    /// Vector update efficiency (0.0 to 1.0)
    pub vector_update_efficiency: f64,
}
222
223/// System performance metrics
/// System performance metrics.
///
/// Host-level resource usage at the time of collection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    /// CPU usage percentage (0.0–100.0)
    pub cpu_usage_percent: f64,

    /// Memory usage in bytes
    pub memory_usage_bytes: u64,

    /// Available memory in bytes
    pub available_memory_bytes: u64,

    /// Storage usage in bytes
    pub storage_usage_bytes: u64,

    /// Available storage in bytes
    pub available_storage_bytes: u64,

    /// Network I/O bytes per second
    pub network_io_bps: f64,

    /// Disk I/O operations per second
    pub disk_io_ops: f64,

    /// Active connections
    pub active_connections: usize,
}
250
251/// Operation-level metrics
/// Operation-level metrics.
///
/// Counters and latency percentiles for indexing operations, keyed by
/// operation-type name where broken down.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// Total operations performed
    pub total_operations: u64,

    /// Operations by type (operation-type name -> count)
    pub operations_by_type: HashMap<String, u64>,

    /// Success rate (0.0 to 1.0)
    pub success_rate: f64,

    /// Average operation time in milliseconds
    pub avg_operation_time_ms: f64,

    /// 95th percentile operation time, in milliseconds
    pub p95_operation_time_ms: f64,

    /// 99th percentile operation time, in milliseconds
    pub p99_operation_time_ms: f64,

    /// Queue depths by type (queue name -> pending items)
    pub queue_depths: HashMap<String, usize>,

    /// Retry statistics
    pub retry_stats: RetryMetrics,
}
278
279/// Retry-specific metrics
/// Retry-specific metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryMetrics {
    /// Total retry attempts
    pub total_retries: u64,

    /// Successful retries
    pub successful_retries: u64,

    /// Failed retries (retry budget exhausted)
    pub exhausted_retries: u64,

    /// Average retries per operation
    pub avg_retries_per_operation: f64,
}
294
295/// Health monitoring metrics
296#[derive(Debug, Clone, Serialize, Deserialize)]
297pub struct HealthMetrics {
298    /// Overall system health score (0.0 to 1.0)
299    pub overall_health_score: f64,
300
301    /// Component health scores
302    pub component_health: HashMap<String, f64>,
303
304    /// Service availability (0.0 to 1.0)
305    pub service_availability: f64,
306
307    /// Data consistency score (0.0 to 1.0)
308    pub data_consistency_score: f64,
309
310    /// Performance score (0.0 to 1.0)
311    pub performance_score: f64,
312
313    /// Last health check timestamp
314    pub last_health_check: chrono::DateTime<chrono::Utc>,
315}
316
317/// Error tracking metrics
/// Error tracking metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// Total errors encountered
    pub total_errors: u64,

    /// Errors by type (error-type name -> count)
    pub errors_by_type: HashMap<String, u64>,

    /// Errors by component (component name -> count)
    pub errors_by_component: HashMap<String, u64>,

    /// Error rate (errors per operation; consumers clamp to 1.0 when scoring)
    pub error_rate: f64,

    /// Critical errors
    pub critical_errors: u64,

    /// Recoverable errors
    pub recoverable_errors: u64,

    /// Error resolution time average, in milliseconds
    pub avg_resolution_time_ms: f64,
}
341
342/// Performance tracking system
/// Performance tracking system.
///
/// Keeps a bounded FIFO of raw [`PerformanceDataPoint`]s and derives
/// aggregated [`PerformanceStatistics`] from them after each insert.
pub struct PerformanceTracker {
    /// Data points storage (bounded FIFO; oldest entries evicted first)
    data_points: Arc<RwLock<VecDeque<PerformanceDataPoint>>>,

    /// Aggregated statistics
    statistics: Arc<RwLock<PerformanceStatistics>>,

    /// Configuration
    // NOTE(review): not read by any method visible in this file — presumably
    // kept for future use; confirm before removing.
    config: MonitoringConfig,

    /// Max data points to retain before eviction
    max_data_points: usize,
}
356
357/// Individual performance data point
/// Individual performance data point.
///
/// One observation of a single operation, as recorded by
/// [`PerformanceTracker::record_data_point`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    /// Timestamp of the observation
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Operation type (used as the grouping key for per-type statistics)
    pub operation_type: String,

    /// Duration in milliseconds
    pub duration_ms: u64,

    /// Memory usage at time of operation, in megabytes
    pub memory_usage_mb: f64,

    /// Success indicator
    pub success: bool,

    /// Additional metadata (free-form JSON values)
    pub metadata: HashMap<String, serde_json::Value>,
}
378
379/// Aggregated performance statistics
/// Aggregated performance statistics.
///
/// Derived from retained data points; `overall` covers all points while
/// `by_operation_type` breaks them down per operation-type key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStatistics {
    /// Statistics by operation type
    pub by_operation_type: HashMap<String, OperationStatistics>,

    /// Overall statistics across every operation type
    pub overall: OperationStatistics,

    /// Time-based trends
    pub trends: TrendAnalysis,

    /// Last updated timestamp
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
394
395/// Statistics for a specific operation type
396#[derive(Debug, Clone, Serialize, Deserialize)]
397pub struct OperationStatistics {
398    /// Total operations
399    pub total_count: u64,
400
401    /// Successful operations
402    pub success_count: u64,
403
404    /// Average duration
405    pub avg_duration_ms: f64,
406
407    /// Median duration
408    pub median_duration_ms: f64,
409
410    /// 95th percentile duration
411    pub p95_duration_ms: f64,
412
413    /// 99th percentile duration
414    pub p99_duration_ms: f64,
415
416    /// Standard deviation
417    pub std_deviation_ms: f64,
418
419    /// Operations per second
420    pub operations_per_second: f64,
421}
422
423/// Trend analysis data
/// Trend analysis data.
///
/// Directional trends computed over a sliding analysis window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Performance trend over time
    pub performance_trend: TrendDirection,

    /// Error rate trend
    pub error_rate_trend: TrendDirection,

    /// Throughput trend
    pub throughput_trend: TrendDirection,

    /// Memory usage trend
    pub memory_trend: TrendDirection,

    /// Trend analysis period, in hours
    pub analysis_period_hours: u32,
}
441
442/// Trend directions
/// Trend directions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// The tracked quantity is getting better over the window
    Improving,
    /// No significant change over the window
    Stable,
    /// The tracked quantity is getting worse over the window
    Degrading,
    /// Not enough data to determine a direction
    Unknown,
}
450
451/// Indexing statistics tracking
/// Indexing statistics tracking.
///
/// Cumulative counters and derived rates for the indexing pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStats {
    /// Documents indexed
    pub documents_indexed: u64,

    /// Chunks processed
    pub chunks_processed: u64,

    /// Embeddings generated
    pub embeddings_generated: u64,

    /// Index updates performed
    pub index_updates: u64,

    /// Average processing time per document, in milliseconds
    pub avg_document_processing_ms: f64,

    /// Indexing throughput, in documents per second
    pub indexing_throughput_dps: f64,

    /// Storage efficiency (0.0 to 1.0)
    pub storage_efficiency: f64,

    /// Index quality score (0.0 to 1.0)
    pub index_quality_score: f64,
}
478
479/// Metrics collector for gathering system metrics
/// Metrics collector for gathering system metrics.
///
/// Owns the current snapshot, its bounded history, a performance tracker,
/// and the background tasks that periodically refresh/export metrics.
pub struct MetricsCollector {
    /// Current metrics snapshot, replaced on each collection cycle
    current_metrics: Arc<RwLock<IncrementalMetrics>>,

    /// Metrics history (bounded by the configured retention window)
    metrics_history: Arc<RwLock<VecDeque<IncrementalMetrics>>>,

    /// Performance tracker for per-operation latency statistics
    performance_tracker: Arc<PerformanceTracker>,

    /// Configuration
    config: MonitoringConfig,

    /// Collection statistics (success rate, timing of collection cycles)
    collection_stats: Arc<RwLock<CollectionStatistics>>,

    /// Background task handles (used by health_check to verify liveness)
    task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
499
500/// Collection process statistics
/// Collection process statistics.
///
/// Bookkeeping about the metric-collection cycles themselves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionStatistics {
    /// Total collection cycles
    pub total_collections: u64,

    /// Failed collections
    pub failed_collections: u64,

    /// Average collection time, in milliseconds
    pub avg_collection_time_ms: f64,

    /// Last collection timestamp
    pub last_collection: chrono::DateTime<chrono::Utc>,

    /// Collection success rate (0.0 to 1.0)
    pub collection_success_rate: f64,
}
518
519impl IncrementalMetrics {
520    /// Create new metrics instance
521    pub fn new() -> Self {
522        Self {
523            system_id: Uuid::new_v4().to_string(),
524            timestamp: chrono::Utc::now(),
525            indexing_metrics: IndexingMetrics {
526                documents_per_second: 0.0,
527                chunks_per_second: 0.0,
528                embeddings_per_second: 0.0,
529                avg_indexing_time_ms: 0.0,
530                index_growth_rate_bps: 0.0,
531                batch_efficiency: 1.0,
532                change_detection_accuracy: 1.0,
533                vector_update_efficiency: 1.0,
534            },
535            system_metrics: SystemMetrics {
536                cpu_usage_percent: 0.0,
537                memory_usage_bytes: 0,
538                available_memory_bytes: 0,
539                storage_usage_bytes: 0,
540                available_storage_bytes: 0,
541                network_io_bps: 0.0,
542                disk_io_ops: 0.0,
543                active_connections: 0,
544            },
545            operation_metrics: OperationMetrics {
546                total_operations: 0,
547                operations_by_type: HashMap::new(),
548                success_rate: 1.0,
549                avg_operation_time_ms: 0.0,
550                p95_operation_time_ms: 0.0,
551                p99_operation_time_ms: 0.0,
552                queue_depths: HashMap::new(),
553                retry_stats: RetryMetrics {
554                    total_retries: 0,
555                    successful_retries: 0,
556                    exhausted_retries: 0,
557                    avg_retries_per_operation: 0.0,
558                },
559            },
560            health_metrics: HealthMetrics {
561                overall_health_score: 1.0,
562                component_health: HashMap::new(),
563                service_availability: 1.0,
564                data_consistency_score: 1.0,
565                performance_score: 1.0,
566                last_health_check: chrono::Utc::now(),
567            },
568            error_metrics: ErrorMetrics {
569                total_errors: 0,
570                errors_by_type: HashMap::new(),
571                errors_by_component: HashMap::new(),
572                error_rate: 0.0,
573                critical_errors: 0,
574                recoverable_errors: 0,
575                avg_resolution_time_ms: 0.0,
576            },
577            custom_metrics: HashMap::new(),
578        }
579    }
580
581    /// Update metrics with new data
582    pub fn update(&mut self, update_data: MetricsUpdate) {
583        self.timestamp = chrono::Utc::now();
584
585        // Update indexing metrics
586        if let Some(indexing) = update_data.indexing_metrics {
587            self.indexing_metrics = indexing;
588        }
589
590        // Update system metrics
591        if let Some(system) = update_data.system_metrics {
592            self.system_metrics = system;
593        }
594
595        // Update operation metrics
596        if let Some(operations) = update_data.operation_metrics {
597            self.operation_metrics = operations;
598        }
599
600        // Update health metrics
601        if let Some(health) = update_data.health_metrics {
602            self.health_metrics = health;
603        }
604
605        // Update error metrics
606        if let Some(errors) = update_data.error_metrics {
607            self.error_metrics = errors;
608        }
609
610        // Merge custom metrics
611        for (key, value) in update_data.custom_metrics {
612            self.custom_metrics.insert(key, value);
613        }
614    }
615
616    /// Calculate overall system score
617    pub fn calculate_system_score(&self) -> f64 {
618        let health_weight = 0.4;
619        let performance_weight = 0.3;
620        let reliability_weight = 0.3;
621
622        let health_score = self.health_metrics.overall_health_score;
623        let performance_score = self.health_metrics.performance_score;
624        let reliability_score = 1.0 - self.error_metrics.error_rate.min(1.0);
625
626        (health_score * health_weight)
627            + (performance_score * performance_weight)
628            + (reliability_score * reliability_weight)
629    }
630}
631
632/// Update data for metrics
633#[derive(Debug, Clone)]
634pub struct MetricsUpdate {
635    pub indexing_metrics: Option<IndexingMetrics>,
636    pub system_metrics: Option<SystemMetrics>,
637    pub operation_metrics: Option<OperationMetrics>,
638    pub health_metrics: Option<HealthMetrics>,
639    pub error_metrics: Option<ErrorMetrics>,
640    pub custom_metrics: HashMap<String, f64>,
641}
642
643impl PerformanceTracker {
644    /// Create new performance tracker
645    pub fn new(config: MonitoringConfig, max_data_points: usize) -> Self {
646        Self {
647            data_points: Arc::new(RwLock::new(VecDeque::new())),
648            statistics: Arc::new(RwLock::new(PerformanceStatistics {
649                by_operation_type: HashMap::new(),
650                overall: OperationStatistics {
651                    total_count: 0,
652                    success_count: 0,
653                    avg_duration_ms: 0.0,
654                    median_duration_ms: 0.0,
655                    p95_duration_ms: 0.0,
656                    p99_duration_ms: 0.0,
657                    std_deviation_ms: 0.0,
658                    operations_per_second: 0.0,
659                },
660                trends: TrendAnalysis {
661                    performance_trend: TrendDirection::Stable,
662                    error_rate_trend: TrendDirection::Stable,
663                    throughput_trend: TrendDirection::Stable,
664                    memory_trend: TrendDirection::Stable,
665                    analysis_period_hours: 24,
666                },
667                last_updated: chrono::Utc::now(),
668            })),
669            config,
670            max_data_points,
671        }
672    }
673
674    /// Record a performance data point
675    pub async fn record_data_point(&self, data_point: PerformanceDataPoint) {
676        let mut data_points = self.data_points.write().await;
677        data_points.push_back(data_point);
678
679        // Limit data points
680        while data_points.len() > self.max_data_points {
681            data_points.pop_front();
682        }
683
684        // Update statistics
685        self.update_statistics().await;
686    }
687
688    /// Get current statistics
689    pub async fn get_statistics(&self) -> PerformanceStatistics {
690        self.statistics.read().await.clone()
691    }
692
693    /// Update aggregated statistics
694    async fn update_statistics(&self) {
695        let data_points = self.data_points.read().await;
696
697        if data_points.is_empty() {
698            return;
699        }
700
701        let mut by_operation_type: HashMap<String, Vec<&PerformanceDataPoint>> = HashMap::new();
702        let mut all_points = Vec::new();
703
704        // Group by operation type
705        for point in data_points.iter() {
706            by_operation_type
707                .entry(point.operation_type.clone())
708                .or_insert_with(Vec::new)
709                .push(point);
710            all_points.push(point);
711        }
712
713        let mut statistics = self.statistics.write().await;
714
715        // Calculate statistics for each operation type
716        for (op_type, points) in by_operation_type {
717            let stats = self.calculate_operation_statistics(&points);
718            statistics.by_operation_type.insert(op_type, stats);
719        }
720
721        // Calculate overall statistics
722        statistics.overall = self.calculate_operation_statistics(&all_points);
723        statistics.last_updated = chrono::Utc::now();
724    }
725
726    /// Calculate statistics for a set of data points
727    fn calculate_operation_statistics(
728        &self,
729        points: &[&PerformanceDataPoint],
730    ) -> OperationStatistics {
731        if points.is_empty() {
732            return OperationStatistics {
733                total_count: 0,
734                success_count: 0,
735                avg_duration_ms: 0.0,
736                median_duration_ms: 0.0,
737                p95_duration_ms: 0.0,
738                p99_duration_ms: 0.0,
739                std_deviation_ms: 0.0,
740                operations_per_second: 0.0,
741            };
742        }
743
744        let total_count = points.len() as u64;
745        let success_count = points.iter().filter(|p| p.success).count() as u64;
746
747        let mut durations: Vec<u64> = points.iter().map(|p| p.duration_ms).collect();
748        durations.sort();
749
750        let avg_duration_ms = durations.iter().sum::<u64>() as f64 / durations.len() as f64;
751        let median_duration_ms = if durations.len() % 2 == 0 {
752            (durations[durations.len() / 2 - 1] + durations[durations.len() / 2]) as f64 / 2.0
753        } else {
754            durations[durations.len() / 2] as f64
755        };
756
757        let p95_index = ((durations.len() as f64) * 0.95) as usize;
758        let p99_index = ((durations.len() as f64) * 0.99) as usize;
759
760        let p95_duration_ms = durations
761            .get(p95_index.min(durations.len() - 1))
762            .unwrap_or(&0) as &u64;
763        let p99_duration_ms = durations
764            .get(p99_index.min(durations.len() - 1))
765            .unwrap_or(&0) as &u64;
766
767        // Calculate standard deviation
768        let variance = durations
769            .iter()
770            .map(|d| (*d as f64 - avg_duration_ms).powi(2))
771            .sum::<f64>()
772            / durations.len() as f64;
773        let std_deviation_ms = variance.sqrt();
774
775        // Calculate operations per second (simplified)
776        let time_span_secs = if points.len() > 1 {
777            let first = points.first().unwrap().timestamp;
778            let last = points.last().unwrap().timestamp;
779            last.signed_duration_since(first).num_seconds().max(1) as f64
780        } else {
781            1.0
782        };
783        let operations_per_second = total_count as f64 / time_span_secs;
784
785        OperationStatistics {
786            total_count,
787            success_count,
788            avg_duration_ms,
789            median_duration_ms,
790            p95_duration_ms: *p95_duration_ms as f64,
791            p99_duration_ms: *p99_duration_ms as f64,
792            std_deviation_ms,
793            operations_per_second,
794        }
795    }
796}
797
798impl MetricsCollector {
799    /// Create new metrics collector
800    pub async fn new(config: MonitoringConfig) -> RragResult<Self> {
801        let collector = Self {
802            current_metrics: Arc::new(RwLock::new(IncrementalMetrics::new())),
803            metrics_history: Arc::new(RwLock::new(VecDeque::new())),
804            performance_tracker: Arc::new(PerformanceTracker::new(config.clone(), 10000)),
805            config: config.clone(),
806            collection_stats: Arc::new(RwLock::new(CollectionStatistics {
807                total_collections: 0,
808                failed_collections: 0,
809                avg_collection_time_ms: 0.0,
810                last_collection: chrono::Utc::now(),
811                collection_success_rate: 1.0,
812            })),
813            task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
814        };
815
816        if config.enable_performance_metrics {
817            collector.start_collection_tasks().await?;
818        }
819
820        Ok(collector)
821    }
822
823    /// Get current metrics
824    pub async fn get_current_metrics(&self) -> IncrementalMetrics {
825        self.current_metrics.read().await.clone()
826    }
827
828    /// Get metrics history
829    pub async fn get_metrics_history(&self, limit: Option<usize>) -> Vec<IncrementalMetrics> {
830        let history = self.metrics_history.read().await;
831        let limit = limit.unwrap_or(history.len());
832        history.iter().rev().take(limit).cloned().collect()
833    }
834
835    /// Update metrics
836    pub async fn update_metrics(&self, update: MetricsUpdate) -> RragResult<()> {
837        let mut current = self.current_metrics.write().await;
838        current.update(update);
839
840        // Add to history
841        let mut history = self.metrics_history.write().await;
842        history.push_back(current.clone());
843
844        // Limit history size based on retention
845        let max_history_size = (self.config.metrics_retention_days as usize) * 24 * 60 * 60
846            / (self.config.metrics_interval_secs as usize);
847        while history.len() > max_history_size {
848            history.pop_front();
849        }
850
851        Ok(())
852    }
853
854    /// Record performance data
855    pub async fn record_performance(&self, data_point: PerformanceDataPoint) -> RragResult<()> {
856        self.performance_tracker.record_data_point(data_point).await;
857        Ok(())
858    }
859
860    /// Get performance statistics
861    pub async fn get_performance_stats(&self) -> PerformanceStatistics {
862        self.performance_tracker.get_statistics().await
863    }
864
865    /// Health check
866    pub async fn health_check(&self) -> RragResult<bool> {
867        let handles = self.task_handles.lock().await;
868        let all_running = handles.iter().all(|handle| !handle.is_finished());
869
870        let stats = self.collection_stats.read().await;
871        let healthy_collection = stats.collection_success_rate > 0.8;
872
873        Ok(all_running && healthy_collection)
874    }
875
876    /// Start background collection tasks
877    async fn start_collection_tasks(&self) -> RragResult<()> {
878        let mut handles = self.task_handles.lock().await;
879
880        // Metrics collection task
881        handles.push(self.start_metrics_collection_task().await);
882
883        // Health monitoring task
884        if self.config.enable_health_monitoring {
885            handles.push(self.start_health_monitoring_task().await);
886        }
887
888        // Export task
889        if self.config.export_config.enable_json_export
890            || self.config.export_config.enable_prometheus
891        {
892            handles.push(self.start_export_task().await);
893        }
894
895        Ok(())
896    }
897
898    /// Start metrics collection background task
899    async fn start_metrics_collection_task(&self) -> tokio::task::JoinHandle<()> {
900        let current_metrics = Arc::clone(&self.current_metrics);
901        let collection_stats = Arc::clone(&self.collection_stats);
902        let interval = self.config.metrics_interval_secs;
903
904        tokio::spawn(async move {
905            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
906
907            loop {
908                timer.tick().await;
909
910                let start_time = std::time::Instant::now();
911                let collection_successful = {
912                    // Collect system metrics (simplified)
913                    let update = MetricsUpdate {
914                        indexing_metrics: Some(IndexingMetrics {
915                            documents_per_second: 10.0, // Would be actual measurement
916                            chunks_per_second: 50.0,
917                            embeddings_per_second: 50.0,
918                            avg_indexing_time_ms: 100.0,
919                            index_growth_rate_bps: 1024.0,
920                            batch_efficiency: 0.95,
921                            change_detection_accuracy: 0.98,
922                            vector_update_efficiency: 0.92,
923                        }),
924                        system_metrics: Some(SystemMetrics {
925                            cpu_usage_percent: 45.0,                      // Would be actual measurement
926                            memory_usage_bytes: 512 * 1024 * 1024,        // 512MB
927                            available_memory_bytes: 1024 * 1024 * 1024,   // 1GB
928                            storage_usage_bytes: 10 * 1024 * 1024 * 1024, // 10GB
929                            available_storage_bytes: 90 * 1024 * 1024 * 1024, // 90GB
930                            network_io_bps: 1024.0 * 100.0,               // 100KB/s
931                            disk_io_ops: 50.0,
932                            active_connections: 10,
933                        }),
934                        operation_metrics: None,
935                        health_metrics: None,
936                        error_metrics: None,
937                        custom_metrics: HashMap::new(),
938                    };
939
940                    // Update metrics
941                    let mut metrics = current_metrics.write().await;
942                    metrics.update(update);
943                    true
944                };
945
946                let collection_time = start_time.elapsed().as_millis() as f64;
947
948                // Update collection statistics
949                let mut stats = collection_stats.write().await;
950                stats.total_collections += 1;
951                if !collection_successful {
952                    stats.failed_collections += 1;
953                }
954                stats.avg_collection_time_ms =
955                    (stats.avg_collection_time_ms + collection_time) / 2.0;
956                stats.last_collection = chrono::Utc::now();
957                stats.collection_success_rate = (stats.total_collections - stats.failed_collections)
958                    as f64
959                    / stats.total_collections as f64;
960            }
961        })
962    }
963
964    /// Start health monitoring task
965    async fn start_health_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
966        let current_metrics = Arc::clone(&self.current_metrics);
967        let interval = self.config.health_check_interval_secs;
968
969        tokio::spawn(async move {
970            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
971
972            loop {
973                timer.tick().await;
974
975                // Perform health checks and update health metrics
976                let health_update = HealthMetrics {
977                    overall_health_score: 0.95,       // Would be calculated
978                    component_health: HashMap::new(), // Would include component scores
979                    service_availability: 0.99,
980                    data_consistency_score: 0.98,
981                    performance_score: 0.92,
982                    last_health_check: chrono::Utc::now(),
983                };
984
985                let mut metrics = current_metrics.write().await;
986                metrics.health_metrics = health_update;
987            }
988        })
989    }
990
991    /// Start export task
992    async fn start_export_task(&self) -> tokio::task::JoinHandle<()> {
993        let current_metrics = Arc::clone(&self.current_metrics);
994        let export_config = self.config.export_config.clone();
995
996        tokio::spawn(async move {
997            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(
998                export_config.export_interval_secs,
999            ));
1000
1001            loop {
1002                timer.tick().await;
1003
1004                if export_config.enable_json_export {
1005                    let metrics = current_metrics.read().await;
1006                    // Export metrics as JSON (simplified)
1007                    match serde_json::to_string_pretty(&*metrics) {
1008                        Ok(json) => {
1009                            // Would export to configured endpoint
1010                            tracing::debug!("Exported metrics: {} chars", json.len());
1011                        }
1012                        Err(e) => {
1013                            tracing::debug!("Failed to serialize metrics: {}", e);
1014                        }
1015                    }
1016                }
1017            }
1018        })
1019    }
1020}
1021
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly-created metrics struct has an id and a perfect health score.
    #[tokio::test]
    async fn test_metrics_creation() {
        let fresh = IncrementalMetrics::new();
        assert!(!fresh.system_id.is_empty());
        assert_eq!(fresh.health_metrics.overall_health_score, 1.0);
    }

    /// Recording one successful data point is reflected in the statistics.
    #[tokio::test]
    async fn test_performance_tracker() {
        let tracker = PerformanceTracker::new(MonitoringConfig::default(), 100);

        let point = PerformanceDataPoint {
            timestamp: chrono::Utc::now(),
            operation_type: "indexing".to_string(),
            duration_ms: 100,
            memory_usage_mb: 50.0,
            success: true,
            metadata: HashMap::new(),
        };
        tracker.record_data_point(point).await;

        let stats = tracker.get_statistics().await;
        assert_eq!(stats.overall.total_count, 1);
        assert_eq!(stats.overall.success_count, 1);
    }

    /// A collector built with background collection disabled is healthy
    /// and exposes a populated metrics snapshot.
    #[tokio::test]
    async fn test_metrics_collector() {
        let config = MonitoringConfig {
            enable_performance_metrics: false, // Disable for test
            ..MonitoringConfig::default()
        };
        let collector = MetricsCollector::new(config).await.unwrap();

        assert!(collector.health_check().await.unwrap());
        assert!(!collector.get_current_metrics().await.system_id.is_empty());
    }

    /// Pushing an indexing-metrics update makes it visible on the next read.
    #[tokio::test]
    async fn test_metrics_update() {
        let config = MonitoringConfig {
            enable_performance_metrics: false,
            ..MonitoringConfig::default()
        };
        let collector = MetricsCollector::new(config).await.unwrap();

        let indexing = IndexingMetrics {
            documents_per_second: 20.0,
            chunks_per_second: 100.0,
            embeddings_per_second: 100.0,
            avg_indexing_time_ms: 50.0,
            index_growth_rate_bps: 2048.0,
            batch_efficiency: 0.98,
            change_detection_accuracy: 0.99,
            vector_update_efficiency: 0.95,
        };
        let update = MetricsUpdate {
            indexing_metrics: Some(indexing),
            system_metrics: None,
            operation_metrics: None,
            health_metrics: None,
            error_metrics: None,
            custom_metrics: HashMap::new(),
        };
        collector.update_metrics(update).await.unwrap();

        let snapshot = collector.get_current_metrics().await;
        assert_eq!(snapshot.indexing_metrics.documents_per_second, 20.0);
    }

    /// Every pair of distinct trend variants must have distinct Debug output.
    #[test]
    fn test_trend_directions() {
        let directions = [
            TrendDirection::Improving,
            TrendDirection::Stable,
            TrendDirection::Degrading,
            TrendDirection::Unknown,
        ];

        for i in 0..directions.len() {
            for j in 0..directions.len() {
                if i == j {
                    continue;
                }
                assert_ne!(
                    format!("{:?}", directions[i]),
                    format!("{:?}", directions[j])
                );
            }
        }
    }

    /// Every pair of distinct export formats must have distinct Debug output.
    #[test]
    fn test_export_formats() {
        let formats = [
            ExportFormat::Prometheus,
            ExportFormat::Json,
            ExportFormat::InfluxDB,
            ExportFormat::StatsD,
            ExportFormat::Custom(String::from("custom")),
        ];

        for i in 0..formats.len() {
            for j in 0..formats.len() {
                if i == j {
                    continue;
                }
                assert_ne!(format!("{:?}", formats[i]), format!("{:?}", formats[j]));
            }
        }
    }
}