use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::wire::HostPerf;
use sqlx::SqlitePool;
use tracing::{info, warn};
pub async fn run(client: async_nats::Client, pool: SqlitePool) -> Result<()> {
let mut sub = client
.subscribe("host_perf.>")
.await
.context("subscribe host_perf.>")?;
info!("host_perf projector started (subject: host_perf.>)");
while let Some(msg) = sub.next().await {
match serde_json::from_slice::<HostPerf>(&msg.payload) {
Ok(s) => {
if let Err(e) = insert_sample(&pool, &s).await {
warn!(error = %e, pc_id = %s.pc_id, "host_perf insert failed");
}
}
Err(e) => {
warn!(error = %e, subject = %msg.subject, "decode HostPerf");
}
}
}
Ok(())
}
/// Writes one [`HostPerf`] sample as a row of the `host_perf_samples` table.
///
/// The statement is `INSERT OR IGNORE`, so SQLite skips a row that would
/// violate a table constraint instead of reporting an error.
/// NOTE(review): presumably this de-duplicates redelivered samples on a
/// `(pc_id, at)` key — confirm against the table schema.
///
/// # Errors
///
/// Returns the error produced by executing the statement against `pool`.
async fn insert_sample(pool: &SqlitePool, s: &HostPerf) -> Result<()> {
    // The literal's continuation lines are intentionally not indented so the
    // SQL text handed to SQLite stays exactly as written.
    const INSERT_SQL: &str = "INSERT OR IGNORE INTO host_perf_samples (
pc_id, at,
cpu_pct, cpu_count,
mem_used_bytes, mem_total_bytes,
swap_used_bytes, swap_total_bytes,
disk_read_bytes_per_sec, disk_written_bytes_per_sec,
net_rx_bytes_per_sec, net_tx_bytes_per_sec
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    // Bind order must follow the column list above, one value per `?`.
    let statement = sqlx::query(INSERT_SQL)
        // Identity of the sample.
        .bind(&s.pc_id)
        .bind(s.at)
        // CPU.
        .bind(s.cpu_pct)
        .bind(s.cpu_count)
        // Memory and swap.
        .bind(s.mem_used_bytes)
        .bind(s.mem_total_bytes)
        .bind(s.swap_used_bytes)
        .bind(s.swap_total_bytes)
        // Disk and network throughput.
        .bind(s.disk_read_bytes_per_sec)
        .bind(s.disk_written_bytes_per_sec)
        .bind(s.net_rx_bytes_per_sec)
        .bind(s.net_tx_bytes_per_sec);

    statement.execute(pool).await?;
    Ok(())
}