kanade-backend 0.13.2

axum + SQLite projection backend for the kanade endpoint-management system. Hosts /api/* and the embedded SPA dashboard, projects JetStream streams into SQLite, drives the cron scheduler
//! HTTP surface for the staged self-update flow.
//!
//! * `POST /api/agents/publish` — multipart upload (`file` =
//!   binary, `version` = label) → puts the bytes in the
//!   `agent_releases` Object Store. Mirrors `kanade agent
//!   publish` on the CLI side; the SPA's Rollout page wires a
//!   file picker to this endpoint.
//! * `GET  /api/agents/releases` — list every version present in
//!   the Object Store. Used by the Web UI's rollout picker.
//! * `POST /api/agents/rollout` — flip `target_version` (and
//!   optionally `target_version_jitter`) on one scope of the
//!   layered `agent_config` bucket. Mirrors the CLI's `rollout`
//!   subcommand.

use axum::Json;
use axum::extract::{Multipart, Path, State};
use axum::http::StatusCode;
use futures::StreamExt;
use kanade_shared::exe_version::extract_pe_version;
use kanade_shared::kv::{
    BUCKET_AGENT_CONFIG, KEY_AGENT_CONFIG_GLOBAL, OBJECT_AGENT_RELEASES, agent_config_group_key,
    agent_config_pc_key, parse_agent_config_group_key, parse_agent_config_pc_key,
};
use kanade_shared::wire::ConfigScope;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};

use super::AppState;

// ─── POST /api/agents/publish ────────────────────────────────────────

#[derive(Serialize)]
pub struct PublishResponse {
    pub version: String,
    pub size: u64,
    pub digest: Option<String>,
}

pub async fn publish(
    State(state): State<AppState>,
    mut multipart: Multipart,
) -> Result<Json<PublishResponse>, (StatusCode, String)> {
    // v0.13.1: the "version" form field is gone. The Object Store
    // key is whatever the binary's embedded VERSIONINFO says — that
    // makes a "label vs binary version" disagreement physically
    // impossible (the failure mode that caused the v0.13.0
    // "1.0.0"-loop incident).
    let mut bytes: Option<Vec<u8>> = None;

    while let Some(field) = multipart.next_field().await.map_err(|e| {
        (
            StatusCode::BAD_REQUEST,
            format!("read multipart field: {e}"),
        )
    })? {
        match field.name().unwrap_or("") {
            "file" => {
                let buf = field
                    .bytes()
                    .await
                    .map_err(|e| (StatusCode::BAD_REQUEST, format!("read file field: {e}")))?;
                bytes = Some(buf.to_vec());
            }
            "version" => {
                // Older clients (CLI v0.13.0 and earlier, SPA before
                // the upload-card refactor) still POST a `version`
                // form field; we ignore it now but drain the body so
                // multipart parsing doesn't stall.
                let _ = field.text().await;
                warn!("publish: 'version' form field ignored (extracted from PE bytes instead)");
            }
            other => {
                warn!(field = other, "publish: ignoring unknown multipart field");
            }
        }
    }

    let bytes = bytes.ok_or((StatusCode::BAD_REQUEST, "missing 'file' field".into()))?;
    if bytes.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "'file' field is empty".into()));
    }

    let version = extract_pe_version(&bytes).ok_or((
        StatusCode::BAD_REQUEST,
        "couldn't extract VERSIONINFO from the uploaded binary — \
         is it a Windows PE built with `winres`? Kanade ≥ v0.13.1 \
         embeds the resource automatically; older binaries need to \
         be re-published from a current build."
            .to_owned(),
    ))?;

    let size = bytes.len() as u64;
    info!(version, size, "publish: uploading new agent binary");

    let store = state
        .jetstream
        .get_object_store(OBJECT_AGENT_RELEASES)
        .await
        .map_err(|e| {
            warn!(error = %e, "get_object_store agent_releases");
            (
                StatusCode::SERVICE_UNAVAILABLE,
                format!(
                    "Object Store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`"
                ),
            )
        })?;
    let mut cursor = std::io::Cursor::new(bytes);
    let meta = store
        .put(version.as_str(), &mut cursor)
        .await
        .map_err(|e| {
            warn!(error = %e, "object_store.put");
            (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
        })?;
    info!(version, digest = ?meta.digest, "publish: agent binary uploaded");

    Ok(Json(PublishResponse {
        version,
        size,
        digest: meta.digest,
    }))
}

/// `DELETE /api/agents/releases/<version>` — remove a release from
/// the Object Store. Rejects with 409 when any `agent_config` scope
/// still points at this version (global / per-group / per-pc), so an
/// operator can't accidentally remove a binary the fleet is rolling
/// out to. Pass the same path-parameter the listing endpoint
/// returns; the version is the Object Store key.
pub async fn delete_release(
    State(state): State<AppState>,
    Path(version): Path<String>,
) -> Result<StatusCode, (StatusCode, String)> {
    let kv = state
        .jetstream
        .get_key_value(BUCKET_AGENT_CONFIG)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
    let mut keys = kv
        .keys()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
    while let Some(k) = keys.next().await {
        let k = match k {
            Ok(k) => k,
            Err(_) => continue,
        };
        let entry = match kv.get(&k).await.map_err(|e| {
            warn!(error = %e, %k, "kv.get scope");
            (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
        })? {
            Some(b) => b,
            None => continue,
        };
        let scope: ConfigScope = match serde_json::from_slice(&entry) {
            Ok(s) => s,
            Err(_) => continue,
        };
        if scope.target_version.as_deref() == Some(version.as_str()) {
            let label = if k == KEY_AGENT_CONFIG_GLOBAL {
                "global".to_string()
            } else if let Some(g) = parse_agent_config_group_key(&k) {
                format!("group:{g}")
            } else if let Some(p) = parse_agent_config_pc_key(&k) {
                format!("pc:{p}")
            } else {
                k.clone()
            };
            return Err((
                StatusCode::CONFLICT,
                format!(
                    "version '{version}' is the current target_version of scope '{label}' — \
                     clear or change that scope first (kanade config unset target_version --… )"
                ),
            ));
        }
    }

    let store = state
        .jetstream
        .get_object_store(OBJECT_AGENT_RELEASES)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
    store.delete(&version).await.map_err(|e| {
        warn!(error = %e, %version, "object_store.delete");
        // async-nats returns a generic error if the key is missing;
        // surface that as 404 for a cleaner SPA experience.
        let msg = e.to_string();
        if msg.contains("not found") || msg.contains("no objects") {
            (
                StatusCode::NOT_FOUND,
                format!("version '{version}' not in Object Store"),
            )
        } else {
            (StatusCode::INTERNAL_SERVER_ERROR, msg)
        }
    })?;
    info!(%version, "publish: agent binary deleted");
    Ok(StatusCode::NO_CONTENT)
}

// ─── GET /api/agents/releases ────────────────────────────────────────

#[derive(Serialize)]
pub struct ReleaseRow {
    pub version: String,
    pub size: u64,
    pub digest: Option<String>,
    /// RFC3339 timestamp from the Object Store metadata. The
    /// underlying async-nats type is `time::OffsetDateTime`; we
    /// stringify so the SPA can parse it with `Date.parse` and
    /// kanade-shared doesn't need a time-crate dep.
    pub modified: Option<String>,
}

pub async fn list_releases(
    State(state): State<AppState>,
) -> Result<Json<Vec<ReleaseRow>>, (StatusCode, String)> {
    let store = state
        .jetstream
        .get_object_store(OBJECT_AGENT_RELEASES)
        .await
        .map_err(|e| {
            warn!(error = %e, "get_object_store agent_releases");
            (
                StatusCode::SERVICE_UNAVAILABLE,
                format!(
                    "Object Store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`"
                ),
            )
        })?;
    let mut list = store.list().await.map_err(|e| {
        warn!(error = %e, "object_store.list");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;

    let mut rows = Vec::new();
    while let Some(meta) = list.next().await {
        let Ok(meta) = meta else { continue };
        // async-nats' ObjectMeta.modified is a time::OffsetDateTime;
        // convert to chrono via Unix nanos so the wire shape matches
        // every other `*_at` field on the backend.
        let modified = meta.modified.and_then(|t| {
            let nanos = t.unix_timestamp_nanos();
            let secs = (nanos.div_euclid(1_000_000_000)) as i64;
            let nsec = (nanos.rem_euclid(1_000_000_000)) as u32;
            chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nsec).map(|d| d.to_rfc3339())
        });
        rows.push(ReleaseRow {
            version: meta.name,
            size: meta.size as u64,
            digest: meta.digest,
            modified,
        });
    }
    rows.sort_by(|a, b| b.modified.cmp(&a.modified));
    Ok(Json(rows))
}

// ─── POST /api/agents/rollout ────────────────────────────────────────

#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case", tag = "type", content = "value")]
pub enum RolloutScope {
    Global,
    Group(String),
    Pc(String),
}

#[derive(Deserialize, Debug)]
pub struct RolloutBody {
    pub version: String,
    pub scope: RolloutScope,
    /// Optional `target_version_jitter` override on the same scope
    /// (humantime, e.g. `"30m"`). Omit to leave the existing value
    /// alone.
    #[serde(default)]
    pub jitter: Option<String>,
}

#[derive(Serialize)]
pub struct RolloutResponse {
    pub version: String,
    pub scope_key: String,
    pub scope_label: String,
    pub jitter: Option<String>,
}

pub async fn rollout(
    State(state): State<AppState>,
    Json(body): Json<RolloutBody>,
) -> Result<Json<RolloutResponse>, (StatusCode, String)> {
    let (key, label) = match &body.scope {
        RolloutScope::Global => (KEY_AGENT_CONFIG_GLOBAL.to_string(), "global".to_string()),
        RolloutScope::Group(g) => (agent_config_group_key(g), format!("group:{g}")),
        RolloutScope::Pc(p) => (agent_config_pc_key(p), format!("pc:{p}")),
    };

    // Fail-fast on a version that doesn't have a binary uploaded yet.
    let store = state
        .jetstream
        .get_object_store(OBJECT_AGENT_RELEASES)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
    if store.info(&body.version).await.is_err() {
        return Err((
            StatusCode::NOT_FOUND,
            format!(
                "version '{}' not found in {OBJECT_AGENT_RELEASES} — run `kanade agent publish` first",
                body.version
            ),
        ));
    }

    let kv = state
        .jetstream
        .get_key_value(BUCKET_AGENT_CONFIG)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;

    let mut scope = match kv.get(&key).await.map_err(|e| {
        warn!(error = %e, %key, "kv.get scope");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })? {
        Some(b) => serde_json::from_slice::<ConfigScope>(&b).map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("decode existing {BUCKET_AGENT_CONFIG}.{key}: {e}"),
            )
        })?,
        None => ConfigScope::default(),
    };
    scope.target_version = Some(body.version.clone());
    if let Some(j) = body.jitter.as_deref() {
        scope.target_version_jitter = Some(j.to_owned());
    }
    let payload = serde_json::to_vec(&scope).map_err(|e| {
        warn!(error = %e, "encode ConfigScope");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;
    kv.put(key.as_str(), payload.into()).await.map_err(|e| {
        warn!(error = %e, %key, "kv.put scope");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;

    info!(
        scope = %label,
        version = %body.version,
        jitter = ?body.jitter,
        "rollout: target_version flipped via HTTP",
    );

    Ok(Json(RolloutResponse {
        version: body.version,
        scope_key: key,
        scope_label: label,
        jitter: body.jitter,
    }))
}