use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use kanade_shared::subject;
use kanade_shared::wire::{JobTailReply, JobTailRequest};
use regex::Regex;
use serde::{Deserialize, Serialize};
use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
use tracing::warn;
use super::AppState;
#[derive(Serialize, Debug, PartialEq)]
pub struct MatchSnippet {
pub before: String,
pub matched: String,
pub after: String,
pub clipped_start: bool,
pub clipped_end: bool,
}
#[derive(Serialize)]
pub struct ResultRow {
pub result_id: String,
pub request_id: String,
pub exec_id: Option<String>,
pub pc_id: String,
pub exit_code: Option<i64>,
pub stdout: String,
pub stderr: String,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
pub job_id: Option<String>,
pub version: Option<String>,
pub stdout_truncated: bool,
pub stderr_truncated: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub stdout_match: Option<MatchSnippet>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stderr_match: Option<MatchSnippet>,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
pub enum StatusFilter {
Success,
Failure,
Running,
}
#[derive(Deserialize)]
pub struct ListParams {
#[serde(default = "default_limit")]
pub limit: u32,
pub pc_id: Option<String>,
pub job_id: Option<String>,
pub exec_id: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub status: Option<StatusFilter>,
pub since: Option<chrono::DateTime<chrono::Utc>>,
}
fn default_limit() -> u32 {
50
}
const MAX_FETCH: i64 = 10_000;
const MAX_FETCH_WITH_OUTPUT: i64 = 1_000;
const META_COLUMNS: &str =
"result_id, request_id, exec_id, pc_id, exit_code, started_at, finished_at, job_id, version";
const HYDRATE_CHUNK: usize = 500;
const PREVIEW_CHARS: usize = 200;
const SNIPPET_CONTEXT_CHARS: usize = 80;
const SNIPPET_MATCH_CHARS: usize = 240;
fn compile(opt: Option<&str>) -> Result<Option<Regex>, (StatusCode, String)> {
match opt.filter(|s| !s.is_empty()) {
Some(s) => Regex::new(s)
.map(Some)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid regex `{s}`: {e}"))),
None => Ok(None),
}
}
pub async fn list(
State(pool): State<SqlitePool>,
Query(params): Query<ListParams>,
) -> Result<Json<Vec<ResultRow>>, (StatusCode, String)> {
let pc_re = compile(params.pc_id.as_deref())?;
let job_re = compile(params.job_id.as_deref())?;
let exec_re = compile(params.exec_id.as_deref())?;
let stdout_re = compile(params.stdout.as_deref())?;
let stderr_re = compile(params.stderr.as_deref())?;
let has_regex = pc_re.is_some()
|| job_re.is_some()
|| exec_re.is_some()
|| stdout_re.is_some()
|| stderr_re.is_some();
let needs_output = stdout_re.is_some() || stderr_re.is_some();
let select = if has_regex && !needs_output {
format!("SELECT {META_COLUMNS} FROM execution_results")
} else {
"SELECT * FROM execution_results".to_string()
};
let mut qb: QueryBuilder<Sqlite> = QueryBuilder::new(select);
let mut sep = " WHERE ";
if let Some(status) = ¶ms.status {
let cmp = match status {
StatusFilter::Success => "exit_code = 0",
StatusFilter::Failure => "exit_code IS NOT NULL AND exit_code <> 0",
StatusFilter::Running => "finished_at IS NULL",
};
qb.push(sep).push(cmp);
sep = " AND ";
}
if let Some(since) = params.since {
qb.push(sep).push("started_at >= ").push_bind(since);
sep = " AND ";
}
let _ = sep;
qb.push(" ORDER BY started_at DESC, result_id DESC LIMIT ");
let sql_limit = if !has_regex {
params.limit as i64
} else if needs_output {
MAX_FETCH_WITH_OUTPUT
} else {
MAX_FETCH
};
qb.push_bind(sql_limit);
let rows = qb.build().fetch_all(&pool).await.map_err(|e| {
warn!(error = %e, "list results");
(
StatusCode::INTERNAL_SERVER_ERROR,
"list results failed".to_string(),
)
})?;
if !has_regex {
let out: Vec<ResultRow> = rows
.iter()
.map(|r| {
let stdout_raw: &str = r.try_get("stdout").unwrap_or("");
let stderr_raw: &str = r.try_get("stderr").unwrap_or("");
let (sp, st) = take_prefix(stdout_raw, PREVIEW_CHARS);
let (ep, et) = take_prefix(stderr_raw, PREVIEW_CHARS);
build_row(r, sp.to_string(), ep.to_string(), st, et, None, None)
})
.collect();
return Ok(Json(out));
}
let limit = params.limit as usize;
let mut out: Vec<ResultRow> = Vec::with_capacity(limit.min(64));
for r in rows {
if let Some(re) = &pc_re
&& !re.is_match(r.try_get::<&str, _>("pc_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &job_re
&& !re.is_match(r.try_get::<&str, _>("job_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &exec_re
&& !re.is_match(r.try_get::<&str, _>("exec_id").unwrap_or(""))
{
continue;
}
if let Some(re) = &stdout_re
&& !re.is_match(r.try_get::<&str, _>("stdout").unwrap_or(""))
{
continue;
}
if let Some(re) = &stderr_re
&& !re.is_match(r.try_get::<&str, _>("stderr").unwrap_or(""))
{
continue;
}
let stdout_raw: &str = r.try_get("stdout").unwrap_or("");
let stderr_raw: &str = r.try_get("stderr").unwrap_or("");
let stdout_match = stdout_re
.as_ref()
.and_then(|re| match_snippet(re, stdout_raw));
let stderr_match = stderr_re
.as_ref()
.and_then(|re| match_snippet(re, stderr_raw));
let (sp, st) = take_prefix(stdout_raw, PREVIEW_CHARS);
let (ep, et) = take_prefix(stderr_raw, PREVIEW_CHARS);
out.push(build_row(
&r,
sp.to_string(),
ep.to_string(),
st,
et,
stdout_match,
stderr_match,
));
if out.len() >= limit {
break;
}
}
if !needs_output && !out.is_empty() {
let mut by_id: std::collections::HashMap<String, (String, String)> =
std::collections::HashMap::with_capacity(out.len());
for chunk in out.chunks(HYDRATE_CHUNK) {
let mut qb: QueryBuilder<Sqlite> =
QueryBuilder::new("SELECT result_id, stdout, stderr FROM execution_results");
qb.push(" WHERE result_id IN (");
{
let mut sep = qb.separated(", ");
for row in chunk {
sep.push_bind(row.result_id.clone());
}
}
qb.push(")");
let blob_rows = qb.build().fetch_all(&pool).await.map_err(|e| {
warn!(error = %e, "hydrate result output");
(
StatusCode::INTERNAL_SERVER_ERROR,
"list results failed".to_string(),
)
})?;
by_id.extend(blob_rows.into_iter().map(|r| {
(
r.try_get("result_id").unwrap_or_default(),
(
r.try_get("stdout").unwrap_or_default(),
r.try_get("stderr").unwrap_or_default(),
),
)
}));
}
for row in &mut out {
if let Some((stdout, stderr)) = by_id.remove(&row.result_id) {
let (sp, st) = take_prefix(&stdout, PREVIEW_CHARS);
let (ep, et) = take_prefix(&stderr, PREVIEW_CHARS);
row.stdout = sp.to_string();
row.stderr = ep.to_string();
row.stdout_truncated = st;
row.stderr_truncated = et;
}
}
}
Ok(Json(out))
}
pub async fn detail(
State(pool): State<SqlitePool>,
Path(id): Path<String>,
) -> Result<Json<ResultRow>, StatusCode> {
let row = sqlx::query("SELECT * FROM execution_results WHERE result_id = ?")
.bind(&id)
.fetch_optional(&pool)
.await
.map_err(|e| {
warn!(error = %e, "detail result");
StatusCode::INTERNAL_SERVER_ERROR
})?;
match row {
Some(r) => Ok(Json(row_to_result(r))),
None => Err(StatusCode::NOT_FOUND),
}
}
#[derive(Serialize)]
pub struct TailResponse {
pub running: bool,
pub live: bool,
pub stdout: String,
pub stderr: String,
pub stdout_truncated: bool,
pub stderr_truncated: bool,
pub exit_code: Option<i64>,
}
pub async fn tail(
State(state): State<AppState>,
Path(result_id): Path<String>,
) -> Result<Json<TailResponse>, StatusCode> {
let row = sqlx::query(
"SELECT pc_id, finished_at, exit_code, stdout, stderr \
FROM execution_results WHERE result_id = ?",
)
.bind(&result_id)
.fetch_optional(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, "tail: lookup result row");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let pc_id: String = row.try_get("pc_id").unwrap_or_default();
let finished_at: Option<chrono::DateTime<chrono::Utc>> = row.try_get("finished_at").ok();
let exit_code: Option<i64> = row.try_get("exit_code").ok();
if finished_at.is_some() {
return Ok(Json(TailResponse {
running: false,
live: false,
stdout: row.try_get("stdout").unwrap_or_default(),
stderr: row.try_get("stderr").unwrap_or_default(),
stdout_truncated: false,
stderr_truncated: false,
exit_code,
}));
}
let waiting = || {
Ok(Json(TailResponse {
running: true,
live: false,
stdout: String::new(),
stderr: String::new(),
stdout_truncated: false,
stderr_truncated: false,
exit_code: None,
}))
};
let req = JobTailRequest {
result_id: result_id.clone(),
};
let payload = match serde_json::to_vec(&req) {
Ok(p) => p,
Err(e) => {
warn!(error = %e, "tail: encode JobTailRequest");
return waiting();
}
};
let subject = subject::job_tail(&pc_id);
let reply = match tokio::time::timeout(
std::time::Duration::from_secs(3),
state.nats.request(subject, payload.into()),
)
.await
{
Ok(Ok(msg)) => msg,
Ok(Err(e)) => {
warn!(error = %e, %pc_id, "tail: job.tail request failed");
return waiting();
}
Err(_) => {
return waiting();
}
};
let parsed: JobTailReply = match serde_json::from_slice(&reply.payload) {
Ok(r) => r,
Err(e) => {
warn!(error = %e, "tail: decode JobTailReply");
return waiting();
}
};
if !parsed.found {
return waiting();
}
Ok(Json(TailResponse {
running: parsed.running,
live: true,
stdout: parsed.stdout,
stderr: parsed.stderr,
stdout_truncated: parsed.stdout_truncated,
stderr_truncated: parsed.stderr_truncated,
exit_code: None,
}))
}
fn take_prefix(s: &str, n: usize) -> (&str, bool) {
match s.char_indices().nth(n) {
Some((idx, _)) => (&s[..idx], true),
None => (s, false),
}
}
fn take_suffix(s: &str, n: usize) -> (&str, bool) {
if n == 0 {
return ("", !s.is_empty());
}
let mut iter = s.char_indices().rev();
match iter.nth(n - 1) {
Some((idx, _)) => (&s[idx..], iter.next().is_some()),
None => (s, false),
}
}
fn match_snippet(re: &Regex, hay: &str) -> Option<MatchSnippet> {
let m = re.find(hay)?;
let (before, clipped_start) = take_suffix(&hay[..m.start()], SNIPPET_CONTEXT_CHARS);
let (matched, matched_clipped) = take_prefix(&hay[m.start()..m.end()], SNIPPET_MATCH_CHARS);
let (after, after_clipped) = take_prefix(&hay[m.end()..], SNIPPET_CONTEXT_CHARS);
Some(MatchSnippet {
before: before.to_string(),
matched: matched.to_string(),
after: after.to_string(),
clipped_start,
clipped_end: matched_clipped || after_clipped,
})
}
fn build_row(
r: &sqlx::sqlite::SqliteRow,
stdout: String,
stderr: String,
stdout_truncated: bool,
stderr_truncated: bool,
stdout_match: Option<MatchSnippet>,
stderr_match: Option<MatchSnippet>,
) -> ResultRow {
ResultRow {
result_id: r.try_get("result_id").unwrap_or_default(),
request_id: r.try_get("request_id").unwrap_or_default(),
exec_id: r.try_get("exec_id").ok(),
pc_id: r.try_get("pc_id").unwrap_or_default(),
exit_code: r.try_get("exit_code").ok(),
stdout,
stderr,
started_at: r.try_get("started_at").ok(),
finished_at: r.try_get("finished_at").ok(),
job_id: r.try_get("job_id").ok(),
version: r.try_get("version").ok(),
stdout_truncated,
stderr_truncated,
stdout_match,
stderr_match,
}
}
fn row_to_result(r: sqlx::sqlite::SqliteRow) -> ResultRow {
let stdout: String = r.try_get("stdout").unwrap_or_default();
let stderr: String = r.try_get("stderr").unwrap_or_default();
build_row(&r, stdout, stderr, false, false, None, None)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, Utc};
use sqlx::sqlite::SqlitePoolOptions;
async fn fresh_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
fn params(since: Option<chrono::DateTime<chrono::Utc>>) -> ListParams {
ListParams {
limit: default_limit(),
pc_id: None,
job_id: None,
exec_id: None,
stdout: None,
stderr: None,
status: None,
since,
}
}
async fn insert_row(pool: &SqlitePool, result_id: &str) {
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES (?, 'req', 'pc-1', 0, '', '', ?, ?, ?)",
)
.bind(result_id)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn since_filter_matches_same_utc_day_rows() {
let pool = fresh_pool().await;
insert_row(&pool, "r-1").await;
let rows = list(
State(pool.clone()),
Query(params(Some(Utc::now() - Duration::hours(24)))),
)
.await
.unwrap()
.0;
assert_eq!(
rows.len(),
1,
"row that ran minutes ago must be inside the rolling 24h window",
);
let rows = list(
State(pool),
Query(params(Some(Utc::now() + Duration::minutes(5)))),
)
.await
.unwrap()
.0;
assert!(rows.is_empty(), "future bound must exclude the row");
}
#[tokio::test]
async fn since_filters_on_started_at_not_recorded_at() {
let pool = fresh_pool().await;
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES ('r-replayed', 'req', 'pc-1', 0, '', '', ?, ?, ?)",
)
.bind(now - Duration::days(21)) .bind(now - Duration::days(21))
.bind(now) .execute(&pool)
.await
.unwrap();
let rows = list(
State(pool.clone()),
Query(params(Some(now - Duration::hours(24)))),
)
.await
.unwrap()
.0;
assert!(
rows.is_empty(),
"a re-projected three-week-old run must not flood the 24h window",
);
let rows = list(State(pool), Query(params(Some(now - Duration::days(30)))))
.await
.unwrap()
.0;
assert_eq!(rows.len(), 1);
}
#[tokio::test]
async fn metadata_regex_path_still_returns_output() {
let pool = fresh_pool().await;
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES ('r-out', 'req', 'pc-9', 0, 'hello stdout', 'hello stderr', ?, ?, ?)",
)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(&pool)
.await
.unwrap();
insert_row(&pool, "r-other").await;
let mut p = params(None);
p.pc_id = Some("^pc-9$".into());
let rows = list(State(pool), Query(p)).await.unwrap().0;
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].result_id, "r-out");
assert_eq!(
rows[0].stdout, "hello stdout",
"winners must be re-hydrated with their output",
);
assert_eq!(rows[0].stderr, "hello stderr");
}
#[tokio::test]
async fn metadata_regex_projection_matches_full_row() {
let pool = fresh_pool().await;
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, exec_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at, job_id, version)
VALUES ('r-full', 'req-1', 'ex-1', 'pc-7', 3, 'out body', 'err body',
?, ?, ?, 'job-x', '1.2.3')",
)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(&pool)
.await
.unwrap();
let fast = list(State(pool.clone()), Query(params(None)))
.await
.unwrap()
.0;
let mut p = params(None);
p.pc_id = Some("pc-7".into());
let via_regex = list(State(pool), Query(p)).await.unwrap().0;
assert_eq!(fast.len(), 1);
assert_eq!(via_regex.len(), 1);
assert_eq!(
serde_json::to_value(&fast[0]).unwrap(),
serde_json::to_value(&via_regex[0]).unwrap(),
"metadata projection + hydration must reproduce the full row exactly",
);
}
#[tokio::test]
async fn stdout_regex_path_matches_against_output() {
let pool = fresh_pool().await;
let now = Utc::now();
for (id, out) in [("r-hit", "ERROR: kaboom"), ("r-miss", "all fine")] {
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES (?, 'req', 'pc-1', 0, ?, '', ?, ?, ?)",
)
.bind(id)
.bind(out)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(&pool)
.await
.unwrap();
}
let mut p = params(None);
p.stdout = Some("(?i)error".into());
let rows = list(State(pool), Query(p)).await.unwrap().0;
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].result_id, "r-hit");
assert_eq!(rows[0].stdout, "ERROR: kaboom");
}
async fn insert_row_io(pool: &SqlitePool, result_id: &str, stdout: &str, stderr: &str) {
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES (?, 'req', 'pc-1', 0, ?, ?, ?, ?, ?)",
)
.bind(result_id)
.bind(stdout)
.bind(stderr)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn list_clips_stdout_to_preview() {
let pool = fresh_pool().await;
let long = "x".repeat(PREVIEW_CHARS + 50);
insert_row_io(&pool, "r-long", &long, "").await;
let rows = list(
State(pool),
Query(params(Some(Utc::now() - Duration::hours(24)))),
)
.await
.unwrap()
.0;
assert_eq!(rows.len(), 1);
let row = &rows[0];
assert_eq!(
row.stdout.chars().count(),
PREVIEW_CHARS,
"stdout must be clipped to the preview length",
);
assert!(row.stdout_truncated, "a clipped stdout must be flagged");
assert!(!row.stderr_truncated, "empty stderr is not truncated");
assert!(
row.stdout_match.is_none(),
"no regex filter → no match excerpt",
);
}
#[tokio::test]
async fn metadata_regex_path_clips_rehydrated_output() {
let pool = fresh_pool().await;
let long = "y".repeat(PREVIEW_CHARS + 80);
let now = Utc::now();
sqlx::query(
"INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at)
VALUES ('r-big', 'req', 'pc-9', 0, ?, '', ?, ?, ?)",
)
.bind(&long)
.bind(now - Duration::minutes(10))
.bind(now - Duration::minutes(9))
.bind(now - Duration::minutes(9))
.execute(&pool)
.await
.unwrap();
let mut p = params(None);
p.pc_id = Some("^pc-9$".into());
let rows = list(State(pool), Query(p)).await.unwrap().0;
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0].stdout.chars().count(),
PREVIEW_CHARS,
"rehydrated output must still be clipped to the preview",
);
assert!(
rows[0].stdout_truncated,
"a clipped rehydrated stdout must be flagged",
);
}
#[tokio::test]
async fn stdout_regex_returns_match_excerpt() {
let pool = fresh_pool().await;
let hay = format!("{}NEEDLE{}", "a".repeat(PREVIEW_CHARS + 20), "b".repeat(30));
insert_row_io(&pool, "r-match", &hay, "").await;
let mut p = params(Some(Utc::now() - Duration::hours(24)));
p.stdout = Some("NEEDLE".to_string());
let rows = list(State(pool), Query(p)).await.unwrap().0;
assert_eq!(rows.len(), 1);
let m = rows[0]
.stdout_match
.as_ref()
.expect("a stdout match must carry an excerpt");
assert_eq!(m.matched, "NEEDLE");
assert!(m.clipped_start, "context before the needle was clipped");
assert!(m.before.ends_with('a'), "before-context is the lead-in");
assert!(m.after.starts_with('b'), "after-context is the tail");
}
#[tokio::test]
async fn greedy_match_is_capped() {
let pool = fresh_pool().await;
let hay = "z".repeat(SNIPPET_MATCH_CHARS + 100);
insert_row_io(&pool, "r-greedy", &hay, "").await;
let mut p = params(Some(Utc::now() - Duration::hours(24)));
p.stdout = Some("z+".to_string());
let rows = list(State(pool), Query(p)).await.unwrap().0;
let m = rows[0].stdout_match.as_ref().unwrap();
assert_eq!(
m.matched.chars().count(),
SNIPPET_MATCH_CHARS,
"matched run is capped",
);
assert!(m.clipped_end, "a capped match flags clipped_end");
}
}