1pub mod flow;
10
11use std::collections::{HashMap, HashSet};
12use std::net::IpAddr;
13
14use chrono::{DateTime, TimeZone, Utc};
15
16pub use flow::FlowKey;
17
18#[derive(Debug, Clone)]
20pub struct FlowStats {
21 pub key: FlowKey,
23 pub flow_id: u64,
25 pub packets: u64,
27 pub bytes: u64,
29}
30
31#[derive(Debug)]
33pub struct CaptureInfo {
34 pub start: Option<DateTime<Utc>>,
36 pub end: Option<DateTime<Utc>>,
38 pub total_packets: u64,
40 pub total_bytes: u64,
42 pub unique_src_ips: HashSet<IpAddr>,
44 pub unique_dst_ips: HashSet<IpAddr>,
46 pub flows: Vec<FlowStats>,
48}
49
50impl CaptureInfo {
51 pub fn flow_count(&self) -> usize {
53 self.flows.len()
54 }
55
56 pub fn duration(&self) -> Option<chrono::Duration> {
58 match (self.start, self.end) {
59 (Some(s), Some(e)) => Some(e - s),
60 _ => None,
61 }
62 }
63}
64
65pub struct StatsCollector {
67 unidirectional: bool,
68 start_ns: Option<u64>,
69 end_ns: Option<u64>,
70 total_packets: u64,
71 total_bytes: u64,
72 unique_src_ips: HashSet<IpAddr>,
73 unique_dst_ips: HashSet<IpAddr>,
74 flow_map: HashMap<FlowKey, (u64, u64, u64)>,
76}
77
78impl StatsCollector {
79 pub fn new(unidirectional: bool) -> Self {
83 Self {
84 unidirectional,
85 start_ns: None,
86 end_ns: None,
87 total_packets: 0,
88 total_bytes: 0,
89 unique_src_ips: HashSet::new(),
90 unique_dst_ips: HashSet::new(),
91 flow_map: HashMap::new(),
92 }
93 }
94
95 pub fn feed(&mut self, timestamp_ns: u64, captured_len: u32, key: Option<FlowKey>) {
101 self.start_ns = Some(self.start_ns.map_or(timestamp_ns, |t| t.min(timestamp_ns)));
103 self.end_ns = Some(self.end_ns.map_or(timestamp_ns, |t| t.max(timestamp_ns)));
104
105 self.total_packets += 1;
106 self.total_bytes += u64::from(captured_len);
107
108 if let Some(ref k) = key {
109 self.unique_src_ips.insert(k.src_ip);
110 self.unique_dst_ips.insert(k.dst_ip);
111
112 let flow_id = k.flow_id(self.unidirectional);
113 let entry = self.flow_map.entry(k.clone()).or_insert((flow_id, 0, 0));
114 entry.1 += 1;
115 entry.2 += u64::from(captured_len);
116 }
117 }
118
119 pub fn finish(self) -> CaptureInfo {
121 let start = self.start_ns.map(ns_to_datetime);
122 let end = self.end_ns.map(ns_to_datetime);
123
124 let flows = self
125 .flow_map
126 .into_iter()
127 .map(|(key, (flow_id, packets, bytes))| FlowStats {
128 key,
129 flow_id,
130 packets,
131 bytes,
132 })
133 .collect();
134
135 CaptureInfo {
136 start,
137 end,
138 total_packets: self.total_packets,
139 total_bytes: self.total_bytes,
140 unique_src_ips: self.unique_src_ips,
141 unique_dst_ips: self.unique_dst_ips,
142 flows,
143 }
144 }
145}
146
147fn ns_to_datetime(ns: u64) -> DateTime<Utc> {
148 let secs = (ns / 1_000_000_000) as i64;
149 let nanos = (ns % 1_000_000_000) as u32;
150 Utc.timestamp_opt(secs, nanos)
151 .single()
152 .unwrap_or(DateTime::UNIX_EPOCH)
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds an IPv4 `IpAddr` from four octets.
    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    /// Shorthand for `FlowKey::new`.
    fn key(s: IpAddr, d: IpAddr, sp: u16, dp: u16, proto: u8) -> FlowKey {
        FlowKey::new(s, d, sp, dp, proto)
    }

    #[test]
    fn test_empty_collector_produces_zero_stats() {
        let info = StatsCollector::new(false).finish();
        assert_eq!(info.total_packets, 0);
        assert_eq!(info.total_bytes, 0);
        assert!(info.start.is_none());
        assert!(info.end.is_none());
        assert!(info.flows.is_empty());
    }

    #[test]
    fn test_single_packet_start_equals_end() {
        let mut col = StatsCollector::new(false);
        col.feed(1_000_000_000, 100, None);
        let info = col.finish();
        assert_eq!(info.total_packets, 1);
        assert_eq!(info.total_bytes, 100);
        assert_eq!(info.start, info.end);
        assert!(info.duration().unwrap().is_zero());
    }

    #[test]
    fn test_out_of_order_timestamps_track_min_and_max() {
        let mut col = StatsCollector::new(false);
        // The later packet is fed first; start/end must still be min/max.
        col.feed(5_000_000_000, 10, None);
        col.feed(2_000_000_000, 10, None);
        let info = col.finish();
        assert!(info.start < info.end);
        assert_eq!(info.duration(), Some(chrono::Duration::seconds(3)));
    }

    #[test]
    fn test_flow_aggregation_counts_correctly() {
        let mut col = StatsCollector::new(false);
        let k = key(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 1000, 443, 6);
        col.feed(1_000_000_000, 100, Some(k.clone()));
        col.feed(2_000_000_000, 200, Some(k.clone()));
        let rev = key(v4(10, 0, 0, 2), v4(10, 0, 0, 1), 443, 1000, 6);
        col.feed(3_000_000_000, 50, Some(rev.clone()));
        let info = col.finish();
        assert_eq!(info.total_packets, 3);
        assert_eq!(info.total_bytes, 350);
        // The forward and reverse keys are stored as two separate entries.
        assert_eq!(info.flow_count(), 2);
        assert_eq!(info.duration(), Some(chrono::Duration::seconds(2)));

        // `flows` has no defined order, so look each entry up by its key.
        let forward = info
            .flows
            .iter()
            .find(|f| f.key == k)
            .expect("forward flow should be present");
        assert_eq!((forward.packets, forward.bytes), (2, 300));

        let reverse = info
            .flows
            .iter()
            .find(|f| f.key == rev)
            .expect("reverse flow should be present");
        assert_eq!((reverse.packets, reverse.bytes), (1, 50));
    }

    #[test]
    fn test_unique_ips_tracked() {
        let mut col = StatsCollector::new(false);
        col.feed(
            1_000_000_000,
            60,
            Some(key(v4(1, 1, 1, 1), v4(2, 2, 2, 2), 100, 80, 6)),
        );
        col.feed(
            2_000_000_000,
            60,
            Some(key(v4(1, 1, 1, 1), v4(3, 3, 3, 3), 100, 80, 6)),
        );
        let info = col.finish();
        assert_eq!(info.unique_src_ips.len(), 1);
        assert_eq!(info.unique_dst_ips.len(), 2);
    }
}