Skip to main content

chartml_core/resolver/
mod.rs

1//! Resolver: caching + dedup + dispatch layer between `ChartML::fetch` and
2//! the registered `DataSourceProvider` implementations (chartml 5.0 phase 3).
3//!
4//! The resolver owns:
5//! - **Tier-1 cache** — a `MemoryBackend` always present.
6//! - **Tier-2 cache** — a [`PersistentSlot`] that is either `None` (no
7//!   persistent cache), `Ready` (backend constructed), or `Deferred`
8//!   (lazy factory awaiting first `fetch()` to initialize — used for
9//!   `IndexedDbBackend` whose async constructor can't run at setup time).
10//! - **In-flight tracker** — a `HashMap<u64, Shared<BoxFuture<...>>>` so two
11//!   concurrent fetches for the same key share one provider invocation.
12//! - **Provider registry** — `HashMap<String, Arc<dyn DataSourceProvider>>`,
13//!   keyed by dispatch slug (`"inline"`, `"http"`, `"datasource"`, …).
14//!
15//! Phase 3 leaves `ResolverHooks` integration as a no-op stub; phase 3c will
16//! add hook dispatch without further changes to this file's structure.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use arrow::array::RecordBatch;
22use arrow::datatypes::SchemaRef;
23// `web_time::SystemTime` works on `wasm32-unknown-unknown` (where the std
24// version panics). On native it's a transparent alias for `std::time`.
25use std::time::Duration;
26use web_time::SystemTime;
27
28use async_trait::async_trait;
29use futures::future::{FutureExt, Shared};
30use thiserror::Error;
31use xxhash_rust::xxh3::Xxh3;
32
33// `?Send` on WASM means the futures we shove into `Shared` cannot promise
34// `Send`. Use `LocalBoxFuture` on WASM and `BoxFuture` (which IS `Send`) on
35// native so the resolver can stay multi-threaded on tokio while compiling
36// cleanly under `wasm32-unknown-unknown`.
37#[cfg(not(target_arch = "wasm32"))]
38type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
39#[cfg(target_arch = "wasm32")]
40type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
41
// Shared-ownership and interior-mutability primitives are picked per target.
// Native builds pair `Arc` with `Mutex` so the resolver stays multi-threaded
// for tokio. On WASM the in-flight `Shared<LocalBoxFuture<...>>` map is
// inherently `?Send`; wrapping it in `Arc<Mutex<...>>` would trip
// `clippy::arc_with_non_send_sync`, so the single-threaded `Rc<RefCell<...>>`
// pairing is the correct choice for `wasm32-unknown-unknown`.

