use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use web_time::SystemTime;
use async_trait::async_trait;
use futures::future::{FutureExt, Shared};
use thiserror::Error;
use xxhash_rust::xxh3::Xxh3;
// Platform abstraction: native builds use thread-safe, `Send` primitives;
// wasm builds are single-threaded and use local (non-`Send`) equivalents.
// Future type used for in-flight fetches (boxed so it can be stored/shared).
#[cfg(not(target_arch = "wasm32"))]
type ResolverFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
type ResolverFuture<T> = futures::future::LocalBoxFuture<'static, T>;
// Reference-counted shared pointer: atomic `Arc` on native, plain `Rc` on wasm.
#[cfg(not(target_arch = "wasm32"))]
pub type SharedRef<T> = Arc<T>;
#[cfg(target_arch = "wasm32")]
pub type SharedRef<T> = std::rc::Rc<T>;
// Interior-mutability cell: `Mutex` on native, `RefCell` on wasm.
// Both are accessed uniformly through the `LockExt` trait below.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) type Lock<T> = std::sync::Mutex<T>;
#[cfg(target_arch = "wasm32")]
pub(crate) type Lock<T> = std::cell::RefCell<T>;
/// Uniform read/write access over the platform-specific `Lock` type.
///
/// `label` names the lock being taken so failure panics (poisoned mutex on
/// native, conflicting borrow on wasm) identify the specific field involved.
trait LockExt<T> {
    // Guard returned by `read_lock`; dereferences to `T`.
    type Read<'a>: std::ops::Deref<Target = T>
    where
        Self: 'a;
    // Guard returned by `write_lock`; dereferences mutably to `T`.
    type Write<'a>: std::ops::DerefMut<Target = T>
    where
        Self: 'a;
    fn read_lock(&self, label: &'static str) -> Self::Read<'_>;
    fn write_lock(&self, label: &'static str) -> Self::Write<'_>;
}
/// Native implementation: both read and write access take the full mutex,
/// so the two guard types are the same `MutexGuard`.
#[cfg(not(target_arch = "wasm32"))]
impl<T> LockExt<T> for std::sync::Mutex<T> {
    type Read<'a>
        = std::sync::MutexGuard<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::sync::MutexGuard<'a, T>
    where
        Self: 'a;
    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        // A poisoned mutex means another thread panicked mid-update; treat
        // that as fatal rather than working with possibly-torn state.
        match self.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("resolver {label} lock poisoned"),
        }
    }
    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.lock() {
            Ok(guard) => guard,
            Err(_) => panic!("resolver {label} lock poisoned"),
        }
    }
}
/// Wasm implementation: single-threaded `RefCell` borrows stand in for
/// locking; an active conflicting borrow is a reentrancy bug, so panic.
#[cfg(target_arch = "wasm32")]
impl<T> LockExt<T> for std::cell::RefCell<T> {
    type Read<'a>
        = std::cell::Ref<'a, T>
    where
        Self: 'a;
    type Write<'a>
        = std::cell::RefMut<'a, T>
    where
        Self: 'a;
    fn read_lock(&self, label: &'static str) -> Self::Read<'_> {
        match self.try_borrow() {
            Ok(guard) => guard,
            Err(_) => panic!("resolver {label} cell already borrowed mutably"),
        }
    }
    fn write_lock(&self, label: &'static str) -> Self::Write<'_> {
        match self.try_borrow_mut() {
            Ok(guard) => guard,
            Err(_) => panic!("resolver {label} cell already borrowed"),
        }
    }
}
use crate::data::DataTable;
use crate::error::ChartError;
use crate::spec::source::CacheConfig as SpecCacheConfig;
use crate::spec::InlineData;
pub mod builtin;
pub mod cache;
pub mod cancel;
pub mod hooks;
pub mod backends;
pub use builtin::{HttpProvider, InlineProvider};
pub use cache::{CacheBackend, CacheError, CachedEntry, MemoryBackend};
pub use cancel::CancellationToken;
pub use hooks::{
CacheHitEvent, CacheMissEvent, CacheTier, ErrorEvent, HooksRef, MissReason, NullHooks, Phase,
ProgressEvent, ResolverHooks,
};
/// Time-to-live applied to cached entries when the request carries no
/// explicit cache TTL (5 minutes).
pub const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60);
// Byte hashed in place of an absent field so `None` hashes differently from
// any real string (including the literal "None").
const HASH_NONE_SENTINEL: u8 = 0xFE;
// Byte hashed between fields so adjacent values cannot bleed into each other
// ("ab"+None must not collide with "a"+"b").
const HASH_FIELD_SEP: u8 = 0xFF;
/// A pluggable source of tabular data, registered under a dispatch kind
/// (e.g. "inline", "http", "datasource") on the `Resolver`.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait DataSourceProvider: Send + Sync {
    /// Fetches the data described by `request`.
    async fn fetch(&self, request: FetchRequest) -> Result<FetchResult, FetchError>;
    /// Releases provider-held resources; the default is a no-op.
    async fn shutdown(&self) {}
}
/// Everything a provider needs to resolve one data source.
#[derive(Debug, Clone)]
pub struct FetchRequest {
    /// Human-readable name used in progress/error events, when known.
    pub source_name: Option<String>,
    /// The data spec being resolved.
    pub spec: InlineData,
    /// Cache policy parsed from the spec, if any.
    pub cache: Option<CacheConfig>,
    /// Header key/value pairs passed through to the provider.
    pub headers: HashMap<String, String>,
    /// Optional namespace scope; used to tag cache entries for bulk
    /// invalidation (see `build_tags`).
    pub namespace: Option<String>,
    /// Cooperative cancellation handle, if the caller supplies one.
    pub cancel_token: Option<CancellationToken>,
}
/// A provider's successful response: the table plus associated metadata.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The fetched tabular data.
    pub data: DataTable,
    /// Provider-specific metadata; cached alongside the data.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Errors surfaced while resolving a data source.
