teaql_runtime/repository/
cache.rs1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::time::{Duration, Instant};
4
5use teaql_core::Record;
6
7#[derive(Debug, Default)]
8pub struct InMemoryAggregationCache {
9 namespace: String,
10 entries: Mutex<HashMap<String, AggregationCacheEntry>>,
11}
12
13pub trait AggregationCacheBackend: Send + Sync {
14 fn namespace(&self) -> &str;
15 fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>>;
16 fn put(&self, key: String, rows: Vec<Record>);
17 fn invalidate_namespace(&self, namespace: &str);
18}
19
20#[derive(Debug, Clone)]
21struct AggregationCacheEntry {
22 stored_at: Instant,
23 rows: Vec<Record>,
24}
25
26impl InMemoryAggregationCache {
27 pub fn with_namespace(namespace: impl Into<String>) -> Self {
28 Self {
29 namespace: namespace.into(),
30 entries: Mutex::new(HashMap::new()),
31 }
32 }
33
34 pub fn namespace(&self) -> &str {
35 &self.namespace
36 }
37}
38
39impl AggregationCacheBackend for InMemoryAggregationCache {
40 fn namespace(&self) -> &str {
41 &self.namespace
42 }
43
44 fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
45 let entries = self.entries.lock().ok()?;
46 let entry = entries.get(key)?;
47 if max_age_millis == 0 || entry.stored_at.elapsed() <= Duration::from_millis(max_age_millis)
48 {
49 Some(entry.rows.clone())
50 } else {
51 None
52 }
53 }
54
55 fn put(&self, key: String, rows: Vec<Record>) {
56 if let Ok(mut entries) = self.entries.lock() {
57 entries.insert(
58 key,
59 AggregationCacheEntry {
60 stored_at: Instant::now(),
61 rows,
62 },
63 );
64 }
65 }
66
67 fn invalidate_namespace(&self, namespace: &str) {
68 if let Ok(mut entries) = self.entries.lock() {
69 let prefix = format!("{namespace}::");
70 entries.retain(|key, _| !key.starts_with(&prefix));
71 }
72 }
73}
74
75impl InMemoryAggregationCache {
76 pub fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
77 AggregationCacheBackend::get(self, key, max_age_millis)
78 }
79
80 pub fn put(&self, key: String, rows: Vec<Record>) {
81 AggregationCacheBackend::put(self, key, rows);
82 }
83
84 pub fn clear(&self) {
85 if let Ok(mut entries) = self.entries.lock() {
86 entries.clear();
87 }
88 }
89
90 pub fn invalidate_namespace(&self, namespace: &str) {
91 AggregationCacheBackend::invalidate_namespace(self, namespace);
92 }
93}