use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::pull::Config as PullConfig};
use futures::StreamExt;
use kanade_shared::kv::STREAM_OBS_EVENTS;
use kanade_shared::subject::OBS_FILTER;
use kanade_shared::wire::ObsEvent;
use sqlx::SqlitePool;
use tracing::{info, warn};
const CONSUMER_NAME: &str = "backend_obs_events_projector";
pub async fn run(js: jetstream::Context, pool: SqlitePool) -> Result<()> {
let stream = js
.get_stream(STREAM_OBS_EVENTS)
.await
.with_context(|| format!("get stream {STREAM_OBS_EVENTS}"))?;
let consumer = stream
.get_or_create_consumer(
CONSUMER_NAME,
PullConfig {
durable_name: Some(CONSUMER_NAME.into()),
ack_policy: jetstream::consumer::AckPolicy::Explicit,
filter_subject: OBS_FILTER.into(),
..Default::default()
},
)
.await
.context("create obs_events consumer")?;
info!(
stream = STREAM_OBS_EVENTS,
consumer = CONSUMER_NAME,
filter = OBS_FILTER,
"obs_events projector started"
);
let mut messages = consumer
.messages()
.await
.context("subscribe obs_events messages")?;
while let Some(msg) = messages.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "obs_events consumer error");
continue;
}
};
match serde_json::from_slice::<ObsEvent>(&msg.payload) {
Ok(e) => {
if let Err(err) = insert_event(&pool, &e).await {
warn!(
error = %err,
pc_id = %e.pc_id,
kind = %e.kind,
source = %e.source,
"obs_events insert failed — skipping ack so JetStream redelivers",
);
continue;
}
tracing::trace!(
pc_id = %e.pc_id,
kind = %e.kind,
source = %e.source,
at = %e.at,
"projected obs_event",
);
}
Err(e) => warn!(
error = %e,
subject = %msg.subject,
"deserialize ObsEvent",
),
}
if let Err(e) = msg.ack().await {
warn!(error = ?e, "ack obs_events message");
}
}
Ok(())
}
async fn insert_event(pool: &SqlitePool, e: &ObsEvent) -> Result<()> {
let payload_json = e.payload.to_string();
sqlx::query(
"INSERT OR IGNORE INTO obs_events (
pc_id, at, kind, source, event_record_id, payload
) VALUES (?, ?, ?, ?, ?, ?)",
)
.bind(&e.pc_id)
.bind(e.at)
.bind(&e.kind)
.bind(&e.source)
.bind(&e.event_record_id)
.bind(&payload_json)
.execute(pool)
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use serde_json::json;
use sqlx::sqlite::SqlitePoolOptions;
async fn fresh_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
fn logon_event(pc_id: &str, event_record_id: &str) -> ObsEvent {
ObsEvent {
pc_id: pc_id.into(),
at: Utc.with_ymd_and_hms(2026, 5, 28, 10, 41, 0).unwrap(),
kind: "logon".into(),
source: "winlog:Security".into(),
event_record_id: Some(event_record_id.into()),
payload: json!({ "user": "yukimemi", "logon_type": 2 }),
}
}
#[tokio::test]
async fn insert_event_writes_one_row() {
let pool = fresh_pool().await;
insert_event(&pool, &logon_event("minipc", "1234567"))
.await
.unwrap();
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM obs_events")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 1);
}
#[tokio::test]
async fn duplicate_same_keys_is_no_op() {
let pool = fresh_pool().await;
let e = logon_event("minipc", "1234567");
insert_event(&pool, &e).await.unwrap();
insert_event(&pool, &e).await.unwrap();
insert_event(&pool, &e).await.unwrap();
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM obs_events")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 1);
}
#[tokio::test]
async fn different_pcs_with_same_record_id_both_persist() {
let pool = fresh_pool().await;
insert_event(&pool, &logon_event("minipc", "1234"))
.await
.unwrap();
insert_event(&pool, &logon_event("laptop", "1234"))
.await
.unwrap();
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM obs_events")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 2);
}
#[tokio::test]
async fn payload_round_trips_through_text_column() {
let pool = fresh_pool().await;
let e = logon_event("minipc", "rec-1");
insert_event(&pool, &e).await.unwrap();
let (raw,): (String,) =
sqlx::query_as("SELECT payload FROM obs_events WHERE event_record_id = ?")
.bind("rec-1")
.fetch_one(&pool)
.await
.unwrap();
let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap();
assert_eq!(parsed, e.payload);
}
#[tokio::test]
async fn null_event_record_id_persists() {
let pool = fresh_pool().await;
let e = ObsEvent {
pc_id: "minipc".into(),
at: Utc.with_ymd_and_hms(2026, 5, 28, 10, 0, 0).unwrap(),
kind: "agent_started".into(),
source: "agent:internal".into(),
event_record_id: None,
payload: serde_json::Value::Null,
};
insert_event(&pool, &e).await.unwrap();
let later = ObsEvent {
at: Utc.with_ymd_and_hms(2026, 5, 28, 11, 0, 0).unwrap(),
..e.clone()
};
insert_event(&pool, &later).await.unwrap();
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM obs_events")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 2, "NULL ≠ NULL: each agent_started is its own row");
}
}