// chartml_core/resolver/mod.rs
//! Resolver: caching + dedup + dispatch layer between `ChartML::fetch` and
//! the registered `DataSourceProvider` implementations (chartml 5.0 phase 3).
//!
//! The resolver owns:
//! - **Tier-1 cache** — a `MemoryBackend`, always present.
//! - **Tier-2 cache** — optional [`CacheBackendRef`] (populated in
//!   phase 3b by `IndexedDbBackend`; phase 3 leaves it `None`).
//! - **In-flight tracker** — a `HashMap<u64, Shared<BoxFuture<...>>>` so two
//!   concurrent fetches for the same key share one provider invocation.
//! - **Provider registry** — `HashMap<String, Arc<dyn DataSourceProvider>>`,
//!   keyed by dispatch slug (`"inline"`, `"http"`, `"datasource"`, …).
//!
//! Phase 3 leaves `ResolverHooks` integration as a no-op stub; phase 3c will
//! add hook dispatch without further changes to this file's structure.
15
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18// `web_time::SystemTime` works on `wasm32-unknown-unknown` (where the std
19// version panics). On native it's a transparent alias for `std::time`.
20use std::time::Duration;
21use web_time::SystemTime;
22
23use async_trait::async_trait;
24use futures::future::{FutureExt, Shared};
25use thiserror::Error;
26use xxhash_rust::xxh3::Xxh3;
27
// `?Send` on WASM means the futures we shove into `Shared` cannot promise
// `Send`. Use `LocalBoxFuture` on WASM and `BoxFuture` (which IS `Send`) on
// native so the resolver can stay multi-threaded on tokio while compiling
// cleanly under `wasm32-unknown-unknown`.
#[cfg(not(target_arch = "wasm32"))]
type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
36
// Cfg-gated shared-ownership + interior-mutability primitives. On native we
// use `Arc<Mutex<T>>` so the resolver stays multi-threaded for tokio. On
// WASM the resolver's inflight `Shared<LocalBoxFuture<...>>` map is
// inherently `?Send`, so wrapping it in `Arc<Mutex<...>>` would trip
// `clippy::arc_with_non_send_sync`. Single-threaded `Rc<RefCell<...>>` is
// the correct primitive for the wasm32-unknown-unknown target.
/// Cfg-gated shared-ownership pointer. `Arc<T>` on native (so handles can
/// move across `tokio::spawn` task boundaries), `Rc<T>` on WASM (where
/// `wasm32-unknown-unknown` is single-threaded and the resolver's
/// internals are `?Send`, so an `Arc` would be both incorrect and
/// rejected by `clippy::arc_with_non_send_sync`).
///
/// Public so consumers wiring [`CacheBackend`] / [`DataSourceProvider`]
/// trait objects through the resolver can use the same alias as the
/// resolver's internal storage — their wasm32-unknown-unknown builds get
/// the right primitive without a manual `cfg_attr` dance at every site.
#[cfg(not(target_arch = "wasm32"))]
pub type SharedRef<T> = Arc<T>;
#[cfg(target_arch = "wasm32")]
pub type SharedRef<T> = std::rc::Rc<T>;

/// Cfg-gated interior-mutability wrapper paired with [`SharedRef`]:
/// `Mutex` on native, `RefCell` on single-threaded WASM. Accessed through
/// the `LockExt` extension trait below so call sites stay uniform.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type Lock<T> = std::sync::Mutex<T>;
#[cfg(target_arch = "wasm32")]
pub(crate) type Lock<T> = std::cell::RefCell<T>;
62
/// Cfg-gated extension trait so `self.field.read_lock("…")` and
/// `self.field.write_lock("…")` work uniformly across the native `Mutex`
/// and the WASM `RefCell`. Eliminates per-call-site `cfg` blocks without
/// losing the panic message that distinguishes which lock failed.
trait LockExt<T> {
    type Read<'a>: std::ops::Deref<Target = T>
    where
        Self: 'a;
    type Write<'a>: std::ops::DerefMut<Target = T>
    where
        Self: 'a;
    fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
    fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
}

#[cfg(not(target_arch = "wasm32"))]
impl<T> LockExt<T> for std::sync::Mutex<T> {
    type Read<'a>
        = std::sync::MutexGuard<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::sync::MutexGuard<'a, T>
    where
        Self: 'a;
    // `Mutex` has no shared/exclusive split — both accessors hand back the
    // same exclusive guard; the two names exist to mirror the `RefCell`
    // impl, where the distinction is real.
    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        match self.lock() {
            Ok(guard) => guard,
            // A poisoned lock means another thread panicked mid-update;
            // surface which resolver field was involved.
            Err(_) => panic!("resolver {label} lock poisoned"),
        }
    }
    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("resolver {label} lock poisoned"),
        }
    }
}
97
#[cfg(target_arch = "wasm32")]
impl<T> LockExt<T> for std::cell::RefCell<T> {
    type Read<'a>
        = std::cell::Ref<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::cell::RefMut<'a, T>
    where
        Self: 'a;
    // Single-threaded target: a borrow failure is a reentrancy bug, so the
    // panic message names the cell just like the native lock path does.
    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        match self.try_borrow() {
            Ok(shared) => shared,
            Err(_) => panic!("resolver {label} cell already borrowed mutably"),
        }
    }
    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.try_borrow_mut() {
            Ok(exclusive) => exclusive,
            Err(_) => panic!("resolver {label} cell already borrowed"),
        }
    }
}
117
118use crate::data::DataTable;
119use crate::error::ChartError;
120use crate::spec::source::CacheConfig as SpecCacheConfig;
121use crate::spec::InlineData;
122
123pub mod builtin;
124pub mod cache;
125pub mod cancel;
126pub mod hooks;
127
128// Phase 3b: persistent cache backends. The container module is unconditional
129// because it hosts the pure-Rust `codec` submodule (shared blob framing) that
130// needs to compile + test on every target. Backend implementations themselves
131// (`indexeddb`, future `sqlite`/`fs`/…) carry their own `#[cfg(...)]` gates
132// inside `backends/mod.rs`, so non-browser builds and browser builds without
133// the `wasm-indexeddb` feature don't compile any of the IndexedDB-specific
134// code (which depends on the `idb` crate, brought in only by that feature).
135pub mod backends;
136
137pub use builtin::{HttpProvider, InlineProvider};
138pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
139pub use cancel::CancellationToken;
140pub use hooks::{
141 CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
142 ProgressEvent, ResolverHooks,
143};
144
/// Default TTL applied when a spec doesn't declare one. Five minutes matches
/// the JS middleware's `CACHE_TTL_DEFAULT_MS`.
pub const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60);

