use std::collections::HashMap;
use std::time::Duration;
use async_nats::jetstream::consumer::pull::Config as PullConfig;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::StreamExt;
use kanade_shared::ipc::notifications::{
Notification, NotificationAckEntry, NotificationAckStatus, PublishNotificationRequest,
PublishNotificationResponse,
};
use kanade_shared::kv::STREAM_NOTIFICATIONS;
use kanade_shared::subject;
use sqlx::SqlitePool;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::audit;
use crate::audit::Caller;
pub async fn publish(
State(s): State<AppState>,
caller: Caller,
Json(req): Json<PublishNotificationRequest>,
) -> Result<Json<PublishNotificationResponse>, (StatusCode, String)> {
if !req.target.is_specified() {
return Err((
StatusCode::BAD_REQUEST,
"target must set at least one of `all`, `groups`, or `pcs`".to_string(),
));
}
if req.title.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"title must not be empty".to_string(),
));
}
if let Some(expires_at) = req.expires_at
&& expires_at <= chrono::Utc::now()
{
return Err((
StatusCode::BAD_REQUEST,
"expires_at must be in the future".to_string(),
));
}
let id = req
.id
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_owned)
.unwrap_or_else(|| Uuid::new_v4().to_string());
let notification = Notification {
id: id.clone(),
priority: req.priority,
require_ack: req.require_ack,
title: req.title,
body: req.body,
issued_at: chrono::Utc::now(),
issued_by: req.issued_by,
expires_at: req.expires_at,
acked_at: None,
};
let mut subjects = Vec::new();
if req.target.all {
subjects.push(subject::NOTIFICATIONS_ALL.to_string());
}
for g in &req.target.groups {
subjects.push(subject::notifications_group(g));
}
for pc in &req.target.pcs {
subjects.push(subject::notifications_pc(pc));
}
let payload = serde_json::to_vec(¬ification)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
let mut delivered = Vec::new();
let mut failures = Vec::new();
for subj in &subjects {
let outcome = match s
.jetstream
.publish(subj.clone(), payload.clone().into())
.await
{
Ok(ack) => ack.await.map_err(|e| e.to_string()),
Err(e) => Err(e.to_string()),
};
match outcome {
Ok(_) => delivered.push(subj.clone()),
Err(e) => {
warn!(error = %e, subject = %subj, "notification publish failed");
failures.push(subj.clone());
}
}
}
if delivered.is_empty() {
return Err((
StatusCode::BAD_GATEWAY,
format!("all notification publishes failed for subjects: {failures:?}"),
));
}
info!(
notification_id = %id,
priority = ?notification.priority,
require_ack = notification.require_ack,
delivered = ?delivered,
failed = ?failures,
"notification published",
);
audit::record(
&s.nats,
"operator",
"notification",
Some(&id),
Some(&caller),
serde_json::json!({
"notification_id": id,
"priority": notification.priority,
"require_ack": notification.require_ack,
"subjects": delivered,
"failed_subjects": failures,
}),
)
.await;
Ok(Json(PublishNotificationResponse {
id,
subjects: delivered,
}))
}
pub async fn ack_status(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<NotificationAckStatus>, (StatusCode, String)> {
let rows: Vec<(String, String, chrono::DateTime<chrono::Utc>)> = sqlx::query_as(
"SELECT pc_id, user_sid, acked_at
FROM notification_acks
WHERE notification_id = ?
ORDER BY acked_at ASC",
)
.bind(&id)
.fetch_all(&pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("query notification_acks: {e}"),
)
})?;
let acks = rows
.into_iter()
.map(|(pc_id, user_sid, acked_at)| NotificationAckEntry {
pc_id,
user_sid,
acked_at,
})
.collect();
Ok(Json(NotificationAckStatus { id, acks }))
}
const SENT_MAX_REPLAY: usize = 5000;
const SENT_REPLAY_BATCH: usize = 500;
const SENT_MAX_ITEMS: usize = 200;
pub async fn list_sent(
State(s): State<AppState>,
) -> Result<Json<Vec<Notification>>, (StatusCode, String)> {
let stream = s
.jetstream
.get_stream(STREAM_NOTIFICATIONS)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("open {STREAM_NOTIFICATIONS} stream: {e}"),
)
})?;
if stream.cached_info().state.messages == 0 {
return Ok(Json(Vec::new()));
}
let consumer = stream
.create_consumer(PullConfig {
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::None,
filter_subjects: vec!["notifications.>".to_string()],
inactive_threshold: Duration::from_secs(30),
..Default::default()
})
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("create ephemeral consumer: {e}"),
)
})?;
let mut buf: std::collections::VecDeque<Notification> =
std::collections::VecDeque::with_capacity(SENT_MAX_REPLAY + 1);
let mut dropped = 0usize;
loop {
let mut batch = consumer
.fetch()
.max_messages(SENT_REPLAY_BATCH)
.expires(Duration::from_millis(200))
.messages()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("fetch: {e}")))?;
let mut got = 0usize;
let mut exhausted = false;
while let Some(m) = batch.next().await {
let m = m.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("message: {e}")))?;
got += 1;
if m.info().is_ok_and(|i| i.pending == 0) {
exhausted = true;
}
match serde_json::from_slice::<Notification>(&m.payload) {
Ok(n) => {
buf.push_back(n);
if buf.len() > SENT_MAX_REPLAY {
buf.pop_front();
dropped += 1;
}
}
Err(e) => warn!(
error = %e,
subject = %m.subject,
"list_sent: skipping unparseable notification",
),
}
}
if exhausted || got < SENT_REPLAY_BATCH {
break;
}
}
if dropped > 0 {
warn!(
dropped,
cap = SENT_MAX_REPLAY,
"list_sent: NOTIFICATIONS exceeded replay cap; oldest beyond the cap omitted",
);
}
Ok(Json(dedup_newest_first(Vec::from(buf), SENT_MAX_ITEMS)))
}
fn dedup_newest_first(raw: Vec<Notification>, max_items: usize) -> Vec<Notification> {
let mut idx_of: HashMap<String, usize> = HashMap::new();
let mut deduped: Vec<Notification> = Vec::new();
for n in raw {
match idx_of.get(&n.id) {
Some(&i) if n.issued_at <= deduped[i].issued_at => {}
Some(&i) => deduped[i] = n,
None => {
idx_of.insert(n.id.clone(), deduped.len());
deduped.push(n);
}
}
}
deduped.sort_by(|a, b| b.issued_at.cmp(&a.issued_at).then_with(|| a.id.cmp(&b.id)));
deduped.truncate(max_items);
deduped
}
#[cfg(test)]
mod tests {
use super::*;
use kanade_shared::ipc::notifications::NotificationPriority;
fn notif(id: &str, issued: chrono::DateTime<chrono::Utc>) -> Notification {
Notification {
id: id.into(),
priority: NotificationPriority::Info,
require_ack: false,
title: "t".into(),
body: "b".into(),
issued_at: issued,
issued_by: None,
expires_at: None,
acked_at: None,
}
}
fn at(secs: i64) -> chrono::DateTime<chrono::Utc> {
chrono::TimeZone::with_ymd_and_hms(&chrono::Utc, 2026, 6, 1, 12, 0, 0).unwrap()
+ chrono::Duration::seconds(secs)
}
#[test]
fn dedups_fanout_copies_to_one_row_per_id() {
let raw = vec![notif("n1", at(0)), notif("n1", at(0)), notif("n1", at(0))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1, "fan-out copies collapse to one");
assert_eq!(out[0].id, "n1");
}
#[test]
fn sorts_newest_first() {
let raw = vec![
notif("old", at(0)),
notif("new", at(120)),
notif("mid", at(60)),
];
let out = dedup_newest_first(raw, 200);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["new", "mid", "old"]);
}
#[test]
fn dedup_keeps_newest_issued_for_repeated_id() {
let raw = vec![notif("dup", at(0)), notif("dup", at(60))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1);
assert_eq!(out[0].issued_at, at(60), "newest issued_at wins");
}
#[test]
fn caps_at_max_items_after_sort() {
let raw = vec![notif("a", at(0)), notif("b", at(60)), notif("c", at(120))];
let out = dedup_newest_first(raw, 2);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["c", "b"], "newest two kept");
}
}