use std::sync::Arc;
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::consumer::pull::Config as PullConfig;
use futures::StreamExt;
use kanade_shared::kv::STREAM_EXEC;
use kanade_shared::wire::Command;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use crate::commands::{DedupCache, handle_command};
use crate::nats_retry;
use crate::script_cache::ScriptCache;
/// Derives the durable JetStream consumer name used for `pc_id`'s replay.
///
/// ASCII letters, ASCII digits, `_` and `-` are copied through unchanged;
/// every other character (each non-ASCII `char` included) is replaced by a
/// single `_`. The sanitised id is prefixed with `agent_replay_`.
///
/// NOTE(review): the mapping is many-to-one (`PC.001` and `PC_001` yield the
/// same name), so two agents whose ids differ only in replaced characters
/// would share one durable consumer.
fn consumer_name(pc_id: &str) -> String {
    const PREFIX: &str = "agent_replay_";
    let mut name = String::with_capacity(PREFIX.len() + pc_id.len());
    name.push_str(PREFIX);
    name.extend(pc_id.chars().map(|ch| match ch {
        'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => ch,
        _ => '_',
    }));
    name
}
/// Builds the filter subjects for this agent's replay consumer.
///
/// The list always starts with `COMMANDS_ALL`, followed by this PC's own
/// command subject, followed by one command subject per group. The group
/// subjects are sorted and de-duplicated, so the same membership (in any
/// order, with repeats) produces an identical `Vec`. `run` compares these
/// vectors to decide whether a membership update requires recreating the
/// consumer.
fn filter_subjects(pc_id: &str, groups: &[String]) -> Vec<String> {
    let mut by_group: Vec<String> = Vec::with_capacity(groups.len());
    for group in groups {
        by_group.push(kanade_shared::subject::commands_group(group));
    }
    by_group.sort_unstable();
    by_group.dedup();

    let mut subjects = vec![
        kanade_shared::subject::COMMANDS_ALL.to_string(),
        kanade_shared::subject::commands_pc(pc_id),
    ];
    subjects.append(&mut by_group);
    subjects
}
/// Starts the command-replay loop on a new Tokio task.
///
/// Every argument is moved into the task. `run` loops forever, so the
/// returned handle only resolves if the task panics or is aborted.
pub fn spawn(
    client: async_nats::Client,
    pc_id: String,
    dedup: Arc<Mutex<DedupCache>>,
    staleness: crate::staleness::Tracker,
    script_cache: ScriptCache,
    check_sink: crate::check_cache::CheckSink,
    groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
) -> tokio::task::JoinHandle<()> {
    let replay_loop = run(
        client,
        pc_id,
        dedup,
        staleness,
        script_cache,
        check_sink,
        groups_rx,
    );
    tokio::spawn(replay_loop)
}
async fn run(
client: async_nats::Client,
pc_id: String,
dedup: Arc<Mutex<DedupCache>>,
staleness: crate::staleness::Tracker,
script_cache: ScriptCache,
check_sink: crate::check_cache::CheckSink,
mut groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
) {
let jetstream = async_nats::jetstream::new(client.clone());
let name = consumer_name(&pc_id);
let mut groups_watch_alive = true;
loop {
let groups: Vec<String> = groups_rx.borrow_and_update().clone();
let current_filters = filter_subjects(&pc_id, &groups);
let stream = nats_retry::wait_for_stream(
&jetstream,
&client,
&staleness,
STREAM_EXEC,
"command_replay",
)
.await;
let consumer = nats_retry::wait_for_consumer_updating(
&stream,
&client,
&staleness,
&name,
"command_replay",
PullConfig {
durable_name: Some(name.clone()),
ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
deliver_policy: DeliverPolicy::LastPerSubject,
filter_subjects: current_filters.clone(),
..Default::default()
},
)
.await;
info!(
stream = STREAM_EXEC,
consumer = %name,
pc_id = %pc_id,
groups = ?groups,
"command-replay consumer ready",
);
let script_current = jetstream
.get_key_value(kanade_shared::kv::BUCKET_SCRIPT_CURRENT)
.await
.ok();
let script_status = jetstream
.get_key_value(kanade_shared::kv::BUCKET_SCRIPT_STATUS)
.await
.ok();
let mut messages = match consumer.messages().await {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "command-replay messages stream failed; reopening");
nats_retry::reopen_pause().await;
continue;
}
};
let mut membership_changed = false;
loop {
let msg = tokio::select! {
changed = groups_rx.changed(), if groups_watch_alive => {
match changed {
Ok(()) => {
let next =
filter_subjects(&pc_id, &groups_rx.borrow_and_update());
if next == current_filters {
debug!(
consumer = %name,
"group watch tick without effective membership change; keeping consumer",
);
continue;
}
info!(
consumer = %name,
"group membership changed; recreating replay consumer with updated filters",
);
membership_changed = true;
break;
}
Err(_) => {
groups_watch_alive = false;
continue;
}
}
}
maybe_msg = messages.next() => match maybe_msg {
Some(Ok(m)) => m,
Some(Err(e)) => {
warn!(error = %e, "replay consumer error");
continue;
}
None => break,
},
};
let _ = msg.ack().await;
let cmd: Command = match serde_json::from_slice(&msg.payload) {
Ok(c) => c,
Err(e) => {
warn!(error = %e, subject = %msg.subject, "deserialize replay command");
continue;
}
};
if !is_for_me(&msg.subject, &pc_id, &groups) {
warn!(
subject = %msg.subject,
"replay msg not addressed to this agent; dropping (already acked)",
);
continue;
}
if !dedup.lock().await.insert(cmd.request_id.clone()) {
debug!(
request_id = %cmd.request_id,
"replay dedup: already seen via core sub or earlier replay",
);
continue;
}
let client_for_task = client.clone();
let pc_for_task = pc_id.clone();
let cur = script_current.clone();
let sta = script_status.clone();
let stl = staleness.clone();
let sc = script_cache.clone();
let cs = check_sink.clone();
info!(
cmd_id = %cmd.id,
request_id = %cmd.request_id,
subject = %msg.subject,
"replay: handling missed command",
);
tokio::spawn(async move {
if let Err(e) =
handle_command(client_for_task, pc_for_task, cmd, cur, sta, stl, sc, cs).await
{
error!(error = %e, "replay command handler failed");
}
});
}
if membership_changed {
continue;
}
warn!(consumer = %name, "command-replay messages stream ended; reopening");
nats_retry::reopen_pause().await;
}
}
fn is_for_me(subject: &str, my_pc_id: &str, my_groups: &[String]) -> bool {
if subject == kanade_shared::subject::COMMANDS_ALL {
return true;
}
if let Some(pc) = subject.strip_prefix("commands.pc.") {
return pc == my_pc_id;
}
if let Some(group) = subject.strip_prefix("commands.group.") {
return my_groups.iter().any(|g| g == group);
}
false
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned group list from string literals.
    fn groups(names: &[&str]) -> Vec<String> {
        names.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn commands_all_matches_anyone() {
        assert!(is_for_me("commands.all", "pc-01", &[]));
        assert!(is_for_me("commands.all", "anything", &groups(&["g1"])));
    }

    #[test]
    fn commands_pc_matches_only_owner() {
        assert!(is_for_me("commands.pc.pc-01", "pc-01", &[]));
        assert!(!is_for_me("commands.pc.pc-02", "pc-01", &[]));
    }

    #[test]
    fn commands_group_matches_only_members() {
        let mine = groups(&["canary", "wave1"]);
        assert!(is_for_me("commands.group.canary", "pc-01", &mine));
        assert!(is_for_me("commands.group.wave1", "pc-01", &mine));
        assert!(!is_for_me("commands.group.prod", "pc-01", &mine));
        assert!(!is_for_me("commands.group.canary", "pc-01", &[]));
        assert!(!is_for_me("commands.group.can", "pc-01", &mine));
    }

    #[test]
    fn unknown_subject_dropped() {
        assert!(!is_for_me("commands.weird", "pc-01", &[]));
        assert!(!is_for_me("results.x", "pc-01", &[]));
    }

    #[test]
    fn filter_subjects_cover_all_pc_and_groups() {
        assert_eq!(
            filter_subjects("pc-01", &groups(&["canary", "wave1"])),
            vec![
                "commands.all".to_string(),
                "commands.pc.pc-01".to_string(),
                "commands.group.canary".to_string(),
                "commands.group.wave1".to_string(),
            ],
        );
        assert_eq!(
            filter_subjects("pc-01", &[]),
            vec!["commands.all".to_string(), "commands.pc.pc-01".to_string()],
        );
    }

    /// `run` compares filter lists to skip no-op membership ticks, so the
    /// list must not depend on group order or repeats.
    #[test]
    fn filter_subjects_ignore_group_order_and_duplicates() {
        assert_eq!(
            filter_subjects("pc-01", &groups(&["wave1", "canary", "wave1"])),
            filter_subjects("pc-01", &groups(&["canary", "wave1"])),
        );
    }

    /// Every subject the consumer is filtered to must pass the in-loop check;
    /// otherwise those commands would be acked and then silently dropped.
    #[test]
    fn every_filter_subject_is_accepted_by_is_for_me() {
        let mine = groups(&["canary", "wave1"]);
        for subject in filter_subjects("pc-01", &mine) {
            assert!(
                is_for_me(&subject, "pc-01", &mine),
                "{subject} should be accepted",
            );
        }
    }

    #[test]
    fn consumer_name_sanitises_pc_id() {
        assert_eq!(consumer_name("PC-01"), "agent_replay_PC-01");
        assert_eq!(consumer_name("PC.001"), "agent_replay_PC_001");
        assert_eq!(
            consumer_name("host with space"),
            "agent_replay_host_with_space"
        );
    }

    #[test]
    fn consumer_name_replaces_wildcards_and_non_ascii() {
        assert_eq!(consumer_name("a*b>c"), "agent_replay_a_b_c");
        assert_eq!(consumer_name("pc-é"), "agent_replay_pc-_");
    }
}