use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::http::header::HeaderMap;
use futures::TryStreamExt;
use kanade_shared::kv::{BUCKET_VIEWS, BUCKET_VIEWS_YAML};
use kanade_shared::manifest::View;
use serde::Serialize;
use tracing::{info, warn};
use crate::api::AppState;
use crate::api::yaml_body::{YamlOrJson, mirror_yaml, yaml_headers};
use crate::audit;
use crate::audit::Caller;
/// Compact, serializable summary of a stored view.
///
/// Returned (as JSON) by [`create`] after a successful upsert; it carries the
/// widget count rather than the widget definitions themselves.
#[derive(Serialize)]
pub struct ViewSummary {
/// Identifier of the view; `create` also uses it as the KV key.
pub id: String,
/// Optional description, copied from the view.
pub description: Option<String>,
/// Number of widgets the view contains (`view.widgets.len()`).
pub widget_count: usize,
/// Tags copied from the view.
pub tags: Vec<String>,
}
pub async fn list(State(s): State<AppState>) -> Result<Json<Vec<View>>, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_VIEWS).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys: Vec<String> = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let mut out = Vec::with_capacity(keys.len());
for k in keys {
match kv.get(&k).await {
Ok(Some(bytes)) => match serde_json::from_slice::<View>(&bytes) {
Ok(view) => out.push(view),
Err(e) => warn!(view_id = %k, error = %e, "views: skipping undecodable entry"),
},
Ok(None) => {}
Err(e) => warn!(view_id = %k, error = %e, "views: kv get failed, skipping"),
}
}
out.sort_by(|a, b| a.id.cmp(&b.id));
Ok(Json(out))
}
/// Creates or replaces a view (upsert) from a YAML or JSON request body.
///
/// Steps, in order:
/// 1. Validate the parsed [`View`]; invalid input yields `400 Bad Request`.
/// 2. Ensure the views KV bucket exists (history depth 5) and store the view
///    as JSON under its `id`.
/// 3. Best-effort: mirror the YAML source into the YAML bucket. A mirror
///    failure is only logged; the request still succeeds.
/// 4. Record a `view_upsert` audit event for the caller.
///
/// # Returns
/// A [`ViewSummary`] describing the stored view.
///
/// # Errors
/// `400` on validation failure; `500` when ensuring the bucket, serializing
/// the view, or writing it to the KV store fails.
pub async fn create(
    State(s): State<AppState>,
    caller: Caller,
    body: YamlOrJson<View>,
) -> Result<Json<ViewSummary>, (StatusCode, String)> {
    let view = body.value;
    let raw_yaml = body.raw_yaml;

    view.validate()
        .map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid view: {e}")))?;
    let widget_count = view.widgets.len();

    let bucket_config = KvConfig {
        bucket: BUCKET_VIEWS.into(),
        history: 5,
        ..Default::default()
    };
    let store = match s.jetstream.create_key_value(bucket_config).await {
        Ok(store) => store,
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("ensure KV: {e}")));
        }
    };

    let encoded = match serde_json::to_vec(&view) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")));
        }
    };
    if let Err(e) = store.put(&view.id, encoded.into()).await {
        return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV put: {e}")));
    }

    // Prefer the YAML exactly as submitted; otherwise render it from the
    // parsed view, falling back to a placeholder comment if rendering fails.
    let yaml_source = match raw_yaml {
        Some(submitted) => submitted,
        None => match serde_yaml::to_string(&view) {
            Ok(rendered) => rendered,
            Err(_) => String::from("# YAML mirror unavailable for this entry"),
        },
    };
    match mirror_yaml(&s, BUCKET_VIEWS_YAML, &view.id, &yaml_source).await {
        Ok(_) => {}
        Err(e) => {
            warn!(error = %e, view_id = %view.id, "views: YAML mirror put failed; JSON catalog is current");
        }
    }

    info!(view_id = %view.id, widgets = widget_count, "view upserted");
    audit::record(
        &s.nats,
        "operator",
        "view_upsert",
        Some(&view.id),
        Some(&caller),
        serde_json::json!({ "widgets": widget_count }),
    )
    .await;

    let summary = ViewSummary {
        id: view.id,
        description: view.description,
        widget_count,
        tags: view.tags,
    };
    Ok(Json(summary))
}
/// Returns the YAML representation of the view identified by `id`.
///
/// The YAML mirror bucket is consulted first; when it holds a valid UTF-8
/// entry for `id`, that text is returned as-is. Any miss or failure on the
/// mirror path falls through to the JSON catalog, whose entry is decoded and
/// re-encoded as YAML.
///
/// # Errors
/// `404 Not Found` when the views bucket cannot be opened or has no entry for
/// `id`; `500 Internal Server Error` when the KV read, the JSON decode, or the
/// YAML encode fails.
pub async fn get_yaml(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<(StatusCode, HeaderMap, String), (StatusCode, String)> {
    // Mirror lookup: every failure mode collapses to `None` so that we fall
    // back to the JSON catalog below.
    let mirrored: Option<String> = match s.jetstream.get_key_value(BUCKET_VIEWS_YAML).await {
        Ok(mirror) => match mirror.get(&id).await {
            Ok(Some(raw)) => String::from_utf8(raw.to_vec()).ok(),
            _ => None,
        },
        Err(_) => None,
    };
    if let Some(text) = mirrored {
        return Ok((StatusCode::OK, yaml_headers(), text));
    }

    let not_found = || (StatusCode::NOT_FOUND, format!("view '{id}' not found"));

    let catalog = s
        .jetstream
        .get_key_value(BUCKET_VIEWS)
        .await
        .map_err(|_| not_found())?;
    let stored = catalog
        .get(&id)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")))?;
    let Some(bytes) = stored else {
        return Err(not_found());
    };

    let view: View = serde_json::from_slice(&bytes)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("decode: {e}")))?;
    match serde_yaml::to_string(&view) {
        Ok(yaml) => Ok((StatusCode::OK, yaml_headers(), yaml)),
        Err(e) => Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("encode YAML: {e}"),
        )),
    }
}
pub async fn delete(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_VIEWS).await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "views KV missing on delete");
return Err((StatusCode::NOT_FOUND, "views bucket missing".into()));
}
};
if kv
.get(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv get: {e}")))?
.is_none()
{
return Err((StatusCode::NOT_FOUND, format!("view '{id}' not found")));
}
kv.delete(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv delete: {e}")))?;
if let Ok(ykv) = s.jetstream.get_key_value(BUCKET_VIEWS_YAML).await {
let _ = ykv.delete(&id).await;
}
info!(view_id = %id, "view deleted");
audit::record(
&s.nats,
"operator",
"view_delete",
Some(&id),
Some(&caller),
serde_json::json!({}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}