use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use base64::Engine as _;
use kanade_shared::kv::{BUCKET_SCRIPT_CURRENT, OBJECT_SCRIPTS};
use kanade_shared::manifest::{FanoutPlan, Manifest};
use kanade_shared::subject;
use kanade_shared::wire::Command;
use serde::Serialize;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::jobs;
use crate::audit;
use crate::audit::Caller;
/// Response body returned after an execution has been published and recorded.
#[derive(Serialize, Clone, Debug)]
pub struct ExecResponse {
    /// Freshly generated UUID identifying this execution.
    pub exec_id: String,
    /// Catalog id of the executed job (the manifest's `id`).
    pub job_id: String,
    /// Manifest version that was published.
    pub version: String,
    /// Expected number of reporting PCs: the resolved alive-PC count floored
    /// at 1, or a per-subject fallback when resolution failed.
    pub target_count: u32,
    /// NATS subjects the command was (or, for delayed waves, will be)
    /// published to.
    pub subjects: Vec<String>,
}
pub async fn exec_manifest(
s: &AppState,
manifest: Manifest,
plan: FanoutPlan,
actor: &str,
caller: Option<&Caller>,
retry: Option<kanade_shared::wire::RetrySpec>,
) -> Result<ExecResponse, (StatusCode, String)> {
let has_rollout = plan
.rollout
.as_ref()
.map(|r| !r.waves.is_empty())
.unwrap_or(false);
if !has_rollout && !plan.target.is_specified() {
return Err((
StatusCode::BAD_REQUEST,
"target must specify at least one of `all` / `groups` / `pcs` (or set `rollout.waves`)"
.into(),
));
}
manifest
.execute
.validate_script_source()
.map_err(|e| (StatusCode::BAD_REQUEST, e))?;
let (inline_script, script_object_ref) = resolve_script_source(s, &manifest).await?;
let timeout_secs = humantime::parse_duration(&manifest.execute.timeout)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid timeout: {e}")))?
.as_secs();
let jitter_secs = plan
.jitter
.as_deref()
.map(humantime::parse_duration)
.transpose()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid jitter: {e}")))?
.map(|d| d.as_secs());
let exec_id = Uuid::new_v4().to_string();
let deadline_at = plan.deadline_at;
let make_cmd = || Command {
id: manifest.id.clone(),
version: manifest.version.clone(),
request_id: Uuid::new_v4().to_string(),
exec_id: Some(exec_id.clone()),
shell: manifest.execute.shell.into(),
script: inline_script.clone(),
script_object: script_object_ref.as_ref().map(|(k, _)| k.clone()),
script_object_sha256: script_object_ref.as_ref().map(|(_, d)| d.clone()),
timeout_secs,
jitter_secs,
run_as: manifest.execute.run_as,
cwd: manifest.execute.cwd.clone(),
deadline_at,
staleness: manifest.staleness.clone(),
emit: manifest.emit.clone(),
check: manifest.check.clone(),
retry,
};
let mut subjects: Vec<String> = Vec::new();
let expected_target = expected_target_for(&plan);
let target_count: u32 = match crate::scheduler::resolve_expected_pcs(s, &expected_target).await
{
Ok(pcs) => {
if pcs.is_empty() {
warn!(
job_id = %manifest.id,
"target resolved to zero alive PCs; flooring target_count at 1",
);
}
(pcs.len() as u32).max(1)
}
Err(e) => {
let fallback = fallback_subject_count(&plan);
warn!(
error = ?e,
fallback,
"target resolve failed; falling back to per-subject target_count",
);
fallback
}
};
match s.jetstream.get_key_value(BUCKET_SCRIPT_CURRENT).await {
Ok(kv) => {
if let Err(e) = kv
.put(
&manifest.id,
bytes::Bytes::from(manifest.version.clone().into_bytes()),
)
.await
{
warn!(error = %e, cmd_id = %manifest.id, "script_current put failed");
}
}
Err(e) => warn!(error = %e, "script_current KV missing; skipping version pin"),
}
if let Some(rollout) = plan.rollout.as_ref() {
let mut delays = Vec::with_capacity(rollout.waves.len());
for (idx, wave) in rollout.waves.iter().enumerate() {
let d = humantime::parse_duration(&wave.delay).map_err(|e| {
(
StatusCode::BAD_REQUEST,
format!("invalid rollout.waves[{idx}].delay: {e}"),
)
})?;
delays.push(d);
}
for ((idx, wave), delay) in rollout.waves.iter().enumerate().zip(delays) {
let subj = subject::commands_group(&wave.group);
subjects.push(subj.clone());
let cmd = make_cmd();
let payload = serde_json::to_vec(&cmd)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
if delay.is_zero() {
if let Err(e) = s.nats.publish(subj.clone(), payload.into()).await {
return Err((StatusCode::BAD_GATEWAY, format!("publish to {subj}: {e}")));
}
info!(wave = idx, subject = %subj, "wave published (immediate)");
} else {
let nats = s.nats.clone();
let subj_for_spawn = subj.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
match nats.publish(subj_for_spawn.clone(), payload.into()).await {
Ok(()) => info!(
wave = idx,
subject = %subj_for_spawn,
delay_secs = delay.as_secs(),
"wave published (delayed)",
),
Err(e) => warn!(
error = %e,
wave = idx,
subject = %subj_for_spawn,
"delayed wave publish failed",
),
}
});
info!(
wave = idx,
subject = %subj,
delay_secs = delay.as_secs(),
"wave scheduled",
);
}
}
} else {
if plan.target.all {
subjects.push(subject::COMMANDS_ALL.to_string());
}
for g in &plan.target.groups {
subjects.push(subject::commands_group(g));
}
for pc in &plan.target.pcs {
subjects.push(subject::commands_pc(pc));
}
for subj in &subjects {
let cmd = make_cmd();
let payload = serde_json::to_vec(&cmd)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
if let Err(e) = s.nats.publish(subj.clone(), payload.into()).await {
warn!(error = %e, subject = %subj, "publish failed");
return Err((StatusCode::BAD_GATEWAY, format!("publish to {subj}: {e}")));
}
}
}
let _ = s.nats.flush().await;
sqlx::query(
"INSERT INTO executions (exec_id, job_id, version, initiated_by, target_count, status, initiated_at)
VALUES (?, ?, ?, ?, ?, 'pending', ?)",
)
.bind(&exec_id)
.bind(&manifest.id)
.bind(&manifest.version)
.bind(actor)
.bind(target_count as i64)
.bind(chrono::Utc::now())
.execute(&s.pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("insert executions: {e}"),
)
})?;
info!(
exec_id = %exec_id,
job_id = %manifest.id,
version = %manifest.version,
actor,
target_count,
wave_mode = has_rollout,
subjects = ?subjects,
"execution published",
);
audit::record(
&s.nats,
actor,
"exec",
Some(&manifest.id),
caller,
serde_json::json!({
"exec_id": exec_id,
"version": manifest.version,
"target_count": target_count,
"subjects": subjects,
"wave_mode": has_rollout,
}),
)
.await;
Ok(ExecResponse {
exec_id,
job_id: manifest.id,
version: manifest.version,
target_count,
subjects,
})
}
pub async fn create(
State(s): State<AppState>,
Path(job_id): Path<String>,
caller: Caller,
Json(plan): Json<FanoutPlan>,
) -> Result<Json<ExecResponse>, (StatusCode, String)> {
let manifest = match jobs::fetch(&s.jetstream, &job_id).await {
Ok(Some(m)) => m,
Ok(None) => {
return Err((
StatusCode::NOT_FOUND,
format!(
"job '{job_id}' not found in catalog — register it first with \
`kanade job create <manifest.yaml>`"
),
));
}
Err(e) => {
warn!(error = %e, %job_id, "exec: job catalog lookup failed");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("job catalog lookup: {e}"),
));
}
};
exec_manifest(&s, manifest, plan, "operator", Some(&caller), None)
.await
.map(Json)
}
/// Decides where agents obtain the script body from.
///
/// Returns `(inline_script, script_object_ref)`:
/// - A non-empty inline `execute.script` is returned as-is, with no object
///   reference.
/// - Otherwise `execute.script_object` is looked up in the `OBJECT_SCRIPTS`
///   Object Store. The function returns an empty inline script plus
///   `(key, lowercase-hex SHA-256 digest)`.
///
/// # Errors
/// - `400` when neither `script` nor `script_object` is set.
/// - `503` when the Object Store bucket is missing.
/// - `404` when the object key does not exist.
/// - `500` for any other lookup failure, or for a missing / undecodable /
///   wrong-length digest.
async fn resolve_script_source(
    s: &AppState,
    manifest: &Manifest,
) -> Result<(String, Option<(String, String)>), (StatusCode, String)> {
    if let Some(inline) = manifest
        .execute
        .script
        .as_deref()
        .filter(|script| !script.is_empty())
    {
        return Ok((inline.to_owned(), None));
    }
    let key = manifest.execute.script_object.as_deref().ok_or_else(|| {
        (
            StatusCode::BAD_REQUEST,
            "execute: one of `script` or `script_object` must be set \
             (Manifest::validate() should have caught this earlier)"
                .to_string(),
        )
    })?;
    let store = s
        .jetstream
        .get_object_store(OBJECT_SCRIPTS)
        .await
        .map_err(|e| {
            warn!(error = %e, "exec: get_object_store scripts");
            (
                StatusCode::SERVICE_UNAVAILABLE,
                format!(
                    "Object Store '{OBJECT_SCRIPTS}' missing — \
                     run `kanade jetstream setup`"
                ),
            )
        })?;
    let info = store.info(key).await.map_err(|e| {
        let msg = e.to_string();
        // Classified by message text; any other failure is reported as a 500.
        if msg.contains("not found") || msg.contains("no objects") {
            (
                StatusCode::NOT_FOUND,
                format!("script_object '{key}' not found in OBJECT_SCRIPTS"),
            )
        } else {
            warn!(error = %e, %key, "exec: object_store.info");
            (StatusCode::INTERNAL_SERVER_ERROR, msg)
        }
    })?;
    let raw = info.digest.as_deref().ok_or_else(|| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!(
                "script_object '{key}' has no digest metadata — \
                 broker should always populate it"
            ),
        )
    })?;
    // The digest is `SHA-256=<base64url>`. Padding is accepted whether
    // present or not.
    let b64 = raw.strip_prefix("SHA-256=").unwrap_or(raw);
    use base64::alphabet::URL_SAFE;
    use base64::engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig};
    static DECODER: GeneralPurpose = GeneralPurpose::new(
        &URL_SAFE,
        GeneralPurposeConfig::new().with_decode_padding_mode(DecodePaddingMode::Indifferent),
    );
    let bytes = DECODER.decode(b64).map_err(|e| {
        warn!(error = %e, %key, raw, "exec: decode object_store digest");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("decode digest for '{key}': {e}"),
        )
    })?;
    // A SHA-256 digest is exactly 32 bytes. Anything else is corrupt metadata
    // that agents could never verify a download against.
    const SHA256_LEN: usize = 32;
    if bytes.len() != SHA256_LEN {
        warn!(%key, raw, len = bytes.len(), "exec: object_store digest has unexpected length");
        return Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!(
                "digest for '{key}' is {} bytes, expected {SHA256_LEN} (SHA-256)",
                bytes.len()
            ),
        ));
    }
    let digest = hex_lower(&bytes);
    Ok((String::new(), Some((key.to_owned(), digest))))
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
/// Builds the target used to estimate how many PCs will report back.
///
/// Without a rollout, this is the plan's own `target`. With a rollout, it is
/// the union of every wave's group: no `all`, no individual PCs. An empty
/// `waves` list therefore yields an empty target.
fn expected_target_for(plan: &FanoutPlan) -> kanade_shared::manifest::Target {
    match &plan.rollout {
        None => plan.target.clone(),
        Some(rollout) => {
            let wave_groups = rollout.waves.iter().map(|wave| wave.group.clone()).collect();
            kanade_shared::manifest::Target {
                all: false,
                groups: wave_groups,
                pcs: Vec::new(),
            }
        }
    }
}
/// Fallback `target_count` used when PC resolution fails: one per subject.
///
/// With a rollout, this is one per wave. Without one, it is one for `all`
/// (if set) plus one per group and one per PC.
fn fallback_subject_count(plan: &FanoutPlan) -> u32 {
    let subject_total = match &plan.rollout {
        Some(rollout) => rollout.waves.len(),
        None => {
            let target = &plan.target;
            usize::from(target.all) + target.groups.len() + target.pcs.len()
        }
    };
    subject_total as u32
}
#[cfg(test)]
mod tests {
    use super::*;
    use kanade_shared::manifest::{FanoutPlan, Rollout, Target, Wave};

