1use std::collections::{HashMap, HashSet};
36use std::hash::{Hash, Hasher};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::time::{Duration, Instant};
39
/// Identity of a cached query result: a query type plus its named parameters.
///
/// The builder methods keep `params` ordered by parameter name, so keys
/// assembled from the same distinctly-named parameters in a different order
/// compare equal and hash identically. Parameters sharing a name keep their
/// insertion order relative to each other (the sort is stable and looks at
/// the name only).
///
/// A digest of the contents is cached in `hash` and refreshed by every
/// builder method. Writing to the public fields directly does NOT refresh it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKey {
    /// Label of the query this key belongs to.
    pub query_type: String,
    /// `(name, value)` pairs, ordered by name.
    pub params: Vec<(String, String)>,
    /// Cached digest of `query_type` and `params`; this is what the `Hash`
    /// impl feeds to hashers.
    hash: u64,
}

impl CacheKey {
    /// Creates a key for `query_type` with no parameters.
    pub fn new(query_type: impl Into<String>) -> Self {
        let query_type = query_type.into();
        let params = Vec::new();
        let hash = Self::digest(&query_type, &params);
        Self {
            query_type,
            params,
            hash,
        }
    }

    /// Adds a single `name = value` parameter and returns the updated key.
    pub fn param(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let pair = (name.into(), value.into());
        self.params.push(pair);
        self.canonicalize();
        self
    }

    /// Adds every `(name, value)` pair yielded by `params` and returns the
    /// updated key.
    pub fn params(mut self, params: impl IntoIterator<Item = (String, String)>) -> Self {
        for pair in params {
            self.params.push(pair);
        }
        self.canonicalize();
        self
    }

    /// Restores the by-name ordering of `params` and refreshes the digest.
    fn canonicalize(&mut self) {
        self.params.sort_by(|(left, _), (right, _)| left.cmp(right));
        self.rehash();
    }

    /// Recomputes the cached digest from the current contents.
    fn rehash(&mut self) {
        self.hash = Self::digest(&self.query_type, &self.params);
    }

    /// Hashes the query type followed by each parameter name and value, in
    /// their stored order, with the standard library's default hasher.
    fn digest(query_type: &str, params: &[(String, String)]) -> u64 {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        query_type.hash(&mut hasher);
        for (name, value) in params {
            name.hash(&mut hasher);
            value.hash(&mut hasher);
        }
        hasher.finish()
    }
}

impl Hash for CacheKey {
    /// Feeds only the cached digest to `state`, so hashing a key does not
    /// walk its strings again.
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash);
    }
}
101
/// Per-entry caching rules: lifetime, invalidation dependencies and
/// eviction weight.
#[derive(Debug, Clone)]
pub struct CachePolicy {
    /// How long an entry stays valid.
    pub ttl: Duration,
    /// Names whose invalidation should also drop the entry.
    pub dependencies: HashSet<String>,
    /// Weight used when scoring entries for eviction.
    pub priority: u8,
    /// When `true`, the TTL is measured from the last access instead of from
    /// creation.
    pub sliding: bool,
}

impl Default for CachePolicy {
    /// A 300-second, non-sliding policy with priority 50 and no dependencies.
    fn default() -> Self {
        const DEFAULT_TTL_SECS: u64 = 300;
        const DEFAULT_PRIORITY: u8 = 50;

        Self {
            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
            dependencies: HashSet::new(),
            priority: DEFAULT_PRIORITY,
            sliding: false,
        }
    }
}

impl CachePolicy {
    /// Returns the policy with its time-to-live replaced by `ttl`.
    pub fn ttl(self, ttl: Duration) -> Self {
        Self { ttl, ..self }
    }

    /// Returns the policy with every name in `deps` added to its
    /// dependencies; names already present are kept once.
    pub fn depends_on(mut self, deps: &[&str]) -> Self {
        self.dependencies
            .extend(deps.iter().map(|dep| (*dep).to_owned()));
        self
    }

    /// Returns the policy with its eviction weight replaced by `priority`.
    pub fn priority(self, priority: u8) -> Self {
        Self { priority, ..self }
    }

