Skip to main content

chartml_core/resolver/
mod.rs

1//! Resolver: caching + dedup + dispatch layer between `ChartML::fetch` and
2//! the registered `DataSourceProvider` implementations (chartml 5.0 phase 3).
3//!
4//! The resolver owns:
5//! - **Tier-1 cache** — a `MemoryBackend` always present.
6//! - **Tier-2 cache** — a [`PersistentSlot`] that is either `None` (no
7//!   persistent cache), `Ready` (backend constructed), or `Deferred`
8//!   (lazy factory awaiting first `fetch()` to initialize — used for
9//!   `IndexedDbBackend` whose async constructor can't run at setup time).
10//! - **In-flight tracker** — a `HashMap<u64, Shared<BoxFuture<...>>>` so two
11//!   concurrent fetches for the same key share one provider invocation.
12//! - **Provider registry** — `HashMap<String, Arc<dyn DataSourceProvider>>`,
13//!   keyed by dispatch slug (`"inline"`, `"http"`, `"datasource"`, …).
14//!
//! Hook dispatch: an optional [`ResolverHooks`] impl is registered through
//! `Resolver::set_hooks` (and removed via `Resolver::clear_hooks`); it is
//! snapshotted before use so the hook lock is never held across an `.await`.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use arrow::array::RecordBatch;
22use arrow::datatypes::SchemaRef;
23// `web_time::SystemTime` works on `wasm32-unknown-unknown` (where the std
24// version panics). On native it's a transparent alias for `std::time`.
25use std::time::Duration;
26use web_time::SystemTime;
27
28use async_trait::async_trait;
29use futures::future::{FutureExt, Shared};
30use thiserror::Error;
31use xxhash_rust::xxh3::Xxh3;
32
33// `?Send` on WASM means the futures we shove into `Shared` cannot promise
34// `Send`. Use `LocalBoxFuture` on WASM and `BoxFuture` (which IS `Send`) on
35// native so the resolver can stay multi-threaded on tokio while compiling
36// cleanly under `wasm32-unknown-unknown`.
37#[cfg(not(target_arch = "wasm32"))]
38type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
39#[cfg(target_arch = "wasm32")]
40type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
41
42// Cfg-gated shared-ownership + interior-mutability primitives. On native we
43// use `Arc<Mutex<T>>` so the resolver stays multi-threaded for tokio. On
44// WASM the resolver's inflight `Shared<LocalBoxFuture<...>>` map is
45// inherently `?Send`, so wrapping it in `Arc<Mutex<...>>` would trip
46// `clippy::arc_with_non_send_sync`. Single-threaded `Rc<RefCell<...>>` is
47// the correct primitive for the wasm32-unknown-unknown target.
48/// Cfg-gated shared-ownership pointer. `Arc<T>` on native (so handles can
49/// move across `tokio::spawn` task boundaries), `Rc<T>` on WASM (where
50/// `wasm32-unknown-unknown` is single-threaded and the resolver's
51/// internals are `?Send`, so an `Arc` would be both incorrect and
52/// rejected by `clippy::arc_with_non_send_sync`).
53///
54/// Public so consumers wiring [`CacheBackend`] / [`DataSourceProvider`]
55/// trait objects through the resolver can use the same alias as the
56/// resolver's internal storage — their wasm32-unknown-unknown builds get
57/// the right primitive without a manual `cfg_attr` dance at every site.
58#[cfg(not(target_arch = "wasm32"))]
59pub type SharedRef<T> = Arc<T>;
60#[cfg(target_arch = "wasm32")]
61pub type SharedRef<T> = std::rc::Rc<T>;
62
63#[cfg(not(target_arch = "wasm32"))]
64pub(crate) type Lock<T> = std::sync::Mutex<T>;
65#[cfg(target_arch = "wasm32")]
66pub(crate) type Lock<T> = std::cell::RefCell<T>;
67
/// Cfg-gated extension trait so `self.field.write_lock("…").something()` and
/// `self.field.read_lock("…").something()` work uniformly across the native
/// `Mutex` and the WASM `RefCell`. Eliminates per-call-site `cfg` blocks
/// without losing the panic message that distinguishes which lock failed.
///
/// `label` is interpolated into the panic message of the implementation, so
/// pass a short human-readable name for the field being locked.
trait LockExt<T> {
    /// Guard type handed out by [`LockExt::read_lock`]; dereferences to `T`.
    type Read<'a>: std::ops::Deref<Target = T>
    where
        Self: 'a;
    /// Guard type handed out by [`LockExt::write_lock`]; dereferences
    /// mutably to `T`.
    type Write<'a>: std::ops::DerefMut<Target = T>
    where
        Self: 'a;
    /// Acquire shared (read) access. Implementations in this file panic,
    /// naming `label`, when access cannot be granted.
    fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
    /// Acquire exclusive (write) access. Implementations in this file panic,
    /// naming `label`, when access cannot be granted.
    fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
}
82
83#[cfg(not(target_arch = "wasm32"))]
84impl<T> LockExt<T> for std::sync::Mutex<T> {
85    type Read<'a>
86        = std::sync::MutexGuard<'a, T>
87    where
88        Self: 'a;
89    type Write<'a>
90        = std::sync::MutexGuard<'a, T>
91    where
92        Self: 'a;
93    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
94        self.lock()
95            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
96    }
97    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
98        self.lock()
99            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
100    }
101}
102
/// WASM implementation: maps read/write access onto `RefCell` shared and
/// mutable borrows.
///
/// # Panics
/// `read_lock` panics with `resolver {label} cell already borrowed mutably`
/// and `write_lock` with `resolver {label} cell already borrowed` when the
/// requested borrow conflicts with an outstanding one.
#[cfg(target_arch = "wasm32")]
impl<T> LockExt<T> for std::cell::RefCell<T> {
    type Read<'a>
        = std::cell::Ref<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::cell::RefMut<'a, T>
    where
        Self: 'a;

    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        match self.try_borrow() {
            Ok(shared) => shared,
            Err(_) => panic!("resolver {label} cell already borrowed mutably"),
        }
    }

    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.try_borrow_mut() {
            Ok(exclusive) => exclusive,
            Err(_) => panic!("resolver {label} cell already borrowed"),
        }
    }
}
122
123use crate::data::DataTable;
124use crate::error::ChartError;
125use crate::spec::source::CacheConfig as SpecCacheConfig;
126use crate::spec::InlineData;
127
128pub mod builtin;
129pub mod cache;
130pub mod cancel;
131pub mod hooks;
132
133// Phase 3b: persistent cache backends. The container module is unconditional
134// because it hosts the pure-Rust `codec` submodule (shared blob framing) that
135// needs to compile + test on every target. Backend implementations themselves
136// (`indexeddb`, future `sqlite`/`fs`/…) carry their own `#[cfg(...)]` gates
137// inside `backends/mod.rs`, so non-browser builds and browser builds without
138// the `wasm-indexeddb` feature don't compile any of the IndexedDB-specific
139// code (which depends on the `idb` crate, brought in only by that feature).
140pub mod backends;
141
142pub use builtin::{HttpProvider, InlineProvider};
143pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
144pub use cancel::CancellationToken;
145pub use hooks::{
146    CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
147    ProgressEvent, ResolverHooks,
148};
149
150/// Default TTL applied when a spec doesn't declare one. Five minutes matches
151/// the JS middleware's `CACHE_TTL_DEFAULT_MS`.
152pub const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60);
153
154/// Sentinel byte mixed into the hash for `None` fields. Distinct from any
155/// UTF-8 byte (0xFE is invalid as a UTF-8 start byte) so `None` can never
156/// collide with a real string value (e.g., a literal `"None"` datasource).
157const HASH_NONE_SENTINEL: u8 = 0xFE;
158const HASH_FIELD_SEP: u8 = 0xFF;
159
160/// Provider dispatch + fetch trait. Host apps implement this for their
161/// custom data sources (BigQuery, Snowflake, internal REST APIs, …) and
162/// register them via `ChartML::register_provider(kind, provider)`.
163///
164/// `?Send` on WASM matches `TransformMiddleware` so single-threaded
165/// browser environments can hold non-`Send` state inside provider impls.
166#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
167#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
168pub trait DataSourceProvider: Send + Sync {
169    /// Fetch one source. The resolver handles caching + dedup + parallelism;
170    /// providers only see this single call per actual upstream invocation.
171    async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;
172
173    /// Batch-oriented fetch. Returns multiple RecordBatches with a shared
174    /// schema, allowing the transform layer to register them into MemTable
175    /// without concatenation.
176    ///
177    /// Default wraps `fetch()` into a single-batch result. Override to
178    /// stream batches from an HTTP endpoint (e.g. Arrow IPC stream).
179    async fn fetch_batches(&self, request: FetchRequest) -> Result<FetchBatchResult, FetchError> {
180        let result = self.fetch(request).await?;
181        let schema = result.data.schema();
182        let batch = result.data.into_record_batch();
183        Ok(FetchBatchResult {
184            schema,
185            batches: vec![batch],
186            metadata: result.metadata,
187        })
188    }
189
190    /// Optional graceful shutdown hook. Called by `ChartML::shutdown()` on
191    /// SSR request end / tab close. Default no-op so providers that hold no
192    /// pooled resources don't have to implement it.
193    async fn shutdown(&self) {}
194}
195
196/// Per-source request context. Carries the resolved spec (params already
197/// substituted) plus cache + header + namespace + cancellation hints.
198#[derive(Debug, Clone)]
199pub struct FetchRequest {
200    /// User-chosen name for this source within the chart spec. `None` for
201    /// unnamed (flat) `data:` forms.
202    pub source_name: Option<String>,
203    /// Fully resolved flat-form source spec — `datasource`, `query`, `url`,
204    /// `rows`, etc. with `$param.name` references already substituted.
205    pub spec: InlineData,
206    /// Parsed cache config from `spec.cache.ttl` (and `auto_refresh`).
207    pub cache: Option<CacheConfig>,
208    /// Request-level HTTP headers (merged with `HttpProvider` defaults).
209    /// Ignored by non-HTTP providers unless they explicitly read this field.
210    pub headers: HashMap<String, String>,
211    /// Tenant / workspace namespace folded into the cache key. `None` for
212    /// single-tenant deployments.
213    pub namespace: Option<String>,
214    /// Optional cancellation token. Phase 3 always passes `None` from the
215    /// resolver; providers that opt into honoring it stay forward-compatible
216    /// with future tab-close / cancel-on-route-change wiring.
217    pub cancel_token: Option<CancellationToken>,
218}
219
220/// Provider response. `Clone` is required because `futures::future::Shared`
221/// (used for in-flight dedup) takes a `Future<Output: Clone>`. `DataTable`
222/// is `Arc`-backed so cloning is cheap.
223#[derive(Debug, Clone)]
224pub struct FetchResult {
225    pub data: DataTable,
226    /// Free-form per-provider metadata — `bytes_billed`, `rows_returned`,
227    /// `server_refreshed_at`, `upstream_cache_hit`, `warnings`, etc.
228    pub metadata: HashMap<String, serde_json::Value>,
229}
230
/// Batch-oriented provider response. Carries multiple RecordBatches with a
/// shared schema, allowing the transform layer to register them into
/// MemTable without concatenation.
#[derive(Debug, Clone)]
pub struct FetchBatchResult {
    /// Schema shared by every entry in `batches`.
    pub schema: SchemaRef,
    /// The record batches themselves. The default
    /// `DataSourceProvider::fetch_batches` produces exactly one.
    pub batches: Vec<RecordBatch>,
    /// Free-form per-provider metadata, same shape as
    /// `FetchResult::metadata`.
    pub metadata: HashMap<String, serde_json::Value>,
}
240
241/// Provider failures. `Clone` matches `FetchResult` so `Shared` can clone
242/// the error when multiple in-flight callers receive the same outcome.
243#[derive(Debug, Error, Clone)]
244pub enum FetchError {
245    #[error("datasource '{slug}' not found")]
246    SlugNotFound { slug: String },
247
248    #[error("query failed: {0}")]
249    QueryFailed(String),
250
251    #[error("decode failed: {0}")]
252    DecodeFailed(String),
253
254    #[error("cancelled")]
255    Cancelled,
256
257    #[error("no provider registered for kind '{kind}'")]
258    ProviderNotFound { kind: String },
259
260    #[error("cache backend error: {0}")]
261    Cache(String),
262
263    #[error("{0}")]
264    Other(String),
265}
266
267impl From<FetchError> for ChartError {
268    fn from(err: FetchError) -> Self {
269        match err {
270            FetchError::SlugNotFound { slug } => {
271                ChartError::DataError(format!("datasource '{slug}' not found"))
272            }
273            FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
274            FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
275            FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
276            FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
277                "no provider registered for kind '{kind}'"
278            )),
279            FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
280            FetchError::Other(msg) => ChartError::DataError(msg),
281        }
282    }
283}
284
285impl From<CacheError> for FetchError {
286    fn from(err: CacheError) -> Self {
287        FetchError::Cache(err.to_string())
288    }
289}
290
291/// Parsed cache config. Built from `spec::source::CacheConfig` (which carries
292/// the raw YAML strings) so the resolver doesn't have to re-parse `humantime`
293/// formats on every fetch.
294#[derive(Debug, Clone, Default)]
295pub struct CacheConfig {
296    /// TTL parsed via `humantime` (`"30s"`, `"5m"`, `"6h"`, `"1d"`, `"7d"`).
297    /// `None` → default TTL applies.
298    pub ttl: Option<Duration>,
299    /// Component-layer hint — the resolver doesn't auto-refresh, but the
300    /// flag is preserved end-to-end so consumers (Leptos, React) can read it.
301    pub auto_refresh: bool,
302}
303
304impl CacheConfig {
305    /// Parse a `spec::source::CacheConfig` (the raw YAML form) into the
306    /// resolver-friendly shape. Returns `Ok(None)` when the input is `None`
307    /// so the upstream `Option` plumbing stays clean. Returns `Err` when the
308    /// declared TTL string is present but unparseable — silent fallback to
309    /// `DEFAULT_TTL` would let an operator's typo (`"5 minutes"` instead of
310    /// `"5m"`) ship to production unnoticed, so the parser's complaint is
311    /// surfaced to the caller verbatim.
312    pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
313        let Some(spec) = spec else {
314            return Ok(None);
315        };
316        let ttl = match spec.ttl.as_deref() {
317            Some(s) => Some(humantime::parse_duration(s).map_err(|e| {
318                ChartError::InvalidSpec(format!(
319                    "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
320                ))
321            })?),
322            None => None,
323        };
324        Ok(Some(Self {
325            ttl,
326            auto_refresh: spec.auto_refresh.unwrap_or(false),
327        }))
328    }
329
330    /// Convenience accessor matching the design doc's `ttl_duration()` name.
331    pub fn ttl_duration(&self) -> Option<Duration> {
332        self.ttl
333    }
334}
335
336/// Outcome of a `Resolver::fetch` call: the provider's result PLUS whether
337/// the value came from a cache tier or a fresh provider invocation.
338///
339/// `ChartML::fetch` uses this to populate `FetchMetadata.cache_hits` /
340/// `cache_misses` per source name without needing the resolver to know about
341/// source names itself (which it doesn't — it works in keys, not names).
342#[derive(Debug, Clone)]
343pub struct ResolveOutcome {
344    pub result: FetchResult,
345    pub cache_hit: bool,
346    /// Original provider batches. `Some` on cache miss (when the provider
347    /// returned via `fetch_batches`), `None` on cache hit. Allows the
348    /// transform layer to register batches into MemTable without concat.
349    pub batches: Option<FetchBatchResult>,
350}
351
352/// Tag prefix for source-slug-based bulk invalidation. Public so consumers
353/// computing tags out-of-band (e.g., a custom CacheBackend that wants to
354/// pre-populate entries) match the resolver's wire format exactly.
355pub const TAG_SLUG_PREFIX: &str = "slug:";
356/// Tag prefix for namespace-based bulk invalidation.
357pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";
358
359/// Public alias for the shared-ownership wrapper around `Resolver`. `Arc` on
360/// native (so consumers using `tokio::spawn` can move resolver handles across
361/// task boundaries) and `Rc` on WASM (where the resolver's inflight map is
362/// inherently `?Send`, so an `Arc` would be both incorrect and rejected by
363/// `clippy::arc_with_non_send_sync`). Returned by `ChartML::resolver()` and
364/// accepted by every host that wants a long-lived handle for the bulk
365/// `invalidate*` API.
366pub type ResolverRef = SharedRef<Resolver>;
367
368/// Public alias for the shared-ownership wrapper around a [`CacheBackend`]
369/// trait object. `Arc<dyn CacheBackend>` on native, `Rc<dyn CacheBackend>`
370/// on WASM — mirrors the [`SharedRef`] story so wasm32 consumers can hand
371/// off `!Send` backends (e.g. [`backends::indexeddb::IndexedDbBackend`])
372/// without tripping `clippy::arc_with_non_send_sync`.
373pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
374
375/// State of the optional tier-2 (persistent) cache slot. Supports both
376/// eagerly-constructed backends (`Ready`) and lazy factories (`Deferred`)
377/// whose async construction is deferred to the first `fetch()`.
378enum PersistentSlot {
379    /// No persistent cache configured.
380    None,
381    /// Backend already constructed and ready to use.
382    Ready(CacheBackendRef),
383    /// Lazy factory — will be invoked on the first `fetch()` call. The inner
384    /// `Option` is `take()`-d once so only one caller runs the factory;
385    /// concurrent callers that find `None` inside skip tier-2 for that fetch.
386    Deferred(Option<BoxPersistentFactory>),
387}
388
389/// Boxed lazy factory for the persistent cache backend.
390/// On native the closure and its returned future must be `Send` so the
391/// resolver stays compatible with `tokio::spawn`. On WASM `?Send` is fine
392/// because the runtime is single-threaded.
393#[cfg(not(target_arch = "wasm32"))]
394type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>> + Send>;
395#[cfg(target_arch = "wasm32")]
396type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>>>;
397
/// Provider dispatch + cache + dedup orchestration.
///
/// One `Resolver` per `ChartML` instance. The resolver is held inside
/// `ChartML` as `SharedRef<Resolver>` (`Arc` on native, `Rc` on WASM) so
/// consumers can grab a handle for the `invalidate*` API while the chart
/// instance keeps using it.
///
/// Every field uses interior mutability (`Lock`), so all configuration
/// methods take `&self`.
pub struct Resolver {
    /// Default in-process cache (always present, never replaced — kept for
    /// the rare case a consumer wants to introspect the in-memory tier
    /// directly even after swapping `primary`). Held as `SharedRef` so the
    /// `MemoryBackend` clone the resolver hands itself the very first time
    /// is the same single-threaded type the wasm32 backends need (clippy
    /// rejects `Arc<MemoryBackend>` when other tier swaps land on
    /// non-`Send` types in the same `Lock`).
    memory: SharedRef<MemoryBackend>,
    /// Tier-1 cache. Defaults to the always-present in-memory backend; can
    /// be replaced via `ChartML::set_cache(...)` (e.g., a host's custom
    /// process-wide LRU). Behind a `Lock<CacheBackendRef>` so swaps are
    /// atomic and don't require `&mut self` (the resolver is held inside a
    /// `SharedRef`). The `CacheBackendRef` alias is `Arc` on native and
    /// `Rc` on WASM — wasm32 backends like `IndexedDbBackend` are `!Send`
    /// and would trip `clippy::arc_with_non_send_sync` if forced into
    /// `std::sync::Arc`.
    primary: Lock<CacheBackendRef>,
    /// Tier-2 (persistent) cache. Starts as `PersistentSlot::None`; populated
    /// eagerly via `set_persistent_cache` (`Ready`) or lazily via
    /// `set_persistent_cache_factory` (`Deferred` — resolved on first fetch).
    persistent: Lock<PersistentSlot>,
    /// In-flight tracker: maps a `u64` fetch key to the shared future of the
    /// provider invocation currently running for it, so concurrent fetches
    /// for the same key share one provider call.
    /// NOTE(review): presumably wrapped in `SharedRef` so the in-flight
    /// future can reach the map to remove its own entry — confirm in `fetch`.
    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
    /// Provider registry keyed by dispatch slug (`"inline"`, `"http"`,
    /// `"datasource"`, …). Populated through `register_provider`. Note the
    /// values are plain `Arc` on every target (not `SharedRef`), consistent
    /// with `DataSourceProvider` requiring `Send + Sync`.
    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
    /// Optional hook impl. Wrapped in `Lock<Option<...>>` so `set_hooks`
    /// works without `&mut self` (the resolver lives behind a `SharedRef`).
    /// Snapshotted into a local clone before each instrumentation site so
    /// the hook lock is never held across an `await`.
    hooks: Lock<Option<HooksRef>>,
    /// Tracker for which keys have been explicitly invalidated since their
    /// last fetch, so the next cache-miss for that key can be reported as
    /// [`hooks::MissReason::Invalidated`] instead of `NotFound`.
    ///
    /// **Per-key invalidation** (`invalidate(key)`) inserts into
    /// [`InvalidationTracker::keys`] — the next miss for that exact key
    /// reports `Invalidated` and removes the entry (so subsequent misses on
    /// the same key without re-invalidating fall back to the regular
    /// `NotFound` / `Expired` reasoning).
    ///
    /// **Bulk invalidation** (`invalidate_all` / `invalidate_by_slug` /
    /// `invalidate_by_namespace`) sets [`InvalidationTracker::bulk_pending`]
    /// to `true`. Enumerating every just-evicted key is impractical — the
    /// `CacheBackend` trait doesn't expose iteration (and adding it would
    /// be expensive on `IndexedDbBackend`, which would need a cursor sweep
    /// per call) — so the resolver instead reports the *first* post-bulk
    /// miss as `Invalidated` and clears the flag. Subsequent misses fall
    /// back to `NotFound` / `Expired` until another invalidation happens.
    /// This is a deliberate trade-off: documented below, mirrored in the
    /// integration test `test_invalidate_emits_invalidated_miss_reason`.
    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
}
455
456/// Per-resolver tracker for invalidation events. Held inside a
457/// `Lock<...>` on the resolver so it survives across `&self` borrows
458/// (the resolver lives behind a `SharedRef` and uses interior mutability
459/// for every other piece of state too).
460#[derive(Debug, Default)]
461struct InvalidationTracker {
462    /// Specific keys invalidated via `Resolver::invalidate(key)`. Drained
463    /// on first observation by `consume_invalidation_reason`.
464    keys: HashSet<u64>,
465    /// Whether ANY bulk invalidate (`invalidate_all` /
466    /// `invalidate_by_slug` / `invalidate_by_namespace`) has fired since
467    /// the last bulk-pending consumption. Cleared the first time a miss
468    /// observes it — at most one post-bulk miss is reported as
469    /// `Invalidated`.
470    bulk_pending: bool,
471}
472
/// Provider invocation result carried inside the in-flight dedup future.
/// Holds the DataTable (for caching) plus optional original batches.
#[derive(Debug, Clone)]
struct FetchWithBatches {
    /// The provider result whose table is stored in the cache tiers.
    result: FetchResult,
    /// Original provider batches, when the provider supplied them.
    batches: Option<FetchBatchResult>,
}

