ipfrs_network/metrics_aggregator.rs

//! Metrics time-series aggregator for historical tracking and analysis
//!
//! This module provides time-series aggregation of network metrics with:
//! - Configurable time windows (second, minute, hour, day)
//! - Statistical analysis (min, max, avg, percentiles)
//! - Historical data retention
//! - Trend analysis (linear-regression slope)
//! - Multiple aggregation strategies via configuration presets
//!
//! # Examples
//!
//! ```
//! use ipfrs_network::metrics_aggregator::{MetricsAggregator, AggregatorConfig, TimeWindow};
//!
//! let config = AggregatorConfig::default();
//! let aggregator = MetricsAggregator::new(config);
//!
//! // Record metrics
//! aggregator.record_bandwidth(1024);
//! aggregator.record_latency(50);
//! aggregator.record_connection_event();
//!
//! // Get statistics
//! let stats = aggregator.get_statistics(TimeWindow::Minute);
//! println!("Avg bandwidth: {:.2} B/s", stats.bandwidth.avg);
//! println!("P95 latency: {} ms", stats.latency.p95);
//! ```

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Time window for aggregation
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeWindow {
    /// 1 second window
    Second,
    /// 1 minute window
    Minute,
    /// 1 hour window
    Hour,
    /// 1 day window
    Day,
}

impl TimeWindow {
    /// Get the duration for this time window
    pub fn duration(&self) -> Duration {
        match self {
            TimeWindow::Second => Duration::from_secs(1),
            TimeWindow::Minute => Duration::from_secs(60),
            TimeWindow::Hour => Duration::from_secs(3600),
            TimeWindow::Day => Duration::from_secs(86400),
        }
    }
}

/// Configuration for metrics aggregator
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Maximum number of data points to retain per metric
    pub max_data_points: usize,

    /// Retention period for historical data
    pub retention_period: Duration,

    /// Enable percentile calculations (more CPU intensive)
    pub enable_percentiles: bool,

    /// Enable trend analysis
    pub enable_trends: bool,

    /// Sample rate for high-frequency metrics (1 = all, 10 = 1 in 10)
    pub sample_rate: usize,
}

impl Default for AggregatorConfig {
    fn default() -> Self {
        Self {
            max_data_points: 10000,
            retention_period: Duration::from_secs(3600), // 1 hour
            enable_percentiles: true,
            enable_trends: true,
            sample_rate: 1,
        }
    }
}

impl AggregatorConfig {
    /// Configuration for real-time monitoring (short retention, high detail)
    pub fn realtime() -> Self {
        Self {
            max_data_points: 1000,
            retention_period: Duration::from_secs(300), // 5 minutes
            enable_percentiles: true,
            enable_trends: false,
            sample_rate: 1,
        }
    }

    /// Configuration for long-term storage (extended retention, lower detail)
    pub fn longterm() -> Self {
        Self {
            max_data_points: 50000,
            retention_period: Duration::from_secs(86400 * 7), // 7 days
            enable_percentiles: false,
            enable_trends: true,
            sample_rate: 10, // Sample 1 in 10
        }
    }

    /// Configuration for high-frequency metrics (balanced)
    pub fn balanced() -> Self {
        Self {
            max_data_points: 5000,
            retention_period: Duration::from_secs(3600), // 1 hour
            enable_percentiles: true,
            enable_trends: true,
            sample_rate: 5,
        }
    }
}
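
// A minimal usage sketch (illustrative, not part of the original API surface):
// pick the preset that matches the deployment, e.g. `realtime()` for a live
// dashboard and `longterm()` for an archival collector.
//
//     let dashboard = MetricsAggregator::new(AggregatorConfig::realtime());
//     let archive = MetricsAggregator::new(AggregatorConfig::longterm());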

/// A single data point with timestamp
#[derive(Debug, Clone, Copy)]
struct DataPoint {
    value: f64,
    timestamp: Instant,
}

/// Time series data for a metric
#[derive(Debug)]
struct TimeSeries {
    data: VecDeque<DataPoint>,
    sample_counter: usize,
}

impl TimeSeries {
    fn new(capacity: usize) -> Self {
        Self {
            data: VecDeque::with_capacity(capacity),
            sample_counter: 0,
        }
    }

    fn add(&mut self, value: f64, max_points: usize, sample_rate: usize) {
        self.sample_counter += 1;
        if !self.sample_counter.is_multiple_of(sample_rate) {
            return;
        }

        let point = DataPoint {
            value,
            timestamp: Instant::now(),
        };

        self.data.push_back(point);

        // Remove oldest points if we exceed max
        while self.data.len() > max_points {
            self.data.pop_front();
        }
    }

    fn cleanup_old(&mut self, retention: Duration) {
        let now = Instant::now();
        while let Some(point) = self.data.front() {
            if now.duration_since(point.timestamp) > retention {
                self.data.pop_front();
            } else {
                break;
            }
        }
    }

    fn get_values_in_window(&self, window: Duration) -> Vec<f64> {
        let now = Instant::now();
        self.data
            .iter()
            .filter(|p| now.duration_since(p.timestamp) <= window)
            .map(|p| p.value)
            .collect()
    }
}
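
// Note on the sampling in `add` above (an illustrative reading): with
// `sample_rate = 3` only the 3rd, 6th, 9th, ... calls store a point, so
// roughly one in three recorded values survives down-sampling.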

/// Statistics for a metric
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MetricStatistics {
    /// Number of samples
    pub count: usize,

    /// Minimum value
    pub min: f64,

    /// Maximum value
    pub max: f64,

    /// Average value
    pub avg: f64,

    /// Standard deviation
    pub stddev: f64,

    /// 50th percentile (median)
    pub p50: f64,

    /// 95th percentile
    pub p95: f64,

    /// 99th percentile
    pub p99: f64,

    /// Current trend (positive = increasing, negative = decreasing)
    pub trend: f64,
}

/// Aggregated statistics for all metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AggregatedStatistics {
    /// Bandwidth statistics (bytes per recorded sample)
    pub bandwidth: MetricStatistics,

    /// Latency statistics (milliseconds)
    pub latency: MetricStatistics,

    /// Connection event statistics (`count` is the number of events in the window)
    pub connection_rate: MetricStatistics,

    /// Query event statistics (`count` is the number of events in the window)
    pub query_rate: MetricStatistics,

    /// Error event statistics (`count` is the number of events in the window)
    pub error_rate: MetricStatistics,
}
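
// The statistics structs derive `Serialize`/`Deserialize`, so they can be
// exported for dashboards or logs. A minimal sketch, assuming `serde_json`
// is available as a dependency:
//
//     let stats = aggregator.get_statistics(TimeWindow::Minute);
//     let json = serde_json::to_string(&stats).expect("stats serialize to JSON");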

/// Metrics aggregator for time-series data
pub struct MetricsAggregator {
    config: AggregatorConfig,
    bandwidth: RwLock<TimeSeries>,
    latency: RwLock<TimeSeries>,
    connections: RwLock<TimeSeries>,
    queries: RwLock<TimeSeries>,
    errors: RwLock<TimeSeries>,
}

impl MetricsAggregator {
    /// Create a new metrics aggregator
    pub fn new(config: AggregatorConfig) -> Self {
        let capacity = config.max_data_points;
        Self {
            config,
            bandwidth: RwLock::new(TimeSeries::new(capacity)),
            latency: RwLock::new(TimeSeries::new(capacity)),
            connections: RwLock::new(TimeSeries::new(capacity)),
            queries: RwLock::new(TimeSeries::new(capacity)),
            errors: RwLock::new(TimeSeries::new(capacity)),
        }
    }

    /// Record bandwidth measurement (bytes)
    pub fn record_bandwidth(&self, bytes: u64) {
        let mut series = self.bandwidth.write();
        series.add(
            bytes as f64,
            self.config.max_data_points,
            self.config.sample_rate,
        );
    }

    /// Record latency measurement (milliseconds)
    pub fn record_latency(&self, ms: u64) {
        let mut series = self.latency.write();
        series.add(
            ms as f64,
            self.config.max_data_points,
            self.config.sample_rate,
        );
    }

    /// Record connection event
    pub fn record_connection_event(&self) {
        let mut series = self.connections.write();
        series.add(1.0, self.config.max_data_points, self.config.sample_rate);
    }

    /// Record query event
    pub fn record_query_event(&self) {
        let mut series = self.queries.write();
        series.add(1.0, self.config.max_data_points, self.config.sample_rate);
    }

    /// Record error event
    pub fn record_error_event(&self) {
        let mut series = self.errors.write();
        series.add(1.0, self.config.max_data_points, self.config.sample_rate);
    }

    /// Get statistics for a time window
    pub fn get_statistics(&self, window: TimeWindow) -> AggregatedStatistics {
        let duration = window.duration();

        AggregatedStatistics {
            bandwidth: self.compute_statistics(&self.bandwidth, duration),
            latency: self.compute_statistics(&self.latency, duration),
            connection_rate: self.compute_statistics(&self.connections, duration),
            query_rate: self.compute_statistics(&self.queries, duration),
            error_rate: self.compute_statistics(&self.errors, duration),
        }
    }
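
    // Note: the event series store one unit-valued sample per event, so
    // `count` is "events seen in the window". A per-second rate can be
    // derived by dividing by the window length, e.g. (a minimal sketch):
    //
    //     let stats = aggregator.get_statistics(TimeWindow::Minute);
    //     let conn_per_sec = stats.connection_rate.count as f64 / 60.0;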

    /// Compute statistics for a time series
    fn compute_statistics(
        &self,
        series: &RwLock<TimeSeries>,
        window: Duration,
    ) -> MetricStatistics {
        let data = series.read();
        let values = data.get_values_in_window(window);

        if values.is_empty() {
            return MetricStatistics::default();
        }

        let count = values.len();
        let sum: f64 = values.iter().sum();
        let avg = sum / count as f64;

        let min = values.iter().copied().fold(f64::INFINITY, f64::min);
        let max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);

        // Calculate standard deviation
        let variance: f64 = values.iter().map(|v| (v - avg).powi(2)).sum::<f64>() / count as f64;
        let stddev = variance.sqrt();

        // Calculate percentiles if enabled
        let (p50, p95, p99) = if self.config.enable_percentiles {
            let mut sorted = values.clone();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
            (
                percentile(&sorted, 0.50),
                percentile(&sorted, 0.95),
                percentile(&sorted, 0.99),
            )
        } else {
            (avg, max, max)
        };

        // Calculate trend if enabled
        let trend = if self.config.enable_trends {
            calculate_trend(&values)
        } else {
            0.0
        };

        MetricStatistics {
            count,
            min,
            max,
            avg,
            stddev,
            p50,
            p95,
            p99,
            trend,
        }
    }

    /// Cleanup old data points
    pub fn cleanup(&self) {
        let retention = self.config.retention_period;
        self.bandwidth.write().cleanup_old(retention);
        self.latency.write().cleanup_old(retention);
        self.connections.write().cleanup_old(retention);
        self.queries.write().cleanup_old(retention);
        self.errors.write().cleanup_old(retention);
    }

    /// Get the number of data points currently stored
    pub fn data_point_count(&self) -> usize {
        self.bandwidth.read().data.len()
            + self.latency.read().data.len()
            + self.connections.read().data.len()
            + self.queries.read().data.len()
            + self.errors.read().data.len()
    }

    /// Clear all data
    pub fn clear(&self) {
        self.bandwidth.write().data.clear();
        self.latency.write().data.clear();
        self.connections.write().data.clear();
        self.queries.write().data.clear();
        self.errors.write().data.clear();
    }
}
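
// Retention is only enforced when `cleanup` runs, so callers are expected to
// invoke it periodically. A minimal sketch (assuming the aggregator is shared
// behind an `Arc`):
//
//     use std::sync::Arc;
//
//     let aggregator = Arc::new(MetricsAggregator::new(AggregatorConfig::default()));
//     let handle = Arc::clone(&aggregator);
//     std::thread::spawn(move || loop {
//         std::thread::sleep(std::time::Duration::from_secs(60));
//         handle.cleanup();
//     });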

/// Calculate percentile from sorted values
fn percentile(sorted_values: &[f64], p: f64) -> f64 {
    if sorted_values.is_empty() {
        return 0.0;
    }

    let index = (p * (sorted_values.len() - 1) as f64) as usize;
    sorted_values[index]
}
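
// Worked example of the index arithmetic above: for ten sorted values and
// p = 0.95, index = (0.95 * 9.0) as usize = 8 (truncation, not rounding), so
// the ninth-smallest value is returned. This is the convention the tests
// below rely on.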

/// Calculate trend using simple linear regression
fn calculate_trend(values: &[f64]) -> f64 {
    if values.len() < 2 {
        return 0.0;
    }

    let n = values.len() as f64;
    let x_mean = (n - 1.0) / 2.0;
    let y_mean = values.iter().sum::<f64>() / n;

    let mut numerator = 0.0;
    let mut denominator = 0.0;

    for (i, &y) in values.iter().enumerate() {
        let x = i as f64;
        numerator += (x - x_mean) * (y - y_mean);
        denominator += (x - x_mean).powi(2);
    }

    if denominator.abs() < 1e-10 {
        return 0.0;
    }

    numerator / denominator
}
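
// Worked example: for values [1.0, 2.0, 3.0, 4.0, 5.0] the least-squares
// slope is exactly 1.0 (the series rises one unit per sample); for a constant
// series the numerator is 0 and the returned slope is 0.0.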

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_presets() {
        let realtime = AggregatorConfig::realtime();
        assert_eq!(realtime.max_data_points, 1000);
        assert!(!realtime.enable_trends);

        let longterm = AggregatorConfig::longterm();
        assert_eq!(longterm.max_data_points, 50000);
        assert!(longterm.enable_trends);

        let balanced = AggregatorConfig::balanced();
        assert_eq!(balanced.sample_rate, 5);
    }

    #[test]
    fn test_time_window_duration() {
        assert_eq!(TimeWindow::Second.duration(), Duration::from_secs(1));
        assert_eq!(TimeWindow::Minute.duration(), Duration::from_secs(60));
        assert_eq!(TimeWindow::Hour.duration(), Duration::from_secs(3600));
        assert_eq!(TimeWindow::Day.duration(), Duration::from_secs(86400));
    }

    #[test]
    fn test_record_bandwidth() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        aggregator.record_bandwidth(1024);
        aggregator.record_bandwidth(2048);

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.bandwidth.count, 2);
        assert_eq!(stats.bandwidth.min, 1024.0);
        assert_eq!(stats.bandwidth.max, 2048.0);
    }

    #[test]
    fn test_record_latency() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        aggregator.record_latency(50);
        aggregator.record_latency(100);
        aggregator.record_latency(75);

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.latency.count, 3);
        assert_eq!(stats.latency.min, 50.0);
        assert_eq!(stats.latency.max, 100.0);
        assert_eq!(stats.latency.avg, 75.0);
    }

    #[test]
    fn test_connection_events() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        for _ in 0..5 {
            aggregator.record_connection_event();
        }

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.connection_rate.count, 5);
    }

    #[test]
    fn test_query_events() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        for _ in 0..10 {
            aggregator.record_query_event();
        }

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.query_rate.count, 10);
    }

    #[test]
    fn test_error_events() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        for _ in 0..3 {
            aggregator.record_error_event();
        }

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.error_rate.count, 3);
    }

    #[test]
    fn test_percentile_calculation() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];

        assert_eq!(percentile(&values, 0.50), 5.0); // 0.50 * 9 = 4.5, truncated to index 4 -> 5.0
        assert_eq!(percentile(&values, 0.95), 9.0); // 0.95 * 9 = 8.55, truncated to index 8 -> 9.0
    }

    #[test]
    fn test_trend_calculation() {
        // Increasing trend
        let increasing = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let trend = calculate_trend(&increasing);
        assert!(trend > 0.0);

        // Decreasing trend
        let decreasing = vec![5.0, 4.0, 3.0, 2.0, 1.0];
        let trend = calculate_trend(&decreasing);
        assert!(trend < 0.0);

        // Flat trend
        let flat = vec![3.0, 3.0, 3.0, 3.0, 3.0];
        let trend = calculate_trend(&flat);
        assert!(trend.abs() < 0.01);
    }

    #[test]
    fn test_sample_rate() {
        let config = AggregatorConfig {
            sample_rate: 2, // Sample 1 in 2
            ..Default::default()
        };

        let aggregator = MetricsAggregator::new(config);

        for _ in 0..10 {
            aggregator.record_bandwidth(1024);
        }

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.bandwidth.count, 5); // Half of 10
    }

    #[test]
    fn test_data_point_count() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        aggregator.record_bandwidth(1024);
        aggregator.record_latency(50);
        aggregator.record_connection_event();

        assert_eq!(aggregator.data_point_count(), 3);
    }

    #[test]
    fn test_clear() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        aggregator.record_bandwidth(1024);
        aggregator.record_latency(50);

        assert!(aggregator.data_point_count() > 0);

        aggregator.clear();
        assert_eq!(aggregator.data_point_count(), 0);
    }

    #[test]
    fn test_max_data_points() {
        let config = AggregatorConfig {
            max_data_points: 5,
            ..Default::default()
        };

        let aggregator = MetricsAggregator::new(config);

        for i in 0..10 {
            aggregator.record_bandwidth(i * 100);
        }

        // Should only keep the last 5 points
        let count = aggregator.bandwidth.read().data.len();
        assert_eq!(count, 5);
    }

    #[test]
    fn test_statistics_with_no_data() {
        let config = AggregatorConfig::default();
        let aggregator = MetricsAggregator::new(config);

        let stats = aggregator.get_statistics(TimeWindow::Minute);
        assert_eq!(stats.bandwidth.count, 0);
        assert_eq!(stats.bandwidth.avg, 0.0);
    }
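
    // Additional edge-case check (added here as a sketch, not part of the
    // original suite): `percentile` returns 0.0 for an empty slice and echoes
    // the single element for any p when only one sample exists.
    #[test]
    fn test_percentile_edge_cases() {
        assert_eq!(percentile(&[], 0.5), 0.0);
        assert_eq!(percentile(&[42.0], 0.99), 42.0);
    }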
}