/// Sentinel byte mixed into the hash for `None` fields. Distinct from any
/// UTF-8 byte (0xFE is invalid as a UTF-8 start byte) so `None` can never
/// collide with a real string value (e.g., a literal `"None"` datasource).
const HASH_NONE_SENTINEL: u8 = 0xFE;
/// Separator byte between hashed fields — 0xFF is also invalid UTF-8, so
/// adjacent fields can't bleed into each other ("a|b" vs "ab|").
const HASH_FIELD_SEP: u8 = 0xFF;
154
/// Provider dispatch + fetch trait. Host apps implement this for their
/// custom data sources (BigQuery, Snowflake, internal REST APIs, …) and
/// register them via `ChartML::register_provider(kind, provider)`.
///
/// `?Send` on WASM matches `TransformMiddleware` so single-threaded
/// browser environments can hold non-`Send` state inside provider impls.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait DataSourceProvider: Send + Sync {
    /// Fetch one source. The resolver handles caching + dedup + parallelism;
    /// providers only see this single call per actual upstream invocation.
    async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;

    /// Optional graceful shutdown hook. Called by `ChartML::shutdown()` on
    /// SSR request end / tab close. Default no-op so providers that hold no
    /// pooled resources don't have to implement it.
    async fn shutdown(&self) {}
}
173
/// Per-source request context. Carries the resolved spec (params already
/// substituted) plus cache + header + namespace + cancellation hints.
#[derive(Debug, Clone)]
pub struct FetchRequest {
    /// User-chosen name for this source within the chart spec. `None` for
    /// unnamed (flat) `data:` forms.
    pub source_name: Option<String>,
    /// Fully resolved flat-form source spec — `datasource`, `query`, `url`,
    /// `rows`, etc. with `$param.name` references already substituted.
    pub spec: InlineData,
    /// Parsed cache config from `spec.cache.ttl` (and `auto_refresh`).
    /// `None` means the resolver falls back to [`DEFAULT_TTL`].
    pub cache: Option<CacheConfig>,
    /// Request-level HTTP headers (merged with `HttpProvider` defaults).
    /// Ignored by non-HTTP providers unless they explicitly read this field.
    pub headers: HashMap<String, String>,
    /// Tenant / workspace namespace folded into the cache key. `None` for
    /// single-tenant deployments.
    pub namespace: Option<String>,
    /// Optional cancellation token. Phase 3 always passes `None` from the
    /// resolver; providers that opt into honoring it stay forward-compatible
    /// with future tab-close / cancel-on-route-change wiring.
    pub cancel_token: Option<CancellationToken>,
}
197
/// Provider response. `Clone` is required because `futures::future::Shared`
/// (used for in-flight dedup) takes a `Future<Output: Clone>`. `DataTable`
/// is `Arc`-backed so cloning is cheap.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The fetched tabular payload, ready for the transform stage.
    pub data: DataTable,
    /// Free-form per-provider metadata — `bytes_billed`, `rows_returned`,
    /// `server_refreshed_at`, `upstream_cache_hit`, `warnings`, etc.
    /// Stored alongside the data in every cache tier.
    pub metadata: HashMap<String, serde_json::Value>,
}
208
/// Provider failures. `Clone` matches `FetchResult` so `Shared` can clone
/// the error when multiple in-flight callers receive the same outcome.
#[derive(Debug, Error, Clone)]
pub enum FetchError {
    /// The `datasource:` slug named in the spec isn't known upstream.
    #[error("datasource '{slug}' not found")]
    SlugNotFound { slug: String },

    /// Upstream accepted the request but the query itself failed.
    #[error("query failed: {0}")]
    QueryFailed(String),

    /// A response arrived but couldn't be decoded into a `DataTable`.
    #[error("decode failed: {0}")]
    DecodeFailed(String),

    /// The fetch was cancelled (e.g. via a `CancellationToken`).
    #[error("cancelled")]
    Cancelled,

    /// No provider registered under the dispatch kind the spec requires.
    #[error("no provider registered for kind '{kind}'")]
    ProviderNotFound { kind: String },

    /// A cache tier failed; stringified from [`CacheError`].
    #[error("cache backend error: {0}")]
    Cache(String),

