use std::collections::HashMap;
use std::time::Duration;
use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::wire::Heartbeat;
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Period of the timer in `run` that triggers a flush of any buffered heartbeats.
const FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
/// Number of buffered entries at which `run` flushes immediately instead of
/// waiting for the next timer tick.
const FLUSH_MAX_BUFFERED: usize = 1_000;
/// Runs the heartbeat projector until the NATS subscription ends.
///
/// Subscribes to `heartbeat.>`, decodes each payload as a JSON [`Heartbeat`]
/// and buffers it keyed by `pc_id`, so a newer message for the same key
/// replaces the one already buffered. The buffer is handed to `flush` when:
///
/// * it reaches `FLUSH_MAX_BUFFERED` entries,
/// * the `FLUSH_INTERVAL` timer ticks while the buffer is non-empty, or
/// * the subscription stream yields `None` (final flush before returning).
///
/// Payloads that fail to decode are logged at `warn` level and skipped.
///
/// # Errors
///
/// Returns an error only if the initial subscribe call fails.
pub async fn run(client: async_nats::Client, pool: SqlitePool) -> Result<()> {
    let mut subscription = client
        .subscribe("heartbeat.>")
        .await
        .context("subscribe heartbeat.>")?;
    info!("heartbeat projector started (subject: heartbeat.>)");

    let mut pending: HashMap<String, Heartbeat> = HashMap::new();
    let mut flush_timer = tokio::time::interval(FLUSH_INTERVAL);
    flush_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            incoming = subscription.next() => {
                // Stream ended: persist whatever is still buffered, then stop.
                let Some(msg) = incoming else {
                    flush(&pool, &mut pending).await;
                    return Ok(());
                };

                let hb = match serde_json::from_slice::<Heartbeat>(&msg.payload) {
                    Ok(decoded) => decoded,
                    Err(e) => {
                        warn!(error = %e, subject = %msg.subject, "decode Heartbeat");
                        continue;
                    }
                };

                // Keyed by pc_id: only the most recently received heartbeat
                // per key is kept for the next flush.
                pending.insert(hb.pc_id.clone(), hb);
                if pending.len() >= FLUSH_MAX_BUFFERED {
                    flush(&pool, &mut pending).await;
                }
            }
            _ = flush_timer.tick() => {
                if !pending.is_empty() {
                    flush(&pool, &mut pending).await;
                }
            }
        }
    }
}
/// Writes every buffered heartbeat to the database in a single transaction
/// and then empties the buffer.
///
/// Does nothing when `buf` is empty. Each entry is written with
/// `upsert_baseline` inside one transaction. If beginning the transaction,
/// any upsert, or the commit fails, the failure is logged at `warn` level and
/// the buffered window is dropped: the buffer is cleared whether or not the
/// write succeeded, and no error is returned to the caller.
async fn flush(pool: &SqlitePool, buf: &mut HashMap<String, Heartbeat>) {
    if buf.is_empty() {
        return;
    }
    let n = buf.len();
    let res = async {
        let mut tx = pool.begin().await.context("begin heartbeat flush tx")?;
        for hb in buf.values() {
            // Name the offending row so a failed window can be traced to a pc_id.
            upsert_baseline(&mut *tx, hb)
                .await
                .with_context(|| format!("upsert heartbeat for pc_id {}", hb.pc_id))?;
        }
        tx.commit().await.context("commit heartbeat flush tx")
    }
    .await;
    if let Err(e) = res {
        // Plain `Display` on an anyhow error prints only the outermost context
        // (e.g. "begin heartbeat flush tx"); `{:#}` appends the cause chain so
        // the underlying database error reaches the log.
        warn!(
            error = %format!("{:#}", e),
            buffered = n,
            "heartbeat flush failed; dropping window"
        );
    }
    buf.clear();
}
/// Inserts or updates the `agents` row for a single heartbeat.
///
/// A new `pc_id` gets a fresh row populated from the heartbeat. On conflict
/// with an existing `pc_id`:
///
/// * `hostname` and `os_family` keep their stored value when it is non-NULL
///   and are only filled from the heartbeat otherwise;
/// * `agent_version`, `last_heartbeat` and the agent CPU / RSS / disk
///   counters are overwritten with the heartbeat's values;
/// * `updated_at` is set to `CURRENT_TIMESTAMP`.
///
/// # Errors
///
/// Returns the error produced by executing the statement, if any.
async fn upsert_baseline<'e, E>(executor: E, hb: &Heartbeat) -> Result<()>
where
    E: sqlx::Executor<'e, Database = sqlx::Sqlite>,
{
    // Placeholders are positional; the bind order below must follow the
    // column list of the INSERT.
    const UPSERT_SQL: &str = "INSERT INTO agents (
pc_id, hostname, os_family, agent_version,
last_heartbeat,
agent_cpu_pct, agent_rss_bytes,
agent_disk_read_bytes, agent_disk_written_bytes,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(pc_id) DO UPDATE SET
hostname = COALESCE(agents.hostname, excluded.hostname),
os_family = COALESCE(agents.os_family, excluded.os_family),
agent_version = excluded.agent_version,
last_heartbeat = excluded.last_heartbeat,
agent_cpu_pct = excluded.agent_cpu_pct,
agent_rss_bytes = excluded.agent_rss_bytes,
agent_disk_read_bytes = excluded.agent_disk_read_bytes,
agent_disk_written_bytes = excluded.agent_disk_written_bytes,
updated_at = CURRENT_TIMESTAMP";

    let statement = sqlx::query(UPSERT_SQL)
        .bind(&hb.pc_id)
        .bind(&hb.hostname)
        .bind(&hb.os_family)
        .bind(&hb.agent_version)
        .bind(hb.at)
        .bind(hb.agent_cpu_pct)
        .bind(hb.agent_rss_bytes)
        .bind(hb.agent_disk_read_bytes)
        .bind(hb.agent_disk_written_bytes);

    // The query result (rows affected etc.) is not needed by callers.
    let _outcome = statement.execute(executor).await?;
    Ok(())
}