chartml_core/resolver/mod.rs
1//! Resolver: caching + dedup + dispatch layer between `ChartML::fetch` and
2//! the registered `DataSourceProvider` implementations (chartml 5.0 phase 3).
3//!
4//! The resolver owns:
5//! - **Tier-1 cache** — a `MemoryBackend` always present.
6//! - **Tier-2 cache** — a [`PersistentSlot`] that is either `None` (no
7//! persistent cache), `Ready` (backend constructed), or `Deferred`
8//! (lazy factory awaiting first `fetch()` to initialize — used for
9//! `IndexedDbBackend` whose async constructor can't run at setup time).
//! - **In-flight tracker** — a `HashMap<u64, Shared<…>>` over boxed futures
//!   (`BoxFuture` on native, `LocalBoxFuture` on WASM) so two concurrent
//!   fetches for the same key share one provider invocation.
12//! - **Provider registry** — `HashMap<String, Arc<dyn DataSourceProvider>>`,
13//! keyed by dispatch slug (`"inline"`, `"http"`, `"datasource"`, …).
14//!
//! Hook dispatch is live: an optional [`ResolverHooks`] impl registered via
//! `Resolver::set_hooks` receives progress / cache / error events, and it is
//! snapshotted once per `fetch` so the hooks lock is never held across an
//! `.await`.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use arrow::array::RecordBatch;
22use arrow::datatypes::SchemaRef;
23// `web_time::SystemTime` works on `wasm32-unknown-unknown` (where the std
24// version panics). On native it's a transparent alias for `std::time`.
25use std::time::Duration;
26use web_time::SystemTime;
27
28use async_trait::async_trait;
29use futures::future::{FutureExt, Shared};
30use thiserror::Error;
31use xxhash_rust::xxh3::Xxh3;
32
/// Boxed, type-erased `'static` future used by the resolver's in-flight
/// dedup map (`SharedFetch`) and lazy persistent-cache factory
/// (`BoxPersistentFactory`).
///
/// `?Send` on WASM means the futures we shove into `Shared` cannot promise
/// `Send`. So this is `LocalBoxFuture` on WASM and `BoxFuture` (which IS
/// `Send`) on native. That keeps the resolver multi-threaded on tokio while
/// compiling cleanly under `wasm32-unknown-unknown`.
#[cfg(not(target_arch = "wasm32"))]
type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
/// WASM variant: `LocalBoxFuture`, i.e. no `Send` bound on the future.
#[cfg(target_arch = "wasm32")]
type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
41
// Ownership / interior-mutability primitives are chosen per target.
// - Native builds pair `Arc` with `Mutex` so the resolver keeps working on
//   multi-threaded tokio.
// - On WASM the resolver's in-flight `Shared<LocalBoxFuture<...>>` map is
//   `?Send`. Wrapping it in `Arc<Mutex<...>>` would trip
//   `clippy::arc_with_non_send_sync`, so `wasm32-unknown-unknown` uses the
//   single-threaded `Rc` / `RefCell` pair instead.