    /// Catch-all for provider-specific failures.
    #[error("{0}")]
    Other(String),
}
234
235impl From<FetchError> for ChartError {
236 fn from(err: FetchError) -> Self {
237 match err {
238 FetchError::SlugNotFound { slug } => {
239 ChartError::DataError(format!("datasource '{slug}' not found"))
240 }
241 FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
242 FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
243 FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
244 FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
245 "no provider registered for kind '{kind}'"
246 )),
247 FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
248 FetchError::Other(msg) => ChartError::DataError(msg),
249 }
250 }
251}
252
253impl From<CacheError> for FetchError {
254 fn from(err: CacheError) -> Self {
255 FetchError::Cache(err.to_string())
256 }
257}
258
/// Parsed cache config. Built from `spec::source::CacheConfig` (which carries
/// the raw YAML strings) so the resolver doesn't have to re-parse `humantime`
/// formats on every fetch.
#[derive(Debug, Clone, Default)]
pub struct CacheConfig {
    /// TTL parsed via `humantime` (`"30s"`, `"5m"`, `"6h"`, `"1d"`, `"7d"`).
    /// `None` → [`DEFAULT_TTL`] applies at cache-write time.
    pub ttl: Option<Duration>,
    /// Component-layer hint — the resolver doesn't auto-refresh, but the
    /// flag is preserved end-to-end so consumers (Leptos, React) can read it.
    pub auto_refresh: bool,
}
271
272impl CacheConfig {
273 /// Parse a `spec::source::CacheConfig` (the raw YAML form) into the
274 /// resolver-friendly shape. Returns `Ok(None)` when the input is `None`
275 /// so the upstream `Option` plumbing stays clean. Returns `Err` when the
276 /// declared TTL string is present but unparseable — silent fallback to
277 /// `DEFAULT_TTL` would let an operator's typo (`"5 minutes"` instead of
278 /// `"5m"`) ship to production unnoticed, so the parser's complaint is
279 /// surfaced to the caller verbatim.
280 pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
281 let Some(spec) = spec else {
282 return Ok(None);
283 };
284 let ttl = match spec.ttl.as_deref() {
285 Some(s) => Some(humantime::parse_duration(s).map_err(|e| {
286 ChartError::InvalidSpec(format!(
287 "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
288 ))
289 })?),
290 None => None,
291 };
292 Ok(Some(Self {
293 ttl,
294 auto_refresh: spec.auto_refresh.unwrap_or(false),
295 }))
296 }
297
298 /// Convenience accessor matching the design doc's `ttl_duration()` name.
299 pub fn ttl_duration(&self) -> Option<Duration> {
300 self.ttl
301 }
302}
303
/// Outcome of a `Resolver::fetch` call: the provider's result PLUS whether
/// the value came from a cache tier or a fresh provider invocation.
///
/// `ChartML::fetch` uses this to populate `FetchMetadata.cache_hits` /
/// `cache_misses` per source name without needing the resolver to know about
/// source names itself (which it doesn't — it works in keys, not names).
#[derive(Debug, Clone)]
pub struct ResolveOutcome {
    /// The data + metadata returned (from cache or provider).
    pub result: FetchResult,
    /// `true` when either cache tier satisfied the request.
    pub cache_hit: bool,
}
315
/// Tag prefix for source-slug-based bulk invalidation. Public so consumers
/// computing tags out-of-band (e.g., a custom CacheBackend that wants to
/// pre-populate entries) match the resolver's wire format exactly.
pub const TAG_SLUG_PREFIX: &str = "slug:";
/// Tag prefix for namespace-based bulk invalidation.
pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";
322
/// Public alias for the shared-ownership wrapper around [`Resolver`]. `Arc`
/// on native (so consumers using `tokio::spawn` can move resolver handles
/// across task boundaries) and `Rc` on WASM (where the resolver's inflight
/// map is inherently `?Send`, so an `Arc` would be both incorrect and
/// rejected by `clippy::arc_with_non_send_sync`). Returned by
/// `ChartML::resolver()` and accepted by every host that wants a long-lived
/// handle for the bulk `invalidate*` API.
pub type ResolverRef = SharedRef<Resolver>;

