use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
use tracing::warn;
#[derive(Serialize)]
pub struct ResultRow {
pub result_id: String,
pub request_id: String,
pub exec_id: Option<String>,
pub pc_id: String,
pub exit_code: Option<i64>,
pub stdout: String,
pub stderr: String,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
pub job_id: Option<String>,
pub version: Option<String>,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
pub enum StatusFilter {
Success,
Failure,
Running,
}
#[derive(Deserialize)]
pub struct ListParams {
#[serde(default = "default_limit")]
pub limit: u32,
pub pc_id: Option<String>,
pub job_id: Option<String>,
pub exec_id: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub status: Option<StatusFilter>,
pub since: Option<chrono::DateTime<chrono::Utc>>,
}
fn default_limit() -> u32 {
50
}
const MAX_FETCH: i64 = 10_000;
fn compile(opt: Option<&str>) -> Result<Option<Regex>, (StatusCode, String)> {
match opt.filter(|s| !s.is_empty()) {
Some(s) => Regex::new(s)
.map(Some)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid regex `{s}`: {e}"))),
None => Ok(None),
}
}
pub async fn list(
State(pool): State<SqlitePool>,
Query(params): Query<ListParams>,
) -> Result<Json<Vec<ResultRow>>, (StatusCode, String)> {
let pc_re = compile(params.pc_id.as_deref())?;
let job_re = compile(params.job_id.as_deref())?;
let exec_re = compile(params.exec_id.as_deref())?;
let stdout_re = compile(params.stdout.as_deref())?;
let stderr_re = compile(params.stderr.as_deref())?;
let has_regex = pc_re.is_some()
|| job_re.is_some()
|| exec_re.is_some()
|| stdout_re.is_some()
|| stderr_re.is_some();
let mut qb: QueryBuilder<Sqlite> = QueryBuilder::new("SELECT * FROM execution_results");
let mut sep = " WHERE ";
if let Some(status) = ¶ms.status {
let cmp = match status {
StatusFilter::Success => "exit_code = 0",
StatusFilter::Failure => "exit_code IS NOT NULL AND exit_code <> 0",
StatusFilter::Running => "finished_at IS NULL",
};
qb.push(sep).push(cmp);
sep = " AND ";
}
if let Some(since) = params.since {
qb.push(sep).push("recorded_at >= ").push_bind(since);
sep = " AND ";
}
let _ = sep;
qb.push(" ORDER BY recorded_at DESC LIMIT ");
let sql_limit = if has_regex {
MAX_FETCH
} else {
params.limit as i64
};
qb.push_bind(sql_limit);
let rows = qb.build().fetch_all(&pool).await.map_err(|e| {
warn!(error = %e, "list results");
(
StatusCode::INTERNAL_SERVER_ERROR,
"list results failed".to_string(),
)
})?;
if !has_regex {
return Ok(Json(rows.into_iter().map(row_to_result).collect()));
}
let limit = params.limit as usize;
let mut out: Vec<ResultRow> = Vec::with_capacity(limit.min(64));
for r in rows {
if let Some(re) = &pc_re
&& !re.is_match(r.try_get::<&str, _>("pc_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &job_re
&& !re.is_match(r.try_get::<&str, _>("job_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &exec_re
&& !re.is_match(r.try_get::<&str, _>("exec_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &stdout_re
&& !re.is_match(r.try_get::<&str, _>("stdout").unwrap_or(""))
{
continue;
}
if let Some(re) = &stderr_re
&& !re.is_match(r.try_get::<&str, _>("stderr").unwrap_or(""))
{
continue;
}
out.push(row_to_result(r));
if out.len() >= limit {
break;
}
}
Ok(Json(out))
}
pub async fn detail(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<ResultRow>, StatusCode> {
let row = sqlx::query("SELECT * FROM execution_results WHERE result_id = ?")
.bind(&id)
.fetch_optional(&pool)
.await
.map_err(|e| {
warn!(error = %e, "detail result");
StatusCode::INTERNAL_SERVER_ERROR
})?;
match row {
Some(r) => Ok(Json(row_to_result(r))),
None => Err(StatusCode::NOT_FOUND),
}
}
fn row_to_result(r: sqlx::sqlite::SqliteRow) -> ResultRow {
ResultRow {
result_id: r.try_get("result_id").unwrap_or_default(),
request_id: r.try_get("request_id").unwrap_or_default(),
exec_id: r.try_get("exec_id").ok(),
pc_id: r.try_get("pc_id").unwrap_or_default(),
exit_code: r.try_get("exit_code").ok(),
stdout: r.try_get("stdout").unwrap_or_default(),
stderr: r.try_get("stderr").unwrap_or_default(),
started_at: r.try_get("started_at").ok(),
finished_at: r.try_get("finished_at").ok(),
job_id: r.try_get("job_id").ok(),
version: r.try_get("version").ok(),
}
}