1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
//! Statistics history tracking for MAVLink router.
//!
//! This module provides structures and methods to track and aggregate
//! routing statistics over time windows.
use crate::routing::RoutingStats;
use std::collections::VecDeque;
/// Simple Stats buffer history
pub struct StatsHistory {
/// Recent N seconds samples
pub samples: VecDeque<RoutingStats>,
/// Max retention time in seconds
pub max_age_secs: u64,
}
impl StatsHistory {
/// Creates a new `StatsHistory` with the specified maximum retention time.
pub fn new(max_age_secs: u64) -> Self {
let capacity = max_age_secs as usize;
Self {
samples: VecDeque::with_capacity(capacity),
max_age_secs,
}
}
/// Add sample and clean up old data.
pub fn push(&mut self, stats: RoutingStats) {
self.samples.push_back(stats);
if let Some(latest) = self.samples.back() {
// Clean up old data exceeding max_age
let cutoff = latest.timestamp.saturating_sub(self.max_age_secs);
while let Some(oldest) = self.samples.front() {
if oldest.timestamp < cutoff {
self.samples.pop_front();
} else {
break;
}
}
}
}
/// Calculate aggregated statistics for a specified time window.
///
/// Uses binary search to efficiently find the start of the time window.
pub fn aggregate(&self, window_secs: u64) -> Option<AggregatedStats> {
if self.samples.is_empty() {
return None;
}
let latest = self.samples.back()?;
let cutoff = latest.timestamp.saturating_sub(window_secs);
// Optimization: Use binary search (partition_point) to find the start index
// because timestamps are strictly monotonically increasing.
// partition_point returns the index of the first element where the predicate is false.
// Predicate: timestamp < cutoff.
// So it finds the first element where timestamp >= cutoff.
let start_idx = self.samples.partition_point(|s| s.timestamp < cutoff);
if start_idx >= self.samples.len() {
return None;
}
let window = self.samples.range(start_idx..);
let window_len = self.samples.len() - start_idx;
if window_len == 0 {
return None;
}
// Single-pass computation of sum, min, max
let (sum_routes, min_routes, max_routes) =
window.fold((0usize, usize::MAX, 0usize), |(sum, min, max), s| {
(
sum + s.total_routes,
min.min(s.total_routes),
max.max(s.total_routes),
)
});
// Handle edge case where fold ran on empty iterator (shouldn't happen due to earlier check)
if max_routes == 0 && min_routes == usize::MAX {
return None;
}
let avg_routes = sum_routes as f64 / window_len as f64;
Some(AggregatedStats {
avg_routes,
max_routes,
min_routes,
sample_count: window_len,
})
}
}
/// Simplified aggregated stats
#[derive(Debug)]
pub struct AggregatedStats {
/// Average number of routes in the window
pub avg_routes: f64,
/// Maximum number of routes in the window
pub max_routes: usize,
/// Minimum number of routes in the window
pub min_routes: usize,
/// Number of samples in the window
pub sample_count: usize,
}
#[cfg(test)]
#[allow(clippy::expect_used)] // Allow expect() in tests for descriptive failure messages
mod tests {
use super::*;
use crate::routing::RoutingStats;
fn dummy_stats(
routes: usize,
systems: usize,
endpoints: usize,
timestamp: u64,
) -> RoutingStats {
RoutingStats {
total_routes: routes,
total_systems: systems,
total_endpoints: endpoints,
timestamp,
}
}
#[test]
fn test_stats_history_retention() {
let mut history = StatsHistory::new(60); // 1 minute retention
// Add 30 samples, 1 second apart
for i in 0..30 {
history.push(dummy_stats(i, i, i, i as u64));
}
assert_eq!(history.samples.len(), 30);
// Add 40 more samples, total 70 samples. Max age is 60 secs.
// Expect samples 0-9 to be pruned.
for i in 30..70 {
history.push(dummy_stats(i, i, i, i as u64));
}
assert_eq!(history.samples.len(), 61); // Should retain samples 9 to 69 (61 samples)
// Verify oldest remaining sample
assert_eq!(
history
.samples
.front()
.expect("History should not be empty")
.timestamp,
9
);
assert_eq!(
history
.samples
.back()
.expect("History should not be empty")
.timestamp,
69
);
// Test push with 0 retention (effectively disabled)
let mut history_zero_retention = StatsHistory::new(0);
history_zero_retention.push(dummy_stats(1, 1, 1, 1));
history_zero_retention.push(dummy_stats(2, 2, 2, 2));
assert_eq!(
history_zero_retention.samples.len(),
1,
"0 retention should only keep the latest sample"
);
}
#[test]
fn test_aggregate_window() {
let mut history = StatsHistory::new(100); // More than enough retention
// Sample every second
// Routes: 10, 20, 30, 40, 50 (timestamps 0-4)
for i in 0..5 {
history.push(dummy_stats((i + 1) * 10, 0, 0, i as u64));
}
// Current state: [t0:10, t1:20, t2:30, t3:40, t4:50]
// Aggregate 3-second window (t2, t3, t4)
// Values: 20, 30, 40, 50 (timestamps 1-4)
// sum = 140, count = 4, avg = 35.0, min = 20, max = 50
let agg_3s = history
.aggregate(3)
.expect("Aggregation should not be None");
assert_eq!(agg_3s.sample_count, 4);
assert_eq!(agg_3s.avg_routes, 35.0);
assert_eq!(agg_3s.min_routes, 20);
assert_eq!(agg_3s.max_routes, 50);
// Aggregate 5-second window (all samples)
// Values: 10, 20, 30, 40, 50
// sum = 150, count = 5, avg = 30.0, min = 10, max = 50
let agg_5s = history
.aggregate(5)
.expect("Aggregation should not be None");
assert_eq!(agg_5s.sample_count, 5);
assert_eq!(agg_5s.avg_routes, 30.0);
assert_eq!(agg_5s.min_routes, 10);
assert_eq!(agg_5s.max_routes, 50);
// Aggregate a window before any samples
assert!(history.aggregate(0).is_some()); // Latest sample's timestamp - 0 should include just the last one
let last_sample_agg = history
.aggregate(0)
.expect("Aggregation for 0-window should not be None"); // Should be only the last sample if 0-window is used as (latest - 0)
assert_eq!(last_sample_agg.sample_count, 1);
assert_eq!(last_sample_agg.avg_routes, 50.0);
// Aggregate a window larger than retention, should cover all
let agg_large = history
.aggregate(100)
.expect("Aggregation for large window should not be None");
assert_eq!(agg_large.sample_count, 5);
}
#[test]
fn test_empty_history_aggregation() {
let history = StatsHistory::new(60);
assert!(history.aggregate(60).is_none());
}
}