    /// Returns the policy with sliding expiration switched on.
    pub fn sliding(self) -> Self {
        Self {
            sliding: true,
            ..self
        }
    }
}
157
158struct CacheEntry {
164 data: Vec<u8>,
166 size: usize,
168 created_at: Instant,
170 last_accessed: Instant,
172 access_count: AtomicU64,
174 policy: CachePolicy,
176}
177
178impl CacheEntry {
179 fn new(data: Vec<u8>, policy: CachePolicy) -> Self {
180 let size = data.len();
181 let now = Instant::now();
182 Self {
183 data,
184 size,
185 created_at: now,
186 last_accessed: now,
187 access_count: AtomicU64::new(0),
188 policy,
189 }
190 }
191
192 fn is_expired(&self) -> bool {
193 let elapsed = if self.policy.sliding {
194 self.last_accessed.elapsed()
195 } else {
196 self.created_at.elapsed()
197 };
198 elapsed > self.policy.ttl
199 }
200
201 fn touch(&mut self) {
202 self.access_count.fetch_add(1, Ordering::Relaxed);
203 self.last_accessed = Instant::now();
204 }
205
206 fn eviction_score(&self) -> u64 {
208 let frequency = self.access_count.load(Ordering::Relaxed);
209 let recency = self.last_accessed.elapsed().as_secs();
210 let priority = self.policy.priority as u64;
211
212 frequency
215 .saturating_mul(priority)
216 .checked_div(recency)
217 .unwrap_or_else(|| frequency.saturating_mul(priority).saturating_mul(1000))
218 }
219}
220
/// Counters and gauges describing a result cache.
#[derive(Debug, Clone, Default)]
pub struct ResultCacheStats {
    /// Lookups that returned a payload.
    pub hits: u64,
    /// Lookups that returned nothing.
    pub misses: u64,
    /// Entries dropped to make room for new ones.
    pub evictions: u64,
    /// Number of entries currently stored.
    pub entry_count: usize,
    /// Bytes currently accounted to stored entries.
    pub memory_bytes: usize,
    /// Configured memory budget in bytes.
    pub max_memory_bytes: usize,
    /// Entries dropped because they expired.
    pub expirations: u64,
    /// Entries dropped by explicit invalidation.
    pub invalidations: u64,
}

