// chartml_core/resolver/mod.rs
1//! Resolver: caching + dedup + dispatch layer between `ChartML::fetch` and
2//! the registered `DataSourceProvider` implementations (chartml 5.0 phase 3).
3//!
4//! The resolver owns:
5//! - **Tier-1 cache** — a `MemoryBackend` always present.
6//! - **Tier-2 cache** — optional [`CacheBackendRef`] (populated in
7//!   phase 3b by `IndexedDbBackend`; phase 3 leaves it `None`).
8//! - **In-flight tracker** — a `HashMap<u64, Shared<BoxFuture<...>>>` so two
9//!   concurrent fetches for the same key share one provider invocation.
10//! - **Provider registry** — `HashMap<String, Arc<dyn DataSourceProvider>>`,
11//!   keyed by dispatch slug (`"inline"`, `"http"`, `"datasource"`, …).
12//!
13//! Phase 3 leaves `ResolverHooks` integration as a no-op stub; phase 3c will
14//! add hook dispatch without further changes to this file's structure.
15
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18// `web_time::SystemTime` works on `wasm32-unknown-unknown` (where the std
19// version panics). On native it's a transparent alias for `std::time`.
20use std::time::Duration;
21use web_time::SystemTime;
22
23use async_trait::async_trait;
24use futures::future::{FutureExt, Shared};
25use thiserror::Error;
26use xxhash_rust::xxh3::Xxh3;
27
// Private cfg-gated future alias for the in-flight dedup map.
// `?Send` on WASM means the futures we shove into `Shared` cannot promise
// `Send`. Use `LocalBoxFuture` on WASM and `BoxFuture` (which IS `Send`) on
// native so the resolver can stay multi-threaded on tokio while compiling
// cleanly under `wasm32-unknown-unknown`.
#[cfg(not(target_arch = "wasm32"))]
type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
36
// Cfg-gated shared-ownership + interior-mutability primitives (rationale in
// the `SharedRef` doc below; `Rc<RefCell<...>>` is the correct primitive on
// the single-threaded wasm32-unknown-unknown target).
/// Cfg-gated shared-ownership pointer. `Arc<T>` on native (so handles can
/// move across `tokio::spawn` task boundaries), `Rc<T>` on WASM (where
/// `wasm32-unknown-unknown` is single-threaded and the resolver's
/// internals are `?Send`, so an `Arc` would be both incorrect and
/// rejected by `clippy::arc_with_non_send_sync`).
///
/// Public so consumers wiring [`CacheBackend`] / [`DataSourceProvider`]
/// trait objects through the resolver can use the same alias as the
/// resolver's internal storage — their wasm32-unknown-unknown builds get
/// the right primitive without a manual `cfg_attr` dance at every site.
#[cfg(not(target_arch = "wasm32"))]
pub type SharedRef<T> = Arc<T>;
#[cfg(target_arch = "wasm32")]
pub type SharedRef<T> = std::rc::Rc<T>;

/// Cfg-gated interior-mutability cell paired with [`SharedRef`]:
/// `Mutex<T>` on native, `RefCell<T>` on WASM. Accessed uniformly through
/// the private `LockExt` trait below.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type Lock<T> = std::sync::Mutex<T>;
#[cfg(target_arch = "wasm32")]
pub(crate) type Lock<T> = std::cell::RefCell<T>;
62
/// Cfg-gated extension trait so `self.field.write_lock("…").something()` and
/// `self.field.read_lock("…").something()` work uniformly across the native
/// `Mutex` and the WASM `RefCell`. Eliminates per-call-site `cfg` blocks
/// without losing the panic message that distinguishes which lock failed.
trait LockExt<T> {
    /// Guard returned by [`LockExt::read_lock`]; derefs to `T`.
    type Read<'a>: std::ops::Deref<Target = T>
    where
        Self: 'a;
    /// Guard returned by [`LockExt::write_lock`]; derefs mutably to `T`.
    type Write<'a>: std::ops::DerefMut<Target = T>
    where
        Self: 'a;
    /// Acquire shared access. `label` names the lock in the panic message
    /// on poison / double-borrow. (On native, both accessors map to the
    /// same exclusive `Mutex::lock` — see the `Mutex` impl.)
    fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
    /// Acquire exclusive access; `label` as above.
    fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
}
77
78#[cfg(not(target_arch = "wasm32"))]
79impl<T> LockExt<T> for std::sync::Mutex<T> {
80    type Read<'a>
81        = std::sync::MutexGuard<'a, T>
82    where
83        Self: 'a;
84    type Write<'a>
85        = std::sync::MutexGuard<'a, T>
86    where
87        Self: 'a;
88    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
89        self.lock()
90            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
91    }
92    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
93        self.lock()
94            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
95    }
96}
97
#[cfg(target_arch = "wasm32")]
impl<T> LockExt<T> for std::cell::RefCell<T> {
    // `RefCell` genuinely distinguishes shared (`Ref`) from exclusive
    // (`RefMut`) borrows, unlike the native `Mutex` impl.
    type Read<'a>
        = std::cell::Ref<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::cell::RefMut<'a, T>
    where
        Self: 'a;
    /// Shared borrow, panicking with the labelled message if an exclusive
    /// borrow is already live (a reentrancy bug on this single thread).
    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        match self.try_borrow() {
            Ok(cell_ref) => cell_ref,
            Err(_) => panic!("resolver {label} cell already borrowed mutably"),
        }
    }
    /// Exclusive borrow, panicking with the labelled message if any borrow
    /// is already live.
    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.try_borrow_mut() {
            Ok(cell_mut) => cell_mut,
            Err(_) => panic!("resolver {label} cell already borrowed"),
        }
    }
}
117
118use crate::data::DataTable;
119use crate::error::ChartError;
120use crate::spec::source::CacheConfig as SpecCacheConfig;
121use crate::spec::InlineData;
122
123pub mod builtin;
124pub mod cache;
125pub mod cancel;
126pub mod hooks;
127
128// Phase 3b: persistent cache backends. The container module is unconditional
129// because it hosts the pure-Rust `codec` submodule (shared blob framing) that
130// needs to compile + test on every target. Backend implementations themselves
131// (`indexeddb`, future `sqlite`/`fs`/…) carry their own `#[cfg(...)]` gates
132// inside `backends/mod.rs`, so non-browser builds and browser builds without
133// the `wasm-indexeddb` feature don't compile any of the IndexedDB-specific
134// code (which depends on the `idb` crate, brought in only by that feature).
135pub mod backends;
136
137pub use builtin::{HttpProvider, InlineProvider};
138pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
139pub use cancel::CancellationToken;
140pub use hooks::{
141    CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
142    ProgressEvent, ResolverHooks,
143};
144
/// Default TTL applied when a spec doesn't declare one. Five minutes (300 s)
/// matches the JS middleware's `CACHE_TTL_DEFAULT_MS`.
pub const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60);

