chartml_core/resolver/cache.rs
//! Pluggable caching for fetched provider results.
//!
//! `CacheBackend` is the trait host apps implement to plug in tier-1 storage
//! (the default, `MemoryBackend`, is in-process), tier-2 storage
//! (`IndexedDbBackend`, landing in phase 3b), or any other store they like
//! (Redis, file system, etc.).
//!
//! `CachedEntry` carries the `DataTable`, the wall-clock time it was fetched,
//! the TTL, free-form bulk-invalidation tags, and provider metadata, so cache
//! hits return the same metadata as the original fetch.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13// `web_time` re-exports `SystemTime`/`Duration` with the same API as `std::time`
14// but works on `wasm32-unknown-unknown` (where `std::time::SystemTime::now()`
15// panics with "time not implemented on this platform"). On native it is a
16// transparent alias for `std::time`. See chartml-core's Cargo.toml for the
17// rationale.
18use std::time::Duration;
19use web_time::SystemTime;
20
21use async_trait::async_trait;
22use thiserror::Error;
23
24use crate::data::DataTable;
25
26/// One cached source entry. Cloning is cheap because `DataTable` is
27/// `Arc`-backed; metadata + tag clones are a small `HashMap`/`Vec`.
28#[derive(Debug, Clone)]
29pub struct CachedEntry {
30 pub data: DataTable,
31 pub fetched_at: SystemTime,
32 pub ttl: Duration,
33 /// Free-form tags for bulk invalidation. Typical values:
34 /// `["slug:kyomi-analytics", "namespace:workspace-foo"]`.
35 pub tags: Vec<String>,
36 /// Provider metadata preserved with the cached entry (from
37 /// `FetchResult.metadata`). Survives cache hits so callers see the same
38 /// `bytes_billed` / `rows_returned` as on the original fetch.
39 pub metadata: HashMap<String, serde_json::Value>,
40}
41
42impl CachedEntry {
43 /// `true` once the entry has aged past its TTL. Conservative — clock
44 /// regressions or `SystemTime` errors return `true` (treat as expired)
45 /// so we never serve a stale row by accident.
46 pub fn is_expired(&self) -> bool {
47 SystemTime::now()
48 .duration_since(self.fetched_at)
49 .map(|age| age > self.ttl)
50 .unwrap_or(true)
51 }
52
53 /// Wall-clock age of the entry. Returns `Duration::ZERO` if the system
54 /// clock has gone backwards since the entry was inserted.
55 pub fn age(&self) -> Duration {
56 SystemTime::now()
57 .duration_since(self.fetched_at)
58 .unwrap_or(Duration::ZERO)
59 }
60}
61
62/// Cache backend errors. Backed by `thiserror` so backends can wrap their
63/// native error type (`Mutex` poisoning, IndexedDB errors, IO errors, etc.).
64#[derive(Debug, Error, Clone)]
65pub enum CacheError {
66 /// Backend storage failed (poisoned mutex, IO error, IndexedDB transaction
67 /// failure, …). The string is the implementation-specific detail.
68 #[error("cache backend error: {0}")]
69 Backend(String),
70}
71
72/// Pluggable cache backend trait. `?Send` on WASM mirrors the
73/// `DataSourceProvider` / `TransformMiddleware` story so single-threaded
74/// browser environments don't need `Send` bounds.
75///
76/// The supertrait bound is cfg-gated:
77/// - **native** (`not(target_arch = "wasm32")`) — `Send + Sync` so the
78/// resolver can move backend handles across `tokio::spawn` task boundaries.
79/// - **WASM** — no Send/Sync requirement so backends backed by
80/// single-threaded handles (`Rc<RefCell<...>>` for `idb::Database`,
81/// `js_sys::Function` for callback bridges) implement the trait directly,
82/// without the `unsafe impl Send + Sync` workaround the previous
83/// unconditional bound forced.
84#[cfg(not(target_arch = "wasm32"))]
85#[async_trait]
86pub trait CacheBackend: Send + Sync {
87 /// Look up an entry. Backends may return `None` for "not present" OR
88 /// "present but failed to deserialize" — the resolver treats both as
89 /// cache miss and falls through to the next tier or the provider.
90 async fn get(&self, key: u64) -> Option<CachedEntry>;
91
92 /// Insert or replace an entry. Returns `Err` only on backend storage
93 /// failure (poisoned mutex, IndexedDB transaction failure, etc.) — TTL
94 /// math happens at `get` time, not `put`.
95 async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError>;
96
97 /// Remove a single entry. No-op if the key is absent.
98 async fn invalidate(&self, key: u64) -> Result<(), CacheError>;
99
100 /// Remove every entry whose `tags` contain the given tag. Used by the
101 /// resolver's `invalidate_by_slug` / `invalidate_by_namespace` APIs.
102 async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError>;
103
104 /// Drop everything.
105 async fn clear(&self) -> Result<(), CacheError>;
106
107 /// Optional graceful shutdown (flush pending writes, close transactions).
108 /// Default no-op.
109 async fn shutdown(&self) {}
110}
111
112/// WASM variant of [`CacheBackend`] — same surface, no `Send + Sync`
113/// supertrait. See the native impl above for full docs (intentionally
114/// duplicated rather than aliased so the trait body is discoverable from
115/// either platform's rustdoc).
116#[cfg(target_arch = "wasm32")]
117#[async_trait(?Send)]
118pub trait CacheBackend {
119 /// Look up an entry. Backends may return `None` for "not present" OR
120 /// "present but failed to deserialize" — the resolver treats both as
121 /// cache miss and falls through to the next tier or the provider.
122 async fn get(&self, key: u64) -> Option<CachedEntry>;
123
124 /// Insert or replace an entry. Returns `Err` only on backend storage
125 /// failure (poisoned mutex, IndexedDB transaction failure, etc.) — TTL
126 /// math happens at `get` time, not `put`.
127 async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError>;
128
129 /// Remove a single entry. No-op if the key is absent.
130 async fn invalidate(&self, key: u64) -> Result<(), CacheError>;
131
132 /// Remove every entry whose `tags` contain the given tag. Used by the
133 /// resolver's `invalidate_by_slug` / `invalidate_by_namespace` APIs.
134 async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError>;
135
136 /// Drop everything.
137 async fn clear(&self) -> Result<(), CacheError>;
138
139 /// Optional graceful shutdown (flush pending writes, close transactions).
140 /// Default no-op.
141 async fn shutdown(&self) {}
142}
143
144/// Default in-process tier-1 cache. `Arc<Mutex<HashMap<u64, CachedEntry>>>` —
145/// no async I/O, no external dependencies, identical behavior on native and
146/// WASM.
147#[derive(Debug, Default, Clone)]
148pub struct MemoryBackend {
149 inner: Arc<Mutex<HashMap<u64, CachedEntry>>>,
150}
151
152impl MemoryBackend {
153 pub fn new() -> Self {
154 Self {
155 inner: Arc::new(Mutex::new(HashMap::new())),
156 }
157 }
158
159 /// Snapshot the number of entries — useful for tests and metrics.
160 pub fn len(&self) -> usize {
161 self.inner
162 .lock()
163 .expect("memory cache lock poisoned")
164 .len()
165 }
166
167 pub fn is_empty(&self) -> bool {
168 self.len() == 0
169 }
170}
171
172#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
173#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
174impl CacheBackend for MemoryBackend {
175 async fn get(&self, key: u64) -> Option<CachedEntry> {
176 let guard = self.inner.lock().expect("memory cache lock poisoned");
177 guard.get(&key).cloned()
178 }
179
180 async fn put(&self, key: u64, entry: CachedEntry) -> Result<(), CacheError> {
181 let mut guard = self.inner.lock().expect("memory cache lock poisoned");
182 guard.insert(key, entry);
183 Ok(())
184 }
185
186 async fn invalidate(&self, key: u64) -> Result<(), CacheError> {
187 let mut guard = self.inner.lock().expect("memory cache lock poisoned");
188 guard.remove(&key);
189 Ok(())
190 }
191
192 async fn invalidate_by_tag(&self, tag: &str) -> Result<(), CacheError> {
193 let mut guard = self.inner.lock().expect("memory cache lock poisoned");
194 guard.retain(|_, entry| !entry.tags.iter().any(|t| t == tag));
195 Ok(())
196 }
197
198 async fn clear(&self) -> Result<(), CacheError> {
199 let mut guard = self.inner.lock().expect("memory cache lock poisoned");
200 guard.clear();
201 Ok(())
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::Row;
    use serde_json::json;

    /// Build a one-row entry fetched "now" with a 60 s TTL and the given tags.
    fn make_entry(tags: Vec<&str>) -> CachedEntry {
        let row: Row = [("x".to_string(), json!(1.0))].into_iter().collect();
        CachedEntry {
            data: DataTable::from_rows(&[row]).unwrap(),
            fetched_at: SystemTime::now(),
            ttl: Duration::from_secs(60),
            tags: tags.into_iter().map(String::from).collect(),
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn memory_backend_get_put_roundtrip() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        let got = backend.get(1).await;
        assert!(got.is_some());
        assert_eq!(backend.len(), 1);
    }

    #[tokio::test]
    async fn memory_backend_put_replaces_existing_key() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec!["slug:foo"])).await.unwrap();
        backend.put(1, make_entry(vec!["slug:bar"])).await.unwrap();
        assert_eq!(backend.len(), 1);
        let got = backend.get(1).await.expect("entry should be present");
        assert_eq!(got.tags, vec!["slug:bar".to_string()]);
    }

    #[tokio::test]
    async fn memory_backend_invalidate_single() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        backend.put(2, make_entry(vec![])).await.unwrap();
        backend.invalidate(1).await.unwrap();
        assert!(backend.get(1).await.is_none());
        assert!(backend.get(2).await.is_some());
        // Invalidating an absent key is a successful no-op.
        backend.invalidate(99).await.unwrap();
        assert_eq!(backend.len(), 1);
    }

    #[tokio::test]
    async fn memory_backend_invalidate_by_tag() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec!["slug:foo"])).await.unwrap();
        // The matching tag is not the first one: the whole tag list must be scanned.
        backend
            .put(2, make_entry(vec!["namespace:ws", "slug:foo"]))
            .await
            .unwrap();
        backend.put(3, make_entry(vec!["slug:bar"])).await.unwrap();
        backend.invalidate_by_tag("slug:foo").await.unwrap();
        assert!(backend.get(1).await.is_none());
        assert!(backend.get(2).await.is_none());
        assert!(backend.get(3).await.is_some());
    }

    #[tokio::test]
    async fn memory_backend_clear() {
        let backend = MemoryBackend::new();
        backend.put(1, make_entry(vec![])).await.unwrap();
        backend.put(2, make_entry(vec![])).await.unwrap();
        backend.clear().await.unwrap();
        assert_eq!(backend.len(), 0);
        assert!(backend.is_empty());
    }

    #[test]
    fn cached_entry_fresh_is_not_expired() {
        let entry = make_entry(vec![]);
        assert!(!entry.is_expired());
    }

    #[test]
    fn cached_entry_expiry() {
        let mut entry = make_entry(vec![]);
        entry.ttl = Duration::from_secs(1);
        // Backdate instead of sleeping: the test is deterministic regardless of
        // clock resolution and never blocks a thread.
        entry.fetched_at = SystemTime::now() - Duration::from_secs(10);
        assert!(entry.is_expired());
        assert!(entry.age() > entry.ttl);
    }

    #[test]
    fn cached_entry_future_fetch_time_is_expired_with_zero_age() {
        let mut entry = make_entry(vec![]);
        // Simulates the wall clock having gone backwards since insertion.
        entry.fetched_at = SystemTime::now() + Duration::from_secs(3600);
        assert!(entry.is_expired());
        assert_eq!(entry.age(), Duration::ZERO);
    }
}