use futures::stream::StreamExt;
use kanade_shared::ipc::notifications::Notification;
use kanade_shared::subject;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
/// Capacity value (in messages) associated with the notification broadcast channel.
///
/// NOTE(review): this constant is not referenced in the visible part of the
/// file; presumably it is passed to `tokio::sync::broadcast::channel` where the
/// `notif_tx` sender handed to [`spawn`] is created — confirm at the call site.
pub const BROADCAST_CAPACITY: usize = 256;
/// Builds the list of subjects one PC should be subscribed to.
///
/// The returned vector always begins with the "all" subject, followed by the
/// subject specific to `pc_id`. One subject per entry in `groups` comes after
/// those, sorted and with duplicates removed.
fn filter_subjects(pc_id: &str, groups: &[String]) -> Vec<String> {
    // An ordered set yields the group subjects sorted and unique in one pass.
    let per_group: std::collections::BTreeSet<String> = groups
        .iter()
        .map(|name| subject::notifications_group(name))
        .collect();

    let mut all_subjects = vec![
        subject::NOTIFICATIONS_ALL.to_string(),
        subject::notifications_pc(pc_id),
    ];
    all_subjects.extend(per_group);
    all_subjects
}
/// Starts the notification bus worker on the Tokio runtime.
///
/// All arguments are moved into the `run` future, which is then handed to
/// `tokio::spawn`. The returned handle refers to that spawned task.
pub fn spawn(
    client: async_nats::Client,
    pc_id: String,
    groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
    notif_tx: broadcast::Sender<Notification>,
) -> tokio::task::JoinHandle<()> {
    let worker = run(client, pc_id, groups_rx, notif_tx);
    tokio::spawn(worker)
}
/// Subscription loop behind [`spawn`].
///
/// Each pass of the outer loop:
/// 1. snapshots the current group list from `groups_rx`,
/// 2. subscribes to every subject produced by `filter_subjects`,
/// 3. merges the successful subscriptions into a single stream, and
/// 4. hands every incoming message to `forward` until either the group list
///    changes or the merged stream ends, at which point the subscriptions are
///    rebuilt from scratch.
///
/// If no subscription at all could be established, the loop waits five
/// seconds and tries again. If only some succeeded, it carries on with those
/// and logs a warning.
///
/// The outer loop has no exit path of its own, so this future only stops when
/// it is dropped.
async fn run(
    client: async_nats::Client,
    pc_id: String,
    mut groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
    notif_tx: broadcast::Sender<Notification>,
) {
    // Cleared once `changed()` reports an error, so the select branch below is
    // disabled instead of being polled again after the watch has failed.
    let mut groups_watch_alive = true;
    loop {
        // `borrow_and_update` marks the current value as seen, so a later
        // `changed()` only resolves for updates made after this snapshot.
        let groups: Vec<String> = groups_rx.borrow_and_update().clone();
        let subjects = filter_subjects(&pc_id, &groups);

        // Subscribe to each subject independently; one failure does not
        // prevent the remaining subjects from being subscribed.
        let mut subs = Vec::with_capacity(subjects.len());
        for subj in &subjects {
            match client.subscribe(subj.clone()).await {
                Ok(s) => subs.push(s),
                Err(e) => warn!(error = %e, subject = %subj, "notify_bus: subscribe failed"),
            }
        }

        if subs.is_empty() {
            warn!(pc_id = %pc_id, "notify_bus: no subscriptions established; retrying");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            continue;
        }
        if subs.len() < subjects.len() {
            warn!(
                pc_id = %pc_id,
                subscribed = subs.len(),
                expected = subjects.len(),
                "notify_bus: partial subscription; some subjects unavailable until next membership change",
            );
        }

        let mut merged = futures::stream::select_all(subs);
        info!(
            pc_id = %pc_id,
            groups = ?groups,
            subjects = ?subjects,
            "notify_bus: subscriptions ready",
        );

        // Inner loop: pump messages until the subscriptions must be rebuilt.
        loop {
            tokio::select! {
                maybe_msg = merged.next() => {
                    match maybe_msg {
                        Some(msg) => forward(&msg, &notif_tx, &pc_id),
                        None => {
                            debug!(pc_id = %pc_id, "notify_bus: all subscriptions ended; rebuilding");
                            break;
                        }
                    }
                }
                changed = groups_rx.changed(), if groups_watch_alive => {
                    match changed {
                        Ok(()) => {
                            debug!(pc_id = %pc_id, "notify_bus: membership changed; rebuilding subscriptions");
                            break;
                        }
                        Err(_) => {
                            // Keep serving the current subscriptions, but stop
                            // polling the watch from here on.
                            groups_watch_alive = false;
                        }
                    }
                }
            }
        }
    }
}
/// Decodes one NATS message as a JSON `Notification` and broadcasts it.
///
/// A payload that fails to decode is logged at warn level and skipped. A send
/// error from the broadcast channel is logged at debug level and the
/// notification is dropped.
fn forward(msg: &async_nats::Message, notif_tx: &broadcast::Sender<Notification>, pc_id: &str) {
    let notification = match serde_json::from_slice::<Notification>(&msg.payload) {
        Ok(decoded) => decoded,
        Err(err) => {
            warn!(
                error = %err,
                subject = %msg.subject,
                "notify_bus: failed to decode Notification",
            );
            return;
        }
    };

    // Copy the id first: `send` takes the notification by value.
    let id = notification.id.clone();
    match notif_tx.send(notification) {
        Err(_) => debug!(
            pc_id = %pc_id,
            notification_id = %id,
            "notify_bus: no connected clients; dropped (recoverable via notifications.list)",
        ),
        Ok(receiver_count) => {
            debug!(pc_id = %pc_id, notification_id = %id, receivers = receiver_count, "notify_bus: broadcast")
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The fixed "all" and per-PC subjects come first; group subjects follow,
    /// sorted, with the duplicated group collapsed to a single entry.
    #[test]
    fn filter_subjects_includes_all_pc_and_sorted_groups() {
        let groups: Vec<String> = ["wave1", "tokyo", "wave1"]
            .iter()
            .map(|g| String::from(*g))
            .collect();

        let subjects = filter_subjects("PC1234", &groups);

        let expected = vec![
            "notifications.all",
            "notifications.pc.PC1234",
            "notifications.group.tokyo",
            "notifications.group.wave1",
        ];
        assert_eq!(subjects, expected);
    }

    /// With no groups, only the "all" and per-PC subjects are produced.
    #[test]
    fn filter_subjects_no_groups_is_all_plus_pc() {
        let no_groups: Vec<String> = Vec::new();

        let subjects = filter_subjects("pc-01", &no_groups);

        assert_eq!(subjects, ["notifications.all", "notifications.pc.pc-01"]);
    }
}