/// Sentinel byte mixed into the hash for `None` fields. Distinct from any
/// UTF-8 byte (0xFE is invalid as a UTF-8 start byte) so `None` can never
/// collide with a real string value (e.g., a literal `"None"` datasource).
const HASH_NONE_SENTINEL: u8 = 0xFE;
/// Separator byte written after every scalar field in `Resolver::key_for`.
/// Also an invalid UTF-8 byte, so adjacent fields can't bleed together.
const HASH_FIELD_SEP: u8 = 0xFF;
154
/// Provider dispatch + fetch trait. Host apps implement this for their
/// custom data sources (BigQuery, Snowflake, internal REST APIs, …) and
/// register them via `ChartML::register_provider(kind, provider)`.
///
/// `?Send` on WASM matches `TransformMiddleware` so single-threaded
/// browser environments can hold non-`Send` state inside provider impls.
///
/// NOTE(review): the `Send + Sync` supertrait below is unconditional, which
/// appears to require provider impls themselves to be `Send + Sync` even on
/// WASM — seemingly at odds with the "non-`Send` state" claim above (the
/// `?Send` attr relaxes only the returned futures). Confirm whether the
/// supertrait should be cfg-gated like the async_trait attribute.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait DataSourceProvider: Send + Sync {
    /// Fetch one source. The resolver handles caching + dedup + parallelism;
    /// providers only see this single call per actual upstream invocation.
    async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;

    /// Optional graceful shutdown hook. Called by `ChartML::shutdown()` on
    /// SSR request end / tab close. Default no-op so providers that hold no
    /// pooled resources don't have to implement it.
    async fn shutdown(&self) {}
}
173
/// Per-source request context. Carries the resolved spec (params already
/// substituted) plus cache + header + namespace + cancellation hints.
/// `Clone` so the request can be moved into the shared in-flight future
/// while the caller keeps what it needs (e.g. `source_name`).
#[derive(Debug, Clone)]
pub struct FetchRequest {
    /// User-chosen name for this source within the chart spec. `None` for
    /// unnamed (flat) `data:` forms.
    pub source_name: Option<String>,
    /// Fully resolved flat-form source spec — `datasource`, `query`, `url`,
    /// `rows`, etc. with `$param.name` references already substituted.
    pub spec: InlineData,
    /// Parsed cache config from `spec.cache.ttl` (and `auto_refresh`).
    pub cache: Option<CacheConfig>,
    /// Request-level HTTP headers (merged with `HttpProvider` defaults).
    /// Ignored by non-HTTP providers unless they explicitly read this field.
    pub headers: HashMap<String, String>,
    /// Tenant / workspace namespace folded into the cache key. `None` for
    /// single-tenant deployments.
    pub namespace: Option<String>,
    /// Optional cancellation token. Phase 3 always passes `None` from the
    /// resolver; providers that opt into honoring it stay forward-compatible
    /// with future tab-close / cancel-on-route-change wiring.
    pub cancel_token: Option<CancellationToken>,
}
197
/// Provider response. `Clone` is required because `futures::future::Shared`
/// (used for in-flight dedup) takes a `Future<Output: Clone>`. `DataTable`
/// is `Arc`-backed so cloning is cheap.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The fetched tabular payload.
    pub data: DataTable,
    /// Free-form per-provider metadata — `bytes_billed`, `rows_returned`,
    /// `server_refreshed_at`, `upstream_cache_hit`, `warnings`, etc.
    /// Persisted alongside `data` in cache entries (see `Resolver::fetch`).
    pub metadata: HashMap<String, serde_json::Value>,
}
208
/// Provider failures. `Clone` matches `FetchResult` so `Shared` can clone
/// the error when multiple in-flight callers receive the same outcome.
#[derive(Debug, Error, Clone)]
pub enum FetchError {
    /// Named datasource slug has no registered backing source.
    #[error("datasource '{slug}' not found")]
    SlugNotFound { slug: String },

    /// Upstream query execution failed.
    #[error("query failed: {0}")]
    QueryFailed(String),

    /// Response arrived but could not be decoded into a `DataTable`.
    #[error("decode failed: {0}")]
    DecodeFailed(String),

    /// Fetch was cancelled (see `CancellationToken`).
    #[error("cancelled")]
    Cancelled,

    /// Dispatch found no provider registered under the requested kind.
    #[error("no provider registered for kind '{kind}'")]
    ProviderNotFound { kind: String },

    /// Cache tier failure; carries the stringified `CacheError` (see the
    /// `From<CacheError>` impl below).
    #[error("cache backend error: {0}")]
    Cache(String),

