1use parking_lot::RwLock;
31use serde::{Deserialize, Serialize};
32use std::collections::VecDeque;
33use std::time::{Duration, Instant};
34
/// Look-back window used when querying aggregated statistics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeWindow {
    /// The most recent second.
    Second,
    /// The most recent 60 seconds.
    Minute,
    /// The most recent 3 600 seconds.
    Hour,
    /// The most recent 86 400 seconds.
    Day,
}

impl TimeWindow {
    /// Returns the length of this window as a [`Duration`].
    pub fn duration(&self) -> Duration {
        // Work in whole seconds and wrap once at the end.
        let seconds: u64 = match *self {
            Self::Second => 1,
            Self::Minute => 60,
            Self::Hour => 60 * 60,
            Self::Day => 24 * 60 * 60,
        };
        Duration::from_secs(seconds)
    }
}
59
/// Settings that control how much data the aggregator keeps and which
/// statistics it computes.
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
    /// Upper bound on the number of samples stored per series.
    pub max_data_points: usize,

    /// Samples older than this are dropped when cleanup runs.
    pub retention_period: Duration,

    /// Whether p50/p95/p99 are computed from the sorted samples.
    pub enable_percentiles: bool,

    /// Whether the trend (slope) is computed.
    pub enable_trends: bool,

    /// Store one value out of every `sample_rate` recorded values.
    pub sample_rate: usize,
}

impl Default for AggregatorConfig {
    /// One hour of retention, every value sampled, all statistics enabled.
    fn default() -> Self {
        Self {
            max_data_points: 10_000,
            retention_period: Self::ONE_HOUR,
            enable_percentiles: true,
            enable_trends: true,
            sample_rate: 1,
        }
    }
}

impl AggregatorConfig {
    const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
    const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
    const ONE_WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);

    /// Small buffer, five minutes of retention, every value sampled,
    /// percentiles on and trends off.
    pub fn realtime() -> Self {
        Self {
            max_data_points: 1_000,
            retention_period: Self::FIVE_MINUTES,
            enable_percentiles: true,
            enable_trends: false,
            sample_rate: 1,
        }
    }

    /// Large buffer, one week of retention, one value in ten sampled,
    /// trends on and percentiles off.
    pub fn longterm() -> Self {
        Self {
            max_data_points: 50_000,
            retention_period: Self::ONE_WEEK,
            enable_percentiles: false,
            enable_trends: true,
            sample_rate: 10,
        }
    }

    /// Medium buffer, one hour of retention, one value in five sampled,
    /// all statistics enabled.
    pub fn balanced() -> Self {
        Self {
            max_data_points: 5_000,
            retention_period: Self::ONE_HOUR,
            enable_percentiles: true,
            enable_trends: true,
            sample_rate: 5,
        }
    }
}
125
/// One stored sample and the instant at which it was stored.
#[derive(Debug, Clone, Copy)]
struct DataPoint {
    value: f64,
    timestamp: Instant,
}

/// Bounded buffer of samples for a single metric, ordered oldest-first.
#[derive(Debug)]
struct TimeSeries {
    /// Stored samples; new samples are pushed at the back, so timestamps are
    /// non-decreasing from front to back.
    data: VecDeque<DataPoint>,
    /// Number of values offered to [`TimeSeries::add`] since the last one that
    /// was actually stored.
    sample_counter: usize,
}

impl TimeSeries {
    /// Creates an empty series with room pre-allocated for `capacity` samples.
    fn new(capacity: usize) -> Self {
        Self {
            data: VecDeque::with_capacity(capacity),
            sample_counter: 0,
        }
    }

    /// Offers `value` to the series.
    ///
    /// Only every `sample_rate`-th offered value is stored, stamped with the
    /// current instant. A `sample_rate` of `0` is treated like `1` (store
    /// everything) instead of silently discarding all data. After storing,
    /// the oldest samples are evicted until at most `max_points` remain.
    fn add(&mut self, value: f64, max_points: usize, sample_rate: usize) {
        self.sample_counter += 1;
        if self.sample_counter < sample_rate {
            return;
        }
        // Restart the count on every stored sample so the counter stays
        // bounded by `sample_rate` and cannot overflow on a long-lived series.
        self.sample_counter = 0;

        self.data.push_back(DataPoint {
            value,
            timestamp: Instant::now(),
        });

        while self.data.len() > max_points {
            self.data.pop_front();
        }
    }

