use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use base64::Engine as _;
use kanade_shared::kv::{BUCKET_SCRIPT_CURRENT, OBJECT_SCRIPTS};
use kanade_shared::manifest::{FanoutPlan, Manifest};
use kanade_shared::subject;
use kanade_shared::wire::Command;
use serde::Serialize;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::jobs;
use crate::audit;
use crate::audit::Caller;
/// Summary of a published execution, returned by [`exec_manifest`] and serialized as the
/// JSON body of the exec HTTP handler.
#[derive(Debug, Serialize, Clone)]
pub struct ExecResponse {
    /// Freshly generated UUID (v4) for this execution; stamped on every published `Command`.
    pub exec_id: String,
    /// Id of the manifest (job) that was executed.
    pub job_id: String,
    /// Manifest version that was published.
    pub version: String,
    /// Number of NATS subjects the command was published or scheduled to: one per rollout
    /// wave, or one per `all` / group / pc target entry. This counts subjects, not agents.
    pub target_count: u32,
    /// The subjects themselves, in the order they were published or scheduled.
    pub subjects: Vec<String>,
}
pub async fn exec_manifest(
s: &AppState,
manifest: Manifest,
plan: FanoutPlan,
actor: &str,
caller: Option<&Caller>,
) -> Result<ExecResponse, (StatusCode, String)> {
let has_rollout = plan
.rollout
.as_ref()
.map(|r| !r.waves.is_empty())
.unwrap_or(false);
if !has_rollout && !plan.target.is_specified() {
return Err((
StatusCode::BAD_REQUEST,
"target must specify at least one of `all` / `groups` / `pcs` (or set `rollout.waves`)"
.into(),
));
}
manifest
.execute
.validate_script_source()
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let (inline_script, script_object_ref) = resolve_script_source(s, &manifest).await?;
let timeout_secs = humantime::parse_duration(&manifest.execute.timeout)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid timeout: {e}")))?
.as_secs();
let jitter_secs = plan
.jitter
.as_deref()
.map(humantime::parse_duration)
.transpose()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid jitter: {e}")))?
.map(|d| d.as_secs());
let exec_id = Uuid::new_v4().to_string();
let deadline_at = plan.deadline_at;
let make_cmd = || Command {
id: manifest.id.clone(),
version: manifest.version.clone(),
request_id: Uuid::new_v4().to_string(),
exec_id: Some(exec_id.clone()),
shell: manifest.execute.shell.into(),
script: inline_script.clone(),
script_object: script_object_ref.as_ref().map(|(k, _)| k.clone()),
script_object_sha256: script_object_ref.as_ref().map(|(_, d)| d.clone()),
timeout_secs,
jitter_secs,
run_as: manifest.execute.run_as,
cwd: manifest.execute.cwd.clone(),
deadline_at,
staleness: manifest.staleness.clone(),
emit: manifest.emit.clone(),
};
let mut subjects: Vec<String> = Vec::new();
let mut target_count: u32 = 0;
if let Some(rollout) = plan.rollout.as_ref() {
let mut delays = Vec::with_capacity(rollout.waves.len());
for (idx, wave) in rollout.waves.iter().enumerate() {
let d = humantime::parse_duration(&wave.delay).map_err(|e| {
(
StatusCode::BAD_REQUEST,
format!("invalid rollout.waves[{idx}].delay: {e}"),
)
})?;
delays.push(d);
}
for ((idx, wave), delay) in rollout.waves.iter().enumerate().zip(delays) {
let subj = subject::commands_group(&wave.group);
subjects.push(subj.clone());
target_count = target_count.saturating_add(1);
let cmd = make_cmd();
let payload = serde_json::to_vec(&cmd)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
if delay.is_zero() {
if let Err(e) = s.nats.publish(subj.clone(), payload.into()).await {
return Err((StatusCode::BAD_GATEWAY, format!("publish to {subj}: {e}")));
}
info!(wave = idx, subject = %subj, "wave published (immediate)");
} else {
let nats = s.nats.clone();
let subj_for_spawn = subj.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
match nats.publish(subj_for_spawn.clone(), payload.into()).await {
Ok(()) => info!(
wave = idx,
subject = %subj_for_spawn,
delay_secs = delay.as_secs(),
"wave published (delayed)",
),
Err(e) => warn!(
error = %e,
wave = idx,
subject = %subj_for_spawn,
"delayed wave publish failed",
),
}
});
info!(
wave = idx,
subject = %subj,
delay_secs = delay.as_secs(),
"wave scheduled",
);
}
}
} else {
if plan.target.all {
subjects.push(subject::COMMANDS_ALL.to_string());
target_count = target_count.saturating_add(1);
}
for g in &plan.target.groups {
subjects.push(subject::commands_group(g));
target_count = target_count.saturating_add(1);
}
for pc in &plan.target.pcs {
subjects.push(subject::commands_pc(pc));
target_count = target_count.saturating_add(1);
}
for subj in &subjects {
let cmd = make_cmd();
let payload = serde_json::to_vec(&cmd)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
if let Err(e) = s.nats.publish(subj.clone(), payload.into()).await {
warn!(error = %e, subject = %subj, "publish failed");
return Err((StatusCode::BAD_GATEWAY, format!("publish to {subj}: {e}")));
}
}
}
let _ = s.nats.flush().await;
match s.jetstream.get_key_value(BUCKET_SCRIPT_CURRENT).await {
Ok(kv) => {
if let Err(e) = kv
.put(
&manifest.id,
bytes::Bytes::from(manifest.version.clone().into_bytes()),
)
.await
{
warn!(error = %e, cmd_id = %manifest.id, "script_current put failed");
}
}
Err(e) => warn!(error = %e, "script_current KV missing; skipping version pin"),
}
sqlx::query(
"INSERT INTO executions (exec_id, job_id, version, initiated_by, target_count, status)
VALUES (?, ?, ?, ?, ?, 'pending')",
)
.bind(&exec_id)
.bind(&manifest.id)
.bind(&manifest.version)
.bind(actor)
.bind(target_count as i64)
.execute(&s.pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("insert executions: {e}"),
)
})?;
info!(
exec_id = %exec_id,
job_id = %manifest.id,
version = %manifest.version,
actor,
target_count,
wave_mode = has_rollout,
subjects = ?subjects,
"execution published",
);
audit::record(
&s.nats,
actor,
"exec",
Some(&manifest.id),
caller,
serde_json::json!({
"exec_id": exec_id,
"version": manifest.version,
"target_count": target_count,
"subjects": subjects,
"wave_mode": has_rollout,
}),
)
.await;
Ok(ExecResponse {
exec_id,
job_id: manifest.id,
version: manifest.version,
target_count,
subjects,
})
}
pub async fn create(
State(s): State<AppState>,
Path(job_id): Path<String>,
caller: Caller,
Json(plan): Json<FanoutPlan>,
) -> Result<Json<ExecResponse>, (StatusCode, String)> {
let manifest = match jobs::fetch(&s.jetstream, &job_id).await {
Ok(Some(m)) => m,
Ok(None) => {
return Err((
StatusCode::NOT_FOUND,
format!(
"job '{job_id}' not found in catalog — register it first with \
`kanade job create <manifest.yaml>`"
),
));
}
Err(e) => {
warn!(error = %e, %job_id, "exec: job catalog lookup failed");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("job catalog lookup: {e}"),
));
}
};
exec_manifest(&s, manifest, plan, "operator", Some(&caller))
.await
.map(Json)
}
/// Work out how agents should obtain the script body for `manifest`.
///
/// Returns `(inline_script, script_object_ref)`:
/// - A non-empty `execute.script` wins. It is returned inline with no object reference.
/// - Otherwise `execute.script_object` names an entry in the `OBJECT_SCRIPTS` Object Store.
///   The result is an empty inline script plus `Some((key, sha256_hex))`. The lowercase hex
///   digest is decoded from the object's digest metadata, which is optionally prefixed with
///   `SHA-256=`.
///
/// # Errors
/// - `400` if neither source is set.
/// - `503` if the Object Store bucket cannot be opened.
/// - `404` if the lookup error message mentions "not found" / "no objects"; `500` for
///   any other lookup failure.
/// - `500` if the digest metadata is absent, is not valid base64, or is not 32 bytes long.
async fn resolve_script_source(
    s: &AppState,
    manifest: &Manifest,
) -> Result<(String, Option<(String, String)>), (StatusCode, String)> {
    use base64::alphabet::URL_SAFE;
    use base64::engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig};

    // Digest metadata is decoded as URL-safe base64, with or without `=` padding.
    static DECODER: GeneralPurpose = GeneralPurpose::new(
        &URL_SAFE,
        GeneralPurposeConfig::new().with_decode_padding_mode(DecodePaddingMode::Indifferent),
    );
    // The digest is forwarded as `script_object_sha256`, so it must be exactly one SHA-256.
    const SHA256_LEN: usize = 32;

    if let Some(inline) = manifest
        .execute
        .script
        .as_deref()
        .filter(|script| !script.is_empty())
    {
        return Ok((inline.to_owned(), None));
    }
    // `ok_or_else` so the error tuple is only built on the failure path.
    let key = manifest.execute.script_object.as_deref().ok_or_else(|| {
        (
            StatusCode::BAD_REQUEST,
            "execute: one of `script` or `script_object` must be set \
             (Manifest::validate() should have caught this earlier)"
                .to_string(),
        )
    })?;
    let store = s
        .jetstream
        .get_object_store(OBJECT_SCRIPTS)
        .await
        .map_err(|e| {
            warn!(error = %e, "exec: get_object_store scripts");
            (
                StatusCode::SERVICE_UNAVAILABLE,
                format!(
                    "Object Store '{OBJECT_SCRIPTS}' missing — \
                     run `kanade jetstream setup`"
                ),
            )
        })?;
    let info = store.info(key).await.map_err(|e| {
        // "Not found" is detected from the error's message text.
        let msg = e.to_string();
        if msg.contains("not found") || msg.contains("no objects") {
            (
                StatusCode::NOT_FOUND,
                format!("script_object '{key}' not found in OBJECT_SCRIPTS"),
            )
        } else {
            warn!(error = %e, %key, "exec: object_store.info");
            (StatusCode::INTERNAL_SERVER_ERROR, msg)
        }
    })?;
    let raw = info.digest.as_deref().ok_or_else(|| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!(
                "script_object '{key}' has no digest metadata — \
                 broker should always populate it"
            ),
        )
    })?;
    let b64 = raw.strip_prefix("SHA-256=").unwrap_or(raw);
    let bytes = DECODER.decode(b64).map_err(|e| {
        warn!(error = %e, %key, raw, "exec: decode object_store digest");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("decode digest for '{key}': {e}"),
        )
    })?;
    // A digest of any other length can never match a SHA-256 of the object, so fail here
    // rather than ship an unverifiable hash to every agent.
    if bytes.len() != SHA256_LEN {
        warn!(%key, raw, len = bytes.len(), "exec: object_store digest has unexpected length");
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!(
                "digest for '{key}' is {} bytes, expected {SHA256_LEN} (SHA-256)",
                bytes.len()
            ),
        ));
    }
    Ok((String::new(), Some((key.to_owned(), hex_lower(&bytes)))))
}
/// Render `bytes` as a lowercase hexadecimal string: two digits per byte, no separators.
/// An empty slice yields an empty string.
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| {
            // High nibble first, then low nibble.
            [DIGITS[usize::from(byte >> 4)], DIGITS[usize::from(byte & 0x0f)]]
        })
        .map(char::from)
        .collect()
}