/// Target-dependent shared-ownership pointer.
///
/// * Native: `Arc<T>`, so handles can move across `tokio::spawn` task
///   boundaries.
/// * WASM: `Rc<T>`, because `wasm32-unknown-unknown` is single-threaded and
///   the resolver's internals are `?Send` — an `Arc` there would be both
///   incorrect and rejected by `clippy::arc_with_non_send_sync`.
///
/// The alias is public so that consumers wiring [`CacheBackend`] /
/// [`DataSourceProvider`] trait objects through the resolver can name the
/// same pointer type the resolver stores internally, instead of repeating a
/// `cfg_attr` switch at every call site.
#[cfg(target_arch = "wasm32")]
pub type SharedRef<T> = std::rc::Rc<T>;
#[cfg(not(target_arch = "wasm32"))]
pub type SharedRef<T> = Arc<T>;
62
/// Target-dependent interior-mutability cell used for every piece of mutable
/// resolver state: `std::sync::Mutex` on native, `std::cell::RefCell` on WASM.
#[cfg(target_arch = "wasm32")]
pub(crate) type Lock<T> = std::cell::RefCell<T>;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type Lock<T> = std::sync::Mutex<T>;
67
68/// Cfg-gated extension trait so `self.field.write_lock("…").something()` and
69/// `self.field.read_lock("…").something()` work uniformly across the native
70/// `Mutex` and the WASM `RefCell`. Eliminates per-call-site `cfg` blocks
71/// without losing the panic message that distinguishes which lock failed.
72trait LockExt<T> {
73    type Read<'a>: std::ops::Deref<Target = T>
74    where
75        Self: 'a;
76    type Write<'a>: std::ops::DerefMut<Target = T>
77    where
78        Self: 'a;
79    fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
80    fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
81}
82
83#[cfg(not(target_arch = "wasm32"))]
84impl<T> LockExt<T> for std::sync::Mutex<T> {
85    type Read<'a>
86        = std::sync::MutexGuard<'a, T>
87    where
88        Self: 'a;
89    type Write<'a>
90        = std::sync::MutexGuard<'a, T>
91    where
92        Self: 'a;
93    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
94        self.lock()
95            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
96    }
97    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
98        self.lock()
99            .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
100    }
101}
102
103#[cfg(target_arch = "wasm32")]
104impl<T> LockExt<T> for std::cell::RefCell<T> {
105    type Read<'a>
106        = std::cell::Ref<'a, T>
107    where
108        Self: 'a;
109    type Write<'a>
110        = std::cell::RefMut<'a, T>
111    where
112        Self: 'a;
113    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
114        self.try_borrow()
115            .unwrap_or_else(|_| panic!("resolver {label} cell already borrowed mutably"))
116    }
117    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
118        self.try_borrow_mut()
119            .unwrap_or_else(|_| panic!("resolver {label} cell already borrowed"))
120    }
121}
122
123use crate::data::DataTable;
124use crate::error::ChartError;
125use crate::spec::source::CacheConfig as SpecCacheConfig;
126use crate::spec::InlineData;
127
128pub mod builtin;
129pub mod cache;
130pub mod cancel;
131pub mod hooks;
132
133// Phase 3b: persistent cache backends. The container module is unconditional
134// because it hosts the pure-Rust `codec` submodule (shared blob framing) that
135// needs to compile + test on every target. Backend implementations themselves
136// (`indexeddb`, future `sqlite`/`fs`/…) carry their own `#[cfg(...)]` gates
137// inside `backends/mod.rs`, so non-browser builds and browser builds without
138// the `wasm-indexeddb` feature don't compile any of the IndexedDB-specific
139// code (which depends on the `idb` crate, brought in only by that feature).
140pub mod backends;
141
142pub use builtin::{HttpProvider, InlineProvider};
143pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
144pub use cancel::CancellationToken;
145pub use hooks::{
146    CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
147    ProgressEvent, ResolverHooks,
148};
149
/// TTL used when a spec does not declare one: five minutes (300 seconds),
/// matching the JS middleware's `CACHE_TTL_DEFAULT_MS`.
pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
153
/// Byte mixed into the hash in place of a `None` field. 0xFE is never a valid
/// UTF-8 start byte, so a `None` can never hash like a real string value
/// (for example a datasource literally named `"None"`).
const HASH_NONE_SENTINEL: u8 = 0xFE;
/// Field separator byte for hashing. Like the `None` sentinel it never occurs
/// in valid UTF-8. NOTE(review): presumably written between consecutive
/// hashed fields — the code that uses it lies outside this section.
const HASH_FIELD_SEP: u8 = 0xFF;
159
160/// Provider dispatch + fetch trait. Host apps implement this for their
161/// custom data sources (BigQuery, Snowflake, internal REST APIs, …) and
162/// register them via `ChartML::register_provider(kind, provider)`.
163///
164/// `?Send` on WASM matches `TransformMiddleware` so single-threaded
165/// browser environments can hold non-`Send` state inside provider impls.
166#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
167#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
168pub trait DataSourceProvider: Send + Sync {
169    /// Fetch one source. The resolver handles caching + dedup + parallelism;
170    /// providers only see this single call per actual upstream invocation.
171    async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;
172
173    /// Batch-oriented fetch. Returns multiple RecordBatches with a shared
174    /// schema, allowing the transform layer to register them into MemTable
175    /// without concatenation.
176    ///
177    /// Default wraps `fetch()` into a single-batch result. Override to
178    /// stream batches from an HTTP endpoint (e.g. Arrow IPC stream).
179    async fn fetch_batches(&self, request: FetchRequest) -> Result<FetchBatchResult, FetchError> {
180        let result = self.fetch(request).await?;
181        let schema = result.data.schema();
182        let batch = result.data.into_record_batch();
183        Ok(FetchBatchResult {
184            schema,
185            batches: vec![batch],
186            metadata: result.metadata,
187        })
188    }
189
190    /// Optional graceful shutdown hook. Called by `ChartML::shutdown()` on
191    /// SSR request end / tab close. Default no-op so providers that hold no
192    /// pooled resources don't have to implement it.
193    async fn shutdown(&self) {}
194}
195
196/// Per-source request context. Carries the resolved spec (params already
197/// substituted) plus cache + header + namespace + cancellation hints.
198#[derive(Debug, Clone)]
199pub struct FetchRequest {
200    /// User-chosen name for this source within the chart spec. `None` for
201    /// unnamed (flat) `data:` forms.
202    pub source_name: Option<String>,
203    /// Fully resolved flat-form source spec — `datasource`, `query`, `url`,
204    /// `rows`, etc. with `$param.name` references already substituted.
205    pub spec: InlineData,
206    /// Parsed cache config from `spec.cache.ttl` (and `auto_refresh`).
207    pub cache: Option<CacheConfig>,
208    /// Request-level HTTP headers (merged with `HttpProvider` defaults).
209    /// Ignored by non-HTTP providers unless they explicitly read this field.
210    pub headers: HashMap<String, String>,
211    /// Tenant / workspace namespace folded into the cache key. `None` for
212    /// single-tenant deployments.
213    pub namespace: Option<String>,
214    /// Optional cancellation token. Phase 3 always passes `None` from the
215    /// resolver; providers that opt into honoring it stay forward-compatible
216    /// with future tab-close / cancel-on-route-change wiring.
217    pub cancel_token: Option<CancellationToken>,
218}
219
220/// Provider response. `Clone` is required because `futures::future::Shared`
221/// (used for in-flight dedup) takes a `Future<Output: Clone>`. `DataTable`
222/// is `Arc`-backed so cloning is cheap.
223#[derive(Debug, Clone)]
224pub struct FetchResult {
225    pub data: DataTable,
226    /// Free-form per-provider metadata — `bytes_billed`, `rows_returned`,
227    /// `server_refreshed_at`, `upstream_cache_hit`, `warnings`, etc.
228    pub metadata: HashMap<String, serde_json::Value>,
229}
230
231/// Batch-oriented provider response. Carries multiple RecordBatches with a
232/// shared schema, allowing the transform layer to register them into
233/// MemTable without concatenation.
234#[derive(Debug, Clone)]
235pub struct FetchBatchResult {
236    pub schema: SchemaRef,
237    pub batches: Vec<RecordBatch>,
238    pub metadata: HashMap<String, serde_json::Value>,
239}
240
241/// Provider failures. `Clone` matches `FetchResult` so `Shared` can clone
242/// the error when multiple in-flight callers receive the same outcome.
243#[derive(Debug, Error, Clone)]
244pub enum FetchError {
245    #[error("datasource '{slug}' not found")]
246    SlugNotFound { slug: String },
247
248    #[error("query failed: {0}")]
249    QueryFailed(String),
250
251    #[error("decode failed: {0}")]
252    DecodeFailed(String),
253
254    #[error("cancelled")]
255    Cancelled,
256
257    #[error("no provider registered for kind '{kind}'")]
258    ProviderNotFound { kind: String },
259
260    #[error("cache backend error: {0}")]
261    Cache(String),
262
263    #[error("{0}")]
264    Other(String),
265}
266
267impl From<FetchError> for ChartError {
268    fn from(err: FetchError) -> Self {
269        match err {
270            FetchError::SlugNotFound { slug } => {
271                ChartError::DataError(format!("datasource '{slug}' not found"))
272            }
273            FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
274            FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
275            FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
276            FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
277                "no provider registered for kind '{kind}'"
278            )),
279            FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
280            FetchError::Other(msg) => ChartError::DataError(msg),
281        }
282    }
283}
284
285impl From<CacheError> for FetchError {
286    fn from(err: CacheError) -> Self {
287        FetchError::Cache(err.to_string())
288    }
289}
290
/// Cache settings in resolver-friendly form. Built once from
/// `spec::source::CacheConfig` (which holds the raw YAML strings) so the
/// `humantime` text does not have to be re-parsed on every fetch.
#[derive(Debug, Clone, Default)]
pub struct CacheConfig {
    /// Time-to-live parsed by `humantime` (`"30s"`, `"5m"`, `"6h"`, `"1d"`,
    /// `"7d"`). `None` means the default TTL applies.
    pub ttl: Option<Duration>,
    /// Hint for the component layer. The resolver itself never
    /// auto-refreshes; the flag is merely carried end-to-end so consumers
    /// (Leptos, React) can read it.
    pub auto_refresh: bool,
}
303
304impl CacheConfig {
305    /// Parse a `spec::source::CacheConfig` (the raw YAML form) into the
306    /// resolver-friendly shape. Returns `Ok(None)` when the input is `None`
307    /// so the upstream `Option` plumbing stays clean. Returns `Err` when the
308    /// declared TTL string is present but unparseable — silent fallback to
309    /// `DEFAULT_TTL` would let an operator's typo (`"5 minutes"` instead of
310    /// `"5m"`) ship to production unnoticed, so the parser's complaint is
311    /// surfaced to the caller verbatim.
312    pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
313        let Some(spec) = spec else {
314            return Ok(None);
315        };
316        let ttl = match spec.ttl.as_deref() {
317            Some(s) => Some(humantime::parse_duration(s).map_err(|e| {
318                ChartError::InvalidSpec(format!(
319                    "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
320                ))
321            })?),
322            None => None,
323        };
324        Ok(Some(Self {
325            ttl,
326            auto_refresh: spec.auto_refresh.unwrap_or(false),
327        }))
328    }
329
330    /// Convenience accessor matching the design doc's `ttl_duration()` name.
331    pub fn ttl_duration(&self) -> Option<Duration> {
332        self.ttl
333    }
334}
335
336/// Outcome of a `Resolver::fetch` call: the provider's result PLUS whether
337/// the value came from a cache tier or a fresh provider invocation.
338///
339/// `ChartML::fetch` uses this to populate `FetchMetadata.cache_hits` /
340/// `cache_misses` per source name without needing the resolver to know about
341/// source names itself (which it doesn't — it works in keys, not names).
342#[derive(Debug, Clone)]
343pub struct ResolveOutcome {
344    pub result: FetchResult,
345    pub cache_hit: bool,
346    /// Original provider batches. `Some` on cache miss (when the provider
347    /// returned via `fetch_batches`), `None` on cache hit. Allows the
348    /// transform layer to register batches into MemTable without concat.
349    pub batches: Option<FetchBatchResult>,
350}
351
/// Prefix of the tag used for bulk invalidation by source slug. It is public
/// so that consumers computing tags out-of-band (e.g. a custom `CacheBackend`
/// that pre-populates entries) produce exactly the resolver's wire format.
pub const TAG_SLUG_PREFIX: &str = "slug:";
/// Prefix of the tag used for bulk invalidation by namespace.
pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";
358
359/// Public alias for the shared-ownership wrapper around `Resolver`. `Arc` on
360/// native (so consumers using `tokio::spawn` can move resolver handles across
361/// task boundaries) and `Rc` on WASM (where the resolver's inflight map is
362/// inherently `?Send`, so an `Arc` would be both incorrect and rejected by
363/// `clippy::arc_with_non_send_sync`). Returned by `ChartML::resolver()` and
364/// accepted by every host that wants a long-lived handle for the bulk
365/// `invalidate*` API.
366pub type ResolverRef = SharedRef<Resolver>;
367
368/// Public alias for the shared-ownership wrapper around a [`CacheBackend`]
369/// trait object. `Arc<dyn CacheBackend>` on native, `Rc<dyn CacheBackend>`
370/// on WASM — mirrors the [`SharedRef`] story so wasm32 consumers can hand
371/// off `!Send` backends (e.g. [`backends::indexeddb::IndexedDbBackend`])
372/// without tripping `clippy::arc_with_non_send_sync`.
373pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
374
375/// State of the optional tier-2 (persistent) cache slot. Supports both
376/// eagerly-constructed backends (`Ready`) and lazy factories (`Deferred`)
377/// whose async construction is deferred to the first `fetch()`.
378enum PersistentSlot {
379    /// No persistent cache configured.
380    None,
381    /// Backend already constructed and ready to use.
382    Ready(CacheBackendRef),
383    /// Lazy factory — will be invoked on the first `fetch()` call. The inner
384    /// `Option` is `take()`-d once so only one caller runs the factory;
385    /// concurrent callers that find `None` inside skip tier-2 for that fetch.
386    Deferred(Option<BoxPersistentFactory>),
387}
388
389/// Boxed lazy factory for the persistent cache backend.
390/// On native the closure and its returned future must be `Send` so the
391/// resolver stays compatible with `tokio::spawn`. On WASM `?Send` is fine
392/// because the runtime is single-threaded.
393#[cfg(not(target_arch = "wasm32"))]
394type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>> + Send>;
395#[cfg(target_arch = "wasm32")]
396type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>>>;
397
398/// Provider dispatch + cache + dedup orchestration.
399///
400/// One `Resolver` per `ChartML` instance. The resolver is held inside
401/// `ChartML` as `SharedRef<Resolver>` (`Arc` on native, `Rc` on WASM) so
402/// consumers can grab a handle for the `invalidate*` API while the chart
403/// instance keeps using it.
404pub struct Resolver {
405    /// Default in-process cache (always present, never replaced — kept for
406    /// the rare case a consumer wants to introspect the in-memory tier
407    /// directly even after swapping `primary`). Held as `SharedRef` so the
408    /// `MemoryBackend` clone the resolver hands itself the very first time
409    /// is the same single-threaded type the wasm32 backends need (clippy
410    /// rejects `Arc<MemoryBackend>` when other tier swaps land on
411    /// non-`Send` types in the same `Lock`).
412    memory: SharedRef<MemoryBackend>,
413    /// Tier-1 cache. Defaults to the always-present in-memory backend; can
414    /// be replaced via `ChartML::set_cache(...)` (e.g., a host's custom
415    /// process-wide LRU). Behind a `Lock<CacheBackendRef>` so swaps are
416    /// atomic and don't require `&mut self` (the resolver is held inside a
417    /// `SharedRef`). The `CacheBackendRef` alias is `Arc` on native and
418    /// `Rc` on WASM — wasm32 backends like `IndexedDbBackend` are `!Send`
419    /// and would trip `clippy::arc_with_non_send_sync` if forced into
420    /// `std::sync::Arc`.
421    primary: Lock<CacheBackendRef>,
422    /// Tier-2 (persistent) cache. Starts as `PersistentSlot::None`; populated
423    /// eagerly via `set_persistent_cache` (`Ready`) or lazily via
424    /// `set_persistent_cache_factory` (`Deferred` — resolved on first fetch).
425    persistent: Lock<PersistentSlot>,
426    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
427    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
428    /// Optional hook impl. Wrapped in `Lock<Option<...>>` so `set_hooks`
429    /// works without `&mut self` (the resolver lives behind a `SharedRef`).
430    /// Snapshotted into a local clone before each instrumentation site so
431    /// the hook lock is never held across an `await`.
432    hooks: Lock<Option<HooksRef>>,
433    /// Tracker for which keys have been explicitly invalidated since their
434    /// last fetch, so the next cache-miss for that key can be reported as
435    /// [`hooks::MissReason::Invalidated`] instead of `NotFound`.
436    ///
437    /// **Per-key invalidation** (`invalidate(key)`) inserts into
438    /// [`InvalidationTracker::keys`] — the next miss for that exact key
439    /// reports `Invalidated` and removes the entry (so subsequent misses on
440    /// the same key without re-invalidating fall back to the regular
441    /// `NotFound` / `Expired` reasoning).
442    ///
443    /// **Bulk invalidation** (`invalidate_all` / `invalidate_by_slug` /
444    /// `invalidate_by_namespace`) sets [`InvalidationTracker::bulk_pending`]
445    /// to `true`. Enumerating every just-evicted key is impractical — the
446    /// `CacheBackend` trait doesn't expose iteration (and adding it would
447    /// be expensive on `IndexedDbBackend`, which would need a cursor sweep
448    /// per call) — so the resolver instead reports the *first* post-bulk
449    /// miss as `Invalidated` and clears the flag. Subsequent misses fall
450    /// back to `NotFound` / `Expired` until another invalidation happens.
451    /// This is a deliberate trade-off: documented below, mirrored in the
452    /// integration test `test_invalidate_emits_invalidated_miss_reason`.
453    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
454}
455
/// Bookkeeping for invalidation events, one per resolver. The resolver keeps
/// it inside a `Lock<...>` so it can be updated through `&self`, like every
/// other piece of resolver state (the resolver lives behind a `SharedRef`).
#[derive(Debug, Default)]
struct InvalidationTracker {
    /// Keys invalidated individually through `Resolver::invalidate(key)`.
    /// An entry is removed the first time `consume_invalidation_reason`
    /// observes it.
    keys: HashSet<u64>,
    /// Set when ANY bulk invalidation (`invalidate_all` /
    /// `invalidate_by_slug` / `invalidate_by_namespace`) has run since the
    /// flag was last consumed. The first miss that sees it clears it, so at
    /// most one post-bulk miss is reported as `Invalidated`.
    bulk_pending: bool,
}
472
473/// Shared in-flight future type. Boxed for dyn-trait erasure; `Shared` lets
474/// multiple awaiters poll the same future without cloning the work. The
475/// inner `ResolverFuture` is `BoxFuture` (Send) on native and
476/// `LocalBoxFuture` (?Send) on WASM so we don't conflict with the
477/// `?Send` async traits the providers and cache backends use.
478///
479/// Provider invocation result carried inside the in-flight dedup future.
480/// Holds the DataTable (for caching) plus optional original batches.
481#[derive(Debug, Clone)]
482struct FetchWithBatches {
483    result: FetchResult,
484    batches: Option<FetchBatchResult>,
485}
486
487type SharedFetch = Shared<ResolverFuture<Result<FetchWithBatches, FetchError>>>;
488
489impl Default for Resolver {
490    fn default() -> Self {
491        Self::new()
492    }
493}
494
495impl std::fmt::Debug for Resolver {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        let persistent_state = {
498            let guard = self.persistent.read_lock("persistent cache");
499            match &*guard {
500                PersistentSlot::None => "none",
501                PersistentSlot::Ready(_) => "ready",
502                PersistentSlot::Deferred(_) => "deferred",
503            }
504        };
505        let provider_keys: Vec<String> = self
506            .providers
507            .read_lock("providers")
508            .keys()
509            .cloned()
510            .collect();
511        f.debug_struct("Resolver")
512            .field("memory", &self.memory)
513            .field("persistent", &persistent_state)
514            .field("providers", &provider_keys)
515            .finish_non_exhaustive()
516    }
517}
518
519impl Resolver {
520    /// New resolver with the default `MemoryBackend` as tier-1, no tier-2,
521    /// and no providers registered. `ChartML::new()` registers the built-in
522    /// `inline` + `http` providers immediately after construction.
523    pub fn new() -> Self {
524        let memory = SharedRef::new(MemoryBackend::new());
525        let primary: CacheBackendRef = memory.clone();
526        Self {
527            memory,
528            primary: Lock::new(primary),
529            persistent: Lock::new(PersistentSlot::None),
530            inflight: SharedRef::new(Lock::new(HashMap::new())),
531            providers: Lock::new(HashMap::new()),
532            hooks: Lock::new(None),
533            recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
534        }
535    }
536
537    /// Replace the tier-1 cache backend. Used by `ChartML::set_cache`.
538    /// The fresh backend starts empty — entries in the old backend are not
539    /// migrated (caller's responsibility if they want to).
540    pub fn set_primary_cache(&self, backend: CacheBackendRef) {
541        let mut guard = self.primary.write_lock("primary cache");
542        *guard = backend;
543    }
544
545    /// Set the tier-2 (persistent) cache to an already-constructed backend.
546    /// Immediately marks the slot as `Ready` — no lazy resolution needed.
547    pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
548        let mut guard = self.persistent.write_lock("persistent cache");
549        *guard = PersistentSlot::Ready(backend);
550    }
551
552    /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
553    /// invoked on the first `fetch()` call; until then the slot stays
554    /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
555    /// backend doesn't exist yet, so there is nothing to invalidate).
556    ///
557    /// This exists because on WASM `IndexedDbBackend::new()` is async — it
558    /// can't be ready at `ChartML` construction time. The factory defers the
559    /// database open until the first fetch needs it.
560    #[cfg(not(target_arch = "wasm32"))]
561    pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
562    where
563        F: FnOnce() -> Fut + Send + 'static,
564        Fut: std::future::Future<Output = Option<CacheBackendRef>> + Send + 'static,
565    {
566        let mut guard = self.persistent.write_lock("persistent cache");
567        *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
568    }
569
570    /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
571    /// invoked on the first `fetch()` call; until then the slot stays
572    /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
573    /// backend doesn't exist yet, so there is nothing to invalidate).
574    ///
575    /// WASM variant — no `Send` bound on the closure or its returned future,
576    /// matching the single-threaded `wasm32-unknown-unknown` runtime.
577    #[cfg(target_arch = "wasm32")]
578    pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
579    where
580        F: FnOnce() -> Fut + 'static,
581        Fut: std::future::Future<Output = Option<CacheBackendRef>> + 'static,
582    {
583        let mut guard = self.persistent.write_lock("persistent cache");
584        *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
585    }
586
587    /// Enable IndexedDB-backed persistent caching. The database is opened
588    /// lazily on the first fetch. `db_name` is the IDB database name,
589    /// `namespace` isolates entries (e.g. per-workspace).
590    ///
591    /// This is a convenience wrapper around `set_persistent_cache_factory`
592    /// using the built-in `IndexedDbBackend`.
593    #[cfg(all(target_arch = "wasm32", feature = "wasm-indexeddb"))]
594    pub fn enable_indexeddb_cache(&self, db_name: &str, namespace: &str) {
595        let db = db_name.to_string();
596        let ns = namespace.to_string();
597        self.set_persistent_cache_factory(move || async move {
598            use crate::resolver::backends::indexeddb::IndexedDbBackend;
599            match IndexedDbBackend::new(&db, &ns).await {
600                Ok(backend) => Some(SharedRef::new(backend) as CacheBackendRef),
601                Err(e) => {
602                    tracing::warn!(error = %e, "IndexedDB cache unavailable");
603                    None
604                }
605            }
606        });
607    }
608
609    /// Register a [`ResolverHooks`] impl. Replaces any previously registered
610    /// hooks; passes the new impl in as a `HooksRef` (`Arc` on native, `Rc`
611    /// on WASM). After this call every `Resolver::fetch` invocation emits
612    /// progress / cache / error events through the new impl.
613    pub fn set_hooks(&self, hooks: HooksRef) {
614        let mut guard = self.hooks.write_lock("hooks");
615        *guard = Some(hooks);
616    }
617
618    /// Clear any previously registered hooks. Subsequent `fetch` calls
619    /// behave as if `set_hooks` had never been called (no-op emission).
620    pub fn clear_hooks(&self) {
621        let mut guard = self.hooks.write_lock("hooks");
622        *guard = None;
623    }
624
625    /// Snapshot the current hooks `HooksRef` (or `None`) so the resolver
626    /// can release the lock before entering any cache walk or `.await`.
627    /// Must be called once at the top of `fetch`; downstream sites use the
628    /// snapshot rather than re-acquiring the lock to keep the hook lock
629    /// off the hot path. Also called by `ChartML::transform` (one crate up)
630    /// at the top of the transform stage for the same reason.
631    pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
632        self.hooks.read_lock("hooks").clone()
633    }
634
635    /// Snapshot the tier-1 cache handle so we can drop the lock before the
636    /// async cache call. Always returns a backend (the field starts as
637    /// `MemoryBackend` and `set_primary_cache` only replaces, never clears).
638    fn primary_snapshot(&self) -> CacheBackendRef {
639        self.primary.read_lock("primary cache").clone()
640    }
641
642    /// Resolve the tier-2 cache, lazily initializing the factory if needed.
643    /// Returns the backend handle (or `None`). Used by `fetch()` — the only
644    /// path that should trigger lazy init.
645    ///
646    /// Lock discipline: the sync lock is NEVER held across an `.await`.
647    /// The factory closure is `take()`-d under a write lock, the lock is
648    /// dropped, then the factory's returned future is awaited. The result
649    /// is written back under a fresh write lock.
650    async fn resolve_persistent(&self) -> Option<CacheBackendRef> {
651        // Fast path — no write lock needed
652        {
653            let guard = self.persistent.read_lock("persistent cache");
654            match &*guard {
655                PersistentSlot::None => return None,
656                PersistentSlot::Ready(b) => return Some(b.clone()),
657                PersistentSlot::Deferred(_) => {} // fall through to slow path
658            }
659        }
660
661        // Slow path — take the factory under write lock, release, then await
662        let factory = {
663            let mut guard = self.persistent.write_lock("persistent cache");
664            match &mut *guard {
665                PersistentSlot::Deferred(opt) => opt.take(),
666                PersistentSlot::Ready(b) => return Some(b.clone()),
667                PersistentSlot::None => return None,
668            }
669        };
670
671        let Some(factory) = factory else {
672            // Another caller already took the factory; skip tier-2 for this fetch
673            return None;
674        };
675
676        let result = factory().await;
677        let mut guard = self.persistent.write_lock("persistent cache");
678        match &result {
679            Some(backend) => *guard = PersistentSlot::Ready(backend.clone()),
680            None => *guard = PersistentSlot::None,
681        }
682        result
683    }
684
685    /// Sync snapshot of the tier-2 cache — returns `Some` only if the slot
686    /// is already `Ready`. Does NOT trigger lazy initialization. Used by
687    /// invalidation and shutdown paths where we must not start an async
688    /// factory (the backend doesn't exist yet, so there is nothing to
689    /// invalidate or shut down).
690    fn persistent_snapshot_if_ready(&self) -> Option<CacheBackendRef> {
691        let guard = self.persistent.read_lock("persistent cache");
692        match &*guard {
693            PersistentSlot::Ready(b) => Some(b.clone()),
694            _ => None,
695        }
696    }
697
698    /// Register a provider under a dispatch key (`"inline"`, `"http"`,
699    /// `"datasource"`, or any custom slug). Re-registration replaces the
700    /// previously registered provider for that key.
701    pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
702        let mut providers = self.providers.write_lock("providers");
703        providers.insert(kind.to_string(), provider);
704    }
705
706    /// Snapshot the registered provider kinds. Useful for tests and
707    /// host-app diagnostics.
708    pub fn provider_kinds(&self) -> Vec<String> {
709        self.providers
710            .read_lock("providers")
711            .keys()
712            .cloned()
713            .collect()
714    }
715
716    /// Compute the cache key the resolver would use for a given spec.
717    ///
718    /// Public so phase 4 (Leptos refresh button) and phase 6 (Kyomi
719    /// invalidate-on-change) can compute the exact key the resolver caches
720    /// under without re-implementing the hash.
721    ///
722    /// Hashes `(namespace, datasource, query, url, provider, rows_hash)` in
723    /// that order. `None` fields contribute a sentinel byte (`0xFE`, an
724    /// invalid UTF-8 start byte) so they cannot collide with a real string
725    /// value of `"None"`. Field separator is `0xFF` (also invalid UTF-8) so
726    /// adjacent fields can't bleed into each other ("a|b" vs "ab|").
727    pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
728        let mut hasher = Xxh3::new();
729
730        let fields: [Option<&str>; 5] = [
731            namespace,
732            spec.datasource.as_deref(),
733            spec.query.as_deref(),
734            spec.url.as_deref(),
735            spec.provider.as_deref(),
736        ];
737        for field in fields {
738            match field {
739                Some(s) => hasher.update(s.as_bytes()),
740                None => hasher.update(&[HASH_NONE_SENTINEL]),
741            }
742            hasher.update(&[HASH_FIELD_SEP]);
743        }
744
745        // Inline `rows` is hashed via its canonical JSON serialization so
746        // two specs with the same row data hash identically regardless of
747        // hashmap iteration order. `None` rows contribute the sentinel.
748        match spec.rows.as_ref() {
749            Some(rows) => match serde_json::to_vec(rows) {
750                Ok(bytes) => hasher.update(&bytes),
751                Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
752            },
753            None => hasher.update(&[HASH_NONE_SENTINEL]),
754        }
755
756        hasher.digest()
757    }
758
759    /// The core orchestration: tier-1 → tier-2 → in-flight dedup → provider.
760    ///
761    /// Returns `ResolveOutcome` (result + cache-hit flag) so the caller can
762    /// classify the source under `cache_hits` vs `cache_misses` in
763    /// `FetchMetadata`. Tier-1 hits hydrate from memory only; tier-2 hits
764    /// also re-populate tier-1 so subsequent reads in the same session
765    /// short-circuit.
766    pub async fn fetch(
767        &self,
768        key: u64,
769        request: FetchRequest,
770    ) -> Result<ResolveOutcome, FetchError> {
771        let primary = self.primary_snapshot();
772        let persistent = self.resolve_persistent().await;
773        // Snapshot the hooks `HooksRef` ONCE up front so the hooks lock is
774        // never held across an `.await` and downstream sites can share the
775        // same clone without re-acquiring.
776        let hooks = self.hooks_snapshot();
777        let source_name = request.source_name.clone();
778
779        // ── Tier 1: in-process cache ──
780        let mut tier1_expired = false;
781        if let Some(entry) = primary.get(key).await {
782            if !entry.is_expired() {
783                let age = entry.age();
784                emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
785                return Ok(ResolveOutcome {
786                    result: FetchResult {
787                        data: entry.data,
788                        metadata: entry.metadata,
789                    },
790                    cache_hit: true,
791                    batches: None,
792                });
793            }
794            // Expired — let it fall through; we'll overwrite on success.
795            tier1_expired = true;
796        }
797
798        // ── Tier 2: persistent cache (phase 3b populates this) ──
799        let mut tier2_expired = false;
800        if let Some(p) = &persistent {
801            if let Some(entry) = p.get(key).await {
802                if !entry.is_expired() {
803                    let age = entry.age();
804                    // Hydrate tier-1 so subsequent reads stay in-process.
805                    // Memory hydration is silent (no event) — only the
806                    // logical tier-2 hit fires.
807                    let _ = primary.put(key, entry.clone()).await;
808                    emit_cache_hit(
809                        &hooks,
810                        key,
811                        &source_name,
812                        hooks::CacheTier::Persistent,
813                        age,
814                    );
815                    return Ok(ResolveOutcome {
816                        result: FetchResult {
817                            data: entry.data,
818                            metadata: entry.metadata,
819                        },
820                        cache_hit: true,
821                        batches: None,
822                    });
823                }
824                // Expired — evict from tier-2 too.
825                let _ = p.invalidate(key).await;
826                tier2_expired = true;
827            }
828        }
829
830        // Both tiers missed — emit one cache-miss with the most specific
831        // reason we can prove. Precedence: `Invalidated` (most specific —
832        // an operator explicitly cleared this key or fired a bulk
833        // invalidate) wins over `Expired` (TTL elapsed naturally), which
834        // wins over `NotFound` (key was never cached or was evicted by
835        // some path the resolver doesn't track).
836        let miss_reason = if self.consume_invalidation_reason(key) {
837            hooks::MissReason::Invalidated
838        } else if tier1_expired || tier2_expired {
839            hooks::MissReason::Expired
840        } else {
841            hooks::MissReason::NotFound
842        };
843        emit_cache_miss(&hooks, key, &source_name, miss_reason);
844
845        // Provider call start — fire BEFORE the dedup wait so consumers
846        // see a "fetching" event for every distinct cache miss, not just
847        // the first concurrent one.
848        emit_progress(
849            &hooks,
850            hooks::Phase::Fetch,
851            &source_name,
852            None,
853            None,
854            format!(
855                "Fetching {}",
856                source_name.as_deref().unwrap_or("source"),
857            ),
858        );
859
860        // ── In-flight dedup ──
861        // If another fetch for the same key is already in flight, await its
862        // shared future. Otherwise install a new shared future and run it.
863        let shared = self.intern_inflight(key, request, primary, persistent);
864        let result = shared.await;
865
866        // Cleanup: drop the inflight slot whether the fetch succeeded or
867        // failed so subsequent calls re-dispatch instead of replaying a
868        // stale terminal state. Errors clear too — failed fetches should
869        // retry, not stick.
870        self.inflight.write_lock("inflight").remove(&key);
871
872        match result {
873            Ok(fetched) => {
874                let row_count = fetched.result.data.num_rows();
875                emit_progress(
876                    &hooks,
877                    hooks::Phase::Fetch,
878                    &source_name,
879                    Some(row_count as u64),
880                    None,
881                    format!(
882                        "Fetched {} ({} rows)",
883                        source_name.as_deref().unwrap_or("source"),
884                        row_count,
885                    ),
886                );
887                Ok(ResolveOutcome {
888                    result: fetched.result,
889                    cache_hit: false,
890                    batches: fetched.batches,
891                })
892            }
893            Err(err) => {
894                emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
895                Err(err)
896            }
897        }
898    }
899
900    /// Get-or-insert the in-flight `Shared` future for a key. Returns a
901    /// clone of the `Shared` so the caller can `await` it; the original
902    /// stays in the map until removed by the caller.
903    fn intern_inflight(
904        &self,
905        key: u64,
906        request: FetchRequest,
907        primary: CacheBackendRef,
908        persistent: Option<CacheBackendRef>,
909    ) -> SharedFetch {
910        let mut inflight = self.inflight.write_lock("inflight");
911        if let Some(existing) = inflight.get(&key) {
912            return existing.clone();
913        }
914
915        // Build the work future: dispatch to the right provider, write to
916        // both cache tiers on success. The future is `'static` because it
917        // owns everything it touches via `Arc` clones.
918        let providers = self.snapshot_providers();
919        let cache_cfg = request.cache.clone();
920        let namespace = request.namespace.clone();
921        let slug = request.spec.datasource.clone();
922
923        let work = async move {
924            let provider = dispatch_provider(&providers, &request.spec)?;
925            let batch_result = provider.fetch_batches(request).await?;
926
927            // Concat batches → DataTable for caching.
928            let data = if batch_result.batches.is_empty() {
929                DataTable::from_record_batch(RecordBatch::new_empty(
930                    Arc::clone(&batch_result.schema),
931                ))
932            } else {
933                let concat = arrow::compute::concat_batches(
934                    &batch_result.schema,
935                    &batch_result.batches,
936                )
937                .map_err(|e| FetchError::DecodeFailed(format!("concat batches: {e}")))?;
938                DataTable::from_record_batch(concat)
939            };
940
941            let fetch_result = FetchResult {
942                data: data.clone(),
943                metadata: batch_result.metadata.clone(),
944            };
945
946            // Write-through: tier-1 always; tier-2 if configured.
947            let entry = CachedEntry {
948                data,
949                fetched_at: SystemTime::now(),
950                ttl: cache_cfg
951                    .as_ref()
952                    .and_then(|c| c.ttl_duration())
953                    .unwrap_or(DEFAULT_TTL),
954                tags: build_tags(slug.as_deref(), namespace.as_deref()),
955                metadata: batch_result.metadata.clone(),
956            };
957            let _ = primary.put(key, entry.clone()).await;
958            if let Some(p) = &persistent {
959                let _ = p.put(key, entry).await;
960            }
961            Ok(FetchWithBatches {
962                result: fetch_result,
963                batches: Some(batch_result),
964            })
965        };
966
967        #[cfg(not(target_arch = "wasm32"))]
968        let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed();
969        #[cfg(target_arch = "wasm32")]
970        let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed_local();
971
972        let future = boxed.shared();
973        inflight.insert(key, future.clone());
974        future
975    }
976
977    /// Snapshot the provider map for use inside the inflight future. We
978    /// snapshot rather than holding the lock so the future stays `'static`.
979    fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
980        self.providers.read_lock("providers").clone()
981    }
982
983    // ── Bulk invalidation API ──
984
985    /// Drop a single entry from every cache tier. The next miss on `key`
986    /// will be reported via [`hooks::ResolverHooks::on_cache_miss`] with
987    /// [`hooks::MissReason::Invalidated`] rather than `NotFound`.
988    pub async fn invalidate(&self, key: u64) {
989        let primary = self.primary_snapshot();
990        let persistent = self.persistent_snapshot_if_ready();
991        let _ = primary.invalidate(key).await;
992        if let Some(p) = &persistent {
993            let _ = p.invalidate(key).await;
994        }
995        self.recently_invalidated
996            .write_lock("recently_invalidated")
997            .keys
998            .insert(key);
999    }
1000
1001    /// Drop every cached entry across all tiers. The very next miss on any
1002    /// key will be reported as `Invalidated`; subsequent misses fall back
1003    /// to the regular `NotFound` / `Expired` reasoning. See the field-level
1004    /// docs on [`Resolver::recently_invalidated`] for why per-key tracking
1005    /// isn't done here (would require a `keys()` method on every backend).
1006    pub async fn invalidate_all(&self) {
1007        let primary = self.primary_snapshot();
1008        let persistent = self.persistent_snapshot_if_ready();
1009        let _ = primary.clear().await;
1010        if let Some(p) = &persistent {
1011            let _ = p.clear().await;
1012        }
1013        self.mark_bulk_invalidated();
1014    }
1015
1016    /// Drop every entry whose source spec carried the given `datasource`
1017    /// slug. Useful for "datasource X was edited; invalidate all queries
1018    /// against it" workflows. Subject to the same single-shot
1019    /// `Invalidated` reporting as [`Resolver::invalidate_all`].
1020    pub async fn invalidate_by_slug(&self, slug: &str) {
1021        let tag = format!("{TAG_SLUG_PREFIX}{slug}");
1022        let primary = self.primary_snapshot();
1023        let persistent = self.persistent_snapshot_if_ready();
1024        let _ = primary.invalidate_by_tag(&tag).await;
1025        if let Some(p) = &persistent {
1026            let _ = p.invalidate_by_tag(&tag).await;
1027        }
1028        self.mark_bulk_invalidated();
1029    }
1030
1031    /// Drop every entry tagged with the given namespace. Used for tenant
1032    /// isolation flows ("user logged out; clear their cached data").
1033    /// Subject to the same single-shot `Invalidated` reporting as
1034    /// [`Resolver::invalidate_all`].
1035    pub async fn invalidate_by_namespace(&self, namespace: &str) {
1036        let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
1037        let primary = self.primary_snapshot();
1038        let persistent = self.persistent_snapshot_if_ready();
1039        let _ = primary.invalidate_by_tag(&tag).await;
1040        if let Some(p) = &persistent {
1041            let _ = p.invalidate_by_tag(&tag).await;
1042        }
1043        self.mark_bulk_invalidated();
1044    }
1045
1046    /// Set the bulk-pending flag so the next post-bulk miss surfaces as
1047    /// [`hooks::MissReason::Invalidated`]. Synchronous; the lock is never
1048    /// held across an `.await`.
1049    fn mark_bulk_invalidated(&self) {
1050        self.recently_invalidated
1051            .write_lock("recently_invalidated")
1052            .bulk_pending = true;
1053    }
1054
1055    /// Check whether a miss for `key` should be reported as
1056    /// [`hooks::MissReason::Invalidated`]. Drains the per-key entry on
1057    /// first observation and consumes the bulk-pending flag at most once
1058    /// per bulk invalidation, so this returns `true` for at most one miss
1059    /// per `invalidate*` call. Called from the hot fetch path so the lock
1060    /// is taken briefly and never held across an `.await`.
1061    fn consume_invalidation_reason(&self, key: u64) -> bool {
1062        let mut tracker = self
1063            .recently_invalidated
1064            .write_lock("recently_invalidated");
1065        if tracker.keys.remove(&key) {
1066            return true;
1067        }
1068        if tracker.bulk_pending {
1069            tracker.bulk_pending = false;
1070            return true;
1071        }
1072        false
1073    }
1074
1075    /// Iterate every registered provider AND cache backend, awaiting their
1076    /// `shutdown` hook in turn. Wired up by `ChartML::shutdown()`.
1077    pub async fn shutdown(&self) {
1078        let providers = self.snapshot_providers();
1079        for (_, provider) in providers {
1080            provider.shutdown().await;
1081        }
1082        let primary = self.primary_snapshot();
1083        primary.shutdown().await;
1084        if let Some(p) = self.persistent_snapshot_if_ready() {
1085            p.shutdown().await;
1086        }
1087    }
1088}
1089
1090/// Emit a [`hooks::CacheHitEvent`] through the registered hook impl, if
1091/// any. Fire-and-forget via `spawn_hook`; never blocks the resolver.
1092fn emit_cache_hit(
1093    hooks: &Option<HooksRef>,
1094    key: u64,
1095    source_name: &Option<String>,
1096    tier: hooks::CacheTier,
1097    age: Duration,
1098) {
1099    let Some(h) = hooks.as_ref() else { return };
1100    let h = h.clone();
1101    let event = hooks::CacheHitEvent {
1102        key,
1103        source_name: source_name.clone(),
1104        tier,
1105        age,
1106    };
1107    hooks::spawn_hook(async move {
1108        h.on_cache_hit(event).await;
1109    });
1110}
1111
1112/// Emit a [`hooks::CacheMissEvent`] through the registered hook impl.
1113fn emit_cache_miss(
1114    hooks: &Option<HooksRef>,
1115    key: u64,
1116    source_name: &Option<String>,
1117    reason: hooks::MissReason,
1118) {
1119    let Some(h) = hooks.as_ref() else { return };
1120    let h = h.clone();
1121    let event = hooks::CacheMissEvent {
1122        key,
1123        source_name: source_name.clone(),
1124        reason,
1125    };
1126    hooks::spawn_hook(async move {
1127        h.on_cache_miss(event).await;
1128    });
1129}
1130
1131/// Emit a [`hooks::ProgressEvent`] through the registered hook impl.
1132/// `pub(crate)` so `ChartML::transform` / `render_prepared_to_svg` can
1133/// emit transform/render-phase progress without re-implementing the
1134/// snapshot dance.
1135pub(crate) fn emit_progress(
1136    hooks: &Option<HooksRef>,
1137    phase: hooks::Phase,
1138    source_name: &Option<String>,
1139    loaded: Option<u64>,
1140    total: Option<u64>,
1141    message: String,
1142) {
1143    let Some(h) = hooks.as_ref() else { return };
1144    let h = h.clone();
1145    let event = hooks::ProgressEvent {
1146        phase,
1147        source_name: source_name.clone(),
1148        loaded,
1149        total,
1150        message,
1151    };
1152    hooks::spawn_hook(async move {
1153        h.on_progress(event).await;
1154    });
1155}
1156
1157/// Emit a [`hooks::ErrorEvent`] through the registered hook impl.
1158/// `pub(crate)` so `ChartML::transform` can emit transform-phase errors.
1159pub(crate) fn emit_error(
1160    hooks: &Option<HooksRef>,
1161    phase: hooks::Phase,
1162    source_name: &Option<String>,
1163    error: String,
1164) {
1165    let Some(h) = hooks.as_ref() else { return };
1166    let h = h.clone();
1167    let event = hooks::ErrorEvent {
1168        phase,
1169        source_name: source_name.clone(),
1170        error,
1171    };
1172    hooks::spawn_hook(async move {
1173        h.on_error(event).await;
1174    });
1175}
1176
1177/// Build the tag list applied to `CachedEntry` on write. `slug` and
1178/// `namespace` are optional — entries without one of them simply skip the
1179/// corresponding tag (no empty-string tag pollution).
1180fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
1181    let mut tags = Vec::new();
1182    if let Some(slug) = slug {
1183        tags.push(format!("{TAG_SLUG_PREFIX}{slug}"));
1184    }
1185    if let Some(ns) = namespace {
1186        tags.push(format!("{TAG_NAMESPACE_PREFIX}{ns}"));
1187    }
1188    tags
1189}
1190
1191/// Apply the design-doc dispatch routing. Precedence: explicit `provider`
1192/// key wins over inferred shape (rows → inline, url → http, datasource →
1193/// datasource). Returns the `Arc` so the caller can `await` the trait
1194/// method without holding any lock.
1195fn dispatch_provider(
1196    providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
1197    spec: &InlineData,
1198) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
1199    let kind = if let Some(kind) = spec.provider.as_deref() {
1200        kind
1201    } else if spec.rows.is_some() {
1202        "inline"
1203    } else if spec.url.is_some() {
1204        "http"
1205    } else if spec.datasource.is_some() {
1206        "datasource"
1207    } else {
1208        return Err(FetchError::Other(
1209            "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
1210                .to_string(),
1211        ));
1212    };
1213
1214    providers
1215        .get(kind)
1216        .cloned()
1217        .ok_or_else(|| FetchError::ProviderNotFound {
1218            kind: kind.to_string(),
1219        })
1220}
1221
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    /// Shape of the provider registry that `dispatch_provider` consumes.
    type ProviderMap = HashMap<String, Arc<dyn DataSourceProvider>>;

    /// An `InlineData` with every field unset; tests override what they need.
    fn empty_inline() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }

    /// A registry holding exactly one provider, registered under `kind`.
    fn single_provider(kind: &str) -> ProviderMap {
        let mut registry = ProviderMap::new();
        registry.insert(
            kind.to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        );
        registry
    }

    // ── key_for ─────────────────────────────────────────────────────────

    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            query: Some("SELECT 1".into()),
            ..empty_inline()
        };
        let first = Resolver::key_for(&spec, Some("ns"));
        let second = Resolver::key_for(&spec, Some("ns"));
        assert_eq!(first, second);
    }

    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let tenant_a = Resolver::key_for(&spec, Some("tenant-a"));
        let tenant_b = Resolver::key_for(&spec, Some("tenant-b"));
        assert_ne!(tenant_a, tenant_b);
    }

    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        let absent = InlineData {
            datasource: None,
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let literal = InlineData {
            datasource: Some("None".into()),
            url: Some("https://x".into()),
            ..empty_inline()
        };
        // An unset field must not hash like the literal string "None".
        let absent_key = Resolver::key_for(&absent, None);
        let literal_key = Resolver::key_for(&literal, None);
        assert_ne!(absent_key, literal_key);
    }

    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // `(datasource="ab", query=None)` and `(datasource="a", query="b")`
        // would both flatten to "ab" if fields were hashed back to back
        // with nothing in between.
        let merged = InlineData {
            datasource: Some("ab".into()),
            ..empty_inline()
        };
        let split = InlineData {
            datasource: Some("a".into()),
            query: Some("b".into()),
            ..empty_inline()
        };
        let merged_key = Resolver::key_for(&merged, None);
        let split_key = Resolver::key_for(&split, None);
        assert_ne!(merged_key, split_key);
    }

    // ── dispatch_provider ───────────────────────────────────────────────

    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let providers = single_provider("custom");
        let spec = InlineData {
            provider: Some("custom".into()),
            // `rows` alone would route to "inline"; the explicit key wins.
            rows: Some(vec![]),
            ..empty_inline()
        };
        // Trait objects can't be compared, but `is_ok()` is enough: the
        // only registered key is "custom", so `Ok` proves dispatch chose
        // it. Inferring from `rows` would look up the unregistered
        // "inline" key and fail with `ProviderNotFound`.
        let outcome = dispatch_provider(&providers, &spec);
        assert!(outcome.is_ok());
    }

    #[test]
    fn dispatch_provider_inferred_inline() {
        let providers = single_provider("inline");
        let spec = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        let outcome = dispatch_provider(&providers, &spec);
        assert!(outcome.is_ok());
    }

    #[test]
    fn dispatch_provider_missing_kind_errors() {
        let providers = ProviderMap::new();
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let Err(err) = dispatch_provider(&providers, &spec) else {
            panic!("dispatch must error");
        };
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "datasource"));
    }

    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        let providers = ProviderMap::new();
        let spec = empty_inline();
        let Err(err) = dispatch_provider(&providers, &spec) else {
            panic!("dispatch must error");
        };
        assert!(matches!(err, FetchError::Other(_)));
    }

    // ── PersistentSlot / factory tests ──────────────────────────────────

    /// Do-nothing `CacheBackend` used to exercise the persistent-slot
    /// plumbing: reads always miss and every mutation reports success.
    struct MockBackend;

    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    impl CacheBackend for MockBackend {
        async fn get(&self, _key: u64) -> Option<CachedEntry> {
            None
        }
        async fn put(&self, _key: u64, _entry: CachedEntry) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate(&self, _key: u64) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate_by_tag(&self, _tag: &str) -> Result<(), CacheError> {
            Ok(())
        }
        async fn clear(&self) -> Result<(), CacheError> {
            Ok(())
        }
        async fn shutdown(&self) {}
    }

    /// A fresh `MockBackend` behind the shared backend handle type.
    fn mock_backend() -> CacheBackendRef {
        SharedRef::new(MockBackend)
    }

    #[test]
    fn test_persistent_slot_default_is_none() {
        let resolver = Resolver::new();
        let snapshot = resolver.persistent_snapshot_if_ready();
        assert!(snapshot.is_none());
    }

    #[test]
    fn test_set_persistent_cache_still_works() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache(mock_backend());
        let snapshot = resolver.persistent_snapshot_if_ready();
        assert!(snapshot.is_some());
    }

    #[test]
    fn test_persistent_cache_factory_stores_deferred() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        // The factory has not run yet, so the sync snapshot sees nothing.
        let snapshot = resolver.persistent_snapshot_if_ready();
        assert!(snapshot.is_none());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_resolves_on_fetch() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        // `resolve_persistent` is what triggers the factory.
        let resolved = resolver.resolve_persistent().await;
        assert!(resolved.is_some());
        // Once resolved the slot is `Ready`, so the sync snapshot sees it.
        let snapshot = resolver.persistent_snapshot_if_ready();
        assert!(snapshot.is_some());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_none_result() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { None });
        let first = resolver.resolve_persistent().await;
        assert!(first.is_none());
        // A factory that yields `None` settles the slot to `None`; later
        // calls keep returning `None` without running the factory again.
        let second = resolver.resolve_persistent().await;
        assert!(second.is_none());
        let snapshot = resolver.persistent_snapshot_if_ready();
        assert!(snapshot.is_none());
    }

    #[test]
    fn test_persistent_cache_factory_replaces_previous() {
        let resolver = Resolver::new();

        // Start from a ready backend.
        resolver.set_persistent_cache(mock_backend());
        let before = resolver.persistent_snapshot_if_ready();
        assert!(before.is_some());

        // Installing a factory replaces it: the slot is `Deferred` again,
        // so the sync snapshot reports nothing.
        resolver.set_persistent_cache_factory(|| async { Some(mock_backend()) });
        let after = resolver.persistent_snapshot_if_ready();
        assert!(after.is_none());
    }
}