use axum::Json;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode, header};
use axum::response::{IntoResponse, Response};
use futures::StreamExt;
use kanade_shared::kv::{BUCKET_JOBS, OBJECT_COLLECTIONS};
use kanade_shared::manifest::Manifest;
use serde::Serialize;
use std::collections::HashMap;
use tokio_util::io::ReaderStream;
use tracing::warn;
use super::AppState;
use crate::audit;
use crate::audit::Caller;
#[derive(Serialize)]
pub struct BundleRow {
pub key: String,
pub pc_id: String,
pub job_id: String,
pub collected_at: Option<String>,
pub size: u64,
pub digest: Option<String>,
pub name: Option<String>,
pub description: Option<String>,
}
fn parse_bundle_key(key: &str) -> Option<(String, String, String)> {
let mut it = key.split('/');
let pc_id = it.next()?;
let job_id = it.next()?;
let filename = it.next()?;
if pc_id.is_empty() || job_id.is_empty() || it.next().is_some() {
return None;
}
let collected_at = filename.strip_suffix(".zip")?;
if collected_at.is_empty() {
return None;
}
Some((
pc_id.to_string(),
job_id.to_string(),
collected_at.to_string(),
))
}
async fn collect_job_meta(state: &AppState) -> HashMap<String, (String, Option<String>)> {
let mut map = HashMap::new();
let Ok(kv) = state.jetstream.get_key_value(BUCKET_JOBS).await else {
return map;
};
let Ok(mut keys) = kv.keys().await else {
return map;
};
let mut key_list = Vec::new();
while let Some(key) = keys.next().await {
if let Ok(key) = key {
key_list.push(key);
}
}
let fetched = futures::future::join_all(key_list.into_iter().map(|key| {
let kv = kv.clone();
async move {
match kv.get(&key).await {
Ok(Some(entry)) => serde_json::from_slice::<Manifest>(&entry).ok(),
_ => None,
}
}
}))
.await;
for job in fetched.into_iter().flatten() {
if let Some(hint) = job.collect {
map.insert(job.id, (hint.name, hint.description));
}
}
map
}
pub async fn list_bundles(
State(state): State<AppState>,
) -> Result<Json<Vec<BundleRow>>, (StatusCode, String)> {
let store = state
.jetstream
.get_object_store(OBJECT_COLLECTIONS)
.await
.map_err(|e| {
warn!(error = %e, "get_object_store collections");
(
StatusCode::SERVICE_UNAVAILABLE,
format!(
"Object Store '{OBJECT_COLLECTIONS}' missing — run `kanade jetstream setup`"
),
)
})?;
let mut list = store.list().await.map_err(|e| {
warn!(error = %e, "object_store.list collections");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let job_meta = collect_job_meta(&state).await;
let mut rows = Vec::new();
while let Some(item) = list.next().await {
let meta = item.map_err(|e| {
warn!(error = %e, "collect.list: object metadata stream error");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("list bundles: {e}"),
)
})?;
let Some((pc_id, job_id, collected_at)) = parse_bundle_key(&meta.name) else {
warn!(key = %meta.name, "collect.list: object key not <pc_id>/<job_id>/<ts>.zip — skipping");
continue;
};
let modified = meta
.modified
.and_then(|t| chrono::DateTime::from_timestamp(t.unix_timestamp(), t.nanosecond()))
.map(|d| d.to_rfc3339());
let (name, description) = match job_meta.get(&job_id) {
Some((n, d)) => (Some(n.clone()), d.clone()),
None => (None, None),
};
rows.push(BundleRow {
key: meta.name,
pc_id,
job_id,
collected_at: Some(collected_at).filter(|s| !s.is_empty()).or(modified),
size: meta.size as u64,
digest: meta.digest,
name,
description,
});
}
rows.sort_by(|a, b| {
b.collected_at
.cmp(&a.collected_at)
.then_with(|| a.key.cmp(&b.key))
});
Ok(Json(rows))
}
pub async fn download_bundle(
State(state): State<AppState>,
Path(key): Path<String>,
headers: HeaderMap,
) -> Result<Response, (StatusCode, String)> {
if key.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"bundle key must be non-empty".into(),
));
}
let store = state
.jetstream
.get_object_store(OBJECT_COLLECTIONS)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
let obj = match store.get(key.as_str()).await {
Ok(o) => o,
Err(e) => {
let msg = e.to_string();
if msg.contains("not found") || msg.contains("no objects") {
return Err((StatusCode::NOT_FOUND, format!("bundle '{key}' not found")));
}
warn!(error = %e, %key, "object_store.get collections");
return Err((StatusCode::INTERNAL_SERVER_ERROR, msg));
}
};
let total_size = obj.info().size as u64;
let etag = obj.info().digest.clone().map(|d| format!("\"{d}\""));
if let Some(ref expected) = etag
&& let Some(if_match) = headers.get(header::IF_MATCH)
&& let Ok(s) = if_match.to_str()
&& s != "*"
&& s != expected
{
return Err((
StatusCode::PRECONDITION_FAILED,
format!("If-Match {s:?} doesn't match current ETag {expected:?}"),
));
}
let suggested_filename: String = key
.chars()
.map(|c| match c {
'/' => '-',
'\\' | '"' => '_',
other => other,
})
.collect();
let mut resp = (
[
(header::CONTENT_TYPE, "application/zip".to_string()),
(header::CONTENT_LENGTH, total_size.to_string()),
(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{suggested_filename}\""),
),
],
Body::from_stream(ReaderStream::new(obj)),
)
.into_response();
if let Some(etag) = etag
&& let Ok(v) = etag.parse()
{
resp.headers_mut().insert(header::ETAG, v);
}
Ok(resp)
}
pub async fn delete_bundle(
State(state): State<AppState>,
Path(key): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
if key.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"bundle key must be non-empty".into(),
));
}
let store = state
.jetstream
.get_object_store(OBJECT_COLLECTIONS)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
store.delete(key.as_str()).await.map_err(|e| {
warn!(error = %e, %key, "object_store.delete collections");
let msg = e.to_string();
if msg.contains("not found") || msg.contains("no objects") {
(
StatusCode::NOT_FOUND,
format!("bundle '{key}' not in Object Store"),
)
} else {
(StatusCode::INTERNAL_SERVER_ERROR, msg)
}
})?;
audit::record(
&state.nats,
"operator",
"collect_bundle_delete",
Some(&key),
Some(&caller),
serde_json::json!({ "key": key }),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_bundle_key_splits_three_segments() {
assert_eq!(
parse_bundle_key("PC001/collect-diagnostics/20260615T220000Z.zip"),
Some((
"PC001".to_string(),
"collect-diagnostics".to_string(),
"20260615T220000Z".to_string(),
)),
);
}
#[test]
fn parse_bundle_key_rejects_short_or_empty() {
assert_eq!(parse_bundle_key("PC001/job"), None);
assert_eq!(parse_bundle_key("PC001"), None);
assert_eq!(parse_bundle_key("//x.zip"), None);
assert_eq!(parse_bundle_key(""), None);
}
#[test]
fn parse_bundle_key_enforces_zip_and_three_segments() {
assert_eq!(parse_bundle_key("pc/job/raw-name"), None);
assert_eq!(parse_bundle_key("pc/job/.zip"), None);
assert_eq!(parse_bundle_key("pc/job/extra/ts.zip"), None);
}
}