Skip to main content

rustorm_core/
transaction.rs

1use crate::error::{OrmError, OrmResult};
2use sqlx::{PgPool, Postgres, Transaction};
3use std::future::Future;
4
5/// Выполняет асинхронную функцию внутри транзакции.
6/// При ошибке откатывает, при успехе — коммитит.
7///
8/// ```rust
9/// let (user, profile) = orm::transaction(&pool, |tx| async move {
10///     let user = User::create(data, tx).await?;
11///     let profile = Profile::create(NewProfile { user_id: user.id }, tx).await?;
12///     Ok((user, profile))
13/// }).await?;
14/// ```
15pub async fn transaction<F, Fut, R>(pool: &PgPool, f: F) -> OrmResult<R>
16where
17    F: FnOnce(Transaction<'static, Postgres>) -> Fut,
18    Fut: Future<Output = OrmResult<R>>,
19{
20    let tx = pool.begin().await.map_err(OrmError::from_sqlx)?;
21
22    match f(tx).await {
23        Ok(result) => Ok(result),
24        Err(err) => {
25            // Транзакция автоматически откатывается при дропе
26            Err(err)
27        }
28    }
29}
30
31/// Уровни изоляции транзакции.
32#[derive(Debug, Clone, Copy, Default)]
33pub enum IsolationLevel {
34    #[default]
35    ReadCommitted,
36    RepeatableRead,
37    Serializable,
38    ReadUncommitted,
39}
40
41impl IsolationLevel {
42    pub fn as_sql(&self) -> &'static str {
43        match self {
44            IsolationLevel::ReadCommitted => "READ COMMITTED",
45            IsolationLevel::RepeatableRead => "REPEATABLE READ",
46            IsolationLevel::Serializable => "SERIALIZABLE",
47            IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
48        }
49    }
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct TransactionOptions {
54    pub isolation: IsolationLevel,
55    pub read_only: bool,
56}
57
58/// Транзакция с явными опциями.
59pub async fn transaction_with<F, Fut, R>(
60    pool: &PgPool,
61    opts: TransactionOptions,
62    f: F,
63) -> OrmResult<R>
64where
65    F: FnOnce(Transaction<'static, Postgres>) -> Fut,
66    Fut: Future<Output = OrmResult<R>>,
67{
68    let mut tx = pool.begin().await.map_err(OrmError::from_sqlx)?;
69
70    // Установить уровень изоляции
71    let iso_sql = format!(
72        "SET TRANSACTION ISOLATION LEVEL {}",
73        opts.isolation.as_sql()
74    );
75    sqlx::query(&iso_sql)
76        .execute(&mut *tx)
77        .await
78        .map_err(OrmError::from_sqlx)?;
79
80    if opts.read_only {
81        sqlx::query("SET TRANSACTION READ ONLY")
82            .execute(&mut *tx)
83            .await
84            .map_err(OrmError::from_sqlx)?;
85    }
86
87    match f(tx).await {
88        Ok(result) => Ok(result),
89        Err(err) => Err(err),
90    }
91}