1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
//! Query methods for the statistics aggregator.
//!
//! This module provides methods for querying statistics from the aggregator,
//! including per-host speeds, application statistics, connection details, etc.
use ringbuf::traits::{Consumer, Observer};
use std::{collections::HashMap, net::Ipv4Addr};
use crate::stats::{
aggregate_by_app, format_connections as pairs_format_connections,
format_port_stats as ports_format_port_stats, top_ports_all as ports_top_ports_all,
top_ports_per_host as ports_top_ports_per_host, AppRegistry, AppStats, ConnectionDetails,
Direction, IpPair, PortStats, QualityMetrics, Speed, SpeedAccumulator, TcpState,
};
use super::{helpers, StatsAggregator};
impl StatsAggregator {
/// Per-host speeds combined across every sample held in the window.
///
/// Hands each per-tick host map stored in `hosts_buffer` to
/// `helpers::aggregate_sum_speed` and returns the map that helper builds.
///
/// # Note
/// **DEPRECATED**: the combination is count-based rather than time-weighted,
/// so the figures can be wrong whenever tick intervals vary.
///
/// Prefer `speed_per_host_instant()` for the most recent tick, or aggregate the
/// pairs buffer with time-weighting when an accurate window figure is needed.
#[deprecated(since = "0.2.0", note = "Use time-weighted averaging methods instead")]
#[allow(deprecated)]
pub fn speed_per_host(&self) -> HashMap<Ipv4Addr, Speed> {
    let window_samples = self.hosts_buffer.iter();
    helpers::aggregate_sum_speed(window_samples)
}
/// Per-host speeds taken from the last sample in the hosts buffer only.
///
/// Returns an empty map when the buffer holds no sample yet.
pub fn speed_per_host_instant(&self) -> HashMap<Ipv4Addr, Speed> {
    match self.hosts_buffer.iter().last() {
        Some(latest) => latest.clone(),
        None => HashMap::default(),
    }
}
/// Per-host peak speeds over the window.
///
/// Passes every per-tick host map in `hosts_buffer` to
/// `helpers::aggregate_peak_speed` and returns its result.
pub fn speed_per_host_peak(&self) -> HashMap<Ipv4Addr, Speed> {
    let window_samples = self.hosts_buffer.iter();
    helpers::aggregate_peak_speed(window_samples)
}
/// Render the window-average total speed as display text.
///
/// Each sample contributes in proportion to its `duration_secs`, which keeps
/// the average accurate when tick intervals vary. The text itself is
/// produced by `Speed`'s `to_string()`.
///
/// Returns an empty string when the buffer holds no samples, and the
/// rendering of `Speed::default()` when the summed duration is zero.
pub fn speed_str(&self) -> String {
    if self.total_speed_buffer.is_empty() {
        return String::new();
    }

    // A single walk over the window gathers the elapsed time together with
    // both duration-weighted sums.
    let (elapsed, input_sum, output_sum) = self.total_speed_buffer.iter().fold(
        (0.0_f64, 0.0_f64, 0.0_f64),
        |(elapsed, input_sum, output_sum), sample| {
            (
                elapsed + sample.duration_secs,
                input_sum + sample.speed.input as f64 * sample.duration_secs,
                output_sum + sample.speed.output as f64 * sample.duration_secs,
            )
        },
    );

    // Guard the division below: with no elapsed time there is no rate to report.
    if elapsed == 0.0 {
        return Speed::default().to_string();
    }

    let mean_input = input_sum / elapsed;
    let mean_output = output_sum / elapsed;
    // The float-to-integer casts drop the fractional part.
    Speed::new(mean_input as u128, mean_output as u128).to_string()
}
/// Get the instantaneous total speed (last tick).
pub fn total_speed_instant(&self) -> Speed {
self.total_speed_buffer
.iter()
.last()
.map(|ts| ts.speed)
.unwrap_or_default()
}
/// Highest total speed observed anywhere in the window.
///
/// The input and output maxima are tracked independently, so the returned
/// `Speed` may combine values taken from different samples. An empty buffer
/// yields `Speed::default()`.
pub fn total_speed_peak(&self) -> Speed {
    self.total_speed_buffer
        .iter()
        .fold(Speed::default(), |mut peak, sample| {
            peak.input = peak.input.max(sample.speed.input);
            peak.output = peak.output.max(sample.speed.output);
            peak
        })
}
/// Speed samples across the aggregation window, e.g. for a sparkline.
///
/// # Arguments
/// * `host_ip` - When `Some`, the history is restricted to that host;
///   when `None`, the total-speed history is returned.
///
/// # Returns
/// One `Speed` per buffered sample for the total history. For a single host,
/// samples in which the host does not appear are skipped, so the vector may
/// be shorter than the window.
pub fn speed_history(&self, host_ip: Option<Ipv4Addr>) -> Vec<Speed> {
    match host_ip {
        // Per-host view: keep only the ticks in which this host was seen.
        Some(ip) => self
            .hosts_buffer
            .iter()
            .filter_map(|hosts| hosts.get(&ip))
            .copied()
            .collect(),
        // Window-wide view: strip the timing data and keep the speed.
        None => self
            .total_speed_buffer
            .iter()
            .map(|sample| sample.speed)
            .collect(),
    }
}
/// Get application statistics grouped by service type.
///
/// Builds a default `AppRegistry` and passes it, together with the pairs
/// buffer and the recorded connection ports, to `aggregate_by_app`.
///
/// # Returns
/// The `AppStats` list produced by `aggregate_by_app` (per-application
/// bandwidth such as HTTP or SSH; that helper is expected to return it
/// sorted by total bandwidth, descending).
pub fn apps_stats(&self) -> Vec<AppStats> {
    let registry = AppRegistry::default();
    // The registry is passed by reference; the argument had been mangled
    // into a registered-trademark sign, which is not valid Rust.
    aggregate_by_app(&self.pairs_buffer, &registry, &self.connection_ports)
}
/// Every connection in the window rendered as a display string.
///
/// Formatting is delegated to `crate::stats::format_connections`, which
/// receives the pairs buffer; ordering and the bandwidth figure shown are
/// determined by that helper (presumably sorted, with average bandwidth).
pub fn connection_strings(&self) -> Vec<String> {
    let window = &self.pairs_buffer;
    pairs_format_connections(window)
}
/// Get all connections as formatted strings.
///
/// Returns a sorted list of connections with average bandwidth.
///
/// # Deprecated
/// Use [`connection_strings()`][Self::connection_strings] instead.
#[deprecated(since = "0.2.0", note = "Use `connection_strings()` instead")]
pub fn connections_strs(&self) -> Vec<String> {
self.connection_strings()
}
/// Top `n` ports for the host `host_ip`.
///
/// Delegates to `crate::stats::top_ports_per_host` over the stats buffer;
/// the ranking (total bandwidth, descending) is that helper's contract.
pub fn top_ports_per_host(&self, host_ip: Ipv4Addr, n: usize) -> Vec<PortStats> {
    let window = &self.stats_buffer;
    ports_top_ports_per_host(window, host_ip, n)
}
/// Top `n` ports across all hosts.
///
/// Delegates to `crate::stats::top_ports_all` over the stats buffer;
/// the ranking (total bandwidth, descending) is that helper's contract.
pub fn top_ports_all(&self, n: usize) -> Vec<PortStats> {
    let window = &self.stats_buffer;
    ports_top_ports_all(window, n)
}
/// Render one `PortStats` entry as display text.
///
/// Associated function (no `self`): it simply forwards to
/// `crate::stats::format_port_stats`.
pub fn format_port_stats(port_stats: &PortStats) -> String {
    ports_format_port_stats(port_stats)
}
/// Connections paired with their TCP state and window-average speed.
///
/// Returns a vector of `(IpPair, TcpState, Speed)` tuples produced by the
/// TCP state tracker. Speeds are accumulated per pair across every tick in
/// the pairs buffer via `SpeedAccumulator` (time-weighted averaging).
pub fn connections_with_state(&self) -> Vec<(IpPair, TcpState, Speed)> {
    // Fold every sample of every tick into one accumulator per pair.
    let mut per_pair: HashMap<IpPair, SpeedAccumulator> = HashMap::new();
    for tick in self.pairs_buffer.iter() {
        for (pair, sample) in tick.iter() {
            per_pair
                .entry(*pair)
                .and_modify(|acc| acc.add(sample))
                .or_insert_with(|| sample.accumulate());
        }
    }

    // Keep only the pairs whose accumulator yields a final speed.
    let mut averaged: HashMap<IpPair, Speed> = HashMap::new();
    for (pair, acc) in per_pair {
        if let Some(speed) = acc.finalize() {
            averaged.insert(pair, speed);
        }
    }

    self.tcp_state_tracker.connections_with_state(&averaged)
}
/// Connections with full details, including ports and age.
///
/// Returns a vector of `(ConnectionDetails, TcpState, Speed, Option<Duration>)`
/// tuples, one per entry of `connections_with_state()`. The `Duration` is the
/// connection age when the tracker knows it. Pairs with no recorded ports get
/// ports `(0, 0)` and `Direction::None`.
pub fn connections_with_details(
    &self,
) -> Vec<(
    ConnectionDetails,
    TcpState,
    Speed,
    Option<std::time::Duration>,
)> {
    let stateful = self.connections_with_state();
    let mut detailed = Vec::with_capacity(stateful.len());

    for (pair, state, speed) in stateful {
        let (recorded_ports, direction) = match self.connection_ports.get(&pair) {
            Some((src_port, dst_port, dir)) => ((*src_port, *dst_port), *dir),
            None => ((0, 0), Direction::None),
        };

        // For incoming connections, IPs are swapped during normalization (pairs.rs)
        // so the local IP becomes pair.src_ip. The ports are swapped to match:
        // the recorded dst_port is the local service port for incoming traffic.
        let (src_port, dst_port) = if direction == Direction::Incoming {
            (recorded_ports.1, recorded_ports.0)
        } else {
            recorded_ports
        };

        let details = ConnectionDetails::new(
            pair.src_ip,
            pair.dst_ip,
            src_port,
            dst_port,
            pair.is_local,
            direction,
            pair.protocol,
        );
        let age = self.connection_age(&pair);
        detailed.push((details, state, speed, age));
    }

    detailed
}
/// Age of the connection identified by `pair`.
///
/// Forwards to the TCP state tracker; per the existing contract this is the
/// duration since the pair was first seen, or `None` if it is not tracked.
pub fn connection_age(&self, pair: &IpPair) -> Option<std::time::Duration> {
    let tracker = &self.tcp_state_tracker;
    tracker.connection_age(pair)
}
/// An `AppRegistry` for port-to-app lookups.
///
/// A fresh default registry is constructed on every call; nothing is read
/// from the aggregator's own state.
pub fn app_registry(&self) -> AppRegistry {
    AppRegistry::default()
}
/// Snapshot of the quality metrics for every tracked connection.
///
/// Returns an owned map of `IpPair` to `QualityMetrics`.
pub fn quality_metrics(&self) -> HashMap<IpPair, QualityMetrics> {
    // The tracker's map is cloned so the caller gets an owned copy.
    // Note: this is a simplified implementation; returning references
    // would avoid the copy.
    let tracked = self.quality_tracker.all();
    tracked.clone()
}
/// Quality metrics for the single connection identified by `pair`.
///
/// Borrows the entry from the quality tracker; `None` when the tracker has
/// nothing for that pair.
pub fn quality_metrics_for_connection(&self, pair: &IpPair) -> Option<&QualityMetrics> {
    let tracker = &self.quality_tracker;
    tracker.get(pair)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stats::{StatKey, StatValues};
// Test shorthand: `ip!(a, b, c, d)` expands to an `Ipv4Addr` built from four octets.
macro_rules! ip {
    ($o1:expr, $o2:expr, $o3:expr, $o4:expr) => {
        ::std::net::Ipv4Addr::new($o1, $o2, $o3, $o4)
    };
}
#[test]
fn test_total_speed_modes() {
    // Drives three ticks (sizes 1000, 2000, 500) of one incoming flow and
    // checks the instant, peak and window-average views of the total speed.
    let mut agg = StatsAggregator::new_with_window_size(10);

    let key1 = StatKey {
        src_port: 123,
        dst_port: 456,
        src_ip: ip!(192, 168, 1, 2),
        dst_ip: ip!(10, 0, 0, 1),
        direction: Direction::Incoming,
        protocol: 6,
        tcp_syn: false,
        tcp_ack: false,
        tcp_fin: false,
        tcp_rst: false,
    };

    // Single-flow stats map for one tick; only `size` changes between ticks.
    let tick_stats = |size| {
        let mut stats = HashMap::new();
        stats.insert(
            key1,
            StatValues {
                size,
                last_timestamp: None,
                last_seq: None,
                last_ack: None,
            },
        );
        stats
    };

    // Tick 1: 1000 in -> instant and peak are both 1000, nothing going out.
    agg.tick(tick_stats(1000));
    let instant = agg.total_speed_instant();
    assert_eq!(instant.input, 1000);
    assert_eq!(instant.output, 0);
    let peak = agg.total_speed_peak();
    assert_eq!(peak.input, 1000);

    // Tick 2: 2000 in -> instant and peak both rise to 2000.
    agg.tick(tick_stats(2000));
    let instant = agg.total_speed_instant();
    assert_eq!(instant.input, 2000);
    let peak = agg.total_speed_peak();
    assert_eq!(peak.input, 2000);

    // Tick 3: 500 in -> instant drops to 500 while the peak stays at 2000.
    agg.tick(tick_stats(500));
    let instant = agg.total_speed_instant();
    assert_eq!(instant.input, 500);
    let peak = agg.total_speed_peak();
    assert_eq!(peak.input, 2000);

    // Average (rough check): (1000 + 2000 + 500) / 3 = 1166 bits/s,
    // and 1166 bits/s / 8 = 145.75 Bytes/s.
    let avg_str = agg.speed_str();
    assert!(avg_str.contains("145.75 B/s"));
}
#[test]
fn test_connection_strings_weighted_average() {
    // Guards against connection_strings summing samples across the window
    // instead of averaging them.
    let mut agg = StatsAggregator::new_with_window_size(10);

    let key1 = StatKey {
        src_port: 123,
        dst_port: 456,
        src_ip: ip!(192, 168, 1, 2),
        dst_ip: ip!(10, 0, 0, 1),
        direction: Direction::Incoming,
        protocol: 6,
        tcp_syn: false,
        tcp_ack: false,
        tcp_fin: false,
        tcp_rst: false,
    };

    // Stats map for one tick carrying a single flow of the given size.
    let tick_stats = |size| {
        let mut stats = HashMap::new();
        stats.insert(
            key1,
            StatValues {
                size,
                last_timestamp: None,
                last_seq: None,
                last_ack: None,
            },
        );
        stats
    };

    // Three ticks at the same rate (size 1000 each).
    for _ in 0..3 {
        agg.tick(tick_stats(1000));
    }

    let connections = agg.connection_strings();
    assert_eq!(connections.len(), 1);

    // With three equal ticks of 1000:
    // - WRONG (simple sum): would show ~3000
    // - CORRECT (weighted avg): should show ~1000
    // The format is "source_ip <-> dest_ip \t (down X | up Y)"; pull out X.
    let conn_str = &connections[0];
    let parsed_down = conn_str
        .split("down ")
        .nth(1)
        .and_then(|rest| rest.split(" |").next())
        .and_then(|text| text.trim().parse::<f64>().ok());

    // NOTE(review): if X carries a unit suffix the f64 parse fails and the
    // assertion below is silently skipped - confirm the rendered format.
    if let Some(val) = parsed_down {
        // Should be roughly 1000 (allowing for timing variance), not 3000.
        assert!(
            val < 2000.0,
            "Connection speed {} seems to be a sum, not an average",
            val
        );
    }
}
#[test]
fn test_connections_with_details_incoming_port_fix() {
    // For incoming connections the reported ports must line up with the
    // normalized IP order: src_ip (local) gets the local service port.
    let mut agg = StatsAggregator::new_with_window_size(10);

    // Incoming HTTPS flow: remote 93.184.216.34:56381 -> local 192.168.1.1:443
    let remote_ip = ip!(93, 184, 216, 34);
    let local_ip = ip!(192, 168, 1, 1);
    let key = StatKey {
        src_port: 56381, // Remote ephemeral port
        dst_port: 443,   // Local HTTPS service port
        src_ip: remote_ip,
        dst_ip: local_ip,
        direction: Direction::Incoming,
        protocol: 6,
        tcp_syn: false,
        tcp_ack: false,
        tcp_fin: false,
        tcp_rst: false,
    };
    let values = StatValues {
        size: 1000,
        last_timestamp: None,
        last_seq: None,
        last_ack: None,
    };

    let mut stats = HashMap::new();
    stats.insert(key, values);
    agg.tick(stats);

    let connections = agg.connections_with_details();
    assert_eq!(connections.len(), 1);
    let details = &connections[0].0;

    // Local side: normalized to src, paired with the service port.
    assert_eq!(
        details.src_ip, local_ip,
        "Local IP should be src_ip after normalization"
    );
    assert_eq!(
        details.src_port, 443,
        "Local service port (443) should be paired with local IP"
    );

    // Remote side: dst, paired with the remote's ephemeral port.
    assert_eq!(details.dst_ip, remote_ip, "Remote IP should be dst_ip");
    assert_eq!(
        details.dst_port, 56381,
        "Remote ephemeral port should be paired with remote IP"
    );

    assert_eq!(details.direction, Direction::Incoming);
}
}