use std::collections::HashMap;
use std::time::{Duration, Instant};
use netwatch_sdk::collectors::connections::collect_connections;
use netwatch_sdk::collectors::process_bandwidth::attribute;
use netwatch_sdk::types::InterfaceMetric;
use super::model::InterfaceTick;
const REFRESH: Duration = Duration::from_secs(2);
const MAX_PROCS: usize = 256;
#[derive(Debug, Clone, Default)]
pub struct ProcessBandwidthCollector {
last_at: Option<Instant>,
cached: HashMap<u32, (f64, f64)>, }
impl ProcessBandwidthCollector {
pub fn new() -> Self {
Self::default()
}
pub fn sample(&mut self, current_net: &[InterfaceTick]) -> HashMap<u32, (f64, f64)> {
let stale = self.last_at.map(|t| t.elapsed() >= REFRESH).unwrap_or(true);
if stale {
self.cached = compute(current_net);
self.last_at = Some(Instant::now());
}
self.cached.clone()
}
}
fn compute(current_net: &[InterfaceTick]) -> HashMap<u32, (f64, f64)> {
let conns = collect_connections();
if conns.is_empty() {
return HashMap::new();
}
let metrics: Vec<InterfaceMetric> = current_net
.iter()
.map(|i| InterfaceMetric {
name: i.name.clone(),
is_up: i.is_up,
rx_bytes: i.rx_bytes,
tx_bytes: i.tx_bytes,
rx_bytes_delta: 0,
tx_bytes_delta: 0,
rx_packets: 0,
tx_packets: 0,
rx_errors: 0,
tx_errors: 0,
rx_drops: 0,
tx_drops: 0,
rx_rate: Some(i.rx_rate),
tx_rate: Some(i.tx_rate),
rx_history: None,
tx_history: None,
})
.collect();
let attributed = attribute(&conns, &metrics, MAX_PROCS);
attributed
.into_iter()
.filter_map(|p| p.pid.map(|pid| (pid, (p.rx_rate, p.tx_rate))))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_interfaces_yields_zero_rates() {
let map = compute(&[]);
for (_pid, (rx, tx)) in &map {
assert_eq!(*rx, 0.0);
assert_eq!(*tx, 0.0);
}
}
#[test]
fn cache_short_circuits_within_refresh_window() {
let mut c = ProcessBandwidthCollector::new();
let _ = c.sample(&[]);
let first_at = c.last_at;
let _ = c.sample(&[]);
assert_eq!(c.last_at, first_at);
}
}