Skip to main content

teaql_runtime/repository/
cache.rs

1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::time::{Duration, Instant};
4
5use teaql_core::Record;
6
7#[derive(Debug, Default)]
8pub struct InMemoryAggregationCache {
9    namespace: String,
10    entries: Mutex<HashMap<String, AggregationCacheEntry>>,
11}
12
13pub trait AggregationCacheBackend: Send + Sync {
14    fn namespace(&self) -> &str;
15    fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>>;
16    fn put(&self, key: String, rows: Vec<Record>);
17    fn invalidate_namespace(&self, namespace: &str);
18}
19
20#[derive(Debug, Clone)]
21struct AggregationCacheEntry {
22    stored_at: Instant,
23    rows: Vec<Record>,
24}
25
26impl InMemoryAggregationCache {
27    pub fn with_namespace(namespace: impl Into<String>) -> Self {
28        Self {
29            namespace: namespace.into(),
30            entries: Mutex::new(HashMap::new()),
31        }
32    }
33
34    pub fn namespace(&self) -> &str {
35        &self.namespace
36    }
37}
38
39impl AggregationCacheBackend for InMemoryAggregationCache {
40    fn namespace(&self) -> &str {
41        &self.namespace
42    }
43
44    fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
45        let entries = self.entries.lock().ok()?;
46        let entry = entries.get(key)?;
47        if max_age_millis == 0 || entry.stored_at.elapsed() <= Duration::from_millis(max_age_millis)
48        {
49            Some(entry.rows.clone())
50        } else {
51            None
52        }
53    }
54
55    fn put(&self, key: String, rows: Vec<Record>) {
56        if let Ok(mut entries) = self.entries.lock() {
57            entries.insert(
58                key,
59                AggregationCacheEntry {
60                    stored_at: Instant::now(),
61                    rows,
62                },
63            );
64        }
65    }
66
67    fn invalidate_namespace(&self, namespace: &str) {
68        if let Ok(mut entries) = self.entries.lock() {
69            let prefix = format!("{namespace}::");
70            entries.retain(|key, _| !key.starts_with(&prefix));
71        }
72    }
73}
74
75impl InMemoryAggregationCache {
76    pub fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
77        AggregationCacheBackend::get(self, key, max_age_millis)
78    }
79
80    pub fn put(&self, key: String, rows: Vec<Record>) {
81        AggregationCacheBackend::put(self, key, rows);
82    }
83
84    pub fn clear(&self) {
85        if let Ok(mut entries) = self.entries.lock() {
86            entries.clear();
87        }
88    }
89
90    pub fn invalidate_namespace(&self, namespace: &str) {
91        AggregationCacheBackend::invalidate_namespace(self, namespace);
92    }
93}