trippy_core/
flows.rs

1use derive_more::{Add, AddAssign, Sub, SubAssign};
2use itertools::{EitherOrBoth, Itertools};
3use std::fmt::{Debug, Display, Formatter};
4use std::net::IpAddr;
5use tracing::instrument;
6
7/// Identifies a tracing `Flow`.
8#[derive(
9    Debug,
10    Clone,
11    Copy,
12    Default,
13    Ord,
14    PartialOrd,
15    Eq,
16    PartialEq,
17    Hash,
18    Add,
19    AddAssign,
20    Sub,
21    SubAssign,
22)]
23pub struct FlowId(pub u64);
24
25impl Display for FlowId {
26    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27        write!(f, "{}", self.0)
28    }
29}
30
31/// A register of tracing `Flows`.
32#[derive(Debug, Clone, Default)]
33pub struct FlowRegistry {
34    /// The id to assign to the next flow registered.
35    next_flow_id: FlowId,
36    /// The registry of flows observed.
37    flows: Vec<(Flow, FlowId)>,
38}
39
40impl FlowRegistry {
41    /// Create a new `FlowRegistry`.
42    pub const fn new() -> Self {
43        Self {
44            flows: Vec::new(),
45            next_flow_id: FlowId(1),
46        }
47    }
48
49    /// Register a `Flow` with the `FlowRegistry`.
50    ///
51    /// If the flow matches a flow that has previously been observed by the registry then
52    /// the id of that flow is return.  Otherwise, a new flow id is created and
53    /// returned and the corresponding flow is stored in the registry.
54    ///
55    /// If the flow matches but also contains additional data not previously
56    /// observed for that flow then the existing flow will be updated to
57    /// merge the data.  In this case the existing flow id will be reused.
58    ///
59    /// If a flow matches more than one existing flow then only the first
60    /// matching flow will be updated.
61    #[instrument(skip(self), level = "trace")]
62    pub fn register(&mut self, flow: Flow) -> FlowId {
63        for (entry, id) in &mut self.flows {
64            let status = entry.check(&flow);
65            match status {
66                CheckStatus::Match => {
67                    return *id;
68                }
69                CheckStatus::NoMatch => {}
70                CheckStatus::MatchMerge => {
71                    entry.merge(&flow);
72                    return *id;
73                }
74            }
75        }
76        let flow_id = self.next_flow_id;
77        self.flows.push((flow, flow_id));
78        self.next_flow_id.0 += 1;
79        flow_id
80    }
81
82    /// All recorded flows.
83    pub fn flows(&self) -> &[(Flow, FlowId)] {
84        &self.flows
85    }
86}
87
88/// Represents a single tracing path over a number of (possibly unknown) hops.
89#[derive(Debug, Clone, Eq, PartialEq, Hash)]
90pub struct Flow {
91    pub entries: Vec<FlowEntry>,
92}
93
94impl Flow {
95    /// Create a new Flow from a slice of hops.
96    ///
97    /// Note that each entry is implicitly associated with a `ttl`.  For
98    /// example `hops[0]` would have a `ttl` of 1, `hops[1]` would have a
99    /// `ttl` of 2 and so on.
100    pub fn from_hops(hops: impl IntoIterator<Item = Option<IpAddr>>) -> Self {
101        let entries = hops
102            .into_iter()
103            .map(|addr| {
104                if let Some(addr) = addr {
105                    FlowEntry::Known(addr)
106                } else {
107                    FlowEntry::Unknown
108                }
109            })
110            .collect();
111        Self { entries }
112    }
113
114    /// Check if a given `Flow` matches this `Flow`.
115    ///
116    /// Two flows are said to match _unless_ they contain different IP
117    /// addresses for the _same_ position (i.e. the same `ttl`).
118    ///
119    /// This is true even for flows of differing lengths.
120    ///
121    /// In the even of a match, if the flow being checked contains
122    /// `FlowEntry::Known` entries which are `FlowEntry::Unknown` in the
123    /// current flow then `CheckStatus::MatchMerge` is returned to indicate
124    /// the two flows should be merged.
125    ///
126    /// This will also be the case if the flow being checked matches and is
127    /// longer than the existing flow.
128    #[instrument(skip(self), level = "trace")]
129    pub fn check(&self, flow: &Self) -> CheckStatus {
130        let mut additions = 0;
131        for (old, new) in self.entries.iter().zip(&flow.entries) {
132            match (old, new) {
133                (FlowEntry::Known(fst), FlowEntry::Known(snd)) if fst != snd => {
134                    return CheckStatus::NoMatch;
135                }
136                (FlowEntry::Unknown, FlowEntry::Known(_)) => additions += 1,
137                _ => {}
138            }
139        }
140        if flow.entries.len() > self.entries.len() || additions > 0 {
141            CheckStatus::MatchMerge
142        } else {
143            CheckStatus::Match
144        }
145    }
146
147    /// Marge the entries from the given `Flow` into our `Flow`.
148    #[instrument(skip(self), level = "trace")]
149    fn merge(&mut self, flow: &Self) {
150        self.entries = self
151            .entries
152            .iter()
153            .zip_longest(flow.entries.iter())
154            .map(|eob| match eob {
155                EitherOrBoth::Both(left, right) => match (left, right) {
156                    (FlowEntry::Unknown, FlowEntry::Known(_)) => *right,
157                    _ => *left,
158                },
159                EitherOrBoth::Left(left) => *left,
160                EitherOrBoth::Right(right) => *right,
161            })
162            .collect::<Vec<_>>();
163    }
164}
165
166impl Display for Flow {
167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
168        write!(f, "{}", self.entries.iter().format(", "))
169    }
170}
171
/// Outcome of comparing two `Flow`s with `Flow::check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckStatus {
    /// The flows match and the checked flow adds nothing new.
    Match,
    /// The flows do not match: a shared position holds two different
    /// known addresses.
    NoMatch,
    /// The flows match, but the checked flow carries extra information and
    /// the two should be merged.
    MatchMerge,
}
182
/// A single hop within a `Flow`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlowEntry {
    /// A hop whose address is not known.
    Unknown,
    /// A hop whose `IpAddr` is known.
    Known(IpAddr),
}