    /// Drops samples from the front whose age exceeds `retention`.
    fn cleanup_old(&mut self, retention: Duration) {
        let now = Instant::now();
        while self
            .data
            .front()
            .is_some_and(|point| now.duration_since(point.timestamp) > retention)
        {
            self.data.pop_front();
        }
    }

    /// Returns, oldest first, the values of all samples no older than `window`.
    fn get_values_in_window(&self, window: Duration) -> Vec<f64> {
        let now = Instant::now();
        // Samples are stored in timestamp order, so everything too old forms a
        // prefix; locate its end by binary search instead of scanning it all.
        let first_recent = self
            .data
            .partition_point(|point| now.duration_since(point.timestamp) > window);
        self.data
            .range(first_recent..)
            .map(|point| point.value)
            .collect()
    }
}
187
/// Summary statistics for one metric over a time window.
///
/// The `Default` value (every field zero) is what the aggregator returns when
/// the window contains no samples.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MetricStatistics {
    /// Number of samples that fell inside the window.
    pub count: usize,

    /// Smallest sample value in the window.
    pub min: f64,

    /// Largest sample value in the window.
    pub max: f64,

    /// Arithmetic mean of the sample values.
    pub avg: f64,

    /// Population standard deviation (variance is divided by `count`).
    pub stddev: f64,

    /// 50th percentile; set to `avg` when percentiles are disabled in the config.
    pub p50: f64,

    /// 95th percentile; set to `max` when percentiles are disabled in the config.
    pub p95: f64,

    /// 99th percentile; set to `max` when percentiles are disabled in the config.
    pub p99: f64,

    /// Least-squares slope of the values against their position in the window;
    /// `0.0` when trends are disabled in the config.
    pub trend: f64,
}
218
/// Per-metric statistics produced by one `get_statistics` call.
///
/// The three `*_rate` fields summarise event series in which every stored
/// event has the value `1.0`, so `count` is their informative field.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AggregatedStatistics {
    /// Statistics over the values passed to `record_bandwidth`.
    pub bandwidth: MetricStatistics,

    /// Statistics over the values passed to `record_latency`.
    pub latency: MetricStatistics,

    /// Statistics over the events stored by `record_connection_event`.
    pub connection_rate: MetricStatistics,

    /// Statistics over the events stored by `record_query_event`.
    pub query_rate: MetricStatistics,

    /// Statistics over the events stored by `record_error_event`.
    pub error_rate: MetricStatistics,
}
237
238pub struct MetricsAggregator {
240 config: AggregatorConfig,
241 bandwidth: RwLock<TimeSeries>,
242 latency: RwLock<TimeSeries>,
243 connections: RwLock<TimeSeries>,
244 queries: RwLock<TimeSeries>,
245 errors: RwLock<TimeSeries>,
246}
247
248impl MetricsAggregator {
249 pub fn new(config: AggregatorConfig) -> Self {
251 let capacity = config.max_data_points;
252 Self {
253 config,
254 bandwidth: RwLock::new(TimeSeries::new(capacity)),
255 latency: RwLock::new(TimeSeries::new(capacity)),
256 connections: RwLock::new(TimeSeries::new(capacity)),
257 queries: RwLock::new(TimeSeries::new(capacity)),
258 errors: RwLock::new(TimeSeries::new(capacity)),
259 }
260 }
261
262 pub fn record_bandwidth(&self, bytes: u64) {
264 let mut series = self.bandwidth.write();
265 series.add(
266 bytes as f64,
267 self.config.max_data_points,
268 self.config.sample_rate,
269 );
270 }
271
272 pub fn record_latency(&self, ms: u64) {
274 let mut series = self.latency.write();
275 series.add(
276 ms as f64,
277 self.config.max_data_points,
278 self.config.sample_rate,
279 );
280 }
281
282 pub fn record_connection_event(&self) {
284 let mut series = self.connections.write();
285 series.add(1.0, self.config.max_data_points, self.config.sample_rate);
286 }
287
288 pub fn record_query_event(&self) {
290 let mut series = self.queries.write();
291 series.add(1.0, self.config.max_data_points, self.config.sample_rate);
292 }
293
294 pub fn record_error_event(&self) {
296 let mut series = self.errors.write();
297 series.add(1.0, self.config.max_data_points, self.config.sample_rate);
298 }
299
300 pub fn get_statistics(&self, window: TimeWindow) -> AggregatedStatistics {
302 let duration = window.duration();
303
304 AggregatedStatistics {
305 bandwidth: self.compute_statistics(&self.bandwidth, duration),
306 latency: self.compute_statistics(&self.latency, duration),
307 connection_rate: self.compute_statistics(&self.connections, duration),
308 query_rate: self.compute_statistics(&self.queries, duration),
309 error_rate: self.compute_statistics(&self.errors, duration),
310 }
311 }
312
313 fn compute_statistics(
315 &self,
316 series: &RwLock<TimeSeries>,
317 window: Duration,
318 ) -> MetricStatistics {
319 let data = series.read();
320 let values = data.get_values_in_window(window);
321
322 if values.is_empty() {
323 return MetricStatistics::default();
324 }
325
326 let count = values.len();
327 let sum: f64 = values.iter().sum();
328 let avg = sum / count as f64;
329
330 let min = values.iter().copied().fold(f64::INFINITY, f64::min);
331 let max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);
332
333 let variance: f64 = values.iter().map(|v| (v - avg).powi(2)).sum::<f64>() / count as f64;
335 let stddev = variance.sqrt();
336
337 let (p50, p95, p99) = if self.config.enable_percentiles {
339 let mut sorted = values.clone();
340 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
341 (
342 percentile(&sorted, 0.50),
343 percentile(&sorted, 0.95),
344 percentile(&sorted, 0.99),
345 )
346 } else {
347 (avg, max, max)
348 };
349
350 let trend = if self.config.enable_trends {
352 calculate_trend(&values)
353 } else {
354 0.0
355 };
356
357 MetricStatistics {
358 count,
359 min,
360 max,
361 avg,
362 stddev,
363 p50,
364 p95,
365 p99,
366 trend,
367 }
368 }
369
370 pub fn cleanup(&self) {
372 let retention = self.config.retention_period;
373 self.bandwidth.write().cleanup_old(retention);
374 self.latency.write().cleanup_old(retention);
375 self.connections.write().cleanup_old(retention);
376 self.queries.write().cleanup_old(retention);
377 self.errors.write().cleanup_old(retention);
378 }
379
380 pub fn data_point_count(&self) -> usize {
382 self.bandwidth.read().data.len()
383 + self.latency.read().data.len()
384 + self.connections.read().data.len()
385 + self.queries.read().data.len()
386 + self.errors.read().data.len()
387 }
388
389 pub fn clear(&self) {
391 self.bandwidth.write().data.clear();
392 self.latency.write().data.clear();
393 self.connections.write().data.clear();
394 self.queries.write().data.clear();
395 self.errors.write().data.clear();
396 }
397}
398
/// Returns the `p`-quantile of an ascending-sorted slice using the floor rank
/// `floor(p * (len - 1))`, without interpolation.
///
/// `p` is a fraction and is clamped to `[0.0, 1.0]`, so out-of-range requests
/// yield the first or last element instead of indexing out of bounds. A NaN
/// `p` selects the first element. An empty slice yields `0.0`.
fn percentile(sorted_values: &[f64], p: f64) -> f64 {
    let Some(last) = sorted_values.len().checked_sub(1) else {
        return 0.0;
    };

    // The float-to-int cast truncates toward zero (and maps NaN to 0).
    let rank = (p.clamp(0.0, 1.0) * last as f64) as usize;
    // Guard against the product rounding past the final index.
    sorted_values[rank.min(last)]
}
408
/// Returns the least-squares slope of `values` against their indices
/// `0, 1, 2, ...`.
///
/// Yields `0.0` for fewer than two values or when the spread of the indices
/// is numerically zero.
fn calculate_trend(values: &[f64]) -> f64 {
    let len = values.len();
    if len < 2 {
        return 0.0;
    }

    let n = len as f64;
    let x_mean = (n - 1.0) / 2.0;
    let y_mean = values.iter().sum::<f64>() / n;

    // Accumulate the covariance numerator and the index variance in one pass.
    let (covariance, x_spread) = values.iter().enumerate().fold(
        (0.0_f64, 0.0_f64),
        |(cov, spread), (i, &y)| {
            let dx = i as f64 - x_mean;
            (cov + dx * (y - y_mean), spread + dx.powi(2))
        },
    );

    if x_spread.abs() < 1e-10 {
        0.0
    } else {
        covariance / x_spread
    }
}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an aggregator from the default configuration.
    fn default_aggregator() -> MetricsAggregator {
        MetricsAggregator::new(AggregatorConfig::default())
    }

    /// Statistics over the last minute for `aggregator`.
    fn minute_stats(aggregator: &MetricsAggregator) -> AggregatedStatistics {
        aggregator.get_statistics(TimeWindow::Minute)
    }

    #[test]
    fn test_config_presets() {
        let realtime = AggregatorConfig::realtime();
        assert_eq!(realtime.max_data_points, 1000);
        assert!(!realtime.enable_trends);

        let longterm = AggregatorConfig::longterm();
        assert_eq!(longterm.max_data_points, 50000);
        assert!(longterm.enable_trends);

        let balanced = AggregatorConfig::balanced();
        assert_eq!(balanced.sample_rate, 5);
    }

    #[test]
    fn test_time_window_duration() {
        let expected_seconds = [
            (TimeWindow::Second, 1),
            (TimeWindow::Minute, 60),
            (TimeWindow::Hour, 3600),
            (TimeWindow::Day, 86400),
        ];

        for (window, seconds) in expected_seconds {
            assert_eq!(window.duration(), Duration::from_secs(seconds));
        }
    }

    #[test]
    fn test_record_bandwidth() {
        let aggregator = default_aggregator();

        for bytes in [1024, 2048] {
            aggregator.record_bandwidth(bytes);
        }

        let bandwidth = minute_stats(&aggregator).bandwidth;
        assert_eq!(bandwidth.count, 2);
        assert_eq!(bandwidth.min, 1024.0);
        assert_eq!(bandwidth.max, 2048.0);
    }

    #[test]
    fn test_record_latency() {
        let aggregator = default_aggregator();

        for ms in [50, 100, 75] {
            aggregator.record_latency(ms);
        }

        let latency = minute_stats(&aggregator).latency;
        assert_eq!(latency.count, 3);
        assert_eq!(latency.min, 50.0);
        assert_eq!(latency.max, 100.0);
        assert_eq!(latency.avg, 75.0);
    }

    #[test]
    fn test_connection_events() {
        let aggregator = default_aggregator();

        (0..5).for_each(|_| aggregator.record_connection_event());

        assert_eq!(minute_stats(&aggregator).connection_rate.count, 5);
    }

    #[test]
    fn test_query_events() {
        let aggregator = default_aggregator();

        (0..10).for_each(|_| aggregator.record_query_event());

        assert_eq!(minute_stats(&aggregator).query_rate.count, 10);
    }

    #[test]
    fn test_error_events() {
        let aggregator = default_aggregator();

        (0..3).for_each(|_| aggregator.record_error_event());

        assert_eq!(minute_stats(&aggregator).error_rate.count, 3);
    }

    #[test]
    fn test_percentile_calculation() {
        let values: Vec<f64> = (1..=10).map(f64::from).collect();

        assert_eq!(percentile(&values, 0.50), 5.0);
        assert_eq!(percentile(&values, 0.95), 9.0);
    }

    #[test]
    fn test_trend_calculation() {
        let increasing = [1.0, 2.0, 3.0, 4.0, 5.0];
        assert!(calculate_trend(&increasing) > 0.0);

        let decreasing = [5.0, 4.0, 3.0, 2.0, 1.0];
        assert!(calculate_trend(&decreasing) < 0.0);

        let flat = [3.0; 5];
        assert!(calculate_trend(&flat).abs() < 0.01);
    }

    #[test]
    fn test_sample_rate() {
        let aggregator = MetricsAggregator::new(AggregatorConfig {
            sample_rate: 2,
            ..Default::default()
        });

        (0..10).for_each(|_| aggregator.record_bandwidth(1024));

        // Every second value is kept.
        assert_eq!(minute_stats(&aggregator).bandwidth.count, 5);
    }

    #[test]
    fn test_data_point_count() {
        let aggregator = default_aggregator();

        aggregator.record_bandwidth(1024);
        aggregator.record_latency(50);
        aggregator.record_connection_event();

        assert_eq!(aggregator.data_point_count(), 3);
    }

    #[test]
    fn test_clear() {
        let aggregator = default_aggregator();

        aggregator.record_bandwidth(1024);
        aggregator.record_latency(50);
        assert!(aggregator.data_point_count() > 0);

        aggregator.clear();
        assert_eq!(aggregator.data_point_count(), 0);
    }

    #[test]
    fn test_max_data_points() {
        let aggregator = MetricsAggregator::new(AggregatorConfig {
            max_data_points: 5,
            ..Default::default()
        });

        for i in 0..10 {
            aggregator.record_bandwidth(i * 100);
        }

        // Only the newest five samples survive the size cap.
        let stored = aggregator.bandwidth.read().data.len();
        assert_eq!(stored, 5);
    }

    #[test]
    fn test_statistics_with_no_data() {
        let bandwidth = minute_stats(&default_aggregator()).bandwidth;

        assert_eq!(bandwidth.count, 0);
        assert_eq!(bandwidth.avg, 0.0);
    }
}