1use std::{
2 fmt::Display,
3 net::{Ipv4Addr, Ipv6Addr},
4};
5
6use chrono::TimeDelta;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 metadata::PacketMetadata,
11 packet::{
12 ether::EthAddr,
13 header::{NetworkLayer, SourceDestLayer},
14 icmp::IcmpType,
15 protocol::EtherProto,
16 Packet,
17 },
18 timestamp::Timestamp,
19 tracker::{direction::PacketDirection, process::Process, tuple::Tuple, Trackable},
20};
21
/// Coarse TCP connection state stored per flow (see the `state` field of
/// [`TcpInfo`]).
///
/// NOTE(review): no state-transition logic is visible in this part of the
/// file, so the per-variant notes below (other than the default) are inferred
/// from the variant names and common TCP terminology — confirm against the
/// code that actually updates the state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum TcpState {
    /// No connection; this is the `Default` value.
    #[default]
    Closed,

    /// Presumably: a SYN has been observed and no reply yet — TODO confirm.
    SynSent,

    /// Presumably: the SYN has been answered with a SYN/ACK — TODO confirm.
    SynReceived,

    /// Presumably: the three-way handshake completed — TODO confirm.
    Established,

    /// Presumably: one side has sent a FIN — TODO confirm.
    FinWait,

    /// Presumably: a FIN was received and the local close is pending — TODO confirm.
    CloseWait,

    /// Presumably: both sides are closing — TODO confirm.
    Closing,

    /// Presumably: the connection was aborted by a RST — TODO confirm.
    Reset,

    /// Presumably: the state could not be determined — TODO confirm.
    Unknown,
}
53
/// Per-direction IPv4 identification tracking for a flow.
///
/// The fields are maintained by the `Process` implementation of [`Flow`]:
/// only IPv4 packets without the "don't fragment" flag update them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct IpInfo {
    /// IPv4 identification of the last tracked upward packet, `None` until
    /// the first one is seen.
    u_id: Option<u16>,
    /// IPv4 identification of the last tracked downward packet, `None` until
    /// the first one is seen.
    d_id: Option<u16>,
    /// Running total of upward packets presumed missing, derived from gaps
    /// between consecutive identification values.
    u_lost: u32,
    /// Running total of downward packets presumed missing, derived from gaps
    /// between consecutive identification values.
    d_lost: u32,
}
65
/// TCP-specific bookkeeping attached to a [`Flow`].
///
/// NOTE(review): none of these fields is read or written in the visible part
/// of the file; the descriptions below are inferred from the field names and
/// types only — confirm against the code that fills them in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct TcpInfo {
    /// Presumably the timestamp of the SYN packet — TODO confirm.
    ts_syn: Timestamp,

    /// Presumably the timestamp of the SYN/ACK packet — TODO confirm.
    ts_syn_ack: Timestamp,

    /// Presumably the timestamp of the ACK completing the handshake — TODO confirm.
    ts_ack: Timestamp,

    /// Presumably the overall handshake round-trip time — TODO confirm.
    rtt: TimeDelta,

    /// Presumably the network-side share of the round-trip time — TODO confirm.
    rtt_net: TimeDelta,

    /// Presumably the user-side share of the round-trip time — TODO confirm.
    rtt_user: TimeDelta,

    /// Presumably the last sequence number seen in the upward direction — TODO confirm.
    u_seq: u32,

    /// Presumably the last sequence number seen in the downward direction — TODO confirm.
    d_seq: u32,

    /// Connection state; starts as [`TcpState::Closed`] via `Default`.
    state: TcpState,
}
96
/// Per-flow counters and metadata, split by packet direction.
///
/// `T` is the flow key stored in `tuple` (the inherent impl requires it to
/// implement `Tuple`); `D` is extra per-flow state stored in `data`, which
/// receives every packet when it implements `Process`.
///
/// Fields prefixed `u_` refer to packets processed with
/// `PacketDirection::Upwards`, fields prefixed `d_` to every other direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct Flow<T, D> {
    /// Timestamp passed to [`Flow::new`]; not modified by the visible code
    /// afterwards.
    pub start_ts: Timestamp,

    /// Timestamp of the most recently processed packet (also written by
    /// `Trackable::set_timestamp`).
    pub last_ts: Timestamp,

    /// Link-layer address of the side that sends upward packets.
    pub src_mac: EthAddr,

    /// Link-layer address of the side that receives upward packets.
    pub dst_mac: EthAddr,

    /// Link-layer protocol reported by the packet that created the flow.
    pub eth_proto: EtherProto,

    /// Flow key, stored in upward orientation (flipped by [`Flow::new`] when
    /// the creating packet was not upward).
    pub tuple: T,

    /// IPv4 identification tracking and inferred loss counters.
    pub ip_info: IpInfo,

    /// TCP-specific bookkeeping; not touched by the visible code.
    pub tcp_info: TcpInfo,

    /// Sum of `caplen()` over upward packets.
    pub u_bytes: usize,

    /// Sum of `caplen()` over downward packets.
    pub d_bytes: usize,

    /// Sum of `pkt.data().len()` over upward packets.
    pub u_payload_bytes: usize,

    /// Sum of `pkt.data().len()` over downward packets.
    pub d_payload_bytes: usize,

    /// Number of upward packets.
    pub u_pkts: u32,

    /// Number of downward packets.
    pub d_pkts: u32,

    /// Number of upward packets whose `data()` is non-empty.
    pub u_payload_pkts: u32,

    /// Number of downward packets whose `data()` is non-empty.
    pub d_payload_pkts: u32,

    /// Presumably the number of upward IP fragments; not updated by the
    /// visible code — TODO confirm.
    pub u_frags: u32,

    /// Presumably the number of downward IP fragments; not updated by the
    /// visible code — TODO confirm.
    pub d_frags: u32,

    /// Caller-defined per-flow state; fed every packet by the `Process` impl.
    pub data: D,
}
156
157impl<T, D> Flow<T, D>
158where
159 T: Default + Clone + Tuple + Sized,
160 for<'a> NetworkLayer<'a>: SourceDestLayer<T::Addr>,
161 T::Addr: Eq,
162 D: Default,
163{
164 pub fn new(timestamp: Timestamp, tuple: T, pkt: &Packet<'_>, dir: PacketDirection) -> Self {
176 let upwards = matches!(dir, PacketDirection::Upwards);
177 Self {
178 start_ts: timestamp,
179 last_ts: timestamp,
180 src_mac: if upwards {
181 pkt.link().source()
182 } else {
183 pkt.link().dest()
184 },
185 dst_mac: if upwards {
186 pkt.link().dest()
187 } else {
188 pkt.link().source()
189 },
190 eth_proto: pkt.link().protocol(),
191 tuple: if upwards { tuple } else { tuple.flip() },
192 ..Default::default()
193 }
194 }
195
196 #[inline]
207 pub fn packet_dir(&self, pkt: &Packet<'_>) -> PacketDirection {
208 if self.tuple.is_symmetric() {
209 return PacketDirection::infer(pkt);
212 }
213
214 let is_upwards_addr = pkt
216 .network()
217 .and_then(|net| SourceDestLayer::<T::Addr>::source(net))
218 .is_some_and(|src| src == self.tuple.source());
219 let is_upwards_port = pkt
221 .transport()
222 .and_then(|tr| SourceDestLayer::<u16>::source(tr))
223 .is_some_and(|src| src == self.tuple.source_port());
224
225 if is_upwards_addr && is_upwards_port {
227 PacketDirection::Upwards
228 } else {
229 PacketDirection::Downwards
230 }
231 }
232}
233
234impl<A, T> Process for Flow<A, T>
235where
236 T: Process,
237{
238 fn process<Meta: PacketMetadata>(
239 &mut self,
240 meta: &Meta,
241 pkt: &Packet<'_>,
242 dir: PacketDirection,
243 ) {
244 self.last_ts = meta.timestamp();
245 if matches!(dir, PacketDirection::Upwards) {
246 self.u_bytes += meta.caplen() as usize;
247 self.u_pkts += 1;
248 self.u_payload_bytes += pkt.data().len();
249 self.u_payload_pkts += (!pkt.data().is_empty()) as u32;
250
251 if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
252 if !h.has_dont_fragment() {
253 let id = h.header.id();
254 if self.ip_info.u_id.is_none() {
255 self.ip_info.u_id = Some(id);
256 }
257
258 let delta = id.wrapping_sub(self.ip_info.u_id.unwrap());
259 self.ip_info.u_lost += if delta > 0 && delta < 128 {
260 delta - 1
261 } else {
262 0
263 } as u32;
264 self.ip_info.u_id = Some(id);
265 }
266 }
267 } else {
268 self.d_bytes += meta.caplen() as usize;
269 self.d_pkts += 1;
270 self.d_payload_bytes += pkt.data().len();
271 self.d_payload_pkts += (!pkt.data().is_empty()) as u32;
272
273 if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
274 if !h.has_dont_fragment() {
275 let id = h.header.id();
276 if self.ip_info.d_id.is_none() {
277 self.ip_info.d_id = Some(id);
278 }
279
280 let delta = id.wrapping_sub(self.ip_info.d_id.unwrap());
281 self.ip_info.d_lost += if delta > 0 && delta < 128 {
282 delta - 1
283 } else {
284 0
285 } as u32;
286 self.ip_info.d_id = Some(id);
287 }
288 }
289 }
290
291 self.data.process(meta, pkt, dir);
292 }
293}
294
295impl<A, T> Trackable for Flow<A, T> {
296 type Timestamp = Timestamp;
297
298 fn timestamp(&self) -> Timestamp {
299 self.start_ts
300 }
301
302 fn set_timestamp(&mut self, ts: Self::Timestamp) {
303 self.last_ts = ts;
304 }
305}
306
/// [`Flow`] whose first type parameter is [`Ipv4Addr`], with `T` as the
/// per-flow data type.
///
/// NOTE(review): the inherent `impl` of `Flow` treats the first parameter as
/// a `Tuple`, so `new`/`packet_dir` are only available through this alias if
/// `Ipv4Addr` implements `Tuple` — confirm the alias is still what is intended.
pub type FlowIpV4<T> = Flow<Ipv4Addr, T>;
/// [`Flow`] whose first type parameter is [`Ipv6Addr`], with `T` as the
/// per-flow data type.
///
/// NOTE(review): same caveat as for `FlowIpV4` — the first parameter is used
/// as a `Tuple` by the inherent `impl`; confirm.
pub type FlowIpV6<T> = Flow<Ipv6Addr, T>;