use crate::db::DbPool;
use crate::error::AppResult;
/// Default deduplication window, in seconds (five minutes).
///
/// NOTE(review): presumably the value callers pass as `window_secs` to
/// [`claim`] when no explicit window is configured — confirm against callers.
pub const DEFAULT_WINDOW_SECS: u64 = 5 * 60;
/// Result of attempting to claim a dedup key via [`claim`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DedupOutcome {
    /// No row existed for the key; the caller's execution id was recorded.
    Fresh,
    /// A row for the key already existed.
    Duplicate {
        /// The execution id stored in the pre-existing row.
        existing_execution_id: i64,
    },
}
/// Creates the `noetl.subscription_dedup` table and its
/// `(subscription_id, seen_at)` index when they are not already present.
///
/// Both statements use `IF NOT EXISTS`, and they are executed in order:
/// table first, then the index on the columns the expiry `DELETE` in
/// [`claim`] filters on.
///
/// # Errors
///
/// Returns the first database error encountered; the index statement is not
/// attempted if the table statement fails.
pub async fn ensure_table(pool: &DbPool) -> AppResult<()> {
    // The raw-string bodies are kept flush-left so the SQL text sent to the
    // server is unchanged.
    let create_table = r#"
CREATE TABLE IF NOT EXISTS noetl.subscription_dedup (
subscription_id BIGINT NOT NULL,
dedup_key TEXT NOT NULL,
execution_id BIGINT NOT NULL,
seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (subscription_id, dedup_key)
)
"#;
    let create_index = r#"
CREATE INDEX IF NOT EXISTS subscription_dedup_scope_seen
ON noetl.subscription_dedup (subscription_id, seen_at)
"#;

    for ddl in [create_table, create_index] {
        sqlx::query(ddl).execute(pool).await?;
    }

    Ok(())
}
/// Attempts to claim `dedup_key` within `subscription_id` for `execution_id`.
///
/// The function:
/// 1. deletes this subscription's rows whose `seen_at` is older than
///    `window_secs` seconds;
/// 2. inserts `(subscription_id, dedup_key, execution_id)` unless a row with
///    the same `(subscription_id, dedup_key)` already exists;
/// 3. if nothing was inserted, reads the `execution_id` of the existing row.
///
/// Returns [`DedupOutcome::Fresh`] when the row was inserted and
/// [`DedupOutcome::Duplicate`] carrying the stored execution id otherwise.
///
/// The statements are issued separately against the pool, so another caller's
/// prune can delete the conflicting row between our insert and our lookup.
/// When that happens the key is free again, so the insert is retried (a
/// bounded number of times) instead of failing with a row-not-found error.
///
/// # Errors
///
/// Propagates any database error. If the conflicting row keeps vanishing for
/// every attempt, the final lookup's row-not-found error is returned.
pub async fn claim(
    pool: &DbPool,
    subscription_id: i64,
    dedup_key: &str,
    window_secs: u64,
    execution_id: i64,
) -> AppResult<DedupOutcome> {
    // Upper bound on insert/lookup rounds so a pathological interleaving with
    // concurrent pruners cannot loop forever.
    const MAX_ATTEMPTS: u32 = 3;

    // Drop expired entries for this subscription so an expired key can be
    // claimed again by the insert below.
    sqlx::query(
        "DELETE FROM noetl.subscription_dedup \
         WHERE subscription_id = $1 AND seen_at < now() - make_interval(secs => $2)",
    )
    .bind(subscription_id)
    // `make_interval(secs => ...)` takes a double; the cast only loses
    // precision above 2^53 seconds, far beyond any meaningful window.
    .bind(window_secs as f64)
    .execute(pool)
    .await?;

    let mut attempt: u32 = 1;
    loop {
        let inserted = sqlx::query(
            "INSERT INTO noetl.subscription_dedup (subscription_id, dedup_key, execution_id, seen_at) \
             VALUES ($1, $2, $3, now()) \
             ON CONFLICT (subscription_id, dedup_key) DO NOTHING",
        )
        .bind(subscription_id)
        .bind(dedup_key)
        .bind(execution_id)
        .execute(pool)
        .await?;

        if inserted.rows_affected() == 1 {
            return Ok(DedupOutcome::Fresh);
        }

        let lookup = sqlx::query_scalar::<_, i64>(
            "SELECT execution_id FROM noetl.subscription_dedup \
             WHERE subscription_id = $1 AND dedup_key = $2",
        )
        .bind(subscription_id)
        .bind(dedup_key);

        if attempt >= MAX_ATTEMPTS {
            // Out of retries: behave like a plain lookup and let a missing
            // row surface as a database error.
            let existing_execution_id = lookup.fetch_one(pool).await?;
            return Ok(DedupOutcome::Duplicate {
                existing_execution_id,
            });
        }

        if let Some(existing_execution_id) = lookup.fetch_optional(pool).await? {
            return Ok(DedupOutcome::Duplicate {
                existing_execution_id,
            });
        }

        // The conflicting row was removed between the insert and the lookup;
        // the key is claimable again, so try the insert once more.
        attempt += 1;
    }
}