1use std::fmt;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::time::{Duration, Instant};
33
34use dashmap::DashMap;
35use serde::{Deserialize, Serialize};
36
37use crate::address::GroupAddress;
38
/// A single cached group value together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Group address this entry is stored under.
    pub address: GroupAddress,
    /// Raw value bytes most recently stored for the address.
    pub value: Vec<u8>,
    /// Optional label of whoever supplied the value
    /// (presumably the sender's device address; confirm against callers).
    pub source: Option<String>,
    /// Instant of the most recent value update; expiry is measured from here.
    pub updated_at: Instant,
    /// Instant of the most recent successful read or update; the entry with
    /// the oldest value is the one chosen for eviction.
    pub accessed_at: Instant,
    /// Number of times the value has been stored, starting at 1 on creation.
    pub update_count: u64,
    /// Number of successful reads served from this entry.
    pub access_count: u64,
}
61
62impl CacheEntry {
63 fn new(address: GroupAddress, value: Vec<u8>, source: Option<String>) -> Self {
65 let now = Instant::now();
66 Self {
67 address,
68 value,
69 source,
70 updated_at: now,
71 accessed_at: now,
72 update_count: 1,
73 access_count: 0,
74 }
75 }
76
77 pub fn is_expired(&self, ttl: Duration) -> bool {
79 if ttl.is_zero() {
80 return false; }
82 self.updated_at.elapsed() > ttl
83 }
84
85 pub fn age(&self) -> Duration {
87 self.updated_at.elapsed()
88 }
89
90 pub fn idle_time(&self) -> Duration {
92 self.accessed_at.elapsed()
93 }
94}
95
/// Settings controlling a `GroupValueCache`.
///
/// Every field carries a serde default, so a configuration that omits any of
/// them still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupValueCacheConfig {
    /// Master switch: when `false`, lookups return `None` and updates are
    /// ignored. Defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Entry count at which the least recently accessed entry is evicted
    /// before a new address is inserted. Defaults to 4096.
    #[serde(default = "default_max_entries")]
    pub max_entries: usize,

    /// Entry lifetime in milliseconds, measured from the last update;
    /// `0` disables expiry. Defaults to 60 000.
    #[serde(default = "default_ttl_ms")]
    pub ttl_ms: u64,

    /// Whether updates tagged `UpdateSource::Indication` are stored.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub auto_update_on_indication: bool,

    /// Whether updates tagged `UpdateSource::Write` are stored.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub auto_update_on_write: bool,
}
125
/// Serde default for boolean options that are on unless configured otherwise.
fn default_true() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}
129
/// Serde default for the maximum number of cached entries.
fn default_max_entries() -> usize {
    const DEFAULT_MAX_ENTRIES: usize = 4096;
    DEFAULT_MAX_ENTRIES
}
133
/// Serde default for the entry time-to-live, in milliseconds (one minute).
fn default_ttl_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1_000;
    const DEFAULT_TTL_SECONDS: u64 = 60;
    DEFAULT_TTL_SECONDS * MILLIS_PER_SECOND
}
137
138impl Default for GroupValueCacheConfig {
139 fn default() -> Self {
140 Self {
141 enabled: true,
142 max_entries: default_max_entries(),
143 ttl_ms: default_ttl_ms(),
144 auto_update_on_indication: true,
145 auto_update_on_write: true,
146 }
147 }
148}
149
150impl GroupValueCacheConfig {
151 pub fn ttl(&self) -> Duration {
153 Duration::from_millis(self.ttl_ms)
154 }
155
156 pub fn validate(&self) -> Result<(), String> {
158 if self.max_entries == 0 {
159 return Err("GroupValueCache max_entries must be > 0".to_string());
160 }
161 Ok(())
162 }
163}
164
/// Cache of the most recent value stored per group address.
///
/// All operations take `&self`; entries live in a `DashMap` and the
/// statistics are atomic counters.
pub struct GroupValueCache {
    /// Behavior settings supplied at construction.
    config: GroupValueCacheConfig,
    /// Cached entries keyed by group address.
    entries: DashMap<GroupAddress, CacheEntry>,
    /// Running hit/miss/eviction/update counters.
    stats: CacheStats,
}
182
/// Live cache counters, each updated atomically.
pub struct CacheStats {
    /// Lookups that returned a cached value.
    pub hits: AtomicU64,
    /// Lookups that returned nothing.
    pub misses: AtomicU64,
    /// Entries removed to make room for a new address.
    pub evictions: AtomicU64,
    /// Entries removed because their time-to-live had elapsed.
    pub expirations: AtomicU64,
    /// Accepted updates from any source.
    pub updates: AtomicU64,
    /// Accepted updates tagged `UpdateSource::Indication`.
    pub indication_updates: AtomicU64,
    /// Accepted updates tagged `UpdateSource::Write`.
    pub write_updates: AtomicU64,
}
200
201impl CacheStats {
202 fn new() -> Self {
203 Self {
204 hits: AtomicU64::new(0),
205 misses: AtomicU64::new(0),
206 evictions: AtomicU64::new(0),
207 expirations: AtomicU64::new(0),
208 updates: AtomicU64::new(0),
209 indication_updates: AtomicU64::new(0),
210 write_updates: AtomicU64::new(0),
211 }
212 }
213
214 pub fn snapshot(&self) -> CacheStatsSnapshot {
216 CacheStatsSnapshot {
217 hits: self.hits.load(Ordering::Relaxed),
218 misses: self.misses.load(Ordering::Relaxed),
219 evictions: self.evictions.load(Ordering::Relaxed),
220 expirations: self.expirations.load(Ordering::Relaxed),
221 updates: self.updates.load(Ordering::Relaxed),
222 indication_updates: self.indication_updates.load(Ordering::Relaxed),
223 write_updates: self.write_updates.load(Ordering::Relaxed),
224 }
225 }
226}
227
/// Plain-value copy of the cache counters taken at one moment.
#[derive(Debug, Clone)]
pub struct CacheStatsSnapshot {
    /// Lookups that returned a cached value.
    pub hits: u64,
    /// Lookups that returned nothing.
    pub misses: u64,
    /// Entries removed to make room for a new address.
    pub evictions: u64,
    /// Entries removed because their time-to-live had elapsed.
    pub expirations: u64,
    /// Accepted updates from any source.
    pub updates: u64,
    /// Accepted updates that came from indications.
    pub indication_updates: u64,
    /// Accepted updates that came from writes.
    pub write_updates: u64,
}