/// Shared-ownership pointer that follows the target.
///
/// - `std::sync::Arc<T>` on native, so handles can move across
///   `tokio::spawn` task boundaries.
/// - `std::rc::Rc<T>` on WASM. `wasm32-unknown-unknown` is single-threaded
///   and the resolver's internals are `?Send`, so an `Arc` there would be
///   both pointless and rejected by `clippy::arc_with_non_send_sync`.
///
/// Exported so code that wires [`CacheBackend`] / [`DataSourceProvider`]
/// trait objects into the resolver can name the same pointer type the
/// resolver stores. Wasm32 builds then get the right primitive without a
/// `cfg_attr` at every call site.
#[cfg(not(target_arch = "wasm32"))]
pub type SharedRef<T> = std::sync::Arc<T>;
/// WASM flavour of `SharedRef`: a plain single-threaded `Rc`.
#[cfg(target_arch = "wasm32")]
pub type SharedRef<T> = std::rc::Rc<T>;
62
/// Interior-mutability cell behind every piece of mutable resolver state.
///
/// It is `std::sync::Mutex` on native and `std::cell::RefCell` on WASM.
/// Access goes through the `LockExt::read_lock` / `LockExt::write_lock`
/// extension methods, so call sites need no `cfg` blocks of their own.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type Lock<T> = std::sync::Mutex<T>;
/// WASM variant: a single-threaded `RefCell`.
#[cfg(target_arch = "wasm32")]
pub(crate) type Lock<T> = std::cell::RefCell<T>;
67
68/// Cfg-gated extension trait so `self.field.write_lock("…").something()` and
69/// `self.field.read_lock("…").something()` work uniformly across the native
70/// `Mutex` and the WASM `RefCell`. Eliminates per-call-site `cfg` blocks
71/// without losing the panic message that distinguishes which lock failed.
72trait LockExt<T> {
73 type Read<'a>: std::ops::Deref<Target = T>
74 where
75 Self: 'a;
76 type Write<'a>: std::ops::DerefMut<Target = T>
77 where
78 Self: 'a;
79 fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
80 fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
81}
82
83#[cfg(not(target_arch = "wasm32"))]
84impl<T> LockExt<T> for std::sync::Mutex<T> {
85 type Read<'a>
86 = std::sync::MutexGuard<'a, T>
87 where
88 Self: 'a;
89 type Write<'a>
90 = std::sync::MutexGuard<'a, T>
91 where
92 Self: 'a;
93 fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
94 self.lock()
95 .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
96 }
97 fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
98 self.lock()
99 .unwrap_or_else(|_| panic!("resolver {label} lock poisoned"))
100 }
101}
102
103#[cfg(target_arch = "wasm32")]
104impl<T> LockExt<T> for std::cell::RefCell<T> {
105 type Read<'a>
106 = std::cell::Ref<'a, T>
107 where
108 Self: 'a;
109 type Write<'a>
110 = std::cell::RefMut<'a, T>
111 where
112 Self: 'a;
113 fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
114 self.try_borrow()
115 .unwrap_or_else(|_| panic!("resolver {label} cell already borrowed mutably"))
116 }
117 fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
118 self.try_borrow_mut()
119 .unwrap_or_else(|_| panic!("resolver {label} cell already borrowed"))
120 }
121}
122
123use crate::data::DataTable;
124use crate::error::ChartError;
125use crate::spec::source::CacheConfig as SpecCacheConfig;
126use crate::spec::InlineData;
127
128pub mod builtin;
129pub mod cache;
130pub mod cancel;
131pub mod hooks;
132
133// Phase 3b: persistent cache backends. The container module is unconditional
134// because it hosts the pure-Rust `codec` submodule (shared blob framing) that
135// needs to compile + test on every target. Backend implementations themselves
136// (`indexeddb`, future `sqlite`/`fs`/…) carry their own `#[cfg(...)]` gates
137// inside `backends/mod.rs`, so non-browser builds and browser builds without
138// the `wasm-indexeddb` feature don't compile any of the IndexedDB-specific
139// code (which depends on the `idb` crate, brought in only by that feature).
140pub mod backends;
141
142pub use builtin::{HttpProvider, InlineProvider};
143pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
144pub use cancel::CancellationToken;
145pub use hooks::{
146 CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
147 ProgressEvent, ResolverHooks,
148};
149
/// TTL used when a spec declares no `cache.ttl`.
///
/// 300 seconds (five minutes), the same value as the JS middleware's
/// `CACHE_TTL_DEFAULT_MS`.
pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
153
/// Byte fed into the cache-key hash in place of an absent (`None`) field.
///
/// `0xFE` never occurs anywhere in well-formed UTF-8 (neither as a start
/// byte nor as a continuation byte). So `None` can never collide with a real
/// string value, e.g. a literal `"None"` datasource.
const HASH_NONE_SENTINEL: u8 = u8::MAX - 1;
/// Byte appended after every string field in the cache-key hash (`0xFF`).
///
/// Like the sentinel, it never occurs in UTF-8, so adjacent fields cannot
/// bleed into each other.
const HASH_FIELD_SEP: u8 = u8::MAX;
159
160/// Provider dispatch + fetch trait. Host apps implement this for their
161/// custom data sources (BigQuery, Snowflake, internal REST APIs, …) and
162/// register them via `ChartML::register_provider(kind, provider)`.
163///
164/// `?Send` on WASM matches `TransformMiddleware` so single-threaded
165/// browser environments can hold non-`Send` state inside provider impls.
166#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
167#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
168pub trait DataSourceProvider: Send + Sync {
169 /// Fetch one source. The resolver handles caching + dedup + parallelism;
170 /// providers only see this single call per actual upstream invocation.
171 async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;
172
173 /// Batch-oriented fetch. Returns multiple RecordBatches with a shared
174 /// schema, allowing the transform layer to register them into MemTable
175 /// without concatenation.
176 ///
177 /// Default wraps `fetch()` into a single-batch result. Override to
178 /// stream batches from an HTTP endpoint (e.g. Arrow IPC stream).
179 async fn fetch_batches(&self, request: FetchRequest) -> Result<FetchBatchResult, FetchError> {
180 let result = self.fetch(request).await?;
181 let schema = result.data.schema();
182 let batch = result.data.into_record_batch();
183 Ok(FetchBatchResult {
184 schema,
185 batches: vec![batch],
186 metadata: result.metadata,
187 })
188 }
189
190 /// Optional graceful shutdown hook. Called by `ChartML::shutdown()` on
191 /// SSR request end / tab close. Default no-op so providers that hold no
192 /// pooled resources don't have to implement it.
193 async fn shutdown(&self) {}
194}
195
/// Per-source request context. Carries the resolved spec (params already
/// substituted) plus cache + header + namespace + cancellation hints.
///
/// NOTE(review): the derived `Debug` prints `headers` verbatim, including any
/// credential-bearing values (e.g. an `Authorization` header). Avoid logging
/// requests with `{:?}` where that matters.
#[derive(Debug, Clone)]
pub struct FetchRequest {
    /// User-chosen name for this source within the chart spec. `None` for
    /// unnamed (flat) `data:` forms.
    pub source_name: Option<String>,
    /// Fully resolved flat-form source spec — `datasource`, `query`, `url`,
    /// `rows`, etc. with `$param.name` references already substituted.
    pub spec: InlineData,
    /// Parsed cache config from `spec.cache.ttl` (and `auto_refresh`).
    pub cache: Option<CacheConfig>,
    /// Request-level HTTP headers (merged with `HttpProvider` defaults).
    /// Ignored by non-HTTP providers unless they explicitly read this field.
    pub headers: HashMap<String, String>,
    /// Tenant / workspace namespace folded into the cache key (see
    /// `Resolver::key_for`). `None` for single-tenant deployments.
    pub namespace: Option<String>,
    /// Optional cancellation token. Phase 3 always passes `None` from the
    /// resolver; providers that opt into honoring it stay forward-compatible
    /// with future tab-close / cancel-on-route-change wiring.
    pub cancel_token: Option<CancellationToken>,
}
219
/// Provider response.
///
/// `Clone` is required because `futures::future::Shared` (used for in-flight
/// dedup) needs a `Future<Output: Clone>`: every awaiter gets its own copy.
/// `DataTable` is `Arc`-backed, so cloning is cheap.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The fetched table.
    pub data: DataTable,
    /// Free-form per-provider metadata — `bytes_billed`, `rows_returned`,
    /// `server_refreshed_at`, `upstream_cache_hit`, `warnings`, etc.
    pub metadata: HashMap<String, serde_json::Value>,
}
230
/// Batch-oriented provider response.
///
/// Carries multiple `RecordBatch`es with a shared schema, so the transform
/// layer can register them into a MemTable without concatenating them.
#[derive(Debug, Clone)]
pub struct FetchBatchResult {
    /// Schema shared by every batch in `batches`.
    pub schema: SchemaRef,
    /// The data, in provider order. The default
    /// `DataSourceProvider::fetch_batches` produces exactly one batch.
    pub batches: Vec<RecordBatch>,
    /// Free-form per-provider metadata (same role as `FetchResult::metadata`).
    pub metadata: HashMap<String, serde_json::Value>,
}
240
/// Provider failures.
///
/// `Clone` matches `FetchResult` so `Shared` can clone the error when
/// multiple in-flight callers receive the same outcome. Converted into
/// `ChartError` via `From`: `ProviderNotFound` becomes a `PluginError`, and
/// every other variant becomes a `DataError`.
#[derive(Debug, Error, Clone)]
pub enum FetchError {
    /// A `datasource` slug could not be resolved.
    #[error("datasource '{slug}' not found")]
    SlugNotFound { slug: String },

    /// The upstream query failed; carries the provider's message.
    #[error("query failed: {0}")]
    QueryFailed(String),

    /// The upstream response could not be decoded into a table.
    #[error("decode failed: {0}")]
    DecodeFailed(String),

    /// The fetch was cancelled (presumably via the request's
    /// `cancel_token` — confirm per provider).
    #[error("cancelled")]
    Cancelled,

    /// No provider is registered under the requested dispatch key.
    #[error("no provider registered for kind '{kind}'")]
    ProviderNotFound { kind: String },

    /// A cache backend failed. Produced from `CacheError` via `From`; only
    /// its message is kept.
    #[error("cache backend error: {0}")]
    Cache(String),

