use std::collections::{HashMap, HashSet};
use std::time::Duration;
use async_nats::jetstream::consumer::pull::Config as PullConfig;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::StreamExt;
use kanade_shared::ipc::notifications::{
AudiencePc, Notification, NotificationAckEntry, NotificationAckStatus, NotificationDetail,
PublishNotificationRequest, PublishNotificationResponse,
};
use kanade_shared::kv::STREAM_NOTIFICATIONS;
use kanade_shared::subject;
use sqlx::SqlitePool;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::agent_groups;
use crate::audit;
use crate::audit::Caller;
pub async fn publish(
State(s): State<AppState>,
caller: Caller,
Json(req): Json<PublishNotificationRequest>,
) -> Result<Json<PublishNotificationResponse>, (StatusCode, String)> {
if !req.target.is_specified() {
return Err((
StatusCode::BAD_REQUEST,
"target must set at least one of `all`, `groups`, or `pcs`".to_string(),
));
}
if req.title.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"title must not be empty".to_string(),
));
}
if let Some(expires_at) = req.expires_at
&& expires_at <= chrono::Utc::now()
{
return Err((
StatusCode::BAD_REQUEST,
"expires_at must be in the future".to_string(),
));
}
let id = req
.id
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_owned)
.unwrap_or_else(|| Uuid::new_v4().to_string());
let notification = Notification {
id: id.clone(),
priority: req.priority,
require_ack: req.require_ack,
title: req.title,
body: req.body,
issued_at: chrono::Utc::now(),
issued_by: req.issued_by,
expires_at: req.expires_at,
acked_at: None,
};
let mut subjects = Vec::new();
if req.target.all {
subjects.push(subject::NOTIFICATIONS_ALL.to_string());
}
for g in &req.target.groups {
subjects.push(subject::notifications_group(g));
}
for pc in &req.target.pcs {
subjects.push(subject::notifications_pc(pc));
}
let payload = serde_json::to_vec(¬ification)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
let mut delivered = Vec::new();
let mut failures = Vec::new();
for subj in &subjects {
let outcome = match s
.jetstream
.publish(subj.clone(), payload.clone().into())
.await
{
Ok(ack) => ack.await.map_err(|e| e.to_string()),
Err(e) => Err(e.to_string()),
};
match outcome {
Ok(_) => delivered.push(subj.clone()),
Err(e) => {
warn!(error = %e, subject = %subj, "notification publish failed");
failures.push(subj.clone());
}
}
}
if delivered.is_empty() {
return Err((
StatusCode::BAD_GATEWAY,
format!("all notification publishes failed for subjects: {failures:?}"),
));
}
info!(
notification_id = %id,
priority = ?notification.priority,
require_ack = notification.require_ack,
delivered = ?delivered,
failed = ?failures,
"notification published",
);
audit::record(
&s.nats,
"operator",
"notification",
Some(&id),
Some(&caller),
serde_json::json!({
"notification_id": id,
"priority": notification.priority,
"require_ack": notification.require_ack,
"subjects": delivered,
"failed_subjects": failures,
}),
)
.await;
Ok(Json(PublishNotificationResponse {
id,
subjects: delivered,
}))
}
pub async fn ack_status(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<NotificationAckStatus>, (StatusCode, String)> {
let acks = fetch_acks(&pool, &id).await?;
Ok(Json(NotificationAckStatus { id, acks }))
}
async fn fetch_acks(
pool: &SqlitePool,
id: &str,
) -> Result<Vec<NotificationAckEntry>, (StatusCode, String)> {
let rows: Vec<(
String,
String,
chrono::DateTime<chrono::Utc>,
Option<String>,
)> = sqlx::query_as(
"SELECT na.pc_id, na.user_sid, na.acked_at,
COALESCE(na.account, a.last_logon_display_name, a.last_logon_user)
FROM notification_acks na
LEFT JOIN agents a ON a.pc_id = na.pc_id
WHERE na.notification_id = ?
ORDER BY na.acked_at ASC",
)
.bind(id)
.fetch_all(pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("query notification_acks: {e}"),
)
})?;
Ok(rows
.into_iter()
.map(
|(pc_id, user_sid, acked_at, account)| NotificationAckEntry {
pc_id,
user_sid,
acked_at,
account,
},
)
.collect())
}
const SENT_MAX_REPLAY: usize = 5000;
const SENT_REPLAY_BATCH: usize = 500;
const SENT_MAX_ITEMS: usize = 200;
pub async fn list_sent(
State(s): State<AppState>,
) -> Result<Json<Vec<Notification>>, (StatusCode, String)> {
let raw = replay_all_sent(&s).await?;
let notifs = raw.into_iter().map(|(n, _subj)| n).collect();
Ok(Json(dedup_newest_first(notifs, SENT_MAX_ITEMS)))
}
pub async fn detail(
State(s): State<AppState>,
Path(id): Path<String>,
) -> Result<Json<NotificationDetail>, (StatusCode, String)> {
let raw = replay_all_sent(&s).await?;
let mut notification: Option<Notification> = None;
let mut subjects: Vec<String> = Vec::new();
for (n, subj) in raw {
if n.id != id {
continue;
}
subjects.push(subj);
match ¬ification {
Some(prev) if n.issued_at <= prev.issued_at => {}
_ => notification = Some(n),
}
}
let notification = notification.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
format!("notification {id} not found"),
)
})?;
let acks = fetch_acks(&s.pool, &id).await?;
let audience = resolve_audience(&s, &subjects, &acks).await?;
Ok(Json(NotificationDetail {
notification,
acks,
audience,
}))
}
async fn resolve_audience(
s: &AppState,
subjects: &[String],
acks: &[NotificationAckEntry],
) -> Result<Vec<AudiencePc>, (StatusCode, String)> {
let all = subjects.iter().any(|s| s == subject::NOTIFICATIONS_ALL);
let needs_groups = subjects
.iter()
.any(|s| s.starts_with(subject::NOTIFICATIONS_GROUP_PREFIX));
let membership = if needs_groups {
agent_groups::membership_map(s).await
} else {
HashMap::new()
};
let agent_rows = if all {
load_agents(s, None).await?
} else {
let mut candidates: HashSet<String> = HashSet::new();
for subj in subjects {
if let Some(pc) = subj.strip_prefix(subject::NOTIFICATIONS_PC_PREFIX) {
candidates.insert(pc.to_string());
}
}
for a in acks {
candidates.insert(a.pc_id.clone());
}
for (pc_id, pc_groups) in &membership {
if pc_groups
.iter()
.any(|g| subjects.contains(&subject::notifications_group(g)))
{
candidates.insert(pc_id.clone());
}
}
if candidates.is_empty() {
Vec::new()
} else {
load_agents(s, Some(candidates)).await?
}
};
Ok(assemble_roster(subjects, &agent_rows, &membership, acks))
}
async fn load_agents(
s: &AppState,
only: Option<HashSet<String>>,
) -> Result<Vec<(String, Option<String>, Option<String>)>, (StatusCode, String)> {
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT pc_id, last_logon_user, last_logon_display_name FROM agents",
);
if let Some(candidates) = only {
qb.push(" WHERE pc_id IN (");
let mut sep = qb.separated(", ");
for pc in candidates {
sep.push_bind(pc);
}
sep.push_unseparated(")");
}
qb.build_query_as().fetch_all(&s.pool).await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("query agents for audience: {e}"),
)
})
}
fn assemble_roster(
subjects: &[String],
agent_rows: &[(String, Option<String>, Option<String>)],
membership: &HashMap<String, Vec<String>>,
acks: &[NotificationAckEntry],
) -> Vec<AudiencePc> {
let mut all = false;
let mut groups: HashSet<&str> = HashSet::new();
let mut pcs: HashSet<String> = HashSet::new();
for subj in subjects {
if subj == subject::NOTIFICATIONS_ALL {
all = true;
} else if let Some(g) = subj.strip_prefix(subject::NOTIFICATIONS_GROUP_PREFIX) {
groups.insert(g);
} else if let Some(pc) = subj.strip_prefix(subject::NOTIFICATIONS_PC_PREFIX) {
pcs.insert(pc.to_string());
}
}
let logon: HashMap<&str, (Option<String>, Option<String>)> = agent_rows
.iter()
.map(|(pc, u, d)| (pc.as_str(), (u.clone(), d.clone())))
.collect();
let mut expected: HashSet<String> = pcs;
if all {
expected.extend(agent_rows.iter().map(|(pc, _, _)| pc.clone()));
}
if !groups.is_empty() {
for (pc_id, pc_groups) in membership {
if pc_groups.iter().any(|g| groups.contains(g.as_str())) {
expected.insert(pc_id.clone());
}
}
}
let mut acked: HashMap<&str, chrono::DateTime<chrono::Utc>> = HashMap::new();
for a in acks {
acked
.entry(a.pc_id.as_str())
.and_modify(|t| {
if a.acked_at < *t {
*t = a.acked_at;
}
})
.or_insert(a.acked_at);
expected.insert(a.pc_id.clone());
}
let mut roster: Vec<AudiencePc> = expected
.into_iter()
.map(|pc_id| {
let acked_at = acked.get(pc_id.as_str()).copied();
let (last_logon_user, last_logon_display_name) =
logon.get(pc_id.as_str()).cloned().unwrap_or((None, None));
AudiencePc {
last_logon_user,
last_logon_display_name,
confirmed: acked_at.is_some(),
acked_at,
pc_id,
}
})
.collect();
roster.sort_by(|a, b| {
a.confirmed
.cmp(&b.confirmed)
.then_with(|| a.pc_id.cmp(&b.pc_id))
});
roster
}
async fn replay_all_sent(
s: &AppState,
) -> Result<Vec<(Notification, String)>, (StatusCode, String)> {
let stream = s
.jetstream
.get_stream(STREAM_NOTIFICATIONS)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("open {STREAM_NOTIFICATIONS} stream: {e}"),
)
})?;
if stream.cached_info().state.messages == 0 {
return Ok(Vec::new());
}
let consumer = stream
.create_consumer(PullConfig {
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::None,
filter_subjects: vec!["notifications.>".to_string()],
inactive_threshold: Duration::from_secs(30),
..Default::default()
})
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("create ephemeral consumer: {e}"),
)
})?;
let mut buf: std::collections::VecDeque<(Notification, String)> =
std::collections::VecDeque::with_capacity(SENT_MAX_REPLAY + 1);
let mut dropped = 0usize;
loop {
let mut batch = consumer
.fetch()
.max_messages(SENT_REPLAY_BATCH)
.expires(Duration::from_millis(200))
.messages()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("fetch: {e}")))?;
let mut got = 0usize;
let mut exhausted = false;
while let Some(m) = batch.next().await {
let m = m.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("message: {e}")))?;
got += 1;
if m.info().is_ok_and(|i| i.pending == 0) {
exhausted = true;
}
match serde_json::from_slice::<Notification>(&m.payload) {
Ok(n) => {
buf.push_back((n, m.subject.to_string()));
if buf.len() > SENT_MAX_REPLAY {
buf.pop_front();
dropped += 1;
}
}
Err(e) => warn!(
error = %e,
subject = %m.subject,
"list_sent: skipping unparseable notification",
),
}
}
if exhausted || got < SENT_REPLAY_BATCH {
break;
}
}
if dropped > 0 {
warn!(
dropped,
cap = SENT_MAX_REPLAY,
"list_sent: NOTIFICATIONS exceeded replay cap; oldest beyond the cap omitted",
);
}
Ok(Vec::from(buf))
}
fn dedup_newest_first(raw: Vec<Notification>, max_items: usize) -> Vec<Notification> {
let mut idx_of: HashMap<String, usize> = HashMap::new();
let mut deduped: Vec<Notification> = Vec::new();
for n in raw {
match idx_of.get(&n.id) {
Some(&i) if n.issued_at <= deduped[i].issued_at => {}
Some(&i) => deduped[i] = n,
None => {
idx_of.insert(n.id.clone(), deduped.len());
deduped.push(n);
}
}
}
deduped.sort_by(|a, b| b.issued_at.cmp(&a.issued_at).then_with(|| a.id.cmp(&b.id)));
deduped.truncate(max_items);
deduped
}
#[cfg(test)]
mod tests {
use super::*;
use kanade_shared::ipc::notifications::NotificationPriority;
fn notif(id: &str, issued: chrono::DateTime<chrono::Utc>) -> Notification {
Notification {
id: id.into(),
priority: NotificationPriority::Info,
require_ack: false,
title: "t".into(),
body: "b".into(),
issued_at: issued,
issued_by: None,
expires_at: None,
acked_at: None,
}
}
fn at(secs: i64) -> chrono::DateTime<chrono::Utc> {
chrono::TimeZone::with_ymd_and_hms(&chrono::Utc, 2026, 6, 1, 12, 0, 0).unwrap()
+ chrono::Duration::seconds(secs)
}
#[test]
fn dedups_fanout_copies_to_one_row_per_id() {
let raw = vec![notif("n1", at(0)), notif("n1", at(0)), notif("n1", at(0))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1, "fan-out copies collapse to one");
assert_eq!(out[0].id, "n1");
}
#[test]
fn sorts_newest_first() {
let raw = vec![
notif("old", at(0)),
notif("new", at(120)),
notif("mid", at(60)),
];
let out = dedup_newest_first(raw, 200);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["new", "mid", "old"]);
}
#[test]
fn dedup_keeps_newest_issued_for_repeated_id() {
let raw = vec![notif("dup", at(0)), notif("dup", at(60))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1);
assert_eq!(out[0].issued_at, at(60), "newest issued_at wins");
}
#[test]
fn caps_at_max_items_after_sort() {
let raw = vec![notif("a", at(0)), notif("b", at(60)), notif("c", at(120))];
let out = dedup_newest_first(raw, 2);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["c", "b"], "newest two kept");
}
fn agent(
pc: &str,
user: Option<&str>,
display: Option<&str>,
) -> (String, Option<String>, Option<String>) {
(pc.into(), user.map(Into::into), display.map(Into::into))
}
fn ack(pc: &str, sid: &str, secs: i64) -> NotificationAckEntry {
NotificationAckEntry {
pc_id: pc.into(),
user_sid: sid.into(),
acked_at: at(secs),
account: None,
}
}
#[test]
fn roster_all_targets_every_agent_pending_first() {
let agents = vec![
agent("PC1", Some("D\\a"), Some("Alice")),
agent("PC2", Some("D\\b"), Some("Bob")),
agent("PC3", None, None),
];
let roster = assemble_roster(
&["notifications.all".to_string()],
&agents,
&HashMap::new(),
&[ack("PC2", "S-2", 10)],
);
let view: Vec<(&str, bool)> = roster
.iter()
.map(|r| (r.pc_id.as_str(), r.confirmed))
.collect();
assert_eq!(view, vec![("PC1", false), ("PC3", false), ("PC2", true)]);
let pc2 = roster.iter().find(|r| r.pc_id == "PC2").unwrap();
assert_eq!(pc2.acked_at, Some(at(10)));
assert_eq!(pc2.last_logon_display_name.as_deref(), Some("Bob"));
}
#[test]
fn roster_group_expands_via_membership_and_pc_is_direct() {
let agents = vec![
agent("PC1", None, None),
agent("PC2", None, None),
agent("PC9", None, None),
agent("PCX", None, None), ];
let membership: HashMap<String, Vec<String>> = [
("PC1".to_string(), vec!["fin".to_string()]),
(
"PC2".to_string(),
vec!["fin".to_string(), "ops".to_string()],
),
("PCX".to_string(), vec!["ops".to_string()]),
]
.into_iter()
.collect();
let roster = assemble_roster(
&[
"notifications.group.fin".to_string(),
"notifications.pc.PC9".to_string(),
],
&agents,
&membership,
&[],
);
let mut pcs: Vec<&str> = roster.iter().map(|r| r.pc_id.as_str()).collect();
pcs.sort();
assert_eq!(pcs, vec!["PC1", "PC2", "PC9"], "fin members + direct PC9");
assert!(roster.iter().all(|r| !r.confirmed), "no acks → all pending");
}
#[test]
fn roster_includes_acked_pc_outside_resolved_audience() {
let agents = vec![agent("PC1", None, None), agent("PC7", None, None)];
let membership: HashMap<String, Vec<String>> =
[("PC1".to_string(), vec!["fin".to_string()])]
.into_iter()
.collect();
let roster = assemble_roster(
&["notifications.group.fin".to_string()],
&agents,
&membership,
&[ack("PC7", "S-7", 5)],
);
let pc7 = roster.iter().find(|r| r.pc_id == "PC7");
assert!(
pc7.is_some_and(|r| r.confirmed),
"acked PC always in roster"
);
}
#[test]
fn roster_earliest_ack_wins_per_pc() {
let agents = vec![agent("PC1", None, None)];
let roster = assemble_roster(
&["notifications.pc.PC1".to_string()],
&agents,
&HashMap::new(),
&[ack("PC1", "S-a", 30), ack("PC1", "S-b", 5)],
);
assert_eq!(roster.len(), 1);
assert_eq!(roster[0].acked_at, Some(at(5)), "earliest ack per PC");
}
}