Skip to main content

packet_strata/
tracker.rs

1use ahash::RandomState;
2use hashlink::LinkedHashMap;
3
4pub mod direction;
5pub mod flow;
6pub mod process;
7pub mod tuple;
8pub mod vni;
9
/// A value that carries its own timestamp.
///
/// Implementors expose the timestamp stored inside the value and allow it to
/// be overwritten in place.
pub trait Trackable {
    /// Type used to represent a timestamp; it must be cloneable and support
    /// (partial) ordering comparisons.
    type Timestamp: Clone + PartialOrd;

    /// Returns the timestamp currently associated with this value.
    fn timestamp(&self) -> Self::Timestamp;

    /// Replaces the timestamp associated with this value with `ts`.
    fn set_timestamp(&mut self, ts: Self::Timestamp);
}
16
17/// Flow tracker for types that have an intrinsic timestamp (via Trackable trait)
18pub struct Tracker<K, V: Trackable> {
19    lru: LinkedHashMap<K, V, RandomState>,
20}
21
22impl<K, V> Tracker<K, V>
23where
24    K: Eq + std::hash::Hash + Clone,
25    V: Trackable,
26{
27    #[inline]
28    pub fn new() -> Self {
29        Tracker {
30            lru: LinkedHashMap::with_hasher(RandomState::new()),
31        }
32    }
33
34    #[inline]
35    pub fn with_capacity(capacity: usize) -> Self {
36        Tracker {
37            lru: LinkedHashMap::with_capacity_and_hasher(capacity, RandomState::new()),
38        }
39    }
40
41    #[inline]
42    pub fn get_or_insert_with<F>(&mut self, key: &K, create: F) -> &mut V
43    where
44        F: FnOnce() -> V,
45    {
46        // Workaround for Rust borrow checker limitation.
47        // Using a raw pointer avoids the double-lookup and avoids cloning the key on hit.
48        // This is safe because the mutable borrow is strictly disjoint across branches.
49        let lru_ptr = &mut self.lru as *mut LinkedHashMap<K, V, RandomState>;
50        if let Some(v) = unsafe { &mut *lru_ptr }.to_back(key) {
51            return v;
52        }
53
54        self.lru.entry(key.clone()).or_insert_with(create)
55    }
56
57    /// Evict entries based on a predicate function, in lru order
58    pub fn process_and_evict<F>(&mut self, mut process: F) -> usize
59    where
60        F: FnMut(&K, &V) -> bool, // true: evict, false: keep and interrupt
61    {
62        let mut evicted = 0;
63        loop {
64            match self.lru.front() {
65                Some((k, v)) if process(k, v) => {
66                    self.lru.pop_front(); // evict and continue
67                    evicted += 1;
68                }
69                _ => break,
70            }
71        }
72        evicted
73    }
74
75    /// Get the front (oldest) entry without removing it
76    #[inline]
77    pub fn front(&self) -> Option<(&K, &V)> {
78        self.lru.front()
79    }
80
81    /// Get the back (newest) entry without removing it
82    #[inline]
83    pub fn back(&self) -> Option<(&K, &V)> {
84        self.lru.back()
85    }
86
87    /// Remove and return the front (oldest) entry
88    #[inline]
89    pub fn pop_front(&mut self) -> Option<(K, V)> {
90        self.lru.pop_front()
91    }
92
93    /// Remove and return the back (newest) entry
94    #[inline]
95    pub fn pop_back(&mut self) -> Option<(K, V)> {
96        self.lru.pop_back()
97    }
98
99    /// Remove a specific entry by key
100    #[inline]
101    pub fn remove(&mut self, key: &K) -> Option<V> {
102        self.lru.remove(key)
103    }
104
105    /// Get the timestamp of an entry
106    #[inline]
107    pub fn get_timestamp(&self, key: &K) -> Option<V::Timestamp> {
108        self.lru.get(key).map(|v| v.timestamp())
109    }
110
111    /// Get a reference to the value
112    #[inline]
113    pub fn get(&self, key: &K) -> Option<&V> {
114        self.lru.get(key)
115    }
116
117    /// Get a mutable reference to the value
118    #[inline]
119    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
120        self.lru.get_mut(key)
121    }
122
123    /// Iterator over entries (from oldest to newest)
124    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
125        self.lru.iter()
126    }
127
128    /// Mutable iterator over entries (from oldest to newest)
129    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
130        self.lru.iter_mut()
131    }
132
133    /// Iterator over keys (from oldest to newest)
134    pub fn keys(&self) -> impl Iterator<Item = &K> {
135        self.lru.keys()
136    }
137
138    /// Iterator over values (from oldest to newest)
139    pub fn values(&self) -> impl Iterator<Item = &V> {
140        self.lru.values()
141    }
142
143    /// Mutable iterator over values (from oldest to newest)
144    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
145        self.lru.values_mut()
146    }
147
148    /// Number of entries
149    #[inline]
150    pub fn len(&self) -> usize {
151        self.lru.len()
152    }
153
154    /// Check if empty
155    #[inline]
156    pub fn is_empty(&self) -> bool {
157        self.lru.is_empty()
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal flow record used as the tracked value throughout these tests.
    #[derive(Debug, Clone, PartialEq)]
    struct FlowData {
        timestamp: u64,
        bytes: u64,
        packets: u32,
    }

    impl Trackable for FlowData {
        type Timestamp = u64;

        fn timestamp(&self) -> u64 {
            self.timestamp
        }

        fn set_timestamp(&mut self, ts: u64) {
            self.timestamp = ts;
        }
    }

    /// Shorthand constructor for a `FlowData`.
    fn flow(timestamp: u64, bytes: u64, packets: u32) -> FlowData {
        FlowData {
            timestamp,
            bytes,
            packets,
        }
    }

    /// Builds a tracker holding keys `1..=count`, inserted in ascending
    /// order, where key `k` maps to `flow(100 * k, 1000 * k, 10 * k)`.
    fn filled(count: u32) -> Tracker<i32, FlowData> {
        let mut tracker = Tracker::with_capacity(10);
        for (key, seq) in (1_i32..).zip(1..=count) {
            let scale = u64::from(seq);
            tracker.get_or_insert_with(&key, || flow(100 * scale, 1000 * scale, 10 * seq));
        }
        tracker
    }

    /// Keys below this timestamp are considered expired in eviction tests.
    const THRESHOLD: u64 = 250;

    #[test]
    fn test_basic_insertion() {
        let tracker = filled(3);

        assert_eq!(tracker.len(), 3);

        // Verify we can access the data
        let flow1 = tracker.get(&1).unwrap();
        assert_eq!(flow1.timestamp, 100);
        assert_eq!(flow1.bytes, 1000);
        assert_eq!(flow1.packets, 10);
    }

    #[test]
    fn test_lru_behavior() {
        let mut tracker = filled(3);

        // Initial order: front -> back (oldest -> newest)
        let order: Vec<_> = tracker.lru.keys().copied().collect();
        assert_eq!(order, vec![1, 2, 3]);

        // Accessing key 1 moves it to the back
        tracker.get_or_insert_with(&1, || flow(400, 1000, 10));

        let order: Vec<_> = tracker.lru.keys().copied().collect();
        assert_eq!(order, vec![2, 3, 1]); // 1 moved to back as most recent

        // The entry already existed, so its timestamp is unchanged
        assert_eq!(tracker.get_timestamp(&1), Some(100));

        // Front is oldest (2) and back is newest (1)
        assert_eq!(tracker.lru.front().map(|(k, _)| *k), Some(2));
        assert_eq!(tracker.lru.back().map(|(k, _)| *k), Some(1));
    }

    #[test]
    fn test_iteration_order() {
        let tracker = filled(3);

        // Iterator goes from front (oldest) to back (newest)
        let keys: Vec<_> = tracker.iter().map(|(k, _)| *k).collect();
        assert_eq!(keys, vec![1, 2, 3]);

        // Timestamps come out in the same order
        let timestamps: Vec<_> = tracker.iter().map(|(_, v)| v.timestamp).collect();
        assert_eq!(timestamps, vec![100, 200, 300]);
    }

    #[test]
    fn test_get_or_insert_with() {
        let mut tracker = filled(1);

        assert_eq!(tracker.len(), 1);
        assert_eq!(tracker.get(&1).unwrap().bytes, 1000);

        // Access existing entry (closure should not be called)
        let mut closure_ran = false;
        tracker.get_or_insert_with(&1, || {
            closure_ran = true;
            flow(200, 9999, 99)
        });

        // Value must not change and no second entry is created
        assert!(!closure_ran);
        assert_eq!(tracker.len(), 1);
        assert_eq!(tracker.get(&1), Some(&flow(100, 1000, 10)));
    }

    #[test]
    fn test_empty_tracker() {
        let tracker = Tracker::<i32, FlowData>::new();

        assert!(tracker.is_empty());
        assert_eq!(tracker.len(), 0);
        assert!(tracker.get(&1).is_none());
        assert!(tracker.get_timestamp(&1).is_none());
    }

    #[test]
    fn test_process_and_evict_basic() {
        let mut tracker = filled(4);

        // Evict entries with timestamp < THRESHOLD
        let evicted = tracker.process_and_evict(|_key, value| {
            value.timestamp < THRESHOLD // true = evict, false = keep + stop
        });

        assert_eq!(evicted, 2); // Evicted entries 1 and 2
        assert_eq!(tracker.len(), 2); // Entries 3 and 4 remain

        assert!(tracker.get(&1).is_none());
        assert!(tracker.get(&2).is_none());
        assert!(tracker.get(&3).is_some());
        assert!(tracker.get(&4).is_some());
    }

    #[test]
    fn test_process_and_evict_with_keep() {
        let mut tracker = filled(4);

        // Returning false for the very first (oldest) entry keeps it and
        // stops the walk immediately: nothing is evicted and no later entry
        // is even inspected.
        let mut inspected = Vec::new();
        let evicted = tracker.process_and_evict(|key, _value| {
            inspected.push(*key);
            false // keep + stop
        });

        assert_eq!(evicted, 0);
        assert_eq!(inspected, vec![1]);
        assert_eq!(tracker.len(), 4);

        // Order is untouched
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_process_and_evict_stop_early() {
        let mut tracker = filled(4);

        // Process and evict, stop at first entry >= THRESHOLD
        let mut inspected = Vec::new();
        let evicted = tracker.process_and_evict(|key, value| {
            inspected.push(*key);
            value.timestamp < THRESHOLD // true = evict, false = keep + stop
        });

        assert_eq!(evicted, 2); // Evicted entries 1 and 2
        assert_eq!(inspected, vec![1, 2, 3]); // Inspected 1, 2, and 3 (stopped at 3)
        assert_eq!(tracker.len(), 2); // Entries 3 and 4 remain (3 was not consumed)

        assert!(tracker.get(&1).is_none());
        assert!(tracker.get(&2).is_none());
        assert!(tracker.get(&3).is_some()); // Entry 3 remains (not consumed)
        assert!(tracker.get(&4).is_some());
    }

    #[test]
    fn test_process_and_evict_with_accumulation() {
        let mut tracker = filled(4);

        // Accumulate stats from entries before eviction
        let mut total_bytes_evicted = 0u64;

        let evicted = tracker.process_and_evict(|_key, value| {
            let expired = value.timestamp < THRESHOLD;
            if expired {
                total_bytes_evicted += value.bytes;
            }
            expired // true = evict, false = keep + stop
        });

        assert_eq!(evicted, 2);
        assert_eq!(total_bytes_evicted, 3000); // 1000 + 2000
        assert_eq!(tracker.len(), 2);
    }

    #[test]
    fn test_front_back_access() {
        // Empty tracker
        let empty = Tracker::<i32, FlowData>::with_capacity(10);
        assert!(empty.front().is_none());
        assert!(empty.back().is_none());

        let tracker = filled(3);

        // Front is oldest
        assert_eq!(tracker.front().map(|(k, _)| *k), Some(1));
        assert_eq!(tracker.front().map(|(_, v)| v.timestamp), Some(100));

        // Back is newest
        assert_eq!(tracker.back().map(|(k, _)| *k), Some(3));
        assert_eq!(tracker.back().map(|(_, v)| v.timestamp), Some(300));
    }

    #[test]
    fn test_pop_operations() {
        let mut tracker = filled(3);

        // Pop front (oldest)
        let (k, v) = tracker.pop_front().unwrap();
        assert_eq!(k, 1);
        assert_eq!(v.timestamp, 100);
        assert_eq!(tracker.len(), 2);

        // Pop back (newest)
        let (k, v) = tracker.pop_back().unwrap();
        assert_eq!(k, 3);
        assert_eq!(v.timestamp, 300);
        assert_eq!(tracker.len(), 1);

        // Only item 2 remains
        assert_eq!(tracker.front().map(|(k, _)| *k), Some(2));
    }

    #[test]
    fn test_iterator_methods() {
        let mut tracker = filled(3);

        // Keys iterator
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 2, 3]);

        // Values iterator
        let bytes: Vec<_> = tracker.values().map(|v| v.bytes).collect();
        assert_eq!(bytes, vec![1000, 2000, 3000]);

        // Mutable values iterator
        tracker.values_mut().for_each(|value| value.bytes *= 2);
        let bytes: Vec<_> = tracker.values().map(|v| v.bytes).collect();
        assert_eq!(bytes, vec![2000, 4000, 6000]);
    }

    #[test]
    fn test_remove() {
        let mut tracker = filled(3);

        // Remove middle entry
        let removed = tracker.remove(&2).unwrap();
        assert_eq!(removed.bytes, 2000);
        assert_eq!(tracker.len(), 2);

        // Order of the remaining entries is preserved
        let keys: Vec<_> = tracker.keys().copied().collect();
        assert_eq!(keys, vec![1, 3]);

        // Remove non-existent
        assert!(tracker.remove(&99).is_none());
    }

    #[test]
    fn test_manual_eviction_pattern() {
        let mut tracker = filled(4);

        // Manual eviction: peek at the front and pop while it is expired
        let mut evicted = Vec::new();
        while matches!(tracker.front(), Some((_, v)) if v.timestamp < THRESHOLD) {
            evicted.extend(tracker.pop_front());
        }

        assert_eq!(evicted.len(), 2);
        assert_eq!(evicted[0].0, 1);
        assert_eq!(evicted[1].0, 2);
        assert_eq!(tracker.len(), 2);
    }
}