use chrono::Utc;
use futures::StreamExt;
use kanade_shared::subject;
use kanade_shared::wire::Heartbeat;
use tracing::{info, warn};
pub async fn serve(
client: async_nats::Client,
pc_id: String,
agent_version: String,
hostname: Option<String>,
os_family: Option<String>,
tracker: crate::staleness::Tracker,
) {
let subj = subject::ping(&pc_id);
loop {
let mut sub = crate::nats_retry::wait_for_subscribe(&client, &tracker, &subj, "ping").await;
info!(subject = %subj, "ping responder ready");
while let Some(msg) = sub.next().await {
let Some(reply) = msg.reply.clone() else {
warn!(subject = %subj, "ping without reply subject — skipping");
continue;
};
let hb = Heartbeat {
pc_id: pc_id.clone(),
at: Utc::now(),
agent_version: agent_version.clone(),
hostname: hostname.clone(),
os_family: os_family.clone(),
agent_cpu_pct: None,
agent_rss_bytes: None,
agent_disk_read_bytes: None,
agent_disk_written_bytes: None,
};
let payload = match serde_json::to_vec(&hb) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "ping: serialize Heartbeat");
continue;
}
};
if let Err(e) = client.publish(reply, payload.into()).await {
warn!(error = %e, "publish ping reply");
}
}
warn!(subject = %subj, "ping subscription closed; reopening");
crate::nats_retry::reopen_pause().await;
}
}