impl ResultCacheStats {
    /// Fraction of lookups that were hits, or `0.0` when no lookups have
    /// been recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Fraction of the memory budget in use, or `0.0` when the budget is
    /// zero.
    pub fn memory_utilization(&self) -> f64 {
        if self.max_memory_bytes != 0 {
            self.memory_bytes as f64 / self.max_memory_bytes as f64
        } else {
            0.0
        }
    }
}
266
267pub struct ResultCache {
273 entries: HashMap<CacheKey, CacheEntry>,
275 dependency_index: HashMap<String, HashSet<CacheKey>>,
277 max_memory: usize,
279 current_memory: usize,
281 stats: ResultCacheStats,
283}
284
285impl ResultCache {
286 pub fn new(max_memory_bytes: usize) -> Self {
288 Self {
289 entries: HashMap::new(),
290 dependency_index: HashMap::new(),
291 max_memory: max_memory_bytes,
292 current_memory: 0,
293 stats: ResultCacheStats {
294 max_memory_bytes,
295 ..Default::default()
296 },
297 }
298 }
299
300 pub fn get(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
302 if let Some(entry) = self.entries.get(key) {
304 if entry.is_expired() {
305 self.remove(key);
306 self.stats.expirations += 1;
307 self.stats.misses += 1;
308 return None;
309 }
310 }
311
312 if let Some(entry) = self.entries.get_mut(key) {
313 entry.touch();
314 self.stats.hits += 1;
315 Some(entry.data.clone())
316 } else {
317 self.stats.misses += 1;
318 None
319 }
320 }
321
322 pub fn contains(&self, key: &CacheKey) -> bool {
324 if let Some(entry) = self.entries.get(key) {
325 !entry.is_expired()
326 } else {
327 false
328 }
329 }
330
331 pub fn insert(&mut self, key: CacheKey, data: Vec<u8>, policy: CachePolicy) {
333 let entry_size = data.len() + std::mem::size_of::<CacheEntry>();
334
335 if self.entries.contains_key(&key) {
337 self.remove(&key);
338 }
339
340 while self.current_memory + entry_size > self.max_memory && !self.entries.is_empty() {
342 self.evict_one();
343 }
344
345 for dep in &policy.dependencies {
347 self.dependency_index
348 .entry(dep.clone())
349 .or_default()
350 .insert(key.clone());
351 }
352
353 let entry = CacheEntry::new(data, policy);
354 self.current_memory += entry.size;
355 self.entries.insert(key, entry);
356 self.stats.entry_count = self.entries.len();
357 self.stats.memory_bytes = self.current_memory;
358 }
359
360 pub fn remove(&mut self, key: &CacheKey) -> Option<Vec<u8>> {
362 if let Some(entry) = self.entries.remove(key) {
363 self.current_memory = self.current_memory.saturating_sub(entry.size);
364
365 for dep in &entry.policy.dependencies {
367 if let Some(keys) = self.dependency_index.get_mut(dep) {
368 keys.remove(key);
369 }
370 }
371
372 self.stats.entry_count = self.entries.len();
373 self.stats.memory_bytes = self.current_memory;
374 Some(entry.data)
375 } else {
376 None
377 }
378 }
379
380 pub fn invalidate_by_dependency(&mut self, dependency: &str) {
382 if let Some(keys) = self.dependency_index.remove(dependency) {
383 for key in keys {
384 if self.entries.remove(&key).is_some() {
385 self.stats.invalidations += 1;
386 }
387 }
388 self.stats.entry_count = self.entries.len();
389 self.current_memory = self.entries.values().map(|e| e.size).sum();
391 self.stats.memory_bytes = self.current_memory;
392 }
393 }
394
395 pub fn invalidate_where<F>(&mut self, predicate: F)
397 where
398 F: Fn(&CacheKey) -> bool,
399 {
400 let keys_to_remove: Vec<CacheKey> = self
401 .entries
402 .keys()
403 .filter(|k| predicate(k))
404 .cloned()
405 .collect();
406
407 for key in keys_to_remove {
408 self.remove(&key);
409 self.stats.invalidations += 1;
410 }
411 }
412
413 pub fn prune_expired(&mut self) {
415 let expired: Vec<CacheKey> = self
416 .entries
417 .iter()
418 .filter(|(_, v)| v.is_expired())
419 .map(|(k, _)| k.clone())
420 .collect();
421
422 for key in expired {
423 self.remove(&key);
424 self.stats.expirations += 1;
425 }
426 }
427
428 pub fn clear(&mut self) {
430 self.entries.clear();
431 self.dependency_index.clear();
432 self.current_memory = 0;
433 self.stats.entry_count = 0;
434 self.stats.memory_bytes = 0;
435 }
436
437 pub fn stats(&self) -> &ResultCacheStats {
439 &self.stats
440 }
441
442 fn evict_one(&mut self) {
444 let victim = self
445 .entries
446 .iter()
447 .min_by_key(|(_, v)| v.eviction_score())
448 .map(|(k, _)| k.clone());
449
450 if let Some(key) = victim {
451 self.remove(&key);
452 self.stats.evictions += 1;
453 }
454 }
455}
456
/// Definition of a materialized view as supplied by the caller.
#[derive(Debug, Clone)]
pub struct MaterializedViewDef {
    /// Unique view name; registering the same name again replaces the view.
    pub name: String,
    /// Query text the view is built from.
    pub query: String,
    /// Names passed to `mark_stale` that affect this view.
    pub dependencies: Vec<String>,
    /// When the view should be refreshed.
    pub refresh: RefreshPolicy,
    /// Retention setting in milliseconds; stored and reported in metadata
    /// only.
    pub retention_duration_ms: Option<u64>,
}

/// When a materialized view becomes due for a refresh.
#[derive(Debug, Clone)]
pub enum RefreshPolicy {
    /// Never marked stale or due by the cache itself.
    Manual,
    /// Marked stale by every write to a dependency.
    OnChange,
    /// Due whenever the given interval has elapsed since the last refresh.
    Periodic(Duration),
    /// Marked stale once this many dependency writes have accumulated since
    /// the last refresh.
    AfterWrites(usize),
}

