// packet_strata/tracker/flow.rs

1use std::{
2    fmt::Display,
3    net::{Ipv4Addr, Ipv6Addr},
4};
5
6use chrono::TimeDelta;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    metadata::PacketMetadata,
11    packet::{
12        ether::EthAddr,
13        header::{NetworkLayer, SourceDestLayer},
14        icmp::IcmpType,
15        protocol::EtherProto,
16        Packet,
17    },
18    timestamp::Timestamp,
19    tracker::{direction::PacketDirection, process::Process, tuple::Tuple, Trackable},
20};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
23pub enum TcpState {
24    #[default]
25    /// Initial state, or when tracking metadata is evicted/expired.
26    Closed,
27
28    /// Client sent SYN, waiting for SYN-ACK.
29    /// Crucial for detecting SYN floods or failed connections.
30    SynSent,
31
32    /// Server sent SYN-ACK. The handshake is partially complete.
33    SynReceived,
34
35    /// Handshake completed (ACK received). Data transfer phase.
36    Established,
37
38    /// One side sent a FIN. Passive monitor infers this when seeing the first FIN.
39    FinWait,
40
41    /// The other side acknowledged the FIN.
42    CloseWait,
43
44    /// Simultaneous close or final stages of termination.
45    Closing,
46
47    /// Connection reset via RST flag.
48    Reset,
49
50    /// State cannot be inferred from the observed packet sequence.
51    Unknown,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
55pub struct IpInfo {
56    /// Uplink IP identification.
57    u_id: Option<u16>,
58    /// Downlink IP identification.
59    d_id: Option<u16>,
60    /// Uplink packets lost.
61    u_lost: u32,
62    /// Downlink packets lost.
63    d_lost: u32,
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
67pub struct TcpInfo {
68    /// Timestamp of the initial SYN packet.
69    ts_syn: Timestamp,
70
71    /// Timestamp of the SYN-ACK packet.
72    ts_syn_ack: Timestamp,
73
74    /// Timestamp of the ACK packet completing the handshake.
75    ts_ack: Timestamp,
76
77    /// Round-trip time. Usually `rtt = rtt_net + rtt_usr`.
78    /// If a packet is missing (e.g. SYN-ACK), only total RTT might be computable.
79    rtt: TimeDelta,
80
81    /// Network component of the round-trip time.
82    rtt_net: TimeDelta,
83
84    /// User/Application component of the round-trip time.
85    rtt_user: TimeDelta,
86
87    /// Uplink sequence number.
88    u_seq: u32,
89
90    /// Downlink sequence number.
91    d_seq: u32,
92
93    /// Current TCP state.
94    state: TcpState,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
98pub struct Flow<T, D> {
99    /// Timestamp of the first packet in the flow.
100    pub start_ts: Timestamp,
101
102    /// Timestamp of the last packet in the flow.
103    pub last_ts: Timestamp,
104
105    /// Source MAC address.
106    pub src_mac: EthAddr,
107
108    /// Destination MAC address.
109    pub dst_mac: EthAddr,
110
111    /// Ethernet protocol.
112    pub eth_proto: EtherProto,
113
114    /// Generic address tuple (e.g. TupleV4, TupleV6 or TupleL2)
115    pub tuple: T,
116
117    /// IP layer statistics and info.
118    pub ip_info: IpInfo,
119
120    /// TCP layer statistics and info (optional).
121    pub tcp_info: TcpInfo,
122
123    /// Total uplink bytes.
124    pub u_bytes: usize,
125
126    /// Total downlink bytes.
127    pub d_bytes: usize,
128
129    /// Total uplink payload bytes.
130    pub u_payload_bytes: usize,
131
132    /// Total downlink payload bytes.
133    pub d_payload_bytes: usize,
134
135    /// Total uplink packets.
136    pub u_pkts: u32,
137
138    /// Total downlink packets.
139    pub d_pkts: u32,
140
141    /// Uplink packets with payload.
142    pub u_payload_pkts: u32,
143
144    /// Downlink packets with payload.
145    pub d_payload_pkts: u32,
146
147    /// Uplink fragments count.
148    pub u_frags: u32,
149
150    /// Downlink fragments count.
151    pub d_frags: u32,
152
153    /// Custom data associated with the flow.
154    pub data: D,
155}
156
157impl<T, D> Flow<T, D>
158where
159    T: Default + Clone + Tuple + Sized,
160    for<'a> NetworkLayer<'a>: SourceDestLayer<T::Addr>,
161    T::Addr: Eq,
162    D: Default,
163{
164    /// Creates a new `Flow` instance.
165    ///
166    /// # Arguments
167    ///
168    /// * `timestamp` - The timestamp of the first packet in the flow.
169    /// * `tuple` - The flow tuple that identifies this connection (canonical direction).
170    /// * `pkt` - The initial packet that triggered flow creation.
171    /// * `dir` - The direction of the initial packet (`Upwards` or `Downwards`).
172    ///
173    /// The source and destination MAC addresses and the tuple orientation are automatically
174    /// adjusted based on the packet direction to ensure a consistent flow representation.
175    pub fn new(timestamp: Timestamp, tuple: T, pkt: &Packet<'_>, dir: PacketDirection) -> Self {
176        let upwards = matches!(dir, PacketDirection::Upwards);
177        Self {
178            start_ts: timestamp,
179            last_ts: timestamp,
180            src_mac: if upwards {
181                pkt.link().source()
182            } else {
183                pkt.link().dest()
184            },
185            dst_mac: if upwards {
186                pkt.link().dest()
187            } else {
188                pkt.link().source()
189            },
190            eth_proto: pkt.link().protocol(),
191            tuple: if upwards { tuple } else { tuple.flip() },
192            ..Default::default()
193        }
194    }
195
196    /// Determines the direction of a packet relative to the flow.
197    ///
198    /// # Arguments
199    ///
200    /// * `pkt` - The packet to analyze for direction determination.
201    ///
202    /// A packet is considered "upwards" if both its source address and source port
203    /// match the flow's source tuple. Otherwise, it's considered "downwards".
204    /// This method is essential for tracking bidirectional communication within
205    /// a single flow instance.
206    #[inline]
207    pub fn packet_dir(&self, pkt: &Packet<'_>) -> PacketDirection {
208        if self.tuple.is_symmetric() {
209            // Loopback packets (e.g., ICMP on localhost) require inferring
210            // the direction from the packet itself.
211            return PacketDirection::infer(pkt);
212        }
213
214        // Check if packet source address matches flow source address
215        let is_upwards_addr = pkt
216            .network()
217            .and_then(|net| SourceDestLayer::<T::Addr>::source(net))
218            .is_some_and(|src| src == self.tuple.source());
219        // Check if packet source port matches flow source port
220        let is_upwards_port = pkt
221            .transport()
222            .and_then(|tr| SourceDestLayer::<u16>::source(tr))
223            .is_some_and(|src| src == self.tuple.source_port());
224
225        // Packet is upwards if both address and port match flow source
226        if is_upwards_addr && is_upwards_port {
227            PacketDirection::Upwards
228        } else {
229            PacketDirection::Downwards
230        }
231    }
232}
233
234impl<A, T> Process for Flow<A, T>
235where
236    T: Process,
237{
238    fn process<Meta: PacketMetadata>(
239        &mut self,
240        meta: &Meta,
241        pkt: &Packet<'_>,
242        dir: PacketDirection,
243    ) {
244        self.last_ts = meta.timestamp();
245        if matches!(dir, PacketDirection::Upwards) {
246            self.u_bytes += meta.caplen() as usize;
247            self.u_pkts += 1;
248            self.u_payload_bytes += pkt.data().len();
249            self.u_payload_pkts += (!pkt.data().is_empty()) as u32;
250
251            if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
252                if !h.has_dont_fragment() {
253                    let id = h.header.id();
254                    if self.ip_info.u_id.is_none() {
255                        self.ip_info.u_id = Some(id);
256                    }
257
258                    let delta = id.wrapping_sub(self.ip_info.u_id.unwrap());
259                    self.ip_info.u_lost += if delta > 0 && delta < 128 {
260                        delta - 1
261                    } else {
262                        0
263                    } as u32;
264                    self.ip_info.u_id = Some(id);
265                }
266            }
267        } else {
268            self.d_bytes += meta.caplen() as usize;
269            self.d_pkts += 1;
270            self.d_payload_bytes += pkt.data().len();
271            self.d_payload_pkts += (!pkt.data().is_empty()) as u32;
272
273            if let Some(NetworkLayer::Ipv4(h)) = pkt.network() {
274                if !h.has_dont_fragment() {
275                    let id = h.header.id();
276                    if self.ip_info.d_id.is_none() {
277                        self.ip_info.d_id = Some(id);
278                    }
279
280                    let delta = id.wrapping_sub(self.ip_info.d_id.unwrap());
281                    self.ip_info.d_lost += if delta > 0 && delta < 128 {
282                        delta - 1
283                    } else {
284                        0
285                    } as u32;
286                    self.ip_info.d_id = Some(id);
287                }
288            }
289        }
290
291        self.data.process(meta, pkt, dir);
292    }
293}
294
295impl<A, T> Trackable for Flow<A, T> {
296    type Timestamp = Timestamp;
297
298    fn timestamp(&self) -> Timestamp {
299        self.start_ts
300    }
301
302    fn set_timestamp(&mut self, ts: Self::Timestamp) {
303        self.last_ts = ts;
304    }
305}
306
307pub type FlowIpV4<T> = Flow<Ipv4Addr, T>;
308pub type FlowIpV6<T> = Flow<Ipv6Addr, T>;