pub trait TransactionRepository<M>: Repository<M>where
M: Model,{
// Provided methods
fn with_transaction<'a, 'b, F, Fut, R, E>(
&'a self,
callback: F,
) -> impl Future<Output = Result<R, E>> + Send + 'a
where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send { ... }
fn transaction_sequential<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a,
Fut: Future<Output = Result<R, E>> + Send + 'a,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
fn try_transaction<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
}
Expand description
Extension trait for Repository to work with transactions
This trait adds transactions capabilities to any repository that implements
the Repository
trait. It provides several methods for executing operations
within database transactions, with different strategies for concurrency and
error handling.
The trait is automatically implemented for any type that implements Repository<M>
,
making transactions capabilities available to all repositories without additional code.
Provided Methods§
Sourcefn with_transaction<'a, 'b, F, Fut, R, E>(
&'a self,
callback: F,
) -> impl Future<Output = Result<R, E>> + Send + 'a
fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a
Executes a callback within a transactions, handling the transactions lifecycle automatically.
This method:
- Begins a transactions from the repository’s connection pool
- Passes the transactions to the callback function
- Waits for the callback to complete and return both a result and the transactions
- Commits the transactions if the result is
Ok
, or rolls it back if it’sErr
- Returns the final result
§Type Parameters
F
: The type of the callback function 1Fut
: The future type returned by the callbackR
: The result typeE
: The error type, which must be convertible fromError
§Parameters
callback
: A function that accepts aTransaction
and returns a future
§Returns
A future that resolves to Result<R, E>
.
§Example
let result = repo.with_transaction(|mut tx| async move {
let model = Model::new();
let res = repo.save_with_executor(&mut tx, model).await;
(res, tx)
}).await;
Examples found in repository?
98 pub async fn save_with_context(
99 &self,
100 model: User,
101 context: UserContext,
102 ) -> Result<User, DbError> {
103 self.with_transaction(move |mut tx| async move {
104 let res = match context {
105 UserContext::System => self
106 .save_with_executor(&mut *tx, model)
107 .await
108 .map_err(Into::into),
109 UserContext::UnAuthenticated => Err(DbError::NotAllowed),
110 };
111
112 (res, tx)
113 })
114 .await
115 }
116
117 pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
118 self.transaction_sequential::<'a, 'b>([
119 move |mut tx: Transaction<'b, Database>| async move {
120 let res = self.save_with_executor(&mut *tx, model).await;
121
122 (res, tx)
123 },
124 ])
125 .await
126 .map_err(Into::into)
127 }
128
129 pub async fn save_with_rx_concurrent<'a, 'b>(
130 &'a self,
131 model: User,
132 ) -> Result<Vec<User>, DbError>
133 where
134 'b: 'a,
135 {
136 self.transaction_concurrent::<'a, 'b>([
137 |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
138 let mut tx = match tx.try_lock_arc() {
139 Some(tx) => tx,
140 None => return Err(Error::MutexLockError),
141 };
142
143 let res = USER_REPO.save_with_executor(&mut **tx, model).await;
144
145 ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
146
147 res
148 },
149 ])
150 .await
151 .map_err(Into::into)
152 }
153
154 pub async fn try_save_with_tx<'a, 'b>(
155 &'a self,
156 model: User,
157 ) -> Result<Vec<User>, Vec<DbError>> {
158 self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
159 let res = self.save_with_executor(&mut *tx, model).await;
160
161 (res, tx)
162 }])
163 .await
164 .map_err(|errors| errors.into_iter().map(Into::into).collect())
165 }
166}
167
168async fn action<'b>(
169 _: Transaction<'b, Database>,
170) -> (Result<User, DbError>, Transaction<'b, Database>) {
171 unimplemented!()
172}
173
174#[tokio::main]
175async fn main() {
176 install_default_drivers();
177
178 initialize_db_pool(
179 PoolOptions::new()
180 .max_connections(21)
181 .min_connections(5)
182 .idle_timeout(Duration::from_secs(60 * 10))
183 .max_lifetime(Duration::from_secs(60 * 60 * 24))
184 .acquire_timeout(Duration::from_secs(20))
185 .connect(&DATABASE_URL)
186 .await
187 .expect("Failed to connect to database"),
188 );
189
190 let user = User {
191 id: 1,
192 name: String::new(),
193 };
194
195 USER_REPO.save_ref(&user).await.unwrap();
196
197 USER_REPO.save_in_transaction(user.clone()).await.unwrap();
198
199 USER_REPO
200 .save_with_context(user.clone(), UserContext::System)
201 .await
202 .unwrap();
203
204 USER_REPO.with_transaction(action).await.unwrap();
205
206 USER_REPO
207 .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
208 .await
209 .unwrap();
210}
Sourcefn transaction_sequential<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
Executes multiple operations sequentially in a transactions, stopping at the first error.
This method provides an optimized approach for cases where you want to stop processing as soon as any action fails, immediately rolling back the transactions.
§Type Parameters
I
: The iterator typeF
: The action function type 1Fut
: The future type returned by each actionR
: The result typeE
: The error type, which must be convertible fromError
§Parameters
actions
: An iterator of functions that will be executed in the transactions
§Returns
A future that resolves to:
Ok(Vec<R>)
: A vector of results if all actions succeededErr(E)
: The first error encountered
§Implementation Details
- Begins a transactions from the repository’s connection pool
- Executes each action sequentially, collecting results
- If any action fails, rolls back the transactions and returns the error
- If all actions succeed, commits the transactions and returns the results
Due to complex lifetime bounds in underlying types we must take ownership and then return it back.
§Examples
§Basic
let results = repo.transaction_sequential([
|tx| async move { repo.save_with_executor(tx, model1).await },
|tx| async move { repo.save_with_executor(tx, model2).await }
]).await;
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;
struct User {
id: String,
name: String
}
impl Model for User {
type Id = String;
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
unimplemented!()
}
USER_REPO.transaction_sequential([action, action, action]).await.unwrap();
Sourcefn transaction_concurrent<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
Executes multiple operations concurrently in a transactions.
This method allows for concurrent execution of actions within a transactions, which can significantly improve performance for I/O-bound operations. Note that this only works when the actions don’t have data dependencies.
§Type Parameters
I
: The iterator typeF
: The action function type 1 2Fut
: The future type returned by each actionR
: The result typeE
: The error type, which must be convertible fromError
§Parameters
actions
: An iterator of functions that will be executed concurrently in the transactions
§Returns
A future that resolves to:
Ok(Vec<R>)
: A vector of results if all actions succeededErr(E)
: The first error encountered
§Implementation Details
- Begins a transactions from the repository’s connection pool
- Wraps the transactions in an
Arc<Mutex<_>>
to safely share it between concurrent operations 2 - Creates futures for all actions but doesn’t execute them yet
- Executes all futures concurrently using
try_join_all
- If all operations succeed, commits the transactions and returns the results
- If any operation fails, rolls back the transactions and returns the error
§Notes
- Uses
parking_lot::Mutex
for better performance thanstd::sync::Mutex
- Requires the transactions to be safely shared between multiple futures
§Example
§Basic
let results = repo.transaction_concurrent([
|tx_arc| async move {
let mut tx = tx_arc.lock();
repo.save_with_executor(&mut *tx, model1).await
},
|tx_arc| async move {
let mut tx = tx_arc.lock();
repo.save_with_executor(&mut *tx, model2).await
}
]).await;
§Complete
use sqlx::Transaction;
use std::sync::Arc;
use sqlx_utils::prelude::*;
struct User {
id: String,
name: String
}
impl Model for User {
type Id = String;
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<User, Error> {
unimplemented!()
}
USER_REPO.transaction_concurrent([action, action, action]).await.unwrap();
The function signature of an action must be
async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<T, E>
Take note of the lifetimes as you might run into errors related to lifetimes if they are not specified due to invariance. The future must also beSend
↩It is up to you to ensure we don’t get deadlocks, the function itself will not lock the mutex, it will however attempt to get the inner value of the
Arc
after all actions has completed where it also consumes the mutex. This makes it in theory impossible to get a deadlock in this method, however deadlocks can occur between different actions. ↩
Examples found in repository?
129 pub async fn save_with_rx_concurrent<'a, 'b>(
130 &'a self,
131 model: User,
132 ) -> Result<Vec<User>, DbError>
133 where
134 'b: 'a,
135 {
136 self.transaction_concurrent::<'a, 'b>([
137 |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
138 let mut tx = match tx.try_lock_arc() {
139 Some(tx) => tx,
140 None => return Err(Error::MutexLockError),
141 };
142
143 let res = USER_REPO.save_with_executor(&mut **tx, model).await;
144
145 ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
146
147 res
148 },
149 ])
150 .await
151 .map_err(Into::into)
152 }
Sourcefn try_transaction<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
Executes multiple operations and collects all results, committing only if all succeed.
This method runs all actions sequentially, collecting results (both successes and failures). The transactions is committed only if all actions succeed; otherwise, it’s rolled back.
§Type Parameters
I
: The iterator typeF
: The action function type 1Fut
: The future type returned by each actionR
: The result typeE
: The error type, which must be convertible fromError
§Parameters
actions
: An iterator of functions that will be executed in the transactions
§Returns
A future that resolves to:
Ok(Vec<R>)
: A vector of results if all operations succeededErr(Vec<E>)
: A vector of all errors if any operation failed
§Implementation Details
- Begins a transactions from the repository’s connection pool
- Executes each action sequentially, collecting all results and errors
- If any errors occurred, rolls back the transactions and returns all errors
- If all operations succeeded, commits the transactions and returns the results
§Example
§Basic
match repo.try_transaction([
|tx| async move { repo.save_with_executor(tx, model1).await },
|tx| async move { repo.save_with_executor(tx, model2).await }
]).await {
Ok(results) => println!("All operations succeeded"),
Err(errors) => println!("Some operations failed: {:?}", errors)
}
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;
struct User {
id: String,
name: String
}
impl Model for User {
type Id = String;
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
unimplemented!()
}
USER_REPO.try_transaction([action, action, action]).await.unwrap();
Examples found in repository?
154 pub async fn try_save_with_tx<'a, 'b>(
155 &'a self,
156 model: User,
157 ) -> Result<Vec<User>, Vec<DbError>> {
158 self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
159 let res = self.save_with_executor(&mut *tx, model).await;
160
161 (res, tx)
162 }])
163 .await
164 .map_err(|errors| errors.into_iter().map(Into::into).collect())
165 }
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.