    /// Plan with the given target/rollout and defaults for everything else.
    fn plan(target: Target, rollout: Option<Rollout>) -> FanoutPlan {
        FanoutPlan {
            target,
            rollout,
            ..FanoutPlan::default()
        }
    }

    /// Rollout with the default strategy over `waves`.
    fn rollout_of(waves: Vec<Wave>) -> Rollout {
        Rollout {
            strategy: Default::default(),
            waves,
        }
    }

    /// Single wave targeting `group` after `delay`.
    fn wave(group: &str, delay: &str) -> Wave {
        Wave {
            group: group.into(),
            delay: delay.into(),
        }
    }

    #[test]
    fn expected_target_plain_passes_through() {
        let target = Target {
            all: true,
            groups: vec!["g1".into()],
            pcs: vec!["pc-1".into()],
        };
        let p = plan(target.clone(), None);

        let resolved = expected_target_for(&p);
        assert!(resolved.all);
        assert_eq!(resolved.groups, target.groups);
        assert_eq!(resolved.pcs, target.pcs);
        // `all` + one group + one PC.
        assert_eq!(fallback_subject_count(&p), 3);
    }

    #[test]
    fn expected_target_rollout_unions_wave_groups() {
        let p = plan(
            Target::default(),
            Some(rollout_of(vec![wave("canary", "0s"), wave("wave1", "10m")])),
        );

        let resolved = expected_target_for(&p);
        assert!(!resolved.all);
        assert_eq!(resolved.groups, vec!["canary", "wave1"]);
        assert!(resolved.pcs.is_empty());
        // One subject per wave.
        assert_eq!(fallback_subject_count(&p), 2);
    }

    #[test]
    fn empty_waves_rollout_counts_nothing_on_both_paths() {
        let p = plan(
            Target {
                all: true,
                groups: vec![],
                pcs: vec![],
            },
            Some(rollout_of(vec![])),
        );

        let resolved = expected_target_for(&p);
        assert!(!resolved.all && resolved.groups.is_empty() && resolved.pcs.is_empty());
        assert_eq!(fallback_subject_count(&p), 0);
    }
}