impl CacheStatsSnapshot {
    /// Fraction of lookups that were hits, in the range `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_rate(&self) -> f64 {
        // Sum in u128 so extreme counter values cannot overflow; for sums
        // that fit in u64 the result is identical to a plain u64 addition.
        let total = u128::from(self.hits) + u128::from(self.misses);
        if total == 0 {
            return 0.0;
        }
        self.hits as f64 / total as f64
    }

    /// Total number of lookups (hits plus misses), saturating at `u64::MAX`
    /// instead of overflowing.
    pub fn total_lookups(&self) -> u64 {
        self.hits.saturating_add(self.misses)
    }
}
255
/// Origin of a cache update; selects which `auto_update_on_*` switch gates
/// the update and which per-source counter is incremented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateSource {
    /// Update reported through `GroupValueCache::on_write`
    /// (presumably a value written by this application; confirm with callers).
    Write,
    /// Update reported through `GroupValueCache::on_indication`
    /// (presumably a value observed on the bus; confirm with callers).
    Indication,
}
264
265impl GroupValueCache {
266 pub fn new(config: GroupValueCacheConfig) -> Self {
268 Self {
269 entries: DashMap::with_capacity(config.max_entries.min(256)),
270 config,
271 stats: CacheStats::new(),
272 }
273 }
274
275 pub fn with_defaults() -> Self {
277 Self::new(GroupValueCacheConfig::default())
278 }
279
    /// Returns whether the cache is enabled in its configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
284
    /// Returns the configuration this cache was built with.
    pub fn config(&self) -> &GroupValueCacheConfig {
        &self.config
    }
289
290 pub fn get(&self, address: &GroupAddress) -> Option<Vec<u8>> {
297 if !self.config.enabled {
298 self.stats.misses.fetch_add(1, Ordering::Relaxed);
299 return None;
300 }
301
302 match self.entries.get_mut(address) {
303 Some(mut entry) => {
304 let ttl = self.config.ttl();
305 if entry.is_expired(ttl) {
306 drop(entry);
308 self.entries.remove(address);
309 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
310 self.stats.misses.fetch_add(1, Ordering::Relaxed);
311 None
312 } else {
313 entry.accessed_at = Instant::now();
315 entry.access_count += 1;
316 let value = entry.value.clone();
317 self.stats.hits.fetch_add(1, Ordering::Relaxed);
318 Some(value)
319 }
320 }
321 None => {
322 self.stats.misses.fetch_add(1, Ordering::Relaxed);
323 None
324 }
325 }
326 }
327
328 pub fn get_entry(&self, address: &GroupAddress) -> Option<CacheEntry> {
332 if !self.config.enabled {
333 return None;
334 }
335
336 match self.entries.get_mut(address) {
337 Some(mut entry) => {
338 let ttl = self.config.ttl();
339 if entry.is_expired(ttl) {
340 drop(entry);
341 self.entries.remove(address);
342 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
343 None
344 } else {
345 entry.accessed_at = Instant::now();
346 entry.access_count += 1;
347 let snapshot = entry.clone();
348 self.stats.hits.fetch_add(1, Ordering::Relaxed);
349 Some(snapshot)
350 }
351 }
352 None => None,
353 }
354 }
355
356 pub fn update(
360 &self,
361 address: GroupAddress,
362 value: Vec<u8>,
363 source: Option<String>,
364 update_source: UpdateSource,
365 ) {
366 if !self.config.enabled {
367 return;
368 }
369
370 match update_source {
372 UpdateSource::Write if !self.config.auto_update_on_write => return,
373 UpdateSource::Indication if !self.config.auto_update_on_indication => return,
374 _ => {}
375 }
376
377 if let Some(mut entry) = self.entries.get_mut(&address) {
379 entry.value = value;
380 entry.source = source;
381 entry.updated_at = Instant::now();
382 entry.accessed_at = Instant::now();
383 entry.update_count += 1;
384 } else {
385 self.evict_if_needed();
387 self.entries.insert(address, CacheEntry::new(address, value, source));
388 }
389
390 self.stats.updates.fetch_add(1, Ordering::Relaxed);
392 match update_source {
393 UpdateSource::Write => {
394 self.stats.write_updates.fetch_add(1, Ordering::Relaxed);
395 }
396 UpdateSource::Indication => {
397 self.stats.indication_updates.fetch_add(1, Ordering::Relaxed);
398 }
399 }
400 }
401
    /// Records a value as an indication-sourced update.
    ///
    /// Forwards to `update` with `UpdateSource::Indication`, so the value is
    /// stored only when the cache is enabled and `auto_update_on_indication`
    /// is set.
    pub fn on_indication(
        &self,
        address: GroupAddress,
        value: Vec<u8>,
        source: Option<String>,
    ) {
        self.update(address, value, source, UpdateSource::Indication);
    }
414
    /// Records a value as a write-sourced update.
    ///
    /// Forwards to `update` with `UpdateSource::Write`, so the value is
    /// stored only when the cache is enabled and `auto_update_on_write` is
    /// set.
    pub fn on_write(
        &self,
        address: GroupAddress,
        value: Vec<u8>,
        source: Option<String>,
    ) {
        self.update(address, value, source, UpdateSource::Write);
    }
426
    /// Removes the entry for `address`, returning `true` if one was present.
    ///
    /// No statistics counter is changed.
    pub fn invalidate(&self, address: &GroupAddress) -> bool {
        self.entries.remove(address).is_some()
    }
431
    /// Removes every entry from the cache.
    ///
    /// No statistics counter is changed.
    pub fn invalidate_all(&self) {
        self.entries.clear();
    }
436
437 pub fn purge_expired(&self) -> usize {
441 let ttl = self.config.ttl();
442 if ttl.is_zero() {
443 return 0; }
445
446 let expired: Vec<GroupAddress> = self.entries
447 .iter()
448 .filter(|r| r.value().is_expired(ttl))
449 .map(|r| *r.key())
450 .collect();
451
452 let count = expired.len();
453 for addr in expired {
454 self.entries.remove(&addr);
455 }
456
457 self.stats.expirations.fetch_add(count as u64, Ordering::Relaxed);
458 count
459 }
460
    /// Number of entries currently stored, including any that have expired
    /// but have not been removed yet.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
465
    /// Returns `true` when the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
470
471 pub fn addresses(&self) -> Vec<GroupAddress> {
473 self.entries.iter().map(|r| *r.key()).collect()
474 }
475
    /// Returns a plain-value copy of the current statistics counters.
    pub fn stats_snapshot(&self) -> CacheStatsSnapshot {
        self.stats.snapshot()
    }
480
481 fn evict_if_needed(&self) {
483 if self.entries.len() < self.config.max_entries {
484 return;
485 }
486
487 let lru_addr = self.entries
489 .iter()
490 .min_by_key(|r| r.value().accessed_at)
491 .map(|r| *r.key());
492
493 if let Some(addr) = lru_addr {
494 self.entries.remove(&addr);
495 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
496 }
497 }
498
499 pub fn entry_info(&self, address: &GroupAddress) -> Option<CacheEntryInfo> {
501 self.entries.get(address).map(|entry| {
502 let ttl = self.config.ttl();
503 CacheEntryInfo {
504 address: entry.address,
505 value_len: entry.value.len(),
506 source: entry.source.clone(),
507 age: entry.age(),
508 idle_time: entry.idle_time(),
509 update_count: entry.update_count,
510 access_count: entry.access_count,
511 is_expired: entry.is_expired(ttl),
512 }
513 })
514 }
515
516 pub fn all_entry_info(&self) -> Vec<CacheEntryInfo> {
518 let ttl = self.config.ttl();
519 self.entries
520 .iter()
521 .map(|r| {
522 let entry = r.value();
523 CacheEntryInfo {
524 address: entry.address,
525 value_len: entry.value.len(),
526 source: entry.source.clone(),
527 age: entry.age(),
528 idle_time: entry.idle_time(),
529 update_count: entry.update_count,
530 access_count: entry.access_count,
531 is_expired: entry.is_expired(ttl),
532 }
533 })
534 .collect()
535 }
536}
537
/// Read-only summary of a cache entry that omits the value bytes.
#[derive(Debug, Clone)]
pub struct CacheEntryInfo {
    /// Group address of the entry.
    pub address: GroupAddress,
    /// Length of the cached value in bytes.
    pub value_len: usize,
    /// Optional label of whoever supplied the value.
    pub source: Option<String>,
    /// Time since the value was last updated.
    pub age: Duration,
    /// Time since the entry was last accessed.
    pub idle_time: Duration,
    /// Number of times the value has been stored.
    pub update_count: u64,
    /// Number of successful reads served from the entry.
    pub access_count: u64,
    /// Whether the entry had outlived the time-to-live when summarized.
    pub is_expired: bool,
}
550
551impl fmt::Debug for GroupValueCache {
552 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
553 f.debug_struct("GroupValueCache")
554 .field("enabled", &self.config.enabled)
555 .field("entries", &self.entries.len())
556 .field("max_entries", &self.config.max_entries)
557 .field("ttl_ms", &self.config.ttl_ms)
558 .finish()
559 }
560}
561
impl Default for GroupValueCache {
    /// Creates an empty cache with the default configuration; equivalent to
    /// `GroupValueCache::with_defaults()`.
    fn default() -> Self {
        Self::with_defaults()
    }
}
567
// Unit tests for the group value cache. Several tests rely on short real
// sleeps to make timestamps differ or to let a 1 ms time-to-live elapse.
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a three-level group address.
    fn make_addr(main: u8, middle: u8, sub: u8) -> GroupAddress {
        GroupAddress::three_level(main, middle, sub)
    }

    /// A written value can be read back.
    #[test]
    fn test_cache_basic_get_set() {
        let cache = GroupValueCache::with_defaults();

        let addr = make_addr(1, 0, 1);
        cache.on_write(addr, vec![0x01], Some("1.1.1".into()));

        let value = cache.get(&addr);
        assert_eq!(value, Some(vec![0x01]));
    }

    /// Looking up an address that was never stored yields `None`.
    #[test]
    fn test_cache_miss() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);
        assert_eq!(cache.get(&addr), None);
    }

    /// A second write to the same address replaces the cached value.
    #[test]
    fn test_cache_update_overwrites() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);
        assert_eq!(cache.get(&addr), Some(vec![0x01]));

        cache.on_write(addr, vec![0x02], None);
        assert_eq!(cache.get(&addr), Some(vec![0x02]));
    }

    /// An indication stores the value and bumps the indication counter.
    #[test]
    fn test_cache_indication_update() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_indication(addr, vec![0x55], Some("1.2.3".into()));

        let value = cache.get(&addr);
        assert_eq!(value, Some(vec![0x55]));

        let stats = cache.stats_snapshot();
        assert_eq!(stats.indication_updates, 1);
    }

    /// A disabled cache stores nothing and answers every lookup with `None`.
    #[test]
    fn test_cache_disabled() {
        let config = GroupValueCacheConfig {
            enabled: false,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);
        assert_eq!(cache.get(&addr), None);
        assert_eq!(cache.len(), 0);
    }

    /// An entry older than the time-to-live is dropped on lookup and counted
    /// as an expiration.
    #[test]
    fn test_cache_ttl_expiration() {
        let config = GroupValueCacheConfig {
            // 1 ms time-to-live so the sleep below is enough to expire it.
            ttl_ms: 1,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);

        // Wait well past the time-to-live.
        std::thread::sleep(Duration::from_millis(5));

        assert_eq!(cache.get(&addr), None);

        let stats = cache.stats_snapshot();
        assert_eq!(stats.expirations, 1);
    }

    /// A zero time-to-live disables expiry.
    #[test]
    fn test_cache_ttl_zero_no_expiration() {
        let config = GroupValueCacheConfig {
            // Zero means entries never expire.
            ttl_ms: 0,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);

        std::thread::sleep(Duration::from_millis(5));
        // Still present after the sleep.
        assert_eq!(cache.get(&addr), Some(vec![0x01]));
    }

    /// When the cache is full, inserting a new address evicts the entry that
    /// was accessed longest ago.
    #[test]
    fn test_cache_lru_eviction() {
        let config = GroupValueCacheConfig {
            max_entries: 3,
            // Expiry off so only eviction can remove entries.
            ttl_ms: 0,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);

        let addr1 = make_addr(1, 0, 1);
        let addr2 = make_addr(1, 0, 2);
        let addr3 = make_addr(1, 0, 3);
        let addr4 = make_addr(1, 0, 4);

        // The sleeps keep the access timestamps strictly ordered.
        cache.on_write(addr1, vec![0x01], None);
        std::thread::sleep(Duration::from_millis(1));
        cache.on_write(addr2, vec![0x02], None);
        std::thread::sleep(Duration::from_millis(1));
        cache.on_write(addr3, vec![0x03], None);

        assert_eq!(cache.len(), 3);

        // Reading addr1 refreshes it, leaving addr2 as the oldest access.
        cache.get(&addr1);
        std::thread::sleep(Duration::from_millis(1));

        // The cache is full, so this insert must evict one entry.
        cache.on_write(addr4, vec![0x04], None);

        assert_eq!(cache.len(), 3);
        // addr1 survived because it was read recently.
        assert_eq!(cache.get(&addr1), Some(vec![0x01]));
        // addr2 was the least recently accessed and is gone.
        assert_eq!(cache.get(&addr2), None);
        assert_eq!(cache.get(&addr3), Some(vec![0x03]));
        assert_eq!(cache.get(&addr4), Some(vec![0x04]));
        let stats = cache.stats_snapshot();
        assert_eq!(stats.evictions, 1);
    }

    /// Invalidating removes the entry and reports whether one existed.
    #[test]
    fn test_cache_invalidate() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);
        assert!(cache.invalidate(&addr));
        assert_eq!(cache.get(&addr), None);
        // A second invalidation finds nothing to remove.
        assert!(!cache.invalidate(&addr));
    }

    /// Invalidating everything empties the cache.
    #[test]
    fn test_cache_invalidate_all() {
        let cache = GroupValueCache::with_defaults();
        cache.on_write(make_addr(1, 0, 1), vec![0x01], None);
        cache.on_write(make_addr(1, 0, 2), vec![0x02], None);
        cache.on_write(make_addr(1, 0, 3), vec![0x03], None);

        assert_eq!(cache.len(), 3);
        cache.invalidate_all();
        assert_eq!(cache.len(), 0);
    }

    /// Purging removes all expired entries and reports how many.
    #[test]
    fn test_cache_purge_expired() {
        let config = GroupValueCacheConfig {
            ttl_ms: 1,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);

        cache.on_write(make_addr(1, 0, 1), vec![0x01], None);
        cache.on_write(make_addr(1, 0, 2), vec![0x02], None);

        // Let both entries outlive the 1 ms time-to-live.
        std::thread::sleep(Duration::from_millis(5));

        let purged = cache.purge_expired();
        assert_eq!(purged, 2);
        assert!(cache.is_empty());
    }

    /// Counters reflect one miss, one hit, and one update from each source.
    #[test]
    fn test_cache_stats() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        // Miss: nothing stored yet.
        cache.get(&addr);

        // Write-sourced update.
        cache.on_write(addr, vec![0x01], None);

        // Hit.
        cache.get(&addr);

        // Indication-sourced update.
        cache.on_indication(addr, vec![0x02], None);

        let stats = cache.stats_snapshot();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.updates, 2);
        assert_eq!(stats.write_updates, 1);
        assert_eq!(stats.indication_updates, 1);
        assert_eq!(stats.total_lookups(), 2);
        assert_eq!(stats.hit_rate(), 0.5);
    }

    /// With no lookups recorded, the hit rate is zero.
    #[test]
    fn test_cache_stats_hit_rate_zero() {
        let stats = CacheStatsSnapshot {
            hits: 0,
            misses: 0,
            evictions: 0,
            expirations: 0,
            updates: 0,
            indication_updates: 0,
            write_updates: 0,
        };
        assert_eq!(stats.hit_rate(), 0.0);
    }

    /// `get_entry` returns the stored data together with its metadata.
    #[test]
    fn test_cache_get_entry() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x42], Some("1.1.1".into()));

        let entry = cache.get_entry(&addr).unwrap();
        assert_eq!(entry.address, addr);
        assert_eq!(entry.value, vec![0x42]);
        assert_eq!(entry.source, Some("1.1.1".to_string()));
        assert_eq!(entry.update_count, 1);
        // The `get_entry` call itself counts as the first access.
        assert_eq!(entry.access_count, 1);
    }

    /// `entry_info` summarizes a stored entry.
    #[test]
    fn test_cache_entry_info() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x42, 0x43], Some("1.1.1".into()));

        let info = cache.entry_info(&addr).unwrap();
        assert_eq!(info.address, addr);
        assert_eq!(info.value_len, 2);
        assert_eq!(info.source, Some("1.1.1".to_string()));
        assert_eq!(info.update_count, 1);
        assert!(!info.is_expired);
    }

    /// `addresses` lists one address per stored entry.
    #[test]
    fn test_cache_addresses() {
        let cache = GroupValueCache::with_defaults();
        cache.on_write(make_addr(1, 0, 1), vec![0x01], None);
        cache.on_write(make_addr(1, 0, 2), vec![0x02], None);

        let addresses = cache.addresses();
        assert_eq!(addresses.len(), 2);
    }

    /// With indication updates switched off, indications leave the cached
    /// value untouched while writes still apply.
    #[test]
    fn test_cache_auto_update_disabled() {
        let config = GroupValueCacheConfig {
            auto_update_on_indication: false,
            auto_update_on_write: true,
            ..Default::default()
        };
        let cache = GroupValueCache::new(config);
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);
        // Writes are still accepted.
        assert_eq!(cache.get(&addr), Some(vec![0x01]));

        cache.on_indication(addr, vec![0x02], None);
        // The indication was ignored, so the written value remains.
        assert_eq!(cache.get(&addr), Some(vec![0x01]));
    }

    /// Validation accepts the defaults and rejects a zero capacity.
    #[test]
    fn test_cache_config_validate() {
        assert!(GroupValueCacheConfig::default().validate().is_ok());
        assert!(GroupValueCacheConfig {
            max_entries: 0,
            ..Default::default()
        }.validate().is_err());
    }

    /// The default configuration carries the expected values.
    #[test]
    fn test_cache_config_defaults() {
        let config = GroupValueCacheConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_entries, 4096);
        assert_eq!(config.ttl_ms, 60_000);
        assert!(config.auto_update_on_indication);
        assert!(config.auto_update_on_write);
    }

    /// The `Debug` output names the type and its `enabled` field.
    #[test]
    fn test_cache_debug() {
        let cache = GroupValueCache::with_defaults();
        let debug_str = format!("{:?}", cache);
        assert!(debug_str.contains("GroupValueCache"));
        assert!(debug_str.contains("enabled"));
    }

    /// Every accepted update to the same address increments its update count.
    #[test]
    fn test_update_count_increments() {
        let cache = GroupValueCache::with_defaults();
        let addr = make_addr(1, 0, 1);

        cache.on_write(addr, vec![0x01], None);
        cache.on_write(addr, vec![0x02], None);
        cache.on_indication(addr, vec![0x03], None);

        let entry = cache.get_entry(&addr).unwrap();
        assert_eq!(entry.update_count, 3);
        assert_eq!(entry.value, vec![0x03]);
    }

    /// A fresh entry is not expired under a long time-to-live, nor under a
    /// zero one.
    #[test]
    fn test_cache_entry_expired() {
        let entry = CacheEntry::new(
            make_addr(1, 0, 1),
            vec![0x01],
            None,
        );

        // Far from an hour old.
        assert!(!entry.is_expired(Duration::from_secs(3600)));

        // A zero time-to-live never expires anything.
        assert!(!entry.is_expired(Duration::ZERO));
    }

    /// `all_entry_info` returns one summary per stored entry.
    #[test]
    fn test_cache_all_entry_info() {
        let cache = GroupValueCache::with_defaults();
        cache.on_write(make_addr(1, 0, 1), vec![0x01], None);
        cache.on_write(make_addr(1, 0, 2), vec![0x02], None);

        let info = cache.all_entry_info();
        assert_eq!(info.len(), 2);
    }
}