use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::wire::Heartbeat;
use sqlx::SqlitePool;
use tracing::{info, warn};
pub async fn run(client: async_nats::Client, pool: SqlitePool) -> Result<()> {
let mut sub = client
.subscribe("heartbeat.>")
.await
.context("subscribe heartbeat.>")?;
info!("heartbeat projector started (subject: heartbeat.>)");
while let Some(msg) = sub.next().await {
match serde_json::from_slice::<Heartbeat>(&msg.payload) {
Ok(hb) => {
if let Err(e) = upsert_baseline(&pool, &hb).await {
warn!(error = %e, pc_id = %hb.pc_id, "heartbeat upsert failed");
}
}
Err(e) => {
warn!(error = %e, subject = %msg.subject, "decode Heartbeat");
}
}
}
Ok(())
}
/// Inserts or refreshes the `agents` row keyed by `hb.pc_id`.
///
/// On first sight of a `pc_id` the row is created from the heartbeat's
/// `hostname`, `os_family`, `agent_version` and `at` fields. When the row
/// already exists:
///
/// * `hostname` and `os_family` keep their stored value if it is non-NULL
///   (`COALESCE` prefers the existing column), otherwise take the heartbeat's;
/// * `agent_version` and `last_heartbeat` are always overwritten;
/// * `updated_at` is set to `CURRENT_TIMESTAMP`.
///
/// # Errors
///
/// Propagates any error returned by executing the statement.
async fn upsert_baseline(pool: &SqlitePool, hb: &Heartbeat) -> Result<()> {
    // Continuation lines are flush-left so that no indentation whitespace
    // ends up inside the statement text.
    const UPSERT_AGENT_SQL: &str = "INSERT INTO agents (
pc_id, hostname, os_family, agent_version,
last_heartbeat, updated_at
) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(pc_id) DO UPDATE SET
hostname = COALESCE(agents.hostname, excluded.hostname),
os_family = COALESCE(agents.os_family, excluded.os_family),
agent_version = excluded.agent_version,
last_heartbeat = excluded.last_heartbeat,
updated_at = CURRENT_TIMESTAMP";

    // Bind order must match the five `?` placeholders in the VALUES list.
    let statement = sqlx::query(UPSERT_AGENT_SQL)
        .bind(&hb.pc_id)
        .bind(&hb.hostname)
        .bind(&hb.os_family)
        .bind(&hb.agent_version)
        .bind(hb.at);

    statement.execute(pool).await?;
    Ok(())
}