Skip to main content

pcap_toolkit/
flow.rs

//! Shared flow key definition and deterministic flow ID computation.
//!
//! [`FlowKey`] is the canonical 5-tuple used throughout the crate.
//! It lives here (rather than in `stats`) so that `pcap` can depend on it
//! without creating a circular module dependency.
6
7use std::net::IpAddr;
8
/// Fold an IPv4-mapped IPv6 address (`::ffff:A.B.C.D`) into plain IPv4.
///
/// Every other address — plain IPv4, or IPv6 that is not IPv4-mapped (including
/// the deprecated IPv4-compatible form `::A.B.C.D`) — is returned unchanged.
/// This is what makes `10.0.0.1` and `::ffff:10.0.0.1` yield the same
/// [`FlowKey`].
pub fn normalize_ip(ip: IpAddr) -> IpAddr {
    let IpAddr::V6(addr) = ip else {
        return ip;
    };
    addr.to_ipv4_mapped().map_or(ip, IpAddr::V4)
}
21
/// Identifies a network flow by its 5-tuple: both endpoint addresses, both
/// ports, and the IP protocol number.
///
/// Build keys with [`FlowKey::new`] so that IPv4-mapped IPv6 addresses are
/// folded into plain IPv4. The derived `PartialEq`/`Hash` compare fields
/// exactly, so a key and its source/destination-swapped twin are *different*
/// keys; use [`FlowKey::flow_id`] with `unidirectional = false` for a
/// direction-independent identifier.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowKey {
    /// Source address.
    pub src_ip: IpAddr,
    /// Destination address.
    pub dst_ip: IpAddr,
    /// Source port.
    pub src_port: u16,
    /// Destination port.
    pub dst_port: u16,
    /// IP protocol number (6 = TCP, 17 = UDP, 1 = ICMP, …).
    pub protocol: u8,
}
32
33impl FlowKey {
34    /// Create a new flow key, normalising IPv4-mapped IPv6 addresses to IPv4.
35    pub fn new(src_ip: IpAddr, dst_ip: IpAddr, src_port: u16, dst_port: u16, protocol: u8) -> Self {
36        Self {
37            src_ip: normalize_ip(src_ip),
38            dst_ip: normalize_ip(dst_ip),
39            src_port,
40            dst_port,
41            protocol,
42        }
43    }
44
45    /// Compute a deterministic 64-bit flow ID using rapidhash.
46    ///
47    /// **Bidirectional** (default): A→B and B→A produce the same ID by
48    /// canonicalising the two endpoints so that `min_ep` always comes first.
49    ///
50    /// **Unidirectional**: hash the 5-tuple as-is, direction-sensitive.
51    ///
52    /// IPs are serialised as fixed-width big-endian bytes (4 for IPv4,
53    /// 16 for IPv6) prefixed with a family tag to prevent collisions between
54    /// address families.
55    pub fn flow_id(&self, unidirectional: bool) -> u64 {
56        let mut buf = [0u8; 38]; // 1 + 16 + 2 + 1 + 16 + 2 = 38
57        let mut pos = 0;
58
59        let (ep_a_ip, ep_a_port, ep_b_ip, ep_b_port) = if unidirectional {
60            (self.src_ip, self.src_port, self.dst_ip, self.dst_port)
61        } else {
62            canonicalize(self.src_ip, self.src_port, self.dst_ip, self.dst_port)
63        };
64
65        pos += write_ip(&mut buf[pos..], ep_a_ip);
66        buf[pos..pos + 2].copy_from_slice(&ep_a_port.to_be_bytes());
67        pos += 2;
68        pos += write_ip(&mut buf[pos..], ep_b_ip);
69        buf[pos..pos + 2].copy_from_slice(&ep_b_port.to_be_bytes());
70        pos += 2;
71        buf[pos] = self.protocol;
72        pos += 1;
73
74        rapidhash::v2::rapidhash_v2_2(&buf[..pos])
75    }
76}
77
78/// Canonicalise two endpoints so the result is consistent regardless of direction.
79fn canonicalize(
80    ip_a: IpAddr,
81    port_a: u16,
82    ip_b: IpAddr,
83    port_b: u16,
84) -> (IpAddr, u16, IpAddr, u16) {
85    let bytes_a = ip_to_bytes(ip_a);
86    let bytes_b = ip_to_bytes(ip_b);
87
88    if (bytes_a.as_slice(), port_a) <= (bytes_b.as_slice(), port_b) {
89        (ip_a, port_a, ip_b, port_b)
90    } else {
91        (ip_b, port_b, ip_a, port_a)
92    }
93}
94
/// Encode `ip` as a fixed 17-byte comparison key.
///
/// Byte 0 is a family tag (`4` for IPv4, `6` for IPv6) and the address octets
/// follow in big-endian order. IPv4 fills bytes 1..5 and the rest stay zero,
/// so both families share one fixed-size type that compares lexicographically.
fn ip_to_bytes(ip: IpAddr) -> [u8; 17] {
    let mut encoded = [0u8; 17];
    let (family, body) = encoded.split_at_mut(1);
    match ip {
        IpAddr::V4(addr) => {
            family[0] = 4;
            body[..4].copy_from_slice(&addr.octets());
        }
        IpAddr::V6(addr) => {
            family[0] = 6;
            body.copy_from_slice(&addr.octets());
        }
    }
    encoded
}
109
/// Encode `ip` at the start of `out` and return how many bytes were written.
///
/// The encoding is a one-byte family tag (`4` for IPv4, `6` for IPv6)
/// followed by the address octets in big-endian order. That is 5 bytes for
/// IPv4 and 17 for IPv6; bytes of `out` beyond that are left untouched.
///
/// # Panics
///
/// Panics if `out` is shorter than the encoding.
fn write_ip(out: &mut [u8], ip: IpAddr) -> usize {
    /// Write `tag` then `octets` into `dst`, returning the total length.
    fn emit(dst: &mut [u8], tag: u8, octets: &[u8]) -> usize {
        let end = 1 + octets.len();
        dst[0] = tag;
        dst[1..end].copy_from_slice(octets);
        end
    }

    match ip {
        IpAddr::V4(addr) => emit(out, 4, &addr.octets()),
        IpAddr::V6(addr) => emit(out, 6, &addr.octets()),
    }
}
125
#[cfg(test)]
mod tests {
    //! Unit tests for address normalisation and flow ID computation.
    //!
    //! Besides the IPv4 cases, these cover non-mapped IPv6 forms, the
    //! same-host port tiebreak in canonicalisation, and mixed-family flows.

