Skip to main content

fraiseql_webhooks/
transaction.rs

1//! Transaction boundary management for webhook processing.
2//!
3//! This module ensures correct transaction isolation for webhook handlers:
4//! 1. Signature verification: No transaction (fail fast)
5//! 2. Idempotency check: Read-only transaction
6//! 3. Event processing: Single transaction with idempotency record + handler
7
8use futures::future::BoxFuture;
9use sqlx::{PgPool, Postgres, Transaction};
10
11use super::{Result, WebhookError};
12
/// Transaction isolation level applied to the transaction a webhook handler runs in.
///
/// Each variant maps to one SQL isolation-level keyword; see
/// [`WebhookIsolation::as_sql`] for the exact text that is emitted.
///
/// The type is a plain `Copy` value that can be compared with `==` and used as a
/// hash-map key.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WebhookIsolation {
    /// Read Committed - default, good for most cases
    #[default]
    ReadCommitted,
    /// Repeatable Read - for handlers that read-then-write
    RepeatableRead,
    /// Serializable - for handlers with complex consistency requirements
    Serializable,
}

impl WebhookIsolation {
    /// Returns the SQL keyword(s) naming this isolation level, suitable for appending to
    /// `SET TRANSACTION ISOLATION LEVEL`.
    ///
    /// The result is always one of three hardcoded literals, so it is safe to splice into a
    /// statement. The function is `const`, so the value can also be computed at compile time.
    #[must_use]
    pub const fn as_sql(self) -> &'static str {
        match self {
            Self::ReadCommitted => "READ COMMITTED",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::Serializable => "SERIALIZABLE",
        }
    }
}
37
38/// Execute webhook handler within a transaction with specified isolation level.
39///
40/// # Critical
41///
42/// The idempotency record and event handler MUST be in the same transaction:
43/// - If handler succeeds but idempotency update fails → duplicate processing on retry
44/// - If idempotency records but handler fails → event marked processed but not handled
45///
46/// # Errors
47///
48/// Returns `WebhookError::Database` if transaction fails to start, commit, or during handler
49/// execution.
50pub async fn execute_in_transaction<F, T>(
51    pool: &PgPool,
52    isolation: WebhookIsolation,
53    f: F,
54) -> Result<T>
55where
56    F: for<'c> FnOnce(&'c mut Transaction<'_, Postgres>) -> BoxFuture<'c, Result<T>>,
57{
58    let mut tx = pool.begin().await.map_err(|e| WebhookError::Database(e.to_string()))?;
59
60    // Set isolation level
61    // Safety: `isolation.as_sql()` is a match expression that returns only one of three
62    // hardcoded `&'static str` literals ("READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE").
63    // No user input reaches this call; PostgreSQL does not accept parameters for SET commands.
64    sqlx::query(&format!("SET TRANSACTION ISOLATION LEVEL {}", isolation.as_sql()))
65        .execute(&mut *tx)
66        .await
67        .map_err(|e| WebhookError::Database(e.to_string()))?;
68
69    let result = f(&mut tx).await;
70
71    match result {
72        Ok(value) => {
73            tx.commit().await.map_err(|e| WebhookError::Database(e.to_string()))?;
74            Ok(value)
75        },
76        Err(e) => {
77            // Explicit rollback (also happens on drop, but be explicit)
78            if let Err(rb_err) = tx.rollback().await {
79                tracing::error!(rollback_error = %rb_err, "Transaction rollback failed");
80            }
81            Err(e)
82        },
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Each isolation level must render to the exact keyword text used in the
    /// `SET TRANSACTION ISOLATION LEVEL` statement.
    #[test]
    fn test_isolation_level_sql() {
        let expectations = [
            (WebhookIsolation::ReadCommitted, "READ COMMITTED"),
            (WebhookIsolation::RepeatableRead, "REPEATABLE READ"),
            (WebhookIsolation::Serializable, "SERIALIZABLE"),
        ];

        for (level, expected_sql) in expectations {
            assert_eq!(level.as_sql(), expected_sql);
        }
    }

    /// The `Default` implementation must select Read Committed.
    #[test]
    fn test_default_isolation_level() {
        assert_eq!(WebhookIsolation::default().as_sql(), "READ COMMITTED");
    }
}