kanade-shared 0.43.49

Shared wire types, NATS subject helpers, KV constants, YAML manifest schema, and teravars-backed config loader for the kanade endpoint-management system
Documentation
//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create-or-update them. v0.25.0 switched from `create_*` to
//! `create_or_update_*`: the old form returned error 10058 ("name
//! already in use with a different configuration") when a release
//! widened a stream's subjects or changed its retention policy on
//! a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so
//! version bumps no longer require operator-side data wipes.
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place and both the operator CLI +
//! the auto-bootstrap path pick it up.

use std::time::Duration;

use anyhow::{Context, Result};
use async_nats::jetstream::{
    self,
    kv::Config as KvConfig,
    object_store::Config as ObjectStoreConfig,
    stream::{Config as StreamConfig, DiscardPolicy},
};
use tracing::info;

use crate::kv::{
    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
    BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
    BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
    OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
    STREAM_RESULTS,
};

/// Idempotently create every NATS JetStream resource the kanade
/// fleet relies on. Calling repeatedly is safe — `create_*` returns
/// the existing resource if it's already configured.
///
/// Returns once every resource is in place. The function is async
/// so backends can `await` it as part of their startup sequence
/// (one round-trip per resource — ~10 RTTs total).
pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
    // ── Streams ──────────────────────────────────────────────────
    // #518: every stream carries a `max_bytes` cap with
    // `Discard::Old` on top of its `max_age` window. Within their
    // age windows the streams used to be unbounded by size, and
    // JetStream's file store shares a disk with SQLite on the
    // backend host — one job printing 200 KB per run fleet-wide
    // could exhaust the store, at which point EVERY publish fails
    // (results, obs, audit, KV puts). With the caps, worst-case
    // degradation is "shorter history on the offending stream"
    // instead of "broker down".
    //
    // Sizing: JetStream RESERVES each `max_bytes` against its
    // available storage (min of max_file_store and free disk) at
    // create/update time and fails with error 10047 when the sum
    // doesn't fit, so these must stay small enough for modest
    // hosts. That's fine: every stream here is a transport +
    // replay buffer — the durable record is the backend's SQLite
    // (results/inventory/obs/audit are all projected within
    // seconds) — so the caps are runaway-output backstops, not
    // history budgets. Total reservation ≈ 5.3 GiB including the
    // result_output object store below.
    const MIB: i64 = 1024 * 1024;
    const GIB: i64 = 1024 * MIB;

    // INVENTORY — 90-day rolling history (spec §2.3.1).
    js.create_or_update_stream(StreamConfig {
        name: STREAM_INVENTORY.into(),
        subjects: vec!["inventory.>".into()],
        max_age: Duration::from_secs(90 * 24 * 60 * 60),
        max_bytes: GIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
    info!(stream = STREAM_INVENTORY, "ready");

    // RESULTS — 30-day rolling history. The biggest producer by
    // far (every job run on every PC, with up to 256 KB of inline
    // stdout/stderr per message), so it gets the largest slice of
    // the disk budget.
    js.create_or_update_stream(StreamConfig {
        name: STREAM_RESULTS.into(),
        subjects: vec!["results.>".into()],
        max_age: Duration::from_secs(30 * 24 * 60 * 60),
        max_bytes: 2 * GIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
    info!(stream = STREAM_RESULTS, "ready");

    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
    // single backend publish lands in BOTH the agent's live core
    // subscription AND the stream's retention store. Reconnecting
    // agents catch up via a durable consumer with
    // `DeliverPolicy::LastPerSubject` — they receive the most
    // recent Command per subject they care about, no matter how
    // long they were offline (within `max_age`).
    js.create_or_update_stream(StreamConfig {
        name: STREAM_EXEC.into(),
        subjects: vec!["commands.>".into()],
        max_messages_per_subject: 1,
        max_age: Duration::from_secs(7 * 24 * 60 * 60),
        // Latest-per-subject keeps this tiny (one Command per
        // group/pc subject); the cap is a backstop against subject
        // cardinality bugs, not a working budget.
        max_bytes: 64 * MIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
    info!(stream = STREAM_EXEC, "ready");

    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
    // 7-day window matches the EXEC spec window.
    js.create_or_update_stream(StreamConfig {
        name: STREAM_EVENTS.into(),
        subjects: vec!["events.>".into()],
        max_age: Duration::from_secs(7 * 24 * 60 * 60),
        max_bytes: 256 * MIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
    info!(stream = STREAM_EVENTS, "ready");

    // AUDIT — operator-action record (spec §2.3.1). The DURABLE
    // copy is the backend's SQLite `audit_log` table (the projector
    // INSERTs each message, idempotently since #501; 365-day
    // retention since #486) — the stream is transport + replay
    // buffer, not the archive, so it can be bounded like the rest.
    // 90 days / 512 MiB is far more than the projector ever lags;
    // previously this stream had NO limits at all, making it an
    // unbounded disk leak on the broker host.
    js.create_or_update_stream(StreamConfig {
        name: STREAM_AUDIT.into(),
        subjects: vec!["audit.>".into()],
        max_age: Duration::from_secs(90 * 24 * 60 * 60),
        max_bytes: 512 * MIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
    info!(stream = STREAM_AUDIT, "ready");

    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
    // 90-day window matches `obs_events` table retention so a
    // backend bootstrapping after long downtime can catch up but
    // doesn't carry data the table will discard anyway. Subject
    // filter `obs.>` catches every PC without a per-PC subscription.
    //
    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
    // 60` open-coded across bootstrap + cleanup; the matching prune
    // window in `kanade-backend::cleanup` quotes the same number
    // separately (SQLite-relative string syntax there, not a
    // duration), so it can't share a constant — but a single
    // arithmetic spell-out here makes the relationship grep-able.
    const SECS_PER_DAY: u64 = 24 * 60 * 60;
    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
    js.create_or_update_stream(StreamConfig {
        name: STREAM_OBS_EVENTS.into(),
        subjects: vec!["obs.>".into()],
        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
        max_bytes: 512 * MIB,
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
    info!(stream = STREAM_OBS_EVENTS, "ready");

    // ── KV buckets ───────────────────────────────────────────────
    // script_current — cmd_id → version (spec §2.6 Layer 2).
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_SCRIPT_CURRENT.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");

    // script_status — cmd_id → ACTIVE / REVOKED.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_SCRIPT_STATUS.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");

    // agents_state — pc_id → latest hw snapshot (history=1).
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_AGENTS_STATE.into(),
        history: 1,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
    info!(bucket = BUCKET_AGENTS_STATE, "ready");

    // agent_config — Sprint 6 layered scopes (global / groups.* /
    // pcs.*) plus the legacy target_version key.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_AGENT_CONFIG.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
    info!(bucket = BUCKET_AGENT_CONFIG, "ready");

    // agent_groups — Sprint 5 per-pc group membership.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_AGENT_GROUPS.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
    info!(bucket = BUCKET_AGENT_GROUPS, "ready");

    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
    // Backend's scheduler.rs also creates this on startup; calling
    // twice is harmless.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_SCHEDULES.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
    info!(bucket = BUCKET_SCHEDULES, "ready");

    // jobs — v0.15 operator-registered Manifest catalog. Schedules
    // reference rows here by id; editing a job rewrites what future
    // schedule fires exec.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_JOBS.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
    info!(bucket = BUCKET_JOBS, "ready");

    // fleet_config — #418 Phase 5 fleet-wide singletons (the global
    // change-freeze under KEY_FREEZE). history: 1 — only the current
    // state matters; both schedulers watch it.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_FLEET_CONFIG.into(),
        history: 1,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
    info!(bucket = BUCKET_FLEET_CONFIG, "ready");

    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
    // alongside the JSON catalogs above. Same key shape (manifest id
    // / schedule id), but the value is the raw YAML bytes so the
    // SPA's YAML editor preserves comments + script block-scalar
    // indentation across edits. Agents/scheduler don't read these.
    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_JOBS_YAML.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
    info!(bucket = BUCKET_JOBS_YAML, "ready");

    js.create_or_update_key_value(KvConfig {
        bucket: BUCKET_SCHEDULES_YAML.into(),
        history: 5,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");

    // ── Object Store ─────────────────────────────────────────────
    // agent_releases — one object per version, raw exe bytes.
    js.create_object_store(ObjectStoreConfig {
        bucket: OBJECT_AGENT_RELEASES.into(),
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
    info!(store = OBJECT_AGENT_RELEASES, "ready");

    // app_packages — generic operator-uploaded binary distribution
    // (kanade-client today; third-party installers like Webex /
    // Teams once those flows land). Object keys are
    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
    // for the full rationale.
    js.create_object_store(ObjectStoreConfig {
        bucket: OBJECT_APP_PACKAGES.into(),
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
    info!(store = OBJECT_APP_PACKAGES, "ready");

    // scripts — manifest script bodies referenced by
    // `Execute::script_object` (SPEC §2.4.1). Sibling of
    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
    // the bucket-split rationale (smaller payloads + manifest-
    // coupled lifecycle vs operator-curated installers).
    js.create_object_store(ObjectStoreConfig {
        bucket: OBJECT_SCRIPTS.into(),
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
    info!(store = OBJECT_SCRIPTS, "ready");

    // result_output — overflow stdout / stderr blobs for the
    // `ExecResult` wire kind (#227). Anything larger than the agent's
    // 256 KB inline threshold gets uploaded here under
    // `<request_id>/{stdout,stderr}`; the backend's results
    // projector derefs the pointer fields before INSERT so SQLite
    // + the SPA see the full text inline. 30-day max_age matches
    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
    // resolvable in execution_results never points at a missing
    // blob.
    // #518: capped like the streams — a job whose output overflows
    // the inline threshold writes blobs HERE instead of
    // STREAM_RESULTS, so without its own cap this store bypasses
    // the stream budget entirely and can still fill the file store.
    // The projector derefs blobs within seconds of publish, so
    // eviction only ever hits already-projected (or expired)
    // output.
    js.create_object_store(ObjectStoreConfig {
        bucket: OBJECT_RESULT_OUTPUT.into(),
        max_age: Duration::from_secs(SECS_PER_DAY * 30),
        max_bytes: GIB,
        ..Default::default()
    })
    .await
    .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
    info!(store = OBJECT_RESULT_OUTPUT, "ready");

    Ok(())
}