1use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, RwLock};
11
12use super::{CacheStats, CachedParse, ParseCache};
13
14const ESTIMATED_ENTRY_SIZE: usize = 1024; pub struct LruParseCache {
22 max_entries: usize,
24
25 entries: RwLock<HashMap<u64, (Arc<CachedParse>, u64)>>,
27
28 access_counter: AtomicU64,
30
31 readers: RwLock<HashMap<usize, u64>>,
34
35 next_reader_id: AtomicUsize,
37
38 hits: AtomicU64,
40 misses: AtomicU64,
41
42 evictions_lru: AtomicU64,
44 evictions_reader: AtomicU64,
46 peak_entries: AtomicUsize,
48
49 reader_eviction_enabled: bool,
51}
52
53impl LruParseCache {
54 pub fn new(max_entries: usize) -> Self {
59 Self::with_options(max_entries, true)
60 }
61
62 pub fn with_options(max_entries: usize, reader_eviction_enabled: bool) -> Self {
70 Self {
71 max_entries,
72 entries: RwLock::new(HashMap::with_capacity(max_entries.min(10000))),
73 access_counter: AtomicU64::new(0),
74 readers: RwLock::new(HashMap::new()),
75 next_reader_id: AtomicUsize::new(0),
76 hits: AtomicU64::new(0),
77 misses: AtomicU64::new(0),
78 evictions_lru: AtomicU64::new(0),
79 evictions_reader: AtomicU64::new(0),
80 peak_entries: AtomicUsize::new(0),
81 reader_eviction_enabled,
82 }
83 }
84
85 fn evict_passed_entries(&self, entries: &mut HashMap<u64, (Arc<CachedParse>, u64)>) {
87 if !self.reader_eviction_enabled {
89 return;
90 }
91
92 let readers = self.readers.read().unwrap();
93 if readers.is_empty() {
94 return;
95 }
96
97 let min_passed = readers.values().min().copied().unwrap_or(0);
99
100 let before_count = entries.len();
101
102 entries.retain(|&frame_number, _| frame_number >= min_passed);
104
105 let evicted = before_count - entries.len();
106 if evicted > 0 {
107 self.evictions_reader
108 .fetch_add(evicted as u64, Ordering::Relaxed);
109 }
110 }
111
112 fn evict_lru(&self, entries: &mut HashMap<u64, (Arc<CachedParse>, u64)>, target_size: usize) {
114 if entries.len() <= target_size {
115 return;
116 }
117
118 let to_remove = entries.len() - target_size;
119
120 let mut access_orders: Vec<_> = entries
122 .iter()
123 .map(|(&frame, &(_, order))| (frame, order))
124 .collect();
125 access_orders.sort_by_key(|&(_, order)| order);
126
127 for (frame, _) in access_orders.into_iter().take(to_remove) {
129 entries.remove(&frame);
130 }
131
132 self.evictions_lru
133 .fetch_add(to_remove as u64, Ordering::Relaxed);
134 }
135
136 fn update_peak(&self, current: usize) {
138 let mut peak = self.peak_entries.load(Ordering::Relaxed);
139 while current > peak {
140 match self.peak_entries.compare_exchange_weak(
141 peak,
142 current,
143 Ordering::Relaxed,
144 Ordering::Relaxed,
145 ) {
146 Ok(_) => break,
147 Err(actual) => peak = actual,
148 }
149 }
150 }
151
152 pub fn get_stats(&self) -> CacheStats {
154 let entries = self.entries.read().unwrap();
155 let readers = self.readers.read().unwrap();
156
157 CacheStats {
158 hits: self.hits.load(Ordering::Relaxed),
159 misses: self.misses.load(Ordering::Relaxed),
160 entries: entries.len(),
161 max_entries: self.max_entries,
162 evictions_lru: self.evictions_lru.load(Ordering::Relaxed),
163 evictions_reader: self.evictions_reader.load(Ordering::Relaxed),
164 peak_entries: self.peak_entries.load(Ordering::Relaxed),
165 active_readers: readers.len(),
166 memory_bytes_estimate: entries.len() * ESTIMATED_ENTRY_SIZE,
167 }
168 }
169
170 pub fn reset_stats(&self) {
172 self.hits.store(0, Ordering::Relaxed);
173 self.misses.store(0, Ordering::Relaxed);
174 self.evictions_lru.store(0, Ordering::Relaxed);
175 self.evictions_reader.store(0, Ordering::Relaxed);
176 }
178
179 pub fn clear(&self) {
181 let mut entries = self.entries.write().unwrap();
182 entries.clear();
183 }
184}
185
186impl ParseCache for LruParseCache {
187 fn get(&self, frame_number: u64) -> Option<Arc<CachedParse>> {
188 let mut entries = self.entries.write().unwrap();
189
190 if let Some((cached, access_order)) = entries.get_mut(&frame_number) {
191 *access_order = self.access_counter.fetch_add(1, Ordering::Relaxed);
193 self.hits.fetch_add(1, Ordering::Relaxed);
194 Some(cached.clone())
195 } else {
196 self.misses.fetch_add(1, Ordering::Relaxed);
197 None
198 }
199 }
200
201 fn put(&self, frame_number: u64, parsed: Arc<CachedParse>) {
202 let mut entries = self.entries.write().unwrap();
203
204 if entries.contains_key(&frame_number) {
206 return;
207 }
208
209 if entries.len() >= self.max_entries {
211 self.evict_passed_entries(&mut entries);
212
213 if entries.len() >= self.max_entries {
214 let target = (self.max_entries as f64 * 0.9) as usize;
216 self.evict_lru(&mut entries, target);
217 }
218 }
219
220 let access_order = self.access_counter.fetch_add(1, Ordering::Relaxed);
221 entries.insert(frame_number, (parsed, access_order));
222
223 self.update_peak(entries.len());
225 }
226
227 fn reset_stats(&self) {
228 LruParseCache::reset_stats(self);
229 }
230
231 fn reader_passed(&self, reader_id: usize, frame_number: u64) {
232 let mut readers = self.readers.write().unwrap();
233 if let Some(pos) = readers.get_mut(&reader_id) {
234 *pos = frame_number;
235 }
236 }
237
238 fn register_reader(&self) -> usize {
239 let id = self.next_reader_id.fetch_add(1, Ordering::Relaxed);
240 let mut readers = self.readers.write().unwrap();
241 readers.insert(id, 0);
242 id
243 }
244
245 fn unregister_reader(&self, reader_id: usize) {
246 let mut readers = self.readers.write().unwrap();
247 readers.remove(&reader_id);
248 }
249
250 fn stats(&self) -> Option<CacheStats> {
251 Some(self.get_stats())
252 }
253
254 fn get_or_insert_with(
255 &self,
256 frame_number: u64,
257 f: Box<dyn FnOnce() -> Arc<CachedParse> + '_>,
258 ) -> (Arc<CachedParse>, bool) {
259 {
261 let entries = self.entries.read().unwrap();
262 if let Some((cached, _)) = entries.get(&frame_number) {
263 self.hits.fetch_add(1, Ordering::Relaxed);
264 return (cached.clone(), true);
265 }
266 }
267
268 self.misses.fetch_add(1, Ordering::Relaxed);
270 let result = f();
271
272 {
274 let mut entries = self.entries.write().unwrap();
275
276 if entries.len() >= self.max_entries {
278 self.evict_passed_entries(&mut entries);
279 if entries.len() >= self.max_entries {
280 let target = (self.max_entries as f64 * 0.9) as usize;
281 self.evict_lru(&mut entries, target);
282 }
283 }
284
285 let access_order = self.access_counter.fetch_add(1, Ordering::Relaxed);
286 entries.insert(frame_number, (result.clone(), access_order));
287 self.update_peak(entries.len());
288 }
289
290 (result, false)
291 }
292}
293
294impl std::fmt::Debug for LruParseCache {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 let stats = self.get_stats();
297 f.debug_struct("LruParseCache")
298 .field("max_entries", &self.max_entries)
299 .field("entries", &stats.entries)
300 .field("hits", &stats.hits)
301 .field("misses", &stats.misses)
302 .field("hit_ratio", &format!("{:.2}%", stats.hit_ratio() * 100.0))
303 .finish()
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310
311 #[test]
312 fn test_cache_hit_miss() {
313 let cache = LruParseCache::new(100);
314
315 assert!(cache.get(1).is_none());
317 assert_eq!(cache.get_stats().misses, 1);
318
319 let parsed = Arc::new(CachedParse {
321 frame_number: 1,
322 protocols: vec![],
323 });
324 cache.put(1, parsed.clone());
325
326 assert!(cache.get(1).is_some());
327 assert_eq!(cache.get_stats().hits, 1);
328 }
329
330 #[test]
331 fn test_lru_eviction() {
332 let cache = LruParseCache::new(3);
333
334 for i in 1..=3 {
336 cache.put(
337 i,
338 Arc::new(CachedParse {
339 frame_number: i,
340 protocols: vec![],
341 }),
342 );
343 }
344
345 let _ = cache.get(1);
347
348 cache.put(
350 4,
351 Arc::new(CachedParse {
352 frame_number: 4,
353 protocols: vec![],
354 }),
355 );
356
357 assert!(cache.get(1).is_some());
359 assert!(cache.get(4).is_some());
361 }
363
364 #[test]
365 fn test_reader_tracking() {
366 let cache = LruParseCache::new(100);
367
368 let r1 = cache.register_reader();
369 let r2 = cache.register_reader();
370
371 assert_ne!(r1, r2);
372
373 for i in 1..=10 {
375 cache.put(
376 i,
377 Arc::new(CachedParse {
378 frame_number: i,
379 protocols: vec![],
380 }),
381 );
382 }
383
384 assert_eq!(cache.get_stats().entries, 10);
385
386 cache.reader_passed(r1, 5);
388
389 cache.reader_passed(r2, 5);
391
392 cache.unregister_reader(r1);
394 cache.unregister_reader(r2);
395 }
396
397 #[test]
398 fn test_duplicate_put_ignored() {
399 let cache = LruParseCache::new(100);
400
401 let parsed1 = Arc::new(CachedParse {
402 frame_number: 1,
403 protocols: vec![],
404 });
405 let parsed2 = Arc::new(CachedParse {
406 frame_number: 1,
407 protocols: vec![(
408 "test",
409 super::super::OwnedParseResult {
410 fields: std::collections::HashMap::new(),
411 error: None,
412 encap_depth: 0,
413 tunnel_type: crate::protocol::TunnelType::None,
414 tunnel_id: None,
415 },
416 )],
417 });
418
419 cache.put(1, parsed1);
420 cache.put(1, parsed2);
421
422 let cached = cache.get(1).unwrap();
424 assert!(cached.protocols.is_empty());
425 }
426
427 #[test]
428 fn test_clear() {
429 let cache = LruParseCache::new(100);
430
431 for i in 1..=10 {
432 cache.put(
433 i,
434 Arc::new(CachedParse {
435 frame_number: i,
436 protocols: vec![],
437 }),
438 );
439 }
440
441 assert_eq!(cache.get_stats().entries, 10);
442
443 cache.clear();
444
445 assert_eq!(cache.get_stats().entries, 0);
446 }
447
448 #[test]
449 fn test_evict_passed_entries() {
450 let cache = LruParseCache::new(100);
451
452 let r1 = cache.register_reader();
454 let r2 = cache.register_reader();
455
456 for i in 1..=10 {
458 cache.put(
459 i,
460 Arc::new(CachedParse {
461 frame_number: i,
462 protocols: vec![],
463 }),
464 );
465 }
466
467 cache.reader_passed(r1, 5);
469
470 assert!(cache.get(3).is_some());
472
473 cache.reader_passed(r2, 7);
475
476 }
479
480 #[test]
481 fn test_debug_format() {
482 let cache = LruParseCache::new(100);
483 cache.put(
484 1,
485 Arc::new(CachedParse {
486 frame_number: 1,
487 protocols: vec![],
488 }),
489 );
490
491 let debug_str = format!("{:?}", cache);
492 assert!(debug_str.contains("LruParseCache"));
493 assert!(debug_str.contains("max_entries"));
494 }
495
496 #[test]
497 fn test_concurrent_access() {
498 use std::thread;
499
500 let cache = Arc::new(LruParseCache::new(1000));
501 let mut handles = vec![];
502
503 for t in 0..4 {
505 let cache_clone = cache.clone();
506 let handle = thread::spawn(move || {
507 for i in 0..100 {
508 let frame = (t * 100 + i) as u64;
509 cache_clone.put(
510 frame,
511 Arc::new(CachedParse {
512 frame_number: frame,
513 protocols: vec![],
514 }),
515 );
516 let _ = cache_clone.get(frame);
518 }
519 });
520 handles.push(handle);
521 }
522
523 for handle in handles {
525 handle.join().unwrap();
526 }
527
528 let stats = cache.get_stats();
530 assert!(stats.entries > 0);
531 assert!(stats.hits > 0);
532 }
533
534 #[test]
535 fn test_heavy_eviction() {
536 let cache = LruParseCache::new(10);
537
538 for i in 1..=100 {
540 cache.put(
541 i,
542 Arc::new(CachedParse {
543 frame_number: i,
544 protocols: vec![],
545 }),
546 );
547 }
548
549 let stats = cache.get_stats();
551 assert!(stats.entries <= 10);
552
553 let mut found_recent = false;
556 for i in 90..=100 {
557 if cache.get(i).is_some() {
558 found_recent = true;
559 break;
560 }
561 }
562 assert!(found_recent, "At least one recent entry should be in cache");
563 }
564
565 #[test]
566 fn test_stats_accuracy() {
567 let cache = LruParseCache::new(100);
568
569 let stats = cache.get_stats();
571 assert_eq!(stats.hits, 0);
572 assert_eq!(stats.misses, 0);
573 assert_eq!(stats.entries, 0);
574 assert_eq!(stats.max_entries, 100);
575
576 cache.get(1);
578 cache.get(2);
579 cache.get(3);
580
581 cache.put(
583 1,
584 Arc::new(CachedParse {
585 frame_number: 1,
586 protocols: vec![],
587 }),
588 );
589 cache.put(
590 2,
591 Arc::new(CachedParse {
592 frame_number: 2,
593 protocols: vec![],
594 }),
595 );
596
597 cache.get(1);
599 cache.get(2);
600
601 let stats = cache.get_stats();
602 assert_eq!(stats.misses, 3);
603 assert_eq!(stats.hits, 2);
604 assert_eq!(stats.entries, 2);
605 assert!((stats.hit_ratio() - 0.4).abs() < 0.01); }
607
608 #[test]
609 fn test_reader_eviction_boundary() {
610 let cache = LruParseCache::new(20);
611
612 let r1 = cache.register_reader();
614 let r2 = cache.register_reader();
615
616 for i in 1..=15 {
618 cache.put(
619 i,
620 Arc::new(CachedParse {
621 frame_number: i,
622 protocols: vec![],
623 }),
624 );
625 }
626
627 cache.reader_passed(r1, 5);
629 cache.reader_passed(r2, 10);
630
631 for i in 16..=25 {
636 cache.put(
637 i,
638 Arc::new(CachedParse {
639 frame_number: i,
640 protocols: vec![],
641 }),
642 );
643 }
644
645 cache.reader_passed(r1, 15);
647
648 cache.unregister_reader(r1);
650 cache.unregister_reader(r2);
651
652 assert!(cache.get(25).is_some());
654 }
655
656 #[test]
657 fn test_zero_size_cache() {
658 let cache = LruParseCache::new(0);
663
664 cache.put(
665 1,
666 Arc::new(CachedParse {
667 frame_number: 1,
668 protocols: vec![],
669 }),
670 );
671 cache.put(
672 2,
673 Arc::new(CachedParse {
674 frame_number: 2,
675 protocols: vec![],
676 }),
677 );
678
679 let stats = cache.get_stats();
682 assert!(stats.max_entries == 0);
684 }
685
686 #[test]
687 fn test_eviction_counters() {
688 let cache = LruParseCache::new(5);
689
690 for i in 1..=5 {
692 cache.put(
693 i,
694 Arc::new(CachedParse {
695 frame_number: i,
696 protocols: vec![],
697 }),
698 );
699 }
700
701 let stats = cache.get_stats();
702 assert_eq!(stats.evictions_lru, 0);
703 assert_eq!(stats.evictions_reader, 0);
704
705 for i in 6..=10 {
707 cache.put(
708 i,
709 Arc::new(CachedParse {
710 frame_number: i,
711 protocols: vec![],
712 }),
713 );
714 }
715
716 let stats = cache.get_stats();
717 assert!(stats.evictions_lru > 0);
719 assert_eq!(
720 stats.total_evictions(),
721 stats.evictions_lru + stats.evictions_reader
722 );
723 }
724
725 #[test]
726 fn test_reader_eviction_counters() {
727 let cache = LruParseCache::new(20);
728
729 let r1 = cache.register_reader();
731
732 for i in 1..=10 {
734 cache.put(
735 i,
736 Arc::new(CachedParse {
737 frame_number: i,
738 protocols: vec![],
739 }),
740 );
741 }
742
743 cache.reader_passed(r1, 5);
745
746 let stats_before = cache.get_stats();
747 let evictions_reader_before = stats_before.evictions_reader;
748
749 for i in 11..=25 {
751 cache.put(
752 i,
753 Arc::new(CachedParse {
754 frame_number: i,
755 protocols: vec![],
756 }),
757 );
758 }
759
760 let stats = cache.get_stats();
761 assert!(stats.evictions_reader >= evictions_reader_before);
763
764 cache.unregister_reader(r1);
765 }
766
767 #[test]
768 fn test_peak_entries_tracking() {
769 let cache = LruParseCache::new(10);
770
771 for i in 1..=10 {
773 cache.put(
774 i,
775 Arc::new(CachedParse {
776 frame_number: i,
777 protocols: vec![],
778 }),
779 );
780 }
781
782 let stats = cache.get_stats();
783 assert_eq!(stats.peak_entries, 10);
784 assert_eq!(stats.entries, 10);
785
786 cache.clear();
788
789 let stats = cache.get_stats();
790 assert_eq!(stats.entries, 0);
791 assert_eq!(stats.peak_entries, 10);
793 }
794
795 #[test]
796 fn test_active_readers_count() {
797 let cache = LruParseCache::new(100);
798
799 let stats = cache.get_stats();
800 assert_eq!(stats.active_readers, 0);
801
802 let r1 = cache.register_reader();
803 let stats = cache.get_stats();
804 assert_eq!(stats.active_readers, 1);
805
806 let r2 = cache.register_reader();
807 let stats = cache.get_stats();
808 assert_eq!(stats.active_readers, 2);
809
810 cache.unregister_reader(r1);
811 let stats = cache.get_stats();
812 assert_eq!(stats.active_readers, 1);
813
814 cache.unregister_reader(r2);
815 let stats = cache.get_stats();
816 assert_eq!(stats.active_readers, 0);
817 }
818
819 #[test]
820 fn test_memory_bytes_estimate() {
821 let cache = LruParseCache::new(100);
822
823 let stats = cache.get_stats();
824 assert_eq!(stats.memory_bytes_estimate, 0);
825
826 for i in 1..=5 {
828 cache.put(
829 i,
830 Arc::new(CachedParse {
831 frame_number: i,
832 protocols: vec![],
833 }),
834 );
835 }
836
837 let stats = cache.get_stats();
838 assert_eq!(stats.memory_bytes_estimate, 5 * ESTIMATED_ENTRY_SIZE);
840 }
841
842 #[test]
843 fn test_reset_stats() {
844 let cache = LruParseCache::new(5);
845
846 cache.get(1); cache.put(
849 1,
850 Arc::new(CachedParse {
851 frame_number: 1,
852 protocols: vec![],
853 }),
854 );
855 cache.get(1); for i in 2..=10 {
859 cache.put(
860 i,
861 Arc::new(CachedParse {
862 frame_number: i,
863 protocols: vec![],
864 }),
865 );
866 }
867
868 let stats = cache.get_stats();
869 assert!(stats.hits > 0);
870 assert!(stats.misses > 0);
871 assert!(stats.evictions_lru > 0);
872 let old_peak = stats.peak_entries;
873
874 cache.reset_stats();
876
877 let stats = cache.get_stats();
878 assert_eq!(stats.hits, 0);
879 assert_eq!(stats.misses, 0);
880 assert_eq!(stats.evictions_lru, 0);
881 assert_eq!(stats.evictions_reader, 0);
882 assert_eq!(stats.peak_entries, old_peak);
884 assert!(stats.entries > 0);
886 }
887
888 #[test]
889 fn test_utilization() {
890 let cache = LruParseCache::new(100);
891
892 let stats = cache.get_stats();
893 assert!((stats.utilization() - 0.0).abs() < 0.001);
894
895 for i in 1..=50 {
897 cache.put(
898 i,
899 Arc::new(CachedParse {
900 frame_number: i,
901 protocols: vec![],
902 }),
903 );
904 }
905
906 let stats = cache.get_stats();
907 assert!((stats.utilization() - 0.5).abs() < 0.001);
908 }
909}