fraiseql_server/webhooks/
transaction.rs1use futures::future::BoxFuture;
9use sqlx::{PgPool, Postgres, Transaction};
10
11use super::{Result, WebhookError};
12
/// Transaction isolation level requested for a webhook transaction.
///
/// Each variant corresponds to the SQL keyword text returned by `as_sql`.
/// The default level is [`WebhookIsolation::ReadCommitted`].
///
/// The type is `Copy` and comparable/hashable so levels can be checked with
/// `==` or used as map/set keys directly instead of going through their SQL text.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum WebhookIsolation {
    /// `READ COMMITTED` — the default level.
    #[default]
    ReadCommitted,
    /// `REPEATABLE READ`.
    RepeatableRead,
    /// `SERIALIZABLE`.
    Serializable,
}
24
25impl WebhookIsolation {
26 #[must_use]
28 pub fn as_sql(self) -> &'static str {
29 match self {
30 Self::ReadCommitted => "READ COMMITTED",
31 Self::RepeatableRead => "REPEATABLE READ",
32 Self::Serializable => "SERIALIZABLE",
33 }
34 }
35}
36
37pub async fn execute_in_transaction<F, T>(
50 pool: &PgPool,
51 isolation: WebhookIsolation,
52 f: F,
53) -> Result<T>
54where
55 F: for<'c> FnOnce(&'c mut Transaction<'_, Postgres>) -> BoxFuture<'c, Result<T>>,
56{
57 let mut tx = pool.begin().await.map_err(|e| WebhookError::Database(e.to_string()))?;
58
59 sqlx::query(&format!("SET TRANSACTION ISOLATION LEVEL {}", isolation.as_sql()))
61 .execute(&mut *tx)
62 .await
63 .map_err(|e| WebhookError::Database(e.to_string()))?;
64
65 let result = f(&mut tx).await;
66
67 match result {
68 Ok(value) => {
69 tx.commit().await.map_err(|e| WebhookError::Database(e.to_string()))?;
70 Ok(value)
71 },
72 Err(e) => {
73 let _ = tx.rollback().await;
75 Err(e)
76 },
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;

    /// Every isolation level renders to its exact SQL keyword text.
    #[test]
    fn test_isolation_level_sql() {
        let expectations = [
            (WebhookIsolation::ReadCommitted, "READ COMMITTED"),
            (WebhookIsolation::RepeatableRead, "REPEATABLE READ"),
            (WebhookIsolation::Serializable, "SERIALIZABLE"),
        ];

        for (level, expected) in expectations {
            assert_eq!(level.as_sql(), expected);
        }
    }

    /// The `Default` isolation level renders as `READ COMMITTED`.
    #[test]
    fn test_default_isolation_level() {
        assert_eq!(WebhookIsolation::default().as_sql(), "READ COMMITTED");
    }
}