Skip to main content

packet_strata/tracker/
flow.rs

1use serde::{Deserialize, Serialize};
2
3use crate::{
4    metadata::PacketMetadata,
5    packet::{
6        ether::EthAddr,
7        header::{NetworkLayer, SourceDestLayer, TransportLayer},
8        protocol::EtherProto,
9        Packet,
10    },
11    timestamp::{Interval, Timestamp},
12    tracker::{direction::PacketDirection, process::Process, tuple::Tuple, Trackable},
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TcpStatus {
18    #[default]
19    /// Initial state, or when tracking metadata is evicted/expired.
20    Closed,
21
22    /// Client sent SYN, waiting for SYN-ACK.
23    SynSent,
24
25    /// Server sent SYN-ACK.
26    SynReceived,
27
28    /// Handshake completed (ACK received).
29    Established,
30
31    /// One side sent a FIN.
32    FinWait,
33
34    /// The other side acknowledged the FIN.
35    CloseWait,
36
37    /// Simultaneous close or final stages of termination.
38    Closing,
39
40    /// Connection reset via RST flag.
41    Reset,
42}
43
44impl TcpStatus {
45    #[inline]
46    #[must_use]
47    pub fn is_established(self) -> bool {
48        matches!(self, TcpStatus::Established)
49    }
50
51    #[inline]
52    #[must_use]
53    pub fn is_close_in_progress(self) -> bool {
54        matches!(self, TcpStatus::FinWait)
55            || matches!(self, TcpStatus::CloseWait)
56            || matches!(self, TcpStatus::Closing)
57    }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
61pub struct FlowId<T> {
62    /// Generic address tuple (e.g. TupleV4, TupleV6 or TupleEth)
63    pub tuple: T,
64    /// Ethernet protocol.
65    pub eth_proto: EtherProto,
66    /// Source MAC address.
67    pub src_mac: EthAddr,
68    /// Destination MAC address.
69    pub dst_mac: EthAddr,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
73pub struct FlowTiming {
74    /// Timestamp of the first packet in the flow.
75    pub start: Timestamp,
76    /// Timestamp of the last packet in the flow.
77    pub last: Timestamp,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
81pub struct IpState {
82    /// Uplink IP identification.
83    pub u_id: Option<u16>,
84    /// Downlink IP identification.
85    pub d_id: Option<u16>,
86    /// Uplink packets lost.
87    pub u_lost: u32,
88    /// Downlink packets lost.
89    pub d_lost: u32,
90}
91
92impl IpState {
93    pub fn update(&mut self, pkt: &Packet<'_>, dir: PacketDirection) {
94        if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
95            if !h.has_dont_fragment() {
96                let id = h.header.id();
97                let (last_id, lost) = match dir {
98                    PacketDirection::Upwards => (&mut self.u_id, &mut self.u_lost),
99                    PacketDirection::Downwards => (&mut self.d_id, &mut self.d_lost),
100                };
101
102                if let Some(prev) = *last_id {
103                    let delta = id.wrapping_sub(prev);
104                    if delta > 0 && delta < 128 {
105                        *lost += (delta - 1) as u32;
106                    }
107                }
108                *last_id = Some(id);
109            }
110        }
111    }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
115pub struct TcpState {
116    /// Timestamp of the initial SYN packet.
117    pub ts_syn: Timestamp,
118    /// Timestamp of the SYN-ACK packet.
119    pub ts_syn_ack: Timestamp,
120    /// Timestamp of the ACK packet completing the handshake.
121    pub ts_ack: Timestamp,
122    /// Round-trip time.
123    pub rtt: Interval,
124    /// Network component of the RTT.
125    pub rtt_net: Interval,
126    /// User/Application component of the RTT.
127    pub rtt_user: Interval,
128    /// Uplink sequence number.
129    pub u_seq: u32,
130    /// Downlink sequence number.
131    pub d_seq: u32,
132    /// Current TCP status.
133    pub status: TcpStatus,
134}
135
136impl TcpState {
137    pub fn update<Meta: PacketMetadata>(
138        &mut self,
139        pkt: &Packet<'_>,
140        _dir: PacketDirection,
141        meta: &Meta,
142        metrics: &FlowMetrics,
143    ) {
144        if let Some(TransportLayer::Tcp(hdr)) = pkt.transport() {
145            if hdr.has_rst() {
146                // R
147                self.status = TcpStatus::Reset;
148            } else if hdr.has_fin() {
149                // F
150                if self.status < TcpStatus::FinWait {
151                    self.status = TcpStatus::FinWait;
152                } else if self.status == TcpStatus::FinWait || self.status == TcpStatus::CloseWait {
153                    self.status = TcpStatus::Closing;
154                }
155            } else if hdr.has_ack() {
156                if hdr.has_syn() {
157                    // S|A
158                    self.status = TcpStatus::SynReceived;
159                    self.ts_syn_ack = meta.timestamp();
160                    self.d_seq = hdr.sequence_number();
161                    if self.ts_syn.0 > 0 {
162                        self.rtt_net = meta.timestamp() - self.ts_syn;
163                    }
164                } else {
165                    // A only
166                    if matches!(self.status, TcpStatus::SynReceived) {
167                        self.status = TcpStatus::Established;
168                        self.ts_ack = meta.timestamp();
169                        if self.ts_syn.0 > 0 {
170                            self.rtt = self.ts_ack - self.ts_syn;
171                        }
172
173                        if self.ts_syn_ack.0 > 0 {
174                            self.rtt_user = self.ts_ack - self.ts_syn_ack;
175                        }
176                    } else if matches!(self.status, TcpStatus::FinWait) {
177                        self.status = TcpStatus::CloseWait;
178                    } else {
179                        if self.status < TcpStatus::Established {
180                            if metrics.d_pkts > 0 && metrics.u_pkts > 0 {
181                                self.status = TcpStatus::Established;
182                            }
183                        }
184                    }
185                }
186            } else if hdr.has_syn() {
187                self.status = TcpStatus::SynSent;
188                self.ts_syn = meta.timestamp();
189            }
190        }
191    }
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
195pub struct FlowMetrics {
196    /// Total uplink bytes.
197    pub u_bytes: usize,
198    /// Total downlink bytes.
199    pub d_bytes: usize,
200    /// Total uplink payload bytes.
201    pub u_payload_bytes: usize,
202    /// Total downlink payload bytes.
203    pub d_payload_bytes: usize,
204    /// Total uplink packets.
205    pub u_pkts: u32,
206    /// Total downlink packets.
207    pub d_pkts: u32,
208    /// Uplink packets with payload.
209    pub u_payload_pkts: u32,
210    /// Downlink packets with payload.
211    pub d_payload_pkts: u32,
212    /// Total uplink fragments count.
213    pub u_fragments: u32,
214    /// Total uplink fragmented packets.
215    pub u_fragmented_pkts: u32,
216    /// Total downlink fragments count.
217    pub d_fragments: u32,
218    /// Total downlink fragmented packets.
219    pub d_fragmented_pkts: u32,
220}
221
222impl FlowMetrics {
223    pub fn update<Meta: PacketMetadata>(
224        &mut self,
225        pkt: &Packet<'_>,
226        dir: PacketDirection,
227        meta: &Meta,
228    ) {
229        let payload_len = pkt.data().len();
230        let caplen = meta.caplen() as usize;
231        let is_payload = payload_len > 0;
232        match dir {
233            PacketDirection::Upwards => {
234                self.u_bytes += caplen;
235                self.u_pkts += 1;
236                self.u_payload_bytes += payload_len;
237                self.u_payload_pkts += is_payload as u32;
238                if let Some(NetworkLayer::Ipv4(hdr)) = pkt.network() {
239                    self.u_fragments += hdr.is_fragmenting() as u32;
240                    self.u_fragmented_pkts += hdr.is_first_fragment() as u32;
241                };
242            }
243            PacketDirection::Downwards => {
244                self.d_bytes += caplen;
245                self.d_pkts += 1;
246                self.d_payload_bytes += payload_len;
247                self.d_payload_pkts += is_payload as u32;
248                if let Some(NetworkLayer::Ipv4(hdr)) = pkt.network() {
249                    self.d_fragments += hdr.is_fragmenting() as u32;
250                    self.d_fragmented_pkts += hdr.is_first_fragment() as u32;
251                };
252            }
253        }
254    }
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
258pub struct FlowBase<T> {
259    /// Identity information (Tuple, MACs, Proto).
260    pub id: FlowId<T>,
261    /// Timing information (Start, Last).
262    pub timing: FlowTiming,
263    /// IP layer state and loss tracking.
264    pub ip: IpState,
265    /// TCP layer state and info.
266    pub tcp: TcpState,
267    /// Flow statistics and counters.
268    pub metrics: FlowMetrics,
269}
270
271#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
272pub struct Flow<T, D> {
273    /// Basic flow data organized in logical blocks.
274    pub base: FlowBase<T>,
275    /// Custom data associated with the flow.
276    pub data: D,
277}
278
279impl<T, D> Flow<T, D>
280where
281    T: Default + Clone + Tuple + Sized,
282    for<'a> NetworkLayer<'a>: SourceDestLayer<T::Addr>,
283    T::Addr: Eq,
284    D: Default,
285{
286    /// Creates a new `Flow` instance.
287    pub fn new(timestamp: Timestamp, tuple: T, pkt: &Packet<'_>, dir: PacketDirection) -> Self {
288        let upwards = matches!(dir, PacketDirection::Upwards);
289        Self {
290            base: FlowBase {
291                id: FlowId {
292                    tuple: if upwards { tuple } else { tuple.flip() },
293                    eth_proto: pkt.link().protocol(),
294                    src_mac: if upwards {
295                        pkt.link().source()
296                    } else {
297                        pkt.link().dest()
298                    },
299                    dst_mac: if upwards {
300                        pkt.link().dest()
301                    } else {
302                        pkt.link().source()
303                    },
304                },
305                timing: FlowTiming {
306                    start: timestamp,
307                    last: timestamp,
308                },
309                ..Default::default()
310            },
311            data: Default::default(),
312        }
313    }
314
315    /// Determines the direction of a packet relative to the flow.
316    #[inline]
317    pub fn packet_dir(&self, pkt: &Packet<'_>) -> PacketDirection {
318        if self.base.id.tuple.is_symmetric() {
319            return PacketDirection::infer(pkt);
320        }
321
322        let is_upwards_addr = pkt
323            .network()
324            .and_then(|net| SourceDestLayer::<T::Addr>::source(net))
325            .is_some_and(|src| src == self.base.id.tuple.source());
326
327        let is_upwards_port = pkt
328            .transport()
329            .and_then(|tr| SourceDestLayer::<u16>::source(tr))
330            .is_some_and(|src| src == self.base.id.tuple.source_port());
331
332        if is_upwards_addr && is_upwards_port {
333            PacketDirection::Upwards
334        } else {
335            PacketDirection::Downwards
336        }
337    }
338}
339
340impl<T, D> Flow<T, D>
341where
342    D: Process,
343{
344    /// Process a packet and update flow statistics.
345    pub fn process<Meta: PacketMetadata>(
346        &mut self,
347        meta: &Meta,
348        pkt: &Packet<'_>,
349        dir: PacketDirection,
350    ) {
351        self.base.timing.last = meta.timestamp();
352        self.base.metrics.update(pkt, dir, meta);
353        self.base.ip.update(pkt, dir);
354        self.base.tcp.update(pkt, dir, meta, &self.base.metrics);
355        self.data.process(meta, pkt, dir, &mut self.base);
356    }
357}
358
359impl<T, D> Trackable for Flow<T, D> {
360    type Timestamp = Timestamp;
361
362    fn timestamp(&self) -> Timestamp {
363        self.base.timing.start
364    }
365
366    fn set_timestamp(&mut self, ts: Self::Timestamp) {
367        self.base.timing.last = ts;
368    }
369}
370
371pub type FlowIpV4<D> = Flow<crate::tracker::tuple::TupleV4, D>;
372pub type FlowIpV6<D> = Flow<crate::tracker::tuple::TupleV6, D>;