kanade-backend 0.29.0

axum + SQLite projection backend for the kanade endpoint-management system. Hosts /api/* and the embedded SPA dashboard, projects JetStream streams into SQLite, drives the cron scheduler
//! Job endpoints:
//!   * `POST /api/jobs/{job_id}/kill` — runtime control. Looks up every
//!     in-flight execution of `{job_id}` from the `executions` table
//!     (status pending / running) and publishes `kill.{exec_id}` per
//!     deployment so agents actually receive the signal (spec §2.6
//!     Layer 3). Pre-v0.29 this published `kill.{cmd_id}`, which no
//!     agent subscribes to — the kill button on the SPA was effectively
//!     a no-op since v0.27.
//!   * `GET / POST /api/jobs` + `DELETE /api/jobs/{id}` — catalog CRUD
//!     (v0.15). Schedules reference catalog rows by `job_id`.

use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::TryStreamExt;
use kanade_shared::kv::{
    BUCKET_JOBS, BUCKET_SCHEDULES, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED,
};
use kanade_shared::manifest::{Manifest, Schedule};
use kanade_shared::subject;
use serde::Serialize;
use sqlx::Row;
use tracing::{info, warn};

use super::AppState;
use crate::audit;
use crate::audit::Caller;

pub async fn kill(
    State(state): State<AppState>,
    Path(job_id): Path<String>,
) -> Result<StatusCode, (StatusCode, String)> {
    // v0.29 / Issue #19: the agent listens on `kill.{exec_id}`, never
    // on `kill.{cmd_id}`. The path param here is the cmd / manifest
    // id, so we have to expand it to every still-running exec_id and
    // publish per-exec. status IN ('pending', 'running') skips
    // already-completed deployments — there's nothing to kill on those.
    let rows = sqlx::query(
        "SELECT exec_id FROM executions \
         WHERE job_id = ? AND status IN ('pending', 'running')",
    )
    .bind(&job_id)
    .fetch_all(&state.pool)
    .await
    .map_err(|e| {
        warn!(error = %e, job_id, "kill: lookup running execs");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("lookup running execs: {e}"),
        )
    })?;

    let exec_ids: Vec<String> = rows
        .into_iter()
        .map(|r| r.try_get::<String, _>("exec_id").unwrap_or_default())
        .filter(|s| !s.is_empty())
        .collect();

    if exec_ids.is_empty() {
        // No running deployments → there's nothing to kill. Return
        // 204 anyway: the operator's mental model is "I clicked kill,
        // it's not running", which is what 204 + zero-published
        // conveys. A 404 here would just confuse the SPA.
        info!(
            %job_id,
            "kill: no running executions for this job (no-op)",
        );
        return Ok(StatusCode::NO_CONTENT);
    }

    for exec_id in &exec_ids {
        if let Err(e) = state
            .nats
            .publish(subject::kill(exec_id), bytes::Bytes::new())
            .await
        {
            warn!(error = %e, %job_id, %exec_id, "publish kill failed");
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("publish kill.{exec_id}: {e}"),
            ));
        }
    }
    // flush so the subjects are on the wire before we ack the
    // operator — without it, a fast operator-then-shutdown could
    // theoretically drop the kill on the floor.
    let _ = state.nats.flush().await;
    info!(
        %job_id,
        kill_count = exec_ids.len(),
        "kill signal fanned out to running execs",
    );
    Ok(StatusCode::NO_CONTENT)
}

#[derive(Serialize)]
pub struct JobSummary {
    pub id: String,
    pub version: String,
    pub description: Option<String>,
    pub inventory: bool,
}

/// GET /api/jobs — list every registered job.
pub async fn list(State(s): State<AppState>) -> Result<Json<Vec<Manifest>>, (StatusCode, String)> {
    let kv = match s.jetstream.get_key_value(BUCKET_JOBS).await {
        Ok(k) => k,
        Err(_) => return Ok(Json(Vec::new())),
    };
    let keys_stream = kv
        .keys()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
    let keys: Vec<String> = keys_stream
        .try_collect()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
    let mut out = Vec::with_capacity(keys.len());
    for k in keys {
        if let Ok(Some(bytes)) = kv.get(&k).await
            && let Ok(job) = serde_json::from_slice::<Manifest>(&bytes)
        {
            out.push(job);
        }
    }
    out.sort_by(|a, b| a.id.cmp(&b.id));
    Ok(Json(out))
}

/// POST /api/jobs — upsert a Manifest into the job catalog. The KV
/// key is `manifest.id`.
pub async fn create(
    State(s): State<AppState>,
    caller: Caller,
    Json(job): Json<Manifest>,
) -> Result<Json<JobSummary>, (StatusCode, String)> {
    let kv = s
        .jetstream
        .create_key_value(KvConfig {
            bucket: BUCKET_JOBS.into(),
            history: 5,
            ..Default::default()
        })
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ensure KV: {e}")))?;
    let body = serde_json::to_vec(&job)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
    kv.put(&job.id, body.into())
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV put: {e}")))?;
    let summary = JobSummary {
        id: job.id.clone(),
        version: job.version.clone(),
        description: job.description.clone(),
        inventory: job.inventory.is_some(),
    };
    info!(job_id = %job.id, version = %job.version, "job upserted");
    audit::record(
        &s.nats,
        "operator",
        "job_upsert",
        Some(&job.id),
        Some(&caller),
        serde_json::json!({
            "version": job.version,
            "inventory": job.inventory.is_some(),
        }),
    )
    .await;
    Ok(Json(summary))
}

/// DELETE /api/jobs/{id} — 409 if any Schedule references it.
///
/// v0.27 (SPEC §2.6.4 (b)) cascades a Layer 2 revoke: before the
/// Manifest is removed from `BUCKET_JOBS`, the handler writes
/// `script_status.{id} = REVOKED` so any Command already in flight
/// (live core sub delivery in progress, or stored in `STREAM_EXEC`
/// awaiting a reconnecting agent) gets skipped by the agent's
/// `handle_command` KV check. Without this, deleting a Manifest only
/// stops *future* exec calls — already-published Commands would
/// still run.
///
/// To undo the cascade: re-create the Manifest with
/// `kanade job create`, then `kanade unrevoke <id>` to flip
/// `script_status` back to `ACTIVE`.
pub async fn delete(
    State(s): State<AppState>,
    Path(id): Path<String>,
    caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
    if let Ok(kv) = s.jetstream.get_key_value(BUCKET_SCHEDULES).await
        && let Ok(keys_stream) = kv.keys().await
    {
        let keys: Vec<String> = keys_stream.try_collect().await.unwrap_or_default();
        for k in keys {
            if let Ok(Some(bytes)) = kv.get(&k).await
                && let Ok(sched) = serde_json::from_slice::<Schedule>(&bytes)
                && sched.job_id == id
            {
                return Err((
                    StatusCode::CONFLICT,
                    format!(
                        "job '{id}' is referenced by schedule '{}'; remove the schedule first",
                        sched.id
                    ),
                ));
            }
        }
    }

    // v0.27 — SPEC §2.6.4 (b) cascade revoke: every job delete also
    // writes `script_status.{cmd_id} = REVOKED` so any in-flight
    // Command for this manifest (publish-already-emitted but the
    // agent hasn't run yet, or about to be replayed from STREAM_EXEC
    // on reconnect) gets caught by the Layer 2 KV check and skipped.
    // Without this, removing a Manifest only stops *future* exec
    // calls — Commands already in the broker would still execute on
    // any agent that reads them. We revoke FIRST, then delete the
    // Manifest, so that if delete somehow fails we're still in a safe
    // (revoked) state. Idempotent — re-revoking an already-REVOKED
    // entry is a no-op put. v0.27 round-2 review (gemini #36
    // line 208): resolve BOTH KV handles upfront before any write —
    // that way a missing / unreachable BUCKET_JOBS surfaces as a
    // clean 404 with zero side effects, instead of leaking a revoke
    // that has no matching delete.
    let status_kv = s
        .jetstream
        .get_key_value(BUCKET_SCRIPT_STATUS)
        .await
        .map_err(|e| {
            warn!(
                error = %e,
                bucket = BUCKET_SCRIPT_STATUS,
                "job_delete cascade revoke: status KV unavailable",
            );
            (
                StatusCode::SERVICE_UNAVAILABLE,
                format!("script_status bucket missing: {e}"),
            )
        })?;
    let kv = s.jetstream.get_key_value(BUCKET_JOBS).await.map_err(|e| {
        warn!(error = %e, "jobs KV missing on delete");
        (StatusCode::NOT_FOUND, "jobs bucket missing".to_string())
    })?;

    status_kv
        .put(&id, bytes::Bytes::from(SCRIPT_STATUS_REVOKED))
        .await
        .map_err(|e| {
            warn!(
                error = %e,
                job_id = %id,
                bucket = BUCKET_SCRIPT_STATUS,
                "job_delete cascade revoke: status KV put failed",
            );
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("script_status put: {e}"),
            )
        })?;
    // If the manifest delete fails *after* we successfully cascaded
    // the revoke, the operator needs to know that `script_status.{id}`
    // is now REVOKED so they can `kanade unrevoke <id>` as part of the
    // recovery. We audit + surface the revoke state in the error body
    // rather than silently dropping it (CodeRabbit #36 review).
    if let Err(e) = kv.delete(&id).await {
        warn!(
            error = %e,
            job_id = %id,
            cascade_revoke = true,
            "job_delete failed after cascade revoke",
        );
        audit::record(
            &s.nats,
            "operator",
            "job_delete_failed_post_revoke",
            Some(&id),
            Some(&caller),
            serde_json::json!({ "cascade_revoke": true, "error": e.to_string() }),
        )
        .await;
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!(
                "kv delete: {e}; script_status.{id} is already REVOKED — `kanade unrevoke {id}` to recover"
            ),
        ));
    }
    info!(job_id = %id, cascade_revoke = true, "job deleted");
    audit::record(
        &s.nats,
        "operator",
        "job_delete",
        Some(&id),
        Some(&caller),
        serde_json::json!({ "cascade_revoke": true }),
    )
    .await;
    Ok(StatusCode::NO_CONTENT)
}

/// Lookup helper for scheduler + projector. Returns `Ok(None)` when
/// the key is absent so callers can warn-and-skip without unwrapping
/// a fatal error.
pub async fn fetch(
    js: &async_nats::jetstream::Context,
    job_id: &str,
) -> anyhow::Result<Option<Manifest>> {
    let kv = match js.get_key_value(BUCKET_JOBS).await {
        Ok(k) => k,
        Err(_) => return Ok(None),
    };
    let Some(bytes) = kv.get(job_id).await? else {
        return Ok(None);
    };
    let job: Manifest = serde_json::from_slice(&bytes)?;
    Ok(Some(job))
}