/// Extension trait that layers transaction helpers on top of `Repository<M>`.
///
/// All four methods below are provided methods; their bodies are elided
/// (`{ ... }`) in this synopsis.
pub trait TransactionRepository<M>: Repository<M>where
M: Model,{
// Provided methods
/// Runs a single `callback` inside a transaction.
///
/// The callback receives the `Transaction` by value and must hand it back
/// together with its result (see the tuple in `Fut::Output`). The
/// transaction is committed when the result is `Ok` and rolled back when it
/// is `Err`.
fn with_transaction<'a, 'b, F, Fut, R, E>(
&'a self,
callback: F,
) -> impl Future<Output = Result<R, E>> + Send + 'a
where F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send { ... }
/// Runs `actions` one after another in one transaction, stopping at the
/// first error.
///
/// Resolves to the collected results on success, or to the first error
/// encountered. Each action takes ownership of the transaction and returns
/// it next to its result.
fn transaction_sequential<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
/// Runs `actions` concurrently in one transaction.
///
/// Unlike the other methods, each action is handed the transaction wrapped
/// in `Arc<Mutex<_>>` and returns only a `Result<R, E>`; the transaction is
/// not passed back through the return value.
fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Arc<Mutex<Transaction<'b, Database>>>) -> Fut + Send + 'a,
Fut: Future<Output = Result<R, E>> + Send + 'a,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
/// Runs `actions` sequentially and collects every outcome.
///
/// Resolves to `Ok(Vec<R>)` only if all actions succeeded; otherwise to
/// `Err(Vec<E>)` holding all errors. The transaction is committed only in
/// the all-success case.
fn try_transaction<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
where I: IntoIterator<Item = F> + Send + 'a,
I::IntoIter: Send + 'a,
F: FnOnce(Transaction<'b, Database>) -> Fut + Send + 'a,
Fut: Future<Output = (Result<R, E>, Transaction<'b, Database>)> + Send,
R: Send + 'a,
E: From<Error> + Send + 'a { ... }
}
Expand description
Extension trait for Repository to work with transactions.
This trait adds transaction capabilities to any repository that implements the Repository trait. It provides several methods for executing operations within database transactions, with different strategies for concurrency and error handling.
The trait is automatically implemented for any type that implements Repository<M>, making transaction capabilities available to all repositories without additional code.
Provided Methods§
Sourcefn with_transaction<'a, 'b, F, Fut, R, E>(
&'a self,
callback: F,
) -> impl Future<Output = Result<R, E>> + Send + 'a
fn with_transaction<'a, 'b, F, Fut, R, E>( &'a self, callback: F, ) -> impl Future<Output = Result<R, E>> + Send + 'a
Executes a callback within a transaction, handling the transaction lifecycle automatically.
This method:
- Begins a transaction from the repository’s connection pool
- Passes the transaction to the callback function
- Waits for the callback to complete and return both a result and the transaction
- Commits the transaction if the result is Ok, or rolls it back if it’s Err
- Returns the final result
§Type Parameters
- F: The type of the callback function [1]
- Fut: The future type returned by the callback
- R: The result type
- E: The error type, which must be convertible from Error
§Parameters
- callback: A function that accepts a Transaction and returns a future
§Returns
A future that resolves to Result<R, E>.
§Example
// The callback owns the transaction for its whole duration and must return it
// next to the result so that `with_transaction` can commit or roll back.
let result = repo.with_transaction(|mut tx| async move {
    let model = Model::new();
    // Reborrow the underlying connection: `&mut *tx` is what is accepted as an
    // executor, not `&mut tx` (a reference to the `Transaction` wrapper).
    let res = repo.save_with_executor(&mut *tx, model).await;
    (res, tx)
}).await;
Examples found in repository?
101 pub async fn save_with_context(
102 &self,
103 model: User,
104 context: UserContext,
105 ) -> Result<User, DbError> {
106 self.with_transaction(move |mut tx| async move {
107 let res = match context {
108 UserContext::System => self
109 .save_with_executor(&mut *tx, model)
110 .await
111 .map_err(Into::into),
112 UserContext::UnAuthenticated => Err(DbError::NotAllowed),
113 };
114
115 (res, tx)
116 })
117 .await
118 }
119
120 pub async fn save_with_tx<'a, 'b>(&'a self, model: User) -> Result<Vec<User>, DbError> {
121 self.transaction_sequential::<'a, 'b>([
122 move |mut tx: Transaction<'b, Database>| async move {
123 let res = self.save_with_executor(&mut *tx, model).await;
124
125 (res, tx)
126 },
127 ])
128 .await
129 .map_err(Into::into)
130 }
131
132 pub async fn save_with_rx_concurrent<'a, 'b>(
133 &'a self,
134 model: User,
135 ) -> Result<Vec<User>, DbError>
136 where
137 'b: 'a,
138 {
139 self.transaction_concurrent::<'a, 'b>([
140 |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141 let mut tx = match tx.try_lock_arc() {
142 Some(tx) => tx,
143 None => return Err(Error::MutexLockError),
144 };
145
146 let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148 ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150 res
151 },
152 ])
153 .await
154 .map_err(Into::into)
155 }
156
157 pub async fn try_save_with_tx<'a, 'b>(
158 &'a self,
159 model: User,
160 ) -> Result<Vec<User>, Vec<DbError>> {
161 self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162 let res = self.save_with_executor(&mut *tx, model).await;
163
164 (res, tx)
165 }])
166 .await
167 .map_err(|errors| errors.into_iter().map(Into::into).collect())
168 }
169}
170
171async fn action<'b>(
172 _: Transaction<'b, Database>,
173) -> (Result<User, DbError>, Transaction<'b, Database>) {
174 unimplemented!()
175}
176
177#[tokio::main]
178async fn main() {
179 install_default_drivers();
180
181 initialize_db_pool(
182 PoolOptions::new()
183 .max_connections(21)
184 .min_connections(5)
185 .idle_timeout(Duration::from_secs(60 * 10))
186 .max_lifetime(Duration::from_secs(60 * 60 * 24))
187 .acquire_timeout(Duration::from_secs(20))
188 .connect(&DATABASE_URL)
189 .await
190 .expect("Failed to connect to database"),
191 );
192
193 let user = User {
194 id: 1,
195 name: String::new(),
196 };
197
198 USER_REPO.save_ref(&user).await.unwrap();
199
200 USER_REPO.save_in_transaction(user.clone()).await.unwrap();
201
202 USER_REPO
203 .save_with_context(user.clone(), UserContext::System)
204 .await
205 .unwrap();
206
207 USER_REPO.with_transaction(action).await.unwrap();
208
209 USER_REPO
210 .delete_by_values_in_transaction("id", [1, 2, 3, 11, 22])
211 .await
212 .unwrap();
213
214 USER_REPO
215 .get_one_by_filter(UserFilter::new("name").id(1))
216 .await
217 .unwrap();
218}
Sourcefn transaction_sequential<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
fn transaction_sequential<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
Executes multiple operations sequentially in a transaction, stopping at the first error.
This method provides an optimized approach for cases where you want to stop processing as soon as any action fails, immediately rolling back the transaction.
§Type Parameters
- I: The iterator type
- F: The action function type [1]
- Fut: The future type returned by each action
- R: The result type
- E: The error type, which must be convertible from Error
§Parameters
- actions: An iterator of functions that will be executed in the transaction
§Returns
A future that resolves to:
- Ok(Vec<R>): A vector of results if all actions succeeded
- Err(E): The first error encountered
§Implementation Details
- Begins a transaction from the repository’s connection pool
- Executes each action sequentially, collecting results
- If any action fails, rolls back the transaction and returns the error
- If all actions succeed, commits the transaction and returns the results
Due to complex lifetime bounds in the underlying types, each action must take ownership of the transaction and then return it alongside its result.
§Examples
§Basic
// Each action takes the transaction by value and must return it together with
// its result: `Fut::Output` is `(Result<R, E>, Transaction)`.
// NOTE: illustrative only; closures with different captures have distinct
// types, so real code may need one shared action type (for example a named
// `async fn`) to build the array.
let results = repo.transaction_sequential([
    |mut tx| async move {
        let res = repo.save_with_executor(&mut *tx, model1).await;
        (res, tx)
    },
    |mut tx| async move {
        let res = repo.save_with_executor(&mut *tx, model2).await;
        (res, tx)
    },
]).await;
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;
// Minimal model type used by this example.
struct User {
id: String,
name: String
}
// Makes `User` usable with the repository by exposing its identifier.
impl Model for User {
type Id = String;
// Returns a copy of the stored id; this example always has one, hence `Some`.
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
// Example error type that simply boxes the underlying error.
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
// Conversion from the sqlx error into the example error type.
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
// Action with the shape `transaction_sequential` expects: it takes the
// transaction by value and returns it next to the result.
// The body is a placeholder and panics if actually called.
async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
unimplemented!()
}
// Passes the same action three times; they are run in a single transaction.
USER_REPO.transaction_sequential([action, action, action]).await.unwrap();
Sourcefn transaction_concurrent<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
fn transaction_concurrent<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, E>> + Send + 'a
Executes multiple operations concurrently in a transaction.
This method allows for concurrent execution of actions within a transaction, which can significantly improve performance for I/O-bound operations. Note that this only works when the actions don’t have data dependencies.
§Type Parameters
- I: The iterator type
- F: The action function type [1] [2]
- Fut: The future type returned by each action
- R: The result type
- E: The error type, which must be convertible from Error
§Parameters
- actions: An iterator of functions that will be executed concurrently in the transaction
§Returns
A future that resolves to:
- Ok(Vec<R>): A vector of results if all actions succeeded
- Err(E): The first error encountered
§Implementation Details
- Begins a transaction from the repository’s connection pool
- Wraps the transaction in an Arc<Mutex<_>> to safely share it between concurrent operations [2]
- Creates futures for all actions but doesn’t execute them yet
- Executes all futures concurrently using try_join_all
- If all operations succeed, commits the transaction and returns the results
- If any operation fails, rolls back the transaction and returns the error
§Notes
- Uses parking_lot::Mutex for better performance than std::sync::Mutex
- Requires the transaction to be safely shared between multiple futures
§Example
§Basic
// Every action is handed the shared `Arc<Mutex<Transaction>>` and has to lock
// it itself before touching the database.
let results = repo.transaction_concurrent([
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        // Two derefs: mutex guard -> `Transaction` -> underlying connection,
        // which is what is passed as the executor.
        repo.save_with_executor(&mut **tx, model1).await
    },
    |tx_arc| async move {
        let mut tx = tx_arc.lock();
        repo.save_with_executor(&mut **tx, model2).await
    },
]).await;
§Complete
use sqlx::Transaction;
use std::sync::Arc;
use sqlx_utils::prelude::*;
// Minimal model type used by this example.
struct User {
id: String,
name: String
}
// Makes `User` usable with the repository by exposing its identifier.
impl Model for User {
type Id = String;
// Returns a copy of the stored id; this example always has one, hence `Some`.
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
// Example error type that simply boxes the underlying error.
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
// Conversion from the sqlx error into the example error type.
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
// Action with the shape `transaction_concurrent` expects: it receives the
// transaction behind `Arc<parking_lot::Mutex<_>>` and returns only a `Result`.
// The explicit `'b` lifetime is spelled out on purpose.
// The body is a placeholder and panics if actually called.
async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<User, Error> {
unimplemented!()
}
// Passes the same action three times to be run concurrently in one transaction.
USER_REPO.transaction_concurrent([action, action, action]).await.unwrap();
[1] The function signature of an action must be
async fn action<'b>(tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>) -> Result<T, E>
Take note of the lifetimes, as you might run into lifetime-related errors due to invariance if they are not specified. The future must also be Send. ↩
[2] It is up to you to ensure that deadlocks do not occur. The function itself will not lock the mutex; it will, however, attempt to take the inner value out of the Arc after all actions have completed, at which point it also consumes the mutex. This makes it in theory impossible for this method itself to deadlock; however, deadlocks can still occur between different actions. ↩
Examples found in repository?
132 pub async fn save_with_rx_concurrent<'a, 'b>(
133 &'a self,
134 model: User,
135 ) -> Result<Vec<User>, DbError>
136 where
137 'b: 'a,
138 {
139 self.transaction_concurrent::<'a, 'b>([
140 |tx: Arc<parking_lot::Mutex<Transaction<'b, Database>>>| async move {
141 let mut tx = match tx.try_lock_arc() {
142 Some(tx) => tx,
143 None => return Err(Error::MutexLockError),
144 };
145
146 let res = USER_REPO.save_with_executor(&mut **tx, model).await;
147
148 ArcMutexGuard::<parking_lot::RawMutex, Transaction<'b, sqlx::Any>>::unlock_fair(tx);
149
150 res
151 },
152 ])
153 .await
154 .map_err(Into::into)
155 }
Sourcefn try_transaction<'a, 'b, I, F, Fut, R, E>(
&'a self,
actions: I,
) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
fn try_transaction<'a, 'b, I, F, Fut, R, E>( &'a self, actions: I, ) -> impl Future<Output = Result<Vec<R>, Vec<E>>> + Send + 'a
Executes multiple operations and collects all results, committing only if all succeed.
This method runs all actions sequentially, collecting results (both successes and failures). The transaction is committed only if all actions succeed; otherwise, it’s rolled back.
§Type Parameters
- I: The iterator type
- F: The action function type [1]
- Fut: The future type returned by each action
- R: The result type
- E: The error type, which must be convertible from Error
§Parameters
- actions: An iterator of functions that will be executed in the transaction
§Returns
A future that resolves to:
- Ok(Vec<R>): A vector of results if all operations succeeded
- Err(Vec<E>): A vector of all errors if any operation failed
§Implementation Details
- Begins a transaction from the repository’s connection pool
- Executes each action sequentially, collecting all results and errors
- If any errors occurred, rolls back the transaction and returns all errors
- If all operations succeeded, commits the transaction and returns the results
§Example
§Basic
// Each action takes the transaction by value and must return it together with
// its result: `Fut::Output` is `(Result<R, E>, Transaction)`.
// NOTE: illustrative only; closures with different captures have distinct
// types, so real code may need one shared action type (for example a named
// `async fn`) to build the array.
match repo.try_transaction([
    |mut tx| async move {
        let res = repo.save_with_executor(&mut *tx, model1).await;
        (res, tx)
    },
    |mut tx| async move {
        let res = repo.save_with_executor(&mut *tx, model2).await;
        (res, tx)
    },
]).await {
    Ok(results) => println!("All operations succeeded"),
    // On failure every collected error is reported, not just the first one.
    Err(errors) => println!("Some operations failed: {:?}", errors)
}
§Complete
use sqlx::Transaction;
use sqlx_utils::prelude::*;
// Minimal model type used by this example.
struct User {
id: String,
name: String
}
// Makes `User` usable with the repository by exposing its identifier.
impl Model for User {
type Id = String;
// Returns a copy of the stored id; this example always has one, hence `Some`.
fn get_id(&self) -> Option<Self::Id> {
Some(self.id.to_owned())
}
}
// Example error type that simply boxes the underlying error.
#[derive(Debug)]
struct Error { // Any error type that implements `From<sqlx::Error>` is allowed
kind: Box<dyn std::error::Error + Send>
}
// Conversion from the sqlx error into the example error type.
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self {
kind: Box::new(value)
}
}
}
// Action with the shape `try_transaction` expects: it takes the transaction by
// value and returns it next to the result.
// The body is a placeholder and panics if actually called.
async fn action<'b>(tx: Transaction<'b, Database>) -> (Result<User, Error>, Transaction<'b, Database>) {
unimplemented!()
}
// Passes the same action three times; `unwrap` panics with the collected
// errors if any of them failed.
USER_REPO.try_transaction([action, action, action]).await.unwrap();
Examples found in repository?
157 pub async fn try_save_with_tx<'a, 'b>(
158 &'a self,
159 model: User,
160 ) -> Result<Vec<User>, Vec<DbError>> {
161 self.try_transaction::<'a, 'b>([move |mut tx: Transaction<'b, Database>| async move {
162 let res = self.save_with_executor(&mut *tx, model).await;
163
164 (res, tx)
165 }])
166 .await
167 .map_err(|errors| errors.into_iter().map(Into::into).collect())
168 }
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.