fraiseql_webhooks/
transaction.rs1use futures::future::BoxFuture;
9use sqlx::{PgPool, Postgres, Transaction};
10
11use super::{Result, WebhookError};
12
13#[derive(Debug, Clone, Copy, Default)]
15#[non_exhaustive]
16pub enum WebhookIsolation {
17 #[default]
19 ReadCommitted,
20 RepeatableRead,
22 Serializable,
24}
25
26impl WebhookIsolation {
27 #[must_use]
29 pub fn as_sql(self) -> &'static str {
30 match self {
31 Self::ReadCommitted => "READ COMMITTED",
32 Self::RepeatableRead => "REPEATABLE READ",
33 Self::Serializable => "SERIALIZABLE",
34 }
35 }
36}
37
38pub async fn execute_in_transaction<F, T>(
51 pool: &PgPool,
52 isolation: WebhookIsolation,
53 f: F,
54) -> Result<T>
55where
56 F: for<'c> FnOnce(&'c mut Transaction<'_, Postgres>) -> BoxFuture<'c, Result<T>>,
57{
58 let mut tx = pool.begin().await.map_err(|e| WebhookError::Database(e.to_string()))?;
59
60 sqlx::query(&format!("SET TRANSACTION ISOLATION LEVEL {}", isolation.as_sql()))
65 .execute(&mut *tx)
66 .await
67 .map_err(|e| WebhookError::Database(e.to_string()))?;
68
69 let result = f(&mut tx).await;
70
71 match result {
72 Ok(value) => {
73 tx.commit().await.map_err(|e| WebhookError::Database(e.to_string()))?;
74 Ok(value)
75 },
76 Err(e) => {
77 if let Err(rb_err) = tx.rollback().await {
79 tracing::error!(rollback_error = %rb_err, "Transaction rollback failed");
80 }
81 Err(e)
82 },
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89
90 #[test]
91 fn test_isolation_level_sql() {
92 assert_eq!(WebhookIsolation::ReadCommitted.as_sql(), "READ COMMITTED");
93 assert_eq!(WebhookIsolation::RepeatableRead.as_sql(), "REPEATABLE READ");
94 assert_eq!(WebhookIsolation::Serializable.as_sql(), "SERIALIZABLE");
95 }
96
97 #[test]
98 fn test_default_isolation_level() {
99 let default = WebhookIsolation::default();
100 assert_eq!(default.as_sql(), "READ COMMITTED");
101 }
102}