    /// Catch-all for provider-specific failures.
    #[error("{0}")]
    Other(String),
}
234
235impl From<FetchError> for ChartError {
236    fn from(err: FetchError) -> Self {
237        match err {
238            FetchError::SlugNotFound { slug } => {
239                ChartError::DataError(format!("datasource '{slug}' not found"))
240            }
241            FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
242            FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
243            FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
244            FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
245                "no provider registered for kind '{kind}'"
246            )),
247            FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
248            FetchError::Other(msg) => ChartError::DataError(msg),
249        }
250    }
251}
252
253impl From<CacheError> for FetchError {
254    fn from(err: CacheError) -> Self {
255        FetchError::Cache(err.to_string())
256    }
257}
258
/// Parsed cache config. Built from `spec::source::CacheConfig` (which carries
/// the raw YAML strings) so the resolver doesn't have to re-parse `humantime`
/// formats on every fetch. The derived `Default` is "no explicit TTL, no
/// auto-refresh" (`ttl: None`, `auto_refresh: false`).
#[derive(Debug, Clone, Default)]
pub struct CacheConfig {
    /// TTL parsed via `humantime` (`"30s"`, `"5m"`, `"6h"`, `"1d"`, `"7d"`).
    /// `None` → default TTL applies.
    pub ttl: Option<Duration>,
    /// Component-layer hint — the resolver doesn't auto-refresh, but the
    /// flag is preserved end-to-end so consumers (Leptos, React) can read it.
    pub auto_refresh: bool,
}
271
272impl CacheConfig {
273    /// Parse a `spec::source::CacheConfig` (the raw YAML form) into the
274    /// resolver-friendly shape. Returns `Ok(None)` when the input is `None`
275    /// so the upstream `Option` plumbing stays clean. Returns `Err` when the
276    /// declared TTL string is present but unparseable — silent fallback to
277    /// `DEFAULT_TTL` would let an operator's typo (`"5 minutes"` instead of
278    /// `"5m"`) ship to production unnoticed, so the parser's complaint is
279    /// surfaced to the caller verbatim.
280    pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
281        let Some(spec) = spec else {
282            return Ok(None);
283        };
284        let ttl = match spec.ttl.as_deref() {
285            Some(s) => Some(humantime::parse_duration(s).map_err(|e| {
286                ChartError::InvalidSpec(format!(
287                    "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
288                ))
289            })?),
290            None => None,
291        };
292        Ok(Some(Self {
293            ttl,
294            auto_refresh: spec.auto_refresh.unwrap_or(false),
295        }))
296    }
297
298    /// Convenience accessor matching the design doc's `ttl_duration()` name.
299    pub fn ttl_duration(&self) -> Option<Duration> {
300        self.ttl
301    }
302}
303
/// Outcome of a `Resolver::fetch` call: the provider's result PLUS whether
/// the value came from a cache tier or a fresh provider invocation.
///
/// `ChartML::fetch` uses this to populate `FetchMetadata.cache_hits` /
/// `cache_misses` per source name without needing the resolver to know about
/// source names itself (which it doesn't — it works in keys, not names).
#[derive(Debug, Clone)]
pub struct ResolveOutcome {
    /// The fetched (or cache-served) data + provider metadata.
    pub result: FetchResult,
    /// `true` when served from tier-1 or tier-2; `false` on a fresh
    /// provider invocation (including deduped in-flight joins).
    pub cache_hit: bool,
}
315
/// Tag prefix for source-slug-based bulk invalidation. Public so consumers
/// computing tags out-of-band (e.g., a custom CacheBackend that wants to
/// pre-populate entries) match the resolver's wire format exactly.
/// (Presumably concatenated directly with the slug, e.g. `slug:sales` —
/// confirm against the tag construction site.)
pub const TAG_SLUG_PREFIX: &str = "slug:";
/// Tag prefix for namespace-based bulk invalidation; same wire-format
/// caveat as [`TAG_SLUG_PREFIX`].
pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";

/// Public alias for the shared-ownership wrapper around `Resolver`. `Arc` on
/// native (so consumers using `tokio::spawn` can move resolver handles across
/// task boundaries) and `Rc` on WASM (where the resolver's inflight map is
/// inherently `?Send`, so an `Arc` would be both incorrect and rejected by
/// `clippy::arc_with_non_send_sync`). Returned by `ChartML::resolver()` and
/// accepted by every host that wants a long-lived handle for the bulk
/// `invalidate*` API.
pub type ResolverRef = SharedRef<Resolver>;

