use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqliteRow;
use sqlx::{Row, SqlitePool};
use tracing::warn;
const DEFAULT_LIMIT: i64 = 200;
const MAX_LIMIT: i64 = 5_000;
#[derive(Deserialize)]
pub struct ListQuery {
pub pc_id: Option<String>,
pub from: Option<DateTime<Utc>>,
pub to: Option<DateTime<Utc>>,
pub kind: Option<String>,
pub source: Option<String>,
pub logon_type: Option<i64>,
pub limit: Option<i64>,
}
#[derive(Serialize)]
pub struct EventRow {
pub id: i64,
pub pc_id: String,
pub at: DateTime<Utc>,
pub kind: String,
pub source: String,
pub event_record_id: Option<String>,
pub payload: serde_json::Value,
}
#[derive(Serialize)]
pub struct ListResponse {
pub events: Vec<EventRow>,
}
pub async fn list(
State(pool): State<SqlitePool>,
Query(q): Query<ListQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
let limit = match q.limit {
None => DEFAULT_LIMIT,
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => return Err(StatusCode::BAD_REQUEST),
};
let rows = sqlx::query(
"SELECT id, pc_id, at, kind, source, event_record_id, payload
FROM obs_events
WHERE (?1 IS NULL OR pc_id = ?1)
AND (?2 IS NULL OR at >= ?2)
AND (?3 IS NULL OR at < ?3)
AND (?4 IS NULL OR kind = ?4)
AND (?5 IS NULL OR source = ?5)
AND (?6 IS NULL OR json_extract(payload, '$.logon_type') = ?6)
ORDER BY at DESC, id DESC
LIMIT ?7",
)
.bind(q.pc_id.as_deref())
.bind(q.from)
.bind(q.to)
.bind(q.kind.as_deref())
.bind(q.source.as_deref())
.bind(q.logon_type)
.bind(limit)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events list query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let events = rows
.into_iter()
.filter_map(|r| match row_to_event(&r) {
Ok(e) => Some(e),
Err(e) => {
warn!(error = %e, "obs_events: drop row that failed to decode");
None
}
})
.collect();
Ok(Json(ListResponse { events }))
}
fn row_to_event(r: &SqliteRow) -> sqlx::Result<EventRow> {
let raw: String = r.try_get("payload")?;
let payload = serde_json::from_str(&raw).map_err(|e| {
sqlx::Error::Decode(format!("obs_events.payload not valid JSON: {e}").into())
})?;
Ok(EventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
at: r.try_get("at")?,
kind: r.try_get("kind")?,
source: r.try_get("source")?,
event_record_id: r.try_get("event_record_id")?,
payload,
})
}
#[derive(Serialize)]
pub struct KindsResponse {
pub kinds: Vec<String>,
}
pub async fn kinds(State(pool): State<SqlitePool>) -> Result<Json<KindsResponse>, StatusCode> {
let rows = sqlx::query("SELECT DISTINCT kind FROM obs_events ORDER BY kind")
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events kinds query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let kinds = rows
.into_iter()
.filter_map(|r| match r.try_get::<String, _>("kind") {
Ok(k) => Some(k),
Err(e) => {
warn!(error = %e, "obs_events kinds: drop row that failed to decode kind");
None
}
})
.collect();
Ok(Json(KindsResponse { kinds }))
}
#[derive(Deserialize)]
pub struct RecentQuery {
pub limit: Option<i64>,
}
pub async fn recent(
State(pool): State<SqlitePool>,
Query(q): Query<RecentQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
let limit = match q.limit {
None => 50,
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => return Err(StatusCode::BAD_REQUEST),
};
let rows = sqlx::query(
"SELECT id, pc_id, at, kind, source, event_record_id, payload
FROM obs_events
ORDER BY at DESC, id DESC
LIMIT ?",
)
.bind(limit)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events recent query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let events = rows
.into_iter()
.filter_map(|r| match row_to_event(&r) {
Ok(e) => Some(e),
Err(e) => {
warn!(error = %e, "obs_events recent: drop row that failed to decode");
None
}
})
.collect();
Ok(Json(ListResponse { events }))
}