kanade-backend 0.43.45

axum + SQLite projection backend for the kanade endpoint-management system. Hosts /api/* and the embedded SPA dashboard, projects JetStream streams into SQLite, drives the cron scheduler
//! v0.35 / #88: in-memory cache of every registered manifest's
//! [`ExplodeSpec`] list, kept fresh by a KV `watch_all()` on
//! [`BUCKET_JOBS`].
//!
//! The HTTP search endpoint (`GET /api/inventory/{manifest_id}/
//! search/{field}`) used to call `kv.get(manifest_id)` on every
//! request to resolve the spec — which was fine for occasional
//! curl traffic but becomes ~30 ms of latency per keystroke once
//! the SPA Software page (#87) makes search interactive. This
//! module replaces that round-trip with an `Arc<RwLock<HashMap>>`
//! lookup and trusts the KV watcher to invalidate stale entries
//! within 1 s of any `kanade job create` write.
//!
//! Wire shape:
//!
//! * Key   — `manifest_id` (e.g. `"inventory-sw"`).
//! * Value — the manifest's `hint.explode` list, or empty when
//!   the manifest has no `explode:` block. Storing empty avoids
//!   re-fetching on every "is this a search-eligible manifest"
//!   miss check.
//!
//! Concurrency:
//!
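//! * `RwLock` because reads vastly outnumber writes (writes come
//!   from the startup prewarm, the KV watcher, and the cold-miss
//!   fallbacks; reads come from every search request and, since
//!   #488, every results-projector hint lookup).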
//! * The watcher task is a single tokio task spawned at startup;
//!   no cross-watcher coordination needed.
//! * If the watcher exits (NATS reconnect glitch, etc.) the cache
//!   keeps serving its current entries, which can then go stale.
//!   The KV fallback in `load_explode_spec` only helps on a miss —
//!   i.e. for manifests that were never cached — not for entries
//!   that are cached but out of date.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Context, Result};
use async_nats::jetstream::{self, kv::Operation};
use futures::StreamExt;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::{ExplodeSpec, Manifest};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

/// Shared, lock-guarded cache mapping `manifest_id` to that job's
/// manifest plus its derived explode specs.
///
/// Built once in `main.rs`. Every clone is just an `Arc` bump, so
/// [`AppState`], the KV watcher task, and (#488) the results
/// projector all observe the same map. The projector used to pay two
/// `jobs_kv.get` broker round-trips per ExecResult for its
/// inventory/check hint lookups. That was the dominant per-message
/// latency during a fleet-wide result burst.
///
/// [`AppState`]: crate::api::AppState
#[derive(Clone, Default)]
pub struct ExplodeSpecCache {
    entries: Arc<RwLock<HashMap<String, JobEntry>>>,
}

/// A single cached job.
///
/// `manifest` stays `None` only when the entry was seeded through the
/// spec-only [`ExplodeSpecCache::insert`] path (tests). Entries written
/// by prewarm, the watcher, and the miss fallbacks always carry the
/// full manifest.
struct JobEntry {
    manifest: Option<Arc<Manifest>>,
    specs: Vec<ExplodeSpec>,
}

impl ExplodeSpecCache {
    /// Create an empty cache.
    pub fn new() -> Self {
        Self::default()
    }

    /// #488: fetch the full manifest for the results projector's
    /// inventory/check hint handling.
    ///
    /// `None` means either no entry for `manifest_id` or a spec-only
    /// entry. The caller then reads KV itself and warms the cache via
    /// [`ExplodeSpecCache::insert_manifest`].
    pub async fn manifest(&self, manifest_id: &str) -> Option<Arc<Manifest>> {
        self.entries
            .read()
            .await
            .get(manifest_id)
            .and_then(|job| job.manifest.clone())
    }

    /// Cache a full manifest together with the explode specs taken
    /// from its `inventory.explode` block (empty when absent).
    ///
    /// Prewarm, the KV watcher, and both cold-miss fallbacks all write
    /// through this method, so every such entry carries the manifest.
    /// The entry is keyed by `manifest.id`, replacing any prior entry.
    /// The returned `Arc` is the one now stored in the map, so the
    /// cold-miss caller can hand it straight out without a second
    /// allocation (review PR #553).
    pub async fn insert_manifest(&self, manifest: Manifest) -> Arc<Manifest> {
        // Derive the specs before taking the write lock so the lock is
        // held only for the map insert itself.
        let specs = match manifest.inventory.as_ref() {
            Some(hint) => hint.explode.clone().unwrap_or_default(),
            None => Vec::new(),
        };
        let stored = Arc::new(manifest);
        let entry = JobEntry {
            manifest: Some(Arc::clone(&stored)),
            specs,
        };
        self.entries.write().await.insert(stored.id.clone(), entry);
        stored
    }

    /// Look up the explode spec for `field` on `manifest_id`.
    ///
    /// Yields `None` both when the manifest isn't cached and when it is
    /// cached but has no spec for `field`. The slow path treats the
    /// two identically, so they are not distinguished.
    pub async fn get(&self, manifest_id: &str, field: &str) -> Option<ExplodeSpec> {
        self.entries
            .read()
            .await
            .get(manifest_id)
            .and_then(|job| job.specs.iter().find(|spec| spec.field == field))
            .cloned()
    }

    /// Spec-only insert, retained for tests. Entries stored this way
    /// answer [`ExplodeSpecCache::get`] but not
    /// [`ExplodeSpecCache::manifest`]. Production writers go through
    /// [`ExplodeSpecCache::insert_manifest`] instead.
    pub async fn insert(&self, manifest_id: String, specs: Vec<ExplodeSpec>) {
        let entry = JobEntry {
            manifest: None,
            specs,
        };
        self.entries.write().await.insert(manifest_id, entry);
    }

    /// Remove whatever is cached for `manifest_id`, if anything.
    ///
    /// The KV watcher calls this on `Operation::Delete` /
    /// `Operation::Purge`, so an unregistered job stops answering from
    /// the cache immediately.
    pub async fn drop_manifest(&self, manifest_id: &str) {
        self.entries.write().await.remove(manifest_id);
    }

    /// Number of cached entries. The watcher's startup log reports it
    /// so operators can confirm prewarm loaded the expected count.
    pub async fn len(&self) -> usize {
        let guard = self.entries.read().await;
        guard.len()
    }
}

/// Boot-time seed of the cache from `BUCKET_JOBS`.
///
/// Lists every key in the bucket and decodes each stored `Manifest`.
/// Each one is loaded into `cache` via
/// [`ExplodeSpecCache::insert_manifest`], which stores the full
/// manifest together with its derived explode specs. This mirrors the
/// `projector::explode::ensure_tables_for_jobs` prewarm that also runs
/// at boot, so the first wave of search requests doesn't take the
/// cold-miss KV round-trip.
///
/// Best-effort by design:
/// * If listing keys fails, it logs and returns `Ok(0)`.
/// * Any key that can't be read from the key stream, fetched, or
///   decoded is logged and skipped.
/// * Keys with no current value are skipped silently.
///
/// Returns the number of manifests inserted.
///
/// # Errors
///
/// Only when the `BUCKET_JOBS` bucket itself can't be opened.
pub async fn prewarm(cache: &ExplodeSpecCache, jetstream: &jetstream::Context) -> Result<usize> {
    let kv = jetstream
        .get_key_value(BUCKET_JOBS)
        .await
        .with_context(|| format!("get KV {BUCKET_JOBS} for prewarm"))?;

    let mut key_stream = match kv.keys().await {
        Ok(stream) => stream,
        Err(e) => {
            warn!(error = %e, "spec_cache prewarm: keys() failed; will rely on watcher + miss path");
            return Ok(0);
        }
    };

    let mut seeded = 0usize;
    while let Some(next_key) = key_stream.next().await {
        let key = match next_key {
            Ok(key) => key,
            Err(e) => {
                warn!(error = %e, "spec_cache prewarm: key iter error; skipping");
                continue;
            }
        };
        let raw = match kv.get(&key).await {
            Err(e) => {
                warn!(error = %e, key = %key, "spec_cache prewarm: kv.get failed; skipping");
                continue;
            }
            // Key listed but no current value: nothing to cache.
            Ok(None) => continue,
            Ok(Some(bytes)) => bytes,
        };
        match serde_json::from_slice::<Manifest>(&raw) {
            Ok(manifest) => {
                cache.insert_manifest(manifest).await;
                seeded += 1;
            }
            Err(e) => {
                warn!(error = %e, key = %key, "spec_cache prewarm: decode manifest failed; skipping");
            }
        }
    }
    Ok(seeded)
}

/// Long-running task that tails `BUCKET_JOBS` and keeps the cache in
/// sync with every manifest write. Spawn-and-forget at startup.
///
/// It exits only on a hard NATS error or when the watch stream ends.
/// The caller logs the exit, and the cache keeps serving its current
/// entries plus the miss-path fallback.
///
/// The subscription uses `watch_with_history(">")`, which first
/// replays the latest entry for every key and then streams subsequent
/// changes. Plain `watch_all()` only delivers writes made after the
/// subscription exists. That caused two problems:
/// * A `kanade job create` landing before the subscription was
///   established (e.g. right after [`prewarm`]) was never applied.
/// * Re-running this task after an exit could not repair entries that
///   went stale in the meantime.
///
/// Replaying entries that prewarm already loaded is harmless, because
/// the insert just replaces an entry with the same manifest.
///
/// Per-entry handling:
/// * `Put` that decodes: replace the entry (keyed by `manifest.id`).
/// * `Put` that fails to decode: evict the entry for that KV key, so
///   the cache doesn't keep serving a manifest KV no longer holds. The
///   next lookup misses and takes the same KV path as an uncached id.
/// * `Delete` / `Purge`: evict the entry for that KV key.
///
/// # Errors
///
/// Fails if the bucket or the watcher can't be opened. Otherwise it
/// only returns, with an error, once the watch stream ends.
pub async fn run(cache: ExplodeSpecCache, jetstream: jetstream::Context) -> Result<()> {
    let kv = jetstream
        .get_key_value(BUCKET_JOBS)
        .await
        .with_context(|| format!("get KV {BUCKET_JOBS} for watcher"))?;
    // `>` is the all-keys wildcard. The `_with_history` variant adds
    // the initial replay of each key's latest value.
    let mut watcher = kv
        .watch_with_history(">")
        .await
        .context("kv watch_with_history on BUCKET_JOBS")?;
    // Hoist the await out of the `info!` arg list. Keeping a `.await`
    // inside the macro borrows `format_args!`'s internal non-Send
    // temporary across the await point, which makes the surrounding
    // `tokio::spawn` future fail its Send bound.
    let cached = cache.len().await;
    info!(cached, "explode spec cache watcher started");
    while let Some(entry) = watcher.next().await {
        let entry = match entry {
            Ok(e) => e,
            Err(e) => {
                warn!(error = %e, "spec_cache watch: entry error; continuing");
                continue;
            }
        };
        match entry.operation {
            Operation::Put => {
                let manifest: Manifest = match serde_json::from_slice(&entry.value) {
                    Ok(m) => m,
                    Err(e) => {
                        warn!(error = %e, key = %entry.key, "spec_cache watch: decode failed");
                        // Evict rather than keep the previous manifest. The
                        // cache must mirror KV, and KV no longer holds a
                        // decodable value for this key.
                        cache.drop_manifest(&entry.key).await;
                        continue;
                    }
                };
                if manifest.id != entry.key {
                    // `insert_manifest` keys by `manifest.id`, but Delete /
                    // Purge arrive keyed by the KV key. With a mismatch, a
                    // later delete of this key won't evict the entry.
                    warn!(
                        key = %entry.key,
                        manifest_id = %manifest.id,
                        "spec_cache watch: manifest id differs from KV key"
                    );
                }
                debug!(
                    manifest_id = %manifest.id,
                    "spec_cache watch: refresh",
                );
                cache.insert_manifest(manifest).await;
            }
            Operation::Delete | Operation::Purge => {
                debug!(manifest_id = %entry.key, "spec_cache watch: drop");
                cache.drop_manifest(&entry.key).await;
            }
        }
    }
    // Gemini #120 review: returning an error here surfaces the
    // watcher-stream-ended state to the `tokio::spawn` supervisor in
    // main.rs, which logs it. The cache itself keeps serving its
    // current entries plus the cold-miss KV fallback path, so the
    // failure is non-fatal. Pretending it didn't happen by
    // `std::future::pending::<()>()`-ing would instead leave the
    // cache silently stale on every subsequent `kanade job create`.
    Err(anyhow::anyhow!(
        "BUCKET_JOBS watch_all stream ended; cache invalidation halted"
    ))
}

#[cfg(test)]
mod tests {
    use super::*;
    use kanade_shared::manifest::{ExplodeColumn, ExplodeSpec};

    fn sample_spec(field: &str) -> ExplodeSpec {
        ExplodeSpec {
            field: field.into(),
            table: format!("inventory_test_{field}"),
            primary_key: vec!["name".into()],
            columns: vec![ExplodeColumn {
                field: "name".into(),
                kind: Some("text".into()),
                index: false,
            }],
            track_history: false,
        }
    }

    #[tokio::test]
    async fn miss_returns_none() {
        let cache = ExplodeSpecCache::new();
        assert!(cache.get("inventory-sw", "apps").await.is_none());
    }

    #[tokio::test]
    async fn insert_then_get_returns_spec() {
        let cache = ExplodeSpecCache::new();
        cache
            .insert("inventory-sw".into(), vec![sample_spec("apps")])
            .await;
        let hit = cache.get("inventory-sw", "apps").await.expect("cache hit");
        assert_eq!(hit.field, "apps");
        assert_eq!(hit.table, "inventory_test_apps");
    }

    #[tokio::test]
    async fn insert_replaces_prior_entry() {
        // The watcher receives two Puts for the same manifest and the
        // second wins. Covers an operator who edits the explode spec
        // and re-registers via `kanade job create`.
        let cache = ExplodeSpecCache::new();
        cache
            .insert("inventory-sw".into(), vec![sample_spec("apps")])
            .await;
        cache
            .insert("inventory-sw".into(), vec![sample_spec("services")])
            .await;
        assert!(cache.get("inventory-sw", "apps").await.is_none());
        assert!(cache.get("inventory-sw", "services").await.is_some());
    }

    #[tokio::test]
    async fn drop_removes_all_fields_for_manifest() {
        let cache = ExplodeSpecCache::new();
        cache
            .insert(
                "inventory-sw".into(),
                vec![sample_spec("apps"), sample_spec("services")],
            )
            .await;
        cache.drop_manifest("inventory-sw").await;
        assert!(cache.get("inventory-sw", "apps").await.is_none());
        assert!(cache.get("inventory-sw", "services").await.is_none());
    }

    #[tokio::test]
    async fn unknown_field_on_known_manifest_returns_none() {
        let cache = ExplodeSpecCache::new();
        cache
            .insert("inventory-sw".into(), vec![sample_spec("apps")])
            .await;
        assert!(cache.get("inventory-sw", "services").await.is_none());
    }

    #[tokio::test]
    async fn len_counts_distinct_manifests() {
        // `len` feeds the watcher's startup log. Re-inserting an
        // existing id must not inflate it, and dropping must shrink it.
        let cache = ExplodeSpecCache::new();
        assert_eq!(cache.len().await, 0);
        cache.insert("a".into(), vec![sample_spec("apps")]).await;
        cache.insert("b".into(), Vec::new()).await;
        cache.insert("a".into(), vec![sample_spec("services")]).await;
        assert_eq!(cache.len().await, 2);
        cache.drop_manifest("a").await;
        assert_eq!(cache.len().await, 1);
    }

    fn sample_manifest() -> Manifest {
        serde_json::from_value(serde_json::json!({
            "id": "inventory-sw",
            "version": "0.0.1",
            "execute": {
                "shell": "powershell",
                "script": "echo '{}'",
                "timeout": "30s",
            },
            "inventory": {
                "display": [{ "field": "apps", "label": "Apps" }],
                "explode": [{
                    "field": "apps",
                    "table": "inventory_test_apps",
                    "primary_key": ["name"],
                    "columns": [{ "field": "name" }],
                }],
            },
        }))
        .expect("sample manifest parses")
    }

    #[tokio::test]
    async fn insert_manifest_serves_both_manifest_and_spec_lookups() {
        // #488: one insert_manifest entry must answer both the results
        // projector's manifest() lookup and the API's get() spec lookup.
        // The returned Arc must be the stored one.
        let cache = ExplodeSpecCache::new();
        let stored = cache.insert_manifest(sample_manifest()).await;
        let hit = cache.manifest("inventory-sw").await.expect("manifest hit");
        assert!(Arc::ptr_eq(&stored, &hit), "no duplicate allocation");
        assert_eq!(hit.id, "inventory-sw");
        assert!(cache.get("inventory-sw", "apps").await.is_some());
    }

    #[tokio::test]
    async fn manifest_without_explode_block_is_cached_with_no_specs() {
        // A manifest with no `inventory` / `explode:` block is still
        // cached (with an empty spec list), so it answers manifest()
        // lookups while every get() for it reports "no such field".
        let manifest: Manifest = serde_json::from_value(serde_json::json!({
            "id": "plain-job",
            "version": "0.0.1",
            "execute": {
                "shell": "powershell",
                "script": "echo hi",
                "timeout": "30s",
            },
        }))
        .expect("manifest without inventory parses");
        let cache = ExplodeSpecCache::new();
        cache.insert_manifest(manifest).await;
        assert!(cache.manifest("plain-job").await.is_some());
        assert!(cache.get("plain-job", "apps").await.is_none());
        assert_eq!(cache.len().await, 1);
    }

    #[tokio::test]
    async fn spec_only_insert_yields_no_manifest() {
        // Legacy/test path: spec-only entries answer get() but not
        // manifest(), so the projector falls back to the KV path.
        let cache = ExplodeSpecCache::new();
        cache
            .insert("inventory-sw".into(), vec![sample_spec("apps")])
            .await;
        assert!(cache.manifest("inventory-sw").await.is_none());
    }

    #[tokio::test]
    async fn drop_manifest_also_clears_manifest_lookup() {
        let cache = ExplodeSpecCache::new();
        cache.insert_manifest(sample_manifest()).await;
        cache.drop_manifest("inventory-sw").await;
        assert!(cache.manifest("inventory-sw").await.is_none());
    }
}