Skip to main content

chartml_core/resolver/
cache.rs

1//! Pluggable caching for fetched provider results.
2//!
3//! `CacheBackend` is the trait host apps implement to plug in tier-1 storage
4//! (default `MemoryBackend` is in-process), tier-2 storage (`IndexedDbBackend`
5//! lands in phase 3b), or anything else they like (Redis, file-system, etc.).
6//!
7//! `CachedEntry` carries the `DataTable`, the wall-clock time it was fetched,
8//! the TTL, free-form bulk-invalidation tags, and provider metadata so
9//! cache-hits return identical metadata to the original fetch.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
// `web_time` re-exports `SystemTime`/`Duration` with the same API as `std::time`
// but works on `wasm32-unknown-unknown` (where `std::time::SystemTime::now()`
// panics with "time not implemented on this platform"). On native it is a
// transparent alias for `std::time`. Only `SystemTime` has to come from
// `web_time`: `Duration` never reads the clock, so the `std` type is imported
// directly. See chartml-core's Cargo.toml for the rationale.
18use std::time::Duration;
19use web_time::SystemTime;
20
21use async_trait::async_trait;
22use thiserror::Error;
23
24use crate::data::DataTable;
25
26/// One cached source entry. Cloning is cheap because `DataTable` is
27/// `Arc`-backed; metadata + tag clones are a small `HashMap`/`Vec`.
28#[derive(Debug, Clone)]
29pub struct CachedEntry {
30    pub data: DataTable,
31    pub fetched_at: SystemTime,
32    pub ttl: Duration,
33    /// Free-form tags for bulk invalidation. Typical values:
34    /// `["slug:kyomi-analytics", "namespace:workspace-foo"]`.
35    pub tags: Vec<String>,
36    /// Provider metadata preserved with the cached entry (from
37    /// `FetchResult.metadata`). Survives cache hits so callers see the same
38    /// `bytes_billed` / `rows_returned` as on the original fetch.
39    pub metadata: HashMap<String, serde_json::Value>,
40}
41
42impl CachedEntry {
43    /// `true` once the entry has aged past its TTL. Conservative — clock
44    /// regressions or `SystemTime` errors return `true` (treat as expired)
45    /// so we never serve a stale row by accident.
46    pub fn is_expired(&self) -> bool {
47        SystemTime::now()
48            .duration_since(self.fetched_at)
49            .map(|age| age > self.ttl)
50            .unwrap_or(true)
51    }
52
53    /// Wall-clock age of the entry. Returns `Duration::ZERO` if the system
54    /// clock has gone backwards since the entry was inserted.
55    pub fn age(&self) -> Duration {
56        SystemTime::now()
57            .duration_since(self.fetched_at)
58            .unwrap_or(Duration::ZERO)
59    }
60}
61
62/// Cache backend errors. Backed by `thiserror` so backends can wrap their
63/// native error type (`Mutex` poisoning, IndexedDB errors, IO errors, etc.).
64#[derive(Debug, Error, Clone)]
65pub enum CacheError {
66    /// Backend storage failed (poisoned mutex, IO error, IndexedDB transaction
67    /// failure, …). The string is the implementation-specific detail.
68    #[error("cache backend error: {0}")]
69    Backend(String),
70}
71
72/// Pluggable cache backend trait. `?Send` on WASM mirrors the
73/// `DataSourceProvider` / `TransformMiddleware` story so single-threaded
74/// browser environments don't need `Send` bounds.
75///
76/// The supertrait bound is cfg-gated:
77/// - **native** (`not(target_arch = "wasm32")`) — `Send + Sync` so the
78///   resolver can move backend handles across `tokio::spawn` task boundaries.
79/// - **WASM** — no Send/Sync requirement so backends backed by
80///   single-threaded handles (`Rc<RefCell<...>>` for `idb::Database`,
81///   `js_sys::Function` for callback bridges) implement the trait directly,
82///   without the `unsafe impl Send + Sync` workaround the previous
83///   unconditional bound forced.
84#[cfg(not(target_arch = "wasm32"))]
85#[async_trait]
86pub trait CacheBackend: Send + Sync {
87    /// Look up an entry. Backends may return `None` for "not present" OR
88    /// "present but failed to deserialize" — the resolver treats both as
89    /// cache miss and falls through to the next tier or the provider.
90    async fn get(&self, key: u64) -> Option<CachedEntry>;
91
92    /// Insert or replace an entry. Returns `Err` only on backend storage
93    /// failure (poisoned mutex, IndexedDB transaction failure, etc.) — TTL
94    /// math happens at `get` time, not `put`.
95    async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError>;
96
97    /// Remove a single entry. No-op if the key is absent.
98    async fn invalidate(&self, key: u64) -> Result<(), CacheError>;
99
100    /// Remove every entry whose `tags` contain the given tag. Used by the
101    /// resolver's `invalidate_by_slug` / `invalidate_by_namespace` APIs.
102    async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError>;
103
104    /// Drop everything.
105    async fn clear(&self) -> Result<(), CacheError>;
106
107    /// Optional graceful shutdown (flush pending writes, close transactions).
108    /// Default no-op.
109    async fn shutdown(&self) {}
110}
111
/// Storage abstraction for cached provider results (WASM definition).
///
/// Exposes the same methods as the native [`CacheBackend`] definition but
/// without the `Send + Sync` supertrait, and its futures are `?Send`. The
/// method docs are deliberately repeated here rather than aliased so the
/// full contract is visible in either platform's rustdoc.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait CacheBackend {
    /// Fetches the entry stored under `key`.
    ///
    /// `None` covers both "nothing stored" and "stored but could not be
    /// deserialized"; the resolver handles either as a cache miss and falls
    /// through to the next tier or the provider.
    async fn get(&self, key: u64) -> Option<CachedEntry>;

    /// Stores `entry` under `key`, replacing any previous value.
    ///
    /// `Err` signals a backend storage failure only (poisoned mutex,
    /// IndexedDB transaction failure, etc.). TTL is evaluated when entries
    /// are read, not when they are written.
    async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError>;

    /// Removes the entry stored under `key`; an absent key is a no-op.
    async fn invalidate(&self, key: u64) -> Result<(), CacheError>;

    /// Removes every entry whose `tags` include `tag`. Backs the resolver's
    /// `invalidate_by_slug` / `invalidate_by_namespace` APIs.
    async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError>;

    /// Removes all entries.
    async fn clear(&self) -> Result<(), CacheError>;

    /// Optional graceful-shutdown hook (flush pending writes, close
    /// transactions). The default implementation does nothing.
    async fn shutdown(&self) {}
}
143
144/// Default in-process tier-1 cache. `Arc<Mutex<HashMap<u64, CachedEntry>>>` —
145/// no async I/O, no external dependencies, identical behavior on native and
146/// WASM.
147#[derive(Debug, Default, Clone)]
148pub struct MemoryBackend {
149    inner: Arc<Mutex<HashMap<u64, CachedEntry>>>,
150}
151
152impl MemoryBackend {
153    pub fn new() -> Self {
154        Self {
155            inner: Arc::new(Mutex::new(HashMap::new())),
156        }
157    }
158
159    /// Snapshot the number of entries — useful for tests and metrics.
160    pub fn len(&self) -> usize {
161        self.inner
162            .lock()
163            .expect("memory cache lock poisoned")
164            .len()
165    }
166
167    pub fn is_empty(&self) -> bool {
168        self.len() == 0
169    }
170}
171
172#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
173#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
174impl CacheBackend for MemoryBackend {
175    async fn get(&self, key: u64) -> Option<CachedEntry> {
176        let guard = self.inner.lock().expect("memory cache lock poisoned");
177        guard.get(&key).cloned()
178    }
179
180    async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError> {
181        let mut guard = self.inner.lock().expect("memory cache lock poisoned");
182        guard.insert(key, entry);
183        Ok(())
184    }
185
186    async fn invalidate(&self, key: u64) -> Result<(), CacheError> {
187        let mut guard = self.inner.lock().expect("memory cache lock poisoned");
188        guard.remove(&key);
189        Ok(())
190    }
191
192    async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError> {
193        let mut guard = self.inner.lock().expect("memory cache lock poisoned");
194        guard.retain(|_, entry| !entry.tags.iter().any(|t| t == tag));
195        Ok(())
196    }
197
198    async fn clear(&self) -> Result<(), CacheError> {
199        let mut guard = self.inner.lock().expect("memory cache lock poisoned");
200        guard.clear();
201        Ok(())
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::Row;
    use serde_json::json;

    /// Builds a one-row entry fetched "now" with a 60-second TTL, the given
    /// tags and empty metadata.
    fn make_entry(tags: Vec<&str>) -> CachedEntry {
        let row: Row = [("x".to_string(), json!(1.0))].into_iter().collect();
        CachedEntry {
            data: DataTable::from_rows(&[row]).unwrap(),
            fetched_at: SystemTime::now(),
            ttl: Duration::from_secs(60),
            tags: tags.into_iter().map(String::from).collect(),
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn memory_backend_get_put_roundtrip() {
        let backend = MemoryBackend::new();
        assert!(backend.is_empty());
        backend.put(1, make_entry(vec!["slug:foo"])).await.unwrap();
        let got = backend
            .get(1)
            .await
            .expect("entry should be present after put");
        assert_eq!(got.tags, vec!["slug:foo".to_string()]);
        assert_eq!(got.ttl, Duration::from_secs(60));
        assert_eq!(backend.len(), 1);
    }

    #[tokio::test]
    async fn memory_backend_put_replaces_existing_key() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec!["slug:old"])).await.unwrap();
        backend.put(1, make_entry(vec!["slug:new"])).await.unwrap();
        assert_eq!(backend.len(), 1);
        let got = backend
            .get(1)
            .await
            .expect("entry should be present after put");
        assert_eq!(got.tags, vec!["slug:new".to_string()]);
    }

    #[tokio::test]
    async fn memory_backend_invalidate_single() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        backend.put(2, make_entry(vec![])).await.unwrap();
        backend.invalidate(1).await.unwrap();
        assert!(backend.get(1).await.is_none());
        assert!(backend.get(2).await.is_some());
    }

    #[tokio::test]
    async fn memory_backend_invalidate_absent_key_is_noop() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        backend.invalidate(42).await.unwrap();
        assert_eq!(backend.len(), 1);
        assert!(backend.get(1).await.is_some());
    }

    #[tokio::test]
    async fn memory_backend_invalidate_by_tag() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec!["slug:foo"])).await.unwrap();
        backend.put(2, make_entry(vec!["slug:foo"])).await.unwrap();
        backend.put(3, make_entry(vec!["slug:bar"])).await.unwrap();
        backend.invalidate_by_tag("slug:foo").await.unwrap();
        assert!(backend.get(1).await.is_none());
        assert!(backend.get(2).await.is_none());
        assert!(backend.get(3).await.is_some());
    }

    #[tokio::test]
    async fn memory_backend_clear() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        backend.put(2, make_entry(vec![])).await.unwrap();
        backend.clear().await.unwrap();
        assert_eq!(backend.len(), 0);
        assert!(backend.is_empty());
    }

    // The expiry tests are plain `#[test]`s: they await nothing, and they
    // backdate / postdate `fetched_at` instead of sleeping so the outcome does
    // not depend on clock resolution or scheduling.

    #[test]
    fn cached_entry_expiry() {
        let mut entry = make_entry(vec![]);
        entry.ttl = Duration::from_secs(1);
        entry.fetched_at = SystemTime::now() - Duration::from_secs(5);
        assert!(entry.is_expired());
        // Loose lower bound to tolerate small clock adjustments mid-test.
        assert!(entry.age() >= Duration::from_secs(4));
    }

    #[test]
    fn cached_entry_fresh_is_not_expired() {
        let entry = make_entry(vec![]);
        assert!(!entry.is_expired());
        assert!(entry.age() < Duration::from_secs(60));
    }

    #[test]
    fn cached_entry_fetched_in_future_is_expired_with_zero_age() {
        // Simulates a clock regression: `fetched_at` is ahead of "now".
        let mut entry = make_entry(vec![]);
        entry.fetched_at = SystemTime::now() + Duration::from_secs(3600);
        assert!(entry.is_expired());
        assert_eq!(entry.age(), Duration::ZERO);
    }
}