/// A registered view together with its latest contents and refresh state.
struct MaterializedView {
    /// Contents from the latest successful refresh; empty until then.
    data: Vec<u8>,
    /// The definition the view was registered with.
    def: MaterializedViewDef,
    /// Start of the current periodic interval.
    last_refresh: Instant,
    /// Caller-supplied timestamp of the latest refresh attempt (0 if none).
    last_refresh_at_ms: u64,
    /// Caller-supplied duration of the latest refresh attempt (0 if none).
    last_refresh_duration_ms: u64,
    /// Error from the latest refresh attempt, cleared on success.
    last_error: Option<String>,
    /// Row count reported by the latest successful refresh.
    current_row_count: u64,
    /// Interval of a `Periodic` policy in milliseconds, `None` otherwise.
    refresh_every_ms: Option<u64>,
    /// Copy of the definition's `retention_duration_ms`.
    view_retention_ms: Option<u64>,
    /// Dependency writes seen since the latest successful refresh.
    writes_since_refresh: usize,
    /// Whether the contents are considered out of date.
    stale: bool,
}

/// Read-only snapshot of a view's definition and refresh state.
#[derive(Debug, Clone)]
pub struct MaterializedViewMetadata {
    /// View name.
    pub name: String,
    /// Query text from the definition.
    pub query_text: String,
    /// Interval of a `Periodic` policy in milliseconds, `None` otherwise.
    pub refresh_every_ms: Option<u64>,
    /// Caller-supplied timestamp of the latest refresh attempt (0 if none).
    pub last_refresh_at_ms: u64,
    /// Caller-supplied duration of the latest refresh attempt (0 if none).
    pub last_refresh_duration_ms: u64,
    /// Error from the latest refresh attempt, `None` after a success.
    pub last_error: Option<String>,
    /// Row count reported by the latest successful refresh.
    pub current_row_count: u64,
    /// Retention setting from the definition.
    pub retention_duration_ms: Option<u64>,
}

/// Registry of materialized views with dependency-driven staleness tracking
/// and periodic-refresh scheduling.
pub struct MaterializedViewCache {
    /// Registered views, by name.
    views: HashMap<String, MaterializedView>,
    /// Reverse index: dependency name -> names of the views that declared
    /// it. Sets are removed as soon as they become empty.
    dependency_index: HashMap<String, HashSet<String>>,
}

