use axum::Json;
use axum::extract::State;
use axum::http::StatusCode;
use futures::StreamExt;
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, KEY_AGENT_CONFIG_GLOBAL, OBJECT_AGENT_RELEASES, agent_config_group_key,
agent_config_pc_key,
};
use kanade_shared::wire::ConfigScope;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use super::AppState;
#[derive(Serialize)]
pub struct ReleaseRow {
pub version: String,
pub size: u64,
pub digest: Option<String>,
pub modified: Option<String>,
}
pub async fn list_releases(
State(state): State<AppState>,
) -> Result<Json<Vec<ReleaseRow>>, (StatusCode, String)> {
let store = state
.jetstream
.get_object_store(OBJECT_AGENT_RELEASES)
.await
.map_err(|e| {
warn!(error = %e, "get_object_store agent_releases");
(
StatusCode::SERVICE_UNAVAILABLE,
format!(
"Object Store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`"
),
)
})?;
let mut list = store.list().await.map_err(|e| {
warn!(error = %e, "object_store.list");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut rows = Vec::new();
while let Some(meta) = list.next().await {
let Ok(meta) = meta else { continue };
let modified = meta.modified.and_then(|t| {
let nanos = t.unix_timestamp_nanos();
let secs = (nanos.div_euclid(1_000_000_000)) as i64;
let nsec = (nanos.rem_euclid(1_000_000_000)) as u32;
chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nsec).map(|d| d.to_rfc3339())
});
rows.push(ReleaseRow {
version: meta.name,
size: meta.size as u64,
digest: meta.digest,
modified,
});
}
rows.sort_by(|a, b| b.modified.cmp(&a.modified));
Ok(Json(rows))
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case", tag = "type", content = "value")]
pub enum RolloutScope {
Global,
Group(String),
Pc(String),
}
#[derive(Deserialize, Debug)]
pub struct RolloutBody {
pub version: String,
pub scope: RolloutScope,
#[serde(default)]
pub jitter: Option<String>,
}
#[derive(Serialize)]
pub struct RolloutResponse {
pub version: String,
pub scope_key: String,
pub scope_label: String,
pub jitter: Option<String>,
}
pub async fn rollout(
State(state): State<AppState>,
Json(body): Json<RolloutBody>,
) -> Result<Json<RolloutResponse>, (StatusCode, String)> {
let (key, label) = match &body.scope {
RolloutScope::Global => (KEY_AGENT_CONFIG_GLOBAL.to_string(), "global".to_string()),
RolloutScope::Group(g) => (agent_config_group_key(g), format!("group:{g}")),
RolloutScope::Pc(p) => (agent_config_pc_key(p), format!("pc:{p}")),
};
let store = state
.jetstream
.get_object_store(OBJECT_AGENT_RELEASES)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
if store.info(&body.version).await.is_err() {
return Err((
StatusCode::NOT_FOUND,
format!(
"version '{}' not found in {OBJECT_AGENT_RELEASES} — run `kanade agent publish` first",
body.version
),
));
}
let kv = state
.jetstream
.get_key_value(BUCKET_AGENT_CONFIG)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
let mut scope = match kv.get(&key).await.map_err(|e| {
warn!(error = %e, %key, "kv.get scope");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})? {
Some(b) => serde_json::from_slice::<ConfigScope>(&b).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode existing {BUCKET_AGENT_CONFIG}.{key}: {e}"),
)
})?,
None => ConfigScope::default(),
};
scope.target_version = Some(body.version.clone());
if let Some(j) = body.jitter.as_deref() {
scope.target_version_jitter = Some(j.to_owned());
}
let payload = serde_json::to_vec(&scope).map_err(|e| {
warn!(error = %e, "encode ConfigScope");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
kv.put(key.as_str(), payload.into()).await.map_err(|e| {
warn!(error = %e, %key, "kv.put scope");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
info!(
scope = %label,
version = %body.version,
jitter = ?body.jitter,
"rollout: target_version flipped via HTTP",
);
Ok(Json(RolloutResponse {
version: body.version,
scope_key: key,
scope_label: label,
jitter: body.jitter,
}))
}