/// Public alias for the shared-ownership wrapper around a [`CacheBackend`]
/// trait object. `Arc<dyn CacheBackend>` on native, `Rc<dyn CacheBackend>`
/// on WASM — mirrors the [`SharedRef`] story so wasm32 consumers can hand
/// off `!Send` backends (e.g. [`backends::indexeddb::IndexedDbBackend`])
/// without tripping `clippy::arc_with_non_send_sync`.
pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
338
/// Provider dispatch + cache + dedup orchestration.
///
/// One `Resolver` per `ChartML` instance. The resolver is held inside
/// `ChartML` as `SharedRef<Resolver>` (`Arc` on native, `Rc` on WASM) so
/// consumers can grab a handle for the `invalidate*` API while the chart
/// instance keeps using it. Every field uses interior mutability (`Lock`)
/// because the resolver is only ever reachable through `&self`.
pub struct Resolver {
    /// Default in-process cache (always present, never replaced — kept for
    /// the rare case a consumer wants to introspect the in-memory tier
    /// directly even after swapping `primary`). Held as `SharedRef` so the
    /// `MemoryBackend` clone the resolver hands itself the very first time
    /// is the same single-threaded type the wasm32 backends need (clippy
    /// rejects `Arc<MemoryBackend>` when other tier swaps land on
    /// non-`Send` types in the same `Lock`).
    memory: SharedRef<MemoryBackend>,
    /// Tier-1 cache. Defaults to the always-present in-memory backend; can
    /// be replaced via `ChartML::set_cache(...)` (e.g., a host's custom
    /// process-wide LRU). Behind a `Lock<CacheBackendRef>` so swaps are
    /// atomic and don't require `&mut self` (the resolver is held inside a
    /// `SharedRef`). The `CacheBackendRef` alias is `Arc` on native and
    /// `Rc` on WASM — wasm32 backends like `IndexedDbBackend` are `!Send`
    /// and would trip `clippy::arc_with_non_send_sync` if forced into
    /// `std::sync::Arc`.
    primary: Lock<CacheBackendRef>,
    /// Tier-2 (persistent) cache. `None` in phase 3 — phase 3b populates it
    /// with `IndexedDbBackend` for browser consumers.
    persistent: Lock<Option<CacheBackendRef>>,
    /// In-flight dedup map: cache key → shared future, so concurrent
    /// fetches for the same key await a single provider invocation.
    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
    /// Provider registry keyed by dispatch slug (`"inline"`, `"http"`, …).
    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
    /// Optional hook impl. Wrapped in `Lock<Option<...>>` so `set_hooks`
    /// works without `&mut self` (the resolver lives behind a `SharedRef`).
    /// Snapshotted into a local clone before each instrumentation site so
    /// the hook lock is never held across an `await`.
    hooks: Lock<Option<HooksRef>>,
    /// Tracker for which keys have been explicitly invalidated since their
    /// last fetch, so the next cache-miss for that key can be reported as
    /// [`hooks::MissReason::Invalidated`] instead of `NotFound`.
    ///
    /// **Per-key invalidation** (`invalidate(key)`) inserts into
    /// [`InvalidationTracker::keys`] — the next miss for that exact key
    /// reports `Invalidated` and removes the entry (so subsequent misses on
    /// the same key without re-invalidating fall back to the regular
    /// `NotFound` / `Expired` reasoning).
    ///
    /// **Bulk invalidation** (`invalidate_all` / `invalidate_by_slug` /
    /// `invalidate_by_namespace`) sets [`InvalidationTracker::bulk_pending`]
    /// to `true`. Enumerating every just-evicted key is impractical — the
    /// `CacheBackend` trait doesn't expose iteration (and adding it would
    /// be expensive on `IndexedDbBackend`, which would need a cursor sweep
    /// per call) — so the resolver instead reports the *first* post-bulk
    /// miss as `Invalidated` and clears the flag. Subsequent misses fall
    /// back to `NotFound` / `Expired` until another invalidation happens.
    /// This is a deliberate trade-off: documented below, mirrored in the
    /// integration test `test_invalidate_emits_invalidated_miss_reason`.
    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
}
395
/// Per-resolver tracker for invalidation events. Held inside a
/// `Lock<...>` on the resolver so it survives across `&self` borrows
/// (the resolver lives behind a `SharedRef` and uses interior mutability
/// for every other piece of state too).
#[derive(Debug, Default)]
struct InvalidationTracker {
    /// Specific keys invalidated via `Resolver::invalidate(key)`. Drained
    /// on first observation by `consume_invalidation_reason`.
    keys: HashSet<u64>,
    /// Whether ANY bulk invalidate (`invalidate_all` /
    /// `invalidate_by_slug` / `invalidate_by_namespace`) has fired since
    /// the last bulk-pending consumption. Cleared the first time a miss
    /// observes it — at most one post-bulk miss is reported as
    /// `Invalidated`.
    bulk_pending: bool,
}
412
/// Shared in-flight future type. Boxed for dyn-trait erasure; `Shared` lets
/// multiple awaiters poll the same future without cloning the work. The
/// inner `ResolverFuture` is `BoxFuture` (Send) on native and
/// `LocalBoxFuture` (?Send) on WASM so we don't conflict with the
/// `?Send` async traits the providers and cache backends use.
type SharedFetch = Shared<ResolverFuture<Result<FetchResult, FetchError>>>;
419
420impl Default for Resolver {
421 fn default() -> Self {
422 Self::new()
423 }
424}
425
426impl std::fmt::Debug for Resolver {
427 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
428 let has_persistent = self.persistent.read_lock("persistent cache").is_some();
429 let provider_keys: Vec<String> = self
430 .providers
431 .read_lock("providers")
432 .keys()
433 .cloned()
434 .collect();
435 f.debug_struct("Resolver")
436 .field("memory", &self.memory)
437 .field("has_persistent", &has_persistent)
438 .field("providers", &provider_keys)
439 .finish_non_exhaustive()
440 }
441}
442
443impl Resolver {
444 /// New resolver with the default `MemoryBackend` as tier-1, no tier-2,
445 /// and no providers registered. `ChartML::new()` registers the built-in
446 /// `inline` + `http` providers immediately after construction.
447 pub fn new() -> Self {
448 let memory = SharedRef::new(MemoryBackend::new());
449 let primary: CacheBackendRef = memory.clone();
450 Self {
451 memory,
452 primary: Lock::new(primary),
453 persistent: Lock::new(None),
454 inflight: SharedRef::new(Lock::new(HashMap::new())),
455 providers: Lock::new(HashMap::new()),
456 hooks: Lock::new(None),
457 recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
458 }
459 }
460
461 /// Replace the tier-1 cache backend. Used by `ChartML::set_cache`.
462 /// The fresh backend starts empty — entries in the old backend are not
463 /// migrated (caller's responsibility if they want to).
464 pub fn set_primary_cache(&self, backend: CacheBackendRef) {
465 let mut guard = self.primary.write_lock("primary cache");
466 *guard = backend;
467 }
468
469 /// Set the optional tier-2 (persistent) cache. Phase 3 leaves this
470 /// public so phase 3b's `IndexedDbBackend` can wire in without further
471 /// surface changes.
472 pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
473 let mut guard = self.persistent.write_lock("persistent cache");
474 *guard = Some(backend);
475 }
476
477 /// Register a [`ResolverHooks`] impl. Replaces any previously registered
478 /// hooks; passes the new impl in as a `HooksRef` (`Arc` on native, `Rc`
479 /// on WASM). After this call every `Resolver::fetch` invocation emits
480 /// progress / cache / error events through the new impl.
481 pub fn set_hooks(&self, hooks: HooksRef) {
482 let mut guard = self.hooks.write_lock("hooks");
483 *guard = Some(hooks);
484 }
485
486 /// Clear any previously registered hooks. Subsequent `fetch` calls
487 /// behave as if `set_hooks` had never been called (no-op emission).
488 pub fn clear_hooks(&self) {
489 let mut guard = self.hooks.write_lock("hooks");
490 *guard = None;
491 }
492
493 /// Snapshot the current hooks `HooksRef` (or `None`) so the resolver
494 /// can release the lock before entering any cache walk or `.await`.
495 /// Must be called once at the top of `fetch`; downstream sites use the
496 /// snapshot rather than re-acquiring the lock to keep the hook lock
497 /// off the hot path. Also called by `ChartML::transform` (one crate up)
498 /// at the top of the transform stage for the same reason.
499 pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
500 self.hooks.read_lock("hooks").clone()
501 }
502
503 /// Snapshot the tier-1 cache handle so we can drop the lock before the
504 /// async cache call. Always returns a backend (the field starts as
505 /// `MemoryBackend` and `set_primary_cache` only replaces, never clears).
506 fn primary_snapshot(&self) -> CacheBackendRef {
507 self.primary.read_lock("primary cache").clone()
508 }
509
510 /// Snapshot the optional tier-2 cache handle (or `None`) for the same
511 /// reason `primary_snapshot` exists — release the sync lock before
512 /// the async cache call.
513 fn persistent_snapshot(&self) -> Option<CacheBackendRef> {
514 self.persistent.read_lock("persistent cache").clone()
515 }
516
517 /// Register a provider under a dispatch key (`"inline"`, `"http"`,
518 /// `"datasource"`, or any custom slug). Re-registration replaces the
519 /// previously registered provider for that key.
520 pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
521 let mut providers = self.providers.write_lock("providers");
522 providers.insert(kind.to_string(), provider);
523 }
524
525 /// Snapshot the registered provider kinds. Useful for tests and
526 /// host-app diagnostics.
527 pub fn provider_kinds(&self) -> Vec<String> {
528 self.providers
529 .read_lock("providers")
530 .keys()
531 .cloned()
532 .collect()
533 }
534
535 /// Compute the cache key the resolver would use for a given spec.
536 ///
537 /// Public so phase 4 (Leptos refresh button) and phase 6 (Kyomi
538 /// invalidate-on-change) can compute the exact key the resolver caches
539 /// under without re-implementing the hash.
540 ///
541 /// Hashes `(namespace, datasource, query, url, provider, rows_hash)` in
542 /// that order. `None` fields contribute a sentinel byte (`0xFE`, an
543 /// invalid UTF-8 start byte) so they cannot collide with a real string
544 /// value of `"None"`. Field separator is `0xFF` (also invalid UTF-8) so
545 /// adjacent fields can't bleed into each other ("a|b" vs "ab|").
546 pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
547 let mut hasher = Xxh3::new();
548
549 let fields: [Option<&str>; 5] = [
550 namespace,
551 spec.datasource.as_deref(),
552 spec.query.as_deref(),
553 spec.url.as_deref(),
554 spec.provider.as_deref(),
555 ];
556 for field in fields {
557 match field {
558 Some(s) => hasher.update(s.as_bytes()),
559 None => hasher.update(&[HASH_NONE_SENTINEL]),
560 }
561 hasher.update(&[HASH_FIELD_SEP]);
562 }
563
564 // Inline `rows` is hashed via its canonical JSON serialization so
565 // two specs with the same row data hash identically regardless of
566 // hashmap iteration order. `None` rows contribute the sentinel.
567 match spec.rows.as_ref() {
568 Some(rows) => match serde_json::to_vec(rows) {
569 Ok(bytes) => hasher.update(&bytes),
570 Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
571 },
572 None => hasher.update(&[HASH_NONE_SENTINEL]),
573 }
574
575 hasher.digest()
576 }
577
    /// The core orchestration: tier-1 → tier-2 → in-flight dedup → provider.
    ///
    /// Returns [`ResolveOutcome`] (result + cache-hit flag) so the caller
    /// can classify the source under `cache_hits` vs `cache_misses` in
    /// `FetchMetadata`. Tier-1 hits hydrate from memory only; tier-2 hits
    /// also re-populate tier-1 so subsequent reads in the same session
    /// short-circuit.
    ///
    /// Lock discipline: every lock-guarded field is snapshotted into a
    /// local clone BEFORE the first `.await`, so no sync lock is ever held
    /// across a suspension point.
    pub async fn fetch(
        &self,
        key: u64,
        request: FetchRequest,
    ) -> Result<ResolveOutcome, FetchError> {
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        // Snapshot the hooks `HooksRef` ONCE up front so the hooks lock is
        // never held across an `.await` and downstream sites can share the
        // same clone without re-acquiring.
        let hooks = self.hooks_snapshot();
        let source_name = request.source_name.clone();

        // ── Tier 1: in-process cache ──
        let mut tier1_expired = false;
        if let Some(entry) = primary.get(key).await {
            if !entry.is_expired() {
                let age = entry.age();
                emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
                return Ok(ResolveOutcome {
                    result: FetchResult {
                        data: entry.data,
                        metadata: entry.metadata,
                    },
                    cache_hit: true,
                });
            }
            // Expired — let it fall through to tier-2 / the provider; a
            // successful fetch overwrites this slot.
            tier1_expired = true;
        }

        // ── Tier 2: persistent cache (phase 3b populates this) ──
        let mut tier2_expired = false;
        if let Some(p) = &persistent {
            if let Some(entry) = p.get(key).await {
                if !entry.is_expired() {
                    let age = entry.age();
                    // Hydrate tier-1 so subsequent reads stay in-process.
                    // Memory hydration is silent (no event) — only the
                    // logical tier-2 hit fires.
                    let _ = primary.put(key, entry.clone()).await;
                    emit_cache_hit(
                        &hooks,
                        key,
                        &source_name,
                        hooks::CacheTier::Persistent,
                        age,
                    );
                    return Ok(ResolveOutcome {
                        result: FetchResult {
                            data: entry.data,
                            metadata: entry.metadata,
                        },
                        cache_hit: true,
                    });
                }
                // Expired — evict from tier-2 too.
                let _ = p.invalidate(key).await;
                tier2_expired = true;
            }
        }

        // Both tiers missed — emit one cache-miss with the most specific
        // reason we can prove. Precedence: `Invalidated` (most specific —
        // an operator explicitly cleared this key or fired a bulk
        // invalidate) wins over `Expired` (TTL elapsed naturally), which
        // wins over `NotFound` (key was never cached or was evicted by
        // some path the resolver doesn't track).
        let miss_reason = if self.consume_invalidation_reason(key) {
            hooks::MissReason::Invalidated
        } else if tier1_expired || tier2_expired {
            hooks::MissReason::Expired
        } else {
            hooks::MissReason::NotFound
        };
        emit_cache_miss(&hooks, key, &source_name, miss_reason);

        // Provider call start — fire BEFORE the dedup wait so consumers
        // see a "fetching" event for every distinct cache miss, not just
        // the first concurrent one.
        emit_progress(
            &hooks,
            hooks::Phase::Fetch,
            &source_name,
            None,
            None,
            format!(
                "Fetching {}",
                source_name.as_deref().unwrap_or("source"),
            ),
        );

        // ── In-flight dedup ──
        // If another fetch for the same key is already in flight, await its
        // shared future. Otherwise install a new shared future and run it.
        let shared = self.intern_inflight(key, request, primary, persistent);
        let result = shared.await;

        // Cleanup: drop the inflight slot whether the fetch succeeded or
        // failed so subsequent calls re-dispatch instead of replaying a
        // stale terminal state. Errors clear too — failed fetches should
        // retry, not stick.
        self.inflight.write_lock("inflight").remove(&key);

        match result {
            Ok(fetch_result) => {
                let row_count = fetch_result.data.num_rows();
                emit_progress(
                    &hooks,
                    hooks::Phase::Fetch,
                    &source_name,
                    Some(row_count as u64),
                    None,
                    format!(
                        "Fetched {} ({} rows)",
                        source_name.as_deref().unwrap_or("source"),
                        row_count,
                    ),
                );
                Ok(ResolveOutcome {
                    result: fetch_result,
                    cache_hit: false,
                })
            }
            Err(err) => {
                emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
                Err(err)
            }
        }
    }