impl MaterializedViewCache {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            views: HashMap::new(),
            dependency_index: HashMap::new(),
        }
    }

    /// Registers `def`, replacing any view already registered under the same
    /// name. The new view starts empty and stale.
    pub fn register(&mut self, def: MaterializedViewDef) {
        // Unlink a previous definition first: otherwise its dependencies
        // would keep pointing at this name and writes to them would still
        // affect the replacement view.
        self.remove(&def.name);

        for dep in &def.dependencies {
            self.dependency_index
                .entry(dep.clone())
                .or_default()
                .insert(def.name.clone());
        }

        let refresh_every_ms = match &def.refresh {
            RefreshPolicy::Periodic(interval) => Some(Self::saturating_millis(*interval)),
            RefreshPolicy::Manual | RefreshPolicy::OnChange | RefreshPolicy::AfterWrites(_) => {
                None
            }
        };
        let view_retention_ms = def.retention_duration_ms;
        let name = def.name.clone();

        self.views.insert(
            name,
            MaterializedView {
                data: Vec::new(),
                def,
                last_refresh: Instant::now(),
                last_refresh_at_ms: 0,
                last_refresh_duration_ms: 0,
                last_error: None,
                current_row_count: 0,
                refresh_every_ms,
                view_retention_ms,
                writes_since_refresh: 0,
                stale: true,
            },
        );
    }

    /// Returns the names of the periodic views whose interval has elapsed at
    /// `now` and restarts their interval from `now`, so an immediately
    /// repeated call does not return them again.
    pub fn claim_due_at(&mut self, now: Instant) -> Vec<String> {
        let mut due = Vec::new();
        for view in self.views.values_mut() {
            if let RefreshPolicy::Periodic(interval) = &view.def.refresh {
                if now.saturating_duration_since(view.last_refresh) >= *interval {
                    due.push(view.def.name.clone());
                    view.last_refresh = now;
                }
            }
        }
        due
    }

    /// Stores the outcome of a successful refresh of `name`: new contents,
    /// row count and timing. Clears the last error, the write counter and
    /// the stale flag. Unknown names are ignored.
    pub fn record_refresh_success(
        &mut self,
        name: &str,
        data: Vec<u8>,
        row_count: u64,
        duration_ms: u64,
        at_unix_ms: u64,
    ) {
        if let Some(view) = self.views.get_mut(name) {
            view.data = data;
            view.last_refresh = Instant::now();
            view.last_refresh_at_ms = at_unix_ms;
            view.last_refresh_duration_ms = duration_ms;
            view.last_error = None;
            view.current_row_count = row_count;
            view.writes_since_refresh = 0;
            view.stale = false;
        }
    }

    /// Stores the outcome of a failed refresh of `name`: the error and
    /// timing. Contents, row count, write counter and stale flag are left as
    /// they were; the periodic interval restarts. Unknown names are ignored.
    pub fn record_refresh_failure(
        &mut self,
        name: &str,
        error: String,
        duration_ms: u64,
        at_unix_ms: u64,
    ) {
        if let Some(view) = self.views.get_mut(name) {
            view.last_refresh = Instant::now();
            view.last_refresh_at_ms = at_unix_ms;
            view.last_refresh_duration_ms = duration_ms;
            view.last_error = Some(error);
        }
    }

    /// Returns a metadata snapshot for every registered view, in no
    /// particular order.
    pub fn metadata(&self) -> Vec<MaterializedViewMetadata> {
        self.views
            .values()
            .map(|view| MaterializedViewMetadata {
                name: view.def.name.clone(),
                query_text: view.def.query.clone(),
                refresh_every_ms: view.refresh_every_ms,
                last_refresh_at_ms: view.last_refresh_at_ms,
                last_refresh_duration_ms: view.last_refresh_duration_ms,
                last_error: view.last_error.clone(),
                current_row_count: view.current_row_count,
                retention_duration_ms: view.view_retention_ms,
            })
            .collect()
    }

    /// Returns the contents of `name`, or `None` when the view is unknown,
    /// stale, or has empty contents.
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        self.views
            .get(name)
            .filter(|view| !view.stale && !view.data.is_empty())
            .map(|view| view.data.as_slice())
    }

    /// Returns `true` when `name` is registered and stale; `false` for
    /// unknown names.
    pub fn needs_refresh(&self, name: &str) -> bool {
        self.views.get(name).map_or(false, |view| view.stale)
    }

    /// Replaces the contents of `name` and clears its write counter and
    /// stale flag. Refresh metadata (timestamps, row count, last error) is
    /// not touched. Unknown names are ignored.
    pub fn refresh(&mut self, name: &str, data: Vec<u8>) {
        if let Some(view) = self.views.get_mut(name) {
            view.data = data;
            view.last_refresh = Instant::now();
            view.writes_since_refresh = 0;
            view.stale = false;
        }
    }

    /// Records one write to `table` against every view that depends on it.
    ///
    /// `OnChange` views become stale immediately; `AfterWrites(n)` views
    /// become stale once `n` writes have accumulated since their last
    /// refresh. Other policies only have their write counter bumped.
    pub fn mark_stale(&mut self, table: &str) {
        let names = match self.dependency_index.get(table) {
            Some(names) => names,
            None => return,
        };

        // `dependency_index` and `views` are separate fields, so the name
        // set can be borrowed while views are mutated; no copy is needed.
        for name in names {
            let view = match self.views.get_mut(name) {
                Some(view) => view,
                None => continue,
            };
            view.writes_since_refresh += 1;

            let now_stale = match &view.def.refresh {
                RefreshPolicy::OnChange => true,
                RefreshPolicy::AfterWrites(threshold) => view.writes_since_refresh >= *threshold,
                RefreshPolicy::Manual | RefreshPolicy::Periodic(_) => false,
            };
            if now_stale {
                view.stale = true;
            }
        }
    }

    /// Returns the names of the periodic views whose interval has elapsed,
    /// without restarting the interval.
    pub fn due_for_refresh(&self) -> Vec<String> {
        self.views
            .values()
            .filter(|view| match &view.def.refresh {
                RefreshPolicy::Periodic(interval) => view.last_refresh.elapsed() >= *interval,
                RefreshPolicy::Manual | RefreshPolicy::OnChange | RefreshPolicy::AfterWrites(_) => {
                    false
                }
            })
            .map(|view| view.def.name.clone())
            .collect()
    }

    /// Unregisters `name` and unlinks it from every dependency it declared;
    /// dependency sets left empty are dropped. Unknown names are ignored.
    pub fn remove(&mut self, name: &str) {
        if let Some(view) = self.views.remove(name) {
            for dep in &view.def.dependencies {
                if let Some(names) = self.dependency_index.get_mut(dep) {
                    names.remove(name);
                    if names.is_empty() {
                        self.dependency_index.remove(dep);
                    }
                }
            }
        }
    }

    /// Returns the names of all registered views, in no particular order.
    pub fn list(&self) -> Vec<&str> {
        self.views.keys().map(String::as_str).collect()
    }

    /// Converts `interval` to whole milliseconds, clamping to `u64::MAX`
    /// instead of silently wrapping when the value does not fit.
    fn saturating_millis(interval: Duration) -> u64 {
        interval.as_millis().min(u128::from(u64::MAX)) as u64
    }
}