/// Shared in-flight future type. Boxed for dyn-trait erasure; `Shared` lets
/// multiple awaiters poll the same future without cloning the work. The
/// inner `ResolverFuture` is `BoxFuture` (Send) on native and
/// `LocalBoxFuture` (?Send) on WASM so we don't conflict with the
/// `?Send` async traits the providers and cache backends use.
type SharedFetch = Shared<ResolverFuture<Result<FetchWithBatches, FetchError>>>;
488
489impl Default for Resolver {
490    fn default() -> Self {
491        Self::new()
492    }
493}
494
495impl std::fmt::Debug for Resolver {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        let persistent_state = {
498            let guard = self.persistent.read_lock("persistent cache");
499            match &*guard {
500                PersistentSlot::None => "none",
501                PersistentSlot::Ready(_) => "ready",
502                PersistentSlot::Deferred(_) => "deferred",
503            }
504        };
505        let provider_keys: Vec<String> = self
506            .providers
507            .read_lock("providers")
508            .keys()
509            .cloned()
510            .collect();
511        f.debug_struct("Resolver")
512            .field("memory", &self.memory)
513            .field("persistent", &persistent_state)
514            .field("providers", &provider_keys)
515            .finish_non_exhaustive()
516    }
517}
518
519impl Resolver {
520    /// New resolver with the default `MemoryBackend` as tier-1, no tier-2,
521    /// and no providers registered. `ChartML::new()` registers the built-in
522    /// `inline` + `http` providers immediately after construction.
523    pub fn new() -> Self {
524        let memory = SharedRef::new(MemoryBackend::new());
525        let primary: CacheBackendRef = memory.clone();
526        Self {
527            memory,
528            primary: Lock::new(primary),
529            persistent: Lock::new(PersistentSlot::None),
530            inflight: SharedRef::new(Lock::new(HashMap::new())),
531            providers: Lock::new(HashMap::new()),
532            hooks: Lock::new(None),
533            recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
534        }
535    }
536
537    /// Replace the tier-1 cache backend. Used by `ChartML::set_cache`.
538    /// The fresh backend starts empty — entries in the old backend are not
539    /// migrated (caller's responsibility if they want to).
540    pub fn set_primary_cache(&self, backend: CacheBackendRef) {
541        let mut guard = self.primary.write_lock("primary cache");
542        *guard = backend;
543    }
544
545    /// Set the tier-2 (persistent) cache to an already-constructed backend.
546    /// Immediately marks the slot as `Ready` — no lazy resolution needed.
547    pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
548        let mut guard = self.persistent.write_lock("persistent cache");
549        *guard = PersistentSlot::Ready(backend);
550    }
551
552    /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
553    /// invoked on the first `fetch()` call; until then the slot stays
554    /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
555    /// backend doesn't exist yet, so there is nothing to invalidate).
556    ///
557    /// This exists because on WASM `IndexedDbBackend::new()` is async — it
558    /// can't be ready at `ChartML` construction time. The factory defers the
559    /// database open until the first fetch needs it.
560    #[cfg(not(target_arch = "wasm32"))]
561    pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
562    where
563        F: FnOnce() -> Fut + Send + 'static,
564        Fut: std::future::Future<Output = Option<CacheBackendRef>> + Send + 'static,
565    {
566        let mut guard = self.persistent.write_lock("persistent cache");
567        *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
568    }
569
570    /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
571    /// invoked on the first `fetch()` call; until then the slot stays
572    /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
573    /// backend doesn't exist yet, so there is nothing to invalidate).
574    ///
575    /// WASM variant — no `Send` bound on the closure or its returned future,
576    /// matching the single-threaded `wasm32-unknown-unknown` runtime.
577    #[cfg(target_arch = "wasm32")]
578    pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
579    where
580        F: FnOnce() -> Fut + 'static,
581        Fut: std::future::Future<Output = Option<CacheBackendRef>> + 'static,
582    {
583        let mut guard = self.persistent.write_lock("persistent cache");
584        *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
585    }
586
587    /// Enable IndexedDB-backed persistent caching. The database is opened
588    /// lazily on the first fetch. `db_name` is the IDB database name,
589    /// `namespace` isolates entries (e.g. per-workspace).
590    ///
591    /// This is a convenience wrapper around `set_persistent_cache_factory`
592    /// using the built-in `IndexedDbBackend`.
593    #[cfg(all(target_arch = "wasm32", feature = "wasm-indexeddb"))]
594    pub fn enable_indexeddb_cache(&self, db_name: &str, namespace: &str) {
595        let db = db_name.to_string();
596        let ns = namespace.to_string();
597        self.set_persistent_cache_factory(move || async move {
598            use crate::resolver::backends::indexeddb::IndexedDbBackend;
599            match IndexedDbBackend::new(&db, &ns).await {
600                Ok(backend) => Some(SharedRef::new(backend) as CacheBackendRef),
601                Err(e) => {
602                    tracing::warn!(error = %e, "IndexedDB cache unavailable");
603                    None
604                }
605            }
606        });
607    }
608
609    /// Register a [`ResolverHooks`] impl. Replaces any previously registered
610    /// hooks; passes the new impl in as a `HooksRef` (`Arc` on native, `Rc`
611    /// on WASM). After this call every `Resolver::fetch` invocation emits
612    /// progress / cache / error events through the new impl.
613    pub fn set_hooks(&self, hooks: HooksRef) {
614        let mut guard = self.hooks.write_lock("hooks");
615        *guard = Some(hooks);
616    }
617
618    /// Clear any previously registered hooks. Subsequent `fetch` calls
619    /// behave as if `set_hooks` had never been called (no-op emission).
620    pub fn clear_hooks(&self) {
621        let mut guard = self.hooks.write_lock("hooks");
622        *guard = None;
623    }
624
625    /// Snapshot the current hooks `HooksRef` (or `None`) so the resolver
626    /// can release the lock before entering any cache walk or `.await`.
627    /// Must be called once at the top of `fetch`; downstream sites use the
628    /// snapshot rather than re-acquiring the lock to keep the hook lock
629    /// off the hot path. Also called by `ChartML::transform` (one crate up)
630    /// at the top of the transform stage for the same reason.
631    pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
632        self.hooks.read_lock("hooks").clone()
633    }
634
635    /// Snapshot the tier-1 cache handle so we can drop the lock before the
636    /// async cache call. Always returns a backend (the field starts as
637    /// `MemoryBackend` and `set_primary_cache` only replaces, never clears).
638    fn primary_snapshot(&self) -> CacheBackendRef {
639        self.primary.read_lock("primary cache").clone()
640    }
641
642    /// Resolve the tier-2 cache, lazily initializing the factory if needed.
643    /// Returns the backend handle (or `None`). Used by `fetch()` — the only
644    /// path that should trigger lazy init.
645    ///
646    /// Lock discipline: the sync lock is NEVER held across an `.await`.
647    /// The factory closure is `take()`-d under a write lock, the lock is
648    /// dropped, then the factory's returned future is awaited. The result
649    /// is written back under a fresh write lock.
650    async fn resolve_persistent(&self) -> Option<CacheBackendRef> {
651        // Fast path — no write lock needed
652        {
653            let guard = self.persistent.read_lock("persistent cache");
654            match &*guard {
655                PersistentSlot::None => return None,
656                PersistentSlot::Ready(b) => return Some(b.clone()),
657                PersistentSlot::Deferred(_) => {} // fall through to slow path
658            }
659        }
660
661        // Slow path — take the factory under write lock, release, then await
662        let factory = {
663            let mut guard = self.persistent.write_lock("persistent cache");
664            match &mut *guard {
665                PersistentSlot::Deferred(opt) => opt.take(),
666                PersistentSlot::Ready(b) => return Some(b.clone()),
667                PersistentSlot::None => return None,
668            }
669        };
670
671        let Some(factory) = factory else {
672            // Another caller already took the factory; skip tier-2 for this fetch
673            return None;
674        };
675
676        let result = factory().await;
677        let mut guard = self.persistent.write_lock("persistent cache");
678        match &result {
679            Some(backend) => *guard = PersistentSlot::Ready(backend.clone()),
680            None => *guard = PersistentSlot::None,
681        }
682        result
683    }
684
685    /// Sync snapshot of the tier-2 cache — returns `Some` only if the slot
686    /// is already `Ready`. Does NOT trigger lazy initialization. Used by
687    /// invalidation and shutdown paths where we must not start an async
688    /// factory (the backend doesn't exist yet, so there is nothing to
689    /// invalidate or shut down).
690    fn persistent_snapshot_if_ready(&self) -> Option<CacheBackendRef> {
691        let guard = self.persistent.read_lock("persistent cache");
692        match &*guard {
693            PersistentSlot::Ready(b) => Some(b.clone()),
694            _ => None,
695        }
696    }
697
698    /// Register a provider under a dispatch key (`"inline"`, `"http"`,
699    /// `"datasource"`, or any custom slug). Re-registration replaces the
700    /// previously registered provider for that key.
701    pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
702        let mut providers = self.providers.write_lock("providers");
703        providers.insert(kind.to_string(), provider);
704    }
705
706    /// Snapshot the registered provider kinds. Useful for tests and
707    /// host-app diagnostics.
708    pub fn provider_kinds(&self) -> Vec<String> {
709        self.providers
710            .read_lock("providers")
711            .keys()
712            .cloned()
713            .collect()
714    }
715
716    /// Compute the cache key the resolver would use for a given spec.
717    ///
718    /// Public so phase 4 (Leptos refresh button) and phase 6 (Kyomi
719    /// invalidate-on-change) can compute the exact key the resolver caches
720    /// under without re-implementing the hash.
721    ///
722    /// Hashes `(namespace, datasource, query, url, provider, rows_hash)` in
723    /// that order. `None` fields contribute a sentinel byte (`0xFE`, an
724    /// invalid UTF-8 start byte) so they cannot collide with a real string
725    /// value of `"None"`. Field separator is `0xFF` (also invalid UTF-8) so
726    /// adjacent fields can't bleed into each other ("a|b" vs "ab|").
727    pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
728        let mut hasher = Xxh3::new();
729
730        let fields: [Option<&str>; 5] = [
731            namespace,
732            spec.datasource.as_deref(),
733            spec.query.as_deref(),
734            spec.url.as_deref(),
735            spec.provider.as_deref(),
736        ];
737        for field in fields {
738            match field {
739                Some(s) => hasher.update(s.as_bytes()),
740                None => hasher.update(&[HASH_NONE_SENTINEL]),
741            }
742            hasher.update(&[HASH_FIELD_SEP]);
743        }
744
745        // Inline `rows` is hashed via its canonical JSON serialization so
746        // two specs with the same row data hash identically regardless of
747        // hashmap iteration order. `None` rows contribute the sentinel.
748        match spec.rows.as_ref() {
749            Some(rows) => match serde_json::to_vec(rows) {
750                Ok(bytes) => hasher.update(&bytes),
751                Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
752            },
753            None => hasher.update(&[HASH_NONE_SENTINEL]),
754        }
755
756        hasher.digest()
757    }
758
759    /// The core orchestration: tier-1 → tier-2 → in-flight dedup → provider.
760    ///
761    /// Returns `ResolveOutcome` (result + cache-hit flag) so the caller can
762    /// classify the source under `cache_hits` vs `cache_misses` in
763    /// `FetchMetadata`. Tier-1 hits hydrate from memory only; tier-2 hits
764    /// also re-populate tier-1 so subsequent reads in the same session
765    /// short-circuit.
766    pub async fn fetch(
767        &self,
768        key: u64,
769        request: FetchRequest,
770    ) -> Result<ResolveOutcome, FetchError> {
771        let primary = self.primary_snapshot();
772        let persistent = self.resolve_persistent().await;
773        // Snapshot the hooks `HooksRef` ONCE up front so the hooks lock is
774        // never held across an `.await` and downstream sites can share the
775        // same clone without re-acquiring.
776        let hooks = self.hooks_snapshot();
777        let source_name = request.source_name.clone();
778
779        // ── Tier 1: in-process cache ──
780        let mut tier1_expired = false;
781        if let Some(entry) = primary.get(key).await {
782            if !entry.is_expired() {
783                let age = entry.age();
784                emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
785                let mut metadata = entry.metadata;
786                inject_fetched_at(&mut metadata, &entry.fetched_at);
787                return Ok(ResolveOutcome {
788                    result: FetchResult {
789                        data: entry.data,
790                        metadata,
791                    },
792                    cache_hit: true,
793                    batches: None,
794                });
795            }
796            // Expired — let it fall through; we'll overwrite on success.
797            tier1_expired = true;
798        }
799
800        // ── Tier 2: persistent cache (phase 3b populates this) ──
801        let mut tier2_expired = false;
802        if let Some(p) = &persistent {
803            if let Some(entry) = p.get(key).await {
804                if !entry.is_expired() {
805                    let age = entry.age();
806                    // Hydrate tier-1 so subsequent reads stay in-process.
807                    // Memory hydration is silent (no event) — only the
808                    // logical tier-2 hit fires.
809                    let _ = primary.put(key, entry.clone()).await;
810                    emit_cache_hit(
811                        &hooks,
812                        key,
813                        &source_name,
814                        hooks::CacheTier::Persistent,
815                        age,
816                    );
817                    let mut metadata = entry.metadata;
818                    inject_fetched_at(&mut metadata, &entry.fetched_at);
819                    return Ok(ResolveOutcome {
820                        result: FetchResult {
821                            data: entry.data,
822                            metadata,
823                        },
824                        cache_hit: true,
825                        batches: None,
826                    });
827                }
828                // Expired — evict from tier-2 too.
829                let _ = p.invalidate(key).await;
830                tier2_expired = true;
831            }
832        }
833
834        // Both tiers missed — emit one cache-miss with the most specific
835        // reason we can prove. Precedence: `Invalidated` (most specific —
836        // an operator explicitly cleared this key or fired a bulk
837        // invalidate) wins over `Expired` (TTL elapsed naturally), which
838        // wins over `NotFound` (key was never cached or was evicted by
839        // some path the resolver doesn't track).
840        let miss_reason = if self.consume_invalidation_reason(key) {
841            hooks::MissReason::Invalidated
842        } else if tier1_expired || tier2_expired {
843            hooks::MissReason::Expired
844        } else {
845            hooks::MissReason::NotFound
846        };
847        emit_cache_miss(&hooks, key, &source_name, miss_reason);
848
849        // Provider call start — fire BEFORE the dedup wait so consumers
850        // see a "fetching" event for every distinct cache miss, not just
851        // the first concurrent one.
852        emit_progress(
853            &hooks,
854            hooks::Phase::Fetch,
855            &source_name,
856            None,
857            None,
858            format!(
859                "Fetching {}",
860                source_name.as_deref().unwrap_or("source"),
861            ),
862        );
863
864        // ── In-flight dedup ──
865        // If another fetch for the same key is already in flight, await its
866        // shared future. Otherwise install a new shared future and run it.
867        let shared = self.intern_inflight(key, request, primary, persistent);
868        let result = shared.await;
869
870        // Cleanup: drop the inflight slot whether the fetch succeeded or
871        // failed so subsequent calls re-dispatch instead of replaying a
872        // stale terminal state. Errors clear too — failed fetches should
873        // retry, not stick.
874        self.inflight.write_lock("inflight").remove(&key);
875
876        match result {
877            Ok(fetched) => {
878                let row_count = fetched.result.data.num_rows();
879                emit_progress(
880                    &hooks,
881                    hooks::Phase::Fetch,
882                    &source_name,
883                    Some(row_count as u64),
884                    None,
885                    format!(
886                        "Fetched {} ({} rows)",
887                        source_name.as_deref().unwrap_or("source"),
888                        row_count,
889                    ),
890                );
891                Ok(ResolveOutcome {
892                    result: fetched.result,
893                    cache_hit: false,
894                    batches: fetched.batches,
895                })
896            }
897            Err(err) => {
898                emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
899                Err(err)
900            }
901        }
902    }
903
    /// Get-or-insert the in-flight `Shared` future for a key. Returns a
    /// clone of the `Shared` so the caller can `await` it; the original
    /// stays in the map until removed by the caller.
    ///
    /// This function is synchronous: it only constructs the work future,
    /// it never polls it. The `inflight` write lock is held for the whole
    /// body, so the lookup and the insert happen under one lock
    /// acquisition.
    ///
    /// When awaited, the work future dispatches to a provider, concatenates
    /// the returned batches into one `DataTable`, writes the entry to
    /// `primary` and (if present) `persistent`, and yields the result
    /// together with the original batches. Errors from provider dispatch,
    /// the provider fetch, and batch concatenation are returned; errors
    /// from the cache writes are ignored.
    fn intern_inflight(
        &self,
        key: u64,
        request: FetchRequest,
        primary: CacheBackendRef,
        persistent: Option<CacheBackendRef>,
    ) -> SharedFetch {
        let mut inflight = self.inflight.write_lock("inflight");
        // Dedup: join the fetch already registered for this key, if any.
        if let Some(existing) = inflight.get(&key) {
            return existing.clone();
        }

        // Build the work future: dispatch to the right provider, write to
        // both cache tiers on success. The future is `'static` because it
        // owns everything it touches via `Arc` clones.
        let providers = self.snapshot_providers();
        // Copy out the request fields needed after `request` itself has
        // been moved into `fetch_batches` below.
        let cache_cfg = request.cache.clone();
        let namespace = request.namespace.clone();
        let slug = request.spec.datasource.clone();

        let work = async move {
            let provider = dispatch_provider(&providers, &request.spec)?;
            let batch_result = provider.fetch_batches(request).await?;

            // Concat batches → DataTable for caching.
            // A provider that returned no batches still yields a table: an
            // empty one carrying the reported schema.
            let data = if batch_result.batches.is_empty() {
                DataTable::from_record_batch(RecordBatch::new_empty(
                    Arc::clone(&batch_result.schema),
                ))
            } else {
                let concat = arrow::compute::concat_batches(
                    &batch_result.schema,
                    &batch_result.batches,
                )
                .map_err(|e| FetchError::DecodeFailed(format!("concat batches: {e}")))?;
                DataTable::from_record_batch(concat)
            };

            let fetch_result = FetchResult {
                data: data.clone(),
                metadata: batch_result.metadata.clone(),
            };

            // Write-through: tier-1 always; tier-2 if configured.
            // TTL comes from the request's cache config when it provides
            // one, otherwise `DEFAULT_TTL`.
            let entry = CachedEntry {
                data,
                fetched_at: SystemTime::now(),
                ttl: cache_cfg
                    .as_ref()
                    .and_then(|c| c.ttl_duration())
                    .unwrap_or(DEFAULT_TTL),
                tags: build_tags(slug.as_deref(), namespace.as_deref()),
                metadata: batch_result.metadata.clone(),
            };
            // Cache writes are best-effort: a failed `put` is ignored and
            // the fetched data is still returned.
            let _ = primary.put(key, entry.clone()).await;
            if let Some(p) = &persistent {
                let _ = p.put(key, entry).await;
            }
            Ok(FetchWithBatches {
                result: fetch_result,
                batches: Some(batch_result),
            })
        };

        // Box with the flavour matching `ResolverFuture` on this target:
        // `boxed()` on native, `boxed_local()` on wasm32.
        #[cfg(not(target_arch = "wasm32"))]
        let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed();
        #[cfg(target_arch = "wasm32")]
        let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed_local();

        // Register one handle in the map and hand the other to the caller.
        let future = boxed.shared();
        inflight.insert(key, future.clone());
        future
    }
