use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Context, Result};
use async_nats::jetstream::{self, kv::Operation};
use futures::StreamExt;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::{ExplodeSpec, Manifest};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// In-memory lookup table of explode specs, keyed by manifest id.
///
/// The map lives behind an `Arc<RwLock<..>>`, so cloning the cache produces
/// another handle to the same underlying map rather than an independent copy.
#[derive(Clone, Default)]
pub struct ExplodeSpecCache {
// manifest id -> explode specs stored for that manifest
inner: Arc<RwLock<HashMap<String, Vec<ExplodeSpec>>>>,
}
impl ExplodeSpecCache {
    /// Creates an empty cache (same result as the derived `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Looks up the explode spec stored for `field` under `manifest_id`.
    ///
    /// Returns a clone of the first stored spec whose `field` equals the
    /// requested one, or `None` when the manifest is not cached or none of
    /// its specs match.
    pub async fn get(&self, manifest_id: &str, field: &str) -> Option<ExplodeSpec> {
        let map = self.inner.read().await;
        let specs = map.get(manifest_id)?;
        specs.iter().find(|spec| spec.field == field).cloned()
    }

    /// Stores `specs` under `manifest_id`, replacing whatever was stored
    /// for that id before.
    pub async fn insert(&self, manifest_id: String, specs: Vec<ExplodeSpec>) {
        self.inner.write().await.insert(manifest_id, specs);
    }

    /// Removes every spec stored under `manifest_id`; a no-op when the id
    /// is not cached.
    pub async fn drop_manifest(&self, manifest_id: &str) {
        self.inner.write().await.remove(manifest_id);
    }

    /// Number of manifests (not individual specs) currently cached.
    pub async fn len(&self) -> usize {
        let map = self.inner.read().await;
        map.len()
    }
}
pub async fn prewarm(cache: &ExplodeSpecCache, jetstream: &jetstream::Context) -> Result<usize> {
let kv = jetstream
.get_key_value(BUCKET_JOBS)
.await
.with_context(|| format!("get KV {BUCKET_JOBS} for prewarm"))?;
let mut keys = match kv.keys().await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "spec_cache prewarm: keys() failed; will rely on watcher + miss path");
return Ok(0);
}
};
let mut count = 0usize;
while let Some(key) = keys.next().await {
let key = match key {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "spec_cache prewarm: key iter error; skipping");
continue;
}
};
let entry = match kv.get(&key).await {
Ok(Some(b)) => b,
Ok(None) => continue,
Err(e) => {
warn!(error = %e, key = %key, "spec_cache prewarm: kv.get failed; skipping");
continue;
}
};
let manifest: Manifest = match serde_json::from_slice(&entry) {
Ok(m) => m,
Err(e) => {
warn!(error = %e, key = %key, "spec_cache prewarm: decode manifest failed; skipping");
continue;
}
};
let specs = manifest
.inventory
.as_ref()
.and_then(|h| h.explode.clone())
.unwrap_or_default();
cache.insert(manifest.id, specs).await;
count += 1;
}
Ok(count)
}
pub async fn run(cache: ExplodeSpecCache, jetstream: jetstream::Context) -> Result<()> {
let kv = jetstream
.get_key_value(BUCKET_JOBS)
.await
.with_context(|| format!("get KV {BUCKET_JOBS} for watcher"))?;
let mut watcher = kv
.watch_all()
.await
.context("kv watch_all on BUCKET_JOBS")?;
let cached = cache.len().await;
info!(cached, "explode spec cache watcher started");
while let Some(entry) = watcher.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "spec_cache watch: entry error; continuing");
continue;
}
};
match entry.operation {
Operation::Put => {
let manifest: Manifest = match serde_json::from_slice(&entry.value) {
Ok(m) => m,
Err(e) => {
warn!(error = %e, key = %entry.key, "spec_cache watch: decode failed");
continue;
}
};
let specs = manifest
.inventory
.as_ref()
.and_then(|h| h.explode.clone())
.unwrap_or_default();
debug!(
manifest_id = %manifest.id,
n_specs = specs.len(),
"spec_cache watch: refresh",
);
cache.insert(manifest.id, specs).await;
}
Operation::Delete | Operation::Purge => {
debug!(manifest_id = %entry.key, "spec_cache watch: drop");
cache.drop_manifest(&entry.key).await;
}
}
}
Err(anyhow::anyhow!(
"BUCKET_JOBS watch_all stream ended; cache invalidation halted"
))
}
#[cfg(test)]
mod tests {
    use super::*;
    use kanade_shared::manifest::{ExplodeColumn, ExplodeSpec};

    /// Manifest id used by every test case.
    const MANIFEST: &str = "inventory-sw";

    /// Builds a minimal spec for `field` with a single text `name` column.
    fn sample_spec(field: &str) -> ExplodeSpec {
        let name_column = ExplodeColumn {
            field: "name".into(),
            kind: Some("text".into()),
            index: false,
        };
        ExplodeSpec {
            field: field.into(),
            table: format!("inventory_test_{field}"),
            primary_key: vec!["name".into()],
            columns: vec![name_column],
            track_history: false,
        }
    }

    /// Returns a fresh cache holding specs for `fields` under [`MANIFEST`].
    async fn cache_with(fields: &[&str]) -> ExplodeSpecCache {
        let cache = ExplodeSpecCache::new();
        let specs = fields.iter().map(|field| sample_spec(field)).collect();
        cache.insert(MANIFEST.into(), specs).await;
        cache
    }

    #[tokio::test]
    async fn miss_returns_none() {
        let empty = ExplodeSpecCache::new();
        let looked_up = empty.get(MANIFEST, "apps").await;
        assert!(looked_up.is_none());
    }

    #[tokio::test]
    async fn insert_then_get_returns_spec() {
        let cache = cache_with(&["apps"]).await;
        let hit = cache.get(MANIFEST, "apps").await.expect("cache hit");
        assert_eq!(hit.field, "apps");
        assert_eq!(hit.table, "inventory_test_apps");
    }

    #[tokio::test]
    async fn insert_replaces_prior_entry() {
        let cache = cache_with(&["apps"]).await;
        // A second insert for the same manifest overwrites the first one.
        cache
            .insert(MANIFEST.into(), vec![sample_spec("services")])
            .await;
        assert!(cache.get(MANIFEST, "apps").await.is_none());
        assert!(cache.get(MANIFEST, "services").await.is_some());
    }

    #[tokio::test]
    async fn drop_removes_all_fields_for_manifest() {
        let cache = cache_with(&["apps", "services"]).await;
        cache.drop_manifest(MANIFEST).await;
        for field in ["apps", "services"] {
            assert!(cache.get(MANIFEST, field).await.is_none());
        }
    }

    #[tokio::test]
    async fn unknown_field_on_known_manifest_returns_none() {
        let cache = cache_with(&["apps"]).await;
        assert!(cache.get(MANIFEST, "services").await.is_none());
    }
}