///
/// `Clone` is required because the error may be fanned out to every waiter of
/// a deduplicated in-flight fetch (see `SharedFetch`).
#[derive(Debug, Error, Clone)]
pub enum FetchError {
    /// The named datasource slug is not configured.
    #[error("datasource '{slug}' not found")]
    SlugNotFound { slug: String },
    /// The backing query ran but failed.
    #[error("query failed: {0}")]
    QueryFailed(String),
    /// The response was fetched but could not be decoded into a table.
    #[error("decode failed: {0}")]
    DecodeFailed(String),
    /// The fetch was cancelled via a `CancellationToken`.
    #[error("cancelled")]
    Cancelled,
    /// No provider is registered for the dispatched kind.
    #[error("no provider registered for kind '{kind}'")]
    ProviderNotFound { kind: String },
    /// A cache backend failed (converted from `CacheError`).
    #[error("cache backend error: {0}")]
    Cache(String),
    /// Catch-all, including specs that match no dispatch rule.
    #[error("{0}")]
    Other(String),
}
impl From<FetchError> for ChartError {
fn from(err: FetchError) -> Self {
match err {
FetchError::SlugNotFound { slug } => {
ChartError::DataError(format!("datasource '{slug}' not found"))
}
FetchError::QueryFailed(msg) => ChartError::DataError(format!("query failed: {msg}")),
FetchError::DecodeFailed(msg) => ChartError::DataError(format!("decode failed: {msg}")),
FetchError::Cancelled => ChartError::DataError("fetch cancelled".to_string()),
FetchError::ProviderNotFound { kind } => ChartError::PluginError(format!(
"no provider registered for kind '{kind}'"
)),
FetchError::Cache(msg) => ChartError::DataError(format!("cache error: {msg}")),
FetchError::Other(msg) => ChartError::DataError(msg),
}
}
}
impl From<CacheError> for FetchError {
fn from(err: CacheError) -> Self {
FetchError::Cache(err.to_string())
}
}
/// Resolved caching policy for a single data source.
#[derive(Debug, Clone, Default)]
pub struct CacheConfig {
    /// Entry time-to-live; `None` falls back to `DEFAULT_TTL` when the entry
    /// is cached.
    pub ttl: Option<Duration>,
    /// Auto-refresh flag parsed from the spec; not consumed within this module.
    pub auto_refresh: bool,
}
impl CacheConfig {
    /// Builds a `CacheConfig` from the optional spec-level cache section.
    ///
    /// Returns `Ok(None)` when no section is present.
    ///
    /// # Errors
    /// `ChartError::InvalidSpec` when `ttl` is not a valid humantime string.
    pub fn from_spec(spec: Option<&SpecCacheConfig>) -> Result<Option<Self>, ChartError> {
        let spec = match spec {
            Some(s) => s,
            None => return Ok(None),
        };
        // Parse the TTL if present, propagating a spec error on bad input.
        let ttl = spec
            .ttl
            .as_deref()
            .map(|s| {
                humantime::parse_duration(s).map_err(|e| {
                    ChartError::InvalidSpec(format!(
                        "invalid cache.ttl value {s:?}: {e} (expected humantime format like \"30s\", \"5m\", \"6h\", \"1d\")"
                    ))
                })
            })
            .transpose()?;
        Ok(Some(Self {
            ttl,
            auto_refresh: spec.auto_refresh.unwrap_or(false),
        }))
    }
    /// The configured TTL, if any.
    pub fn ttl_duration(&self) -> Option<Duration> {
        self.ttl
    }
}
/// The outcome of `Resolver::fetch`: the data plus cache-hit provenance.
#[derive(Debug, Clone)]
pub struct ResolveOutcome {
    /// The resolved data and metadata.
    pub result: FetchResult,
    /// `true` when served from the memory or persistent tier without a fetch.
    pub cache_hit: bool,
}
/// Tag prefix marking cache entries that belong to a datasource slug.
pub const TAG_SLUG_PREFIX: &str = "slug:";
/// Tag prefix marking cache entries that belong to a namespace.
pub const TAG_NAMESPACE_PREFIX: &str = "namespace:";
/// Shared handle to a `Resolver` (`Arc` on native, `Rc` on wasm).
pub type ResolverRef = SharedRef<Resolver>;
/// Shared handle to a dynamically-dispatched cache backend.
pub type CacheBackendRef = SharedRef<dyn CacheBackend>;
/// Multi-tier caching data resolver.
///
/// Resolution order: tier-1 (`primary`, defaults to the in-process memory
/// backend), then tier-2 (`persistent`, optional), then a registered provider,
/// with concurrent fetches for the same key deduplicated via `inflight`.
pub struct Resolver {
    // Always-present memory backend; also the default primary cache.
    memory: SharedRef<MemoryBackend>,
    // Tier-1 cache; swappable via `set_primary_cache`.
    primary: Lock<CacheBackendRef>,
    // Optional tier-2 cache; fresh hits here are promoted to tier-1.
    persistent: Lock<Option<CacheBackendRef>>,
    // Shared futures for fetches currently in progress, keyed by cache key.
    inflight: SharedRef<Lock<HashMap<u64, SharedFetch>>>,
    // Providers keyed by dispatch kind (e.g. "inline", "http", "datasource").
    providers: Lock<HashMap<String, Arc<dyn DataSourceProvider>>>,
    // Optional observer hooks for cache/progress/error events.
    hooks: Lock<Option<HooksRef>>,
    // Recent invalidations, used to classify subsequent cache misses.
    recently_invalidated: SharedRef<Lock<InvalidationTracker>>,
}
/// Remembers recent invalidations so the next cache miss can be reported as
/// `MissReason::Invalidated` rather than expiry or plain absence.
#[derive(Debug, Default)]
struct InvalidationTracker {
    // Keys invalidated individually via `Resolver::invalidate`.
    keys: HashSet<u64>,
    // Set by bulk invalidations (clear / by-tag); consumed by the next miss.
    bulk_pending: bool,
}
// A deduplicated in-flight fetch: cloneable, so every waiter for the same key
// awaits the same future and observes the same (Clone-able) result.
type SharedFetch = Shared<ResolverFuture<Result<FetchResult, FetchError>>>;
impl Default for Resolver {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for Resolver {
    /// Summarizes the resolver without dumping cache contents: the memory
    /// backend, whether a persistent tier is installed, and provider kinds.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let persistent_installed = self.persistent.read_lock("persistent cache").is_some();
        let mut kinds: Vec<String> = Vec::new();
        for kind in self.providers.read_lock("providers").keys() {
            kinds.push(kind.clone());
        }
        f.debug_struct("Resolver")
            .field("memory", &self.memory)
            .field("has_persistent", &persistent_installed)
            .field("providers", &kinds)
            .finish_non_exhaustive()
    }
}
impl Resolver {
    /// Creates a resolver backed by an in-process `MemoryBackend`, with no
    /// persistent tier, no registered providers, and no hooks.
    pub fn new() -> Self {
        let memory = SharedRef::new(MemoryBackend::new());
        // The memory backend doubles as the primary cache until replaced via
        // `set_primary_cache`.
        let primary: CacheBackendRef = memory.clone();
        Self {
            memory,
            primary: Lock::new(primary),
            persistent: Lock::new(None),
            inflight: SharedRef::new(Lock::new(HashMap::new())),
            providers: Lock::new(HashMap::new()),
            hooks: Lock::new(None),
            recently_invalidated: SharedRef::new(Lock::new(InvalidationTracker::default())),
        }
    }
    /// Replaces the tier-1 (primary) cache backend for subsequent operations.
    pub fn set_primary_cache(&self, backend: CacheBackendRef) {
        let mut guard = self.primary.write_lock("primary cache");
        *guard = backend;
    }
    /// Installs a tier-2 (persistent) cache backend.
    pub fn set_persistent_cache(&self, backend: CacheBackendRef) {
        let mut guard = self.persistent.write_lock("persistent cache");
        *guard = Some(backend);
    }
    /// Installs observer hooks; events are emitted fire-and-forget.
    pub fn set_hooks(&self, hooks: HooksRef) {
        let mut guard = self.hooks.write_lock("hooks");
        *guard = Some(hooks);
    }
    /// Removes any installed hooks.
    pub fn clear_hooks(&self) {
        let mut guard = self.hooks.write_lock("hooks");
        *guard = None;
    }
    pub(crate) fn hooks_snapshot(&self) -> Option<HooksRef> {
        self.hooks.read_lock("hooks").clone()
    }
    // Snapshots clone the handle out so no lock is held across `.await`.
    fn primary_snapshot(&self) -> CacheBackendRef {
        self.primary.read_lock("primary cache").clone()
    }
    fn persistent_snapshot(&self) -> Option<CacheBackendRef> {
        self.persistent.read_lock("persistent cache").clone()
    }
    /// Registers (or replaces) the provider used for dispatch kind `kind`.
    pub fn register_provider(&self, kind: &str, provider: Arc<dyn DataSourceProvider>) {
        let mut providers = self.providers.write_lock("providers");
        providers.insert(kind.to_string(), provider);
    }
    /// Lists the currently registered provider kinds (unordered).
    pub fn provider_kinds(&self) -> Vec<String> {
        self.providers
            .read_lock("providers")
            .keys()
            .cloned()
            .collect()
    }
    /// Computes the cache/dedup key for `spec` scoped by `namespace`.
    ///
    /// Each identity field is hashed followed by a separator byte so adjacent
    /// fields cannot bleed into each other, and `None` hashes as a sentinel
    /// byte so it stays distinct from the literal string "None".
    pub fn key_for(spec: &InlineData, namespace: Option<&str>) -> u64 {
        let mut hasher = Xxh3::new();
        // `endpoint` must participate in the key: two specs that differ only
        // in endpoint can resolve to different data and must not share a
        // cache entry or an in-flight fetch.
        let fields: [Option<&str>; 6] = [
            namespace,
            spec.datasource.as_deref(),
            spec.query.as_deref(),
            spec.url.as_deref(),
            spec.endpoint.as_deref(),
            spec.provider.as_deref(),
        ];
        for field in fields {
            match field {
                Some(s) => hasher.update(s.as_bytes()),
                None => hasher.update(&[HASH_NONE_SENTINEL]),
            }
            hasher.update(&[HASH_FIELD_SEP]);
        }
        match spec.rows.as_ref() {
            Some(rows) => match serde_json::to_vec(rows) {
                Ok(bytes) => hasher.update(&bytes),
                // Unserializable rows degrade to the sentinel rather than
                // failing key computation.
                Err(_) => hasher.update(&[HASH_NONE_SENTINEL]),
            },
            None => hasher.update(&[HASH_NONE_SENTINEL]),
        }
        hasher.digest()
    }
    /// Resolves `request` under `key`: consults the memory tier, then the
    /// persistent tier (promoting fresh hits to memory), then dispatches to a
    /// provider — deduplicating concurrent fetches for the same key.
    ///
    /// # Errors
    /// Propagates the provider's `FetchError` on a cache miss whose fetch fails.
    pub async fn fetch(
        &self,
        key: u64,
        request: FetchRequest,
    ) -> Result<ResolveOutcome, FetchError> {
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        let hooks = self.hooks_snapshot();
        let source_name = request.source_name.clone();
        // Tier 1: in-memory cache.
        let mut tier1_expired = false;
        if let Some(entry) = primary.get(key).await {
            if !entry.is_expired() {
                let age = entry.age();
                emit_cache_hit(&hooks, key, &source_name, hooks::CacheTier::Memory, age);
                return Ok(ResolveOutcome {
                    result: FetchResult {
                        data: entry.data,
                        metadata: entry.metadata,
                    },
                    cache_hit: true,
                });
            }
            // NOTE(review): the expired tier-1 entry is left in place (the
            // refetch below overwrites it on success), while the tier-2 path
            // invalidates eagerly — confirm the asymmetry is intended.
            tier1_expired = true;
        }
        // Tier 2: persistent cache; fresh hits are promoted to tier 1.
        let mut tier2_expired = false;
        if let Some(p) = &persistent {
            if let Some(entry) = p.get(key).await {
                if !entry.is_expired() {
                    let age = entry.age();
                    let _ = primary.put(key, entry.clone()).await;
                    emit_cache_hit(
                        &hooks,
                        key,
                        &source_name,
                        hooks::CacheTier::Persistent,
                        age,
                    );
                    return Ok(ResolveOutcome {
                        result: FetchResult {
                            data: entry.data,
                            metadata: entry.metadata,
                        },
                        cache_hit: true,
                    });
                }
                let _ = p.invalidate(key).await;
                tier2_expired = true;
            }
        }
        // Classify the miss for observability: explicit invalidation beats
        // expiry, which beats plain absence.
        let miss_reason = if self.consume_invalidation_reason(key) {
            hooks::MissReason::Invalidated
        } else if tier1_expired || tier2_expired {
            hooks::MissReason::Expired
        } else {
            hooks::MissReason::NotFound
        };
        emit_cache_miss(&hooks, key, &source_name, miss_reason);
        emit_progress(
            &hooks,
            hooks::Phase::Fetch,
            &source_name,
            None,
            None,
            format!(
                "Fetching {}",
                source_name.as_deref().unwrap_or("source"),
            ),
        );
        // Join an existing in-flight fetch for this key, or start one; every
        // waiter observes the same shared result.
        let shared = self.intern_inflight(key, request, primary, persistent);
        let result = shared.await;
        // Removal is idempotent — each waiter removes after completion.
        self.inflight.write_lock("inflight").remove(&key);
        match result {
            Ok(fetch_result) => {
                let row_count = fetch_result.data.num_rows();
                emit_progress(
                    &hooks,
                    hooks::Phase::Fetch,
                    &source_name,
                    Some(row_count as u64),
                    None,
                    format!(
                        "Fetched {} ({} rows)",
                        source_name.as_deref().unwrap_or("source"),
                        row_count,
                    ),
                );
                Ok(ResolveOutcome {
                    result: fetch_result,
                    cache_hit: false,
                })
            }
            Err(err) => {
                emit_error(&hooks, hooks::Phase::Fetch, &source_name, err.to_string());
                Err(err)
            }
        }
    }
    /// Returns the shared future for `key`, creating and registering the
    /// fetch-and-cache future if none is currently in flight.
    fn intern_inflight(
        &self,
        key: u64,
        request: FetchRequest,
        primary: CacheBackendRef,
        persistent: Option<CacheBackendRef>,
    ) -> SharedFetch {
        // Check-and-insert under one lock so two concurrent callers cannot
        // both start a fetch for the same key.
        let mut inflight = self.inflight.write_lock("inflight");
        if let Some(existing) = inflight.get(&key) {
            return existing.clone();
        }
        let providers = self.snapshot_providers();
        let cache_cfg = request.cache.clone();
        let namespace = request.namespace.clone();
        let slug = request.spec.datasource.clone();
        let work = async move {
            let provider = dispatch_provider(&providers, &request.spec)?;
            let result = provider.fetch(request).await?;
            let entry = CachedEntry {
                data: result.data.clone(),
                fetched_at: SystemTime::now(),
                ttl: cache_cfg
                    .as_ref()
                    .and_then(|c| c.ttl_duration())
                    .unwrap_or(DEFAULT_TTL),
                tags: build_tags(slug.as_deref(), namespace.as_deref()),
                metadata: result.metadata.clone(),
            };
            // Cache writes are best-effort: a failing backend must not fail
            // the fetch itself.
            let _ = primary.put(key, entry.clone()).await;
            if let Some(p) = &persistent {
                let _ = p.put(key, entry).await;
            }
            Ok(result)
        };
        #[cfg(not(target_arch = "wasm32"))]
        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed();
        #[cfg(target_arch = "wasm32")]
        let boxed: ResolverFuture<Result<FetchResult, FetchError>> = work.boxed_local();
        let future = boxed.shared();
        inflight.insert(key, future.clone());
        future
    }
    fn snapshot_providers(&self) -> HashMap<String, Arc<dyn DataSourceProvider>> {
        self.providers.read_lock("providers").clone()
    }
    /// Drops the entry for `key` from both cache tiers and records the
    /// invalidation so the next miss reports `MissReason::Invalidated`.
    pub async fn invalidate(&self, key: u64) {
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        let _ = primary.invalidate(key).await;
        if let Some(p) = &persistent {
            let _ = p.invalidate(key).await;
        }
        self.recently_invalidated
            .write_lock("recently_invalidated")
            .keys
            .insert(key);
    }
    /// Clears both cache tiers entirely.
    pub async fn invalidate_all(&self) {
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        let _ = primary.clear().await;
        if let Some(p) = &persistent {
            let _ = p.clear().await;
        }
        self.mark_bulk_invalidated();
    }
    /// Invalidates every entry tagged with the given datasource slug.
    pub async fn invalidate_by_slug(&self, slug: &str) {
        let tag = format!("{TAG_SLUG_PREFIX}{slug}");
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        let _ = primary.invalidate_by_tag(&tag).await;
        if let Some(p) = &persistent {
            let _ = p.invalidate_by_tag(&tag).await;
        }
        self.mark_bulk_invalidated();
    }
    /// Invalidates every entry tagged with the given namespace.
    pub async fn invalidate_by_namespace(&self, namespace: &str) {
        let tag = format!("{TAG_NAMESPACE_PREFIX}{namespace}");
        let primary = self.primary_snapshot();
        let persistent = self.persistent_snapshot();
        let _ = primary.invalidate_by_tag(&tag).await;
        if let Some(p) = &persistent {
            let _ = p.invalidate_by_tag(&tag).await;
        }
        self.mark_bulk_invalidated();
    }
    fn mark_bulk_invalidated(&self) {
        self.recently_invalidated
            .write_lock("recently_invalidated")
            .bulk_pending = true;
    }
    /// Returns `true` when the miss for `key` followed an invalidation.
    ///
    /// A per-key record wins; otherwise a pending bulk invalidation is
    /// consumed — so only the FIRST miss after a bulk invalidation is
    /// classified as `Invalidated`.
    fn consume_invalidation_reason(&self, key: u64) -> bool {
        let mut tracker = self
            .recently_invalidated
            .write_lock("recently_invalidated");
        if tracker.keys.remove(&key) {
            return true;
        }
        if tracker.bulk_pending {
            tracker.bulk_pending = false;
            return true;
        }
        false
    }
    /// Shuts down every provider and both cache tiers, in that order.
    pub async fn shutdown(&self) {
        let providers = self.snapshot_providers();
        for (_, provider) in providers {
            provider.shutdown().await;
        }
        let primary = self.primary_snapshot();
        primary.shutdown().await;
        if let Some(p) = self.persistent_snapshot() {
            p.shutdown().await;
        }
    }
}
/// Fire-and-forget notification of a cache hit; no-op when hooks are absent.
fn emit_cache_hit(
    hooks: &Option<HooksRef>,
    key: u64,
    source_name: &Option<String>,
    tier: hooks::CacheTier,
    age: Duration,
) {
    if let Some(h) = hooks.clone() {
        let event = hooks::CacheHitEvent {
            key,
            source_name: source_name.clone(),
            tier,
            age,
        };
        // The hook runs on its own task so emission never blocks the fetch.
        hooks::spawn_hook(async move {
            h.on_cache_hit(event).await;
        });
    }
}
/// Fire-and-forget notification of a cache miss; no-op when hooks are absent.
fn emit_cache_miss(
    hooks: &Option<HooksRef>,
    key: u64,
    source_name: &Option<String>,
    reason: hooks::MissReason,
) {
    if let Some(h) = hooks.clone() {
        let event = hooks::CacheMissEvent {
            key,
            source_name: source_name.clone(),
            reason,
        };
        hooks::spawn_hook(async move {
            h.on_cache_miss(event).await;
        });
    }
}
/// Fire-and-forget progress notification; no-op when hooks are absent.
pub(crate) fn emit_progress(
    hooks: &Option<HooksRef>,
    phase: hooks::Phase,
    source_name: &Option<String>,
    loaded: Option<u64>,
    total: Option<u64>,
    message: String,
) {
    if let Some(h) = hooks.clone() {
        let event = hooks::ProgressEvent {
            phase,
            source_name: source_name.clone(),
            loaded,
            total,
            message,
        };
        hooks::spawn_hook(async move {
            h.on_progress(event).await;
        });
    }
}
/// Fire-and-forget error notification; no-op when hooks are absent.
pub(crate) fn emit_error(
    hooks: &Option<HooksRef>,
    phase: hooks::Phase,
    source_name: &Option<String>,
    error: String,
) {
    if let Some(h) = hooks.clone() {
        let event = hooks::ErrorEvent {
            phase,
            source_name: source_name.clone(),
            error,
        };
        hooks::spawn_hook(async move {
            h.on_error(event).await;
        });
    }
}
/// Builds the invalidation tags for a cache entry: the slug tag (if any)
/// followed by the namespace tag (if any).
fn build_tags(slug: Option<&str>, namespace: Option<&str>) -> Vec<String> {
    slug.map(|s| format!("{TAG_SLUG_PREFIX}{s}"))
        .into_iter()
        .chain(namespace.map(|ns| format!("{TAG_NAMESPACE_PREFIX}{ns}")))
        .collect()
}
/// Selects the provider for `spec`.
///
/// An explicit `provider` always wins; otherwise the kind is inferred from
/// which field is populated, in precedence order: `rows` → "inline",
/// `url` → "http", `datasource` → "datasource".
fn dispatch_provider(
    providers: &HashMap<String, Arc<dyn DataSourceProvider>>,
    spec: &InlineData,
) -> Result<Arc<dyn DataSourceProvider>, FetchError> {
    let inferred = spec
        .provider
        .as_deref()
        .or_else(|| spec.rows.as_ref().map(|_| "inline"))
        .or_else(|| spec.url.as_ref().map(|_| "http"))
        .or_else(|| spec.datasource.as_ref().map(|_| "datasource"));
    let Some(kind) = inferred else {
        return Err(FetchError::Other(
            "no dispatch match for spec — needs one of `provider`, `rows`, `url`, or `datasource`"
                .to_string(),
        ));
    };
    match providers.get(kind) {
        Some(provider) => Ok(provider.clone()),
        None => Err(FetchError::ProviderNotFound {
            kind: kind.to_string(),
        }),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    /// A spec with every field unset; individual tests override what they need.
    fn empty_inline() -> InlineData {
        InlineData {
            provider: None,
            rows: None,
            url: None,
            endpoint: None,
            cache: None,
            datasource: None,
            query: None,
        }
    }
    #[test]
    fn key_for_is_deterministic() {
        let spec = InlineData {
            query: Some("SELECT 1".into()),
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let first = Resolver::key_for(&spec, Some("ns"));
        let second = Resolver::key_for(&spec, Some("ns"));
        assert_eq!(first, second);
    }
    #[test]
    fn key_for_namespace_changes_key() {
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&spec, Some("tenant-a")),
            Resolver::key_for(&spec, Some("tenant-b")),
        );
    }
    #[test]
    fn key_for_none_distinguishes_from_literal_none_string() {
        let absent = InlineData {
            url: Some("https://x".into()),
            ..empty_inline()
        };
        let literal = InlineData {
            url: Some("https://x".into()),
            datasource: Some("None".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&absent, None),
            Resolver::key_for(&literal, None),
        );
    }
    #[test]
    fn key_for_field_separator_prevents_bleed() {
        // "ab" in one field must not hash like "a" + "b" across two fields.
        let merged = InlineData {
            datasource: Some("ab".into()),
            ..empty_inline()
        };
        let split = InlineData {
            query: Some("b".into()),
            datasource: Some("a".into()),
            ..empty_inline()
        };
        assert_ne!(
            Resolver::key_for(&merged, None),
            Resolver::key_for(&split, None),
        );
    }
    #[test]
    fn dispatch_provider_precedence_explicit_wins() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::from([(
            "custom".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        )]);
        let spec = InlineData {
            rows: Some(vec![]),
            provider: Some("custom".into()),
            ..empty_inline()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }
    #[test]
    fn dispatch_provider_inferred_inline() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::from([(
            "inline".to_string(),
            Arc::new(InlineProvider::new()) as Arc<dyn DataSourceProvider>,
        )]);
        let spec = InlineData {
            rows: Some(vec![]),
            ..empty_inline()
        };
        assert!(dispatch_provider(&providers, &spec).is_ok());
    }
    #[test]
    fn dispatch_provider_missing_kind_errors() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let spec = InlineData {
            datasource: Some("warehouse".into()),
            ..empty_inline()
        };
        let err = dispatch_provider(&providers, &spec)
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::ProviderNotFound { ref kind } if kind == "datasource"));
    }
    #[test]
    fn dispatch_provider_unmatched_spec_errors() {
        let providers: HashMap<String, Arc<dyn DataSourceProvider>> = HashMap::new();
        let err = dispatch_provider(&providers, &empty_inline())
            .err()
            .expect("dispatch must error");
        assert!(matches!(err, FetchError::Other(_)));
    }
}