use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use kanade_shared::ipc::notifications::{
Notification, NotificationAckEntry, NotificationAckStatus, PublishNotificationRequest,
PublishNotificationResponse,
};
use kanade_shared::subject;
use sqlx::SqlitePool;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::audit;
use crate::audit::Caller;
pub async fn publish(
State(s): State<AppState>,
caller: Caller,
Json(req): Json<PublishNotificationRequest>,
) -> Result<Json<PublishNotificationResponse>, (StatusCode, String)> {
if !req.target.is_specified() {
return Err((
StatusCode::BAD_REQUEST,
"target must set at least one of `all`, `groups`, or `pcs`".to_string(),
));
}
if req.title.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"title must not be empty".to_string(),
));
}
if let Some(expires_at) = req.expires_at
&& expires_at <= chrono::Utc::now()
{
return Err((
StatusCode::BAD_REQUEST,
"expires_at must be in the future".to_string(),
));
}
let id = req
.id
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_owned)
.unwrap_or_else(|| Uuid::new_v4().to_string());
let notification = Notification {
id: id.clone(),
priority: req.priority,
require_ack: req.require_ack,
title: req.title,
body: req.body,
issued_at: chrono::Utc::now(),
issued_by: req.issued_by,
expires_at: req.expires_at,
acked_at: None,
};
let mut subjects = Vec::new();
if req.target.all {
subjects.push(subject::NOTIFICATIONS_ALL.to_string());
}
for g in &req.target.groups {
subjects.push(subject::notifications_group(g));
}
for pc in &req.target.pcs {
subjects.push(subject::notifications_pc(pc));
}
let payload = serde_json::to_vec(¬ification)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
let mut delivered = Vec::new();
let mut failures = Vec::new();
for subj in &subjects {
let outcome = match s
.jetstream
.publish(subj.clone(), payload.clone().into())
.await
{
Ok(ack) => ack.await.map_err(|e| e.to_string()),
Err(e) => Err(e.to_string()),
};
match outcome {
Ok(_) => delivered.push(subj.clone()),
Err(e) => {
warn!(error = %e, subject = %subj, "notification publish failed");
failures.push(subj.clone());
}
}
}
if delivered.is_empty() {
return Err((
StatusCode::BAD_GATEWAY,
format!("all notification publishes failed for subjects: {failures:?}"),
));
}
info!(
notification_id = %id,
priority = ?notification.priority,
require_ack = notification.require_ack,
delivered = ?delivered,
failed = ?failures,
"notification published",
);
audit::record(
&s.nats,
"operator",
"notification",
Some(&id),
Some(&caller),
serde_json::json!({
"notification_id": id,
"priority": notification.priority,
"require_ack": notification.require_ack,
"subjects": delivered,
"failed_subjects": failures,
}),
)
.await;
Ok(Json(PublishNotificationResponse {
id,
subjects: delivered,
}))
}
pub async fn ack_status(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<NotificationAckStatus>, (StatusCode, String)> {
let rows: Vec<(String, String, chrono::DateTime<chrono::Utc>)> = sqlx::query_as(
"SELECT pc_id, user_sid, acked_at
FROM notification_acks
WHERE notification_id = ?
ORDER BY acked_at ASC",
)
.bind(&id)
.fetch_all(&pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("query notification_acks: {e}"),
)
})?;
let acks = rows
.into_iter()
.map(|(pc_id, user_sid, acked_at)| NotificationAckEntry {
pc_id,
user_sid,
acked_at,
})
.collect();
Ok(Json(NotificationAckStatus { id, acks }))
}