use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::pull::Config as PullConfig};
use futures::StreamExt;
use kanade_shared::kv::STREAM_EVENTS;
use kanade_shared::subject::EVENTS_STARTED_FILTER;
use kanade_shared::wire::EventStarted;
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Name of the JetStream consumer used by the events projector. The same
/// value is passed both as the consumer name and as its `durable_name`.
const CONSUMER_NAME: &str = "backend_events_projector";
/// Projects `EventStarted` messages from the events stream into SQLite.
///
/// Looks up the `STREAM_EVENTS` stream, gets or creates the durable pull
/// consumer named `CONSUMER_NAME` (explicit acks, filtered to
/// `EVENTS_STARTED_FILTER`), and then processes deliveries one at a time:
///
/// * a delivery error is logged and the loop moves on;
/// * a payload that fails to decode as `EventStarted` is logged and then
///   still acked;
/// * a decoded event is written with `insert_inflight_row`; when that
///   insert fails the message is deliberately left un-acked so JetStream
///   can redeliver it;
/// * otherwise the message is acked, and an ack failure is only logged.
///
/// # Errors
///
/// Returns an error (with context) if the stream lookup, the consumer
/// creation, or the message subscription fails. Per-message failures never
/// abort the loop. Returns `Ok(())` once the message stream ends.
pub async fn run(js: jetstream::Context, pool: SqlitePool) -> Result<()> {
    let events_stream = js
        .get_stream(STREAM_EVENTS)
        .await
        .with_context(|| format!("get stream {STREAM_EVENTS}"))?;

    let consumer_config = PullConfig {
        durable_name: Some(CONSUMER_NAME.into()),
        ack_policy: jetstream::consumer::AckPolicy::Explicit,
        filter_subject: EVENTS_STARTED_FILTER.into(),
        ..Default::default()
    };
    let projector = events_stream
        .get_or_create_consumer(CONSUMER_NAME, consumer_config)
        .await
        .context("create events consumer")?;

    info!(
        stream = STREAM_EVENTS,
        consumer = CONSUMER_NAME,
        filter = EVENTS_STARTED_FILTER,
        "events projector started"
    );

    let mut inbox = projector
        .messages()
        .await
        .context("subscribe events messages")?;

    while let Some(delivery) = inbox.next().await {
        let msg = match delivery {
            Err(e) => {
                warn!(error = %e, "events consumer error");
                continue;
            }
            Ok(received) => received,
        };

        match serde_json::from_slice::<EventStarted>(&msg.payload) {
            // Undecodable payload: log it and fall through to the ack below.
            Err(e) => warn!(
                error = %e,
                subject = %msg.subject,
                "deserialize EventStarted",
            ),
            Ok(started) => match insert_inflight_row(&pool, &started).await {
                Ok(()) => info!(
                    result_id = %started.result_id,
                    exec_id = %started.exec_id,
                    pc_id = %started.pc_id,
                    manifest_id = %started.manifest_id,
                    "projected events.started",
                ),
                Err(err) => {
                    warn!(
                        error = %err,
                        result_id = %started.result_id,
                        exec_id = %started.exec_id,
                        pc_id = %started.pc_id,
                        "events_started insert failed — skipping ack so JetStream redelivers",
                    );
                    // Jump past the ack on purpose: the row was not stored.
                    continue;
                }
            },
        }

        let ack_outcome = msg.ack().await;
        if let Err(e) = ack_outcome {
            warn!(error = ?e, "ack events message");
        }
    }

    Ok(())
}
/// Inserts the row for a started execution into `execution_results`.
///
/// Only `result_id`, `request_id`, `exec_id`, `pc_id`, `started_at`,
/// `version` and `job_id` are written; `job_id` is filled from
/// `e.manifest_id`. The statement uses `ON CONFLICT(result_id) DO NOTHING`,
/// so a row that already exists for the same `result_id` is left untouched.
///
/// # Errors
///
/// Returns any error produced while executing the statement on `pool`.
async fn insert_inflight_row(pool: &SqlitePool, e: &EventStarted) -> Result<()> {
    // The SQL text is kept verbatim, including its unindented continuation lines.
    let insert = sqlx::query(
        "INSERT INTO execution_results (
result_id, request_id, exec_id, pc_id, started_at,
version, job_id
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(result_id) DO NOTHING",
    )
    // Bind order must match the column list above.
    .bind(&e.result_id)
    .bind(&e.request_id)
    .bind(&e.exec_id)
    .bind(&e.pc_id)
    .bind(e.started_at)
    .bind(&e.version)
    .bind(&e.manifest_id);

    insert.execute(pool).await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens a single-connection in-memory SQLite pool and applies the
    /// migrations from `./migrations`.
    async fn fresh_pool() -> SqlitePool {
        let options = SqlitePoolOptions::new().max_connections(1);
        let pool = options.connect("sqlite::memory:").await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Builds an `EventStarted` with fixed request id, start time, manifest
    /// id and version; only the three ids passed in vary.
    fn sample(result_id: &str, exec_id: &str, pc_id: &str) -> EventStarted {
        let started_at = Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap();
        EventStarted {
            result_id: result_id.into(),
            request_id: "req-1".into(),
            exec_id: exec_id.into(),
            pc_id: pc_id.into(),
            started_at,
            manifest_id: "inv-hw".into(),
            version: "1.0.0".into(),
        }
    }

    /// A freshly inserted row has neither `exit_code` nor `finished_at`.
    #[tokio::test]
    async fn events_started_creates_inflight_row() {
        let pool = fresh_pool().await;
        let event = sample("r1", "e1", "pc1");
        insert_inflight_row(&pool, &event).await.unwrap();

        let (exit_code, finished_at): (Option<i64>, Option<chrono::DateTime<Utc>>) =
            sqlx::query_as(
                "SELECT exit_code, finished_at FROM execution_results WHERE result_id = ?",
            )
            .bind("r1")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(exit_code, None, "in-flight rows have no exit_code yet");
        assert_eq!(finished_at, None, "in-flight rows have no finished_at yet");
    }

    /// Inserting the same event twice leaves exactly one row.
    #[tokio::test]
    async fn events_started_redelivery_is_idempotent() {
        let pool = fresh_pool().await;
        let event = sample("r1", "e1", "pc1");
        for _ in 0..2 {
            insert_inflight_row(&pool, &event).await.unwrap();
        }

        let (rows,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM execution_results WHERE result_id = ?")
                .bind("r1")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(rows, 1);
    }

    /// A row that already carries a result keeps its `finished_at` when the
    /// matching started event arrives afterwards.
    #[tokio::test]
    async fn events_started_after_result_does_not_overwrite() {
        let pool = fresh_pool().await;

        // Seed a completed row first; the SQL text is kept verbatim.
        let seed = sqlx::query(
            "INSERT INTO execution_results (
result_id, request_id, exec_id, pc_id, exit_code,
started_at, finished_at, job_id, version
) VALUES (
'r1', 'req-1', 'e1', 'pc1', 0,
'2026-05-20T12:00:00Z', '2026-05-20T12:00:05Z',
'inv-hw', '1.0.0'
)",
        );
        seed.execute(&pool).await.unwrap();

        let late_event = sample("r1", "e1", "pc1");
        insert_inflight_row(&pool, &late_event).await.unwrap();

        let (rows, latest_finish): (i64, Option<chrono::DateTime<Utc>>) = sqlx::query_as(
            "SELECT COUNT(*) AS n, MAX(finished_at) AS f FROM execution_results WHERE result_id = ?",
        )
        .bind("r1")
        .fetch_one(&pool)
        .await
        .unwrap();

        assert_eq!(rows, 1);
        assert!(
            latest_finish.is_some(),
            "finished_at must remain set; events.started must not clobber it",
        );
    }

    /// Three events sharing one `exec_id` but with distinct result ids each
    /// get their own row.
    #[tokio::test]
    async fn broadcast_fan_out_creates_one_row_per_pc() {
        let pool = fresh_pool().await;
        for (result_id, pc_id) in [("r1", "pc1"), ("r2", "pc2"), ("r3", "pc3")] {
            insert_inflight_row(&pool, &sample(result_id, "e1", pc_id))
                .await
                .unwrap();
        }

        let (rows,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM execution_results WHERE exec_id = ?")
                .bind("e1")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(rows, 3);
    }
}