use std::future::Future;
use std::pin::Pin;
use sha2::{Digest as _, Sha256};
pub type IsrFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
pub trait IsrCoordinator: Send + Sync + 'static {
fn backend(&self) -> &'static str;
fn try_acquire<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, bool>;
fn release<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, ()>;
}
#[derive(Debug, Default)]
pub struct LocalIsrCoordinator;
impl LocalIsrCoordinator {
#[must_use]
pub const fn new() -> Self {
Self
}
}
impl IsrCoordinator for LocalIsrCoordinator {
fn backend(&self) -> &'static str {
"local"
}
fn try_acquire<'a>(&'a self, _url_path: &'a str, _window_key: &'a str) -> IsrFuture<'a, bool> {
Box::pin(async move { true })
}
fn release<'a>(&'a self, _url_path: &'a str, _window_key: &'a str) -> IsrFuture<'a, ()> {
Box::pin(async move {})
}
}
#[cfg(feature = "db")]
pub struct PostgresIsrCoordinator {
pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
held: std::sync::Mutex<
std::collections::HashMap<
i64,
diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
>,
>,
}
#[cfg(feature = "db")]
impl PostgresIsrCoordinator {
#[must_use]
pub fn new(
pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
) -> Self {
Self {
pool,
held: std::sync::Mutex::new(std::collections::HashMap::new()),
}
}
}
#[cfg(feature = "db")]
impl IsrCoordinator for PostgresIsrCoordinator {
fn backend(&self) -> &'static str {
"postgres"
}
fn try_acquire<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, bool> {
let lock_key = isr_advisory_lock_key(url_path, window_key);
Box::pin(async move {
let mut conn = match self.pool.get().await {
Ok(c) => c,
Err(e) => {
tracing::warn!(error = %e, "ISR coordinator: could not get Postgres connection");
return false;
}
};
match try_pg_advisory_lock(&mut conn, lock_key).await {
Ok(true) => {
self.held.lock().unwrap().insert(lock_key, conn);
true
}
Ok(false) => false,
Err(e) => {
tracing::warn!(error = %e, "ISR coordinator: pg_try_advisory_lock failed");
false
}
}
})
}
fn release<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, ()> {
let lock_key = isr_advisory_lock_key(url_path, window_key);
Box::pin(async move {
let maybe_conn = self.held.lock().unwrap().remove(&lock_key);
let Some(mut conn) = maybe_conn else {
tracing::warn!(
lock_key,
"ISR coordinator: release called without a held connection"
);
return;
};
match unlock_pg_advisory_lock(&mut conn, lock_key).await {
Ok(false) => {
tracing::warn!(
lock_key,
"ISR coordinator: pg_advisory_unlock returned false (lock already released)"
);
}
Ok(true) => {}
Err(e) => {
tracing::warn!(error = %e, "ISR coordinator: pg_advisory_unlock failed");
}
}
})
}
}
#[must_use]
pub fn isr_window_key(url_path: &str, revalidate_secs: u64, now_unix_secs: u64) -> String {
let interval = revalidate_secs.max(1);
let bucket = now_unix_secs / interval;
format!("{url_path}:{bucket}")
}
#[must_use]
pub fn isr_advisory_lock_key(url_path: &str, window_key: &str) -> i64 {
let mut hasher = Sha256::new();
hasher.update(b"isr\0");
hasher.update(url_path.as_bytes());
hasher.update(b"\0");
hasher.update(window_key.as_bytes());
let digest = hasher.finalize();
let mut bytes = [0_u8; 8];
bytes.copy_from_slice(&digest[..8]);
i64::from_be_bytes(bytes)
}
#[cfg(feature = "db")]
#[derive(diesel::QueryableByName)]
struct PgAdvisoryLockRow {
#[diesel(sql_type = diesel::sql_types::Bool)]
acquired: bool,
}
#[cfg(feature = "db")]
async fn try_pg_advisory_lock(
conn: &mut diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
key: i64,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
use diesel_async::RunQueryDsl as _;
let row = diesel::sql_query("SELECT pg_try_advisory_lock($1) AS acquired")
.bind::<diesel::sql_types::BigInt, _>(key)
.get_result::<PgAdvisoryLockRow>(&mut **conn)
.await?;
Ok(row.acquired)
}
#[cfg(feature = "db")]
#[derive(diesel::QueryableByName)]
struct PgAdvisoryUnlockRow {
#[diesel(sql_type = diesel::sql_types::Bool)]
released: bool,
}
#[cfg(feature = "db")]
async fn unlock_pg_advisory_lock(
conn: &mut diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
key: i64,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
use diesel_async::RunQueryDsl as _;
let row = diesel::sql_query("SELECT pg_advisory_unlock($1) AS released")
.bind::<diesel::sql_types::BigInt, _>(key)
.get_result::<PgAdvisoryUnlockRow>(&mut **conn)
.await?;
Ok(row.released)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn window_key_stable_within_interval() {
let a = isr_window_key("/about", 60, 1_700_000_000);
let b = isr_window_key("/about", 60, 1_700_000_039);
assert_eq!(a, b);
}
#[test]
fn window_key_changes_on_boundary() {
let a = isr_window_key("/about", 60, 1_700_000_039);
let b = isr_window_key("/about", 60, 1_700_000_040);
assert_ne!(a, b);
}
#[test]
fn window_key_route_prefix() {
let key = isr_window_key("/about", 60, 1_700_000_000);
assert!(
key.starts_with("/about:"),
"key should start with route: {key}"
);
}
#[test]
fn window_key_zero_revalidate_no_panic() {
let key = isr_window_key("/edge", 0, 42);
assert!(!key.is_empty());
}
#[test]
fn advisory_key_deterministic() {
let a = isr_advisory_lock_key("/about", "/about:28333333");
let b = isr_advisory_lock_key("/about", "/about:28333333");
assert_eq!(a, b);
}
#[test]
fn advisory_key_differs_by_route() {
let a = isr_advisory_lock_key("/", "/about:28333333");
let b = isr_advisory_lock_key("/about", "/about:28333333");
assert_ne!(a, b);
}
#[test]
fn advisory_key_differs_by_window() {
let a = isr_advisory_lock_key("/about", "/about:1");
let b = isr_advisory_lock_key("/about", "/about:2");
assert_ne!(a, b);
}
#[tokio::test]
async fn local_coordinator_always_grants() {
let c = LocalIsrCoordinator::new();
assert!(c.try_acquire("/a", "w1").await);
assert!(c.try_acquire("/a", "w1").await);
}
#[tokio::test]
async fn local_coordinator_release_is_noop() {
let c = LocalIsrCoordinator::new();
c.release("/a", "w1").await;
assert!(c.try_acquire("/a", "w1").await);
}
}