use std::time::Duration;
use anyhow::{Context, Result};
use futures::StreamExt;
use kanade_shared::wire::HostPerf;
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Period of the timer in `run` that flushes any buffered samples to the database.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Buffer length at which `run` flushes immediately instead of waiting for the next timer tick.
const FLUSH_MAX_BUFFERED: usize = 1000;
pub async fn run(client: async_nats::Client, pool: SqlitePool) -> Result<()> {
let mut sub = client
.subscribe("host_perf.>")
.await
.context("subscribe host_perf.>")?;
info!("host_perf projector started (subject: host_perf.>)");
let mut buf: Vec<HostPerf> = Vec::new();
let mut tick = tokio::time::interval(FLUSH_INTERVAL);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
maybe = sub.next() => match maybe {
Some(msg) => {
match serde_json::from_slice::<HostPerf>(&msg.payload) {
Ok(s) => {
buf.push(s);
if buf.len() >= FLUSH_MAX_BUFFERED {
flush(&pool, &mut buf).await;
}
}
Err(e) => {
warn!(error = %e, subject = %msg.subject, "decode HostPerf");
}
}
}
None => {
flush(&pool, &mut buf).await;
return Ok(());
}
},
_ = tick.tick() => {
if !buf.is_empty() {
flush(&pool, &mut buf).await;
}
}
}
}
}
/// Writes every buffered sample to the database inside one transaction, then empties `buf`.
///
/// Does nothing when `buf` is empty. This is best-effort: if beginning the transaction,
/// any insert, or the commit fails, the failure is logged at `warn` level, the transaction
/// is never committed, and the buffered samples are discarded rather than retried. `buf`
/// is therefore always empty when this function returns.
async fn flush(pool: &SqlitePool, buf: &mut Vec<HostPerf>) {
    if buf.is_empty() {
        return;
    }
    let n = buf.len();
    let res = async {
        let mut tx = pool.begin().await.context("begin host_perf flush tx")?;
        for s in buf.iter() {
            insert_sample(&mut *tx, s).await?;
        }
        tx.commit().await.context("commit host_perf flush tx")
    }
    .await;
    if let Err(e) = res {
        // Plain `Display` on an `anyhow::Error` prints only the outermost context (for
        // example just "begin host_perf flush tx"). The alternate form `{:#}` renders the
        // whole cause chain, so the underlying database error reaches the log.
        warn!(
            error = %format!("{:#}", e),
            buffered = n,
            "host_perf flush failed; dropping window"
        );
    }
    // Cleared on success and on failure alike: a failed window is dropped, not retried.
    buf.clear();
}
/// Inserts one [`HostPerf`] sample into `host_perf_samples` using the given executor.
///
/// The statement is `INSERT OR IGNORE`, and every value is passed as a bound parameter in
/// the same order as the column list.
///
/// # Errors
///
/// Returns the error produced by executing the statement, converted into `anyhow::Error`.
async fn insert_sample<'e, E>(executor: E, s: &HostPerf) -> Result<()>
where
    E: sqlx::Executor<'e, Database = sqlx::Sqlite>,
{
    // Assembled with `concat!` so the statement text (including its line breaks) is
    // independent of how this source file is indented.
    const INSERT_SQL: &str = concat!(
        "INSERT OR IGNORE INTO host_perf_samples (\n",
        "pc_id, at,\n",
        "cpu_pct, cpu_count,\n",
        "mem_used_bytes, mem_total_bytes,\n",
        "swap_used_bytes, swap_total_bytes,\n",
        "disk_read_bytes_per_sec, disk_written_bytes_per_sec,\n",
        "net_rx_bytes_per_sec, net_tx_bytes_per_sec\n",
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    );

    // Bind order must match the column order in `INSERT_SQL`.
    let statement = sqlx::query(INSERT_SQL)
        .bind(&s.pc_id)
        .bind(s.at)
        .bind(s.cpu_pct)
        .bind(s.cpu_count)
        .bind(s.mem_used_bytes)
        .bind(s.mem_total_bytes)
        .bind(s.swap_used_bytes)
        .bind(s.swap_total_bytes)
        .bind(s.disk_read_bytes_per_sec)
        .bind(s.disk_written_bytes_per_sec)
        .bind(s.net_rx_bytes_per_sec)
        .bind(s.net_tx_bytes_per_sec);

    statement.execute(executor).await?;
    Ok(())
}