use anyhow::Result;
use async_nats::jetstream;
use async_nats::jetstream::ErrorCode;
use async_nats::jetstream::stream::ConsumerErrorKind;
use kanade_shared::kv::{STREAM_AUDIT, STREAM_EVENTS, STREAM_OBS_EVENTS, STREAM_RESULTS};
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Static registry of the projectors handled by [`reset_if_wiped`].
///
/// Each entry is `(stream name, consumer name, projection table)`. Note that
/// the results and events entries both name the `execution_results` table.
fn projector_consumers() -> [(&'static str, &'static str, &'static str); 4] {
    type Row = (&'static str, &'static str, &'static str);

    let results: Row = (STREAM_RESULTS, super::results::CONSUMER_NAME, "execution_results");
    let events: Row = (STREAM_EVENTS, super::events::CONSUMER_NAME, "execution_results");
    let audit: Row = (STREAM_AUDIT, super::audit::CONSUMER_NAME, "audit_log");
    let obs_events: Row = (STREAM_OBS_EVENTS, super::obs_events::CONSUMER_NAME, "obs_events");

    [results, events, audit, obs_events]
}
pub async fn reset_if_wiped(js: &jetstream::Context, pool: &SqlitePool) -> Result<()> {
for (stream_name, consumer, table) in projector_consumers() {
match table_is_empty(pool, table).await {
Ok(true) => {}
Ok(false) => continue,
Err(e) => {
warn!(error = %e, table, "consumer reset: emptiness check failed; skipping");
continue;
}
}
let stream = match js.get_stream(stream_name).await {
Ok(s) => s,
Err(e) => {
warn!(error = %e, stream = stream_name, "consumer reset: get_stream failed; skipping");
continue;
}
};
match stream.delete_consumer(consumer).await {
Ok(_) => info!(
stream = stream_name,
consumer,
table,
"projection table is empty — dropped stale durable consumer; \
projector will re-derive from the stream",
),
Err(e)
if matches!(
e.kind(),
ConsumerErrorKind::JetStream(js_err)
if js_err.error_code() == ErrorCode::CONSUMER_NOT_FOUND
) => {}
Err(e) => {
warn!(error = %e, stream = stream_name, consumer, "consumer reset: delete failed; projector resumes from old position");
}
}
}
Ok(())
}
/// Reports whether `table` currently holds no rows.
///
/// Runs `SELECT EXISTS(SELECT 1 FROM <table>)` and returns `true` when the
/// result is `0`.
///
/// `table` is interpolated directly into the SQL text (hence the
/// `AssertSqlSafe` wrapper), so it must be a trusted identifier; within this
/// file it only ever comes from the static list in [`projector_consumers`]
/// or from literals in the tests.
///
/// # Errors
///
/// Propagates any error from executing the query.
async fn table_is_empty(pool: &SqlitePool, table: &str) -> Result<bool> {
    let sql = format!("SELECT EXISTS(SELECT 1 FROM {table})");
    let (has_row,): (i64,) = sqlx::query_as(sqlx::AssertSqlSafe(sql))
        .fetch_one(pool)
        .await?;
    Ok(has_row == 0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens an in-memory SQLite pool limited to one connection and applies
    /// the migrations found under `./migrations`.
    async fn fresh_pool() -> SqlitePool {
        // NOTE(review): the single-connection cap is presumably there so every
        // query sees the same in-memory database — confirm.
        let options = SqlitePoolOptions::new().max_connections(1);
        let db = options.connect("sqlite::memory:").await.unwrap();
        sqlx::migrate!("./migrations").run(&db).await.unwrap();
        db
    }

    /// Every table named in `projector_consumers` must be queryable and empty
    /// right after the migrations have run.
    #[tokio::test]
    async fn listed_tables_exist_and_start_empty() {
        let pool = fresh_pool().await;
        for (_stream, _consumer, table) in projector_consumers() {
            let empty = table_is_empty(&pool, table).await.unwrap();
            assert!(empty, "{table} should exist and be empty on a fresh DB");
        }
    }

    /// Inserting one row flips the emptiness check for that table only.
    #[tokio::test]
    async fn non_empty_table_is_detected() {
        let pool = fresh_pool().await;

        let insert = sqlx::query(
            "INSERT INTO execution_results
(result_id, request_id, pc_id, stdout, stderr, started_at)
VALUES ('r1', 'req1', 'pc1', '', '', '2026-06-01T00:00:00Z')",
        );
        insert.execute(&pool).await.unwrap();

        let results_empty = table_is_empty(&pool, "execution_results").await.unwrap();
        assert!(!results_empty);

        let audit_empty = table_is_empty(&pool, "audit_log").await.unwrap();
        assert!(audit_empty);
    }
}