use std::collections::{HashMap, HashSet};
use std::time::Duration;
use async_nats::jetstream::consumer::pull::Config as PullConfig;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::StreamExt;
use kanade_shared::ipc::notifications::{
AudiencePc, EditNotificationRequest, Notification, NotificationAckEntry, NotificationAckStatus,
NotificationAmend, NotificationAmendOp, NotificationDetail, NotificationTarget,
PublishNotificationRequest, PublishNotificationResponse,
};
use kanade_shared::kv::STREAM_NOTIFICATIONS;
use kanade_shared::subject;
use sqlx::SqlitePool;
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::agent_groups;
use crate::audit;
use crate::audit::Caller;
/// Publishes a new notification to every subject implied by the request target.
///
/// Validation failures are reported as `400 Bad Request`:
/// - the target must be specified (`req.target.is_specified()`),
/// - the title must contain at least one non-whitespace character,
/// - `expires_at`, when present, must be strictly later than the current time.
///
/// The notification id is the trimmed, non-empty `req.id` when the caller
/// supplied one; otherwise a fresh UUID v4 is generated.
///
/// # Errors
/// - `500 Internal Server Error` when the notification cannot be serialized.
/// - `502 Bad Gateway` when no subject accepted the publish. A partial failure
///   still succeeds: the failed subjects are logged and written to the audit
///   record, and only the delivered subjects are returned to the caller.
pub async fn publish(
    State(s): State<AppState>,
    caller: Caller,
    Json(req): Json<PublishNotificationRequest>,
) -> Result<Json<PublishNotificationResponse>, (StatusCode, String)> {
    if !req.target.is_specified() {
        return Err((
            StatusCode::BAD_REQUEST,
            "target must set at least one of `all`, `groups`, or `pcs`".to_string(),
        ));
    }
    if req.title.trim().is_empty() {
        return Err((
            StatusCode::BAD_REQUEST,
            "title must not be empty".to_string(),
        ));
    }
    if let Some(expires_at) = req.expires_at
        && expires_at <= chrono::Utc::now()
    {
        return Err((
            StatusCode::BAD_REQUEST,
            "expires_at must be in the future".to_string(),
        ));
    }

    // Prefer a caller-provided id (ignoring surrounding whitespace); fall back
    // to a random UUID when it is absent or blank.
    let id = req
        .id
        .as_deref()
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_owned)
        .unwrap_or_else(|| Uuid::new_v4().to_string());

    let notification = Notification {
        id: id.clone(),
        priority: req.priority,
        require_ack: req.require_ack,
        title: req.title,
        body: req.body,
        toast: req.toast,
        issued_at: chrono::Utc::now(),
        issued_by: req.issued_by,
        expires_at: req.expires_at,
        acked_at: None,
        edited_at: None,
        acks_reset_at: None,
    };

    let (delivered, failures) = fan_out_notification(&s.jetstream, &notification, &req.target)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;

    // Only a total failure is an error; partial delivery is reported below.
    if delivered.is_empty() {
        return Err((
            StatusCode::BAD_GATEWAY,
            format!("all notification publishes failed for subjects: {failures:?}"),
        ));
    }

    info!(
        notification_id = %id,
        priority = ?notification.priority,
        require_ack = notification.require_ack,
        delivered = ?delivered,
        failed = ?failures,
        "notification published",
    );
    audit::record(
        &s.nats,
        "operator",
        "notification",
        Some(&id),
        Some(&caller),
        serde_json::json!({
            "notification_id": id,
            "priority": notification.priority,
            "require_ack": notification.require_ack,
            "subjects": delivered,
            "failed_subjects": failures,
        }),
    )
    .await;

    Ok(Json(PublishNotificationResponse {
        id,
        subjects: delivered,
    }))
}
/// Publishes one serialized copy of `notification` to each subject derived
/// from `target`, in the order: broadcast subject (when `target.all`), then
/// one subject per group, then one subject per PC.
///
/// Each publish is awaited together with its JetStream acknowledgement. A
/// subject whose publish or acknowledgement fails is logged and collected in
/// the failure list; the remaining subjects are still attempted.
///
/// Returns `(delivered, failures)` as lists of subject names.
///
/// # Errors
/// Returns `Err` only when the notification cannot be serialized to JSON.
pub(crate) async fn fan_out_notification(
    js: &async_nats::jetstream::Context,
    notification: &Notification,
    target: &kanade_shared::manifest::Target,
) -> Result<(Vec<String>, Vec<String>), String> {
    let broadcast = target.all.then(|| subject::NOTIFICATIONS_ALL.to_string());
    let subjects: Vec<String> = broadcast
        .into_iter()
        .chain(target.groups.iter().map(|g| subject::notifications_group(g)))
        .chain(target.pcs.iter().map(|pc| subject::notifications_pc(pc)))
        .collect();

    let encoded = serde_json::to_vec(notification).map_err(|e| format!("serialize: {e}"))?;
    let payload = bytes::Bytes::from(encoded);

    let mut delivered = Vec::with_capacity(subjects.len());
    let mut failures = Vec::new();
    for subj in subjects {
        // The first await sends the message; the second waits for the server ack.
        let result = match js.publish(subj.clone(), payload.clone()).await {
            Ok(pending_ack) => pending_ack.await.map(drop).map_err(|e| e.to_string()),
            Err(e) => Err(e.to_string()),
        };
        if let Err(e) = result {
            warn!(error = %e, subject = %subj, "notification publish failed");
            failures.push(subj);
        } else {
            delivered.push(subj);
        }
    }
    Ok((delivered, failures))
}
/// Returns the acknowledgement entries recorded for notification `id`.
///
/// # Errors
/// Propagates the `500 Internal Server Error` produced by `fetch_acks` when
/// the database query fails.
pub async fn ack_status(
    State(pool): State<SqlitePool>,
    Path(id): Path<String>,
) -> Result<Json<NotificationAckStatus>, (StatusCode, String)> {
    let entries = fetch_acks(&pool, id.as_str()).await?;
    let status = NotificationAckStatus { id, acks: entries };
    Ok(Json(status))
}
/// Loads every acknowledgement row for notification `id`, oldest first.
///
/// The account column falls back, in order, to the agent's last-logon display
/// name and then its last-logon user when the ack row itself has no account
/// (see the `COALESCE` in the query).
///
/// # Errors
/// Returns `500 Internal Server Error` when the query fails.
async fn fetch_acks(
    pool: &SqlitePool,
    id: &str,
) -> Result<Vec<NotificationAckEntry>, (StatusCode, String)> {
    // Column order of the SELECT below: pc_id, user_sid, acked_at, account.
    type AckRow = (
        String,
        String,
        chrono::DateTime<chrono::Utc>,
        Option<String>,
    );

    let query = sqlx::query_as::<_, AckRow>(
        "SELECT na.pc_id, na.user_sid, na.acked_at,
COALESCE(na.account, a.last_logon_display_name, a.last_logon_user)
FROM notification_acks na
LEFT JOIN agents a ON a.pc_id = na.pc_id
WHERE na.notification_id = ?
ORDER BY na.acked_at ASC",
    )
    .bind(id);

    let rows = match query.fetch_all(pool).await {
        Ok(rows) => rows,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("query notification_acks: {e}"),
            ));
        }
    };

    let mut entries = Vec::with_capacity(rows.len());
    for (pc_id, user_sid, acked_at, account) in rows {
        entries.push(NotificationAckEntry {
            pc_id,
            user_sid,
            acked_at,
            account,
        });
    }
    Ok(entries)
}
/// Upper bound on how many replayed stream messages `replay_all_sent` keeps
/// buffered; once exceeded, the oldest buffered entry is dropped for each new one.
const SENT_MAX_REPLAY: usize = 5000;
/// Maximum number of messages requested per pull-consumer fetch during replay.
const SENT_REPLAY_BATCH: usize = 500;
/// Maximum number of deduplicated notifications returned by `list_sent`.
const SENT_MAX_ITEMS: usize = 200;
/// Lists previously sent notifications, one entry per id, newest first and
/// capped at `SENT_MAX_ITEMS`.
///
/// # Errors
/// Propagates any error from replaying the notifications stream.
pub async fn list_sent(
    State(s): State<AppState>,
) -> Result<Json<Vec<Notification>>, (StatusCode, String)> {
    let copies = replay_all_sent(&s).await?;

    // Subject and stream sequence are irrelevant for the listing.
    let mut notifications = Vec::with_capacity(copies.len());
    for (notification, _, _) in copies {
        notifications.push(notification);
    }

    Ok(Json(dedup_newest_first(notifications, SENT_MAX_ITEMS)))
}
/// Returns the detail view for notification `id`: the notification itself, its
/// acknowledgements, the resolved audience roster and the target reconstructed
/// from the subjects its copies were published on.
///
/// All stored copies with a matching id are collected. The copy with the
/// greatest `issued_at` is the one returned; on equal timestamps the first copy
/// encountered is kept.
///
/// # Errors
/// - `404 Not Found` when no stored copy carries `id`.
/// - Any error propagated from the stream replay, the ack query or the
///   audience resolution.
pub async fn detail(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<NotificationDetail>, (StatusCode, String)> {
    let raw = replay_all_sent(&s).await?;

    let mut notification: Option<Notification> = None;
    let mut subjects: Vec<String> = Vec::new();
    for (n, subj, _seq) in raw {
        if n.id != id {
            continue;
        }
        // Every matching copy contributes its subject, even when the copy
        // itself is not the one we keep.
        subjects.push(subj);
        match &notification {
            Some(prev) if n.issued_at <= prev.issued_at => {}
            _ => notification = Some(n),
        }
    }

    let notification = notification.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("notification {id} not found"),
        )
    })?;

    let acks = fetch_acks(&s.pool, &id).await?;
    let target = Some(parse_notification_target(&subjects));
    let audience = resolve_audience(&s, &subjects, &acks).await?;

    Ok(Json(NotificationDetail {
        notification,
        acks,
        audience,
        target,
    }))
}
/// Recalls notification `id`: deletes every stored copy from the stream, then
/// publishes a `Recall` amend message, logs the outcome and writes an audit
/// record. Responds with `204 No Content` on success.
///
/// # Errors
/// - `404 Not Found` when no stored copy carries `id`.
/// - `502 Bad Gateway` when nothing was deleted or at least one delete failed;
///   in that case no amend message is sent.
/// - Any error propagated from the stream replay or from opening the stream.
pub async fn recall(
    State(s): State<AppState>,
    caller: Caller,
    Path(id): Path<String>,
) -> Result<StatusCode, (StatusCode, String)> {
    let copies = replay_all_sent(&s).await?;

    let mut seqs: Vec<u64> = Vec::new();
    for (n, _subject, seq) in &copies {
        if n.id == id {
            seqs.push(*seq);
        }
    }
    if seqs.is_empty() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("notification {id} not found"),
        ));
    }

    let (deleted, failed) = delete_stream_msgs(&s, &seqs).await?;
    let fully_deleted = deleted > 0 && failed == 0;
    if !fully_deleted {
        return Err((
            StatusCode::BAD_GATEWAY,
            format!(
                "failed to fully recall notification {id}: deleted {deleted}, failed {failed} of {} copies",
                seqs.len()
            ),
        ));
    }

    // The amend broadcast is sent only once the stored copies are gone.
    let amend = NotificationAmend {
        id: id.clone(),
        op: NotificationAmendOp::Recall,
    };
    fan_out_amend(&s, &amend).await;

    info!(
        notification_id = %id,
        deleted,
        "notification recalled",
    );
    audit::record(
        &s.nats,
        "operator",
        "notification.recall",
        Some(&id),
        Some(&caller),
        serde_json::json!({
            "notification_id": id,
            "deleted_copies": deleted,
        }),
    )
    .await;

    Ok(StatusCode::NO_CONTENT)
}
/// Replaces the stored copies of notification `id` with an edited version.
///
/// The steps run in a fixed order: validate the request, locate every stored
/// copy of `id`, build the merged notification, delete the old copies,
/// re-publish to the same subjects, and finally (when `req.reset_acks` is set)
/// clear the recorded acknowledgements.
///
/// # Errors
/// - `400 Bad Request` when the title is blank.
/// - `404 Not Found` when no stored copy carries `id`.
/// - `502 Bad Gateway` when deleting the old copies did not fully succeed
///   (nothing is re-published in that case), or when re-publishing failed for
///   any subject after the old copies were already deleted.
/// - `500 Internal Server Error` when the merged notification cannot be
///   serialized, plus any error propagated from the stream replay.
pub async fn edit(
State(s): State<AppState>,
caller: Caller,
Path(id): Path<String>,
Json(req): Json<EditNotificationRequest>,
) -> Result<Json<PublishNotificationResponse>, (StatusCode, String)> {
if req.title.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"title must not be empty".to_string(),
));
}
let raw = replay_all_sent(&s).await?;
// Gather every stored copy of `id`: the subjects it was published on, the
// stream sequences to delete, and the copy with the greatest `issued_at`
// (the first copy wins on equal timestamps).
let mut existing: Option<Notification> = None;
let mut subjects: Vec<String> = Vec::new();
let mut seqs: Vec<u64> = Vec::new();
for (n, subj, seq) in raw {
if n.id != id {
continue;
}
subjects.push(subj);
seqs.push(seq);
match &existing {
Some(prev) if n.issued_at <= prev.issued_at => {}
_ => existing = Some(n),
}
}
let existing = existing.ok_or_else(|| {
(
StatusCode::NOT_FOUND,
format!("notification {id} not found"),
)
})?;
let now = chrono::Utc::now();
// Resetting acks stamps the reset time; otherwise the previous stamp is kept.
let acks_reset_at = if req.reset_acks {
Some(now)
} else {
existing.acks_reset_at
};
// Identity fields (id, issued_at, issued_by) come from the stored copy;
// the remaining content fields come from the request.
let merged = Notification {
id: existing.id.clone(),
priority: req.priority,
require_ack: req.require_ack,
title: req.title,
body: req.body,
toast: req.toast,
issued_at: existing.issued_at,
issued_by: existing.issued_by.clone(),
expires_at: req.expires_at,
acked_at: None,
edited_at: Some(now),
acks_reset_at,
};
// Rebuild the publish target from the subjects the old copies were found on.
let parsed = parse_notification_target(&subjects);
let target = kanade_shared::manifest::Target {
all: parsed.all,
groups: parsed.groups,
pcs: parsed.pcs,
};
// Delete first; if that is not a complete success, stop before publishing
// so the old and new versions never coexist.
let (deleted, failed) = delete_stream_msgs(&s, &seqs).await?;
if deleted == 0 || failed > 0 {
return Err((
StatusCode::BAD_GATEWAY,
format!(
"failed to replace notification {id}: deleted {deleted}, failed {failed} of {} copies; not re-publishing to avoid a duplicate",
seqs.len()
),
));
}
let (delivered, pub_failures) = fan_out_notification(&s.jetstream, &merged, &target)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
// Unlike the initial publish, any failed subject is an error here because
// the old copy on that subject has already been deleted.
if delivered.is_empty() || !pub_failures.is_empty() {
return Err((
StatusCode::BAD_GATEWAY,
format!(
"deleted the old copies of {id} but re-publish failed for {pub_failures:?} (delivered {delivered:?}); the notification is now missing from the failed subjects — recall and re-send it"
),
));
}
// Clearing acks is best-effort: a database failure is logged and surfaced
// as `acks_cleared = false` but does not fail the request.
let acks_cleared = if req.reset_acks {
match sqlx::query("DELETE FROM notification_acks WHERE notification_id = ?")
.bind(&id)
.execute(&s.pool)
.await
{
Ok(_) => true,
Err(e) => {
warn!(error = %e, notification_id = %id, "edit: clearing notification_acks failed");
false
}
}
} else {
true
};
info!(
notification_id = %id,
delivered = ?delivered,
reset_acks = req.reset_acks,
acks_cleared,
"notification edited",
);
audit::record(
&s.nats,
"operator",
"notification.edit",
Some(&id),
Some(&caller),
serde_json::json!({
"notification_id": id,
"subjects": delivered,
"reset_acks": req.reset_acks,
"acks_cleared": acks_cleared,
}),
)
.await;
Ok(Json(PublishNotificationResponse {
id,
subjects: delivered,
}))
}
/// Deletes the given stream sequences from the notifications stream.
///
/// Sequence `0` entries are skipped and counted neither as deleted nor as
/// failed. Individual delete failures are logged and counted; they do not
/// abort the remaining deletes.
///
/// Returns `(deleted, failed)` counts.
///
/// # Errors
/// Returns `500 Internal Server Error` when the stream cannot be opened.
async fn delete_stream_msgs(
    s: &AppState,
    seqs: &[u64],
) -> Result<(u64, u64), (StatusCode, String)> {
    let stream = match s.jetstream.get_stream(STREAM_NOTIFICATIONS).await {
        Ok(stream) => stream,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("open {STREAM_NOTIFICATIONS} stream: {e}"),
            ));
        }
    };

    let (mut deleted, mut failed) = (0u64, 0u64);
    for seq in seqs.iter().copied().filter(|&seq| seq != 0) {
        if let Err(e) = stream.delete_message(seq).await {
            warn!(error = %e, seq, "recall: delete_message failed");
            failed += 1;
        } else {
            deleted += 1;
        }
    }
    Ok((deleted, failed))
}
/// Publishes `amend` as JSON on the amend subject via the core NATS client.
///
/// Best-effort: serialization and publish failures are logged with `warn!`
/// and otherwise ignored, so callers get no error signal.
async fn fan_out_amend(s: &AppState, amend: &NotificationAmend) {
    let payload = match serde_json::to_vec(amend).map(bytes::Bytes::from) {
        Ok(encoded) => encoded,
        Err(e) => {
            warn!(error = %e, "fan_out_amend: serialize failed");
            return;
        }
    };

    let sent = s
        .nats
        .publish(subject::NOTIFICATIONS_AMEND_SUBJECT, payload)
        .await;
    if let Err(e) = sent {
        warn!(error = %e, "fan_out_amend: publish failed");
    }
}
/// Resolves the audience roster for a notification from the subjects it was
/// published on and the acknowledgements recorded for it.
///
/// Group membership is loaded only when at least one subject is a group
/// subject. When the broadcast subject is present, every agent row is loaded;
/// otherwise only the candidate PCs are loaded: directly targeted PCs, PCs
/// that acknowledged, and members of a targeted group. The final roster is
/// built by `assemble_roster`.
///
/// # Errors
/// Propagates the error from `load_agents` when the agents query fails.
async fn resolve_audience(
    s: &AppState,
    subjects: &[String],
    acks: &[NotificationAckEntry],
) -> Result<Vec<AudiencePc>, (StatusCode, String)> {
    let targets_everyone = subjects
        .iter()
        .any(|subj| subj.as_str() == subject::NOTIFICATIONS_ALL);
    let targets_groups = subjects
        .iter()
        .any(|subj| subj.starts_with(subject::NOTIFICATIONS_GROUP_PREFIX));

    let membership = if targets_groups {
        agent_groups::membership_map(s).await
    } else {
        HashMap::new()
    };

    let agent_rows = if targets_everyone {
        load_agents(s, None).await?
    } else {
        let direct = subjects
            .iter()
            .filter_map(|subj| subj.strip_prefix(subject::NOTIFICATIONS_PC_PREFIX))
            .map(|pc| pc.to_string());
        let acked = acks.iter().map(|a| a.pc_id.clone());
        let via_group = membership
            .iter()
            .filter(|(_, pc_groups)| {
                pc_groups
                    .iter()
                    .any(|g| subjects.contains(&subject::notifications_group(g)))
            })
            .map(|(pc_id, _)| pc_id.clone());

        let candidates: HashSet<String> = direct.chain(acked).chain(via_group).collect();
        if candidates.is_empty() {
            // Nothing to look up; skip the query entirely.
            Vec::new()
        } else {
            load_agents(s, Some(candidates)).await?
        }
    };

    Ok(assemble_roster(subjects, &agent_rows, &membership, acks))
}
/// Loads `(pc_id, last_logon_user, last_logon_display_name)` rows from the
/// `agents` table.
///
/// With `only = None` every agent is returned. With `only = Some(set)` the
/// query is restricted to those PC ids through a bound `IN (...)` list.
///
/// # Errors
/// Returns `500 Internal Server Error` when the query fails.
async fn load_agents(
    s: &AppState,
    only: Option<HashSet<String>>,
) -> Result<Vec<(String, Option<String>, Option<String>)>, (StatusCode, String)> {
    let mut builder = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
        "SELECT pc_id, last_logon_user, last_logon_display_name FROM agents",
    );

    if let Some(candidates) = only {
        builder.push(" WHERE pc_id IN (");
        let mut id_list = builder.separated(", ");
        // Each id is passed as a bind parameter, never spliced into the SQL text.
        candidates.into_iter().for_each(|pc| {
            id_list.push_bind(pc);
        });
        id_list.push_unseparated(")");
    }

    let rows = builder.build_query_as().fetch_all(&s.pool).await;
    rows.map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("query agents for audience: {e}"),
        )
    })
}
/// Reconstructs a `NotificationTarget` from the subjects a notification was
/// published on.
///
/// The broadcast subject sets `all`; group and PC subjects contribute the part
/// after their prefix. Group and PC lists come back sorted with duplicates
/// removed. Subjects matching none of the three forms are ignored.
fn parse_notification_target(subjects: &[String]) -> NotificationTarget {
    // Ordered sets give the sorted, de-duplicated output directly.
    let mut groups = std::collections::BTreeSet::new();
    let mut pcs = std::collections::BTreeSet::new();
    let mut all = false;

    for subj in subjects.iter().map(String::as_str) {
        if subj == subject::NOTIFICATIONS_ALL {
            all = true;
            continue;
        }
        if let Some(group) = subj.strip_prefix(subject::NOTIFICATIONS_GROUP_PREFIX) {
            groups.insert(group.to_string());
            continue;
        }
        if let Some(pc) = subj.strip_prefix(subject::NOTIFICATIONS_PC_PREFIX) {
            pcs.insert(pc.to_string());
        }
    }

    NotificationTarget {
        all,
        groups: groups.into_iter().collect(),
        pcs: pcs.into_iter().collect(),
    }
}
/// Builds the per-PC audience roster for a notification.
///
/// The expected audience is the union of:
/// - PCs named directly by a PC subject,
/// - every PC in `agent_rows` when the broadcast subject is present,
/// - PCs whose `membership` entry contains a targeted group,
/// - every PC that appears in `acks`, whether or not it was targeted.
///
/// A PC is `confirmed` when it has at least one ack; `acked_at` is the
/// earliest ack for that PC. Last-logon fields come from `agent_rows` and are
/// `None` for PCs without a row. The result lists unconfirmed PCs first, each
/// section ordered by `pc_id`.
fn assemble_roster(
    subjects: &[String],
    agent_rows: &[(String, Option<String>, Option<String>)],
    membership: &HashMap<String, Vec<String>>,
    acks: &[NotificationAckEntry],
) -> Vec<AudiencePc> {
    // Classify the subjects; directly targeted PCs seed the expected set.
    let mut targets_everyone = false;
    let mut targeted_groups: HashSet<&str> = HashSet::new();
    let mut expected: HashSet<String> = HashSet::new();
    for subj in subjects.iter().map(String::as_str) {
        if subj == subject::NOTIFICATIONS_ALL {
            targets_everyone = true;
            continue;
        }
        if let Some(group) = subj.strip_prefix(subject::NOTIFICATIONS_GROUP_PREFIX) {
            targeted_groups.insert(group);
            continue;
        }
        if let Some(pc) = subj.strip_prefix(subject::NOTIFICATIONS_PC_PREFIX) {
            expected.insert(pc.to_string());
        }
    }

    // Index the agent rows by PC id for the last-logon lookup below.
    let rows_by_pc: HashMap<&str, &(String, Option<String>, Option<String>)> = agent_rows
        .iter()
        .map(|row| (row.0.as_str(), row))
        .collect();

    if targets_everyone {
        for row in agent_rows {
            expected.insert(row.0.clone());
        }
    }
    if !targeted_groups.is_empty() {
        let members = membership.iter().filter(|(_, pc_groups)| {
            pc_groups
                .iter()
                .any(|g| targeted_groups.contains(g.as_str()))
        });
        expected.extend(members.map(|(pc_id, _)| pc_id.clone()));
    }

    // Earliest ack per PC; an acking PC always joins the roster.
    let mut acked: HashMap<&str, chrono::DateTime<chrono::Utc>> = HashMap::new();
    for a in acks {
        let earliest = acked.entry(a.pc_id.as_str()).or_insert(a.acked_at);
        if a.acked_at < *earliest {
            *earliest = a.acked_at;
        }
        expected.insert(a.pc_id.clone());
    }

    let mut roster: Vec<AudiencePc> = Vec::with_capacity(expected.len());
    for pc_id in expected {
        let acked_at = acked.get(pc_id.as_str()).copied();
        let (last_logon_user, last_logon_display_name) = match rows_by_pc.get(pc_id.as_str()) {
            Some(row) => (row.1.clone(), row.2.clone()),
            None => (None, None),
        };
        roster.push(AudiencePc {
            last_logon_user,
            last_logon_display_name,
            confirmed: acked_at.is_some(),
            acked_at,
            pc_id,
        });
    }

    // `false < true`, so pending PCs sort ahead of confirmed ones.
    roster.sort_by(|a, b| {
        (a.confirmed, a.pc_id.as_str()).cmp(&(b.confirmed, b.pc_id.as_str()))
    });
    roster
}
/// Replays the notifications stream and returns each parseable message as
/// `(notification, subject, stream_sequence)` in delivery order.
///
/// An ephemeral pull consumer (deliver-all, no acks, 30 s inactive threshold)
/// is created for the replay and messages are fetched in batches of
/// `SENT_REPLAY_BATCH`. Replay stops when a message reports zero pending
/// messages or a batch comes back short. At most `SENT_MAX_REPLAY` entries are
/// retained; beyond that the oldest buffered entry is dropped for each new one
/// and a warning is logged at the end.
///
/// Messages whose payload is not a valid `Notification` are logged and
/// skipped. When a message's metadata cannot be read, its sequence is
/// reported as `0`.
///
/// # Errors
/// Returns `500 Internal Server Error` when the stream cannot be opened, the
/// consumer cannot be created, a fetch fails, or a message in a batch is an
/// error.
async fn replay_all_sent(
    s: &AppState,
) -> Result<Vec<(Notification, String, u64)>, (StatusCode, String)> {
    /// Wraps a message as a 500 response tuple.
    fn internal(msg: String) -> (StatusCode, String) {
        (StatusCode::INTERNAL_SERVER_ERROR, msg)
    }

    let stream = s
        .jetstream
        .get_stream(STREAM_NOTIFICATIONS)
        .await
        .map_err(|e| internal(format!("open {STREAM_NOTIFICATIONS} stream: {e}")))?;

    // Skip creating a consumer when the stream reports no messages.
    if stream.cached_info().state.messages == 0 {
        return Ok(Vec::new());
    }

    let config = PullConfig {
        deliver_policy: DeliverPolicy::All,
        ack_policy: AckPolicy::None,
        filter_subjects: vec!["notifications.>".to_string()],
        inactive_threshold: Duration::from_secs(30),
        ..Default::default()
    };
    let consumer = stream
        .create_consumer(config)
        .await
        .map_err(|e| internal(format!("create ephemeral consumer: {e}")))?;

    let mut buf = std::collections::VecDeque::with_capacity(SENT_MAX_REPLAY + 1);
    let mut dropped = 0usize;
    loop {
        let mut batch = consumer
            .fetch()
            .max_messages(SENT_REPLAY_BATCH)
            .expires(Duration::from_millis(200))
            .messages()
            .await
            .map_err(|e| internal(format!("fetch: {e}")))?;

        let mut received = 0usize;
        let mut drained = false;
        while let Some(next) = batch.next().await {
            let msg = next.map_err(|e| internal(format!("message: {e}")))?;
            received += 1;

            // Unreadable metadata yields sequence 0 and never signals "drained".
            let (seq, nothing_pending) = match msg.info() {
                Ok(meta) => (meta.stream_sequence, meta.pending == 0),
                Err(_) => (0, false),
            };
            drained |= nothing_pending;

            match serde_json::from_slice::<Notification>(&msg.payload) {
                Ok(n) => {
                    buf.push_back((n, msg.subject.to_string(), seq));
                    // Keep only the most recent SENT_MAX_REPLAY entries.
                    if buf.len() > SENT_MAX_REPLAY {
                        buf.pop_front();
                        dropped += 1;
                    }
                }
                Err(e) => warn!(
                    error = %e,
                    subject = %msg.subject,
                    "list_sent: skipping unparseable notification",
                ),
            }
        }

        if drained || received < SENT_REPLAY_BATCH {
            break;
        }
    }

    if dropped > 0 {
        warn!(
            dropped,
            cap = SENT_MAX_REPLAY,
            "list_sent: NOTIFICATIONS exceeded replay cap; oldest beyond the cap omitted",
        );
    }
    Ok(buf.into())
}
fn dedup_newest_first(raw: Vec<Notification>, max_items: usize) -> Vec<Notification> {
let mut idx_of: HashMap<String, usize> = HashMap::new();
let mut deduped: Vec<Notification> = Vec::new();
for n in raw {
match idx_of.get(&n.id) {
Some(&i) if n.issued_at <= deduped[i].issued_at => {}
Some(&i) => deduped[i] = n,
None => {
idx_of.insert(n.id.clone(), deduped.len());
deduped.push(n);
}
}
}
deduped.sort_by(|a, b| b.issued_at.cmp(&a.issued_at).then_with(|| a.id.cmp(&b.id)));
deduped.truncate(max_items);
deduped
}
// Unit tests for the pure helpers in this file: `dedup_newest_first`,
// `parse_notification_target` and `assemble_roster`.
#[cfg(test)]
mod tests {
use super::*;
use kanade_shared::ipc::notifications::NotificationPriority;
// Builds a minimal notification with the given id and issue time.
fn notif(id: &str, issued: chrono::DateTime<chrono::Utc>) -> Notification {
Notification {
id: id.into(),
priority: NotificationPriority::Info,
require_ack: false,
title: "t".into(),
body: "b".into(),
toast: false,
issued_at: issued,
issued_by: None,
expires_at: None,
acked_at: None,
edited_at: None,
acks_reset_at: None,
}
}
// Returns a fixed base instant (2026-06-01 12:00:00 UTC) offset by `secs`.
fn at(secs: i64) -> chrono::DateTime<chrono::Utc> {
chrono::TimeZone::with_ymd_and_hms(&chrono::Utc, 2026, 6, 1, 12, 0, 0).unwrap()
+ chrono::Duration::seconds(secs)
}
// Identical copies of one id collapse to a single row.
#[test]
fn dedups_fanout_copies_to_one_row_per_id() {
let raw = vec![notif("n1", at(0)), notif("n1", at(0)), notif("n1", at(0))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1, "fan-out copies collapse to one");
assert_eq!(out[0].id, "n1");
}
// Output is ordered by issue time, newest first, regardless of input order.
#[test]
fn sorts_newest_first() {
let raw = vec![
notif("old", at(0)),
notif("new", at(120)),
notif("mid", at(60)),
];
let out = dedup_newest_first(raw, 200);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["new", "mid", "old"]);
}
// When one id appears with different issue times, the newest copy is kept.
#[test]
fn dedup_keeps_newest_issued_for_repeated_id() {
let raw = vec![notif("dup", at(0)), notif("dup", at(60))];
let out = dedup_newest_first(raw, 200);
assert_eq!(out.len(), 1);
assert_eq!(out[0].issued_at, at(60), "newest issued_at wins");
}
// The item cap is applied after sorting, so the newest entries survive.
#[test]
fn caps_at_max_items_after_sort() {
let raw = vec![notif("a", at(0)), notif("b", at(60)), notif("c", at(120))];
let out = dedup_newest_first(raw, 2);
let ids: Vec<&str> = out.iter().map(|n| n.id.as_str()).collect();
assert_eq!(ids, vec!["c", "b"], "newest two kept");
}
// Builds an agent row in the tuple shape `assemble_roster` expects.
fn agent(
pc: &str,
user: Option<&str>,
display: Option<&str>,
) -> (String, Option<String>, Option<String>) {
(pc.into(), user.map(Into::into), display.map(Into::into))
}
// Builds an ack entry for `pc` by user `sid`, acked `secs` after the base instant.
fn ack(pc: &str, sid: &str, secs: i64) -> NotificationAckEntry {
NotificationAckEntry {
pc_id: pc.into(),
user_sid: sid.into(),
acked_at: at(secs),
account: None,
}
}
// Subjects map back to a target; groups come back sorted and de-duplicated.
#[test]
fn parse_target_from_subjects() {
let t = parse_notification_target(&[
"notifications.group.it-admins".to_string(),
"notifications.pc.minipc".to_string(),
]);
assert!(!t.all);
assert_eq!(t.groups, vec!["it-admins"]);
assert_eq!(t.pcs, vec!["minipc"]);
let t = parse_notification_target(&[
"notifications.all".to_string(),
"notifications.group.b".to_string(),
"notifications.group.a".to_string(),
"notifications.group.a".to_string(),
]);
assert!(t.all);
assert_eq!(t.groups, vec!["a", "b"]);
assert!(t.pcs.is_empty());
}
// A broadcast targets every agent row; pending PCs are listed before confirmed ones.
#[test]
fn roster_all_targets_every_agent_pending_first() {
let agents = vec![
agent("PC1", Some("D\\a"), Some("Alice")),
agent("PC2", Some("D\\b"), Some("Bob")),
agent("PC3", None, None),
];
let roster = assemble_roster(
&["notifications.all".to_string()],
&agents,
&HashMap::new(),
&[ack("PC2", "S-2", 10)],
);
let view: Vec<(&str, bool)> = roster
.iter()
.map(|r| (r.pc_id.as_str(), r.confirmed))
.collect();
assert_eq!(view, vec![("PC1", false), ("PC3", false), ("PC2", true)]);
let pc2 = roster.iter().find(|r| r.pc_id == "PC2").unwrap();
assert_eq!(pc2.acked_at, Some(at(10)));
assert_eq!(pc2.last_logon_display_name.as_deref(), Some("Bob"));
}
// A group subject expands through the membership map; a PC subject is taken
// as-is. PCX belongs only to the untargeted `ops` group and must be absent.
#[test]
fn roster_group_expands_via_membership_and_pc_is_direct() {
let agents = vec![
agent("PC1", None, None),
agent("PC2", None, None),
agent("PC9", None, None),
agent("PCX", None, None), ];
let membership: HashMap<String, Vec<String>> = [
("PC1".to_string(), vec!["fin".to_string()]),
(
"PC2".to_string(),
vec!["fin".to_string(), "ops".to_string()],
),
("PCX".to_string(), vec!["ops".to_string()]),
]
.into_iter()
.collect();
let roster = assemble_roster(
&[
"notifications.group.fin".to_string(),
"notifications.pc.PC9".to_string(),
],
&agents,
&membership,
&[],
);
let mut pcs: Vec<&str> = roster.iter().map(|r| r.pc_id.as_str()).collect();
pcs.sort();
assert_eq!(pcs, vec!["PC1", "PC2", "PC9"], "fin members + direct PC9");
assert!(roster.iter().all(|r| !r.confirmed), "no acks → all pending");
}
// A PC that acked is listed even when the target does not resolve to it.
#[test]
fn roster_includes_acked_pc_outside_resolved_audience() {
let agents = vec![agent("PC1", None, None), agent("PC7", None, None)];
let membership: HashMap<String, Vec<String>> =
[("PC1".to_string(), vec!["fin".to_string()])]
.into_iter()
.collect();
let roster = assemble_roster(
&["notifications.group.fin".to_string()],
&agents,
&membership,
&[ack("PC7", "S-7", 5)],
);
let pc7 = roster.iter().find(|r| r.pc_id == "PC7");
assert!(
pc7.is_some_and(|r| r.confirmed),
"acked PC always in roster"
);
}
// Several acks from one PC yield one roster row carrying the earliest time.
#[test]
fn roster_earliest_ack_wins_per_pc() {
let agents = vec![agent("PC1", None, None)];
let roster = assemble_roster(
&["notifications.pc.PC1".to_string()],
&agents,
&HashMap::new(),
&[ack("PC1", "S-a", 30), ack("PC1", "S-b", 5)],
);
assert_eq!(roster.len(), 1);
assert_eq!(roster[0].acked_at, Some(at(5)), "earliest ack per PC");
}
}
}