    /// Catch-all; the message is displayed as-is.
    #[error("{0}")]
    Other(String),
}
266
267impl From<FetchError> for ChartError {
268 fn from(err: FetchError) -> Self {
269 match err {
270 FetchError::SlugNotFound { slug } => {
271 ChartError::DataError(format!("datasource '{slug}' not found"))
272 }
273 FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
274 FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
275 FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
276 FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
277 "no provider registered for kind '{kind}'"
278 )),
279 FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
280 FetchError::Other(msg) => ChartError::DataError(msg),
281 }
282 }
283}
284
285impl From<CacheError> for FetchError {
286 fn from(err: CacheError) -> Self {
287 FetchError::Cache(err.to_string())
288 }
289}
290
/// Parsed cache config.
///
/// Built from `spec::source::CacheConfig` (imported here as
/// `SpecCacheConfig`, which carries the raw YAML strings) via
/// `CacheConfig::from_spec`. The resolver therefore doesn't re-parse
/// `humantime` formats on every fetch.
#[derive(Debug, Clone, Default)]
pub struct CacheConfig {
    /// TTL parsed via `humantime` (`"30s"`, `"5m"`, `"6h"`, `"1d"`, `"7d"`).
    /// `None` → default TTL applies.
    pub ttl: Option<Duration>,
    /// Component-layer hint — the resolver doesn't auto-refresh, but the
    /// flag is preserved end-to-end so consumers (Leptos, React) can read it.
    /// `false` when the spec omits it.
    pub auto_refresh: bool,
}
303
304impl CacheConfig {
305 /// Parse a `spec::source::CacheConfig` (the raw YAML form) into the
306 /// resolver-friendly shape. Returns `Ok(None)` when the input is `None`
307 /// so the upstream `Option` plumbing stays clean. Returns `Err` when the
308 /// declared TTL string is present but unparseable — silent fallback to
309 /// `DEFAULT_TTL` would let an operator's typo (`"5 minutes"` instead of
310 /// `"5m"`) ship to production unnoticed, so the parser's complaint is
311 /// surfaced to the caller verbatim.
312 pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
313 let Some(spec) = spec else {
314 return Ok(None);
315 };
316 let ttl = match spec.ttl.as_deref() {
317 Some(s) => Some(humantime::parse_duration(s).map_err(|e| {
318 ChartError::InvalidSpec(format!(
319 "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
320 ))
321 })?),
322 None => None,
323 };
324 Ok(Some(Self {
325 ttl,
326 auto_refresh: spec.auto_refresh.unwrap_or(false),
327 }))
328 }
329
330 /// Convenience accessor matching the design doc's `ttl_duration()` name.
331 pub fn ttl_duration(&self) -> Option<Duration> {
332 self.ttl
333 }
334}
335
/// Outcome of a `Resolver::fetch` call: the provider's result PLUS whether
/// the value came from a cache tier or a fresh provider invocation.
///
/// `ChartML::fetch` uses this to populate `FetchMetadata.cache_hits` /
/// `cache_misses` per source name. The resolver itself never needs to know
/// source names, because it works in keys, not names.
#[derive(Debug, Clone)]
pub struct ResolveOutcome {
    /// The fetched (or cached) result.
    pub result: FetchResult,
    /// `true` when served from a cache tier; `false` for a fresh provider
    /// invocation.
    pub cache_hit: bool,
    /// Original provider batches. `Some` on cache miss (when the provider
    /// returned via `fetch_batches`), `None` on cache hit. Allows the
    /// transform layer to register batches into MemTable without concat.
    pub batches: Option<FetchBatchResult>,
}
351
/// Tag prefix for source-slug-based bulk invalidation.
///
/// Public so consumers computing tags out-of-band (e.g., a custom
/// `CacheBackend` that wants to pre-populate entries) match the resolver's
/// wire format exactly. NOTE(review): presumably a tag is this prefix
/// followed by the datasource slug — confirm in `fetch`.
pub const TAG_SLUG_PREFIX: &str = "slug:";
/// Tag prefix for namespace-based bulk invalidation (same scheme as
/// `TAG_SLUG_PREFIX`, keyed by namespace).
pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";
358
/// Shared handle to a [`Resolver`] (`SharedRef<Resolver>`).
///
/// This is `Arc` on native, so consumers using `tokio::spawn` can move
/// resolver handles across task boundaries. It is `Rc` on WASM, where the
/// resolver's inflight map is inherently `?Send`; an `Arc` there would be
/// both incorrect and rejected by `clippy::arc_with_non_send_sync`.
/// Returned by `ChartML::resolver()` and accepted by every host that wants a
/// long-lived handle for the bulk `invalidate*` API.
pub type ResolverRef = SharedRef<Resolver>;
367
/// Shared handle to a [`CacheBackend`] trait object.
///
/// `Arc<dyn CacheBackend>` on native and `Rc<dyn CacheBackend>` on WASM,
/// mirroring [`SharedRef`]. Wasm32 consumers can therefore hand off `!Send`
/// backends (e.g. [`backends::indexeddb::IndexedDbBackend`]) without
/// tripping `clippy::arc_with_non_send_sync`. Used for both the tier-1
/// (`primary`) and tier-2 (persistent) slots.
pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
374
/// State of the optional tier-2 (persistent) cache slot.
///
/// Supports both eagerly-constructed backends (`Ready`) and lazy factories
/// (`Deferred`) whose async construction is deferred to the first `fetch()`.
enum PersistentSlot {
    /// No persistent cache configured, or the lazy factory produced `None`.
    None,
    /// Backend already constructed and ready to use.
    Ready(CacheBackendRef),
    /// Lazy factory — will be invoked on the first `fetch()` call. The inner
    /// `Option` is `take()`-d once so only one caller runs the factory.
    /// Concurrent callers that find `None` inside skip tier-2 for that fetch.
    /// If the factory's future never completes (or panics), the slot stays
    /// `Deferred(None)` and tier-2 keeps being skipped.
    Deferred(Option<BoxPersistentFactory>),
}
388
/// Boxed, run-once lazy factory for the persistent cache backend.
///
/// The returned future yields `None` when no backend could be built. On
/// native the closure and its returned future must be `Send`, so the
/// resolver stays compatible with `tokio::spawn`. On WASM `?Send` is fine
/// because the runtime is single-threaded.
#[cfg(not(target_arch = "wasm32"))]
type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>> + Send>;
/// WASM variant: no `Send` bound on the closure.
#[cfg(target_arch = "wasm32")]
type BoxPersistentFactory = Box<dyn FnOnce() -> ResolverFuture<Option<CacheBackendRef>>>;
397
/// Provider dispatch + cache + dedup orchestration.
///
/// One `Resolver` per `ChartML` instance. The resolver is held inside
/// `ChartML` as `SharedRef<Resolver>` (`Arc` on native, `Rc` on WASM), so
/// consumers can grab a handle for the `invalidate*` API while the chart
/// instance keeps using it. All mutable state sits behind `Lock` (`Mutex`
/// on native, `RefCell` on WASM), so mutation never requires `&mut self`.
pub struct Resolver {
    /// Default in-process cache. Always present and never replaced, even
    /// after `set_primary_cache` swaps `primary` out, so the in-memory tier
    /// stays reachable (it also appears in the `Debug` output).
    /// `Resolver::new` seeds `primary` with a clone of this same handle. That
    /// is why it is a `SharedRef` (`Arc` native / `Rc` WASM) that can be
    /// coerced into a `CacheBackendRef`, the same pointer type the `!Send`
    /// wasm32 backends need.
    memory: SharedRef<MemoryBackend>,
    /// Tier-1 cache. Defaults to the always-present in-memory backend; can
    /// be replaced via `ChartML::set_cache(...)` (e.g., a host's custom
    /// process-wide LRU). Behind a `Lock<CacheBackendRef>` so swaps are
    /// atomic and don't require `&mut self` (the resolver is held inside a
    /// `SharedRef`). The `CacheBackendRef` alias is `Arc` on native and
    /// `Rc` on WASM — wasm32 backends like `IndexedDbBackend` are `!Send`
    /// and would trip `clippy::arc_with_non_send_sync` if forced into
    /// `std::sync::Arc`.
    primary: Lock<CacheBackendRef>,
    /// Tier-2 (persistent) cache. Starts as `PersistentSlot::None`; populated
    /// eagerly via `set_persistent_cache` (`Ready`) or lazily via
    /// `set_persistent_cache_factory` (`Deferred` — resolved on first fetch).
    persistent: Lock<PersistentSlot>,
    /// In-flight dedup map: cache key → shared provider future, so
    /// concurrent fetches for the same key share one provider invocation.
    /// Unlike the other fields it sits behind its own `SharedRef`.
    /// NOTE(review): presumably so in-flight futures can hold a handle and
    /// remove their own entry on completion — confirm against `fetch`.
    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
    /// Provider registry keyed by dispatch slug (`"inline"`, `"http"`,
    /// `"datasource"`, or custom). Stored as `Arc` on every target;
    /// `DataSourceProvider` requires `Send + Sync`, so that is valid on WASM
    /// too.
    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
    /// Optional hook impl. Wrapped in `Lock<Option<...>>` so `set_hooks`
    /// works without `&mut self` (the resolver lives behind a `SharedRef`).
    /// Snapshotted into a local clone before each instrumentation site so
    /// the hook lock is never held across an `await`.
    hooks: Lock<Option<HooksRef>>,
    /// Tracker for which keys have been explicitly invalidated since their
    /// last fetch, so the next cache-miss for that key can be reported as
    /// [`hooks::MissReason::Invalidated`] instead of `NotFound`.
    ///
    /// **Per-key invalidation** (`invalidate(key)`) inserts into
    /// [`InvalidationTracker::keys`]. The next miss for that exact key
    /// reports `Invalidated` and removes the entry, so later misses on the
    /// same key (without re-invalidating) fall back to the regular
    /// `NotFound` / `Expired` reasoning.
    ///
    /// **Bulk invalidation** (`invalidate_all` / `invalidate_by_slug` /
    /// `invalidate_by_namespace`) sets [`InvalidationTracker::bulk_pending`]
    /// to `true`. Enumerating every just-evicted key is impractical: the
    /// `CacheBackend` trait doesn't expose iteration, and adding it would
    /// be expensive on `IndexedDbBackend`, which would need a cursor sweep
    /// per call. Instead the resolver reports the *first* post-bulk miss as
    /// `Invalidated` and clears the flag. Subsequent misses fall back to
    /// `NotFound` / `Expired` until another invalidation happens. This is a
    /// deliberate trade-off, mirrored in the integration test
    /// `test_invalidate_emits_invalidated_miss_reason`.
    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
}
455
/// Per-resolver record of invalidation events, used to classify the next
/// cache miss as `MissReason::Invalidated`.
///
/// Held inside a `Lock<...>` on the resolver so it survives across `&self`
/// borrows. The resolver lives behind a `SharedRef` and uses interior
/// mutability for every other piece of state too.
#[derive(Debug, Default)]
struct InvalidationTracker {
    /// Specific keys invalidated via `Resolver::invalidate(key)`. Drained
    /// on first observation by `consume_invalidation_reason`.
    keys: HashSet<u64>,
    /// Whether ANY bulk invalidate (`invalidate_all` /
    /// `invalidate_by_slug` / `invalidate_by_namespace`) has fired since
    /// the last bulk-pending consumption. Cleared the first time a miss
    /// observes it, so at most one post-bulk miss is reported as
    /// `Invalidated`.
    bulk_pending: bool,
}
472
/// Provider invocation result carried inside the in-flight dedup future.
///
/// Holds the `FetchResult` (whose `DataTable` is what gets cached) plus the
/// original batches when the provider produced them. `Clone` is required
/// because every awaiter of a `SharedFetch` receives its own copy.
#[derive(Debug, Clone)]
struct FetchWithBatches {
    /// Result in the form stored in / returned from the cache tiers.
    result: FetchResult,
    /// Original provider batches, if any (same shape as
    /// `ResolveOutcome::batches`).
    batches: Option<FetchBatchResult>,
}

