Trait TransactionRepository

Source
pub trait TransactionRepository<M>: Repository<M>
where M: Model,
{ // Provided methods fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send { ... } fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a { ... } fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a, Fut: Future<Output = Result<R, E>> + Send + 'a, R: Send + 'a, E: From<Error> + Send + 'a { ... } fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a { ... } }
Expand description

Extension trait for Repository to work with transactions

This trait adds transaction capabilities to any repository that implements the Repository trait. It provides several methods for executing operations within database transactions, with different strategies for concurrency and error handling.

The trait is automatically implemented for any type that implements Repository<M>, making transaction capabilities available to all repositories without additional code.

Provided Methods§

Source

fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a
where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send,

Executes a callback within a transaction, handling the transaction lifecycle automatically.

This method:

  1. Begins a transaction from the repository’s connection pool
  2. Passes the transaction to the callback function
  3. Waits for the callback to complete and return both a result and the transaction
  4. Commits the transaction if the result is Ok, or rolls it back if it’s Err
  5. Returns the final result
§Type Parameters
  • F: The type of the callback function [1]
  • Fut: The future type returned by the callback
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • callback: A function that accepts a Transaction and returns a future
§Returns

A future that resolves to Result<R, E>.

§Example
let result = repo.with_transaction(|mut tx| async move {
    let model = Model::new();
    let res = repo.save_with_executor(&mut tx, model).await;
    (res, tx)
}).await;

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>). Take note of the lifetimes: because of invariance, you might run into lifetime errors if they are not specified explicitly. The future must also be Send.

Examples found in repository?
examples/basic_repo.rs (lines 106-116)
101    pub async fn save_with_context(
102        &self,
103        model: User,
104        context: UserContext,
105    ) -> Result<User, DbError> {
106        self.with_transaction(move |mut tx| async move {
107            let res = match context {
108                UserContext::System => self
109                    .save_with_executor(&mut *tx, model)
110                    .await
111                    .map_err(Into::into),
112                UserContext::UnAuthenticated => Err(DbError::NotAllowed),
113            };
114
115            (res, tx)
116        })
117        .await
118    }
119
120    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
121        self.transaction_sequential::<'a, 'b>([
122            move |mut tx: Transaction<'b, Database>| async move {
123                let res = self.save_with_executor(&mut *tx, model).await;
124
125                (res, tx)
126            },
127        ])
128        .await
129        .map_err(Into::into)
130    }
131
132    pub async fn save_with_rx_concurrent<'a, 'b>(
133        &'a self,
134        model: User,
135    ) -> Result<Vec<User>, DbError>
136    where
137        'b: 'a,
138    {
139        self.transaction_concurrent::<'a, 'b>([
140            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141                let mut tx = match tx.try_lock_arc() {
142                    Some(tx) => tx,
143                    None => return Err(Error::MutexLockError),
144                };
145
146                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150                res
151            },
152        ])
153        .await
154        .map_err(Into::into)
155    }
156
157    pub async fn try_save_with_tx<'a, 'b>(
158        &'a self,
159        model: User,
160    ) -> Result<Vec<User>, Vec<DbError>> {
161        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162            let res = self.save_with_executor(&mut *tx, model).await;
163
164            (res, tx)
165        }])
166        .await
167        .map_err(|errors| errors.into_iter().map(Into::into).collect())
168    }
169}
170
171async fn action<'b>(
172    _: Transaction<'b, Database>,
173) -> (Result<User, DbError>, Transaction<'b, Database>) {
174    unimplemented!()
175}
176
177#[tokio::main]
178async fn main() {
179    install_default_drivers();
180
181    initialize_db_pool(
182        PoolOptions::new()
183            .max_connections(21)
184            .min_connections(5)
185            .idle_timeout(Duration::from_secs(60 * 10))
186            .max_lifetime(Duration::from_secs(60 * 60 * 24))
187            .acquire_timeout(Duration::from_secs(20))
188            .connect(&DATABASE_URL)
189            .await
190            .expect("Failed to connect to database"),
191    );
192
193    let user = User {
194        id: 1,
195        name: String::new(),
196    };
197
198    USER_REPO.save_ref(&user).await.unwrap();
199
200    USER_REPO.save_in_transaction(user.clone()).await.unwrap();
201
202    USER_REPO
203        .save_with_context(user.clone(), UserContext::System)
204        .await
205        .unwrap();
206
207    USER_REPO.with_transaction(action).await.unwrap();
208
209    USER_REPO
210        .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
211        .await
212        .unwrap();
213
214    USER_REPO
215        .get_one_by_filter(UserFilter::new("name").id(1))
216        .await
217        .unwrap();
218}
Source

fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations sequentially in a transaction, stopping at the first error.

This method provides an optimized approach for cases where you want to stop processing as soon as any action fails, immediately rolling back the transaction.

§Type Parameters
  • I: The iterator type
  • F: The action function type [1]
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed in the transaction
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all actions succeeded
  • Err(E): The first error encountered
