use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{
Arc, OnceLock, RwLock,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use diesel::OptionalExtension as _;
use diesel_async::RunQueryDsl as _;
use diesel_async::pooled_connection::deadpool::Pool;
use futures::FutureExt as _;
use serde::Serialize;
use serde_json::Value;
use sha2::Digest as _;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use crate::{AutumnError, AutumnResult};
type PgPool = Pool<diesel_async::AsyncPgConnection>;
type HookFuture = Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>>;
type HookRunner = Arc<dyn Fn(Value, Value) -> HookFuture + Send + Sync + 'static>;
pub const REPOSITORY_COMMIT_HOOK_MIGRATIONS: diesel_migrations::EmbeddedMigrations =
diesel_migrations::embed_migrations!("repository_commit_hook_migrations");
const HOOK_SELECT_COLS: &str = "id, handler_key, hook_name, context::TEXT AS context, \
record::TEXT AS record, status, attempt, max_attempts, initial_backoff_ms";
const HOOK_WORKER_IDLE_SLEEP: Duration = Duration::from_millis(250);
const HOOK_STALE_CLAIM_AFTER: Duration = Duration::from_secs(60);
const HOOK_CLAIM_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
const HOOK_PENDING_FINALIZER_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
const HOOK_AFTER_HOOK_FAILURE_MARK_RETRY_SLEEP: Duration = Duration::from_millis(100);
const HOOK_AFTER_HOOK_FAILURE_MARK_MAX_ATTEMPTS: usize = 3;
const HOOK_ACK_SUCCESS_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET status = 'completed', finished_at = NOW(), \
context = '{}'::JSONB, record = '{}'::JSONB, \
claimed_by = NULL, claimed_at = NULL, last_error = NULL \
WHERE id = $1 AND claimed_by = $2 AND status = 'running'";
const HOOK_EXTEND_CLAIM_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET claimed_at = NOW() \
WHERE id = $1 AND claimed_by = $2 AND status = 'running'";
const HOOK_ENQUEUE_INSERT_SQL: &str = "INSERT INTO autumn_repository_commit_hooks \
(id, handler_key, hook_name, context, record, status, attempt, \
max_attempts, initial_backoff_ms, enqueued_at, run_at) \
VALUES ($1, $2, $3, $4::JSONB, $5::JSONB, 'enqueued', 1, 5, 1000, NOW(), NOW()) \
ON CONFLICT (id) DO NOTHING";
const HOOK_PENDING_INSERT_SQL: &str = "INSERT INTO autumn_repository_commit_hooks \
(id, handler_key, hook_name, context, record, status, attempt, \
max_attempts, initial_backoff_ms, enqueued_at, run_at, claimed_by, claimed_at) \
VALUES ($1, $2, $3, $4::JSONB, $5::JSONB, 'pending_after_hook', 1, 5, 1000, NOW(), NOW(), $6, NOW()) \
ON CONFLICT (id) DO UPDATE \
SET handler_key = EXCLUDED.handler_key, hook_name = EXCLUDED.hook_name, \
context = EXCLUDED.context, record = EXCLUDED.record, \
status = 'pending_after_hook', attempt = 1, \
max_attempts = EXCLUDED.max_attempts, \
initial_backoff_ms = EXCLUDED.initial_backoff_ms, \
enqueued_at = EXCLUDED.enqueued_at, run_at = EXCLUDED.run_at, \
claimed_by = EXCLUDED.claimed_by, claimed_at = EXCLUDED.claimed_at, \
started_at = NULL, finished_at = NULL, last_error = NULL \
WHERE autumn_repository_commit_hooks.status IN ('pending_after_hook', 'after_hook_failed')";
const HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET context = $1::JSONB, record = $2::JSONB, status = 'after_hook_succeeded', \
claimed_at = NOW(), last_error = NULL \
WHERE id = $3 AND claimed_by = $4 AND status = 'pending_after_hook'";
const HOOK_FINALIZE_AFTER_HOOK_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET status = 'enqueued', run_at = NOW(), \
enqueued_at = COALESCE(enqueued_at, NOW()), \
claimed_by = NULL, claimed_at = NULL, last_error = NULL \
WHERE id = $1 AND claimed_by = $2 AND status = 'after_hook_succeeded'";
const HOOK_DISCARD_PENDING_SQL: &str = "DELETE FROM autumn_repository_commit_hooks \
WHERE id = $1 AND claimed_by = $2 AND status = 'pending_after_hook'";
const HOOK_AFTER_HOOK_FAILED_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET status = 'after_hook_failed', \
finished_at = NOW(), \
context = '{}'::JSONB, record = '{}'::JSONB, \
claimed_by = NULL, claimed_at = NULL, last_error = $1 \
WHERE id = $2 AND claimed_by = $3 AND status = 'pending_after_hook'";
const HOOK_EXTEND_PENDING_FINALIZER_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET claimed_at = NOW() \
WHERE id = $1 AND claimed_by = $2 AND status = 'pending_after_hook'";
const HOOK_RECOVER_STALE_RUNNING_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET status = CASE \
WHEN attempt < max_attempts THEN 'enqueued' \
ELSE 'failed' \
END, \
attempt = CASE \
WHEN attempt < max_attempts THEN attempt + 1 \
ELSE attempt \
END, \
run_at = CASE \
WHEN attempt < max_attempts THEN NOW() \
ELSE run_at \
END, \
started_at = NULL, \
finished_at = CASE \
WHEN attempt >= max_attempts THEN NOW() \
ELSE NULL \
END, \
claimed_by = NULL, \
claimed_at = NULL, \
last_error = $1 \
WHERE status = 'running' \
AND claimed_at < NOW() - ($2::BIGINT * INTERVAL '1 millisecond')";
const HOOK_RECOVER_STALE_PENDING_SQL: &str = "UPDATE autumn_repository_commit_hooks \
SET status = CASE \
WHEN status = 'after_hook_succeeded' THEN 'enqueued' \
ELSE 'after_hook_failed' \
END, \
run_at = CASE \
WHEN status = 'after_hook_succeeded' THEN NOW() \
ELSE run_at \
END, \
enqueued_at = CASE \
WHEN status = 'after_hook_succeeded' THEN COALESCE(enqueued_at, NOW()) \
ELSE enqueued_at \
END, \
context = CASE \
WHEN status = 'pending_after_hook' THEN '{}'::JSONB \
ELSE context \
END, \
record = CASE \
WHEN status = 'pending_after_hook' THEN '{}'::JSONB \
ELSE record \
END, \
finished_at = CASE \
WHEN status = 'pending_after_hook' THEN NOW() \
ELSE finished_at \
END, \
started_at = NULL, \
claimed_by = NULL, \
claimed_at = NULL, \
last_error = COALESCE(last_error, $1) \
WHERE status IN ('pending_after_hook', 'after_hook_succeeded') \
AND claimed_at < NOW() - ($2::BIGINT * INTERVAL '1 millisecond')";
static REPOSITORY_COMMIT_HOOK_RUNNERS: OnceLock<
RwLock<HashMap<String, RepositoryCommitHookRegistration>>,
> = OnceLock::new();
static REPOSITORY_COMMIT_HOOK_KICKERS: OnceLock<
RwLock<HashMap<usize, Arc<RepositoryCommitHookKickState>>>,
> = OnceLock::new();
struct RepositoryCommitHookKickState {
notify: Notify,
pending: AtomicBool,
}
impl Default for RepositoryCommitHookKickState {
fn default() -> Self {
Self {
notify: Notify::new(),
pending: AtomicBool::new(false),
}
}
}
impl RepositoryCommitHookKickState {
fn request_kick(&self) -> bool {
!self.pending.swap(true, Ordering::AcqRel)
}
fn take_pending_kick(&self) -> bool {
self.pending.swap(false, Ordering::AcqRel)
}
}
#[doc(hidden)]
#[must_use]
pub struct RepositoryCommitHookPendingHeartbeat {
shutdown: CancellationToken,
}
impl RepositoryCommitHookPendingHeartbeat {
const fn new(shutdown: CancellationToken) -> Self {
Self { shutdown }
}
pub fn cancel(&self) {
self.shutdown.cancel();
}
}
impl Drop for RepositoryCommitHookPendingHeartbeat {
fn drop(&mut self) {
self.shutdown.cancel();
}
}
#[doc(hidden)]
pub struct RepositoryCommitHookDescriptor {
pub register: fn(),
}
inventory::collect!(RepositoryCommitHookDescriptor);
#[derive(Clone)]
struct RepositoryCommitHookRegistration {
create: HookRunner,
update: HookRunner,
delete: HookRunner,
}
impl RepositoryCommitHookRegistration {
fn runner(&self, hook_name: &str) -> Option<HookRunner> {
match hook_name {
"create" => Some(self.create.clone()),
"update" => Some(self.update.clone()),
"delete" => Some(self.delete.clone()),
_ => None,
}
}
}
#[derive(diesel::QueryableByName, Debug, Clone)]
#[allow(dead_code)]
struct PgRepositoryCommitHookRow {
#[diesel(sql_type = diesel::sql_types::Text)]
id: String,
#[diesel(sql_type = diesel::sql_types::Text)]
handler_key: String,
#[diesel(sql_type = diesel::sql_types::Text)]
hook_name: String,
#[diesel(sql_type = diesel::sql_types::Text)]
context: String,
#[diesel(sql_type = diesel::sql_types::Text)]
record: String,
#[diesel(sql_type = diesel::sql_types::Text)]
status: String,
#[diesel(sql_type = diesel::sql_types::Integer)]
attempt: i32,
#[diesel(sql_type = diesel::sql_types::Integer)]
max_attempts: i32,
#[diesel(sql_type = diesel::sql_types::BigInt)]
initial_backoff_ms: i64,
}
pub fn register_repository_commit_hook_runner<
Create,
CreateFut,
Update,
UpdateFut,
Delete,
DeleteFut,
>(
handler_key: &'static str,
create: Create,
update: Update,
delete: Delete,
) where
Create: Fn(Value, Value) -> CreateFut + Send + Sync + 'static,
CreateFut: Future<Output = AutumnResult<()>> + Send + 'static,
Update: Fn(Value, Value) -> UpdateFut + Send + Sync + 'static,
UpdateFut: Future<Output = AutumnResult<()>> + Send + 'static,
Delete: Fn(Value, Value) -> DeleteFut + Send + Sync + 'static,
DeleteFut: Future<Output = AutumnResult<()>> + Send + 'static,
{
let registration = RepositoryCommitHookRegistration {
create: Arc::new(move |ctx, record| Box::pin(create(ctx, record))),
update: Arc::new(move |ctx, record| Box::pin(update(ctx, record))),
delete: Arc::new(move |ctx, record| Box::pin(delete(ctx, record))),
};
REPOSITORY_COMMIT_HOOK_RUNNERS
.get_or_init(|| RwLock::new(HashMap::new()))
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(handler_key.to_owned(), registration);
}
fn register_inventory_repository_commit_hook_runners() {
for descriptor in inventory::iter::<RepositoryCommitHookDescriptor> {
(descriptor.register)();
}
}
pub fn has_repository_commit_hook_descriptors() -> bool {
inventory::iter::<RepositoryCommitHookDescriptor>
.into_iter()
.next()
.is_some()
}
pub async fn enqueue_repository_commit_hook_on_conn<C, R>(
conn: &mut diesel_async::AsyncPgConnection,
handler_key: &str,
hook_name: &str,
idempotency_key: Option<&str>,
idempotency_discriminator: Option<&str>,
context: &C,
record: &R,
) -> AutumnResult<()>
where
C: Serialize + Sync + ?Sized,
R: Serialize + Sync + ?Sized,
{
let (context, record) = serialize_repository_commit_hook_payloads(context, record)?;
let id = repository_commit_hook_id(
idempotency_key,
idempotency_discriminator,
handler_key,
hook_name,
&record,
);
diesel::sql_query(HOOK_ENQUEUE_INSERT_SQL)
.bind::<diesel::sql_types::Text, _>(id)
.bind::<diesel::sql_types::Text, _>(handler_key)
.bind::<diesel::sql_types::Text, _>(hook_name)
.bind::<diesel::sql_types::Text, _>(context)
.bind::<diesel::sql_types::Text, _>(record)
.execute(conn)
.await
.map(|_| ())
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook enqueue failed: {error}"
))
})
}
pub async fn enqueue_repository_commit_hook_pending_on_conn<C, R>(
conn: &mut diesel_async::AsyncPgConnection,
handler_key: &str,
hook_name: &str,
idempotency_key: Option<&str>,
idempotency_discriminator: Option<&str>,
context: &C,
record: &R,
) -> AutumnResult<(String, String)>
where
C: Serialize + Sync + ?Sized,
R: Serialize + Sync + ?Sized,
{
let (context, record) = serialize_repository_commit_hook_payloads(context, record)?;
let id = repository_commit_hook_id(
idempotency_key,
idempotency_discriminator,
handler_key,
hook_name,
&record,
);
let owner = repository_commit_hook_pending_owner_id();
diesel::sql_query(HOOK_PENDING_INSERT_SQL)
.bind::<diesel::sql_types::Text, _>(id.clone())
.bind::<diesel::sql_types::Text, _>(handler_key)
.bind::<diesel::sql_types::Text, _>(hook_name)
.bind::<diesel::sql_types::Text, _>(context)
.bind::<diesel::sql_types::Text, _>(record)
.bind::<diesel::sql_types::Text, _>(owner.clone())
.execute(conn)
.await
.map(|_| (id, owner))
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook staging failed: {error}"
))
})
}
pub async fn enqueue_repository_commit_hooks_pending_bulk_on_conn<C, R>(
conn: &mut diesel_async::AsyncPgConnection,
handler_key: &str,
hook_name: &str,
inputs: &[(Option<String>, Option<String>, &C, &R)],
) -> AutumnResult<Vec<(String, String)>>
where
C: Serialize + Sync + ?Sized,
R: Serialize + Sync + ?Sized,
{
const SQL: &str = "INSERT INTO autumn_repository_commit_hooks \
(id, handler_key, hook_name, context, record, status, attempt, \
max_attempts, initial_backoff_ms, enqueued_at, run_at, claimed_by, claimed_at) \
SELECT \
t.id, t.handler_key, t.hook_name, t.context::JSONB, t.record::JSONB, \
'pending_after_hook', 1, 5, 1000, NOW(), NOW(), t.claimed_by, NOW() \
FROM UNNEST($1::TEXT[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[], $6::TEXT[]) \
AS t(id, handler_key, hook_name, context, record, claimed_by) \
ON CONFLICT (id) DO UPDATE \
SET handler_key = EXCLUDED.handler_key, hook_name = EXCLUDED.hook_name, \
context = EXCLUDED.context, record = EXCLUDED.record, \
status = 'pending_after_hook', attempt = 1, \
max_attempts = EXCLUDED.max_attempts, \
initial_backoff_ms = EXCLUDED.initial_backoff_ms, \
enqueued_at = EXCLUDED.enqueued_at, run_at = EXCLUDED.run_at, \
claimed_by = EXCLUDED.claimed_by, claimed_at = EXCLUDED.claimed_at, \
started_at = NULL, finished_at = NULL, last_error = NULL \
WHERE autumn_repository_commit_hooks.status IN ('pending_after_hook', 'after_hook_failed')";
if inputs.is_empty() {
return Ok(Vec::new());
}
let mut ids = Vec::with_capacity(inputs.len());
let mut handler_keys = Vec::with_capacity(inputs.len());
let mut hook_names = Vec::with_capacity(inputs.len());
let mut contexts = Vec::with_capacity(inputs.len());
let mut records = Vec::with_capacity(inputs.len());
let mut owners = Vec::with_capacity(inputs.len());
let mut results = Vec::with_capacity(inputs.len());
let owner = repository_commit_hook_pending_owner_id();
for &(ref idempotency_key, ref idempotency_discriminator, context, record) in inputs {
let (context_str, record_str) = serialize_repository_commit_hook_payloads(context, record)?;
let id = repository_commit_hook_id(
idempotency_key.as_deref(),
idempotency_discriminator.as_deref(),
handler_key,
hook_name,
&record_str,
);
ids.push(id.clone());
handler_keys.push(handler_key.to_string());
hook_names.push(hook_name.to_string());
contexts.push(context_str);
records.push(record_str);
owners.push(owner.clone());
results.push((id, owner.clone()));
}
diesel::sql_query(SQL)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(ids)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(handler_keys)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(hook_names)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(contexts)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(records)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(owners)
.execute(conn)
.await
.map(|_| results)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook bulk staging failed: {error}"
))
})
}
pub async fn enqueue_repository_commit_hooks_bulk_on_conn<C, R>(
conn: &mut diesel_async::AsyncPgConnection,
handler_key: &str,
hook_name: &str,
inputs: &[(Option<String>, Option<String>, &C, &R)],
) -> AutumnResult<()>
where
C: Serialize + Sync + ?Sized,
R: Serialize + Sync + ?Sized,
{
const SQL: &str = "INSERT INTO autumn_repository_commit_hooks \
(id, handler_key, hook_name, context, record, status, attempt, \
max_attempts, initial_backoff_ms, enqueued_at, run_at) \
SELECT \
t.id, t.handler_key, t.hook_name, t.context::JSONB, t.record::JSONB, \
'enqueued', 1, 5, 1000, NOW(), NOW() \
FROM UNNEST($1::TEXT[], $2::TEXT[], $3::TEXT[], $4::TEXT[], $5::TEXT[]) \
AS t(id, handler_key, hook_name, context, record) \
ON CONFLICT (id) DO NOTHING";
if inputs.is_empty() {
return Ok(());
}
let mut ids = Vec::with_capacity(inputs.len());
let mut handler_keys = Vec::with_capacity(inputs.len());
let mut hook_names = Vec::with_capacity(inputs.len());
let mut contexts = Vec::with_capacity(inputs.len());
let mut records = Vec::with_capacity(inputs.len());
for &(ref idempotency_key, ref idempotency_discriminator, context, record) in inputs {
let (context_str, record_str) = serialize_repository_commit_hook_payloads(context, record)?;
let id = repository_commit_hook_id(
idempotency_key.as_deref(),
idempotency_discriminator.as_deref(),
handler_key,
hook_name,
&record_str,
);
ids.push(id);
handler_keys.push(handler_key.to_string());
hook_names.push(hook_name.to_string());
contexts.push(context_str);
records.push(record_str);
}
diesel::sql_query(SQL)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(ids)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(handler_keys)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(hook_names)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(contexts)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(records)
.execute(conn)
.await
.map(|_| ())
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook bulk enqueue failed: {error}"
))
})
}
pub async fn finalize_repository_commit_hook_after_hook<C, R>(
pool: &PgPool,
hook_id: &str,
owner: &str,
context: &C,
record: &R,
) -> AutumnResult<()>
where
C: Serialize + Sync + ?Sized,
R: Serialize + Sync + ?Sized,
{
let (context, record) = serialize_repository_commit_hook_payloads(context, record)?;
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
let rows = diesel::sql_query(HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL)
.bind::<diesel::sql_types::Text, _>(context)
.bind::<diesel::sql_types::Text, _>(record)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(owner)
.execute(&mut *conn)
.await
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook after-hook success mark failed: {error}"
))
})?;
if rows == 0 {
return missing_repository_commit_hook_finalization_result(hook_id);
}
let rows = diesel::sql_query(HOOK_FINALIZE_AFTER_HOOK_SQL)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(owner)
.execute(&mut *conn)
.await
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook finalization failed: {error}"
))
})?;
if rows == 0 {
return Err(AutumnError::internal_server_error_msg(format!(
"repository commit hook finalization skipped marked row: {hook_id}"
)));
}
Ok(())
}
fn missing_repository_commit_hook_finalization_result(hook_id: &str) -> AutumnResult<()> {
Err(AutumnError::internal_server_error_msg(format!(
"repository commit hook finalization skipped missing staged row: {hook_id}"
)))
}
pub async fn discard_repository_commit_hook_pending(
pool: &PgPool,
hook_id: &str,
owner: &str,
) -> AutumnResult<()> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
diesel::sql_query(HOOK_DISCARD_PENDING_SQL)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(owner)
.execute(&mut *conn)
.await
.map(|_| ())
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook pending discard failed: {error}"
))
})
}
pub async fn mark_repository_commit_hook_after_hook_failed(
pool: &PgPool,
hook_id: &str,
owner: &str,
failure: impl Into<String>,
) {
let failure = failure.into();
for attempt in 1..=HOOK_AFTER_HOOK_FAILURE_MARK_MAX_ATTEMPTS {
match pg_mark_repository_commit_hook_after_hook_failed(pool, hook_id, owner, &failure).await
{
Ok(true) => return,
Ok(false) => {
tracing::warn!(
hook_id = %hook_id,
"repository commit hook staged row was already unavailable while marking after-hook failure"
);
return;
}
Err(error) => {
if attempt == HOOK_AFTER_HOOK_FAILURE_MARK_MAX_ATTEMPTS {
tracing::warn!(
hook_id = %hook_id,
error = %error,
attempts = HOOK_AFTER_HOOK_FAILURE_MARK_MAX_ATTEMPTS,
"failed to mark repository commit hook after-hook failure; giving up so the committed mutation can return"
);
return;
}
tracing::warn!(
hook_id = %hook_id,
error = %error,
attempt,
max_attempts = HOOK_AFTER_HOOK_FAILURE_MARK_MAX_ATTEMPTS,
"failed to mark repository commit hook after-hook failure; retrying"
);
tokio::time::sleep(HOOK_AFTER_HOOK_FAILURE_MARK_RETRY_SLEEP).await;
}
}
}
}
async fn pg_mark_repository_commit_hook_after_hook_failed(
pool: &PgPool,
hook_id: &str,
owner: &str,
failure: &str,
) -> AutumnResult<bool> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
diesel::sql_query(HOOK_AFTER_HOOK_FAILED_SQL)
.bind::<diesel::sql_types::Text, _>(failure)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(owner)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook after-hook failure mark failed: {error}"
))
})
}
pub async fn catch_repository_after_hook_unwind<Fut>(
future: Fut,
) -> Result<AutumnResult<()>, Box<dyn Any + Send>>
where
Fut: Future<Output = AutumnResult<()>> + Send,
{
std::panic::AssertUnwindSafe(future).catch_unwind().await
}
#[doc(hidden)]
pub fn start_repository_commit_hook_pending_finalizer_heartbeat(
pool: PgPool,
hook_id: String,
owner: String,
) -> RepositoryCommitHookPendingHeartbeat {
let shutdown = CancellationToken::new();
let heartbeat_shutdown = shutdown.child_token();
tokio::spawn(heartbeat_repository_commit_hook_pending_finalizer(
pool,
hook_id,
owner,
heartbeat_shutdown,
));
RepositoryCommitHookPendingHeartbeat::new(shutdown)
}
fn serialize_repository_commit_hook_payloads<C, R>(
context: &C,
record: &R,
) -> AutumnResult<(String, String)>
where
C: Serialize + ?Sized,
R: Serialize + ?Sized,
{
let context = serde_json::to_string(context).map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"serialize repository commit hook context: {error}"
))
})?;
let record = serde_json::to_string(record).map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"serialize repository commit hook record: {error}"
))
})?;
Ok((context, record))
}
pub fn start_repository_commit_hook_worker(pool: PgPool, shutdown: CancellationToken) {
register_inventory_repository_commit_hook_runners();
if !should_start_repository_commit_hook_worker(®istered_handler_keys()) {
return;
}
let worker_id = repository_commit_hook_worker_id();
tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(HOOK_WORKER_IDLE_SLEEP) => {
recover_stale_repository_commit_hooks(&pool, &worker_id).await;
drain_ready_repository_commit_hooks(&pool, &worker_id, 32).await;
}
}
}
});
}
pub fn kick_repository_commit_hook_dispatcher(pool: &PgPool) {
register_inventory_repository_commit_hook_runners();
if !should_start_repository_commit_hook_worker(®istered_handler_keys()) {
return;
}
let state = repository_commit_hook_kick_state(pool);
if state.request_kick() {
state.notify.notify_one();
}
}
fn repository_commit_hook_kick_state(pool: &PgPool) -> Arc<RepositoryCommitHookKickState> {
let key = repository_commit_hook_pool_key(pool);
let registry = REPOSITORY_COMMIT_HOOK_KICKERS.get_or_init(|| RwLock::new(HashMap::new()));
let existing = {
let registry = registry
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
registry.get(&key).cloned()
};
if let Some(state) = existing {
return state;
}
let mut registry = registry
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(state) = registry.get(&key).cloned() {
return state;
}
let state = Arc::new(RepositoryCommitHookKickState::default());
spawn_repository_commit_hook_kick_worker(pool.clone(), state.clone());
registry.insert(key, state.clone());
state
}
fn repository_commit_hook_pool_key(pool: &PgPool) -> usize {
std::ptr::from_ref(pool.manager()).addr()
}
fn spawn_repository_commit_hook_kick_worker(
pool: PgPool,
state: Arc<RepositoryCommitHookKickState>,
) {
let worker_id = repository_commit_hook_worker_id();
tokio::spawn(async move {
loop {
state.notify.notified().await;
while state.take_pending_kick() {
drain_ready_repository_commit_hooks(&pool, &worker_id, 32).await;
}
}
});
}
async fn drain_ready_repository_commit_hooks(pool: &PgPool, worker_id: &str, max_rows: usize) {
for _ in 0..max_rows {
let Some(row) = pg_claim_next_repository_commit_hook(pool, worker_id).await else {
break;
};
let heartbeat_shutdown = CancellationToken::new();
let heartbeat_task = tokio::spawn(heartbeat_repository_commit_hook_claim(
pool.clone(),
row.id.clone(),
worker_id.to_owned(),
heartbeat_shutdown.child_token(),
));
let result = run_repository_commit_hook_row(&row).await;
match result {
Ok(()) => {
if let Err(error) =
pg_ack_repository_commit_hook_success(pool, &row.id, worker_id).await
{
tracing::warn!(
hook_id = %row.id,
error = %error,
"failed to ack repository commit hook success"
);
}
}
Err(error) => {
let failures_total = crate::db::record_after_commit_failure();
tracing::error!(
hook_id = %row.id,
handler_key = %row.handler_key,
hook_name = %row.hook_name,
autumn.after_commit.failures_total = failures_total,
"repository after_commit hook failed: {error}"
);
if let Err(nack_error) =
pg_nack_repository_commit_hook_failure(pool, &row.id, worker_id, &error, &row)
.await
{
tracing::warn!(
hook_id = %row.id,
error = %nack_error,
"failed to record repository commit hook failure"
);
}
}
}
heartbeat_shutdown.cancel();
if let Err(error) = heartbeat_task.await {
tracing::warn!(
hook_id = %row.id,
error = %error,
"repository commit hook heartbeat task failed"
);
}
}
}
async fn run_repository_commit_hook_row(row: &PgRepositoryCommitHookRow) -> Result<(), String> {
let context = serde_json::from_str::<Value>(&row.context)
.map_err(|error| format!("decode repository hook context: {error}"))?;
let record = serde_json::from_str::<Value>(&row.record)
.map_err(|error| format!("decode repository hook record: {error}"))?;
let result = std::panic::AssertUnwindSafe(run_registered_repository_commit_hook(
&row.handler_key,
&row.hook_name,
context,
record,
))
.catch_unwind()
.await;
match result {
Ok(Ok(())) => Ok(()),
Ok(Err(error)) => Err(error.to_string()),
Err(panic) => Err(format_repository_commit_hook_panic(&*panic)),
}
}
async fn run_registered_repository_commit_hook(
handler_key: &str,
hook_name: &str,
context: Value,
record: Value,
) -> AutumnResult<()> {
let runner = {
let registry = REPOSITORY_COMMIT_HOOK_RUNNERS
.get_or_init(|| RwLock::new(HashMap::new()))
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
registry
.get(handler_key)
.and_then(|registration| registration.runner(hook_name))
};
let Some(runner) = runner else {
return Err(AutumnError::internal_server_error_msg(format!(
"repository commit hook runner not registered: handler_key={handler_key}, hook_name={hook_name}"
)));
};
runner(context, record).await
}
async fn heartbeat_repository_commit_hook_claim(
pool: PgPool,
hook_id: String,
worker_id: String,
shutdown: CancellationToken,
) {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(HOOK_CLAIM_HEARTBEAT_INTERVAL) => {
match pg_extend_repository_commit_hook_claim(&pool, &hook_id, &worker_id).await {
Ok(true) => {}
Ok(false) => break,
Err(error) => {
tracing::warn!(
hook_id = %hook_id,
error = %error,
"failed to extend repository commit hook claim"
);
}
}
}
}
}
}
async fn heartbeat_repository_commit_hook_pending_finalizer(
pool: PgPool,
hook_id: String,
owner: String,
shutdown: CancellationToken,
) {
loop {
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(HOOK_PENDING_FINALIZER_HEARTBEAT_INTERVAL) => {
match pg_extend_repository_commit_hook_pending_finalizer(&pool, &hook_id, &owner).await {
Ok(true) => {}
Ok(false) => break,
Err(error) => {
tracing::warn!(
hook_id = %hook_id,
error = %error,
"failed to extend repository commit hook pending finalizer lease"
);
}
}
}
}
}
}
fn registered_handler_keys() -> Vec<String> {
REPOSITORY_COMMIT_HOOK_RUNNERS
.get_or_init(|| RwLock::new(HashMap::new()))
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.keys()
.cloned()
.collect()
}
const fn should_start_repository_commit_hook_worker(handler_keys: &[String]) -> bool {
!handler_keys.is_empty()
}
async fn pg_claim_next_repository_commit_hook(
pool: &PgPool,
worker_id: &str,
) -> Option<PgRepositoryCommitHookRow> {
let handler_keys = registered_handler_keys();
if handler_keys.is_empty() {
return None;
}
let mut conn = pool.get().await.ok()?;
let sql = format!(
"UPDATE autumn_repository_commit_hooks \
SET status = 'running', started_at = NOW(), claimed_by = $2, claimed_at = NOW() \
WHERE id = ( \
SELECT id FROM autumn_repository_commit_hooks \
WHERE status = 'enqueued' \
AND run_at <= NOW() \
AND handler_key = ANY($1) \
ORDER BY run_at ASC, enqueued_at ASC \
LIMIT 1 \
FOR UPDATE SKIP LOCKED \
) \
RETURNING {HOOK_SELECT_COLS}"
);
diesel::sql_query(sql)
.bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(handler_keys)
.bind::<diesel::sql_types::Text, _>(worker_id)
.get_result::<PgRepositoryCommitHookRow>(&mut *conn)
.await
.optional()
.unwrap_or_else(|error| {
if is_missing_hook_table_error(&error) {
tracing::debug!(
error = %error,
"repository commit hook queue table is not available yet"
);
} else {
tracing::warn!(error = %error, "repository commit hook claim query failed");
}
None
})
}
async fn pg_ack_repository_commit_hook_success(
pool: &PgPool,
hook_id: &str,
worker_id: &str,
) -> AutumnResult<bool> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
diesel::sql_query(HOOK_ACK_SUCCESS_SQL)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(worker_id)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook ack failed: {error}"
))
})
}
async fn pg_extend_repository_commit_hook_claim(
pool: &PgPool,
hook_id: &str,
worker_id: &str,
) -> AutumnResult<bool> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
diesel::sql_query(HOOK_EXTEND_CLAIM_SQL)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(worker_id)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook claim heartbeat failed: {error}"
))
})
}
async fn pg_extend_repository_commit_hook_pending_finalizer(
pool: &PgPool,
hook_id: &str,
owner: &str,
) -> AutumnResult<bool> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
diesel::sql_query(HOOK_EXTEND_PENDING_FINALIZER_SQL)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(owner)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook pending finalizer heartbeat failed: {error}"
))
})
}
async fn pg_nack_repository_commit_hook_failure(
pool: &PgPool,
hook_id: &str,
worker_id: &str,
error: &str,
row: &PgRepositoryCommitHookRow,
) -> AutumnResult<bool> {
let mut conn = pool.get().await.map_err(|error| {
AutumnError::internal_server_error_msg(format!("pg pool error: {error}"))
})?;
if row.attempt < row.max_attempts {
let delay_ms = retry_delay_ms(row.initial_backoff_ms, row.attempt);
diesel::sql_query(
"UPDATE autumn_repository_commit_hooks \
SET status = 'enqueued', \
attempt = attempt + 1, \
run_at = NOW() + ($1::BIGINT * INTERVAL '1 millisecond'), \
started_at = NULL, \
finished_at = NULL, \
claimed_by = NULL, \
claimed_at = NULL, \
last_error = $2 \
WHERE id = $3 AND claimed_by = $4 AND status = 'running'",
)
.bind::<diesel::sql_types::BigInt, _>(delay_ms)
.bind::<diesel::sql_types::Text, _>(error)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(worker_id)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook retry failed: {error}"
))
})
} else {
diesel::sql_query(
"UPDATE autumn_repository_commit_hooks \
SET status = 'failed', \
finished_at = NOW(), \
claimed_by = NULL, \
claimed_at = NULL, \
last_error = $1 \
WHERE id = $2 AND claimed_by = $3 AND status = 'running'",
)
.bind::<diesel::sql_types::Text, _>(error)
.bind::<diesel::sql_types::Text, _>(hook_id)
.bind::<diesel::sql_types::Text, _>(worker_id)
.execute(&mut *conn)
.await
.map(|rows| rows > 0)
.map_err(|error| {
AutumnError::internal_server_error_msg(format!(
"repository commit hook dead-letter failed: {error}"
))
})
}
}
async fn recover_stale_repository_commit_hooks(pool: &PgPool, worker_id: &str) {
let stale_after_ms = i64::try_from(HOOK_STALE_CLAIM_AFTER.as_millis()).unwrap_or(i64::MAX);
let Ok(mut conn) = pool.get().await else {
return;
};
if let Err(error) = diesel::sql_query(HOOK_RECOVER_STALE_RUNNING_SQL)
.bind::<diesel::sql_types::Text, _>(format!("stale claim recovered by {worker_id}"))
.bind::<diesel::sql_types::BigInt, _>(stale_after_ms)
.execute(&mut *conn)
.await
{
if is_missing_hook_table_error(&error) {
tracing::debug!(
error = %error,
"repository commit hook queue table is not available yet"
);
} else {
tracing::warn!(error = %error, "repository commit hook stale recovery failed");
}
}
if let Err(error) = diesel::sql_query(HOOK_RECOVER_STALE_PENDING_SQL)
.bind::<diesel::sql_types::Text, _>(format!(
"stale pending after hook recovered by {worker_id}"
))
.bind::<diesel::sql_types::BigInt, _>(stale_after_ms)
.execute(&mut *conn)
.await
{
if is_missing_hook_table_error(&error) {
tracing::debug!(
error = %error,
"repository commit hook queue table is not available yet"
);
} else {
tracing::warn!(
error = %error,
"repository commit hook stale pending recovery failed"
);
}
}
}
fn is_missing_hook_table_error(error: &diesel::result::Error) -> bool {
let message = error.to_string().to_ascii_lowercase();
message.contains("autumn_repository_commit_hooks")
&& (message.contains("does not exist") || message.contains("undefinedtable"))
}
fn retry_delay_ms(initial_backoff_ms: i64, attempt: i32) -> i64 {
let exp = u32::try_from(attempt.saturating_sub(1)).unwrap_or(0);
initial_backoff_ms.saturating_mul(2_i64.saturating_pow(exp))
}
fn format_repository_commit_hook_panic(payload: &(dyn Any + Send)) -> String {
let message = payload
.downcast_ref::<&str>()
.copied()
.or_else(|| payload.downcast_ref::<String>().map(String::as_str));
message.map_or_else(
|| "repository commit hook panicked".to_owned(),
|message| format!("repository commit hook panicked: {message}"),
)
}
fn repository_commit_hook_id(
idempotency_key: Option<&str>,
idempotency_discriminator: Option<&str>,
handler_key: &str,
hook_name: &str,
record: &str,
) -> String {
let Some(idempotency_key) = idempotency_key.filter(|key| !key.is_empty()) else {
return uuid::Uuid::new_v4().to_string();
};
let mut hasher = sha2::Sha256::new();
push_hook_id_component(&mut hasher, "handler", handler_key.as_bytes());
push_hook_id_component(&mut hasher, "hook", hook_name.as_bytes());
push_hook_id_component(&mut hasher, "idempotency", idempotency_key.as_bytes());
if let Some(discriminator) = idempotency_discriminator {
push_hook_id_component(&mut hasher, "mutation", discriminator.as_bytes());
} else {
push_hook_id_component(&mut hasher, "record", record.as_bytes());
}
format!("idempotent:{}", hex_lower(hasher.finalize()))
}
fn push_hook_id_component(hasher: &mut sha2::Sha256, label: &str, value: &[u8]) {
hasher.update(label.as_bytes());
hasher.update(b":");
hasher.update((value.len() as u64).to_be_bytes());
hasher.update(b":");
hasher.update(value);
hasher.update(b";");
}
fn hex_lower(bytes: impl AsRef<[u8]>) -> String {
bytes.as_ref().iter().fold(
String::with_capacity(bytes.as_ref().len() * 2),
|mut out, byte| {
use std::fmt::Write as _;
let _ = write!(out, "{byte:02x}");
out
},
)
}
fn repository_commit_hook_worker_id() -> String {
format!("repository-hook-{}", uuid::Uuid::new_v4())
}
fn repository_commit_hook_pending_owner_id() -> String {
format!("repository-hook-pending-{}", uuid::Uuid::new_v4())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test]
async fn registered_runner_executes_matching_hook() {
let calls = Arc::new(AtomicUsize::new(0));
let create_calls = calls.clone();
let handler_key: &'static str = Box::leak(
format!(
"test::registered_runner_executes_matching_hook::{}",
uuid::Uuid::new_v4()
)
.into_boxed_str(),
);
register_repository_commit_hook_runner(
handler_key,
move |_ctx, _record| {
let create_calls = create_calls.clone();
async move {
create_calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
},
|_ctx, _record| async { Ok(()) },
|_ctx, _record| async { Ok(()) },
);
run_registered_repository_commit_hook(handler_key, "create", Value::Null, Value::Null)
.await
.unwrap();
assert_eq!(calls.as_ref().load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn missing_runner_returns_recoverable_error() {
let err = run_registered_repository_commit_hook(
"missing-handler",
"create",
Value::Null,
Value::Null,
)
.await
.expect_err("missing runner should be reported");
assert!(
err.to_string().contains("runner not registered"),
"unexpected error: {err}"
);
}
#[tokio::test]
async fn after_hook_failure_marking_returns_when_pool_is_unavailable() {
use diesel_async::AsyncPgConnection;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new("not-a-postgres-url");
let pool = Pool::builder(manager)
.max_size(1)
.runtime(deadpool::Runtime::Tokio1)
.build()
.expect("pool");
let result = tokio::time::timeout(
Duration::from_millis(750),
mark_repository_commit_hook_after_hook_failed(&pool, "hook-id", "owner", "boom"),
)
.await;
assert!(
result.is_ok(),
"after-hook failure marking must not block a committed mutation forever when the pool/database is down"
);
}
#[test]
fn worker_start_is_disabled_without_registered_handlers() {
assert!(
!should_start_repository_commit_hook_worker(&[]),
"unhooked DB apps must not poll the hook queue"
);
}
#[test]
fn worker_start_is_enabled_with_registered_handlers() {
assert!(
should_start_repository_commit_hook_worker(&["handler".to_owned()]),
"hooked DB apps should poll the hook queue"
);
}
#[test]
fn dispatcher_kick_state_coalesces_pending_notifications() {
let state = RepositoryCommitHookKickState::default();
assert!(state.request_kick(), "first kick should notify the worker");
assert!(
!state.request_kick(),
"repeated kicks while one is pending must coalesce"
);
assert!(
state.take_pending_kick(),
"worker should observe one wakeup"
);
assert!(
!state.take_pending_kick(),
"observed wakeup should clear pending state"
);
assert!(
state.request_kick(),
"a later kick after the worker drains should notify again"
);
}
#[test]
fn retry_delay_is_exponential() {
assert_eq!(retry_delay_ms(100, 1), 100);
assert_eq!(retry_delay_ms(100, 2), 200);
assert_eq!(retry_delay_ms(100, 3), 400);
}
#[test]
fn idempotent_hook_ids_are_deterministic_and_safely_delimited() {
let record = serde_json::json!({ "id": 1, "title": "first" }).to_string();
let first = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"create",
&record,
);
let second = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"create",
&record,
);
let other_hook = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"update",
&record,
);
assert_eq!(first, second);
assert_ne!(first, other_hook);
assert!(first.starts_with("idempotent:"));
assert!(!first.contains("v2:request"));
assert!(!first.contains("pkg::module::posts::Post"));
}
#[test]
fn non_idempotent_hook_ids_remain_fresh() {
let record = serde_json::json!({ "id": 1 }).to_string();
let first = repository_commit_hook_id(None, None, "handler", "create", &record);
let second = repository_commit_hook_id(None, None, "handler", "create", &record);
assert_ne!(first, second);
assert!(uuid::Uuid::parse_str(&first).is_ok());
assert!(uuid::Uuid::parse_str(&second).is_ok());
}
#[test]
fn hook_insert_sql_ignores_duplicate_idempotent_rows() {
assert!(
HOOK_ENQUEUE_INSERT_SQL.contains("ON CONFLICT (id) DO NOTHING"),
"direct delete commit hooks must dedupe duplicate idempotency rows"
);
assert!(
HOOK_PENDING_INSERT_SQL.contains("ON CONFLICT (id) DO UPDATE")
&& HOOK_PENDING_INSERT_SQL.contains(
"WHERE autumn_repository_commit_hooks.status IN ('pending_after_hook', 'after_hook_failed')"
),
"staged create/update commit hooks must dedupe successful duplicate rows while allowing a retry to reclaim unfinalized or failed staged rows"
);
}
#[test]
fn idempotent_hook_ids_distinguish_records_in_same_request() {
let first_record = serde_json::json!({ "id": 1, "title": "first" }).to_string();
let second_record = serde_json::json!({ "id": 2, "title": "second" }).to_string();
let first = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"create",
&first_record,
);
let second = repository_commit_hook_id(
Some("v2:request"),
Some("1"),
"pkg::module::posts::Post",
"create",
&second_record,
);
assert_ne!(
first, second,
"one idempotent request can stage multiple committed records for the same hook"
);
}
#[test]
fn idempotent_hook_ids_distinguish_same_record_sequences_in_same_request() {
let record = serde_json::json!({ "id": 1, "title": "same" }).to_string();
let first = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"update",
&record,
);
let second = repository_commit_hook_id(
Some("v2:request"),
Some("1"),
"pkg::module::posts::Post",
"update",
&record,
);
let first_again = repository_commit_hook_id(
Some("v2:request"),
Some("0"),
"pkg::module::posts::Post",
"update",
&record,
);
assert_eq!(
first, first_again,
"the same mutation sequence must dedupe across duplicate request attempts"
);
assert_ne!(
first, second,
"distinct mutations in one request must not collapse just because their final record serializes identically"
);
}
#[test]
fn missing_idempotent_finalization_fails_closed() {
let err = missing_repository_commit_hook_finalization_result("idempotent:abc")
.expect_err("missing idempotent staged rows should fail closed");
assert!(
err.to_string()
.contains("finalization skipped missing staged row"),
"unexpected error: {err}"
);
}
#[test]
fn pending_insert_reclaims_only_unfinalized_or_failed_rows() {
assert!(
HOOK_PENDING_INSERT_SQL.contains("ON CONFLICT (id) DO UPDATE")
&& HOOK_PENDING_INSERT_SQL.contains("status = 'pending_after_hook'")
&& HOOK_PENDING_INSERT_SQL.contains("context = EXCLUDED.context")
&& HOOK_PENDING_INSERT_SQL.contains("record = EXCLUDED.record")
&& HOOK_PENDING_INSERT_SQL.contains("claimed_by = EXCLUDED.claimed_by")
&& HOOK_PENDING_INSERT_SQL.contains("last_error = NULL"),
"a retried idempotent mutation must be able to restage durable hooks after an earlier unfinalized or failed regular after-hook"
);
assert!(
HOOK_PENDING_INSERT_SQL.contains(
"WHERE autumn_repository_commit_hooks.status IN ('pending_after_hook', 'after_hook_failed')"
),
"restaging must reclaim unfinalized pending rows but not replace already finalized, enqueued, running, completed, or worker-failed rows"
);
}
#[test]
fn missing_non_idempotent_finalization_remains_an_error() {
let err = missing_repository_commit_hook_finalization_result("random-id")
.expect_err("missing non-idempotent staged rows should still be reported");
assert!(
err.to_string()
.contains("finalization skipped missing staged row"),
"unexpected error: {err}"
);
}
#[test]
fn claim_heartbeat_runs_before_stale_recovery() {
assert!(
HOOK_CLAIM_HEARTBEAT_INTERVAL < HOOK_STALE_CLAIM_AFTER,
"heartbeat interval must be shorter than stale recovery threshold"
);
assert!(
HOOK_PENDING_FINALIZER_HEARTBEAT_INTERVAL < HOOK_STALE_CLAIM_AFTER,
"pending finalizer heartbeat interval must be shorter than stale recovery threshold"
);
}
#[test]
fn success_ack_clears_retained_payloads() {
assert!(
HOOK_ACK_SUCCESS_SQL.contains("context = '{}'::JSONB"),
"success ack must clear serialized context payload"
);
assert!(
HOOK_ACK_SUCCESS_SQL.contains("record = '{}'::JSONB"),
"success ack must clear serialized record payload"
);
}
#[test]
fn stale_running_recovery_counts_against_max_attempts() {
assert!(
HOOK_RECOVER_STALE_RUNNING_SQL.contains("attempt < max_attempts"),
"stale running recovery must branch on retry exhaustion"
);
assert!(
HOOK_RECOVER_STALE_RUNNING_SQL.contains("attempt = CASE"),
"stale running recovery must not requeue without updating attempt accounting"
);
assert!(
HOOK_RECOVER_STALE_RUNNING_SQL.contains("attempt + 1"),
"stale running recovery must consume the abandoned attempt"
);
assert!(
HOOK_RECOVER_STALE_RUNNING_SQL.contains("ELSE 'failed'"),
"stale running recovery must dead-letter rows already at max_attempts"
);
assert!(
!HOOK_RECOVER_STALE_RUNNING_SQL.contains("SET status = 'enqueued'"),
"stale running recovery must not unconditionally requeue exhausted rows"
);
}
#[test]
fn staged_hooks_are_not_dispatchable_until_finalized_after_regular_hooks() {
assert!(
HOOK_PENDING_INSERT_SQL.contains("status, attempt")
&& HOOK_PENDING_INSERT_SQL.contains("'pending_after_hook'"),
"create/update hooks must first be staged in a non-dispatchable lifecycle state"
);
assert!(
HOOK_PENDING_INSERT_SQL.contains("claimed_by, claimed_at"),
"staged rows must carry a finalizer lease so recovery can distinguish live after hooks from abandoned rows"
);
assert!(
HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL.contains("status = 'after_hook_succeeded'")
&& HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL.contains("context = $1::JSONB")
&& HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL.contains("record = $2::JSONB"),
"regular after-hook success must durably persist finalized hook payload before enqueue"
);
assert!(
HOOK_MARK_AFTER_HOOK_SUCCEEDED_SQL
.contains("WHERE id = $3 AND claimed_by = $4 AND status = 'pending_after_hook'"),
"success marking must only advance the staged row it owns"
);
assert!(
HOOK_FINALIZE_AFTER_HOOK_SQL.contains("status = 'enqueued'")
&& HOOK_FINALIZE_AFTER_HOOK_SQL.contains(
"WHERE id = $1 AND claimed_by = $2 AND status = 'after_hook_succeeded'"
),
"after-hook finalization must only enqueue rows with a durable regular-hook success marker"
);
assert!(
HOOK_AFTER_HOOK_FAILED_SQL.contains("status = 'after_hook_failed'")
&& HOOK_AFTER_HOOK_FAILED_SQL.contains("context = '{}'::JSONB")
&& HOOK_AFTER_HOOK_FAILED_SQL.contains("record = '{}'::JSONB")
&& HOOK_AFTER_HOOK_FAILED_SQL.contains(
"WHERE id = $2 AND claimed_by = $3 AND status = 'pending_after_hook'"
),
"failed regular after hooks must mark only the owner-scoped staged row terminal and non-dispatchable"
);
assert!(
!HOOK_AFTER_HOOK_FAILED_SQL.contains("claimed_by IS NULL")
&& !HOOK_AFTER_HOOK_FAILED_SQL.contains("'enqueued'"),
"duplicate idempotent retries must not dead-letter already finalized hook rows"
);
assert!(
HOOK_EXTEND_PENDING_FINALIZER_SQL.contains("claimed_at = NOW()")
&& HOOK_EXTEND_PENDING_FINALIZER_SQL.contains("status = 'pending_after_hook'"),
"long-running regular after hooks must heartbeat their staged-row finalizer lease"
);
assert!(
HOOK_RECOVER_STALE_PENDING_SQL
.contains("status IN ('pending_after_hook', 'after_hook_succeeded')")
&& HOOK_RECOVER_STALE_PENDING_SQL
.contains("WHEN status = 'after_hook_succeeded' THEN 'enqueued'")
&& HOOK_RECOVER_STALE_PENDING_SQL.contains("ELSE 'after_hook_failed'"),
"stale recovery must enqueue only rows with a durable regular-hook success marker"
);
assert!(
HOOK_RECOVER_STALE_PENDING_SQL
.contains("WHEN status = 'pending_after_hook' THEN '{}'::JSONB"),
"ambiguous stale pending rows must be failed closed without retaining payloads"
);
assert!(
HOOK_RECOVER_STALE_PENDING_SQL
.contains("WHEN status = 'pending_after_hook' THEN NOW()"),
"ambiguous stale pending rows must be marked terminal when failed closed"
);
}
#[test]
fn pending_heartbeat_guard_cancels_on_drop() {
let guard = RepositoryCommitHookPendingHeartbeat::new(CancellationToken::new());
let child = guard.shutdown.child_token();
drop(guard);
assert!(
child.is_cancelled(),
"dropping the pending heartbeat guard must cancel recovery-blocking heartbeats"
);
}
#[test]
fn claim_heartbeat_is_owner_scoped() {
assert!(
HOOK_EXTEND_CLAIM_SQL.contains("claimed_at = NOW()"),
"heartbeat must extend the stale-recovery lease"
);
assert!(
HOOK_EXTEND_CLAIM_SQL.contains("claimed_by = $2"),
"heartbeat must only extend this worker's claim"
);
assert!(
HOOK_EXTEND_CLAIM_SQL.contains("status = 'running'"),
"heartbeat must only touch running rows"
);
}
#[test]
fn missing_hook_table_error_is_detected_for_quiet_polling() {
let error = diesel::result::Error::QueryBuilderError(
std::io::Error::other("relation \"autumn_repository_commit_hooks\" does not exist")
.into(),
);
assert!(is_missing_hook_table_error(&error));
}
}