Skip to main content

fraiseql_server/webhooks/
transaction.rs

1//! Transaction boundary management for webhook processing.
2//!
3//! This module ensures correct transaction isolation for webhook handlers:
4//! 1. Signature verification: No transaction (fail fast)
5//! 2. Idempotency check: Read-only transaction
6//! 3. Event processing: Single transaction with idempotency record + handler
7
8use futures::future::BoxFuture;
9use sqlx::{PgPool, Postgres, Transaction};
10
11use super::{Result, WebhookError};
12
13/// Transaction isolation levels for webhook processing
14#[derive(Debug, Clone, Copy, Default)]
15pub enum WebhookIsolation {
16    /// Read Committed - default, good for most cases
17    #[default]
18    ReadCommitted,
19    /// Repeatable Read - for handlers that read-then-write
20    RepeatableRead,
21    /// Serializable - for handlers with complex consistency requirements
22    Serializable,
23}
24
25impl WebhookIsolation {
26    /// Convert to SQL isolation level string
27    #[must_use]
28    pub fn as_sql(self) -> &'static str {
29        match self {
30            Self::ReadCommitted => "READ COMMITTED",
31            Self::RepeatableRead => "REPEATABLE READ",
32            Self::Serializable => "SERIALIZABLE",
33        }
34    }
35}
36
37/// Execute webhook handler within a transaction with specified isolation level.
38///
39/// # Critical
40///
41/// The idempotency record and event handler MUST be in the same transaction:
42/// - If handler succeeds but idempotency update fails → duplicate processing on retry
43/// - If idempotency records but handler fails → event marked processed but not handled
44///
45/// # Errors
46///
47/// Returns `WebhookError::Database` if transaction fails to start, commit, or during handler
48/// execution.
49pub async fn execute_in_transaction<F, T>(
50    pool: &PgPool,
51    isolation: WebhookIsolation,
52    f: F,
53) -> Result<T>
54where
55    F: for<'c> FnOnce(&'c mut Transaction<'_, Postgres>) -> BoxFuture<'c, Result<T>>,
56{
57    let mut tx = pool.begin().await.map_err(|e| WebhookError::Database(e.to_string()))?;
58
59    // Set isolation level
60    sqlx::query(&format!("SET TRANSACTION ISOLATION LEVEL {}", isolation.as_sql()))
61        .execute(&mut *tx)
62        .await
63        .map_err(|e| WebhookError::Database(e.to_string()))?;
64
65    let result = f(&mut tx).await;
66
67    match result {
68        Ok(value) => {
69            tx.commit().await.map_err(|e| WebhookError::Database(e.to_string()))?;
70            Ok(value)
71        },
72        Err(e) => {
73            // Explicit rollback (also happens on drop, but be explicit)
74            let _ = tx.rollback().await;
75            Err(e)
76        },
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83
84    #[test]
85    fn test_isolation_level_sql() {
86        assert_eq!(WebhookIsolation::ReadCommitted.as_sql(), "READ COMMITTED");
87        assert_eq!(WebhookIsolation::RepeatableRead.as_sql(), "REPEATABLE READ");
88        assert_eq!(WebhookIsolation::Serializable.as_sql(), "SERIALIZABLE");
89    }
90
91    #[test]
92    fn test_default_isolation_level() {
93        let default = WebhookIsolation::default();
94        assert_eq!(default.as_sql(), "READ COMMITTED");
95    }
96}