use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::pull::Config as PullConfig};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kanade_shared::kv::STREAM_EVENTS;
use kanade_shared::subject::EVENTS_STARTED_FILTER;
use kanade_shared::wire::EventStarted;
use sqlx::SqlitePool;
use std::time::Duration;
use tracing::{debug, info, warn};
use super::spec_cache::ExplodeSpecCache;
/// Lower bound on the slack that `reap_deadline` adds on top of a job's timeout
/// (two minutes).
const MIN_REAP_SLACK: Duration = Duration::from_secs(2 * 60);
fn reap_deadline(timeout: Duration, recorded_at: DateTime<Utc>) -> DateTime<Utc> {
let slack = std::cmp::max(MIN_REAP_SLACK, timeout / 2);
let cap = chrono::Duration::days(365);
let span = timeout
.checked_add(slack)
.and_then(|d| chrono::Duration::from_std(d).ok())
.unwrap_or(cap);
recorded_at
.checked_add_signed(span)
.unwrap_or_else(|| recorded_at + cap)
}
async fn resolve_expires_at(
cache: &ExplodeSpecCache,
manifest_id: &str,
recorded_at: DateTime<Utc>,
) -> Option<DateTime<Utc>> {
let manifest = cache.manifest(manifest_id).await?;
let timeout = humantime::parse_duration(&manifest.execute.timeout).ok()?;
Some(reap_deadline(timeout, recorded_at))
}
/// Name of the durable JetStream pull consumer that `run` creates or reuses.
pub(crate) const CONSUMER_NAME: &str = "backend_events_projector";
pub async fn run(js: jetstream::Context, pool: SqlitePool, cache: ExplodeSpecCache) -> Result<()> {
let stream = js
.get_stream(STREAM_EVENTS)
.await
.with_context(|| format!("get stream {STREAM_EVENTS}"))?;
let consumer = stream
.get_or_create_consumer(
CONSUMER_NAME,
PullConfig {
durable_name: Some(CONSUMER_NAME.into()),
ack_policy: jetstream::consumer::AckPolicy::Explicit,
filter_subject: EVENTS_STARTED_FILTER.into(),
..Default::default()
},
)
.await
.context("create events consumer")?;
info!(
stream = STREAM_EVENTS,
consumer = CONSUMER_NAME,
filter = EVENTS_STARTED_FILTER,
"events projector started"
);
let mut messages = consumer
.messages()
.await
.context("subscribe events messages")?;
while let Some(msg) = messages.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "events consumer error");
continue;
}
};
let recorded_at = super::publish_time(&msg);
match serde_json::from_slice::<EventStarted>(&msg.payload) {
Ok(e) => {
let expires_at = resolve_expires_at(&cache, &e.manifest_id, recorded_at).await;
if let Err(err) = insert_inflight_row(&pool, &e, recorded_at, expires_at).await {
warn!(
error = %err,
result_id = %e.result_id,
exec_id = %e.exec_id,
pc_id = %e.pc_id,
"events_started insert failed — skipping ack so JetStream redelivers",
);
continue;
}
debug!(
result_id = %e.result_id,
exec_id = %e.exec_id,
pc_id = %e.pc_id,
manifest_id = %e.manifest_id,
"projected events.started",
);
match promote_execution_running(&pool, &e.exec_id).await {
Ok(n) if n > 0 => {
debug!(exec_id = %e.exec_id, "promoted execution pending→running")
}
Ok(_) => {}
Err(err) => warn!(
error = %err,
exec_id = %e.exec_id,
"promote execution to running failed (non-fatal)",
),
}
}
Err(e) => warn!(
error = %e,
subject = %msg.subject,
"deserialize EventStarted",
),
}
if let Err(e) = msg.ack().await {
warn!(error = ?e, "ack events message");
}
}
Ok(())
}
async fn insert_inflight_row(
pool: &SqlitePool,
e: &EventStarted,
recorded_at: DateTime<Utc>,
expires_at: Option<DateTime<Utc>>,
) -> Result<()> {
sqlx::query(
"INSERT INTO execution_results (
result_id, request_id, exec_id, pc_id, started_at,
version, job_id, recorded_at, expires_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(result_id) DO NOTHING",
)
.bind(&e.result_id)
.bind(&e.request_id)
.bind(&e.exec_id)
.bind(&e.pc_id)
.bind(e.started_at)
.bind(&e.version)
.bind(&e.manifest_id)
.bind(recorded_at)
.bind(expires_at)
.execute(pool)
.await?;
Ok(())
}
/// Moves the execution `exec_id` from `pending` to `running`.
///
/// Only rows whose status is currently `pending` are updated, so calling this
/// for an execution in any other state, or for an unknown id, changes nothing.
///
/// Returns the number of rows the update affected.
///
/// # Errors
///
/// Returns the underlying `sqlx` error if the statement fails to execute.
async fn promote_execution_running(pool: &SqlitePool, exec_id: &str) -> Result<u64> {
    const PROMOTE_SQL: &str = "UPDATE executions SET status = 'running'
WHERE exec_id = ? AND status = 'pending'";

    let outcome = sqlx::query(PROMOTE_SQL).bind(exec_id).execute(pool).await?;
    let affected = outcome.rows_affected();
    Ok(affected)
}
// Unit tests for the row projection helpers and the reap-deadline arithmetic.
// The database-backed tests run against an in-memory SQLite pool.
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use sqlx::sqlite::SqlitePoolOptions;
/// Opens an in-memory SQLite pool and applies the migrations from `./migrations`.
// NOTE(review): `max_connections(1)` is presumably there so every query hits
// the same in-memory database — confirm.
async fn fresh_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
/// Builds an `EventStarted` fixture with the given ids and fixed values for
/// the remaining fields.
fn sample(result_id: &str, exec_id: &str, pc_id: &str) -> EventStarted {
EventStarted {
result_id: result_id.into(),
request_id: "req-1".into(),
exec_id: exec_id.into(),
pc_id: pc_id.into(),
started_at: Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap(),
manifest_id: "inv-hw".into(),
version: "1.0.0".into(),
}
}
/// A freshly inserted in-flight row has neither `exit_code` nor `finished_at`.
#[tokio::test]
async fn events_started_creates_inflight_row() {
let pool = fresh_pool().await;
insert_inflight_row(&pool, &sample("r1", "e1", "pc1"), Utc::now(), None)
.await
.unwrap();
let row: (Option<i64>, Option<chrono::DateTime<chrono::Utc>>) = sqlx::query_as(
"SELECT exit_code, finished_at FROM execution_results WHERE result_id = ?",
)
.bind("r1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, None, "in-flight rows have no exit_code yet");
assert_eq!(row.1, None, "in-flight rows have no finished_at yet");
}
/// Inserting the same event twice succeeds and leaves exactly one row.
#[tokio::test]
async fn events_started_redelivery_is_idempotent() {
let pool = fresh_pool().await;
let e = sample("r1", "e1", "pc1");
insert_inflight_row(&pool, &e, Utc::now(), None)
.await
.unwrap();
insert_inflight_row(&pool, &e, Utc::now(), None)
.await
.unwrap();
let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM execution_results WHERE result_id = ?")
.bind("r1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 1);
}
/// When a finished row with the same `result_id` already exists, the
/// in-flight insert must not replace it or clear `finished_at`.
#[tokio::test]
async fn events_started_after_result_does_not_overwrite() {
let pool = fresh_pool().await;
// Seed a row that already carries an exit code and a finish time.
sqlx::query(
"INSERT INTO execution_results (
result_id, request_id, exec_id, pc_id, exit_code,
started_at, finished_at, job_id, version
) VALUES (
'r1', 'req-1', 'e1', 'pc1', 0,
'2026-05-20T12:00:00Z', '2026-05-20T12:00:05Z',
'inv-hw', '1.0.0'
)",
)
.execute(&pool)
.await
.unwrap();
insert_inflight_row(&pool, &sample("r1", "e1", "pc1"), Utc::now(), None)
.await
.unwrap();
let row: (i64, Option<chrono::DateTime<chrono::Utc>>) = sqlx::query_as(
"SELECT COUNT(*) AS n, MAX(finished_at) AS f FROM execution_results WHERE result_id = ?",
)
.bind("r1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 1);
assert!(
row.1.is_some(),
"finished_at must remain set; events.started must not clobber it",
);
}
/// Three events sharing one `exec_id` but with distinct `result_id`s
/// produce three rows.
#[tokio::test]
async fn broadcast_fan_out_creates_one_row_per_pc() {
let pool = fresh_pool().await;
insert_inflight_row(&pool, &sample("r1", "e1", "pc1"), Utc::now(), None)
.await
.unwrap();
insert_inflight_row(&pool, &sample("r2", "e1", "pc2"), Utc::now(), None)
.await
.unwrap();
insert_inflight_row(&pool, &sample("r3", "e1", "pc3"), Utc::now(), None)
.await
.unwrap();
let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM execution_results WHERE exec_id = ?")
.bind("e1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 3);
}
/// A supplied `expires_at` value round-trips through the database unchanged.
#[tokio::test]
async fn insert_inflight_row_persists_expires_at() {
let pool = fresh_pool().await;
let recorded = Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap();
let expires = reap_deadline(Duration::from_secs(360), recorded);
insert_inflight_row(&pool, &sample("r1", "e1", "pc1"), recorded, Some(expires))
.await
.unwrap();
let row: (Option<DateTime<Utc>>,) =
sqlx::query_as("SELECT expires_at FROM execution_results WHERE result_id = ?")
.bind("r1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, Some(expires));
}
/// For a 20 s timeout, half the timeout (10 s) is below `MIN_REAP_SLACK`,
/// so the deadline is 20 s + 120 s = 140 s after `recorded`.
#[test]
fn reap_deadline_uses_min_slack_for_short_timeouts() {
let recorded = Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap();
let d = reap_deadline(Duration::from_secs(20), recorded);
assert_eq!(d, recorded + chrono::Duration::seconds(140));
}
/// For a 1 h timeout, the slack is half the timeout (30 min), so the
/// deadline is 90 min after `recorded`.
#[test]
fn reap_deadline_scales_slack_for_long_timeouts() {
let recorded = Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap();
let d = reap_deadline(Duration::from_secs(3600), recorded);
assert_eq!(d, recorded + chrono::Duration::minutes(90));
}
/// Inserts an `executions` fixture row with the given id and status.
async fn insert_execution(pool: &SqlitePool, exec_id: &str, status: &str) {
sqlx::query(
"INSERT INTO executions
(exec_id, job_id, version, initiated_by, target_count, status)
VALUES (?, 'j', '1.0', 'tester', 1, ?)",
)
.bind(exec_id)
.bind(status)
.execute(pool)
.await
.unwrap();
}
/// Reads the `status` column of the execution with the given id.
async fn exec_status(pool: &SqlitePool, exec_id: &str) -> String {
sqlx::query_scalar("SELECT status FROM executions WHERE exec_id = ?")
.bind(exec_id)
.fetch_one(pool)
.await
.unwrap()
}
/// A `pending` execution is updated to `running` and one row is reported.
#[tokio::test]
async fn promote_execution_running_flips_pending() {
let pool = fresh_pool().await;
insert_execution(&pool, "e1", "pending").await;
let n = promote_execution_running(&pool, "e1").await.unwrap();
assert_eq!(n, 1);
assert_eq!(exec_status(&pool, "e1").await, "running");
}
/// Executions that are not `pending` keep their status, and an unknown id
/// affects zero rows without erroring.
#[tokio::test]
async fn promote_execution_running_is_idempotent_and_safe() {
let pool = fresh_pool().await;
insert_execution(&pool, "done", "completed").await;
insert_execution(&pool, "gone", "expired").await;
assert_eq!(promote_execution_running(&pool, "done").await.unwrap(), 0);
assert_eq!(promote_execution_running(&pool, "gone").await.unwrap(), 0);
assert_eq!(exec_status(&pool, "done").await, "completed");
assert_eq!(exec_status(&pool, "gone").await, "expired");
// No row exists for this id at all.
assert_eq!(promote_execution_running(&pool, "nope").await.unwrap(), 0);
}
}