    use super::*;
    use std::net::Ipv4Addr;

    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    fn v4mapped(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        // ::ffff:a.b.c.d
        IpAddr::V6(Ipv4Addr::new(a, b, c, d).to_ipv6_mapped())
    }

    #[test]
    fn test_normalize_ip_plain_v4_unchanged() {
        let ip = v4(10, 0, 0, 1);
        assert_eq!(normalize_ip(ip), ip);
    }

    #[test]
    fn test_normalize_ip_plain_v6_unchanged() {
        let ip: IpAddr = "2001:db8::1".parse().unwrap();
        assert_eq!(normalize_ip(ip), ip);
    }

    #[test]
    fn test_normalize_ip_v4mapped_converts_to_v4() {
        let mapped = v4mapped(10, 0, 0, 1);
        let expected = v4(10, 0, 0, 1);
        assert_eq!(normalize_ip(mapped), expected);
    }

    #[test]
    fn test_normalize_ip_v4_compatible_stays_v6() {
        // The deprecated IPv4-compatible form `::a.b.c.d` is not IPv4-mapped
        // and must not be collapsed to IPv4.
        let compat = IpAddr::V6(Ipv4Addr::new(10, 0, 0, 1).to_ipv6_compatible());
        assert_eq!(normalize_ip(compat), compat);
    }

    #[test]
    fn test_flow_key_new_normalizes_v4mapped() {
        // ::ffff:10.0.0.1 and 10.0.0.1 must produce identical FlowKeys.
        let key_plain = FlowKey::new(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 1000, 443, 6);
        let key_mapped = FlowKey::new(v4mapped(10, 0, 0, 1), v4mapped(10, 0, 0, 2), 1000, 443, 6);
        assert_eq!(key_plain, key_mapped);
        assert_eq!(key_plain.flow_id(false), key_mapped.flow_id(false));
    }

    #[test]
    fn test_flow_id_bidirectional_is_symmetric() {
        let fwd = FlowKey::new(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 12345, 443, 6);
        let rev = FlowKey::new(v4(10, 0, 0, 2), v4(10, 0, 0, 1), 443, 12345, 6);
        assert_eq!(fwd.flow_id(false), rev.flow_id(false));
    }

    #[test]
    fn test_flow_id_unidirectional_differs_by_direction() {
        let fwd = FlowKey::new(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 12345, 443, 6);
        let rev = FlowKey::new(v4(10, 0, 0, 2), v4(10, 0, 0, 1), 443, 12345, 6);
        assert_ne!(fwd.flow_id(true), rev.flow_id(true));
    }

    #[test]
    fn test_flow_id_bidirectional_same_host_orders_by_port() {
        // Identical IPs: canonicalisation must fall back to comparing ports.
        let fwd = FlowKey::new(v4(127, 0, 0, 1), v4(127, 0, 0, 1), 5000, 6000, 6);
        let rev = FlowKey::new(v4(127, 0, 0, 1), v4(127, 0, 0, 1), 6000, 5000, 6);
        assert_eq!(fwd.flow_id(false), rev.flow_id(false));
        assert_ne!(fwd.flow_id(true), rev.flow_id(true));
    }

    #[test]
    fn test_flow_id_mixed_family_is_symmetric() {
        let host6: IpAddr = "2001:db8::1".parse().unwrap();
        let fwd = FlowKey::new(v4(10, 0, 0, 1), host6, 1234, 80, 6);
        let rev = FlowKey::new(host6, v4(10, 0, 0, 1), 80, 1234, 6);
        assert_eq!(fwd.flow_id(false), rev.flow_id(false));
    }

    #[test]
    fn test_flow_id_different_protocols_differ() {
        let tcp = FlowKey::new(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 12345, 443, 6);
        let udp = FlowKey::new(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 12345, 443, 17);
        assert_ne!(tcp.flow_id(false), udp.flow_id(false));
    }

    #[test]
    fn test_flow_id_deterministic() {
        let key = FlowKey::new(v4(192, 168, 1, 1), v4(8, 8, 8, 8), 54321, 53, 17);
        assert_eq!(key.flow_id(false), key.flow_id(false));
    }

    #[test]
    fn test_flow_id_v4mapped_same_as_v4() {
        // Bidirectional flow IDs must match regardless of whether the IP was
        // presented as plain IPv4 or IPv4-mapped IPv6.
        let plain = FlowKey::new(v4(192, 168, 1, 1), v4(8, 8, 8, 8), 54321, 53, 17);
        let mapped = FlowKey::new(
            v4mapped(192, 168, 1, 1),
            v4mapped(8, 8, 8, 8),
            54321,
            53,
            17,
        );
        assert_eq!(plain.flow_id(false), mapped.flow_id(false));
        assert_eq!(plain.flow_id(true), mapped.flow_id(true));
    }
}