1use derive_more::{Add, AddAssign, Sub, SubAssign};
2use itertools::{EitherOrBoth, Itertools};
3use std::fmt::{Debug, Display, Formatter};
4use std::net::IpAddr;
5use tracing::instrument;
6
7#[derive(
9 Debug,
10 Clone,
11 Copy,
12 Default,
13 Ord,
14 PartialOrd,
15 Eq,
16 PartialEq,
17 Hash,
18 Add,
19 AddAssign,
20 Sub,
21 SubAssign,
22)]
23pub struct FlowId(pub u64);
24
25impl Display for FlowId {
26 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27 write!(f, "{}", self.0)
28 }
29}
30
31#[derive(Debug, Clone, Default)]
33pub struct FlowRegistry {
34 next_flow_id: FlowId,
36 flows: Vec<(Flow, FlowId)>,
38}
39
40impl FlowRegistry {
41 pub const fn new() -> Self {
43 Self {
44 flows: Vec::new(),
45 next_flow_id: FlowId(1),
46 }
47 }
48
49 #[instrument(skip(self), level = "trace")]
62 pub fn register(&mut self, flow: Flow) -> FlowId {
63 for (entry, id) in &mut self.flows {
64 let status = entry.check(&flow);
65 match status {
66 CheckStatus::Match => {
67 return *id;
68 }
69 CheckStatus::NoMatch => {}
70 CheckStatus::MatchMerge => {
71 entry.merge(&flow);
72 return *id;
73 }
74 }
75 }
76 let flow_id = self.next_flow_id;
77 self.flows.push((flow, flow_id));
78 self.next_flow_id.0 += 1;
79 flow_id
80 }
81
82 pub fn flows(&self) -> &[(Flow, FlowId)] {
84 &self.flows
85 }
86}
87
88#[derive(Debug, Clone, Eq, PartialEq, Hash)]
90pub struct Flow {
91 pub entries: Vec<FlowEntry>,
92}
93
94impl Flow {
95 pub fn from_hops(hops: impl IntoIterator<Item = Option<IpAddr>>) -> Self {
101 let entries = hops
102 .into_iter()
103 .map(|addr| {
104 if let Some(addr) = addr {
105 FlowEntry::Known(addr)
106 } else {
107 FlowEntry::Unknown
108 }
109 })
110 .collect();
111 Self { entries }
112 }
113
114 #[instrument(skip(self), level = "trace")]
129 pub fn check(&self, flow: &Self) -> CheckStatus {
130 let mut additions = 0;
131 for (old, new) in self.entries.iter().zip(&flow.entries) {
132 match (old, new) {
133 (FlowEntry::Known(fst), FlowEntry::Known(snd)) if fst != snd => {
134 return CheckStatus::NoMatch;
135 }
136 (FlowEntry::Unknown, FlowEntry::Known(_)) => additions += 1,
137 _ => {}
138 }
139 }
140 if flow.entries.len() > self.entries.len() || additions > 0 {
141 CheckStatus::MatchMerge
142 } else {
143 CheckStatus::Match
144 }
145 }
146
147 #[instrument(skip(self), level = "trace")]
149 fn merge(&mut self, flow: &Self) {
150 self.entries = self
151 .entries
152 .iter()
153 .zip_longest(flow.entries.iter())
154 .map(|eob| match eob {
155 EitherOrBoth::Both(left, right) => match (left, right) {
156 (FlowEntry::Unknown, FlowEntry::Known(_)) => *right,
157 _ => *left,
158 },
159 EitherOrBoth::Left(left) => *left,
160 EitherOrBoth::Right(right) => *right,
161 })
162 .collect::<Vec<_>>();
163 }
164}
165
166impl Display for Flow {
167 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168 write!(f, "{}", self.entries.iter().format(", "))
169 }
170}
171
/// The outcome of comparing a candidate flow against an existing flow with [`Flow::check`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckStatus {
    /// The candidate is compatible with the existing flow and adds nothing to it.
    Match,
    /// The candidate has a different known address at some shared position.
    NoMatch,
    /// The candidate is compatible with the existing flow and carries extra entries or
    /// addresses that should be merged into it.
    MatchMerge,
}
182
/// A single position in a [`Flow`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlowEntry {
    /// No address is known for this position.
    Unknown,
    /// The address known for this position.
    Known(IpAddr),
}
191
192impl Display for FlowEntry {
193 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
194 match self {
195 Self::Unknown => f.write_str("*"),
196 Self::Known(addr) => {
197 write!(f, "{addr}")
198 }
199 }
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;
    use std::str::FromStr;

    /// Parse an IPv4 literal into a known hop.
    // The `Option` wrapper is deliberate: it lets `addr(..)` sit next to `None` in hop lists.
    #[allow(clippy::unnecessary_wraps)]
    fn addr(ip: &str) -> Option<IpAddr> {
        let parsed = Ipv4Addr::from_str(ip).unwrap();
        Some(IpAddr::V4(parsed))
    }

    /// Register each hop list, in order, on a fresh registry and collect the ids returned.
    fn register_all(paths: &[&[Option<IpAddr>]]) -> Vec<FlowId> {
        let mut registry = FlowRegistry::new();
        paths
            .iter()
            .map(|hops| registry.register(Flow::from_hops(hops.iter().copied())))
            .collect()
    }

    /// The first flow registered gets id 1 and is stored as given.
    #[test]
    fn test_single_flow() {
        let mut registry = FlowRegistry::new();
        let id = registry.register(Flow::from_hops([addr("1.1.1.1")]));
        assert_eq!(FlowId(1), id);
        let expected = [(Flow::from_hops([addr("1.1.1.1")]), FlowId(1))];
        assert_eq!(&expected, registry.flows());
    }

    /// Two conflicting flows get consecutive ids and are both stored.
    #[test]
    fn test_two_different_flows() {
        let mut registry = FlowRegistry::new();
        let first = Flow::from_hops([addr("1.1.1.1")]);
        let second = Flow::from_hops([addr("2.2.2.2")]);
        let first_id = registry.register(first.clone());
        let second_id = registry.register(second.clone());
        assert_eq!((FlowId(1), FlowId(2)), (first_id, second_id));
        assert_eq!(&[(first, first_id), (second, second_id)], registry.flows());
    }

    /// Registering an identical flow twice reuses the id and stores it once.
    #[test]
    fn test_two_same_flows() {
        let mut registry = FlowRegistry::new();
        let first = Flow::from_hops([addr("1.1.1.1")]);
        let first_id = registry.register(first.clone());
        let repeat_id = registry.register(Flow::from_hops([addr("1.1.1.1")]));
        assert_eq!((FlowId(1), FlowId(1)), (first_id, repeat_id));
        assert_eq!(&[(first, first_id)], registry.flows());
    }

    /// A repeat of the first flow still resolves to id 1 after a different flow was added.
    #[test]
    fn test_two_same_one_different_flows() {
        let mut registry = FlowRegistry::new();
        let first = Flow::from_hops([addr("1.1.1.1")]);
        let second = Flow::from_hops([addr("2.2.2.2")]);
        let first_id = registry.register(first.clone());
        let second_id = registry.register(second.clone());
        let repeat_id = registry.register(Flow::from_hops([addr("1.1.1.1")]));
        assert_eq!(
            (FlowId(1), FlowId(2), FlowId(1)),
            (first_id, second_id, repeat_id)
        );
        assert_eq!(&[(first, first_id), (second, second_id)], registry.flows());
    }

    /// A flow that grows is merged; a later flow conflicting with the grown part is new.
    #[test]
    fn test_merge_flow1() {
        let ids = register_all(&[
            &[addr("1.1.1.1")],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1"), addr("3.3.3.3")],
            &[addr("1.1.1.1")],
        ]);
        assert_eq!(
            ids,
            [FlowId(1), FlowId(1), FlowId(1), FlowId(2), FlowId(1)]
        );
    }

    /// Shorter prefixes of a registered flow, and the full flow again, all map to it.
    #[test]
    fn test_merge_flow2() {
        let ids = register_all(&[
            &[addr("1.1.1.1"), addr("2.2.2.2"), addr("3.3.3.3")],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1"), addr("2.2.2.2"), addr("3.3.3.3")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1), FlowId(1), FlowId(1)]);
    }

    /// Unknown hops on either side do not conflict, and merging fills them in.
    #[test]
    fn test_merge_flow3() {
        let ids = register_all(&[
            &[addr("1.1.1.1"), None, addr("3.3.3.3")],
            &[addr("2.2.2.2")],
            &[
                None,
                addr("2.2.2.2"),
                None,
                addr("4.4.4.4"),
                addr("5.5.5.5"),
            ],
            &[addr("2.2.2.2")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(2), FlowId(1), FlowId(2)]);
    }

    /// A strict prefix of a registered flow matches it.
    #[test]
    fn test_subset() {
        let ids = register_all(&[
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1)]);
    }

    /// An unknown hop where the registered flow has a known hop still matches.
    #[test]
    fn test_subset_any() {
        let ids = register_all(&[
            &[addr("1.1.1.1"), addr("2.2.2.2")],
            &[addr("1.1.1.1"), None],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1)]);
    }

    /// A flow extending a registered flow matches it.
    #[test]
    fn test_superset() {
        let ids = register_all(&[
            &[addr("1.1.1.1")],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1)]);
    }

    /// A known hop where the registered flow has an unknown hop matches it.
    #[test]
    fn test_superset_any() {
        let ids = register_all(&[
            &[addr("1.1.1.1"), None],
            &[addr("1.1.1.1"), addr("2.2.2.2")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1)]);
    }

    /// Identical flows that both start with an unknown hop share an id.
    #[test]
    fn test_start_any_then_same_flows() {
        let ids = register_all(&[
            &[None, addr("1.1.1.1")],
            &[None, addr("1.1.1.1")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(1)]);
    }

    /// Flows that start with an unknown hop but then differ get separate ids.
    #[test]
    fn test_start_any_then_diff_flows() {
        let ids = register_all(&[
            &[None, addr("1.1.1.1")],
            &[None, addr("2.2.2.2")],
        ]);
        assert_eq!(ids, [FlowId(1), FlowId(2)]);
    }
}