use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqliteRow;
use sqlx::{Row, SqlitePool};
use tracing::warn;
const DEFAULT_LIMIT: i64 = 200;
const MAX_LIMIT: i64 = 5_000;
#[derive(Deserialize)]
pub struct ListQuery {
pub pc_id: Option<String>,
pub from: Option<DateTime<Utc>>,
pub to: Option<DateTime<Utc>>,
pub kind: Option<String>,
pub source: Option<String>,
pub logon_type: Option<i64>,
pub kinds: Option<String>,
pub kinds_ex: Option<String>,
pub sources: Option<String>,
pub sources_ex: Option<String>,
pub payload_key: Option<String>,
pub payload_value: Option<String>,
pub limit: Option<i64>,
}
fn csv_to_json_array(csv: &Option<String>) -> Option<String> {
let vals: Vec<&str> = csv
.as_deref()?
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.collect();
if vals.is_empty() {
return None;
}
serde_json::to_string(&vals).ok()
}
#[derive(Serialize)]
pub struct EventRow {
pub id: i64,
pub pc_id: String,
pub at: DateTime<Utc>,
pub kind: String,
pub source: String,
pub event_record_id: Option<String>,
pub payload: serde_json::Value,
}
#[derive(Serialize)]
pub struct ListResponse {
pub events: Vec<EventRow>,
}
pub async fn list(
State(pool): State<SqlitePool>,
Query(q): Query<ListQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
let limit = match q.limit {
None => DEFAULT_LIMIT,
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => return Err(StatusCode::BAD_REQUEST),
};
let payload_key = match q.payload_key.as_deref().map(str::trim) {
None | Some("") => None,
Some(k) if k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') => Some(k),
Some(_) => return Err(StatusCode::BAD_REQUEST),
};
let payload_value = payload_key.and(q.payload_value.as_deref());
let payload_key = payload_value.and(payload_key);
let payload_value_num: Option<f64> = payload_value.and_then(|v| v.parse().ok());
let kinds_json = csv_to_json_array(&q.kinds);
let kinds_ex_json = csv_to_json_array(&q.kinds_ex);
let sources_json = csv_to_json_array(&q.sources);
let sources_ex_json = csv_to_json_array(&q.sources_ex);
let rows = sqlx::query(
"SELECT id, pc_id, at, kind, source, event_record_id, payload
FROM obs_events
WHERE (?1 IS NULL OR pc_id = ?1)
AND (?2 IS NULL OR at >= ?2)
AND (?3 IS NULL OR at < ?3)
AND (?4 IS NULL OR kind = ?4)
AND (?5 IS NULL OR source = ?5)
AND (?6 IS NULL OR json_extract(payload, '$.logon_type') = ?6)
AND (?7 IS NULL OR kind IN (SELECT value FROM json_each(?7)))
AND (?8 IS NULL OR kind NOT IN (SELECT value FROM json_each(?8)))
AND (?9 IS NULL OR source IN (SELECT value FROM json_each(?9)))
AND (?10 IS NULL OR source NOT IN (SELECT value FROM json_each(?10)))
AND (?11 IS NULL
OR json_extract(payload, '$.' || ?11) = ?12
OR (?13 IS NOT NULL AND json_extract(payload, '$.' || ?11) = ?13))
ORDER BY at DESC, id DESC
LIMIT ?14",
)
.bind(q.pc_id.as_deref())
.bind(q.from)
.bind(q.to)
.bind(q.kind.as_deref())
.bind(q.source.as_deref())
.bind(q.logon_type)
.bind(kinds_json)
.bind(kinds_ex_json)
.bind(sources_json)
.bind(sources_ex_json)
.bind(payload_key)
.bind(payload_value)
.bind(payload_value_num)
.bind(limit)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events list query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let events = rows
.into_iter()
.filter_map(|r| match row_to_event(&r) {
Ok(e) => Some(e),
Err(e) => {
warn!(error = %e, "obs_events: drop row that failed to decode");
None
}
})
.collect();
Ok(Json(ListResponse { events }))
}
fn row_to_event(r: &SqliteRow) -> sqlx::Result<EventRow> {
let raw: String = r.try_get("payload")?;
let payload = serde_json::from_str(&raw).map_err(|e| {
sqlx::Error::Decode(format!("obs_events.payload not valid JSON: {e}").into())
})?;
Ok(EventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
at: r.try_get("at")?,
kind: r.try_get("kind")?,
source: r.try_get("source")?,
event_record_id: r.try_get("event_record_id")?,
payload,
})
}
#[derive(Serialize)]
pub struct KindsResponse {
pub kinds: Vec<String>,
}
pub async fn kinds(State(pool): State<SqlitePool>) -> Result<Json<KindsResponse>, StatusCode> {
let rows = sqlx::query("SELECT DISTINCT kind FROM obs_events ORDER BY kind")
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events kinds query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let kinds = rows
.into_iter()
.filter_map(|r| match r.try_get::<String, _>("kind") {
Ok(k) => Some(k),
Err(e) => {
warn!(error = %e, "obs_events kinds: drop row that failed to decode kind");
None
}
})
.collect();
Ok(Json(KindsResponse { kinds }))
}
#[derive(Serialize)]
pub struct SourcesResponse {
pub sources: Vec<String>,
}
pub async fn sources(State(pool): State<SqlitePool>) -> Result<Json<SourcesResponse>, StatusCode> {
let rows = sqlx::query("SELECT DISTINCT source FROM obs_events ORDER BY source")
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events sources query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let sources = rows
.into_iter()
.filter_map(|r| match r.try_get::<String, _>("source") {
Ok(s) => Some(s),
Err(e) => {
warn!(error = %e, "obs_events sources: drop row that failed to decode source");
None
}
})
.collect();
Ok(Json(SourcesResponse { sources }))
}
#[derive(Deserialize)]
pub struct RecentQuery {
pub limit: Option<i64>,
}
pub async fn recent(
State(pool): State<SqlitePool>,
Query(q): Query<RecentQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
let limit = match q.limit {
None => 50,
Some(n) if n > 0 && n <= MAX_LIMIT => n,
_ => return Err(StatusCode::BAD_REQUEST),
};
let rows = sqlx::query(
"SELECT id, pc_id, at, kind, source, event_record_id, payload
FROM obs_events
ORDER BY at DESC, id DESC
LIMIT ?",
)
.bind(limit)
.fetch_all(&pool)
.await
.map_err(|e| {
warn!(error = %e, "obs_events recent query");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let events = rows
.into_iter()
.filter_map(|r| match row_to_event(&r) {
Ok(e) => Some(e),
Err(e) => {
warn!(error = %e, "obs_events recent: drop row that failed to decode");
None
}
})
.collect();
Ok(Json(ListResponse { events }))
}