use chrono::Utc;
use futures::StreamExt;
use kanade_shared::subject;
use kanade_shared::wire::Heartbeat;
use tracing::{info, warn};
pub async fn serve(
client: async_nats::Client,
pc_id: String,
agent_version: String,
hostname: Option<String>,
os_family: Option<String>,
) {
let subj = subject::ping(&pc_id);
let mut sub = match client.subscribe(subj.clone()).await {
Ok(s) => s,
Err(e) => {
warn!(error = %e, subject = %subj, "subscribe ping failed");
return;
}
};
info!(subject = %subj, "ping responder ready");
while let Some(msg) = sub.next().await {
let Some(reply) = msg.reply.clone() else {
warn!(subject = %subj, "ping without reply subject — skipping");
continue;
};
let hb = Heartbeat {
pc_id: pc_id.clone(),
at: Utc::now(),
agent_version: agent_version.clone(),
hostname: hostname.clone(),
os_family: os_family.clone(),
agent_cpu_pct: None,
agent_rss_bytes: None,
agent_disk_read_bytes: None,
agent_disk_written_bytes: None,
};
let payload = match serde_json::to_vec(&hb) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "ping: serialize Heartbeat");
continue;
}
};
if let Err(e) = client.publish(reply, payload.into()).await {
warn!(error = %e, "publish ping reply");
}
}
}