980
981    /// Snapshot the provider map for use inside the inflight future. We
982    /// snapshot rather than holding the lock so the future stays `'static`.
983    fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
984        self.providers.read_lock("providers").clone()
985    }
986
987    // ── Bulk invalidation API ──
988
989    /// Drop a single entry from every cache tier. The next miss on `key`
990    /// will be reported via [`hooks::ResolverHooks::on_cache_miss`] with
991    /// [`hooks::MissReason::Invalidated`] rather than `NotFound`.
992    pub async fn invalidate(&self, key: u64) {
993        let primary = self.primary_snapshot();
994        let persistent = self.persistent_snapshot_if_ready();
995        let _ = primary.invalidate(key).await;
996        if let Some(p) = &persistent {
997            let _ = p.invalidate(key).await;
998        }
999        self.recently_invalidated
1000            .write_lock("recently_invalidated")
1001            .keys
1002            .insert(key);
1003    }
1004
1005    /// Drop every cached entry across all tiers. The very next miss on any
1006    /// key will be reported as `Invalidated`; subsequent misses fall back
1007    /// to the regular `NotFound` / `Expired` reasoning. See the field-level
1008    /// docs on [`Resolver::recently_invalidated`] for why per-key tracking
1009    /// isn't done here (would require a `keys()` method on every backend).
1010    pub async fn invalidate_all(&self) {
1011        let primary = self.primary_snapshot();
1012        let persistent = self.persistent_snapshot_if_ready();
1013        let _ = primary.clear().await;
1014        if let Some(p) = &persistent {
1015            let _ = p.clear().await;
1016        }
1017        self.mark_bulk_invalidated();
1018    }
1019
1020    /// Drop every entry whose source spec carried the given `datasource`
1021    /// slug. Useful for "datasource X was edited; invalidate all queries
1022    /// against it" workflows. Subject to the same single-shot
1023    /// `Invalidated` reporting as [`Resolver::invalidate_all`].
1024    pub async fn invalidate_by_slug(&self, slug: &str) {
1025        let tag = format!("{TAG_SLUG_PREFIX}{slug}");
1026        let primary = self.primary_snapshot();
1027        let persistent = self.persistent_snapshot_if_ready();
1028        let _ = primary.invalidate_by_tag(&tag).await;
1029        if let Some(p) = &persistent {
1030            let _ = p.invalidate_by_tag(&tag).await;
1031        }
1032        self.mark_bulk_invalidated();
1033    }
1034
1035    /// Drop every entry tagged with the given namespace. Used for tenant
1036    /// isolation flows ("user logged out; clear their cached data").
1037    /// Subject to the same single-shot `Invalidated` reporting as
1038    /// [`Resolver::invalidate_all`].
1039    pub async fn invalidate_by_namespace(&self, namespace: &str) {
1040        let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
1041        let primary = self.primary_snapshot();
1042        let persistent = self.persistent_snapshot_if_ready();
1043        let _ = primary.invalidate_by_tag(&tag).await;
1044        if let Some(p) = &persistent {
1045            let _ = p.invalidate_by_tag(&tag).await;
1046        }
1047        self.mark_bulk_invalidated();
1048    }
1049
1050    /// Set the bulk-pending flag so the next post-bulk miss surfaces as
1051    /// [`hooks::MissReason::Invalidated`]. Synchronous; the lock is never
1052    /// held across an `.await`.
1053    fn mark_bulk_invalidated(&self) {
1054        self.recently_invalidated
1055            .write_lock("recently_invalidated")
1056            .bulk_pending = true;
1057    }
1058
1059    /// Check whether a miss for `key` should be reported as
1060    /// [`hooks::MissReason::Invalidated`]. Drains the per-key entry on
1061    /// first observation and consumes the bulk-pending flag at most once
1062    /// per bulk invalidation, so this returns `true` for at most one miss
1063    /// per `invalidate*` call. Called from the hot fetch path so the lock
1064    /// is taken briefly and never held across an `.await`.
1065    fn consume_invalidation_reason(&self, key: u64) -> bool {
1066        let mut tracker = self
1067            .recently_invalidated
1068            .write_lock("recently_invalidated");
1069        if tracker.keys.remove(&key) {
1070            return true;
1071        }
1072        if tracker.bulk_pending {
1073            tracker.bulk_pending = false;
1074            return true;
1075        }
1076        false
1077    }
1078
1079    /// Iterate every registered provider AND cache backend, awaiting their
1080    /// `shutdown` hook in turn. Wired up by `ChartML::shutdown()`.
1081    pub async fn shutdown(&self) {
1082        let providers = self.snapshot_providers();
1083        for (_, provider) in providers {
1084            provider.shutdown().await;
1085        }
1086        let primary = self.primary_snapshot();
1087        primary.shutdown().await;
1088        if let Some(p) = self.persistent_snapshot_if_ready() {
1089            p.shutdown().await;
1090        }
1091    }
1092}
1093
1094/// Emit a [`hooks::CacheHitEvent`] through the registered hook impl, if
1095/// any. Fire-and-forget via `spawn_hook`; never blocks the resolver.
1096fn emit_cache_hit(
1097    hooks: &Option<HooksRef>,
1098    key: u64,
1099    source_name: &Option<String>,
1100    tier: hooks::CacheTier,
1101    age: Duration,
1102) {
1103    let Some(h) = hooks.as_ref() else { return };
1104    let h = h.clone();
1105    let event = hooks::CacheHitEvent {
1106        key,
1107        source_name: source_name.clone(),
1108        tier,
1109        age,
1110    };
1111    hooks::spawn_hook(async move {
1112        h.on_cache_hit(event).await;
1113    });
1114}
1115
1116/// Emit a [`hooks::CacheMissEvent`] through the registered hook impl.
1117fn emit_cache_miss(
1118    hooks: &Option<HooksRef>,
1119    key: u64,
1120    source_name: &Option<String>,
1121    reason: hooks::MissReason,
1122) {
1123    let Some(h) = hooks.as_ref() else { return };
1124    let h = h.clone();
1125    let event = hooks::CacheMissEvent {
1126        key,
1127        source_name: source_name.clone(),
1128        reason,
1129    };
1130    hooks::spawn_hook(async move {
1131        h.on_cache_miss(event).await;
1132    });
1133}
1134
1135/// Emit a [`hooks::ProgressEvent`] through the registered hook impl.
1136/// `pub(crate)` so `ChartML::transform` / `render_prepared_to_svg` can
1137/// emit transform/render-phase progress without re-implementing the
1138/// snapshot dance.
1139pub(crate) fn emit_progress(
1140    hooks: &Option<HooksRef>,
1141    phase: hooks::Phase,
1142    source_name: &Option<String>,
1143    loaded: Option<u64>,
1144    total: Option<u64>,
1145    message: String,
1146) {
1147    let Some(h) = hooks.as_ref() else { return };
1148    let h = h.clone();
1149    let event = hooks::ProgressEvent {
1150        phase,
1151        source_name: source_name.clone(),
1152        loaded,
1153        total,
1154        message,
1155    };
1156    hooks::spawn_hook(async move {
1157        h.on_progress(event).await;
1158    });
1159}
1160
1161/// Emit a [`hooks::ErrorEvent`] through the registered hook impl.
1162/// `pub(crate)` so `ChartML::transform` can emit transform-phase errors.
1163pub(crate) fn emit_error(
1164    hooks: &Option<HooksRef>,
1165    phase: hooks::Phase,
1166    source_name: &Option<String>,
1167    error: String,
1168) {
1169    let Some(h) = hooks.as_ref() else { return };
1170    let h = h.clone();
1171    let event = hooks::ErrorEvent {
1172        phase,
1173        source_name: source_name.clone(),
1174        error,
1175    };
1176    hooks::spawn_hook(async move {
1177        h.on_error(event).await;
1178    });
1179}
1180
1181/// Inject `fetched_at_ms` into provider metadata on cache hits so consumers
1182/// can display the actual data age rather than "just now."
1183fn inject_fetched_at(metadata: &mut HashMap<String, serde_json::Value>, fetched_at: &SystemTime) {
1184    if let Ok(d) = fetched_at.duration_since(SystemTime::UNIX_EPOCH) {
1185        metadata.insert(
1186            "fetched_at_ms".to_string(),
1187            serde_json::Value::from(d.as_millis() as f64),
1188        );
1189    }
1190}
1191
1192/// Build the tag list applied to `CachedEntry` on write. `slug` and
1193/// `namespace` are optional — entries without one of them simply skip the
1194/// corresponding tag (no empty-string tag pollution).
1195fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
1196    let mut tags = Vec::new();
1197    if let Some(slug) = slug {
1198        tags.push(format!("{TAG_SLUG_PREFIX}{slug}"));
1199    }
1200    if let Some(ns) = namespace {
1201        tags.push(format!("{TAG_NAMESPACE_PREFIX}{ns}"));
1202    }
1203    tags
1204}
1205
1206/// Apply the design-doc dispatch routing. Precedence: explicit `provider`
1207/// key wins over inferred shape (rows → inline, url → http, datasource →
1208/// datasource). Returns the `Arc` so the caller can `await` the trait
1209/// method without holding any lock.
1210fn dispatch_provider(
1211    providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
1212    spec: &InlineData,
1213) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
1214    let kind = if let Some(kind) = spec.provider.as_deref() {
1215        kind
1216    } else if spec.rows.is_some() {
1217        "inline"
1218    } else if spec.url.is_some() {
1219        "http"
1220    } else if spec.datasource.is_some() {
1221        "datasource"
1222    } else {
1223        return Err(FetchError::Other(
1224            "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
1225                .to_string(),
1226        ));
1227    };
1228
1229    providers
1230        .get(kind)
1231        .cloned()
1232        .ok_or_else(|| FetchError::ProviderNotFound {
1233            kind: kind.to_string(),
1234        })
1235}
1236
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    type ProviderMap = HashMap<String, Arc<dyn DataSourceProvider>>;

    /// Spec with every field unset; tests override what they need through
    /// struct-update syntax.
    fn empty_inline() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }

    /// Registry holding a single `InlineProvider` registered under `kind`.
    fn registry_with(kind: &str) -> ProviderMap {
        let mut registry = ProviderMap::new();
        registry.insert(
            kind.to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        );
        registry
    }

    // ── key_for ─────────────────────────────────────────────────────────

    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            query: Some("SELECT 1".into()),
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let first = Resolver::key_for(&spec, Some("ns"));
        let second = Resolver::key_for(&spec, Some("ns"));
        assert_eq!(first, second);
    }

    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&spec, Some("tenant-a")),
            Resolver::key_for(&spec, Some("tenant-b")),
        );
    }

    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        let with_url = |datasource: Option<String>| InlineData {
            datasource,
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let absent = with_url(None);
        let literal = with_url(Some("None".into()));
        // The sentinel byte must NOT collide with the literal "None" string.
        assert_ne!(
            Resolver::key_for(&absent, None),
            Resolver::key_for(&literal, None),
        );
    }

    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // `(datasource="ab", query=None)` must hash differently from
        // `(datasource="a", query="b")` — without a separator both would
        // serialize to "ab".
        let joined = InlineData {
            datasource: Some("ab".into()),
            ..empty_inline()
        };
        let divided = InlineData {
            datasource: Some("a".into()),
            query: Some("b".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&joined, None),
            Resolver::key_for(&divided, None),
        );
    }

    // ── dispatch_provider ───────────────────────────────────────────────

    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let registry = registry_with("custom");
        let spec = InlineData {
            provider: Some("custom".into()),
            // rows alone would route to "inline", but explicit wins.
            rows: Some(vec![]),
            ..empty_inline()
        };
        // Trait objects can't be compared, but `is_ok()` is enough: the
        // ONLY registered key is "custom", so an `Ok` proves dispatch chose
        // it. Routing by rows alone would look up the unregistered
        // "inline" kind and yield `ProviderNotFound`.
        assert!(dispatch_provider(&registry, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_inferred_inline() {
        let registry = registry_with("inline");
        let spec = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        assert!(dispatch_provider(&registry, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_missing_kind_errors() {
        let registry = ProviderMap::new();
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let outcome = dispatch_provider(&registry, &spec);
        assert!(matches!(
            outcome,
            Err(FetchError::ProviderNotFound { ref kind }) if kind == "datasource"
        ));
    }

    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        let registry = ProviderMap::new();
        let outcome = dispatch_provider(&registry, &empty_inline());
        assert!(matches!(outcome, Err(FetchError::Other(_))));
    }

    // ── PersistentSlot / factory tests ──────────────────────────────────

    /// Minimal `CacheBackend` impl for exercising the persistent-slot
    /// plumbing: it stores nothing and every operation succeeds.
    struct MockBackend;

    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    impl CacheBackend for MockBackend {
        async fn get(&self, _key: u64) -> Option<CachedEntry> {
            None
        }
        async fn put(&self, _key: u64, _entry: CachedEntry) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate(&self, _key: u64) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate_by_tag(&self, _tag: &str) -> Result<(), CacheError> {
            Ok(())
        }
        async fn clear(&self) -> Result<(), CacheError> {
            Ok(())
        }
        async fn shutdown(&self) {}
    }

    /// Fresh `MockBackend` behind the shared backend handle type.
    fn mock_backend() -> CacheBackendRef {
        SharedRef::new(MockBackend)
    }

    #[test]
    fn test_persistent_slot_default_is_none() {
        assert!(Resolver::new().persistent_snapshot_if_ready().is_none());
    }

    #[test]
    fn test_set_persistent_cache_still_works() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache(mock_backend());
        assert!(resolver.persistent_snapshot_if_ready().is_some());
    }

    #[test]
    fn test_persistent_cache_factory_stores_deferred() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        // The factory has not run yet, so the sync snapshot must be None.
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_resolves_on_fetch() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        // `resolve_persistent` is what triggers the factory.
        let resolved = resolver.resolve_persistent().await;
        assert!(resolved.is_some());
        // The slot is now Ready, so the sync snapshot sees the backend.
        assert!(resolver.persistent_snapshot_if_ready().is_some());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_none_result() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { None });
        let first = resolver.resolve_persistent().await;
        assert!(first.is_none());
        // A None factory result turns the slot into None — later
        // `resolve_persistent` calls return None without re-running the
        // factory.
        let second = resolver.resolve_persistent().await;
        assert!(second.is_none());
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }

    #[test]
    fn test_persistent_cache_factory_replaces_previous() {
        let resolver = Resolver::new();

        // Start from a ready backend.
        resolver.set_persistent_cache(mock_backend());
        assert!(resolver.persistent_snapshot_if_ready().is_some());

        // Installing a factory replaces that ready backend.
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        // The slot is Deferred again, so the sync snapshot reports None.
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }
}