/// Shared in-flight future type.
///
/// Boxed for dyn-trait erasure. `Shared` lets multiple awaiters poll the
/// same future without repeating the work. The inner `ResolverFuture` is
/// `BoxFuture` (`Send`) on native and `LocalBoxFuture` (`?Send`) on WASM,
/// so it doesn't conflict with the `?Send` async traits the providers and
/// cache backends use there.
type SharedFetch = Shared<ResolverFuture<Result<FetchWithBatches, FetchError>>>;
488
489impl Default for Resolver {
490 fn default() -> Self {
491 Self::new()
492 }
493}
494
495impl std::fmt::Debug for Resolver {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 let persistent_state = {
498 let guard = self.persistent.read_lock("persistent cache");
499 match &*guard {
500 PersistentSlot::None => "none",
501 PersistentSlot::Ready(_) => "ready",
502 PersistentSlot::Deferred(_) => "deferred",
503 }
504 };
505 let provider_keys: Vec<String> = self
506 .providers
507 .read_lock("providers")
508 .keys()
509 .cloned()
510 .collect();
511 f.debug_struct("Resolver")
512 .field("memory", &self.memory)
513 .field("persistent", &persistent_state)
514 .field("providers", &provider_keys)
515 .finish_non_exhaustive()
516 }
517}
518
519impl Resolver {
520 /// New resolver with the default `MemoryBackend` as tier-1, no tier-2,
521 /// and no providers registered. `ChartML::new()` registers the built-in
522 /// `inline` + `http` providers immediately after construction.
523 pub fn new() -> Self {
524 let memory = SharedRef::new(MemoryBackend::new());
525 let primary: CacheBackendRef = memory.clone();
526 Self {
527 memory,
528 primary: Lock::new(primary),
529 persistent: Lock::new(PersistentSlot::None),
530 inflight: SharedRef::new(Lock::new(HashMap::new())),
531 providers: Lock::new(HashMap::new()),
532 hooks: Lock::new(None),
533 recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
534 }
535 }
536
537 /// Replace the tier-1 cache backend. Used by `ChartML::set_cache`.
538 /// The fresh backend starts empty — entries in the old backend are not
539 /// migrated (caller's responsibility if they want to).
540 pub fn set_primary_cache(&self, backend: CacheBackendRef) {
541 let mut guard = self.primary.write_lock("primary cache");
542 *guard = backend;
543 }
544
545 /// Set the tier-2 (persistent) cache to an already-constructed backend.
546 /// Immediately marks the slot as `Ready` — no lazy resolution needed.
547 pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
548 let mut guard = self.persistent.write_lock("persistent cache");
549 *guard = PersistentSlot::Ready(backend);
550 }
551
552 /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
553 /// invoked on the first `fetch()` call; until then the slot stays
554 /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
555 /// backend doesn't exist yet, so there is nothing to invalidate).
556 ///
557 /// This exists because on WASM `IndexedDbBackend::new()` is async — it
558 /// can't be ready at `ChartML` construction time. The factory defers the
559 /// database open until the first fetch needs it.
560 #[cfg(not(target_arch = "wasm32"))]
561 pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
562 where
563 F: FnOnce() -> Fut + Send + 'static,
564 Fut: std::future::Future<Output = Option<CacheBackendRef>> + Send + 'static,
565 {
566 let mut guard = self.persistent.write_lock("persistent cache");
567 *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
568 }
569
570 /// Set the tier-2 (persistent) cache to a lazy factory. The factory is
571 /// invoked on the first `fetch()` call; until then the slot stays
572 /// `Deferred` and sync invalidation/shutdown paths skip tier-2 (the
573 /// backend doesn't exist yet, so there is nothing to invalidate).
574 ///
575 /// WASM variant — no `Send` bound on the closure or its returned future,
576 /// matching the single-threaded `wasm32-unknown-unknown` runtime.
577 #[cfg(target_arch = "wasm32")]
578 pub fn set_persistent_cache_factory<F, Fut>(&self, factory: F)
579 where
580 F: FnOnce() -> Fut + 'static,
581 Fut: std::future::Future<Output = Option<CacheBackendRef>> + 'static,
582 {
583 let mut guard = self.persistent.write_lock("persistent cache");
584 *guard = PersistentSlot::Deferred(Some(Box::new(move || Box::pin(factory()))));
585 }
586
587 /// Enable IndexedDB-backed persistent caching. The database is opened
588 /// lazily on the first fetch. `db_name` is the IDB database name,
589 /// `namespace` isolates entries (e.g. per-workspace).
590 ///
591 /// This is a convenience wrapper around `set_persistent_cache_factory`
592 /// using the built-in `IndexedDbBackend`.
593 #[cfg(all(target_arch = "wasm32", feature = "wasm-indexeddb"))]
594 pub fn enable_indexeddb_cache(&self, db_name: &str, namespace: &str) {
595 let db = db_name.to_string();
596 let ns = namespace.to_string();
597 self.set_persistent_cache_factory(move || async move {
598 use crate::resolver::backends::indexeddb::IndexedDbBackend;
599 match IndexedDbBackend::new(&db, &ns).await {
600 Ok(backend) => Some(SharedRef::new(backend) as CacheBackendRef),
601 Err(e) => {
602 tracing::warn!(error = %e, "IndexedDB cache unavailable");
603 None
604 }
605 }
606 });
607 }
608
609 /// Register a [`ResolverHooks`] impl. Replaces any previously registered
610 /// hooks; passes the new impl in as a `HooksRef` (`Arc` on native, `Rc`
611 /// on WASM). After this call every `Resolver::fetch` invocation emits
612 /// progress / cache / error events through the new impl.
613 pub fn set_hooks(&self, hooks: HooksRef) {
614 let mut guard = self.hooks.write_lock("hooks");
615 *guard = Some(hooks);
616 }
617
618 /// Clear any previously registered hooks. Subsequent `fetch` calls
619 /// behave as if `set_hooks` had never been called (no-op emission).
620 pub fn clear_hooks(&self) {
621 let mut guard = self.hooks.write_lock("hooks");
622 *guard = None;
623 }
624
625 /// Snapshot the current hooks `HooksRef` (or `None`) so the resolver
626 /// can release the lock before entering any cache walk or `.await`.
627 /// Must be called once at the top of `fetch`; downstream sites use the
628 /// snapshot rather than re-acquiring the lock to keep the hook lock
629 /// off the hot path. Also called by `ChartML::transform` (one crate up)
630 /// at the top of the transform stage for the same reason.
631 pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
632 self.hooks.read_lock("hooks").clone()
633 }
634
635 /// Snapshot the tier-1 cache handle so we can drop the lock before the
636 /// async cache call. Always returns a backend (the field starts as
637 /// `MemoryBackend` and `set_primary_cache` only replaces, never clears).
638 fn primary_snapshot(&self) -> CacheBackendRef {
639 self.primary.read_lock("primary cache").clone()
640 }
641
642 /// Resolve the tier-2 cache, lazily initializing the factory if needed.
643 /// Returns the backend handle (or `None`). Used by `fetch()` — the only
644 /// path that should trigger lazy init.
645 ///
646 /// Lock discipline: the sync lock is NEVER held across an `.await`.
647 /// The factory closure is `take()`-d under a write lock, the lock is
648 /// dropped, then the factory's returned future is awaited. The result
649 /// is written back under a fresh write lock.
650 async fn resolve_persistent(&self) -> Option<CacheBackendRef> {
651 // Fast path — no write lock needed
652 {
653 let guard = self.persistent.read_lock("persistent cache");
654 match &*guard {
655 PersistentSlot::None => return None,
656 PersistentSlot::Ready(b) => return Some(b.clone()),
657 PersistentSlot::Deferred(_) => {} // fall through to slow path
658 }
659 }
660
661 // Slow path — take the factory under write lock, release, then await
662 let factory = {
663 let mut guard = self.persistent.write_lock("persistent cache");
664 match &mut *guard {
665 PersistentSlot::Deferred(opt) => opt.take(),
666 PersistentSlot::Ready(b) => return Some(b.clone()),
667 PersistentSlot::None => return None,
668 }
669 };
670
671 let Some(factory) = factory else {
672 // Another caller already took the factory; skip tier-2 for this fetch
673 return None;
674 };
675
676 let result = factory().await;
677 let mut guard = self.persistent.write_lock("persistent cache");
678 match &result {
679 Some(backend) => *guard = PersistentSlot::Ready(backend.clone()),
680 None => *guard = PersistentSlot::None,
681 }
682 result
683 }
684
685 /// Sync snapshot of the tier-2 cache — returns `Some` only if the slot
686 /// is already `Ready`. Does NOT trigger lazy initialization. Used by
687 /// invalidation and shutdown paths where we must not start an async
688 /// factory (the backend doesn't exist yet, so there is nothing to
689 /// invalidate or shut down).
690 fn persistent_snapshot_if_ready(&self) -> Option<CacheBackendRef> {
691 let guard = self.persistent.read_lock("persistent cache");
692 match &*guard {
693 PersistentSlot::Ready(b) => Some(b.clone()),
694 _ => None,
695 }
696 }
697
698 /// Register a provider under a dispatch key (`"inline"`, `"http"`,
699 /// `"datasource"`, or any custom slug). Re-registration replaces the
700 /// previously registered provider for that key.
701 pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
702 let mut providers = self.providers.write_lock("providers");
703 providers.insert(kind.to_string(), provider);
704 }
705
706 /// Snapshot the registered provider kinds. Useful for tests and
707 /// host-app diagnostics.
708 pub fn provider_kinds(&self) -> Vec<String> {
709 self.providers
710 .read_lock("providers")
711 .keys()
712 .cloned()
713 .collect()
714 }
715
716 /// Compute the cache key the resolver would use for a given spec.
717 ///
718 /// Public so phase 4 (Leptos refresh button) and phase 6 (Kyomi
719 /// invalidate-on-change) can compute the exact key the resolver caches
720 /// under without re-implementing the hash.
721 ///
722 /// Hashes `(namespace, datasource, query, url, provider, rows_hash)` in
723 /// that order. `None` fields contribute a sentinel byte (`0xFE`, an
724 /// invalid UTF-8 start byte) so they cannot collide with a real string
725 /// value of `"None"`. Field separator is `0xFF` (also invalid UTF-8) so
726 /// adjacent fields can't bleed into each other ("a|b" vs "ab|").
727 pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
728 let mut hasher = Xxh3::new();
729
730 let fields: [Option<&str>; 5] = [
731 namespace,
732 spec.datasource.as_deref(),
733 spec.query.as_deref(),
734 spec.url.as_deref(),
735 spec.provider.as_deref(),
736 ];
737 for field in fields {
738 match field {
739 Some(s) => hasher.update(s.as_bytes()),
740 None => hasher.update(&[HASH_NONE_SENTINEL]),
741 }
742 hasher.update(&[HASH_FIELD_SEP]);
743 }
744
745 // Inline `rows` is hashed via its canonical JSON serialization so
746 // two specs with the same row data hash identically regardless of
747 // hashmap iteration order. `None` rows contribute the sentinel.
748 match spec.rows.as_ref() {
749 Some(rows) => match serde_json::to_vec(rows) {
750 Ok(bytes) => hasher.update(&bytes),
751 Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
752 },
753 None => hasher.update(&[HASH_NONE_SENTINEL]),
754 }
755
756 hasher.digest()
757 }
758
759 /// The core orchestration: tier-1 → tier-2 → in-flight dedup → provider.
760 ///
761 /// Returns `ResolveOutcome` (result + cache-hit flag) so the caller can
762 /// classify the source under `cache_hits` vs `cache_misses` in
763 /// `FetchMetadata`. Tier-1 hits hydrate from memory only; tier-2 hits
764 /// also re-populate tier-1 so subsequent reads in the same session
765 /// short-circuit.
766 pub async fn fetch(
767 &self,
768 key: u64,
769 request: FetchRequest,
770 ) -> Result<ResolveOutcome, FetchError> {
771 let primary = self.primary_snapshot();
772 let persistent = self.resolve_persistent().await;
773 // Snapshot the hooks `HooksRef` ONCE up front so the hooks lock is
774 // never held across an `.await` and downstream sites can share the
775 // same clone without re-acquiring.
776 let hooks = self.hooks_snapshot();
777 let source_name = request.source_name.clone();
778
779 // ── Tier 1: in-process cache ──
780 let mut tier1_expired = false;
781 if let Some(entry) = primary.get(key).await {
782 if !entry.is_expired() {
783 let age = entry.age();
784 emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
785 let mut metadata = entry.metadata;
786 inject_fetched_at(&mut metadata, &entry.fetched_at);
787 return Ok(ResolveOutcome {
788 result: FetchResult {
789 data: entry.data,
790 metadata,
791 },
792 cache_hit: true,
793 batches: None,
794 });
795 }
796 // Expired — let it fall through; we'll overwrite on success.
797 tier1_expired = true;
798 }
799
800 // ── Tier 2: persistent cache (phase 3b populates this) ──
801 let mut tier2_expired = false;
802 if let Some(p) = &persistent {
803 if let Some(entry) = p.get(key).await {
804 if !entry.is_expired() {
805 let age = entry.age();
806 // Hydrate tier-1 so subsequent reads stay in-process.
807 // Memory hydration is silent (no event) — only the
808 // logical tier-2 hit fires.
809 let _ = primary.put(key, entry.clone()).await;
810 emit_cache_hit(
811 &hooks,
812 key,
813 &source_name,
814 hooks::CacheTier::Persistent,
815 age,
816 );
817 let mut metadata = entry.metadata;
818 inject_fetched_at(&mut metadata, &entry.fetched_at);
819 return Ok(ResolveOutcome {
820 result: FetchResult {
821 data: entry.data,
822 metadata,
823 },
824 cache_hit: true,
825 batches: None,
826 });
827 }
828 // Expired — evict from tier-2 too.
829 let _ = p.invalidate(key).await;
830 tier2_expired = true;
831 }
832 }
833
834 // Both tiers missed — emit one cache-miss with the most specific
835 // reason we can prove. Precedence: `Invalidated` (most specific —
836 // an operator explicitly cleared this key or fired a bulk
837 // invalidate) wins over `Expired` (TTL elapsed naturally), which
838 // wins over `NotFound` (key was never cached or was evicted by
839 // some path the resolver doesn't track).
840 let miss_reason = if self.consume_invalidation_reason(key) {
841 hooks::MissReason::Invalidated
842 } else if tier1_expired || tier2_expired {
843 hooks::MissReason::Expired
844 } else {
845 hooks::MissReason::NotFound
846 };
847 emit_cache_miss(&hooks, key, &source_name, miss_reason);
848
849 // Provider call start — fire BEFORE the dedup wait so consumers
850 // see a "fetching" event for every distinct cache miss, not just
851 // the first concurrent one.
852 emit_progress(
853 &hooks,
854 hooks::Phase::Fetch,
855 &source_name,
856 None,
857 None,
858 format!(
859 "Fetching {}",
860 source_name.as_deref().unwrap_or("source"),
861 ),
862 );
863
864 // ── In-flight dedup ──
865 // If another fetch for the same key is already in flight, await its
866 // shared future. Otherwise install a new shared future and run it.
867 let shared = self.intern_inflight(key, request, primary, persistent);
868 let result = shared.await;
869
870 // Cleanup: drop the inflight slot whether the fetch succeeded or
871 // failed so subsequent calls re-dispatch instead of replaying a
872 // stale terminal state. Errors clear too — failed fetches should
873 // retry, not stick.
874 self.inflight.write_lock("inflight").remove(&key);
875
876 match result {
877 Ok(fetched) => {
878 let row_count = fetched.result.data.num_rows();
879 emit_progress(
880 &hooks,
881 hooks::Phase::Fetch,
882 &source_name,
883 Some(row_count as u64),
884 None,
885 format!(
886 "Fetched {} ({} rows)",
887 source_name.as_deref().unwrap_or("source"),
888 row_count,
889 ),
890 );
891 Ok(ResolveOutcome {
892 result: fetched.result,
893 cache_hit: false,
894 batches: fetched.batches,
895 })
896 }
897 Err(err) => {
898 emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
899 Err(err)
900 }
901 }
902 }
903
904 /// Get-or-insert the in-flight `Shared` future for a key. Returns a
905 /// clone of the `Shared` so the caller can `await` it; the original
906 /// stays in the map until removed by the caller.
907 fn intern_inflight(
908 &self,
909 key: u64,
910 request: FetchRequest,
911 primary: CacheBackendRef,
912 persistent: Option<CacheBackendRef>,
913 ) -> SharedFetch {
914 let mut inflight = self.inflight.write_lock("inflight");
915 if let Some(existing) = inflight.get(&key) {
916 return existing.clone();
917 }
918
919 // Build the work future: dispatch to the right provider, write to
920 // both cache tiers on success. The future is `'static` because it
921 // owns everything it touches via `Arc` clones.
922 let providers = self.snapshot_providers();
923 let cache_cfg = request.cache.clone();
924 let namespace = request.namespace.clone();
925 let slug = request.spec.datasource.clone();
926
927 let work = async move {
928 let provider = dispatch_provider(&providers, &request.spec)?;
929 let batch_result = provider.fetch_batches(request).await?;
930
931 // Concat batches → DataTable for caching.
932 let data = if batch_result.batches.is_empty() {
933 DataTable::from_record_batch(RecordBatch::new_empty(
934 Arc::clone(&batch_result.schema),
935 ))
936 } else {
937 let concat = arrow::compute::concat_batches(
938 &batch_result.schema,
939 &batch_result.batches,
940 )
941 .map_err(|e| FetchError::DecodeFailed(format!("concat batches: {e}")))?;
942 DataTable::from_record_batch(concat)
943 };
944
945 let fetch_result = FetchResult {
946 data: data.clone(),
947 metadata: batch_result.metadata.clone(),
948 };
949
950 // Write-through: tier-1 always; tier-2 if configured.
951 let entry = CachedEntry {
952 data,
953 fetched_at: SystemTime::now(),
954 ttl: cache_cfg
955 .as_ref()
956 .and_then(|c| c.ttl_duration())
957 .unwrap_or(DEFAULT_TTL),
958 tags: build_tags(slug.as_deref(), namespace.as_deref()),
959 metadata: batch_result.metadata.clone(),
960 };
961 let _ = primary.put(key, entry.clone()).await;
962 if let Some(p) = &persistent {
963 let _ = p.put(key, entry).await;
964 }
965 Ok(FetchWithBatches {
966 result: fetch_result,
967 batches: Some(batch_result),
968 })
969 };
970
971 #[cfg(not(target_arch = "wasm32"))]
972 let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed();
973 #[cfg(target_arch = "wasm32")]
974 let boxed: ResolverFuture<Result<FetchWithBatches, FetchError>> = work.boxed_local();
975
976 let future = boxed.shared();
977 inflight.insert(key, future.clone());
978 future
979 }
980
981 /// Snapshot the provider map for use inside the inflight future. We
982 /// snapshot rather than holding the lock so the future stays `'static`.
983 fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
984 self.providers.read_lock("providers").clone()
985 }
986
987 // ── Bulk invalidation API ──
988
989 /// Drop a single entry from every cache tier. The next miss on `key`
990 /// will be reported via [`hooks::ResolverHooks::on_cache_miss`] with
991 /// [`hooks::MissReason::Invalidated`] rather than `NotFound`.
992 pub async fn invalidate(&self, key: u64) {
993 let primary = self.primary_snapshot();
994 let persistent = self.persistent_snapshot_if_ready();
995 let _ = primary.invalidate(key).await;
996 if let Some(p) = &persistent {
997 let _ = p.invalidate(key).await;
998 }
999 self.recently_invalidated
1000 .write_lock("recently_invalidated")
1001 .keys
1002 .insert(key);
1003 }
1004
1005 /// Drop every cached entry across all tiers. The very next miss on any
1006 /// key will be reported as `Invalidated`; subsequent misses fall back
1007 /// to the regular `NotFound` / `Expired` reasoning. See the field-level
1008 /// docs on [`Resolver::recently_invalidated`] for why per-key tracking
1009 /// isn't done here (would require a `keys()` method on every backend).
1010 pub async fn invalidate_all(&self) {
1011 let primary = self.primary_snapshot();
1012 let persistent = self.persistent_snapshot_if_ready();
1013 let _ = primary.clear().await;
1014 if let Some(p) = &persistent {
1015 let _ = p.clear().await;
1016 }
1017 self.mark_bulk_invalidated();
1018 }
1019
1020 /// Drop every entry whose source spec carried the given `datasource`
1021 /// slug. Useful for "datasource X was edited; invalidate all queries
1022 /// against it" workflows. Subject to the same single-shot
1023 /// `Invalidated` reporting as [`Resolver::invalidate_all`].
1024 pub async fn invalidate_by_slug(&self, slug: &str) {
1025 let tag = format!("{TAG_SLUG_PREFIX}{slug}");
1026 let primary = self.primary_snapshot();
1027 let persistent = self.persistent_snapshot_if_ready();
1028 let _ = primary.invalidate_by_tag(&tag).await;
1029 if let Some(p) = &persistent {
1030 let _ = p.invalidate_by_tag(&tag).await;
1031 }
1032 self.mark_bulk_invalidated();
1033 }
1034
1035 /// Drop every entry tagged with the given namespace. Used for tenant
1036 /// isolation flows ("user logged out; clear their cached data").
1037 /// Subject to the same single-shot `Invalidated` reporting as
1038 /// [`Resolver::invalidate_all`].
1039 pub async fn invalidate_by_namespace(&self, namespace: &str) {
1040 let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
1041 let primary = self.primary_snapshot();
1042 let persistent = self.persistent_snapshot_if_ready();
1043 let _ = primary.invalidate_by_tag(&tag).await;
1044 if let Some(p) = &persistent {
1045 let _ = p.invalidate_by_tag(&tag).await;
1046 }
1047 self.mark_bulk_invalidated();
1048 }
1049
1050 /// Set the bulk-pending flag so the next post-bulk miss surfaces as
1051 /// [`hooks::MissReason::Invalidated`]. Synchronous; the lock is never
1052 /// held across an `.await`.
1053 fn mark_bulk_invalidated(&self) {
1054 self.recently_invalidated
1055 .write_lock("recently_invalidated")
1056 .bulk_pending = true;
1057 }
1058
1059 /// Check whether a miss for `key` should be reported as
1060 /// [`hooks::MissReason::Invalidated`]. Drains the per-key entry on
1061 /// first observation and consumes the bulk-pending flag at most once
1062 /// per bulk invalidation, so this returns `true` for at most one miss
1063 /// per `invalidate*` call. Called from the hot fetch path so the lock
1064 /// is taken briefly and never held across an `.await`.
1065 fn consume_invalidation_reason(&self, key: u64) -> bool {
1066 let mut tracker = self
1067 .recently_invalidated
1068 .write_lock("recently_invalidated");
1069 if tracker.keys.remove(&key) {
1070 return true;
1071 }
1072 if tracker.bulk_pending {
1073 tracker.bulk_pending = false;
1074 return true;
1075 }
1076 false
1077 }
1078
1079 /// Iterate every registered provider AND cache backend, awaiting their
1080 /// `shutdown` hook in turn. Wired up by `ChartML::shutdown()`.
1081 pub async fn shutdown(&self) {
1082 let providers = self.snapshot_providers();
1083 for (_, provider) in providers {
1084 provider.shutdown().await;
1085 }
1086 let primary = self.primary_snapshot();
1087 primary.shutdown().await;
1088 if let Some(p) = self.persistent_snapshot_if_ready() {
1089 p.shutdown().await;
1090 }
1091 }
1092}
1093
1094/// Emit a [`hooks::CacheHitEvent`] through the registered hook impl, if
1095/// any. Fire-and-forget via `spawn_hook`; never blocks the resolver.
1096fn emit_cache_hit(
1097 hooks: &Option<HooksRef>,
1098 key: u64,
1099 source_name: &Option<String>,
1100 tier: hooks::CacheTier,
1101 age: Duration,
1102) {
1103 let Some(h) = hooks.as_ref() else { return };
1104 let h = h.clone();
1105 let event = hooks::CacheHitEvent {
1106 key,
1107 source_name: source_name.clone(),
1108 tier,
1109 age,
1110 };
1111 hooks::spawn_hook(async move {
1112 h.on_cache_hit(event).await;
1113 });
1114}
1115
1116/// Emit a [`hooks::CacheMissEvent`] through the registered hook impl.
1117fn emit_cache_miss(
1118 hooks: &Option<HooksRef>,
1119 key: u64,
1120 source_name: &Option<String>,
1121 reason: hooks::MissReason,
1122) {
1123 let Some(h) = hooks.as_ref() else { return };
1124 let h = h.clone();
1125 let event = hooks::CacheMissEvent {
1126 key,
1127 source_name: source_name.clone(),
1128 reason,
1129 };
1130 hooks::spawn_hook(async move {
1131 h.on_cache_miss(event).await;
1132 });
1133}
1134
1135/// Emit a [`hooks::ProgressEvent`] through the registered hook impl.
1136/// `pub(crate)` so `ChartML::transform` / `render_prepared_to_svg` can
1137/// emit transform/render-phase progress without re-implementing the
1138/// snapshot dance.
1139pub(crate) fn emit_progress(
1140 hooks: &Option<HooksRef>,
1141 phase: hooks::Phase,
1142 source_name: &Option<String>,
1143 loaded: Option<u64>,
1144 total: Option<u64>,
1145 message: String,
1146) {
1147 let Some(h) = hooks.as_ref() else { return };
1148 let h = h.clone();
1149 let event = hooks::ProgressEvent {
1150 phase,
1151 source_name: source_name.clone(),
1152 loaded,
1153 total,
1154 message,
1155 };
1156 hooks::spawn_hook(async move {
1157 h.on_progress(event).await;
1158 });
1159}
1160
1161/// Emit a [`hooks::ErrorEvent`] through the registered hook impl.
1162/// `pub(crate)` so `ChartML::transform` can emit transform-phase errors.
1163pub(crate) fn emit_error(
1164 hooks: &Option<HooksRef>,
1165 phase: hooks::Phase,
1166 source_name: &Option<String>,
1167 error: String,
1168) {
1169 let Some(h) = hooks.as_ref() else { return };
1170 let h = h.clone();
1171 let event = hooks::ErrorEvent {
1172 phase,
1173 source_name: source_name.clone(),
1174 error,
1175 };
1176 hooks::spawn_hook(async move {
1177 h.on_error(event).await;
1178 });
1179}
1180
1181/// Inject `fetched_at_ms` into provider metadata on cache hits so consumers
1182/// can display the actual data age rather than "just now."
1183fn inject_fetched_at(metadata: &mut HashMap<String, serde_json::Value>, fetched_at: &SystemTime) {
1184 if let Ok(d) = fetched_at.duration_since(SystemTime::UNIX_EPOCH) {
1185 metadata.insert(
1186 "fetched_at_ms".to_string(),
1187 serde_json::Value::from(d.as_millis() as f64),
1188 );
1189 }
1190}
1191
1192/// Build the tag list applied to `CachedEntry` on write. `slug` and
1193/// `namespace` are optional — entries without one of them simply skip the
1194/// corresponding tag (no empty-string tag pollution).
1195fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
1196 let mut tags = Vec::new();
1197 if let Some(slug) = slug {
1198 tags.push(format!("{TAG_SLUG_PREFIX}{slug}"));
1199 }
1200 if let Some(ns) = namespace {
1201 tags.push(format!("{TAG_NAMESPACE_PREFIX}{ns}"));
1202 }
1203 tags
1204}
1205
1206/// Apply the design-doc dispatch routing. Precedence: explicit `provider`
1207/// key wins over inferred shape (rows → inline, url → http, datasource →
1208/// datasource). Returns the `Arc` so the caller can `await` the trait
1209/// method without holding any lock.
1210fn dispatch_provider(
1211 providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
1212 spec: &InlineData,
1213) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
1214 let kind = if let Some(kind) = spec.provider.as_deref() {
1215 kind
1216 } else if spec.rows.is_some() {
1217 "inline"
1218 } else if spec.url.is_some() {
1219 "http"
1220 } else if spec.datasource.is_some() {
1221 "datasource"
1222 } else {
1223 return Err(FetchError::Other(
1224 "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
1225 .to_string(),
1226 ));
1227 };
1228
1229 providers
1230 .get(kind)
1231 .cloned()
1232 .ok_or_else(|| FetchError::ProviderNotFound {
1233 kind: kind.to_string(),
1234 })
1235}
1236
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    fn empty_inline() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }

    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            query: Some("SELECT 1".into()),
            ..empty_inline()
        };
        let k1 = Resolver::key_for(&spec, Some("ns"));
        let k2 = Resolver::key_for(&spec, Some("ns"));
        assert_eq!(k1, k2);
    }

    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let k1 = Resolver::key_for(&spec, Some("tenant-a"));
        let k2 = Resolver::key_for(&spec, Some("tenant-b"));
        assert_ne!(k1, k2);
    }

    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        let spec_none = InlineData {
            datasource: None,
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let spec_literal = InlineData {
            datasource: Some("None".into()),
            url: Some("https://x".into()),
            ..empty_inline()
        };
        // Sentinel byte must NOT collide with the literal "None" string.
        assert_ne!(
            Resolver::key_for(&spec_none, None),
            Resolver::key_for(&spec_literal, None),
        );
    }

    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // `(datasource="ab", query=None)` must hash differently from
        // `(datasource="a", query="b")` — without a separator they'd both
        // serialize to "ab".
        let merged = InlineData {
            datasource: Some("ab".into()),
            ..empty_inline()
        };
        let split = InlineData {
            datasource: Some("a".into()),
            query: Some("b".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&merged, None),
            Resolver::key_for(&split, None),
        );
    }

    #[test]
    fn key_for_rows_presence_changes_key() {
        // Absent rows hash as the sentinel; an empty row list hashes as its
        // JSON (`[]`). The two must not share a cache slot.
        let without_rows = empty_inline();
        let with_empty_rows = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&without_rows, None),
            Resolver::key_for(&with_empty_rows, None),
        );
    }

    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = [(
            "custom".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        )]
        .into_iter()
        .collect();
        let spec = InlineData {
            provider: Some("custom".into()),
            // rows would normally route to "inline", but explicit wins.
            rows: Some(vec![]),
            ..empty_inline()
        };
        // We can't compare trait objects, but `is_ok()` is sufficient: the
        // ONLY registered provider key is "custom", so reaching `Ok` means
        // dispatch picked it. Routing rows alone (no `provider`) would
        // search for "inline", which is unregistered → would return
        // `ProviderNotFound`.
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_inferred_inline() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = [(
            "inline".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        )]
        .into_iter()
        .collect();
        let spec = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_rows_win_over_url() {
        // Only "inline" is registered: if `url` took precedence the lookup
        // would target "http" and fail with `ProviderNotFound`.
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = [(
            "inline".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        )]
        .into_iter()
        .collect();
        let spec = InlineData {
            rows: Some(vec![]),
            url: Some("https://x".into()),
            ..empty_inline()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }

    #[test]
    fn dispatch_provider_url_routes_to_http() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let spec = InlineData {
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let err = dispatch_provider(&providers, &spec).err().expect("dispatch must error");
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "http"));
    }

    #[test]
    fn dispatch_provider_missing_kind_errors() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let err = dispatch_provider(&providers, &spec).err().expect("dispatch must error");
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "datasource"));
    }

    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let spec = empty_inline();
        let err = dispatch_provider(&providers, &spec).err().expect("dispatch must error");
        assert!(matches!(err, FetchError::Other(_)));
    }

    #[test]
    fn build_tags_skips_missing_parts() {
        assert!(build_tags(None, None).is_empty());
        assert_eq!(
            build_tags(Some("wh"), None),
            vec![format!("{TAG_SLUG_PREFIX}wh")],
        );
        assert_eq!(
            build_tags(None, Some("t")),
            vec![format!("{TAG_NAMESPACE_PREFIX}t")],
        );
        assert_eq!(
            build_tags(Some("wh"), Some("t")),
            vec![
                format!("{TAG_SLUG_PREFIX}wh"),
                format!("{TAG_NAMESPACE_PREFIX}t"),
            ],
        );
    }

    // ── Invalidation-reason tracking ────────────────────────────────────

    #[test]
    fn consume_invalidation_reason_defaults_to_false() {
        let resolver = Resolver::new();
        assert!(!resolver.consume_invalidation_reason(1));
    }

    #[test]
    fn bulk_invalidation_reports_exactly_one_miss() {
        let resolver = Resolver::new();
        resolver.mark_bulk_invalidated();
        assert!(resolver.consume_invalidation_reason(1));
        // The bulk flag is single-shot: a different key afterwards is a
        // regular miss.
        assert!(!resolver.consume_invalidation_reason(2));
    }

    #[tokio::test]
    async fn invalidate_marks_only_that_key_once() {
        let resolver = Resolver::new();
        resolver.invalidate(42).await;
        assert!(!resolver.consume_invalidation_reason(7));
        assert!(resolver.consume_invalidation_reason(42));
        assert!(!resolver.consume_invalidation_reason(42));
    }

    #[tokio::test]
    async fn invalidate_all_reports_next_miss_only() {
        let resolver = Resolver::new();
        resolver.invalidate_all().await;
        assert!(resolver.consume_invalidation_reason(1));
        assert!(!resolver.consume_invalidation_reason(1));
    }

    // ── PersistentSlot / factory tests ──────────────────────────────────

    /// Minimal `CacheBackend` impl for testing the persistent slot plumbing.
    struct MockBackend;

    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    impl CacheBackend for MockBackend {
        async fn get(&self, _key: u64) -> Option<CachedEntry> {
            None
        }
        async fn put(&self, _key: u64, _entry: CachedEntry) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate(&self, _key: u64) -> Result<(), CacheError> {
            Ok(())
        }
        async fn invalidate_by_tag(&self, _tag: &str) -> Result<(), CacheError> {
            Ok(())
        }
        async fn clear(&self) -> Result<(), CacheError> {
            Ok(())
        }
        async fn shutdown(&self) {}
    }

    #[test]
    fn test_persistent_slot_default_is_none() {
        let resolver = Resolver::new();
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }

    #[test]
    fn test_set_persistent_cache_still_works() {
        let resolver = Resolver::new();
        let backend: CacheBackendRef = SharedRef::new(MockBackend);
        resolver.set_persistent_cache(backend);
        assert!(resolver.persistent_snapshot_if_ready().is_some());
    }

    #[test]
    fn test_persistent_cache_factory_stores_deferred() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async {
            Some(SharedRef::new(MockBackend) as CacheBackendRef)
        });
        // Factory hasn't fired yet — sync snapshot must be None
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_resolves_on_fetch() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async {
            Some(SharedRef::new(MockBackend) as CacheBackendRef)
        });
        // resolve_persistent triggers the factory
        let result = resolver.resolve_persistent().await;
        assert!(result.is_some());
        // After resolution the slot is Ready — sync snapshot works now
        assert!(resolver.persistent_snapshot_if_ready().is_some());
    }

    #[tokio::test]
    async fn test_persistent_cache_factory_none_result() {
        let resolver = Resolver::new();
        resolver.set_persistent_cache_factory(|| async { None });
        let result = resolver.resolve_persistent().await;
        assert!(result.is_none());
        // After a None factory result the slot becomes None — subsequent
        // resolve_persistent calls return None without re-running the factory
        let result2 = resolver.resolve_persistent().await;
        assert!(result2.is_none());
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }

    #[test]
    fn test_persistent_cache_factory_replaces_previous() {
        let resolver = Resolver::new();
        // Set a ready backend
        let backend: CacheBackendRef = SharedRef::new(MockBackend);
        resolver.set_persistent_cache(backend);
        assert!(resolver.persistent_snapshot_if_ready().is_some());

        // Now set a factory — it replaces the ready backend
        resolver.set_persistent_cache_factory(|| async {
            Some(SharedRef::new(MockBackend) as CacheBackendRef)
        });
        // Sync snapshot returns None because the slot is now Deferred
        assert!(resolver.persistent_snapshot_if_ready().is_none());
    }
}