715
    /// Get-or-insert the in-flight `Shared` future for a key. Returns a
    /// clone of the `Shared` so the caller can `await` it; the original
    /// stays in the map until removed by the caller (in `fetch`, right
    /// after the await completes).
    fn intern_inflight(
        &self,
        key: u64,
        request: FetchRequest,
        primary: CacheBackendRef,
        persistent: Option<CacheBackendRef>,
    ) -> SharedFetch {
        // Holding the write lock from lookup through insert makes the
        // get-or-insert atomic — two concurrent misses cannot both install
        // a future for the same key.
        let mut inflight = self.inflight.write_lock("inflight");
        if let Some(existing) = inflight.get(&key) {
            return existing.clone();
        }

        // Build the work future: dispatch to the right provider, write to
        // both cache tiers on success. The future is `'static` because it
        // owns everything it touches via `Arc` clones.
        let providers = self.snapshot_providers();
        let cache_cfg = request.cache.clone();
        let namespace = request.namespace.clone();
        let slug = request.spec.datasource.clone();

        let work = async move {
            let provider = dispatch_provider(&providers, &request.spec)?;
            let result = provider.fetch(request).await?;

            // Write-through: tier-1 always; tier-2 if configured.
            let entry = CachedEntry {
                data: result.data.clone(),
                fetched_at: SystemTime::now(),
                ttl: cache_cfg
                    .as_ref()
                    .and_then(|c| c.ttl_duration())
                    .unwrap_or(DEFAULT_TTL),
                tags: build_tags(slug.as_deref(), namespace.as_deref()),
                metadata: result.metadata.clone(),
            };
            // Cache write failures must not poison the result — log via
            // tracing in 3c, but for now just swallow (matches the design
            // doc's `.ok()` guidance).
            let _ = primary.put(key, entry.clone()).await;
            if let Some(p) = &persistent {
                let _ = p.put(key, entry).await;
            }
            Ok(result)
        };

        // Pick the right Box variant for the target. `boxed()` is Send-only
        // (native), `boxed_local()` is ?Send (WASM).
        #[cfg(not(target_arch = "wasm32"))]
        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed();
        #[cfg(target_arch = "wasm32")]
        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed_local();

        let future = boxed.shared();
        inflight.insert(key, future.clone());
        future
    }
