use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use serde::{Deserialize, Serialize};
use sqlx::{Row, SqlitePool};
use tracing::warn;
/// JSON representation of one row selected from the `executions` table.
///
/// Built by `row_to_execution`, returned as a list by the `list` handler and
/// embedded in `ExecutionDetail` by the `detail` handler. Fields are filled
/// from the same-named columns; `row_to_execution` substitutes a default
/// value for any column it cannot decode.
#[derive(Serialize)]
pub struct ExecutionRow {
/// `exec_id` column; the `detail` handler looks an execution up by this value.
pub exec_id: String,
/// `job_id` column; the `list` handler can filter on it.
pub job_id: String,
/// `version` column.
pub version: String,
/// `initiated_by` column.
pub initiated_by: String,
/// `initiated_at` column (the `list` sort key); `None` when it cannot be
/// decoded as a UTC timestamp.
pub initiated_at: Option<chrono::DateTime<chrono::Utc>>,
/// `target_count` column.
pub target_count: i64,
/// `success_count` column.
pub success_count: i64,
/// `failure_count` column.
pub failure_count: i64,
/// `status` column; the `list` handler can filter on it.
pub status: String,
}
/// Query-string parameters accepted by the `list` handler.
#[derive(Deserialize)]
pub struct ListParams {
/// Maximum number of rows to return; `default_limit()` supplies the value
/// when the parameter is omitted.
#[serde(default = "default_limit")]
pub limit: u32,
/// When present and non-empty, `list` returns only rows whose `job_id`
/// equals this value.
pub job_id: Option<String>,
/// When present and non-empty, `list` returns only rows whose `status`
/// equals this value.
pub status: Option<String>,
}
/// Supplies the row limit used when a request omits `limit`.
///
/// Referenced by name from the `#[serde(default = ...)]` attribute on
/// `ListParams::limit`.
fn default_limit() -> u32 {
    const DEFAULT_PAGE_SIZE: u32 = 50;
    DEFAULT_PAGE_SIZE
}
/// Lists executions ordered by `initiated_at`, newest first.
///
/// The `job_id` and `status` query parameters each add an exact-match
/// predicate when they are present and non-empty; `limit` is bound as the
/// SQL `LIMIT`.
///
/// # Errors
///
/// Logs a warning and returns `StatusCode::INTERNAL_SERVER_ERROR` when the
/// query fails.
pub async fn list(
    State(pool): State<SqlitePool>,
    Query(params): Query<ListParams>,
) -> Result<Json<Vec<ExecutionRow>>, StatusCode> {
    let mut builder = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
        "SELECT exec_id, job_id, version, initiated_by, initiated_at, \
         target_count, success_count, failure_count, status \
         FROM executions",
    );

    // Each optional filter contributes one `column = ?` predicate. An empty
    // string is treated the same as an absent parameter.
    let filters = [
        ("job_id = ", params.job_id.as_deref()),
        ("status = ", params.status.as_deref()),
    ];
    let mut keyword = " WHERE ";
    for (predicate, wanted) in filters {
        if let Some(value) = wanted.filter(|s| !s.is_empty()) {
            builder.push(keyword);
            builder.push(predicate);
            builder.push_bind(value.to_owned());
            // Only the first predicate is introduced by WHERE.
            keyword = " AND ";
        }
    }

    builder.push(" ORDER BY initiated_at DESC LIMIT ");
    // u32 -> i64 is lossless, so `From` is used instead of an `as` cast.
    builder.push_bind(i64::from(params.limit));

    let rows = match builder.build().fetch_all(&pool).await {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "list executions");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let executions: Vec<ExecutionRow> = rows.into_iter().map(row_to_execution).collect();
    Ok(Json(executions))
}
/// Returns one execution together with its per-result rows.
///
/// The execution is looked up by the `exec_id` path segment. Its rows from
/// `execution_results` are returned ordered by `recorded_at`, newest first.
/// Result columns that cannot be decoded fall back to an empty string, `0`
/// for `exit_code`, or `None` for the timestamps.
///
/// # Errors
///
/// * `StatusCode::NOT_FOUND` when no execution has the given `exec_id`.
/// * `StatusCode::INTERNAL_SERVER_ERROR` (after logging a warning) when
///   either query fails.
pub async fn detail(
    State(pool): State<SqlitePool>,
    Path(exec_id): Path<String>,
) -> Result<Json<ExecutionDetail>, StatusCode> {
    let header = sqlx::query(
        "SELECT exec_id, job_id, version, initiated_by, initiated_at, \
         target_count, success_count, failure_count, status \
         FROM executions WHERE exec_id = ?",
    )
    .bind(&exec_id)
    .fetch_optional(&pool)
    .await;

    let execution = match header {
        Ok(Some(r)) => row_to_execution(r),
        Ok(None) => return Err(StatusCode::NOT_FOUND),
        Err(e) => {
            warn!(error = %e, "detail execution");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // `recorded_at` is selected only so the rows can be ordered by it; it is
    // not copied into the response.
    let fetched = sqlx::query(
        "SELECT result_id, request_id, pc_id, exit_code, started_at,\n\
         finished_at, recorded_at\n\
         FROM execution_results\n\
         WHERE exec_id = ?\n\
         ORDER BY recorded_at DESC",
    )
    .bind(&exec_id)
    .fetch_all(&pool)
    .await;

    let result_rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "detail execution results");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let mut results: Vec<ExecutionResultSummary> = Vec::with_capacity(result_rows.len());
    for r in result_rows {
        results.push(ExecutionResultSummary {
            result_id: r.try_get("result_id").unwrap_or_default(),
            request_id: r.try_get("request_id").unwrap_or_default(),
            pc_id: r.try_get("pc_id").unwrap_or_default(),
            exit_code: r.try_get("exit_code").unwrap_or(0),
            started_at: r.try_get("started_at").ok(),
            finished_at: r.try_get("finished_at").ok(),
        });
    }

    Ok(Json(ExecutionDetail { execution, results }))
}
/// JSON response body produced by the `detail` handler.
#[derive(Serialize)]
pub struct ExecutionDetail {
/// The execution row matched by the requested `exec_id`.
pub execution: ExecutionRow,
/// Rows from `execution_results` for the same `exec_id`, in the order the
/// `detail` query returns them (`recorded_at` descending).
pub results: Vec<ExecutionResultSummary>,
}
/// JSON representation of one row selected from `execution_results` by the
/// `detail` handler.
///
/// Fields are filled from the same-named columns; `detail` substitutes a
/// default value for any column it cannot decode.
#[derive(Serialize)]
pub struct ExecutionResultSummary {
/// `result_id` column.
pub result_id: String,
/// `request_id` column.
pub request_id: String,
/// `pc_id` column.
pub pc_id: String,
/// `exit_code` column; `detail` stores `0` when the column cannot be decoded.
pub exit_code: i64,
/// `started_at` column; `None` when it cannot be decoded as a UTC timestamp.
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
/// `finished_at` column; `None` when it cannot be decoded as a UTC timestamp.
pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
}
fn row_to_execution(r: sqlx::sqlite::SqliteRow) -> ExecutionRow {
ExecutionRow {
exec_id: r.try_get("exec_id").unwrap_or_default(),
job_id: r.try_get("job_id").unwrap_or_default(),
version: r.try_get("version").unwrap_or_default(),
initiated_by: r.try_get("initiated_by").unwrap_or_default(),
initiated_at: r.try_get("initiated_at").ok(),
target_count: r.try_get("target_count").unwrap_or(0),
success_count: r.try_get("success_count").unwrap_or(0),
failure_count: r.try_get("failure_count").unwrap_or(0),
status: r.try_get("status").unwrap_or_default(),
}
}