Trait TransactionRepository

Source
pub trait TransactionRepository<M>: Repository<M>
where M: Model,
{ // Provided methods fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send { ... } fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a { ... } fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a, Fut: Future<Output = Result<R, E>> + Send + 'a, R: Send + 'a, E: From<Error> + Send + 'a { ... } fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a { ... } }
Expand description

Extension trait for Repository to work with transactions

This trait adds transactions capabilities to any repository that implements the Repository trait. It provides several methods for executing operations within database transactions, with different strategies for concurrency and error handling.

The trait is automatically implemented for any type that implements Repository<M>, making transactions capabilities available to all repositories without additional code.

Provided Methods§

Source

fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a
where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send,

Executes a callback within a transactions, handling the transactions lifecycle automatically.

This method:

  1. Begins a transactions from the repository’s connection pool
  2. Passes the transactions to the callback function
  3. Waits for the callback to complete and return both a result and the transactions
  4. Commits the transactions if the result is Ok, or rolls it back if it’s Err
  5. Returns the final result
§Type Parameters
  • F: The type of the callback function 1
  • Fut: The future type returned by the callback
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • callback: A function that accepts a Transaction and returns a future
§Returns

A future that resolves to Result<R, E>.

§Example
let result = repo.with_transaction(|mut tx| async move {
    let model = Model::new();
    let res = repo.save_with_executor(&mut tx, model).await;
    (res, tx)
}).await;

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>) Take note of the lifetimes as you might run into errors related to lifetimes if they are not specified due to invariance. The future must also be Send 

Examples found in repository?
examples/basic_repo.rs (lines 103-113)
98    pub async fn save_with_context(
99        &self,
100        model: User,
101        context: UserContext,
102    ) -> Result<User, DbError> {
103        self.with_transaction(move |mut tx| async move {
104            let res = match context {
105                UserContext::System => self
106                    .save_with_executor(&mut *tx, model)
107                    .await
108                    .map_err(Into::into),
109                UserContext::UnAuthenticated => Err(DbError::NotAllowed),
110            };
111
112            (res, tx)
113        })
114        .await
115    }
116
117    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
118        self.transaction_sequential::<'a, 'b>([
119            move |mut tx: Transaction<'b, Database>| async move {
120                let res = self.save_with_executor(&mut *tx, model).await;
121
122                (res, tx)
123            },
124        ])
125        .await
126        .map_err(Into::into)
127    }
128
129    pub async fn save_with_rx_concurrent<'a, 'b>(
130        &'a self,
131        model: User,
132    ) -> Result<Vec<User>, DbError>
133    where
134        'b: 'a,
135    {
136        self.transaction_concurrent::<'a, 'b>([
137            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
138                let mut tx = match tx.try_lock_arc() {
139                    Some(tx) => tx,
140                    None => return Err(Error::MutexLockError),
141                };
142
143                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
144
145                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
146
147                res
148            },
149        ])
150        .await
151        .map_err(Into::into)
152    }
153
154    pub async fn try_save_with_tx<'a, 'b>(
155        &'a self,
156        model: User,
157    ) -> Result<Vec<User>, Vec<DbError>> {
158        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
159            let res = self.save_with_executor(&mut *tx, model).await;
160
161            (res, tx)
162        }])
163        .await
164        .map_err(|errors| errors.into_iter().map(Into::into).collect())
165    }
166}
167
168async fn action<'b>(
169    _: Transaction<'b, Database>,
170) -> (Result<User, DbError>, Transaction<'b, Database>) {
171    unimplemented!()
172}
173
174#[tokio::main]
175async fn main() {
176    install_default_drivers();
177
178    initialize_db_pool(
179        PoolOptions::new()
180            .max_connections(21)
181            .min_connections(5)
182            .idle_timeout(Duration::from_secs(60 * 10))
183            .max_lifetime(Duration::from_secs(60 * 60 * 24))
184            .acquire_timeout(Duration::from_secs(20))
185            .connect(&DATABASE_URL)
186            .await
187            .expect("Failed to connect to database"),
188    );
189
190    let user = User {
191        id: 1,
192        name: String::new(),
193    };
194
195    USER_REPO.save_ref(&user).await.unwrap();
196
197    USER_REPO.save_in_transaction(user.clone()).await.unwrap();
198
199    USER_REPO
200        .save_with_context(user.clone(), UserContext::System)
201        .await
202        .unwrap();
203
204    USER_REPO.with_transaction(action).await.unwrap();
205
206    USER_REPO
207        .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
208        .await
209        .unwrap();
210}
Source

fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations sequentially in a transactions, stopping at the first error.

This method provides an optimized approach for cases where you want to stop processing as soon as any action fails, immediately rolling back the transactions.

§Type Parameters
  • I: The iterator type
  • F: The action function type 1
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed in the transactions
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all actions succeeded
  • Err(E): The first error encountered
§Implementation Details
  1. Begins a transactions from the repository’s connection pool
  2. Executes each action sequentially, collecting results
  3. If any action fails, rolls back the transactions and returns the error
  4. If all actions succeed, commits the transactions and returns the results

Due to complex lifetime bounds in underlying types we must take ownership and then return it back.

§Examples
§Basic
let results = repo.transaction_sequential([
    |tx| async move { repo.save_with_executor(tx, model1).await },
    |tx| async move { repo.save_with_executor(tx, model2).await }
]).await;
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
     unimplemented!()
 }

