use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
use tracing::warn;
/// JSON shape of one `execution_results` row as returned by the `list` and
/// `detail` handlers.
///
/// Instances are built by `row_to_result`, which reads each field from the
/// column of the same name.
#[derive(Serialize)]
pub struct ResultRow {
    /// `result_id` column; the key `detail` looks rows up by.
    pub result_id: String,
    /// `request_id` column.
    pub request_id: String,
    /// `exec_id` column, if it holds a value.
    pub exec_id: Option<String>,
    /// `pc_id` column.
    pub pc_id: String,
    /// `exit_code` column, if it holds a value. The `status` list filter
    /// treats `0` as success and any other non-null value as failure.
    pub exit_code: Option<i64>,
    /// `stdout` column.
    pub stdout: String,
    /// `stderr` column.
    pub stderr: String,
    /// `started_at` column; the timestamp `list` orders by and compares
    /// against `since`.
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// `finished_at` column; the `running` status filter selects rows where
    /// it is NULL.
    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
    /// `job_id` column, if it holds a value.
    pub job_id: Option<String>,
    /// `version` column, if it holds a value.
    pub version: Option<String>,
}
/// Value of the `status` query parameter accepted by `list`.
///
/// Deserialized from the lowercase variant name (`success`, `failure`,
/// `running`).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
pub enum StatusFilter {
    /// Rows whose `exit_code` is `0`.
    Success,
    /// Rows whose `exit_code` is non-null and non-zero.
    Failure,
    /// Rows whose `finished_at` is NULL.
    Running,
}
/// Query-string parameters accepted by the `list` handler.
///
/// The `pc_id`, `job_id`, `exec_id`, `stdout` and `stderr` fields are regular
/// expressions matched against the column of the same name; an absent or
/// empty value disables that filter.
#[derive(Deserialize)]
pub struct ListParams {
    /// Maximum number of rows to return; defaults to `default_limit()` when
    /// the parameter is omitted.
    #[serde(default = "default_limit")]
    pub limit: u32,
    /// Regex applied to `pc_id`.
    pub pc_id: Option<String>,
    /// Regex applied to `job_id`.
    pub job_id: Option<String>,
    /// Regex applied to `exec_id`.
    pub exec_id: Option<String>,
    /// Regex applied to `stdout`.
    pub stdout: Option<String>,
    /// Regex applied to `stderr`.
    pub stderr: Option<String>,
    /// Restricts rows by outcome; see [`StatusFilter`].
    pub status: Option<StatusFilter>,
    /// Inclusive lower bound on `started_at`.
    pub since: Option<chrono::DateTime<chrono::Utc>>,
}
/// Page size used by `ListParams::limit` when the request omits `limit`.
fn default_limit() -> u32 {
    const DEFAULT_LIMIT: u32 = 50;
    DEFAULT_LIMIT
}
/// SQL `LIMIT` used by `list` when any regex filter is active. Regex matching
/// happens in-process after the query, so this bounds how many of the most
/// recent rows are scanned for matches.
const MAX_FETCH: i64 = 10_000;
/// Compiles an optional user-supplied pattern into a [`Regex`].
///
/// Returns `Ok(None)` when the pattern is absent or empty, so the caller can
/// treat "no filter" and "blank filter" the same way.
///
/// # Errors
///
/// Returns `400 Bad Request` with a message naming the pattern and the
/// compiler's complaint when the pattern is not a valid regex.
fn compile(opt: Option<&str>) -> Result<Option<Regex>, (StatusCode, String)> {
    let Some(s) = opt else {
        return Ok(None);
    };
    if s.is_empty() {
        return Ok(None);
    }

    match Regex::new(s) {
        Ok(re) => Ok(Some(re)),
        Err(e) => Err((StatusCode::BAD_REQUEST, format!("invalid regex `{s}`: {e}"))),
    }
}
pub async fn list(
State(pool): State<SqlitePool>,
Query(params): Query<ListParams>,
) -> Result<Json<Vec<ResultRow>>, (StatusCode, String)> {
let pc_re = compile(params.pc_id.as_deref())?;
let job_re = compile(params.job_id.as_deref())?;
let exec_re = compile(params.exec_id.as_deref())?;
let stdout_re = compile(params.stdout.as_deref())?;
let stderr_re = compile(params.stderr.as_deref())?;
let has_regex = pc_re.is_some()
|| job_re.is_some()
|| exec_re.is_some()
|| stdout_re.is_some()
|| stderr_re.is_some();
let mut qb: QueryBuilder<Sqlite> = QueryBuilder::new("SELECT * FROM execution_results");
let mut sep = " WHERE ";
if let Some(status) = ¶ms.status {
let cmp = match status {
StatusFilter::Success => "exit_code = 0",
StatusFilter::Failure => "exit_code IS NOT NULL AND exit_code <> 0",
StatusFilter::Running => "finished_at IS NULL",
};
qb.push(sep).push(cmp);
sep = " AND ";
}
if let Some(since) = params.since {
qb.push(sep).push("started_at >= ").push_bind(since);
sep = " AND ";
}
let _ = sep;
qb.push(" ORDER BY started_at DESC, result_id DESC LIMIT ");
let sql_limit = if has_regex {
MAX_FETCH
} else {
params.limit as i64
};
qb.push_bind(sql_limit);
let rows = qb.build().fetch_all(&pool).await.map_err(|e| {
warn!(error = %e, "list results");
(
StatusCode::INTERNAL_SERVER_ERROR,
"list results failed".to_string(),
)
})?;
if !has_regex {
return Ok(Json(rows.into_iter().map(row_to_result).collect()));
}
let limit = params.limit as usize;
let mut out: Vec<ResultRow> = Vec::with_capacity(limit.min(64));
for r in rows {
if let Some(re) = &pc_re
&& !re.is_match(r.try_get::<&str, _>("pc_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &job_re
&& !re.is_match(r.try_get::<&str, _>("job_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &exec_re
&& !re.is_match(r.try_get::<&str, _>("exec_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &stdout_re
&& !re.is_match(r.try_get::<&str, _>("stdout").unwrap_or(""))
{
continue;
}
if let Some(re) = &stderr_re
&& !re.is_match(r.try_get::<&str, _>("stderr").unwrap_or(""))
{
continue;
}
out.push(row_to_result(r));
if out.len() >= limit {
break;
}
}
Ok(Json(out))
}
pub async fn detail(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<ResultRow>, StatusCode> {
let row = sqlx::query("SELECT * FROM execution_results WHERE result_id = ?")
.bind(&id)
.fetch_optional(&pool)
.await
.map_err(|e| {
warn!(error = %e, "detail result");
StatusCode::INTERNAL_SERVER_ERROR
})?;
match row {
Some(r) => Ok(Json(row_to_result(r))),
None => Err(StatusCode::NOT_FOUND),
}
}
fn row_to_result(r: sqlx::sqlite::SqliteRow) -> ResultRow {
ResultRow {
result_id: r.try_get("result_id").unwrap_or_default(),
request_id: r.try_get("request_id").unwrap_or_default(),
exec_id: r.try_get("exec_id").ok(),
pc_id: r.try_get("pc_id").unwrap_or_default(),
exit_code: r.try_get("exit_code").ok(),
stdout: r.try_get("stdout").unwrap_or_default(),
stderr: r.try_get("stderr").unwrap_or_default(),
started_at: r.try_get("started_at").ok(),
finished_at: r.try_get("finished_at").ok(),
job_id: r.try_get("job_id").ok(),
version: r.try_get("version").ok(),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, Utc};
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens a single-connection in-memory SQLite pool and applies the
    /// migrations from `./migrations`.
    async fn fresh_pool() -> SqlitePool {
        let options = SqlitePoolOptions::new().max_connections(1);
        let pool = options.connect("sqlite::memory:").await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// List parameters with the default limit and only `since` set.
    fn params(since: Option<chrono::DateTime<chrono::Utc>>) -> ListParams {
        ListParams {
            since,
            limit: default_limit(),
            status: None,
            pc_id: None,
            job_id: None,
            exec_id: None,
            stdout: None,
            stderr: None,
        }
    }

    /// Calls the `list` handler with `since` as the only filter and unwraps
    /// the returned rows.
    async fn list_since(pool: &SqlitePool, since: chrono::DateTime<chrono::Utc>) -> Vec<ResultRow> {
        let Json(rows) = list(State(pool.clone()), Query(params(Some(since))))
            .await
            .unwrap();
        rows
    }

    /// Inserts a successful run that started ten minutes ago and finished
    /// (and was recorded) nine minutes ago.
    async fn insert_row(pool: &SqlitePool, result_id: &str) {
        let now = Utc::now();
        let started_at = now - Duration::minutes(10);
        let finished_at = now - Duration::minutes(9);
        let recorded_at = now - Duration::minutes(9);
        sqlx::query(
            "INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES (?, 'req', 'pc-1', 0, '', '', ?, ?, ?)",
        )
        .bind(result_id)
        .bind(started_at)
        .bind(finished_at)
        .bind(recorded_at)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn since_filter_matches_same_utc_day_rows() {
        let pool = fresh_pool().await;
        insert_row(&pool, "r-1").await;

        let one_day_ago = Utc::now() - Duration::hours(24);
        let recent = list_since(&pool, one_day_ago).await;
        assert_eq!(
            recent.len(),
            1,
            "row that ran minutes ago must be inside the rolling 24h window",
        );

        let five_minutes_ahead = Utc::now() + Duration::minutes(5);
        let none = list_since(&pool, five_minutes_ahead).await;
        assert!(none.is_empty(), "future bound must exclude the row");
    }

    #[tokio::test]
    async fn since_filters_on_started_at_not_recorded_at() {
        let pool = fresh_pool().await;
        let now = Utc::now();

        // The run itself is three weeks old; only its `recorded_at` is fresh.
        let started_at = now - Duration::days(21);
        let finished_at = now - Duration::days(21);
        let recorded_at = now;
        sqlx::query(
            "INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES ('r-replayed', 'req', 'pc-1', 0, '', '', ?, ?, ?)",
        )
        .bind(started_at)
        .bind(finished_at)
        .bind(recorded_at)
        .execute(&pool)
        .await
        .unwrap();

        let last_day = list_since(&pool, now - Duration::hours(24)).await;
        assert!(
            last_day.is_empty(),
            "a re-projected three-week-old run must not flood the 24h window",
        );

        let last_month = list_since(&pool, now - Duration::days(30)).await;
        assert_eq!(last_month.len(), 1);
    }
}