775
776 /// Snapshot the provider map for use inside the inflight future. We
777 /// snapshot rather than holding the lock so the future stays `'static`.
778 fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
779 self.providers.read_lock("providers").clone()
780 }
781
782 // ── Bulk invalidation API ──
783
784 /// Drop a single entry from every cache tier. The next miss on `key`
785 /// will be reported via [`hooks::ResolverHooks::on_cache_miss`] with
786 /// [`hooks::MissReason::Invalidated`] rather than `NotFound`.
787 pub async fn invalidate(&self, key: u64) {
788 let primary = self.primary_snapshot();
789 let persistent = self.persistent_snapshot();
790 let _ = primary.invalidate(key).await;
791 if let Some(p) = &persistent {
792 let _ = p.invalidate(key).await;
793 }
794 self.recently_invalidated
795 .write_lock("recently_invalidated")
796 .keys
797 .insert(key);
798 }
799
800 /// Drop every cached entry across all tiers. The very next miss on any
801 /// key will be reported as `Invalidated`; subsequent misses fall back
802 /// to the regular `NotFound` / `Expired` reasoning. See the field-level
803 /// docs on [`Resolver::recently_invalidated`] for why per-key tracking
804 /// isn't done here (would require a `keys()` method on every backend).
805 pub async fn invalidate_all(&self) {
806 let primary = self.primary_snapshot();
807 let persistent = self.persistent_snapshot();
808 let _ = primary.clear().await;
809 if let Some(p) = &persistent {
810 let _ = p.clear().await;
811 }
812 self.mark_bulk_invalidated();
813 }
814
815 /// Drop every entry whose source spec carried the given `datasource`
816 /// slug. Useful for "datasource X was edited; invalidate all queries
817 /// against it" workflows. Subject to the same single-shot
818 /// `Invalidated` reporting as [`Resolver::invalidate_all`].
819 pub async fn invalidate_by_slug(&self, slug: &str) {
820 let tag = format!("{TAG_SLUG_PREFIX}{slug}");
821 let primary = self.primary_snapshot();
822 let persistent = self.persistent_snapshot();
823 let _ = primary.invalidate_by_tag(&tag).await;
824 if let Some(p) = &persistent {
825 let _ = p.invalidate_by_tag(&tag).await;
826 }
827 self.mark_bulk_invalidated();
828 }
829
830 /// Drop every entry tagged with the given namespace. Used for tenant
831 /// isolation flows ("user logged out; clear their cached data").
832 /// Subject to the same single-shot `Invalidated` reporting as
833 /// [`Resolver::invalidate_all`].
834 pub async fn invalidate_by_namespace(&self, namespace: &str) {
835 let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
836 let primary = self.primary_snapshot();
837 let persistent = self.persistent_snapshot();
838 let _ = primary.invalidate_by_tag(&tag).await;
839 if let Some(p) = &persistent {
840 let _ = p.invalidate_by_tag(&tag).await;
841 }
842 self.mark_bulk_invalidated();
843 }
844
845 /// Set the bulk-pending flag so the next post-bulk miss surfaces as
846 /// [`hooks::MissReason::Invalidated`]. Synchronous; the lock is never
847 /// held across an `.await`.
848 fn mark_bulk_invalidated(&self) {
849 self.recently_invalidated
850 .write_lock("recently_invalidated")
851 .bulk_pending = true;
852 }
853
854 /// Check whether a miss for `key` should be reported as
855 /// [`hooks::MissReason::Invalidated`]. Drains the per-key entry on
856 /// first observation and consumes the bulk-pending flag at most once
857 /// per bulk invalidation, so this returns `true` for at most one miss
858 /// per `invalidate*` call. Called from the hot fetch path so the lock
859 /// is taken briefly and never held across an `.await`.
860 fn consume_invalidation_reason(&self, key: u64) -> bool {
861 let mut tracker = self
862 .recently_invalidated
863 .write_lock("recently_invalidated");
864 if tracker.keys.remove(&key) {
865 return true;
866 }
867 if tracker.bulk_pending {
868 tracker.bulk_pending = false;
869 return true;
870 }
871 false
872 }
873
874 /// Iterate every registered provider AND cache backend, awaiting their
875 /// `shutdown` hook in turn. Wired up by `ChartML::shutdown()`.
876 pub async fn shutdown(&self) {
877 let providers = self.snapshot_providers();
878 for (_, provider) in providers {
879 provider.shutdown().await;
880 }
881 let primary = self.primary_snapshot();
882 primary.shutdown().await;
883 if let Some(p) = self.persistent_snapshot() {
884 p.shutdown().await;
885 }
886 }
887}
888
889/// Emit a [`hooks::CacheHitEvent`] through the registered hook impl, if
890/// any. Fire-and-forget via `spawn_hook`; never blocks the resolver.
891fn emit_cache_hit(
892 hooks: &Option<HooksRef>,
893 key: u64,
894 source_name: &Option<String>,
895 tier: hooks::CacheTier,
896 age: Duration,
897) {
898 let Some(h) = hooks.as_ref() else { return };
899 let h = h.clone();
900 let event = hooks::CacheHitEvent {
901 key,
902 source_name: source_name.clone(),
903 tier,
904 age,
905 };
906 hooks::spawn_hook(async move {
907 h.on_cache_hit(event).await;
908 });
909}
910
911/// Emit a [`hooks::CacheMissEvent`] through the registered hook impl.
912fn emit_cache_miss(
913 hooks: &Option<HooksRef>,
914 key: u64,
915 source_name: &Option<String>,
916 reason: hooks::MissReason,
917) {
918 let Some(h) = hooks.as_ref() else { return };
919 let h = h.clone();
920 let event = hooks::CacheMissEvent {
921 key,
922 source_name: source_name.clone(),
923 reason,
924 };
925 hooks::spawn_hook(async move {
926 h.on_cache_miss(event).await;
927 });
928}
929
930/// Emit a [`hooks::ProgressEvent`] through the registered hook impl.
931/// `pub(crate)` so `ChartML::transform` / `render_prepared_to_svg` can
932/// emit transform/render-phase progress without re-implementing the
933/// snapshot dance.
934pub(crate) fn emit_progress(
935 hooks: &Option<HooksRef>,
936 phase: hooks::Phase,
937 source_name: &Option<String>,
938 loaded: Option<u64>,
939 total: Option<u64>,
940 message: String,
941) {
942 let Some(h) = hooks.as_ref() else { return };
943 let h = h.clone();
944 let event = hooks::ProgressEvent {
945 phase,
946 source_name: source_name.clone(),
947 loaded,
948 total,
949 message,
950 };
951 hooks::spawn_hook(async move {
952 h.on_progress(event).await;
953 });
954}
955
956/// Emit a [`hooks::ErrorEvent`] through the registered hook impl.
957/// `pub(crate)` so `ChartML::transform` can emit transform-phase errors.
958pub(crate) fn emit_error(
959 hooks: &Option<HooksRef>,
960 phase: hooks::Phase,
961 source_name: &Option<String>,
962 error: String,
963) {
964 let Some(h) = hooks.as_ref() else { return };
965 let h = h.clone();
966 let event = hooks::ErrorEvent {
967 phase,
968 source_name: source_name.clone(),
969 error,
970 };
971 hooks::spawn_hook(async move {
972 h.on_error(event).await;
973 });
974}
975
976/// Build the tag list applied to `CachedEntry` on write. `slug` and
977/// `namespace` are optional — entries without one of them simply skip the
978/// corresponding tag (no empty-string tag pollution).
979fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
980 let mut tags = Vec::new();
981 if let Some(slug) = slug {
982 tags.push(format!("{TAG_SLUG_PREFIX}{slug}"));
983 }
984 if let Some(ns) = namespace {
985 tags.push(format!("{TAG_NAMESPACE_PREFIX}{ns}"));
986 }
987 tags
988}
989
990/// Apply the design-doc dispatch routing. Precedence: explicit `provider`
991/// key wins over inferred shape (rows → inline, url → http, datasource →
992/// datasource). Returns the `Arc` so the caller can `await` the trait
993/// method without holding any lock.
994fn dispatch_provider(
995 providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
996 spec: &InlineData,
997) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
998 let kind = if let Some(kind) = spec.provider.as_deref() {
999 kind
1000 } else if spec.rows.is_some() {
1001 "inline"
1002 } else if spec.url.is_some() {
1003 "http"
1004 } else if spec.datasource.is_some() {
1005 "datasource"
1006 } else {
1007 return Err(FetchError::Other(
1008 "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
1009 .to_string(),
1010 ));
1011 };
1012
1013 providers
1014 .get(kind)
1015 .cloned()
1016 .ok_or_else(|| FetchError::ProviderNotFound {
1017 kind: kind.to_string(),
1018 })
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    /// An `InlineData` with every field unset — the baseline each test
    /// overrides via struct-update syntax.
    fn blank_spec() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }

    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            query: Some("SELECT 1".into()),
            ..blank_spec()
        };
        assert_eq!(
            Resolver::key_for(&spec, Some("ns")),
            Resolver::key_for(&spec, Some("ns")),
        );
    }

    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..blank_spec()
        };
        assert_ne!(
            Resolver::key_for(&spec, Some("tenant-a")),
            Resolver::key_for(&spec, Some("tenant-b")),
        );
    }

    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        // Sentinel byte must NOT collide with the literal "None" string.
        let absent = InlineData {
            url: Some("https://x".into()),
            ..blank_spec()
        };
        let literal = InlineData {
            datasource: Some("None".into()),
            url: Some("https://x".into()),
            ..blank_spec()
        };
        assert_ne!(
            Resolver::key_for(&absent, None),
            Resolver::key_for(&literal, None),
        );
    }

    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // `(datasource="ab", query=None)` must hash differently from
        // `(datasource="a", query="b")` — without a separator they'd both
        // serialize to "ab".
        let merged = InlineData {
            datasource: Some("ab".into()),
            ..blank_spec()
        };
        let split = InlineData {
            datasource: Some("a".into()),
            query: Some("b".into()),
            ..blank_spec()
        };
        assert_ne!(
            Resolver::key_for(&merged, None),
            Resolver::key_for(&split, None),
        );
    }

    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let mut providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        providers.insert(
            "custom".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        );
        let spec = InlineData {
            provider: Some("custom".into()),
            // rows would normally route to "inline", but explicit wins.
            rows: Some(vec![]),
            ..blank_spec()
        };
        // Trait objects can't be compared, but `is_ok()` is sufficient:
        // "custom" is the ONLY registered key, so reaching `Ok` proves
        // dispatch picked it. Inferring from `rows` alone would look up the
        // unregistered "inline" key → `ProviderNotFound`.
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_inferred_inline() {
        let mut providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        providers.insert(
            "inline".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        );
        let spec = InlineData {
            rows: Some(vec![]),
            ..blank_spec()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_missing_kind_errors() {
        // Empty registry: the inferred "datasource" kind cannot resolve.
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..blank_spec()
        };
        // `.err().expect(...)` rather than `unwrap_err()` — the Ok variant
        // holds a non-Debug trait object.
        let err = dispatch_provider(&providers, &spec)
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "datasource"));
    }

    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        // A spec with no provider/rows/url/datasource matches nothing.
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let err = dispatch_provider(&providers, &blank_spec())
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::Other(_)));
    }
}
1154}