Skip to main content

pcap_toolkit/stats/
mod.rs

1//! Streaming statistics collection for PCAP captures.
2//!
3//! [`StatsCollector`] performs a single pass over packets, computing:
4//! - capture start/end timestamps
5//! - total packet and byte counts
6//! - unique source and destination IPs
7//! - per-flow packet/byte counts keyed by 5-tuple
8
9pub mod flow;
10
11use std::collections::{HashMap, HashSet};
12use std::net::IpAddr;
13
14use chrono::{DateTime, TimeZone, Utc};
15
16pub use flow::FlowKey;
17
18/// Per-flow statistics entry.
19#[derive(Debug, Clone)]
20pub struct FlowStats {
21    /// 5-tuple key for this flow.
22    pub key: FlowKey,
23    /// Deterministic flow ID (bidirectional xxh3-style hash).
24    pub flow_id: u64,
25    /// Number of packets in this flow.
26    pub packets: u64,
27    /// Total bytes (captured length) in this flow.
28    pub bytes: u64,
29}
30
31/// Aggregate statistics for an entire capture.
32#[derive(Debug)]
33pub struct CaptureInfo {
34    /// Timestamp of the first packet.
35    pub start: Option<DateTime<Utc>>,
36    /// Timestamp of the last packet.
37    pub end: Option<DateTime<Utc>>,
38    /// Total number of packets.
39    pub total_packets: u64,
40    /// Total captured bytes (sum of captured lengths).
41    pub total_bytes: u64,
42    /// Unique source IP addresses observed.
43    pub unique_src_ips: HashSet<IpAddr>,
44    /// Unique destination IP addresses observed.
45    pub unique_dst_ips: HashSet<IpAddr>,
46    /// Per-flow statistics, ordered by insertion.
47    pub flows: Vec<FlowStats>,
48}
49
50impl CaptureInfo {
51    /// Return the number of unique flows observed.
52    pub fn flow_count(&self) -> usize {
53        self.flows.len()
54    }
55
56    /// Return the capture duration, or `None` if fewer than two packets were seen.
57    pub fn duration(&self) -> Option<chrono::Duration> {
58        match (self.start, self.end) {
59            (Some(s), Some(e)) => Some(e - s),
60            _ => None,
61        }
62    }
63}
64
65/// Incrementally accumulates capture statistics in a single streaming pass.
66pub struct StatsCollector {
67    unidirectional: bool,
68    start_ns: Option<u64>,
69    end_ns: Option<u64>,
70    total_packets: u64,
71    total_bytes: u64,
72    unique_src_ips: HashSet<IpAddr>,
73    unique_dst_ips: HashSet<IpAddr>,
74    /// Map from FlowKey → (flow_id, packets, bytes).
75    flow_map: HashMap<FlowKey, (u64, u64, u64)>,
76}
77
78impl StatsCollector {
79    /// Create a new collector.
80    ///
81    /// Set `unidirectional = true` to compute direction-sensitive flow IDs.
82    pub fn new(unidirectional: bool) -> Self {
83        Self {
84            unidirectional,
85            start_ns: None,
86            end_ns: None,
87            total_packets: 0,
88            total_bytes: 0,
89            unique_src_ips: HashSet::new(),
90            unique_dst_ips: HashSet::new(),
91            flow_map: HashMap::new(),
92        }
93    }
94
95    /// Feed one packet into the collector.
96    ///
97    /// - `timestamp_ns`: packet timestamp in nanoseconds since the Unix epoch.
98    /// - `captured_len`: number of bytes captured for this packet.
99    /// - `key`: optional parsed 5-tuple; `None` for non-IP packets (e.g. ARP).
100    pub fn feed(&mut self, timestamp_ns: u64, captured_len: u32, key: Option<FlowKey>) {
101        // Update time bounds.
102        self.start_ns = Some(self.start_ns.map_or(timestamp_ns, |t| t.min(timestamp_ns)));
103        self.end_ns = Some(self.end_ns.map_or(timestamp_ns, |t| t.max(timestamp_ns)));
104
105        self.total_packets += 1;
106        self.total_bytes += u64::from(captured_len);
107
108        if let Some(ref k) = key {
109            self.unique_src_ips.insert(k.src_ip);
110            self.unique_dst_ips.insert(k.dst_ip);
111
112            let flow_id = k.flow_id(self.unidirectional);
113            let entry = self.flow_map.entry(k.clone()).or_insert((flow_id, 0, 0));
114            entry.1 += 1;
115            entry.2 += u64::from(captured_len);
116        }
117    }
118
119    /// Consume the collector and produce the final [`CaptureInfo`].
120    pub fn finish(self) -> CaptureInfo {
121        let start = self.start_ns.map(ns_to_datetime);
122        let end = self.end_ns.map(ns_to_datetime);
123
124        let flows = self
125            .flow_map
126            .into_iter()
127            .map(|(key, (flow_id, packets, bytes))| FlowStats {
128                key,
129                flow_id,
130                packets,
131                bytes,
132            })
133            .collect();
134
135        CaptureInfo {
136            start,
137            end,
138            total_packets: self.total_packets,
139            total_bytes: self.total_bytes,
140            unique_src_ips: self.unique_src_ips,
141            unique_dst_ips: self.unique_dst_ips,
142            flows,
143        }
144    }
145}
146
147fn ns_to_datetime(ns: u64) -> DateTime<Utc> {
148    let secs = (ns / 1_000_000_000) as i64;
149    let nanos = (ns % 1_000_000_000) as u32;
150    Utc.timestamp_opt(secs, nanos)
151        .single()
152        .unwrap_or(DateTime::UNIX_EPOCH)
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Build an IPv4 `IpAddr` from its four octets.
    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        Ipv4Addr::new(a, b, c, d).into()
    }

    /// Shorthand for constructing a `FlowKey` from its 5-tuple parts.
    fn key(s: IpAddr, d: IpAddr, sp: u16, dp: u16, proto: u8) -> FlowKey {
        FlowKey::new(s, d, sp, dp, proto)
    }

    #[test]
    fn test_empty_collector_produces_zero_stats() {
        // Finishing without feeding anything must leave every statistic empty.
        let summary = StatsCollector::new(false).finish();

        assert_eq!(summary.total_packets, 0);
        assert_eq!(summary.total_bytes, 0);
        assert!(summary.start.is_none());
        assert!(summary.end.is_none());
        assert!(summary.flows.is_empty());
    }

    #[test]
    fn test_single_packet_start_equals_end() {
        let mut collector = StatsCollector::new(false);
        collector.feed(1_000_000_000, 100, None);
        let summary = collector.finish();

        assert_eq!(summary.total_packets, 1);
        assert_eq!(summary.total_bytes, 100);
        // One packet sets both bounds to the same instant.
        assert_eq!(summary.start, summary.end);
        let elapsed = summary.duration().unwrap();
        assert!(elapsed.is_zero());
    }

    #[test]
    fn test_flow_aggregation_counts_correctly() {
        let client = v4(10, 0, 0, 1);
        let server = v4(10, 0, 0, 2);
        let forward = key(client, server, 1000, 443, 6);
        // Reverse direction — same bidirectional flow.
        let reverse = key(server, client, 443, 1000, 6);

        let mut collector = StatsCollector::new(false);
        collector.feed(1_000_000_000, 100, Some(forward.clone()));
        collector.feed(2_000_000_000, 200, Some(forward));
        collector.feed(3_000_000_000, 50, Some(reverse));
        let summary = collector.finish();

        assert_eq!(summary.total_packets, 3);
        assert_eq!(summary.total_bytes, 350);
        // Two distinct FlowKeys but both map to the same bidirectional flow ID.
        assert_eq!(summary.flow_count(), 2);
    }

    #[test]
    fn test_unique_ips_tracked() {
        let source = v4(1, 1, 1, 1);
        let packets = [
            (1_000_000_000, v4(2, 2, 2, 2)),
            (2_000_000_000, v4(3, 3, 3, 3)),
        ];

        let mut collector = StatsCollector::new(false);
        for (timestamp, destination) in packets {
            collector.feed(timestamp, 60, Some(key(source, destination, 100, 80, 6)));
        }
        let summary = collector.finish();

        // Same source twice, two different destinations.
        assert_eq!(summary.unique_src_ips.len(), 1);
        assert_eq!(summary.unique_dst_ips.len(), 2);
    }
}
222}