1use serde::{Deserialize, Serialize};
2
3use crate::{
4 metadata::PacketMetadata,
5 packet::{
6 ether::EthAddr,
7 header::{NetworkLayer, SourceDestLayer, TransportLayer},
8 protocol::EtherProto,
9 Packet,
10 },
11 timestamp::{Interval, Timestamp},
12 tracker::{direction::PacketDirection, process::Process, tuple::Tuple, Trackable},
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TcpStatus {
18 #[default]
19 Closed,
21
22 SynSent,
24
25 SynReceived,
27
28 Established,
30
31 FinWait,
33
34 CloseWait,
36
37 Closing,
39
40 Reset,
42}
43
44impl TcpStatus {
45 #[inline]
46 #[must_use]
47 pub fn is_established(self) -> bool {
48 matches!(self, TcpStatus::Established)
49 }
50
51 #[inline]
52 #[must_use]
53 pub fn is_close_in_progress(self) -> bool {
54 matches!(self, TcpStatus::FinWait)
55 || matches!(self, TcpStatus::CloseWait)
56 || matches!(self, TcpStatus::Closing)
57 }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
61pub struct FlowId<T> {
62 pub tuple: T,
64 pub eth_proto: EtherProto,
66 pub src_mac: EthAddr,
68 pub dst_mac: EthAddr,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
73pub struct FlowTiming {
74 pub start: Timestamp,
76 pub last: Timestamp,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
81pub struct IpState {
82 pub u_id: Option<u16>,
84 pub d_id: Option<u16>,
86 pub u_lost: u32,
88 pub d_lost: u32,
90}
91
92impl IpState {
93 pub fn update(&mut self, pkt: &Packet<'_>, dir: PacketDirection) {
94 if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
95 if !h.has_dont_fragment() {
96 let id = h.header.id();
97 let (last_id, lost) = match dir {
98 PacketDirection::Upwards => (&mut self.u_id, &mut self.u_lost),
99 PacketDirection::Downwards => (&mut self.d_id, &mut self.d_lost),
100 };
101
102 if let Some(prev) = *last_id {
103 let delta = id.wrapping_sub(prev);
104 if delta > 0 && delta < 128 {
105 *lost += (delta - 1) as u32;
106 }
107 }
108 *last_id = Some(id);
109 }
110 }
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
115pub struct TcpState {
116 pub ts_syn: Timestamp,
118 pub ts_syn_ack: Timestamp,
120 pub ts_ack: Timestamp,
122 pub rtt: Interval,
124 pub rtt_net: Interval,
126 pub rtt_user: Interval,
128 pub u_seq: u32,
130 pub d_seq: u32,
132 pub status: TcpStatus,
134}
135
136impl TcpState {
137 pub fn update<Meta: PacketMetadata>(
138 &mut self,
139 pkt: &Packet<'_>,
140 _dir: PacketDirection,
141 meta: &Meta,
142 metrics: &FlowMetrics,
143 ) {
144 if let Some(TransportLayer::Tcp(hdr)) = pkt.transport() {
145 if hdr.has_rst() {
146 self.status = TcpStatus::Reset;
148 } else if hdr.has_fin() {
149 if self.status < TcpStatus::FinWait {
151 self.status = TcpStatus::FinWait;
152 } else if self.status == TcpStatus::FinWait || self.status == TcpStatus::CloseWait {
153 self.status = TcpStatus::Closing;
154 }
155 } else if hdr.has_ack() {
156 if hdr.has_syn() {
157 self.status = TcpStatus::SynReceived;
159 self.ts_syn_ack = meta.timestamp();
160 self.d_seq = hdr.sequence_number();
161 if self.ts_syn.0 > 0 {
162 self.rtt_net = meta.timestamp() - self.ts_syn;
163 }
164 } else {
165 if matches!(self.status, TcpStatus::SynReceived) {
167 self.status = TcpStatus::Established;
168 self.ts_ack = meta.timestamp();
169 if self.ts_syn.0 > 0 {
170 self.rtt = self.ts_ack - self.ts_syn;
171 }
172
173 if self.ts_syn_ack.0 > 0 {
174 self.rtt_user = self.ts_ack - self.ts_syn_ack;
175 }
176 } else if matches!(self.status, TcpStatus::FinWait) {
177 self.status = TcpStatus::CloseWait;
178 } else {
179 if self.status < TcpStatus::Established {
180 if metrics.d_pkts > 0 && metrics.u_pkts > 0 {
181 self.status = TcpStatus::Established;
182 }
183 }
184 }
185 }
186 } else if hdr.has_syn() {
187 self.status = TcpStatus::SynSent;
188 self.ts_syn = meta.timestamp();
189 }
190 }
191 }
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
195pub struct FlowMetrics {
196 pub u_bytes: usize,
198 pub d_bytes: usize,
200 pub u_payload_bytes: usize,
202 pub d_payload_bytes: usize,
204 pub u_pkts: u32,
206 pub d_pkts: u32,
208 pub u_payload_pkts: u32,
210 pub d_payload_pkts: u32,
212 pub u_fragments: u32,
214 pub u_fragmented_pkts: u32,
216 pub d_fragments: u32,
218 pub d_fragmented_pkts: u32,
220}
221
222impl FlowMetrics {
223 pub fn update<Meta: PacketMetadata>(
224 &mut self,
225 pkt: &Packet<'_>,
226 dir: PacketDirection,
227 meta: &Meta,
228 ) {
229 let payload_len = pkt.data().len();
230 let caplen = meta.caplen() as usize;
231 let is_payload = payload_len > 0;
232 match dir {
233 PacketDirection::Upwards => {
234 self.u_bytes += caplen;
235 self.u_pkts += 1;
236 self.u_payload_bytes += payload_len;
237 self.u_payload_pkts += is_payload as u32;
238 if let Some(NetworkLayer::Ipv4(hdr)) = pkt.network() {
239 self.u_fragments += hdr.is_fragmenting() as u32;
240 self.u_fragmented_pkts += hdr.is_first_fragment() as u32;
241 };
242 }
243 PacketDirection::Downwards => {
244 self.d_bytes += caplen;
245 self.d_pkts += 1;
246 self.d_payload_bytes += payload_len;
247 self.d_payload_pkts += is_payload as u32;
248 if let Some(NetworkLayer::Ipv4(hdr)) = pkt.network() {
249 self.d_fragments += hdr.is_fragmenting() as u32;
250 self.d_fragmented_pkts += hdr.is_first_fragment() as u32;
251 };
252 }
253 }
254 }
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
258pub struct FlowBase<T> {
259 pub id: FlowId<T>,
261 pub timing: FlowTiming,
263 pub ip: IpState,
265 pub tcp: TcpState,
267 pub metrics: FlowMetrics,
269}
270
271#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
272pub struct Flow<T, D> {
273 pub base: FlowBase<T>,
275 pub data: D,
277}
278
279impl<T, D> Flow<T, D>
280where
281 T: Default + Clone + Tuple + Sized,
282 for<'a> NetworkLayer<'a>: SourceDestLayer<T::Addr>,
283 T::Addr: Eq,
284 D: Default,
285{
286 pub fn new(timestamp: Timestamp, tuple: T, pkt: &Packet<'_>, dir: PacketDirection) -> Self {
288 let upwards = matches!(dir, PacketDirection::Upwards);
289 Self {
290 base: FlowBase {
291 id: FlowId {
292 tuple: if upwards { tuple } else { tuple.flip() },
293 eth_proto: pkt.link().protocol(),
294 src_mac: if upwards {
295 pkt.link().source()
296 } else {
297 pkt.link().dest()
298 },
299 dst_mac: if upwards {
300 pkt.link().dest()
301 } else {
302 pkt.link().source()
303 },
304 },
305 timing: FlowTiming {
306 start: timestamp,
307 last: timestamp,
308 },
309 ..Default::default()
310 },
311 data: Default::default(),
312 }
313 }
314
315 #[inline]
317 pub fn packet_dir(&self, pkt: &Packet<'_>) -> PacketDirection {
318 if self.base.id.tuple.is_symmetric() {
319 return PacketDirection::infer(pkt);
320 }
321
322 let is_upwards_addr = pkt
323 .network()
324 .and_then(|net| SourceDestLayer::<T::Addr>::source(net))
325 .is_some_and(|src| src == self.base.id.tuple.source());
326
327 let is_upwards_port = pkt
328 .transport()
329 .and_then(|tr| SourceDestLayer::<u16>::source(tr))
330 .is_some_and(|src| src == self.base.id.tuple.source_port());
331
332 if is_upwards_addr && is_upwards_port {
333 PacketDirection::Upwards
334 } else {
335 PacketDirection::Downwards
336 }
337 }
338}
339
340impl<T, D> Flow<T, D>
341where
342 D: Process,
343{
344 pub fn process<Meta: PacketMetadata>(
346 &mut self,
347 meta: &Meta,
348 pkt: &Packet<'_>,
349 dir: PacketDirection,
350 ) {
351 self.base.timing.last = meta.timestamp();
352 self.base.metrics.update(pkt, dir, meta);
353 self.base.ip.update(pkt, dir);
354 self.base.tcp.update(pkt, dir, meta, &self.base.metrics);
355 self.data.process(meta, pkt, dir, &mut self.base);
356 }
357}
358
359impl<T, D> Trackable for Flow<T, D> {
360 type Timestamp = Timestamp;
361
362 fn timestamp(&self) -> Timestamp {
363 self.base.timing.start
364 }
365
366 fn set_timestamp(&mut self, ts: Self::Timestamp) {
367 self.base.timing.last = ts;
368 }
369}
370
371pub type FlowIpV4<D> = Flow<crate::tracker::tuple::TupleV4, D>;
372pub type FlowIpV6<D> = Flow<crate::tracker::tuple::TupleV6, D>;