/// Public alias for the shared-ownership wrapper around a [`CacheBackend`]
/// trait object. `Arc<dyn CacheBackend>` on native, `Rc<dyn CacheBackend>`
/// on WASM — mirrors the [`SharedRef`] story so wasm32 consumers can hand
/// off `!Send` backends (e.g. [`backends::indexeddb::IndexedDbBackend`])
/// without tripping `clippy::arc_with_non_send_sync`.
pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
338
/// Provider dispatch + cache + dedup orchestration.
///
/// One `Resolver` per `ChartML` instance. The resolver is held inside
/// `ChartML` as `SharedRef<Resolver>` (`Arc` on native, `Rc` on WASM) so
/// consumers can grab a handle for the `invalidate*` API while the chart
/// instance keeps using it. All mutation goes through `Lock`-guarded
/// interior mutability — no method takes `&mut self`.
pub struct Resolver {
    /// Default in-process cache (always present, never replaced — kept for
    /// the rare case a consumer wants to introspect the in-memory tier
    /// directly even after swapping `primary`). Held as `SharedRef` so the
    /// `MemoryBackend` clone the resolver hands itself the very first time
    /// is the same single-threaded type the wasm32 backends need (clippy
    /// rejects `Arc<MemoryBackend>` when other tier swaps land on
    /// non-`Send` types in the same `Lock`).
    memory: SharedRef<MemoryBackend>,
    /// Tier-1 cache. Defaults to the always-present in-memory backend; can
    /// be replaced via `ChartML::set_cache(...)` (e.g., a host's custom
    /// process-wide LRU). Behind a `Lock<CacheBackendRef>` so swaps are
    /// atomic and don't require `&mut self` (the resolver is held inside a
    /// `SharedRef`). The `CacheBackendRef` alias is `Arc` on native and
    /// `Rc` on WASM — wasm32 backends like `IndexedDbBackend` are `!Send`
    /// and would trip `clippy::arc_with_non_send_sync` if forced into
    /// `std::sync::Arc`.
    primary: Lock<CacheBackendRef>,
    /// Tier-2 (persistent) cache. `None` in phase 3 — phase 3b populates it
    /// with `IndexedDbBackend` for browser consumers.
    persistent: Lock<Option<CacheBackendRef>>,
    /// In-flight dedup map: cache key → shared fetch future. Entries are
    /// installed by `intern_inflight` and removed by `fetch` once the
    /// shared future resolves (success or failure).
    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
    /// Provider registry keyed by dispatch slug (`"inline"`, `"http"`,
    /// `"datasource"`, custom kinds) — see `register_provider`.
    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
    /// Optional hook impl. Wrapped in `Lock<Option<...>>` so `set_hooks`
    /// works without `&mut self` (the resolver lives behind a `SharedRef`).
    /// Snapshotted into a local clone before each instrumentation site so
    /// the hook lock is never held across an `await`.
    hooks: Lock<Option<HooksRef>>,
    /// Tracker for which keys have been explicitly invalidated since their
    /// last fetch, so the next cache-miss for that key can be reported as
    /// [`hooks::MissReason::Invalidated`] instead of `NotFound`.
    ///
    /// **Per-key invalidation** (`invalidate(key)`) inserts into
    /// [`InvalidationTracker::keys`] — the next miss for that exact key
    /// reports `Invalidated` and removes the entry (so subsequent misses on
    /// the same key without re-invalidating fall back to the regular
    /// `NotFound` / `Expired` reasoning).
    ///
    /// **Bulk invalidation** (`invalidate_all` / `invalidate_by_slug` /
    /// `invalidate_by_namespace`) sets [`InvalidationTracker::bulk_pending`]
    /// to `true`. Enumerating every just-evicted key is impractical — the
    /// `CacheBackend` trait doesn't expose iteration (and adding it would
    /// be expensive on `IndexedDbBackend`, which would need a cursor sweep
    /// per call) — so the resolver instead reports the *first* post-bulk
    /// miss as `Invalidated` and clears the flag. Subsequent misses fall
    /// back to `NotFound` / `Expired` until another invalidation happens.
    /// This is a deliberate trade-off: documented below, mirrored in the
    /// integration test `test_invalidate_emits_invalidated_miss_reason`.
    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
}
395
/// Per-resolver tracker for invalidation events. Held inside a
/// `Lock<...>` on the resolver so it survives across `&self` borrows
/// (the resolver lives behind a `SharedRef` and uses interior mutability
/// for every other piece of state too). Both fields are consumed by
/// `Resolver::consume_invalidation_reason` on the first miss that
/// observes them.
#[derive(Debug, Default)]
struct InvalidationTracker {
    /// Specific keys invalidated via `Resolver::invalidate(key)`. Drained
    /// on first observation by `consume_invalidation_reason`.
    keys: HashSet<u64>,
    /// Whether ANY bulk invalidate (`invalidate_all` /
    /// `invalidate_by_slug` / `invalidate_by_namespace`) has fired since
    /// the last bulk-pending consumption. Cleared the first time a miss
    /// observes it — at most one post-bulk miss is reported as
    /// `Invalidated`.
    bulk_pending: bool,
}
412
/// Shared in-flight future type. Boxed for dyn-trait erasure; `Shared` lets
/// multiple awaiters poll the same future without cloning the work (its
/// `Output` must be `Clone`, which is why `FetchResult` / `FetchError`
/// derive it). The inner `ResolverFuture` is `BoxFuture` (Send) on native
/// and `LocalBoxFuture` (?Send) on WASM so we don't conflict with the
/// `?Send` async traits the providers and cache backends use.
type SharedFetch = Shared<ResolverFuture<Result<FetchResult, FetchError>>>;
419
420impl Default for Resolver {
421    fn default() -> Self {
422        Self::new()
423    }
424}
425
426impl std::fmt::Debug for Resolver {
427    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
428        let has_persistent = self.persistent.read_lock("persistent cache").is_some();
429        let provider_keys: Vec<String> = self
430            .providers
431            .read_lock("providers")
432            .keys()
433            .cloned()
434            .collect();
435        f.debug_struct("Resolver")
436            .field("memory", &self.memory)
437            .field("has_persistent", &has_persistent)
438            .field("providers", &provider_keys)
439            .finish_non_exhaustive()
440    }
441}
442
443impl Resolver {
444    /// New resolver with the default `MemoryBackend` as tier-1, no tier-2,
445    /// and no providers registered. `ChartML::new()` registers the built-in
446    /// `inline` + `http` providers immediately after construction.
447    pub fn new() -> Self {
448        let memory = SharedRef::new(MemoryBackend::new());
449        let primary: CacheBackendRef = memory.clone();
450        Self {
451            memory,
452            primary: Lock::new(primary),
453            persistent: Lock::new(None),
454            inflight: SharedRef::new(Lock::new(HashMap::new())),
455            providers: Lock::new(HashMap::new()),
456            hooks: Lock::new(None),
457            recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
458        }
459    }
460
461    /// Replace the tier-1 cache backend. Used by `ChartML::set_cache`.
462    /// The fresh backend starts empty — entries in the old backend are not
463    /// migrated (caller's responsibility if they want to).
464    pub fn set_primary_cache(&self, backend: CacheBackendRef) {
465        let mut guard = self.primary.write_lock("primary cache");
466        *guard = backend;
467    }
468
469    /// Set the optional tier-2 (persistent) cache. Phase 3 leaves this
470    /// public so phase 3b's `IndexedDbBackend` can wire in without further
471    /// surface changes.
472    pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
473        let mut guard = self.persistent.write_lock("persistent cache");
474        *guard = Some(backend);
475    }
476
477    /// Register a [`ResolverHooks`] impl. Replaces any previously registered
478    /// hooks; passes the new impl in as a `HooksRef` (`Arc` on native, `Rc`
479    /// on WASM). After this call every `Resolver::fetch` invocation emits
480    /// progress / cache / error events through the new impl.
481    pub fn set_hooks(&self, hooks: HooksRef) {
482        let mut guard = self.hooks.write_lock("hooks");
483        *guard = Some(hooks);
484    }
485
486    /// Clear any previously registered hooks. Subsequent `fetch` calls
487    /// behave as if `set_hooks` had never been called (no-op emission).
488    pub fn clear_hooks(&self) {
489        let mut guard = self.hooks.write_lock("hooks");
490        *guard = None;
491    }
492
493    /// Snapshot the current hooks `HooksRef` (or `None`) so the resolver
494    /// can release the lock before entering any cache walk or `.await`.
495    /// Must be called once at the top of `fetch`; downstream sites use the
496    /// snapshot rather than re-acquiring the lock to keep the hook lock
497    /// off the hot path. Also called by `ChartML::transform` (one crate up)
498    /// at the top of the transform stage for the same reason.
499    pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
500        self.hooks.read_lock("hooks").clone()
501    }
502
503    /// Snapshot the tier-1 cache handle so we can drop the lock before the
504    /// async cache call. Always returns a backend (the field starts as
505    /// `MemoryBackend` and `set_primary_cache` only replaces, never clears).
506    fn primary_snapshot(&self) -> CacheBackendRef {
507        self.primary.read_lock("primary cache").clone()
508    }
509
510    /// Snapshot the optional tier-2 cache handle (or `None`) for the same
511    /// reason `primary_snapshot` exists — release the sync lock before
512    /// the async cache call.
513    fn persistent_snapshot(&self) -> Option<CacheBackendRef> {
514        self.persistent.read_lock("persistent cache").clone()
515    }
516
517    /// Register a provider under a dispatch key (`"inline"`, `"http"`,
518    /// `"datasource"`, or any custom slug). Re-registration replaces the
519    /// previously registered provider for that key.
520    pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
521        let mut providers = self.providers.write_lock("providers");
522        providers.insert(kind.to_string(), provider);
523    }
524
525    /// Snapshot the registered provider kinds. Useful for tests and
526    /// host-app diagnostics.
527    pub fn provider_kinds(&self) -> Vec<String> {
528        self.providers
529            .read_lock("providers")
530            .keys()
531            .cloned()
532            .collect()
533    }
534
535    /// Compute the cache key the resolver would use for a given spec.
536    ///
537    /// Public so phase 4 (Leptos refresh button) and phase 6 (Kyomi
538    /// invalidate-on-change) can compute the exact key the resolver caches
539    /// under without re-implementing the hash.
540    ///
541    /// Hashes `(namespace, datasource, query, url, provider, rows_hash)` in
542    /// that order. `None` fields contribute a sentinel byte (`0xFE`, an
543    /// invalid UTF-8 start byte) so they cannot collide with a real string
544    /// value of `"None"`. Field separator is `0xFF` (also invalid UTF-8) so
545    /// adjacent fields can't bleed into each other ("a|b" vs "ab|").
546    pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
547        let mut hasher = Xxh3::new();
548
549        let fields: [Option<&str>; 5] = [
550            namespace,
551            spec.datasource.as_deref(),
552            spec.query.as_deref(),
553            spec.url.as_deref(),
554            spec.provider.as_deref(),
555        ];
556        for field in fields {
557            match field {
558                Some(s) => hasher.update(s.as_bytes()),
559                None => hasher.update(&[HASH_NONE_SENTINEL]),
560            }
561            hasher.update(&[HASH_FIELD_SEP]);
562        }
563
564        // Inline `rows` is hashed via its canonical JSON serialization so
565        // two specs with the same row data hash identically regardless of
566        // hashmap iteration order. `None` rows contribute the sentinel.
567        match spec.rows.as_ref() {
568            Some(rows) => match serde_json::to_vec(rows) {
569                Ok(bytes) => hasher.update(&bytes),
570                Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
571            },
572            None => hasher.update(&[HASH_NONE_SENTINEL]),
573        }
574
575        hasher.digest()
576    }
577
    /// The core orchestration: tier-1 → tier-2 → in-flight dedup → provider.
    ///
    /// Returns `ResolveOutcome` (result + cache-hit flag) so the caller can
    /// classify the source under `cache_hits` vs `cache_misses` in
    /// `FetchMetadata`. Tier-1 hits hydrate from memory only; tier-2 hits
    /// also re-populate tier-1 so subsequent reads in the same session
    /// short-circuit.
    ///
    /// # Errors
    /// Propagates the shared fetch future's `FetchError` after reporting
    /// it through `emit_error` on the snapshotted hooks.
    pub async fn fetch(
        &self,
        key: u64,
        request: FetchRequest,
    ) -> Result<ResolveOutcome, FetchError> {
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        // Snapshot the hooks `HooksRef` ONCE up front so the hooks lock is
        // never held across an `.await` and downstream sites can share the
        // same clone without re-acquiring.
        let hooks = self.hooks_snapshot();
        let source_name = request.source_name.clone();

        // ── Tier 1: in-process cache ──
        let mut tier1_expired = false;
        if let Some(entry) = primary.get(key).await {
            if !entry.is_expired() {
                let age = entry.age();
                emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
                return Ok(ResolveOutcome {
                    result: FetchResult {
                        data: entry.data,
                        metadata: entry.metadata,
                    },
                    cache_hit: true,
                });
            }
            // Expired — let it fall through; we'll overwrite on success.
            tier1_expired = true;
        }

        // ── Tier 2: persistent cache (phase 3b populates this) ──
        let mut tier2_expired = false;
        if let Some(p) = &persistent {
            if let Some(entry) = p.get(key).await {
                if !entry.is_expired() {
                    let age = entry.age();
                    // Hydrate tier-1 so subsequent reads stay in-process.
                    // Memory hydration is silent (no event) — only the
                    // logical tier-2 hit fires. The `put` is best-effort:
                    // a tier-1 write failure must not fail a tier-2 hit.
                    let _ = primary.put(key, entry.clone()).await;
                    emit_cache_hit(
                        &hooks,
                        key,
                        &source_name,
                        hooks::CacheTier::Persistent,
                        age,
                    );
                    return Ok(ResolveOutcome {
                        result: FetchResult {
                            data: entry.data,
                            metadata: entry.metadata,
                        },
                        cache_hit: true,
                    });
                }
                // Expired — evict from tier-2 too (best-effort).
                let _ = p.invalidate(key).await;
                tier2_expired = true;
            }
        }

        // Both tiers missed — emit one cache-miss with the most specific
        // reason we can prove. Precedence: `Invalidated` (most specific —
        // an operator explicitly cleared this key or fired a bulk
        // invalidate) wins over `Expired` (TTL elapsed naturally), which
        // wins over `NotFound` (key was never cached or was evicted by
        // some path the resolver doesn't track).
        let miss_reason = if self.consume_invalidation_reason(key) {
            hooks::MissReason::Invalidated
        } else if tier1_expired || tier2_expired {
            hooks::MissReason::Expired
        } else {
            hooks::MissReason::NotFound
        };
        emit_cache_miss(&hooks, key, &source_name, miss_reason);

        // Provider call start — fire BEFORE the dedup wait so consumers
        // see a "fetching" event for every distinct cache miss, not just
        // the first concurrent one.
        emit_progress(
            &hooks,
            hooks::Phase::Fetch,
            &source_name,
            None,
            None,
            format!(
                "Fetching {}",
                source_name.as_deref().unwrap_or("source"),
            ),
        );

        // ── In-flight dedup ──
        // If another fetch for the same key is already in flight, await its
        // shared future. Otherwise install a new shared future and run it.
        let shared = self.intern_inflight(key, request, primary, persistent);
        let result = shared.await;

        // Cleanup: drop the inflight slot whether the fetch succeeded or
        // failed so subsequent calls re-dispatch instead of replaying a
        // stale terminal state. Errors clear too — failed fetches should
        // retry, not stick. Every deduped awaiter runs this remove; after
        // the first, the key is already absent and the call is a no-op.
        self.inflight.write_lock("inflight").remove(&key);

        match result {
            Ok(fetch_result) => {
                let row_count = fetch_result.data.num_rows();
                emit_progress(
                    &hooks,
                    hooks::Phase::Fetch,
                    &source_name,
                    Some(row_count as u64),
                    None,
                    format!(
                        "Fetched {} ({} rows)",
                        source_name.as_deref().unwrap_or("source"),
                        row_count,
                    ),
                );
                Ok(ResolveOutcome {
                    result: fetch_result,
                    cache_hit: false,
                })
            }
            Err(err) => {
                emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
                Err(err)
            }
        }
    }
