use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::pull::Config as PullConfig};
use futures::StreamExt;
use kanade_shared::ipc::notifications::{NotificationAcked, NotificationUnacked};
use kanade_shared::kv::STREAM_EVENTS;
use kanade_shared::subject::EVENTS_NOTIFICATIONS_FILTER;
use sqlx::SqlitePool;
use tracing::{debug, info, warn};
/// Name of the JetStream consumer used by the notification ack/unack projector.
///
/// `run` passes this value both as the consumer name and as the pull
/// config's `durable_name`.
// NOTE(review): the `_v2` suffix presumably marks a re-created consumer
// generation — confirm the operational impact before renaming it.
pub(crate) const CONSUMER_NAME: &str = "backend_notification_acks_projector_v2";
/// Runs the notification ack/unack projector until the message stream ends.
///
/// Looks up the `STREAM_EVENTS` stream, binds (or creates) the durable pull
/// consumer [`CONSUMER_NAME`] filtered to `EVENTS_NOTIFICATIONS_FILTER`, and
/// then projects every delivered message into SQLite through [`apply_ack`] or
/// [`apply_unack`], chosen by [`is_unack_subject`].
///
/// Per-message handling:
/// * a stream-level delivery error is logged and skipped;
/// * a payload that fails to deserialize is logged and then acked, i.e. it is
///   deliberately dropped instead of being redelivered;
/// * a projection failure is logged and the message is left un-acked so that
///   JetStream redelivers it;
/// * a failure to send the ack itself is only logged.
///
/// # Errors
///
/// Returns an error if the stream lookup, the consumer creation, or opening
/// the message stream fails. Failures while handling an individual message
/// are logged and never abort the loop.
pub async fn run(js: jetstream::Context, pool: SqlitePool) -> Result<()> {
    let events_stream = js
        .get_stream(STREAM_EVENTS)
        .await
        .with_context(|| format!("get stream {STREAM_EVENTS}"))?;

    // NOTE(review): `max_ack_pending: 1` presumably keeps projection strictly
    // in stream order (one in-flight message at a time) — confirm.
    let consumer_config = PullConfig {
        durable_name: Some(CONSUMER_NAME.into()),
        ack_policy: jetstream::consumer::AckPolicy::Explicit,
        filter_subject: EVENTS_NOTIFICATIONS_FILTER.into(),
        max_ack_pending: 1,
        ..Default::default()
    };
    let consumer = events_stream
        .get_or_create_consumer(CONSUMER_NAME, consumer_config)
        .await
        .context("create notification-acks consumer")?;

    info!(
        stream = STREAM_EVENTS,
        consumer = CONSUMER_NAME,
        filter = EVENTS_NOTIFICATIONS_FILTER,
        "notification-acks projector started"
    );

    let mut deliveries = consumer
        .messages()
        .await
        .context("subscribe notification-acks messages")?;

    while let Some(delivery) = deliveries.next().await {
        let msg = match delivery {
            Err(e) => {
                warn!(error = %e, "notification-acks consumer error");
                continue;
            }
            Ok(received) => received,
        };

        let recorded_at = super::publish_time(&msg);
        let subject = msg.subject.as_str();

        // `Ok(())` means "safe to ack": either the projection was committed or
        // the payload was undecodable and is being dropped on purpose.
        let outcome: Result<()> = if is_unack_subject(subject) {
            match serde_json::from_slice::<NotificationUnacked>(&msg.payload) {
                Ok(unack) => {
                    let applied = apply_unack(&pool, &unack, recorded_at).await;
                    if applied.is_ok() {
                        debug!(
                            notification_id = %unack.notification_id,
                            pc_id = %unack.pc_id,
                            user_sid = %unack.user_sid,
                            "projected notification unack",
                        );
                    }
                    applied
                }
                Err(e) => {
                    warn!(error = %e, %subject, "deserialize NotificationUnacked — dropping (deliberate ack)");
                    Ok(())
                }
            }
        } else {
            match serde_json::from_slice::<NotificationAcked>(&msg.payload) {
                Ok(ack) => {
                    let applied = apply_ack(&pool, &ack, recorded_at).await;
                    if applied.is_ok() {
                        debug!(
                            notification_id = %ack.notification_id,
                            pc_id = %ack.pc_id,
                            user_sid = %ack.user_sid,
                            "projected notification ack",
                        );
                    }
                    applied
                }
                Err(e) => {
                    warn!(error = %e, %subject, "deserialize NotificationAcked — dropping (deliberate ack)");
                    Ok(())
                }
            }
        };

        match outcome {
            // Leave the message un-acked; no ack is sent on this path.
            Err(err) => {
                warn!(
                    error = %err,
                    %subject,
                    "notification ack/unack projection failed — skipping ack so JetStream redelivers",
                );
            }
            Ok(()) => {
                if let Err(e) = msg.ack().await {
                    warn!(error = ?e, "ack notification-acks message");
                }
            }
        }
    }

    Ok(())
}
/// Returns `true` when `subject` begins with the unack subject prefix
/// `events.notifications.unacked.` (trailing dot included).
///
/// Only the start of the subject is inspected, so the word `unacked`
/// appearing in a later token does not count.
fn is_unack_subject(subject: &str) -> bool {
    const UNACKED_PREFIX: &str = "events.notifications.unacked.";
    subject.strip_prefix(UNACKED_PREFIX).is_some()
}
/// Projects one `NotificationAcked` event into SQLite within a single
/// transaction.
///
/// Two writes are made before the commit:
/// 1. an `"acked"` row is appended to the audit log via [`append_event`];
/// 2. the `notification_acks` row keyed by
///    `(notification_id, pc_id, user_sid)` is inserted, or — on conflict —
///    overwritten with this event's `acked_at`, `recorded_at` and `account`,
///    with `unacked_at` reset to `NULL`.
///
/// `recorded_at` is supplied by the caller and stored as-is.
///
/// # Errors
///
/// Returns an error if beginning the transaction, either statement, or the
/// commit fails; on an earlier failure the function returns before `commit`
/// is reached.
async fn apply_ack(
    pool: &SqlitePool,
    a: &NotificationAcked,
    recorded_at: chrono::DateTime<chrono::Utc>,
) -> Result<()> {
    // SQL text is kept verbatim, hence the flush-left continuation lines.
    const UPSERT_ACK_SQL: &str = "INSERT INTO notification_acks (
notification_id, pc_id, user_sid, acked_at, recorded_at, account, unacked_at
) VALUES (?, ?, ?, ?, ?, ?, NULL)
ON CONFLICT(notification_id, pc_id, user_sid) DO UPDATE SET
acked_at = excluded.acked_at,
recorded_at = excluded.recorded_at,
account = excluded.account,
unacked_at = NULL";

    let audit_row = AckEventRow {
        notification_id: &a.notification_id,
        pc_id: &a.pc_id,
        user_sid: &a.user_sid,
        kind: "acked",
        occurred_at: a.acked_at,
        recorded_at,
        account: a.account.as_deref(),
    };

    let mut tx = pool.begin().await?;

    // Audit log first, then the read model, both inside the same transaction.
    append_event(&mut tx, &audit_row).await?;

    let upsert = sqlx::query(UPSERT_ACK_SQL)
        .bind(&a.notification_id)
        .bind(&a.pc_id)
        .bind(&a.user_sid)
        .bind(a.acked_at)
        .bind(recorded_at)
        .bind(&a.account);
    upsert.execute(&mut *tx).await?;

    tx.commit().await?;
    Ok(())
}
/// Projects one `NotificationUnacked` event into SQLite within a single
/// transaction.
///
/// An `"unacked"` row is always appended to the audit log via
/// [`append_event`]. The read model is touched only by an `UPDATE`: the
/// matching `notification_acks` row gets `unacked_at` set to this event's
/// timestamp, and only if its `acked_at` is not later than that timestamp.
/// So an unack with no existing row, or one older than the standing ack,
/// changes nothing in `notification_acks`.
///
/// # Errors
///
/// Returns an error if beginning the transaction, either statement, or the
/// commit fails; on an earlier failure the function returns before `commit`
/// is reached.
async fn apply_unack(
    pool: &SqlitePool,
    u: &NotificationUnacked,
    recorded_at: chrono::DateTime<chrono::Utc>,
) -> Result<()> {
    // SQL text is kept verbatim, hence the flush-left continuation lines.
    const MARK_UNACKED_SQL: &str = "UPDATE notification_acks SET unacked_at = ?
WHERE notification_id = ? AND pc_id = ? AND user_sid = ?
AND acked_at <= ?";

    let audit_row = AckEventRow {
        notification_id: &u.notification_id,
        pc_id: &u.pc_id,
        user_sid: &u.user_sid,
        kind: "unacked",
        occurred_at: u.unacked_at,
        recorded_at,
        account: u.account.as_deref(),
    };

    let mut tx = pool.begin().await?;

    append_event(&mut tx, &audit_row).await?;

    // `unacked_at` is bound twice: once as the new value, once as the upper
    // bound in the `acked_at <= ?` guard.
    let mark_unacked = sqlx::query(MARK_UNACKED_SQL)
        .bind(u.unacked_at)
        .bind(&u.notification_id)
        .bind(&u.pc_id)
        .bind(&u.user_sid)
        .bind(u.unacked_at);
    mark_unacked.execute(&mut *tx).await?;

    tx.commit().await?;
    Ok(())
}
/// Borrowed view of one row destined for the `notification_ack_events` table.
///
/// Built by `apply_ack` / `apply_unack` and written by `append_event`, which
/// binds the fields in declaration order.
struct AckEventRow<'a> {
/// Identifier of the notification the event refers to.
notification_id: &'a str,
/// Identifier of the PC taken from the event payload.
pc_id: &'a str,
/// User SID taken from the event payload.
user_sid: &'a str,
/// Event kind; the callers in this file use `"acked"` or `"unacked"`.
kind: &'a str,
/// Timestamp carried by the event itself (`acked_at` / `unacked_at`).
occurred_at: chrono::DateTime<chrono::Utc>,
/// Timestamp supplied by the caller of `apply_ack` / `apply_unack`.
recorded_at: chrono::DateTime<chrono::Utc>,
/// Optional account string taken from the event payload.
account: Option<&'a str>,
}
async fn append_event(tx: &mut sqlx::SqliteConnection, event: &AckEventRow<'_>) -> Result<()> {
sqlx::query(
"INSERT INTO notification_ack_events (
notification_id, pc_id, user_sid, kind, occurred_at, recorded_at, account
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(notification_id, pc_id, user_sid, kind, occurred_at) DO NOTHING",
)
.bind(event.notification_id)
.bind(event.pc_id)
.bind(event.user_sid)
.bind(event.kind)
.bind(event.occurred_at)
.bind(event.recorded_at)
.bind(event.account)
.execute(&mut *tx)
.await?;
Ok(())
}
// Unit tests for the ack/unack projection: they drive `apply_ack` /
// `apply_unack` against an in-memory SQLite database and check both the
// `notification_acks` read model and the `notification_ack_events` audit log.
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use sqlx::sqlite::SqlitePoolOptions;
/// Creates an in-memory SQLite pool and applies the migrations found under
/// `./migrations`.
// NOTE(review): `max_connections(1)` presumably ensures every query hits the
// same in-memory database — confirm.
async fn fresh_pool() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
/// Builds a `NotificationAcked` fixture with a fixed `acked_at` of
/// 2026-05-20 12:00:05 UTC and a fixed account string.
fn sample(notif: &str, pc: &str, sid: &str) -> NotificationAcked {
NotificationAcked {
notification_id: notif.into(),
pc_id: pc.into(),
user_sid: sid.into(),
acked_at: Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 5).unwrap(),
account: Some("EXAMPLE\\taro".into()),
}
}
/// Builds a `NotificationUnacked` fixture whose `unacked_at` is
/// 2026-05-20 12:00:`secs` UTC.
fn sample_unack(notif: &str, pc: &str, sid: &str, secs: u32) -> NotificationUnacked {
NotificationUnacked {
notification_id: notif.into(),
pc_id: pc.into(),
user_sid: sid.into(),
unacked_at: Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, secs).unwrap(),
account: Some("EXAMPLE\\taro".into()),
}
}
/// Reads `(acked_at, unacked_at)` from `notification_acks` for the given
/// notification id and user SID; returns `None` when no row matches.
/// The lookup does not filter on `pc_id`.
async fn ack_state(
pool: &SqlitePool,
notif: &str,
sid: &str,
) -> Option<(
chrono::DateTime<chrono::Utc>,
Option<chrono::DateTime<chrono::Utc>>,
)> {
sqlx::query_as(
"SELECT acked_at, unacked_at FROM notification_acks
WHERE notification_id = ? AND user_sid = ?",
)
.bind(notif)
.bind(sid)
.fetch_optional(pool)
.await
.unwrap()
}
/// A first ack creates a read-model row with `unacked_at` NULL and stores the
/// account string.
#[tokio::test]
async fn ack_insert_persists_confirmed_row() {
let pool = fresh_pool().await;
apply_ack(&pool, &sample("n1", "pc1", "S-1-5-21-1001"), Utc::now())
.await
.unwrap();
let (_acked, unacked) = ack_state(&pool, "n1", "S-1-5-21-1001").await.unwrap();
// The assertion message's `確認済み` means "confirmed".
assert!(unacked.is_none(), "fresh ack ⇒ 確認済み (unacked_at NULL)");
let acct: (Option<String>,) =
sqlx::query_as("SELECT account FROM notification_acks WHERE notification_id = ?")
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(acct.0.as_deref(), Some("EXAMPLE\\taro"));
}
/// Applying the same ack twice leaves exactly one read-model row and one
/// audit event.
#[tokio::test]
async fn ack_redelivery_is_idempotent() {
let pool = fresh_pool().await;
let a = sample("n1", "pc1", "S-1-5-21-1001");
apply_ack(&pool, &a, Utc::now()).await.unwrap();
apply_ack(&pool, &a, Utc::now()).await.unwrap();
let acks: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM notification_acks WHERE notification_id = ?")
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(acks.0, 1);
let events: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM notification_ack_events WHERE notification_id = ?",
)
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(events.0, 1, "duplicate ack event deduped in audit log");
}
/// An unack after an ack stamps `unacked_at`, keeps the original `acked_at`,
/// and leaves both events in the audit log in time order.
#[tokio::test]
async fn unack_flips_to_revoked_but_keeps_acked_at_and_audits_both() {
let pool = fresh_pool().await;
// Ack at 12:00:05 (fixture default), then unack at 12:00:30.
apply_ack(&pool, &sample("n1", "pc1", "S-1-5-21-1001"), Utc::now())
.await
.unwrap();
apply_unack(
&pool,
&sample_unack("n1", "pc1", "S-1-5-21-1001", 30),
Utc::now(),
)
.await
.unwrap();
let (acked, unacked) = ack_state(&pool, "n1", "S-1-5-21-1001").await.unwrap();
assert_eq!(
acked,
Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 5).unwrap(),
"acked_at retained after revoke (so the operator sees they had confirmed)"
);
// The assertion message's `取消済み` means "revoked".
assert_eq!(
unacked,
Some(Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 30).unwrap()),
"unacked_at stamped ⇒ 取消済み"
);
let kinds: Vec<(String,)> = sqlx::query_as(
"SELECT kind FROM notification_ack_events
WHERE notification_id = ? ORDER BY occurred_at",
)
.bind("n1")
.fetch_all(&pool)
.await
.unwrap();
let kinds: Vec<&str> = kinds.iter().map(|k| k.0.as_str()).collect();
assert_eq!(kinds, vec!["acked", "unacked"]);
}
/// Ack, unack, then a later ack: the row ends up with the newer `acked_at`,
/// `unacked_at` cleared, and three audit events.
#[tokio::test]
async fn reack_after_unack_returns_to_confirmed() {
let pool = fresh_pool().await;
apply_ack(&pool, &sample("n1", "pc1", "S-1-5-21-1001"), Utc::now())
.await
.unwrap();
apply_unack(
&pool,
&sample_unack("n1", "pc1", "S-1-5-21-1001", 30),
Utc::now(),
)
.await
.unwrap();
// Re-ack at 12:01:00, i.e. after the unack at 12:00:30.
let mut reack = sample("n1", "pc1", "S-1-5-21-1001");
reack.acked_at = Utc.with_ymd_and_hms(2026, 5, 20, 12, 1, 0).unwrap();
apply_ack(&pool, &reack, Utc::now()).await.unwrap();
let (acked, unacked) = ack_state(&pool, "n1", "S-1-5-21-1001").await.unwrap();
assert_eq!(acked, Utc.with_ymd_and_hms(2026, 5, 20, 12, 1, 0).unwrap());
assert!(
unacked.is_none(),
"re-ack clears the revoke ⇒ 確認済み again"
);
let events: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM notification_ack_events WHERE notification_id = ?",
)
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(events.0, 3, "audit log keeps ack + unack + re-ack");
}
/// An unack timestamped before the standing ack must not change the read
/// model, but it is still written to the audit log.
#[tokio::test]
async fn stale_unack_does_not_overwrite_a_newer_reack() {
let pool = fresh_pool().await;
// Standing ack at 12:01:00; the unack below is at 12:00:30 (older).
let mut standing = sample("n1", "pc1", "S-1-5-21-1001");
standing.acked_at = Utc.with_ymd_and_hms(2026, 5, 20, 12, 1, 0).unwrap();
apply_ack(&pool, &standing, Utc::now()).await.unwrap();
apply_unack(
&pool,
&sample_unack("n1", "pc1", "S-1-5-21-1001", 30),
Utc::now(),
)
.await
.unwrap();
let (acked, unacked) = ack_state(&pool, "n1", "S-1-5-21-1001").await.unwrap();
assert_eq!(acked, Utc.with_ymd_and_hms(2026, 5, 20, 12, 1, 0).unwrap());
assert!(
unacked.is_none(),
"stale unack must not flip a newer standing ack to 取消済み"
);
let events: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM notification_ack_events
WHERE notification_id = ? AND kind = 'unacked'",
)
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(events.0, 1, "stale unack still audited");
}
/// An unack with no prior ack creates no read-model row but is still
/// written to the audit log.
#[tokio::test]
async fn orphan_unack_is_noop_on_read_model_but_audited() {
let pool = fresh_pool().await;
apply_unack(
&pool,
&sample_unack("n1", "pc1", "S-1-5-21-1001", 30),
Utc::now(),
)
.await
.unwrap();
assert!(
ack_state(&pool, "n1", "S-1-5-21-1001").await.is_none(),
"orphan unack creates no read-model row"
);
let events: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM notification_ack_events WHERE notification_id = ?",
)
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(events.0, 1, "orphan unack still audited");
}
/// `is_unack_subject` matches only on the subject prefix; `unacked` appearing
/// in a later token of an `acked` subject does not count.
#[test]
fn unack_subject_discrimination_is_prefix_based() {
assert!(is_unack_subject(
"events.notifications.unacked.PC1.S-1-5-21-1.notif-9f3a"
));
assert!(!is_unack_subject(
"events.notifications.acked.PC1.S-1-5-21-1.notif-9f3a"
));
assert!(!is_unack_subject(
"events.notifications.acked.PC1.S-1-5-21-1.my.unacked.notification"
));
}
/// Two different user SIDs acking the same notification on the same PC
/// produce two separate read-model rows.
#[tokio::test]
async fn distinct_users_on_same_pc_each_get_a_row() {
let pool = fresh_pool().await;
apply_ack(&pool, &sample("n1", "pc1", "S-1-5-21-1001"), Utc::now())
.await
.unwrap();
apply_ack(&pool, &sample("n1", "pc1", "S-1-5-21-1002"), Utc::now())
.await
.unwrap();
let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM notification_acks WHERE notification_id = ?")
.bind("n1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 2);
}
}