1use std::fmt;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::time::{Duration, Instant};
33
34use dashmap::DashMap;
35use serde::{Deserialize, Serialize};
36
37use crate::address::GroupAddress;
38
39#[derive(Debug, Clone)]
45pub struct CacheEntry {
46 pub address: GroupAddress,
48 pub value: Vec<u8>,
50 pub source: Option<String>,
52 pub updated_at: Instant,
54 pub accessed_at: Instant,
56 pub update_count: u64,
58 pub access_count: u64,
60}
61
62impl CacheEntry {
63 fn new(address: GroupAddress, value: Vec<u8>, source: Option<String>) -> Self {
65 let now = Instant::now();
66 Self {
67 address,
68 value,
69 source,
70 updated_at: now,
71 accessed_at: now,
72 update_count: 1,
73 access_count: 0,
74 }
75 }
76
77 pub fn is_expired(&self, ttl: Duration) -> bool {
79 if ttl.is_zero() {
80 return false; }
82 self.updated_at.elapsed() > ttl
83 }
84
85 pub fn age(&self) -> Duration {
87 self.updated_at.elapsed()
88 }
89
90 pub fn idle_time(&self) -> Duration {
92 self.accessed_at.elapsed()
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct GroupValueCacheConfig {
103 #[serde(default = "default_true")]
105 pub enabled: bool,
106
107 #[serde(default = "default_max_entries")]
110 pub max_entries: usize,
111
112 #[serde(default = "default_ttl_ms")]
115 pub ttl_ms: u64,
116
117 #[serde(default = "default_true")]
119 pub auto_update_on_indication: bool,
120
121 #[serde(default = "default_true")]
123 pub auto_update_on_write: bool,
124}
125
/// Serde default for boolean options that are switched on unless configured
/// otherwise.
fn default_true() -> bool {
    true
}
129
/// Serde default for `max_entries`.
fn default_max_entries() -> usize {
    const DEFAULT_MAX_ENTRIES: usize = 4096;
    DEFAULT_MAX_ENTRIES
}
133
/// Serde default for `ttl_ms`: one minute, expressed in milliseconds.
fn default_ttl_ms() -> u64 {
    const DEFAULT_TTL_SECONDS: u64 = 60;
    DEFAULT_TTL_SECONDS * 1_000
}
137
138impl Default for GroupValueCacheConfig {
139 fn default() -> Self {
140 Self {
141 enabled: true,
142 max_entries: default_max_entries(),
143 ttl_ms: default_ttl_ms(),
144 auto_update_on_indication: true,
145 auto_update_on_write: true,
146 }
147 }
148}
149
150impl GroupValueCacheConfig {
151 pub fn ttl(&self) -> Duration {
153 Duration::from_millis(self.ttl_ms)
154 }
155
156 pub fn validate(&self) -> Result<(), String> {
158 if self.max_entries == 0 {
159 return Err("GroupValueCache max_entries must be > 0".to_string());
160 }
161 Ok(())
162 }
163}
164
165pub struct GroupValueCache {
173 config: GroupValueCacheConfig,
175 entries: DashMap<GroupAddress, CacheEntry>,
177 stats: CacheStats,
181}
182
/// Live usage counters of a cache.
///
/// Each counter is an independent atomic, so the counters can be bumped
/// through a shared reference. `Debug` and `Default` are derived so the type
/// can be logged and zero-initialised like any other public value type.
#[derive(Debug, Default)]
pub struct CacheStats {
    /// Lookups that returned a value.
    pub hits: AtomicU64,

    /// Lookups that returned nothing.
    pub misses: AtomicU64,

    /// Entries removed to make room for a new address.
    pub evictions: AtomicU64,

    /// Entries removed because they outlived the time-to-live.
    pub expirations: AtomicU64,

    /// Accepted updates, regardless of where they came from.
    pub updates: AtomicU64,

    /// Accepted updates that came from indications.
    pub indication_updates: AtomicU64,

    /// Accepted updates that came from writes.
    pub write_updates: AtomicU64,
}
200
201impl CacheStats {
202 fn new() -> Self {
203 Self {
204 hits: AtomicU64::new(0),
205 misses: AtomicU64::new(0),
206 evictions: AtomicU64::new(0),
207 expirations: AtomicU64::new(0),
208 updates: AtomicU64::new(0),
209 indication_updates: AtomicU64::new(0),
210 write_updates: AtomicU64::new(0),
211 }
212 }
213
214 pub fn snapshot(&self) -> CacheStatsSnapshot {
216 CacheStatsSnapshot {
217 hits: self.hits.load(Ordering::Relaxed),
218 misses: self.misses.load(Ordering::Relaxed),
219 evictions: self.evictions.load(Ordering::Relaxed),
220 expirations: self.expirations.load(Ordering::Relaxed),
221 updates: self.updates.load(Ordering::Relaxed),
222 indication_updates: self.indication_updates.load(Ordering::Relaxed),
223 write_updates: self.write_updates.load(Ordering::Relaxed),
224 }
225 }
226}
227
/// Point-in-time copy of the cache counters as plain integers.
///
/// Equality and `Default` are derived so snapshots can be compared in
/// assertions and built from an all-zero baseline.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CacheStatsSnapshot {
    /// Lookups that returned a value.
    pub hits: u64,
    /// Lookups that returned nothing.
    pub misses: u64,
    /// Entries removed to make room for a new address.
    pub evictions: u64,
    /// Entries removed because they outlived the time-to-live.
    pub expirations: u64,
    /// Accepted updates, regardless of where they came from.
    pub updates: u64,
    /// Accepted updates that came from indications.
    pub indication_updates: u64,
    /// Accepted updates that came from writes.
    pub write_updates: u64,
}
239
240impl CacheStatsSnapshot {
241 pub fn hit_rate(&self) -> f64 {
243 let total = self.hits + self.misses;
244 if total == 0 {
245 return 0.0;
246 }
247 self.hits as f64 / total as f64
248 }
249
250 pub fn total_lookups(&self) -> u64 {
252 self.hits + self.misses
253 }
254}
255
/// Where a cache update originated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateSource {
    /// The update was reported as a write.
    Write,

    /// The update was reported as an indication.
    Indication,
}
264
265impl GroupValueCache {
266 pub fn new(config: GroupValueCacheConfig) -> Self {
268 Self {
269 entries: DashMap::with_capacity(config.max_entries.min(256)),
270 config,
271 stats: CacheStats::new(),
272 }
273 }
274
275 pub fn with_defaults() -> Self {
277 Self::new(GroupValueCacheConfig::default())
278 }
279
280 pub fn is_enabled(&self) -> bool {
282 self.config.enabled
283 }
284
285 pub fn config(&self) -> &GroupValueCacheConfig {
287 &self.config
288 }
289
290 pub fn get(&self, address: &GroupAddress) -> Option<Vec<u8>> {
297 if !self.config.enabled {
298 self.stats.misses.fetch_add(1, Ordering::Relaxed);
299 return None;
300 }
301
302 match self.entries.get_mut(address) {
303 Some(mut entry) => {
304 let ttl = self.config.ttl();
305 if entry.is_expired(ttl) {
306 drop(entry);
308 self.entries.remove(address);
309 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
310 self.stats.misses.fetch_add(1, Ordering::Relaxed);
311 None
312 } else {
313 entry.accessed_at = Instant::now();
315 entry.access_count += 1;
316 let value = entry.value.clone();
317 self.stats.hits.fetch_add(1, Ordering::Relaxed);
318 Some(value)
319 }
320 }
321 None => {
322 self.stats.misses.fetch_add(1, Ordering::Relaxed);
323 None
324 }
325 }
326 }
327
328 pub fn get_entry(&self, address: &GroupAddress) -> Option<CacheEntry> {
332 if !self.config.enabled {
333 return None;
334 }
335
336 match self.entries.get_mut(address) {
337 Some(mut entry) => {
338 let ttl = self.config.ttl();
339 if entry.is_expired(ttl) {
340 drop(entry);
341 self.entries.remove(address);
342 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
343 None
344 } else {
345 entry.accessed_at = Instant::now();
346 entry.access_count += 1;
347 let snapshot = entry.clone();
348 self.stats.hits.fetch_add(1, Ordering::Relaxed);
349 Some(snapshot)
350 }
351 }
352 None => None,
353 }
354 }
355
356 pub fn update(
360 &self,
361 address: GroupAddress,
362 value: Vec<u8>,
363 source: Option<String>,
364 update_source: UpdateSource,
365 ) {
366 if !self.config.enabled {
367 return;
368 }
369
370 match update_source {
372 UpdateSource::Write if !self.config.auto_update_on_write => return,
373 UpdateSource::Indication if !self.config.auto_update_on_indication => return,
374 _ => {}
375 }
376
377 if let Some(mut entry) = self.entries.get_mut(&address) {
379 entry.value = value;
380 entry.source = source;
381 entry.updated_at = Instant::now();
382 entry.accessed_at = Instant::now();
383 entry.update_count += 1;
384 } else {
385 self.evict_if_needed();
387 self.entries
388 .insert(address, CacheEntry::new(address, value, source));
389 }
390
391 self.stats.updates.fetch_add(1, Ordering::Relaxed);
393 match update_source {
394 UpdateSource::Write => {
395 self.stats.write_updates.fetch_add(1, Ordering::Relaxed);
396 }
397 UpdateSource::Indication => {
398 self.stats
399 .indication_updates
400 .fetch_add(1, Ordering::Relaxed);
401 }
402 }
403 }
404
405 pub fn on_indication(&self, address: GroupAddress, value: Vec<u8>, source: Option<String>) {
410 self.update(address, value, source, UpdateSource::Indication);
411 }
412
413 pub fn on_write(&self, address: GroupAddress, value: Vec<u8>, source: Option<String>) {
417 self.update(address, value, source, UpdateSource::Write);
418 }
419
420 pub fn invalidate(&self, address: &GroupAddress) -> bool {
422 self.entries.remove(address).is_some()
423 }
424
425 pub fn invalidate_all(&self) {
427 self.entries.clear();
428 }
429
430 pub fn purge_expired(&self) -> usize {
434 let ttl = self.config.ttl();
435 if ttl.is_zero() {
436 return 0; }
438
439 let expired: Vec<GroupAddress> = self
440 .entries
441 .iter()
442 .filter(|r| r.value().is_expired(ttl))
443 .map(|r| *r.key())
444 .collect();
445
446 let count = expired.len();
447 for addr in expired {
448 self.entries.remove(&addr);
449 }
450
451 self.stats
452 .expirations
453 .fetch_add(count as u64, Ordering::Relaxed);
454 count
455 }
456
457 pub fn len(&self) -> usize {
459 self.entries.len()
460 }
461
462 pub fn is_empty(&self) -> bool {
464 self.entries.is_empty()
465 }
466
467 pub fn addresses(&self) -> Vec<GroupAddress> {
469 self.entries.iter().map(|r| *r.key()).collect()
470 }
471
472 pub fn stats_snapshot(&self) -> CacheStatsSnapshot {
474 self.stats.snapshot()
475 }
476
477 fn evict_if_needed(&self) {
479 if self.entries.len() < self.config.max_entries {
480 return;
481 }
482
483 let lru_addr = self
485 .entries
486 .iter()
487 .min_by_key(|r| r.value().accessed_at)
488 .map(|r| *r.key());
489
490 if let Some(addr) = lru_addr {
491 self.entries.remove(&addr);
492 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
493 }
494 }
495
496 pub fn entry_info(&self, address: &GroupAddress) -> Option<CacheEntryInfo> {
498 self.entries.get(address).map(|entry| {
499 let ttl = self.config.ttl();
500 CacheEntryInfo {
501 address: entry.address,
502 value_len: entry.value.len(),
503 source: entry.source.clone(),
504 age: entry.age(),
505 idle_time: entry.idle_time(),
506 update_count: entry.update_count,
507 access_count: entry.access_count,
508 is_expired: entry.is_expired(ttl),
509 }
510 })
511 }
512
513 pub fn all_entry_info(&self) -> Vec<CacheEntryInfo> {
515 let ttl = self.config.ttl();
516 self.entries
517 .iter()
518 .map(|r| {
519 let entry = r.value();
520 CacheEntryInfo {
521 address: entry.address,
522 value_len: entry.value.len(),
523 source: entry.source.clone(),
524 age: entry.age(),
525 idle_time: entry.idle_time(),
526 update_count: entry.update_count,
527 access_count: entry.access_count,
528 is_expired: entry.is_expired(ttl),
529 }
530 })
531 .collect()
532 }
533}
534
535#[derive(Debug, Clone)]
537pub struct CacheEntryInfo {
538 pub address: GroupAddress,
539 pub value_len: usize,
540 pub source: Option<String>,
541 pub age: Duration,
542 pub idle_time: Duration,
543 pub update_count: u64,
544 pub access_count: u64,
545 pub is_expired: bool,
546}
547
548impl fmt::Debug for GroupValueCache {
549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550 f.debug_struct("GroupValueCache")
551 .field("enabled", &self.config.enabled)
552 .field("entries", &self.entries.len())
553 .field("max_entries", &self.config.max_entries)
554 .field("ttl_ms", &self.config.ttl_ms)
555 .finish()
556 }
557}
558
559impl Default for GroupValueCache {
560 fn default() -> Self {
561 Self::with_defaults()
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a three-level group address.
    fn make_addr(main: u8, middle: u8, sub: u8) -> GroupAddress {
        GroupAddress::three_level(main, middle, sub)
    }

    /// Blocks the current thread for `millis` milliseconds.
    fn pause(millis: u64) {
        std::thread::sleep(Duration::from_millis(millis));
    }

    /// Default configuration with only the TTL overridden.
    fn config_with_ttl(ttl_ms: u64) -> GroupValueCacheConfig {
        GroupValueCacheConfig {
            ttl_ms,
            ..Default::default()
        }
    }

    #[test]
    fn test_cache_basic_get_set() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        cache.on_write(ga, vec![0x01], Some("1.1.1".into()));

        assert_eq!(cache.get(&ga), Some(vec![0x01]));
    }

    #[test]
    fn test_cache_miss() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        assert_eq!(cache.get(&ga), None);
    }

    #[test]
    fn test_cache_update_overwrites() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        for byte in [0x01u8, 0x02] {
            cache.on_write(ga, vec![byte], None);
            assert_eq!(cache.get(&ga), Some(vec![byte]));
        }
    }

    #[test]
    fn test_cache_indication_update() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        cache.on_indication(ga, vec![0x55], Some("1.2.3".into()));

        assert_eq!(cache.get(&ga), Some(vec![0x55]));
        assert_eq!(cache.stats_snapshot().indication_updates, 1);
    }

    #[test]
    fn test_cache_disabled() {
        let cache = GroupValueCache::new(GroupValueCacheConfig {
            enabled: false,
            ..Default::default()
        });
        let ga = make_addr(1, 0, 1);

        cache.on_write(ga, vec![0x01], None);

        assert_eq!(cache.get(&ga), None);
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_cache_ttl_expiration() {
        let cache = GroupValueCache::new(config_with_ttl(1));
        let ga = make_addr(1, 0, 1);

        cache.on_write(ga, vec![0x01], None);
        // Let the 1 ms TTL elapse.
        pause(5);

        assert_eq!(cache.get(&ga), None);
        assert_eq!(cache.stats_snapshot().expirations, 1);
    }

    #[test]
    fn test_cache_ttl_zero_no_expiration() {
        // A zero TTL means entries never expire.
        let cache = GroupValueCache::new(config_with_ttl(0));
        let ga = make_addr(1, 0, 1);

        cache.on_write(ga, vec![0x01], None);
        pause(5);

        assert_eq!(cache.get(&ga), Some(vec![0x01]));
    }

    #[test]
    fn test_cache_lru_eviction() {
        let cache = GroupValueCache::new(GroupValueCacheConfig {
            max_entries: 3,
            ttl_ms: 0,
            ..Default::default()
        });

        let first = make_addr(1, 0, 1);
        let second = make_addr(1, 0, 2);
        let third = make_addr(1, 0, 3);
        let fourth = make_addr(1, 0, 4);

        // Spread the inserts out so the access timestamps are strictly ordered.
        cache.on_write(first, vec![0x01], None);
        pause(1);
        cache.on_write(second, vec![0x02], None);
        pause(1);
        cache.on_write(third, vec![0x03], None);
        assert_eq!(cache.len(), 3);

        // Touch the first entry so the second becomes the least recently used.
        cache.get(&first);
        pause(1);

        // The fourth insert must push out the second entry.
        cache.on_write(fourth, vec![0x04], None);
        assert_eq!(cache.len(), 3);

        assert_eq!(cache.get(&first), Some(vec![0x01]));
        assert_eq!(cache.get(&second), None);
        assert_eq!(cache.get(&third), Some(vec![0x03]));
        assert_eq!(cache.get(&fourth), Some(vec![0x04]));

        assert_eq!(cache.stats_snapshot().evictions, 1);
    }

    #[test]
    fn test_cache_invalidate() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);
        cache.on_write(ga, vec![0x01], None);

        assert!(cache.invalidate(&ga));
        assert_eq!(cache.get(&ga), None);
        // A second invalidation finds nothing to remove.
        assert!(!cache.invalidate(&ga));
    }

    #[test]
    fn test_cache_invalidate_all() {
        let cache = GroupValueCache::with_defaults();
        for sub in 1u8..=3 {
            cache.on_write(make_addr(1, 0, sub), vec![sub], None);
        }
        assert_eq!(cache.len(), 3);

        cache.invalidate_all();

        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_cache_purge_expired() {
        let cache = GroupValueCache::new(config_with_ttl(1));
        for sub in 1u8..=2 {
            cache.on_write(make_addr(1, 0, sub), vec![sub], None);
        }

        pause(5);

        assert_eq!(cache.purge_expired(), 2);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_cache_stats() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        // One miss, one write, one hit, one indication.
        cache.get(&ga);
        cache.on_write(ga, vec![0x01], None);
        cache.get(&ga);
        cache.on_indication(ga, vec![0x02], None);

        let snapshot = cache.stats_snapshot();
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.updates, 2);
        assert_eq!(snapshot.write_updates, 1);
        assert_eq!(snapshot.indication_updates, 1);
        assert_eq!(snapshot.total_lookups(), 2);
        assert_eq!(snapshot.hit_rate(), 0.5);
    }

    #[test]
    fn test_cache_stats_hit_rate_zero() {
        let untouched = CacheStatsSnapshot {
            hits: 0,
            misses: 0,
            evictions: 0,
            expirations: 0,
            updates: 0,
            indication_updates: 0,
            write_updates: 0,
        };

        assert_eq!(untouched.hit_rate(), 0.0);
    }

    #[test]
    fn test_cache_get_entry() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);
        cache.on_write(ga, vec![0x42], Some("1.1.1".into()));

        let fetched = cache.get_entry(&ga).unwrap();

        assert_eq!(fetched.address, ga);
        assert_eq!(fetched.value, vec![0x42]);
        assert_eq!(fetched.source, Some("1.1.1".to_string()));
        assert_eq!(fetched.update_count, 1);
        // The fetch itself counts as the first access.
        assert_eq!(fetched.access_count, 1);
    }

    #[test]
    fn test_cache_entry_info() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);
        cache.on_write(ga, vec![0x42, 0x43], Some("1.1.1".into()));

        let described = cache.entry_info(&ga).unwrap();

        assert_eq!(described.address, ga);
        assert_eq!(described.value_len, 2);
        assert_eq!(described.source, Some("1.1.1".to_string()));
        assert_eq!(described.update_count, 1);
        assert!(!described.is_expired);
    }

    #[test]
    fn test_cache_addresses() {
        let cache = GroupValueCache::with_defaults();
        for sub in 1u8..=2 {
            cache.on_write(make_addr(1, 0, sub), vec![sub], None);
        }

        assert_eq!(cache.addresses().len(), 2);
    }

    #[test]
    fn test_cache_auto_update_disabled() {
        let cache = GroupValueCache::new(GroupValueCacheConfig {
            auto_update_on_indication: false,
            auto_update_on_write: true,
            ..Default::default()
        });
        let ga = make_addr(1, 0, 1);

        // Writes are still accepted.
        cache.on_write(ga, vec![0x01], None);
        assert_eq!(cache.get(&ga), Some(vec![0x01]));

        // Indications are ignored, so the written value stays in place.
        cache.on_indication(ga, vec![0x02], None);
        assert_eq!(cache.get(&ga), Some(vec![0x01]));
    }

    #[test]
    fn test_cache_config_validate() {
        let valid = GroupValueCacheConfig::default();
        assert!(valid.validate().is_ok());

        let zero_capacity = GroupValueCacheConfig {
            max_entries: 0,
            ..Default::default()
        };
        assert!(zero_capacity.validate().is_err());
    }

    #[test]
    fn test_cache_config_defaults() {
        let defaults = GroupValueCacheConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.max_entries, 4096);
        assert_eq!(defaults.ttl_ms, 60_000);
        assert!(defaults.auto_update_on_indication);
        assert!(defaults.auto_update_on_write);
    }

    #[test]
    fn test_cache_debug() {
        let cache = GroupValueCache::with_defaults();

        let rendered = format!("{:?}", cache);

        assert!(rendered.contains("GroupValueCache"));
        assert!(rendered.contains("enabled"));
    }

    #[test]
    fn test_update_count_increments() {
        let cache = GroupValueCache::with_defaults();
        let ga = make_addr(1, 0, 1);

        cache.on_write(ga, vec![0x01], None);
        cache.on_write(ga, vec![0x02], None);
        cache.on_indication(ga, vec![0x03], None);

        let fetched = cache.get_entry(&ga).unwrap();
        assert_eq!(fetched.update_count, 3);
        assert_eq!(fetched.value, vec![0x03]);
    }

    #[test]
    fn test_cache_entry_expired() {
        let fresh = CacheEntry::new(make_addr(1, 0, 1), vec![0x01], None);

        // Far from expiring under a long TTL.
        assert!(!fresh.is_expired(Duration::from_secs(3600)));
        // A zero TTL never expires anything.
        assert!(!fresh.is_expired(Duration::ZERO));
    }

    #[test]
    fn test_cache_all_entry_info() {
        let cache = GroupValueCache::with_defaults();
        for sub in 1u8..=2 {
            cache.on_write(make_addr(1, 0, sub), vec![sub], None);
        }

        assert_eq!(cache.all_entry_info().len(), 2);
    }
}