§Implementation Details
  1. Begins a transaction from the repository’s connection pool
  2. Executes each action sequentially, collecting results
  3. If any action fails, rolls back the transaction and returns the error
  4. If all actions succeed, commits the transaction and returns the results

Due to complex lifetime bounds in the underlying types, each action must take ownership of the transaction and then return it alongside its result.

§Examples
§Basic
let results = repo.transaction_sequential([
    |tx| async move { repo.save_with_executor(tx, model1).await },
    |tx| async move { repo.save_with_executor(tx, model2).await }
]).await;
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
     unimplemented!()
 }

USER_REPO.transaction_sequential([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>). Take note of the lifetimes: because of invariance, you might run into lifetime errors if they are not specified explicitly. The future must also be Send.

Examples found in repository?
examples/basic_repo.rs (lines 121-127)
120    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
121        self.transaction_sequential::<'a, 'b>([
122            move |mut tx: Transaction<'b, Database>| async move {
123                let res = self.save_with_executor(&mut *tx, model).await;
124
125                (res, tx)
126            },
127        ])
128        .await
129        .map_err(Into::into)
130    }
Source

fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a, Fut: Future<Output = Result<R, E>> + Send + 'a, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations concurrently in a transaction.

This method allows for concurrent execution of actions within a transaction, which can significantly improve performance for I/O-bound operations. Note that this only works when the actions don’t have data dependencies.

§Type Parameters
  • I: The iterator type
  • F: The action function type [1] [2]
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed concurrently in the transaction
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all actions succeeded
  • Err(E): The first error encountered
§Implementation Details
  1. Begins a transaction from the repository’s connection pool
  2. Wraps the transaction in an Arc<Mutex<_>> to safely share it between concurrent operations [2]
  3. Creates futures for all actions but doesn’t execute them yet
  4. Executes all futures concurrently using try_join_all
  5. If all operations succeed, commits the transaction and returns the results
  6. If any operation fails, rolls back the transaction and returns the error
§Notes
  • Uses parking_lot::Mutex for better performance than std::sync::Mutex
  • Requires the transaction to be safely shared between multiple futures
§Example
§Basic
let results = repo.transaction_concurrent([
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        repo.save_with_executor(&mut *tx, model1).await
    },
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        repo.save_with_executor(&mut *tx, model2).await
    }
]).await;
§Complete
use sqlx::Transaction;
use std::sync::Arc;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<User, Error> {
    unimplemented!()
 }

USER_REPO.transaction_concurrent([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<T, E>. Take note of the lifetimes: because of invariance, you might run into lifetime errors if they are not specified explicitly. The future must also be Send.

  2. It is up to you to ensure that deadlocks do not occur. The function itself will not lock the mutex; it will, however, attempt to take the inner value out of the Arc after all actions have completed, which also consumes the mutex. In theory this makes it impossible for this method itself to deadlock, but deadlocks can still occur between different actions.

Examples found in repository?
examples/basic_repo.rs (lines 139-152)
132    pub async fn save_with_rx_concurrent<'a, 'b>(
133        &'a self,
134        model: User,
135    ) -> Result<Vec<User>, DbError>
136    where
137        'b: 'a,
138    {
139        self.transaction_concurrent::<'a, 'b>([
140            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141                let mut tx = match tx.try_lock_arc() {
142                    Some(tx) => tx,
143                    None => return Err(Error::MutexLockError),
144                };
145
146                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150                res
151            },
152        ])
153        .await
154        .map_err(Into::into)
155    }
Source

fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations and collects all results, committing only if all succeed.

This method runs all actions sequentially, collecting results (both successes and failures). The transaction is committed only if all actions succeed; otherwise, it’s rolled back.

§Type Parameters
  • I: The iterator type
  • F: The action function type [1]
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed in the transaction
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all operations succeeded
  • Err(Vec<E>): A vector of all errors if any operation failed
§Implementation Details
  1. Begins a transaction from the repository’s connection pool
  2. Executes each action sequentially, collecting all results and errors
  3. If any errors occurred, rolls back the transaction and returns all errors
  4. If all operations succeeded, commits the transaction and returns the results
§Example
§Basic
match repo.try_transaction([
    |tx| async move { repo.save_with_executor(tx, model1).await },
    |tx| async move { repo.save_with_executor(tx, model2).await }
]).await {
    Ok(results) => println!("All operations succeeded"),
    Err(errors) => println!("Some operations failed: {:?}", errors)
}
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
     unimplemented!()
 }

USER_REPO.try_transaction([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>). Take note of the lifetimes: because of invariance, you might run into lifetime errors if they are not specified explicitly. The future must also be Send.

Examples found in repository?
examples/basic_repo.rs (lines 161-165)
157    pub async fn try_save_with_tx<'a, 'b>(
158        &'a self,
159        model: User,
160    ) -> Result<Vec<User>, Vec<DbError>> {
161        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162            let res = self.save_with_executor(&mut *tx, model).await;
163
164            (res, tx)
165        }])
166        .await
167        .map_err(|errors| errors.into_iter().map(Into::into).collect())
168    }

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<T, M> TransactionRepository<M> for T
where T: Repository<M>, M: Model,