use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{Row, SqlitePool};
use tracing::warn;
const DEFAULT_WINDOW_SECS: i64 = 60 * 60;
const DEFAULT_STEP_SECS: i64 = 5 * 60;
const MAX_BUCKETS: i64 = 10_000;
#[derive(Deserialize)]
pub struct PerfQuery {
from: Option<DateTime<Utc>>,
to: Option<DateTime<Utc>>,
step: Option<String>,
}
#[derive(Serialize)]
pub struct PerfResponse {
pub pc_id: String,
pub from: DateTime<Utc>,
pub to: DateTime<Utc>,
pub step_seconds: i64,
pub points: Vec<PerfPoint>,
}
#[derive(Serialize)]
pub struct PerfPoint {
pub at: DateTime<Utc>,
pub cpu_pct: Option<f64>,
pub mem_used_bytes: Option<f64>,
pub mem_total_bytes: Option<f64>,
pub swap_used_bytes: Option<f64>,
pub swap_total_bytes: Option<f64>,
pub disk_read_bytes_per_sec: Option<f64>,
pub disk_written_bytes_per_sec: Option<f64>,
pub net_rx_bytes_per_sec: Option<f64>,
pub net_tx_bytes_per_sec: Option<f64>,
}
pub async fn perf(
State(pool): State<SqlitePool>,
Path(pc_id): Path<String>,
Query(q): Query<PerfQuery>,
) -> Result<Json<PerfResponse>, StatusCode> {
let to = q.to.unwrap_or_else(Utc::now);
let from = q
.from
.unwrap_or_else(|| to - Duration::seconds(DEFAULT_WINDOW_SECS));
let step_secs = match q.step.as_deref() {
None => DEFAULT_STEP_SECS,
Some(raw) => match humantime::parse_duration(raw) {
Ok(d) => i64::try_from(d.as_secs()).unwrap_or(DEFAULT_STEP_SECS),
Err(_) => return Err(StatusCode::BAD_REQUEST),
},
};
if step_secs <= 0 {
return Err(StatusCode::BAD_REQUEST);
}
if from >= to {
return Err(StatusCode::BAD_REQUEST);
}
let window_secs = (to - from).num_seconds();
if window_secs / step_secs > MAX_BUCKETS {
return Err(StatusCode::BAD_REQUEST);
}
let rows = sqlx::query(
"SELECT
(CAST(strftime('%s', at) AS INTEGER) / ?) * ? AS bucket_unix,
AVG(cpu_pct) AS cpu_pct,
AVG(mem_used_bytes) AS mem_used_bytes,
AVG(mem_total_bytes) AS mem_total_bytes,
AVG(swap_used_bytes) AS swap_used_bytes,
AVG(swap_total_bytes) AS swap_total_bytes,
AVG(disk_read_bytes_per_sec) AS disk_read_bytes_per_sec,
AVG(disk_written_bytes_per_sec) AS disk_written_bytes_per_sec,
AVG(net_rx_bytes_per_sec) AS net_rx_bytes_per_sec,
AVG(net_tx_bytes_per_sec) AS net_tx_bytes_per_sec
FROM host_perf_samples
WHERE pc_id = ?
AND at >= ?
AND at < ?
GROUP BY bucket_unix
ORDER BY bucket_unix ASC",
)
.bind(step_secs)
.bind(step_secs)
.bind(&pc_id)
.bind(from)
.bind(to)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, pc_id, "host_perf query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let points = rows
.into_iter()
.map(|r| {
let bucket: i64 = r.try_get("bucket_unix").unwrap_or(0);
PerfPoint {
at: DateTime::<Utc>::from_timestamp(bucket, 0).unwrap_or(from),
cpu_pct: r.try_get("cpu_pct").ok(),
mem_used_bytes: r.try_get("mem_used_bytes").ok(),
mem_total_bytes: r.try_get("mem_total_bytes").ok(),
swap_used_bytes: r.try_get("swap_used_bytes").ok(),
swap_total_bytes: r.try_get("swap_total_bytes").ok(),
disk_read_bytes_per_sec: r.try_get("disk_read_bytes_per_sec").ok(),
disk_written_bytes_per_sec: r.try_get("disk_written_bytes_per_sec").ok(),
net_rx_bytes_per_sec: r.try_get("net_rx_bytes_per_sec").ok(),
net_tx_bytes_per_sec: r.try_get("net_tx_bytes_per_sec").ok(),
}
})
.collect();
Ok(Json(PerfResponse {
pc_id,
from,
to,
step_seconds: step_secs,
points,
}))
}