packet_strata/
tracker.rs

1use ahash::RandomState;
2use hashlink::LinkedHashMap;
3
4pub mod direction;
5pub mod flow;
6pub mod process;
7pub mod tuple;
8pub mod vni;
9
/// A value that carries its own timestamp.
///
/// Implementors expose the timestamp through [`Trackable::timestamp`] and
/// allow it to be overwritten through [`Trackable::set_timestamp`].
pub trait Trackable {
    /// Timestamp representation; it must be cloneable and (partially) ordered.
    type Timestamp: Clone + PartialOrd;

    /// Returns the timestamp currently stored in the value.
    fn timestamp(&self) -> Self::Timestamp;

    /// Replaces the timestamp stored in the value with `ts`.
    fn set_timestamp(&mut self, ts: Self::Timestamp);
}
16
17/// Flow tracker for types that have an intrinsic timestamp (via Trackable trait)
18pub struct Tracker<K, V: Trackable> {
19    lru: LinkedHashMap<K, V, RandomState>,
20}
21
22impl<K, V> Tracker<K, V>
23where
24    K: Eq + std::hash::Hash + Clone,
25    V: Trackable,
26{
27    #[inline]
28    pub fn new() -> Self {
29        Tracker {
30            lru: LinkedHashMap::with_hasher(RandomState::new()),
31        }
32    }
33
34    #[inline]
35    pub fn with_capacity(capacity: usize) -> Self {
36        Tracker {
37            lru: LinkedHashMap::with_capacity_and_hasher(capacity, RandomState::new()),
38        }
39    }
40
41    #[inline]
42    pub fn get_or_insert_with<F>(&mut self, key: &K, create: F) -> &mut V
43    where
44        F: FnOnce() -> V,
45    {
46        if self.lru.contains_key(key) {
47            self.lru.to_back(key).unwrap()
48        } else {
49            self.lru.entry(key.clone()).or_insert_with(create)
50        }
51    }
52
53    /// Evict entries based on a predicate function, in lru order
54    pub fn process_and_evict<F>(&mut self, mut process: F) -> usize
55    where
56        F: FnMut(&K, &V) -> bool, // true: evict, false: keep and interrupt
57    {
58        let mut evicted = 0;
59        loop {
60            match self.lru.front() {
61                Some((k, v)) if process(k, v) => {
62                    self.lru.pop_front(); // evict and continue
63                    evicted += 1;
64                }
65                _ => break,
66            }
67        }
68        evicted
69    }
70
71    /// Get the front (oldest) entry without removing it
72    #[inline]
73    pub fn front(&self) -> Option<(&K, &V)> {
74        self.lru.front()
75    }
76
77    /// Get the back (newest) entry without removing it
78    #[inline]
79    pub fn back(&self) -> Option<(&K, &V)> {
80        self.lru.back()
81    }
82
83    /// Remove and return the front (oldest) entry
84    #[inline]
85    pub fn pop_front(&mut self) -> Option<(K, V)> {
86        self.lru.pop_front()
87    }
88
89    /// Remove and return the back (newest) entry
90    #[inline]
91    pub fn pop_back(&mut self) -> Option<(K, V)> {
92        self.lru.pop_back()
93    }
94
95    /// Remove a specific entry by key
96    #[inline]
97    pub fn remove(&mut self, key: &K) -> Option<V> {
98        self.lru.remove(key)
99    }
100
101    /// Get the timestamp of an entry
102    #[inline]
103    pub fn get_timestamp(&self, key: &K) -> Option<V::Timestamp> {
104        self.lru.get(key).map(|v| v.timestamp())
105    }
106
107    /// Get a reference to the value
108    #[inline]
109    pub fn get(&self, key: &K) -> Option<&V> {
110        self.lru.get(key)
111    }
112
113    /// Get a mutable reference to the value
114    #[inline]
115    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
116        self.lru.get_mut(key)
117    }
118
119    /// Iterator over entries (from oldest to newest)
120    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
121        self.lru.iter()
122    }
123
124    /// Mutable iterator over entries (from oldest to newest)
125    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
126        self.lru.iter_mut()
127    }
128
129    /// Iterator over keys (from oldest to newest)
130    pub fn keys(&self) -> impl Iterator<Item = &K> {
131        self.lru.keys()
132    }
133
134    /// Iterator over values (from oldest to newest)
135    pub fn values(&self) -> impl Iterator<Item = &V> {
136        self.lru.values()
137    }
138
139    /// Mutable iterator over values (from oldest to newest)
140    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
141        self.lru.values_mut()
142    }
143
144    /// Number of entries
145    #[inline]
146    pub fn len(&self) -> usize {
147        self.lru.len()
148    }
149
150    /// Check if empty
151    #[inline]
152    pub fn is_empty(&self) -> bool {
153        self.lru.is_empty()
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal `Trackable` payload used throughout the tests.
    #[derive(Debug, Clone, PartialEq)]
    struct FlowData {
        timestamp: u64,
        bytes: u64,
        packets: u32,
    }

    impl Trackable for FlowData {
        type Timestamp = u64;

        fn timestamp(&self) -> u64 {
            self.timestamp
        }

        fn set_timestamp(&mut self, ts: u64) {
            self.timestamp = ts;
        }
    }

    /// Shorthand constructor for a `FlowData`.
    fn flow(timestamp: u64, bytes: u64, packets: u32) -> FlowData {
        FlowData {
            timestamp,
            bytes,
            packets,
        }
    }

    /// Builds a tracker holding keys `1..=count`, inserted in ascending order.
    ///
    /// Key `n` maps to `timestamp = 100 * n`, `bytes = 1000 * n`,
    /// `packets = 10 * n`, i.e. the fixture every test below relies on.
    fn populated(count: u8) -> Tracker<i32, FlowData> {
        let mut tracker = Tracker::with_capacity(10);
        for n in 1..=count {
            tracker.get_or_insert_with(&i32::from(n), || {
                flow(100 * u64::from(n), 1000 * u64::from(n), 10 * u32::from(n))
            });
        }
        tracker
    }

    #[test]
    fn test_basic_insertion() {
        let tracker = populated(3);

        assert_eq!(tracker.len(), 3);
        assert!(!tracker.is_empty());

        // Verify we can access the data
        let flow1 = tracker.get(&1).unwrap();
        assert_eq!(flow1.timestamp, 100);
        assert_eq!(flow1.bytes, 1000);
        assert_eq!(flow1.packets, 10);
    }

    #[test]
    fn test_lru_behavior() {
        let mut tracker = populated(3);

        // Check initial order: front -> back (oldest -> newest)
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 2, 3]);

        // Access key 1 - moves it to back, closure is not used
        tracker.get_or_insert_with(&1, || flow(400, 1000, 10));

        // 1 moved to back as most recent
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![2, 3, 1]);

        // Timestamp unchanged since the entry already existed
        assert_eq!(tracker.get_timestamp(&1), Some(100));

        // Verify front is oldest (2) and back is newest (1)
        assert_eq!(tracker.front().map(|(k, _)| *k), Some(2));
        assert_eq!(tracker.back().map(|(k, _)| *k), Some(1));
    }

    #[test]
    fn test_iteration_order() {
        let tracker = populated(3);

        // Iterator goes from front (oldest) to back (newest)
        let keys: Vec<_> = tracker.iter().map(|(k, _)| *k).collect();
        assert_eq!(keys, vec![1, 2, 3]);

        // Verify timestamps are in order
        let timestamps: Vec<_> = tracker.iter().map(|(_, v)| v.timestamp).collect();
        assert_eq!(timestamps, vec![100, 200, 300]);
    }

    #[test]
    fn test_get_or_insert_with() {
        let mut tracker = Tracker::<i32, FlowData>::with_capacity(10);

        // Insert with closure
        tracker.get_or_insert_with(&1, || flow(100, 1000, 10));

        assert_eq!(tracker.len(), 1);
        assert_eq!(tracker.get(&1).unwrap().bytes, 1000);

        // Access existing entry (closure should not be called)
        tracker.get_or_insert_with(&1, || flow(200, 9999, 99));

        // Value should not change and no second entry appears
        assert_eq!(tracker.len(), 1);
        assert_eq!(tracker.get(&1), Some(&flow(100, 1000, 10)));
    }

    #[test]
    fn test_empty_tracker() {
        let tracker = Tracker::<i32, FlowData>::new();

        assert!(tracker.is_empty());
        assert_eq!(tracker.len(), 0);
        assert!(tracker.get(&1).is_none());
        assert!(tracker.get_timestamp(&1).is_none());
    }

    #[test]
    fn test_process_and_evict_basic() {
        let mut tracker = populated(4);

        // Evict entries with timestamp < 250
        let threshold = 250;
        let evicted = tracker.process_and_evict(|_key, value| {
            value.timestamp < threshold // true = evict, false = keep + stop
        });

        assert_eq!(evicted, 2); // Evicted entries 1 and 2
        assert_eq!(tracker.len(), 2); // Entries 3 and 4 remain

        assert!(tracker.get(&1).is_none());
        assert!(tracker.get(&2).is_none());
        assert!(tracker.get(&3).is_some());
        assert!(tracker.get(&4).is_some());
    }

    #[test]
    fn test_process_and_evict_with_keep() {
        let mut tracker = populated(4);

        // Returning false for the front entry must stop immediately, even
        // though every entry behind it would have been approved for eviction.
        let mut inspected = Vec::new();
        let evicted = tracker.process_and_evict(|key, _value| {
            inspected.push(*key);
            *key != 1 // keep key 1 (the front), evict anything else
        });

        assert_eq!(evicted, 0);
        assert_eq!(inspected, vec![1]); // only the front entry was looked at
        assert_eq!(tracker.len(), 4);

        // Order is untouched as well
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_process_and_evict_stop_early() {
        let mut tracker = populated(4);

        // Process and evict, stop at first entry >= threshold
        let mut inspected = Vec::new();
        let threshold = 250;
        let evicted = tracker.process_and_evict(|key, value| {
            inspected.push(*key);
            value.timestamp < threshold // true = evict, false = keep + stop
        });

        assert_eq!(evicted, 2); // Evicted entries 1 and 2
        assert_eq!(inspected, vec![1, 2, 3]); // Inspected 1, 2, and 3 (stopped at 3)
        assert_eq!(tracker.len(), 2); // Entries 3 and 4 remain (3 was not consumed)

        assert!(tracker.get(&1).is_none());
        assert!(tracker.get(&2).is_none());
        assert!(tracker.get(&3).is_some()); // Entry 3 remains (not consumed)
        assert!(tracker.get(&4).is_some());
    }

    #[test]
    fn test_process_and_evict_with_accumulation() {
        let mut tracker = populated(4);

        // Accumulate stats from entries before eviction
        let mut total_bytes_evicted = 0u64;
        let threshold = 250;

        let evicted = tracker.process_and_evict(|_key, value| {
            if value.timestamp < threshold {
                total_bytes_evicted += value.bytes;
                true // evict
            } else {
                false // keep + stop
            }
        });

        assert_eq!(evicted, 2);
        assert_eq!(total_bytes_evicted, 3000); // 1000 + 2000
        assert_eq!(tracker.len(), 2);
    }

    #[test]
    fn test_front_back_access() {
        // Empty tracker
        let empty = Tracker::<i32, FlowData>::with_capacity(10);
        assert!(empty.front().is_none());
        assert!(empty.back().is_none());

        let tracker = populated(3);

        // Front is oldest
        assert_eq!(tracker.front().map(|(k, _)| *k), Some(1));
        assert_eq!(tracker.front().map(|(_, v)| v.timestamp), Some(100));

        // Back is newest
        assert_eq!(tracker.back().map(|(k, _)| *k), Some(3));
        assert_eq!(tracker.back().map(|(_, v)| v.timestamp), Some(300));
    }

    #[test]
    fn test_pop_operations() {
        let mut tracker = populated(3);

        // Pop front (oldest)
        let (k, v) = tracker.pop_front().unwrap();
        assert_eq!(k, 1);
        assert_eq!(v.timestamp, 100);
        assert_eq!(tracker.len(), 2);

        // Pop back (newest)
        let (k, v) = tracker.pop_back().unwrap();
        assert_eq!(k, 3);
        assert_eq!(v.timestamp, 300);
        assert_eq!(tracker.len(), 1);

        // Only item 2 remains
        assert_eq!(tracker.front().map(|(k, _)| *k), Some(2));
    }

    #[test]
    fn test_iterator_methods() {
        let mut tracker = populated(3);

        // Keys iterator
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 2, 3]);

        // Values iterator
        let bytes: Vec<_> = tracker.values().map(|v| v.bytes).collect();
        assert_eq!(bytes, vec![1000, 2000, 3000]);

        // Mutable values iterator
        for value in tracker.values_mut() {
            value.bytes *= 2;
        }
        let bytes: Vec<_> = tracker.values().map(|v| v.bytes).collect();
        assert_eq!(bytes, vec![2000, 4000, 6000]);
    }

    #[test]
    fn test_remove() {
        let mut tracker = populated(3);

        // Remove middle entry
        let removed = tracker.remove(&2).unwrap();
        assert_eq!(removed.bytes, 2000);
        assert_eq!(tracker.len(), 2);

        // Verify order preserved
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 3]);

        // Remove non-existent
        assert!(tracker.remove(&99).is_none());
    }

    #[test]
    fn test_manual_eviction_pattern() {
        let mut tracker = populated(4);

        // Manual eviction: peek at the front and pop while it is too old
        let threshold = 250u64;
        let mut evicted = Vec::new();

        while let Some((_, v)) = tracker.front() {
            if v.timestamp < threshold {
                let (k, v) = tracker.pop_front().unwrap();
                evicted.push((k, v));
            } else {
                break;
            }
        }

        assert_eq!(evicted.len(), 2);
        assert_eq!(evicted[0].0, 1);
        assert_eq!(evicted[1].0, 2);
        assert_eq!(tracker.len(), 2);
    }
}
663}