USER_REPO.transaction_sequential([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>) Take note of the lifetimes as you might run into errors related to lifetimes if they are not specified due to invariance. The future must also be Send 

Examples found in repository?
examples/basic_repo.rs (lines 118-124)
117    pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
118        self.transaction_sequential::<'a, 'b>([
119            move |mut tx: Transaction<'b, Database>| async move {
120                let res = self.save_with_executor(&mut *tx, model).await;
121
122                (res, tx)
123            },
124        ])
125        .await
126        .map_err(Into::into)
127    }
Source

fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a, Fut: Future<Output = Result<R, E>> + Send + 'a, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations concurrently in a transactions.

This method allows for concurrent execution of actions within a transactions, which can significantly improve performance for I/O-bound operations. Note that this only works when the actions don’t have data dependencies.

§Type Parameters
  • I: The iterator type
  • F: The action function type 1 2
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed concurrently in the transactions
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all actions succeeded
  • Err(E): The first error encountered
§Implementation Details
  1. Begins a transactions from the repository’s connection pool
  2. Wraps the transactions in an Arc<Mutex<_>> to safely share it between concurrent operations 2
  3. Creates futures for all actions but doesn’t execute them yet
  4. Executes all futures concurrently using try_join_all
  5. If all operations succeed, commits the transactions and returns the results
  6. If any operation fails, rolls back the transactions and returns the error
§Notes
  • Uses parking_lot::Mutex for better performance than std::sync::Mutex
  • Requires the transactions to be safely shared between multiple futures
§Example
§Basic
let results = repo.transaction_concurrent([
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        repo.save_with_executor(&mut *tx, model1).await
    },
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        repo.save_with_executor(&mut *tx, model2).await
    }
]).await;
§Complete
use sqlx::Transaction;
use std::sync::Arc;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<User, Error> {
    unimplemented!()
 }

USER_REPO.transaction_concurrent([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<T, E> Take note of the lifetimes as you might run into errors related to lifetimes if they are not specified due to invariance. The future must also be Send 

  2. It is up to you to ensure we don’t get deadlocks, the function itself will not lock the mutex, it will however attempt to get the inner value of the Arc after all actions has completed where it also consumes the mutex. This makes it in theory impossible to get a deadlock in this method, however deadlocks can occur between different actions. 

Examples found in repository?
examples/basic_repo.rs (lines 136-149)
129    pub async fn save_with_rx_concurrent<'a, 'b>(
130        &'a self,
131        model: User,
132    ) -> Result<Vec<User>, DbError>
133    where
134        'b: 'a,
135    {
136        self.transaction_concurrent::<'a, 'b>([
137            |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
138                let mut tx = match tx.try_lock_arc() {
139                    Some(tx) => tx,
140                    None => return Err(Error::MutexLockError),
141                };
142
143                let res = USER_REPO.save_with_executor(&mut **tx, model).await;
144
145                ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
146
147                res
148            },
149        ])
150        .await
151        .map_err(Into::into)
152    }
Source

fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a, I::IntoIter: Send + 'a, F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a, Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send, R: Send + 'a, E: From<Error> + Send + 'a,

Executes multiple operations and collects all results, committing only if all succeed.

This method runs all actions sequentially, collecting results (both successes and failures). The transactions is committed only if all actions succeed; otherwise, it’s rolled back.

§Type Parameters
  • I: The iterator type
  • F: The action function type 1
  • Fut: The future type returned by each action
  • R: The result type
  • E: The error type, which must be convertible from Error
§Parameters
  • actions: An iterator of functions that will be executed in the transactions
§Returns

A future that resolves to:

  • Ok(Vec<R>): A vector of results if all operations succeeded
  • Err(Vec<E>): A vector of all errors if any operation failed
§Implementation Details
  1. Begins a transactions from the repository’s connection pool
  2. Executes each action sequentially, collecting all results and errors
  3. If any errors occurred, rolls back the transactions and returns all errors
  4. If all operations succeeded, commits the transactions and returns the results
§Example
§Basic
match repo.try_transaction([
    |tx| async move { repo.save_with_executor(tx, model1).await },
    |tx| async move { repo.save_with_executor(tx, model2).await }
]).await {
    Ok(results) => println!("All operations succeeded"),
    Err(errors) => println!("Some operations failed: {:?}", errors)
}
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;

struct User {
    id: String,
    name: String
}

impl Model for User {
    type Id = String;

    fn get_id(&self) -> Option<Self::Id> {
        Some(self.id.to_owned())
    }
}

#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
    kind: Box<dyn std::error::Error + Send>
}

impl From<sqlx::Error> for Error {
    fn from(value: sqlx::Error) -> Self {
        Self {
            kind: Box::new(value)
        }
    }
}

async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
     unimplemented!()
 }

USER_REPO.try_transaction([action, action, action]).await.unwrap();

  1. The function signature of an action must be async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<T, E>, Transaction<'b, Database>) Take note of the lifetimes as you might run into errors related to lifetimes if they are not specified due to invariance. The future must also be Send 

Examples found in repository?
examples/basic_repo.rs (lines 158-162)
154    pub async fn try_save_with_tx<'a, 'b>(
155        &'a self,
156        model: User,
157    ) -> Result<Vec<User>, Vec<DbError>> {
158        self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
159            let res = self.save_with_executor(&mut *tx, model).await;
160
161            (res, tx)
162        }])
163        .await
164        .map_err(|errors| errors.into_iter().map(Into::into).collect())
165    }

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<T, M> TransactionRepository<M> for T
where T: Repository<M>, M: Model,