715
716    /// Get-or-insert the in-flight `Shared` future for a key. Returns a
717    /// clone of the `Shared` so the caller can `await` it; the original
718    /// stays in the map until removed by the caller.
719    fn intern_inflight(
720        &self,
721        key: u64,
722        request: FetchRequest,
723        primary: CacheBackendRef,
724        persistent: Option<CacheBackendRef>,
725    ) -> SharedFetch {
726        let mut inflight = self.inflight.write_lock("inflight");
727        if let Some(existing) = inflight.get(&key) {
728            return existing.clone();
729        }
730
731        // Build the work future: dispatch to the right provider, write to
732        // both cache tiers on success. The future is `'static` because it
733        // owns everything it touches via `Arc` clones.
734        let providers = self.snapshot_providers();
735        let cache_cfg = request.cache.clone();
736        let namespace = request.namespace.clone();
737        let slug = request.spec.datasource.clone();
738
739        let work = async move {
740            let provider = dispatch_provider(&providers, &request.spec)?;
741            let result = provider.fetch(request).await?;
742
743            // Write-through: tier-1 always; tier-2 if configured.
744            let entry = CachedEntry {
745                data: result.data.clone(),
746                fetched_at: SystemTime::now(),
747                ttl: cache_cfg
748                    .as_ref()
749                    .and_then(|c| c.ttl_duration())
750                    .unwrap_or(DEFAULT_TTL),
751                tags: build_tags(slug.as_deref(), namespace.as_deref()),
752                metadata: result.metadata.clone(),
753            };
754            // Cache write failures must not poison the result — log via
755            // tracing in 3c, but for now just swallow (matches the design
756            // doc's `.ok()` guidance).
757            let _ = primary.put(key, entry.clone()).await;
758            if let Some(p) = &persistent {
759                let _ = p.put(key, entry).await;
760            }
761            Ok(result)
762        };
763
764        // Pick the right Box variant for the target. `boxed()` is Send-only
765        // (native), `boxed_local()` is ?Send (WASM).
766        #[cfg(not(target_arch = "wasm32"))]
767        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed();
768        #[cfg(target_arch = "wasm32")]
769        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed_local();
770
771        let future = boxed.shared();
772        inflight.insert(key, future.clone());
773        future
774    }
775
776    /// Snapshot the provider map for use inside the inflight future. We
777    /// snapshot rather than holding the lock so the future stays `'static`.
778    fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
779        self.providers.read_lock("providers").clone()
780    }
781
782    // ── Bulk invalidation API ──
783
784    /// Drop a single entry from every cache tier. The next miss on `key`
785    /// will be reported via [`hooks::ResolverHooks::on_cache_miss`] with
786    /// [`hooks::MissReason::Invalidated`] rather than `NotFound`.
787    pub async fn invalidate(&self, key: u64) {
788        let primary = self.primary_snapshot();
789        let persistent = self.persistent_snapshot();
790        let _ = primary.invalidate(key).await;
791        if let Some(p) = &persistent {
792            let _ = p.invalidate(key).await;
793        }
794        self.recently_invalidated
795            .write_lock("recently_invalidated")
796            .keys
797            .insert(key);
798    }
799
800    /// Drop every cached entry across all tiers. The very next miss on any
801    /// key will be reported as `Invalidated`; subsequent misses fall back
802    /// to the regular `NotFound` / `Expired` reasoning. See the field-level
803    /// docs on [`Resolver::recently_invalidated`] for why per-key tracking
804    /// isn't done here (would require a `keys()` method on every backend).
805    pub async fn invalidate_all(&self) {
806        let primary = self.primary_snapshot();
807        let persistent = self.persistent_snapshot();
808        let _ = primary.clear().await;
809        if let Some(p) = &persistent {
810            let _ = p.clear().await;
811        }
812        self.mark_bulk_invalidated();
813    }
814
815    /// Drop every entry whose source spec carried the given `datasource`
816    /// slug. Useful for "datasource X was edited; invalidate all queries
817    /// against it" workflows. Subject to the same single-shot
818    /// `Invalidated` reporting as [`Resolver::invalidate_all`].
819    pub async fn invalidate_by_slug(&self, slug: &str) {
820        let tag = format!("{TAG_SLUG_PREFIX}{slug}");
821        let primary = self.primary_snapshot();
822        let persistent = self.persistent_snapshot();
823        let _ = primary.invalidate_by_tag(&tag).await;
824        if let Some(p) = &persistent {
825            let _ = p.invalidate_by_tag(&tag).await;
826        }
827        self.mark_bulk_invalidated();
828    }
829
830    /// Drop every entry tagged with the given namespace. Used for tenant
831    /// isolation flows ("user logged out; clear their cached data").
832    /// Subject to the same single-shot `Invalidated` reporting as
833    /// [`Resolver::invalidate_all`].
834    pub async fn invalidate_by_namespace(&self, namespace: &str) {
835        let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
836        let primary = self.primary_snapshot();
837        let persistent = self.persistent_snapshot();
838        let _ = primary.invalidate_by_tag(&tag).await;
839        if let Some(p) = &persistent {
840            let _ = p.invalidate_by_tag(&tag).await;
841        }
842        self.mark_bulk_invalidated();
843    }
844
845    /// Set the bulk-pending flag so the next post-bulk miss surfaces as
846    /// [`hooks::MissReason::Invalidated`]. Synchronous; the lock is never
847    /// held across an `.await`.
848    fn mark_bulk_invalidated(&self) {
849        self.recently_invalidated
850            .write_lock("recently_invalidated")
851            .bulk_pending = true;
852    }
853
854    /// Check whether a miss for `key` should be reported as
855    /// [`hooks::MissReason::Invalidated`]. Drains the per-key entry on
856    /// first observation and consumes the bulk-pending flag at most once
857    /// per bulk invalidation, so this returns `true` for at most one miss
858    /// per `invalidate*` call. Called from the hot fetch path so the lock
859    /// is taken briefly and never held across an `.await`.
860    fn consume_invalidation_reason(&self, key: u64) -> bool {
861        let mut tracker = self
862            .recently_invalidated
863            .write_lock("recently_invalidated");
864        if tracker.keys.remove(&key) {
865            return true;
866        }
867        if tracker.bulk_pending {
868            tracker.bulk_pending = false;
869            return true;
870        }
871        false
872    }
873
874    /// Iterate every registered provider AND cache backend, awaiting their
875    /// `shutdown` hook in turn. Wired up by `ChartML::shutdown()`.
876    pub async fn shutdown(&self) {
877        let providers = self.snapshot_providers();
878        for (_, provider) in providers {
879            provider.shutdown().await;
880        }
881        let primary = self.primary_snapshot();
882        primary.shutdown().await;
883        if let Some(p) = self.persistent_snapshot() {
884            p.shutdown().await;
885        }
886    }
887}
888
889/// Emit a [`hooks::CacheHitEvent`] through the registered hook impl, if
890/// any. Fire-and-forget via `spawn_hook`; never blocks the resolver.
891fn emit_cache_hit(
892    hooks: &Option<HooksRef>,
893    key: u64,
894    source_name: &Option<String>,
895    tier: hooks::CacheTier,
896    age: Duration,
897) {
898    let Some(h) = hooks.as_ref() else { return };
899    let h = h.clone();
900    let event = hooks::CacheHitEvent {
901        key,
902        source_name: source_name.clone(),
903        tier,
904        age,
905    };
906    hooks::spawn_hook(async move {
907        h.on_cache_hit(event).await;
908    });
909}
910
911/// Emit a [`hooks::CacheMissEvent`] through the registered hook impl.
912fn emit_cache_miss(
913    hooks: &Option<HooksRef>,
914    key: u64,
915    source_name: &Option<String>,
916    reason: hooks::MissReason,
917) {
918    let Some(h) = hooks.as_ref() else { return };
919    let h = h.clone();
920    let event = hooks::CacheMissEvent {
921        key,
922        source_name: source_name.clone(),
923        reason,
924    };
925    hooks::spawn_hook(async move {
926        h.on_cache_miss(event).await;
927    });
928}
929
930/// Emit a [`hooks::ProgressEvent`] through the registered hook impl.
931/// `pub(crate)` so `ChartML::transform` / `render_prepared_to_svg` can
932/// emit transform/render-phase progress without re-implementing the
933/// snapshot dance.
934pub(crate) fn emit_progress(
935    hooks: &Option<HooksRef>,
936    phase: hooks::Phase,
937    source_name: &Option<String>,
938    loaded: Option<u64>,
939    total: Option<u64>,
940    message: String,
941) {
942    let Some(h) = hooks.as_ref() else { return };
943    let h = h.clone();
944    let event = hooks::ProgressEvent {
945        phase,
946        source_name: source_name.clone(),
947        loaded,
948        total,
949        message,
950    };
951    hooks::spawn_hook(async move {
952        h.on_progress(event).await;
953    });
954}
955
956/// Emit a [`hooks::ErrorEvent`] through the registered hook impl.
957/// `pub(crate)` so `ChartML::transform` can emit transform-phase errors.
958pub(crate) fn emit_error(
959    hooks: &Option<HooksRef>,
960    phase: hooks::Phase,
961    source_name: &Option<String>,
962    error: String,
963) {
964    let Some(h) = hooks.as_ref() else { return };
965    let h = h.clone();
966    let event = hooks::ErrorEvent {
967        phase,
968        source_name: source_name.clone(),
969        error,
970    };
971    hooks::spawn_hook(async move {
972        h.on_error(event).await;
973    });
974}
975
976/// Build the tag list applied to `CachedEntry` on write. `slug` and
977/// `namespace` are optional — entries without one of them simply skip the
978/// corresponding tag (no empty-string tag pollution).
979fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
980    let mut tags = Vec::new();
981    if let Some(slug) = slug {
982        tags.push(format!("{TAG_SLUG_PREFIX}{slug}"));
983    }
984    if let Some(ns) = namespace {
985        tags.push(format!("{TAG_NAMESPACE_PREFIX}{ns}"));
986    }
987    tags
988}
989
990/// Apply the design-doc dispatch routing. Precedence: explicit `provider`
991/// key wins over inferred shape (rows → inline, url → http, datasource →
992/// datasource). Returns the `Arc` so the caller can `await` the trait
993/// method without holding any lock.
994fn dispatch_provider(
995    providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
996    spec: &InlineData,
997) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
998    let kind = if let Some(kind) = spec.provider.as_deref() {
999        kind
1000    } else if spec.rows.is_some() {
1001        "inline"
1002    } else if spec.url.is_some() {
1003        "http"
1004    } else if spec.datasource.is_some() {
1005        "datasource"
1006    } else {
1007        return Err(FetchError::Other(
1008            "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
1009                .to_string(),
1010        ));
1011    };
1012
1013    providers
1014        .get(kind)
1015        .cloned()
1016        .ok_or_else(|| FetchError::ProviderNotFound {
1017            kind: kind.to_string(),
1018        })
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    /// A spec with every field unset — tests override just what they need.
    fn empty_inline() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }

    /// Registry holding exactly one `InlineProvider` under `key`.
    fn single_provider(key: &str) -> HashMap<String, Arc<dyn DataSourceProvider>> {
        let mut registry: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        registry.insert(key.to_string(), Arc::new(InlineProvider::new()));
        registry
    }

    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            query: Some("SELECT 1".into()),
            ..empty_inline()
        };
        // Hashing the same spec + namespace twice must agree.
        let first = Resolver::key_for(&spec, Some("ns"));
        let second = Resolver::key_for(&spec, Some("ns"));
        assert_eq!(first, second);
    }

    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        // Tenant isolation: identical specs under different namespaces
        // must not share a cache slot.
        assert_ne!(
            Resolver::key_for(&spec, Some("tenant-a")),
            Resolver::key_for(&spec, Some("tenant-b")),
        );
    }

    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        let absent = InlineData {
            datasource: None,
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let literal = InlineData {
            datasource: Some("None".into()),
            url: Some("https://x".into()),
            ..empty_inline()
        };
        // The absent-field sentinel byte must hash differently from a
        // field literally containing the string "None".
        assert_ne!(
            Resolver::key_for(&absent, None),
            Resolver::key_for(&literal, None),
        );
    }

    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // Without a field separator, (datasource="ab", query=None) and
        // (datasource="a", query="b") would both feed "ab" to the hasher
        // and collide.
        let merged = InlineData {
            datasource: Some("ab".into()),
            ..empty_inline()
        };
        let split = InlineData {
            datasource: Some("a".into()),
            query: Some("b".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&merged, None),
            Resolver::key_for(&split, None),
        );
    }

    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let providers = single_provider("custom");
        let spec = InlineData {
            provider: Some("custom".into()),
            // rows would normally route to "inline", but explicit wins.
            rows: Some(vec![]),
            ..empty_inline()
        };
        // Trait objects can't be compared, but `is_ok()` suffices: the
        // ONLY registered key is "custom", so success proves the explicit
        // `provider` field drove the lookup. Inferring from `rows` alone
        // would have asked for the unregistered "inline" key and failed
        // with `ProviderNotFound`.
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_inferred_inline() {
        let providers = single_provider("inline");
        let spec = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_missing_kind_errors() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let err = dispatch_provider(&HashMap::new(), &spec)
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "datasource"));
    }

    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        let err = dispatch_provider(&HashMap::new(), &empty_inline())
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::Other(_)));
    }
}