impl Default for MaterializedViewCache {
    /// Equivalent to [`MaterializedViewCache::new`].
    fn default() -> Self {
        Self::new()
    }
}
749
750#[cfg(test)]
755mod tests {
756 use super::*;
757
758 fn mk_periodic_view(name: &str, ms: u64) -> MaterializedViewDef {
759 MaterializedViewDef {
760 name: name.into(),
761 query: "<test>".into(),
762 dependencies: vec![],
763 refresh: RefreshPolicy::Periodic(Duration::from_millis(ms)),
764 retention_duration_ms: None,
765 }
766 }
767
768 #[test]
772 fn test_materialized_view_claim_due_exactly_once_per_tick() {
773 let mut cache = MaterializedViewCache::new();
774 cache.register(mk_periodic_view("v1", 100));
775
776 let t0 = Instant::now();
777 assert!(cache.claim_due_at(t0).is_empty());
779
780 let t1 = t0 + Duration::from_millis(150);
782 let due = cache.claim_due_at(t1);
783 assert_eq!(due, vec!["v1".to_string()]);
784
785 assert!(cache.claim_due_at(t1).is_empty());
787
788 let t2 = t1 + Duration::from_millis(150);
790 assert_eq!(cache.claim_due_at(t2), vec!["v1".to_string()]);
791 }
792
793 #[test]
795 fn test_materialized_view_claim_due_skips_manual_views() {
796 let mut cache = MaterializedViewCache::new();
797 cache.register(MaterializedViewDef {
798 name: "m".into(),
799 query: "<test>".into(),
800 dependencies: vec![],
801 refresh: RefreshPolicy::Manual,
802 retention_duration_ms: None,
803 });
804 let t = Instant::now() + Duration::from_secs(60);
805 assert!(cache.claim_due_at(t).is_empty());
806 }
807
808 #[test]
811 fn test_materialized_view_failure_preserves_prior_content() {
812 let mut cache = MaterializedViewCache::new();
813 cache.register(mk_periodic_view("v", 100));
814
815 cache.record_refresh_success("v", b"first-payload".to_vec(), 42, 7, 1_000);
816 {
817 let md = cache.metadata();
818 let entry = md.iter().find(|m| m.name == "v").unwrap();
819 assert_eq!(entry.current_row_count, 42);
820 assert!(entry.last_error.is_none());
821 }
822
823 cache.record_refresh_failure("v", "boom".into(), 3, 2_000);
824 {
825 let md = cache.metadata();
826 let entry = md.iter().find(|m| m.name == "v").unwrap();
827 assert_eq!(entry.current_row_count, 42);
829 assert_eq!(entry.last_error.as_deref(), Some("boom"));
830 assert_eq!(entry.last_refresh_at_ms, 2_000);
831 }
832
833 cache.record_refresh_success("v", b"second".to_vec(), 7, 4, 3_000);
834 {
835 let md = cache.metadata();
836 let entry = md.iter().find(|m| m.name == "v").unwrap();
837 assert_eq!(entry.current_row_count, 7);
838 assert!(entry.last_error.is_none());
839 }
840 }
841
842 #[test]
846 fn test_materialized_view_drop_cleans_scheduled_work() {
847 let mut cache = MaterializedViewCache::new();
848 cache.register(mk_periodic_view("v", 50));
849 cache.remove("v");
850 let t = Instant::now() + Duration::from_secs(10);
851 assert!(cache.claim_due_at(t).is_empty());
852 assert!(cache.metadata().is_empty());
853 }
854
855 #[test]
858 fn test_materialized_view_metadata_exposes_seven_fields() {
859 let mut cache = MaterializedViewCache::new();
860 cache.register(mk_periodic_view("v", 500));
861 let md = cache.metadata();
862 assert_eq!(md.len(), 1);
863 let m = &md[0];
864 assert_eq!(m.name, "v");
865 assert_eq!(m.refresh_every_ms, Some(500));
866 assert_eq!(m.last_refresh_at_ms, 0);
867 assert_eq!(m.last_refresh_duration_ms, 0);
868 assert!(m.last_error.is_none());
869 assert_eq!(m.current_row_count, 0);
870 }
871
872 #[test]
873 fn test_cache_key_hashing() {
874 let key1 = CacheKey::new("attack_paths")
875 .param("from", "host1")
876 .param("to", "host2");
877
878 let key2 = CacheKey::new("attack_paths")
879 .param("to", "host2")
880 .param("from", "host1"); assert_eq!(key1, key2);
883 assert_eq!(key1.hash, key2.hash);
884 }
885
886 #[test]
887 fn test_result_cache_basic() {
888 let mut cache = ResultCache::new(1024 * 1024); let key = CacheKey::new("test_query").param("id", "123");
891 let data = vec![1, 2, 3, 4, 5];
892
893 cache.insert(key.clone(), data.clone(), CachePolicy::default());
894
895 let result = cache.get(&key);
896 assert_eq!(result, Some(data));
897 assert_eq!(cache.stats().hits, 1);
898 }
899
900 #[test]
901 fn test_cache_expiration() {
902 let mut cache = ResultCache::new(1024 * 1024);
903
904 let key = CacheKey::new("test");
905 let data = vec![1, 2, 3];
906
907 cache.insert(
909 key.clone(),
910 data,
911 CachePolicy::default().ttl(Duration::from_millis(1)),
912 );
913
914 std::thread::sleep(Duration::from_millis(10));
916
917 assert!(cache.get(&key).is_none());
918 assert_eq!(cache.stats().expirations, 1);
919 }
920
921 #[test]
922 fn test_dependency_invalidation() {
923 let mut cache = ResultCache::new(1024 * 1024);
924
925 let key = CacheKey::new("host_query");
926 cache.insert(
927 key.clone(),
928 vec![1, 2, 3],
929 CachePolicy::default().depends_on(&["hosts"]),
930 );
931
932 assert!(cache.contains(&key));
933
934 cache.invalidate_by_dependency("hosts");
936
937 assert!(!cache.contains(&key));
938 assert_eq!(cache.stats().invalidations, 1);
939 }
940
941 #[test]
942 fn test_memory_eviction() {
943 let mut cache = ResultCache::new(100); for i in 0..10 {
947 let key = CacheKey::new("query").param("i", i.to_string());
948 cache.insert(key, vec![0u8; 20], CachePolicy::default());
949 }
950
951 assert!(cache.stats().evictions > 0);
953 assert!(cache.stats().memory_bytes <= 100);
954 }
955
956 #[test]
957 fn test_materialized_view() {
958 let mut cache = MaterializedViewCache::new();
959
960 cache.register(MaterializedViewDef {
961 name: "active_hosts".to_string(),
962 query: "SELECT * FROM hosts WHERE status = 'active'".to_string(),
963 dependencies: vec!["hosts".to_string()],
964 refresh: RefreshPolicy::OnChange,
965 retention_duration_ms: None,
966 });
967
968 assert!(cache.needs_refresh("active_hosts"));
970
971 cache.refresh("active_hosts", vec![1, 2, 3]);
973 assert!(!cache.needs_refresh("active_hosts"));
974 assert_eq!(cache.get("active_hosts"), Some(&[1, 2, 3][..]));
975
976 cache.mark_stale("hosts");
978 assert!(cache.needs_refresh("active_hosts"));
979 }
980}