use std::collections::HashMap;
use anyhow::{Context, Result};
use async_nats::jetstream;
use futures::StreamExt;
use kanade_shared::kv::BUCKET_AGENT_GROUPS;
use kanade_shared::subject;
use kanade_shared::wire::AgentGroups;
use tokio::task::JoinHandle;
use tracing::{info, warn};
use crate::commands;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SubscriptionDelta {
pub to_subscribe: Vec<String>,
pub to_unsubscribe: Vec<String>,
}
impl SubscriptionDelta {
pub fn is_empty(&self) -> bool {
self.to_subscribe.is_empty() && self.to_unsubscribe.is_empty()
}
}
pub async fn manage(client: async_nats::Client, pc_id: String) -> Result<()> {
let js = jetstream::new(client.clone());
let kv = match js.get_key_value(BUCKET_AGENT_GROUPS).await {
Ok(k) => k,
Err(e) => {
warn!(
error = %e,
bucket = BUCKET_AGENT_GROUPS,
"agent_groups KV bucket missing — group subscriptions idle until bootstrap"
);
return Ok(());
}
};
let mut subs: HashMap<String, JoinHandle<()>> = HashMap::new();
let initial_desired = match kv.get(&pc_id).await {
Ok(Some(bytes)) => parse_groups(&bytes),
Ok(None) => {
info!(pc_id = %pc_id, "no agent_groups entry — starting with empty membership");
Vec::new()
}
Err(e) => {
warn!(error = %e, "initial agent_groups KV read failed");
Vec::new()
}
};
apply_delta(
&diff_groups::<String, String>(&[], &initial_desired),
&mut subs,
&client,
&pc_id,
)
.await;
let mut watch = kv
.watch(&pc_id)
.await
.context("watch agent_groups KV key")?;
while let Some(entry) = watch.next().await {
let bytes = match entry {
Ok(e) => e.value,
Err(e) => {
warn!(error = %e, "agent_groups watch entry");
continue;
}
};
let desired = parse_groups(&bytes);
let current: Vec<String> = subs.keys().cloned().collect();
let delta = diff_groups(¤t, &desired);
if !delta.is_empty() {
info!(
add = ?delta.to_subscribe,
drop = ?delta.to_unsubscribe,
"agent_groups update — reconciling subscriptions",
);
apply_delta(&delta, &mut subs, &client, &pc_id).await;
}
}
Ok(())
}
async fn apply_delta(
delta: &SubscriptionDelta,
subs: &mut HashMap<String, JoinHandle<()>>,
client: &async_nats::Client,
pc_id: &str,
) {
for g in &delta.to_unsubscribe {
if let Some(handle) = subs.remove(g) {
handle.abort();
info!(group = %g, "unsubscribed from group");
}
}
for g in &delta.to_subscribe {
match client.subscribe(subject::commands_group(g)).await {
Ok(sub) => {
let _ = client.flush().await;
let handle = tokio::spawn(commands::command_loop(
client.clone(),
pc_id.to_string(),
sub,
));
subs.insert(g.clone(), handle);
info!(group = %g, "subscribed to group");
}
Err(e) => warn!(error = %e, group = %g, "subscribe to group failed"),
}
}
}
fn parse_groups(bytes: &[u8]) -> Vec<String> {
match serde_json::from_slice::<AgentGroups>(bytes) {
Ok(g) => g.groups,
Err(e) => {
warn!(
error = %e,
bytes = bytes.len(),
"agent_groups value did not parse as AgentGroups JSON; treating as empty"
);
Vec::new()
}
}
}
pub fn diff_groups<S: AsRef<str>, T: AsRef<str>>(
current: &[S],
desired: &[T],
) -> SubscriptionDelta {
use std::collections::BTreeSet;
let current_set: BTreeSet<&str> = current.iter().map(AsRef::as_ref).collect();
let desired_set: BTreeSet<&str> = desired.iter().map(AsRef::as_ref).collect();
SubscriptionDelta {
to_subscribe: desired_set
.difference(¤t_set)
.map(|s| (*s).to_string())
.collect(),
to_unsubscribe: current_set
.difference(&desired_set)
.map(|s| (*s).to_string())
.collect(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn no_change_when_sets_match() {
let d = diff_groups::<&str, &str>(&["wave1", "canary"], &["canary", "wave1"]);
assert_eq!(d, SubscriptionDelta::default());
assert!(d.is_empty());
}
#[test]
fn no_change_on_both_empty() {
let d: SubscriptionDelta = diff_groups::<&str, &str>(&[], &[]);
assert!(d.is_empty());
}
#[test]
fn initial_subscribe_when_current_empty() {
let d = diff_groups::<&str, &str>(&[], &["wave1", "canary"]);
assert_eq!(d.to_subscribe, vec!["canary", "wave1"]);
assert!(d.to_unsubscribe.is_empty());
}
#[test]
fn drop_all_when_desired_empty() {
let d = diff_groups::<&str, &str>(&["wave1", "canary"], &[]);
assert!(d.to_subscribe.is_empty());
assert_eq!(d.to_unsubscribe, vec!["canary", "wave1"]);
}
#[test]
fn add_one_keep_rest() {
let d = diff_groups::<&str, &str>(&["canary"], &["canary", "wave1"]);
assert_eq!(d.to_subscribe, vec!["wave1"]);
assert!(d.to_unsubscribe.is_empty());
}
#[test]
fn drop_one_keep_rest() {
let d = diff_groups::<&str, &str>(&["canary", "wave1"], &["canary"]);
assert!(d.to_subscribe.is_empty());
assert_eq!(d.to_unsubscribe, vec!["wave1"]);
}
#[test]
fn full_swap() {
let d = diff_groups::<&str, &str>(&["wave1", "wave2"], &["dept-eng", "canary"]);
assert_eq!(d.to_subscribe, vec!["canary", "dept-eng"]);
assert_eq!(d.to_unsubscribe, vec!["wave1", "wave2"]);
}
#[test]
fn dedups_inputs() {
let d = diff_groups::<&str, &str>(&["canary", "canary"], &["canary", "canary", "wave1"]);
assert_eq!(d.to_subscribe, vec!["wave1"]);
assert!(d.to_unsubscribe.is_empty());
}
#[test]
fn output_is_sorted_regardless_of_input_order() {
let d = diff_groups::<&str, &str>(&[], &["zeta", "alpha", "mu"]);
assert_eq!(d.to_subscribe, vec!["alpha", "mu", "zeta"]);
}
#[test]
fn accepts_string_and_str_inputs() {
let current: Vec<String> = vec!["wave1".into()];
let desired: Vec<&str> = vec!["wave1", "canary"];
let d = diff_groups(¤t, &desired);
assert_eq!(d.to_subscribe, vec!["canary"]);
}
}