impl Display for FlowEntry {
    /// Renders a known hop as its address and an unknown hop as `*`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Self::Known(addr) = self {
            write!(f, "{addr}")
        } else {
            f.write_str("*")
        }
    }
}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;
    use std::str::FromStr;

    #[test]
    fn test_single_flow() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow_id = registry.register(flow1);
        assert_eq!(FlowId(1), flow_id);
        assert_eq!(
            &[(Flow::from_hops([addr("1.1.1.1")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_two_different_flows() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1.clone());
        let flow2 = Flow::from_hops([addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2.clone());
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(2), flow2_id);
        assert_eq!(&[(flow1, flow1_id), (flow2, flow2_id)], registry.flows());
    }

    #[test]
    fn test_two_same_flows() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1.clone());
        let flow2 = Flow::from_hops([addr("1.1.1.1")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(&[(flow1, flow1_id)], registry.flows());
    }

    #[test]
    fn test_two_same_one_different_flows() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1.clone());
        let flow2 = Flow::from_hops([addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2.clone());
        let flow3 = Flow::from_hops([addr("1.1.1.1")]);
        let flow3_id = registry.register(flow3);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(2), flow2_id);
        assert_eq!(FlowId(1), flow3_id);
        assert_eq!(&[(flow1, flow1_id), (flow2, flow2_id)], registry.flows());
    }

    #[test]
    fn test_merge_flow1() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        let flow3 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow3_id = registry.register(flow3);
        let flow4 = Flow::from_hops([addr("1.1.1.1"), addr("3.3.3.3")]);
        let flow4_id = registry.register(flow4);
        let flow5 = Flow::from_hops([addr("1.1.1.1")]);
        let flow5_id = registry.register(flow5);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(FlowId(1), flow3_id);
        assert_eq!(FlowId(2), flow4_id);
        assert_eq!(FlowId(1), flow5_id);
        // flow 1 was extended by flow2; flow4 diverged at ttl 2.
        assert_eq!(
            &[
                (Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]), FlowId(1)),
                (Flow::from_hops([addr("1.1.1.1"), addr("3.3.3.3")]), FlowId(2)),
            ],
            registry.flows()
        );
    }

    #[test]
    fn test_merge_flow2() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2"), addr("3.3.3.3")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        let flow3 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow3_id = registry.register(flow3);
        let flow4 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2"), addr("3.3.3.3")]);
        let flow4_id = registry.register(flow4);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(FlowId(1), flow3_id);
        assert_eq!(FlowId(1), flow4_id);
        // shorter matching flows must not truncate the stored flow.
        assert_eq!(
            &[(
                Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2"), addr("3.3.3.3")]),
                FlowId(1)
            )],
            registry.flows()
        );
    }

    #[test]
    fn test_merge_flow3() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1"), None, addr("3.3.3.3")]);
        let flow1_id = registry.register(flow1);
        // doesn't match so new flow
        let flow2 = Flow::from_hops([addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        // matches flow 1 (fills its unknown hop and extends it) so is merged into it
        let flow3 = Flow::from_hops([
            None,
            addr("2.2.2.2"),
            None,
            addr("4.4.4.4"),
            addr("5.5.5.5"),
        ]);
        let flow3_id = registry.register(flow3);
        // no longer matches the merged flow 1, but still matches flow 2
        let flow4 = Flow::from_hops([addr("2.2.2.2")]);
        let flow4_id = registry.register(flow4);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(2), flow2_id);
        assert_eq!(FlowId(1), flow3_id);
        assert_eq!(FlowId(2), flow4_id);
        assert_eq!(
            &[
                (
                    Flow::from_hops([
                        addr("1.1.1.1"),
                        addr("2.2.2.2"),
                        addr("3.3.3.3"),
                        addr("4.4.4.4"),
                        addr("5.5.5.5"),
                    ]),
                    FlowId(1)
                ),
                (Flow::from_hops([addr("2.2.2.2")]), FlowId(2)),
            ],
            registry.flows()
        );
    }

    #[test]
    fn test_subset() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(
            &[(Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_subset_any() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1"), None]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        // an unknown hop must not overwrite a known one.
        assert_eq!(
            &[(Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_superset() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(
            &[(Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_superset_any() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([addr("1.1.1.1"), None]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        // the unknown hop is filled in by the merge.
        assert_eq!(
            &[(Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_start_any_then_same_flows() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([None, addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([None, addr("1.1.1.1")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(1), flow2_id);
        assert_eq!(
            &[(Flow::from_hops([None, addr("1.1.1.1")]), FlowId(1))],
            registry.flows()
        );
    }

    #[test]
    fn test_start_any_then_diff_flows() {
        let mut registry = FlowRegistry::new();
        let flow1 = Flow::from_hops([None, addr("1.1.1.1")]);
        let flow1_id = registry.register(flow1);
        let flow2 = Flow::from_hops([None, addr("2.2.2.2")]);
        let flow2_id = registry.register(flow2);
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(2), flow2_id);
        assert_eq!(
            &[
                (Flow::from_hops([None, addr("1.1.1.1")]), FlowId(1)),
                (Flow::from_hops([None, addr("2.2.2.2")]), FlowId(2)),
            ],
            registry.flows()
        );
    }

    #[test]
    fn test_multiple_matches_updates_first() {
        let mut registry = FlowRegistry::new();
        let flow1_id = registry.register(Flow::from_hops([addr("1.1.1.1")]));
        let flow2_id = registry.register(Flow::from_hops([addr("2.2.2.2")]));
        // an unknown first hop matches both stored flows; only the first is merged.
        let flow3_id = registry.register(Flow::from_hops([None, addr("3.3.3.3")]));
        assert_eq!(FlowId(1), flow1_id);
        assert_eq!(FlowId(2), flow2_id);
        assert_eq!(FlowId(1), flow3_id);
        assert_eq!(
            &[
                (Flow::from_hops([addr("1.1.1.1"), addr("3.3.3.3")]), FlowId(1)),
                (Flow::from_hops([addr("2.2.2.2")]), FlowId(2)),
            ],
            registry.flows()
        );
    }

    #[test]
    fn test_check_statuses() {
        let base = Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]);
        assert_eq!(
            CheckStatus::Match,
            base.check(&Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]))
        );
        assert_eq!(
            CheckStatus::Match,
            base.check(&Flow::from_hops([addr("1.1.1.1")]))
        );
        assert_eq!(
            CheckStatus::Match,
            base.check(&Flow::from_hops([None, addr("2.2.2.2")]))
        );
        assert_eq!(
            CheckStatus::NoMatch,
            base.check(&Flow::from_hops([addr("1.1.1.1"), addr("3.3.3.3")]))
        );
        assert_eq!(
            CheckStatus::MatchMerge,
            Flow::from_hops([addr("1.1.1.1")])
                .check(&Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]))
        );
        assert_eq!(
            CheckStatus::MatchMerge,
            Flow::from_hops([addr("1.1.1.1"), None])
                .check(&Flow::from_hops([addr("1.1.1.1"), addr("2.2.2.2")]))
        );
    }

    #[test]
    fn test_flow_display() {
        let flow = Flow::from_hops([addr("1.1.1.1"), None, addr("3.3.3.3")]);
        assert_eq!("1.1.1.1, *, 3.3.3.3", flow.to_string());
        assert_eq!("", Flow { entries: Vec::new() }.to_string());
    }

    // Returns `Option` (always `Some`) so it can be mixed with `None` in hop
    // arrays passed to `Flow::from_hops`; hence the lint allowance.
    #[allow(clippy::unnecessary_wraps)]
    fn addr(addr: &str) -> Option<IpAddr> {
        Some(IpAddr::V4(Ipv4Addr::from_str(addr).unwrap()))
    }
}