Skip to main content

sea_orm/database/
executor.rs

1use super::transaction::run_async_transaction_callback;
2use crate::{
3    AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
4    ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
5    TransactionTrait,
6};
7use crate::{Schema, SchemaBuilder};
8use std::future::Future;
9use std::pin::Pin;
10
11/// Either a borrowed [`DatabaseConnection`] / [`DatabaseTransaction`], or an
12/// owned [`DatabaseTransaction`].
13///
14/// Implements [`ConnectionTrait`] and [`TransactionTrait`], so APIs that
15/// need to accept "any of those three" can take a `DatabaseExecutor` and
16/// not worry about which variant the caller had. Used in particular by
17/// `sea-orm-migration`'s `SchemaManager`.
18#[derive(Debug)]
19pub enum DatabaseExecutor<'c> {
20    /// Borrowed connection — use against a long-lived pool.
21    Connection(&'c DatabaseConnection),
22    /// Borrowed transaction — caller still owns the transaction handle.
23    Transaction(&'c DatabaseTransaction),
24    /// Owned transaction — used by migration's `SchemaManager::begin()` so
25    /// the transaction can be committed/rolled back at the end of the call.
26    OwnedTransaction(DatabaseTransaction),
27}
28
29impl<'c> From<&'c DatabaseConnection> for DatabaseExecutor<'c> {
30    fn from(conn: &'c DatabaseConnection) -> Self {
31        Self::Connection(conn)
32    }
33}
34
35impl<'c> From<&'c DatabaseTransaction> for DatabaseExecutor<'c> {
36    fn from(trans: &'c DatabaseTransaction) -> Self {
37        Self::Transaction(trans)
38    }
39}
40
41#[async_trait::async_trait]
42impl ConnectionTrait for DatabaseExecutor<'_> {
43    fn get_database_backend(&self) -> DbBackend {
44        match self {
45            DatabaseExecutor::Connection(conn) => conn.get_database_backend(),
46            DatabaseExecutor::Transaction(trans) => trans.get_database_backend(),
47            DatabaseExecutor::OwnedTransaction(trans) => trans.get_database_backend(),
48        }
49    }
50
51    async fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
52        match self {
53            DatabaseExecutor::Connection(conn) => conn.execute_raw(stmt).await,
54            DatabaseExecutor::Transaction(trans) => trans.execute_raw(stmt).await,
55            DatabaseExecutor::OwnedTransaction(trans) => trans.execute_raw(stmt).await,
56        }
57    }
58
59    async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
60        match self {
61            DatabaseExecutor::Connection(conn) => conn.execute_unprepared(sql).await,
62            DatabaseExecutor::Transaction(trans) => trans.execute_unprepared(sql).await,
63            DatabaseExecutor::OwnedTransaction(trans) => trans.execute_unprepared(sql).await,
64        }
65    }
66
67    async fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
68        match self {
69            DatabaseExecutor::Connection(conn) => conn.query_one_raw(stmt).await,
70            DatabaseExecutor::Transaction(trans) => trans.query_one_raw(stmt).await,
71            DatabaseExecutor::OwnedTransaction(trans) => trans.query_one_raw(stmt).await,
72        }
73    }
74
75    async fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
76        match self {
77            DatabaseExecutor::Connection(conn) => conn.query_all_raw(stmt).await,
78            DatabaseExecutor::Transaction(trans) => trans.query_all_raw(stmt).await,
79            DatabaseExecutor::OwnedTransaction(trans) => trans.query_all_raw(stmt).await,
80        }
81    }
82}
83
84#[async_trait::async_trait]
85impl TransactionTrait for DatabaseExecutor<'_> {
86    type Transaction = DatabaseTransaction;
87
88    async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
89        match self {
90            DatabaseExecutor::Connection(conn) => conn.begin().await,
91            DatabaseExecutor::Transaction(trans) => trans.begin().await,
92            DatabaseExecutor::OwnedTransaction(trans) => trans.begin().await,
93        }
94    }
95
96    async fn begin_with_config(
97        &self,
98        isolation_level: Option<IsolationLevel>,
99        access_mode: Option<AccessMode>,
100    ) -> Result<DatabaseTransaction, DbErr> {
101        match self {
102            DatabaseExecutor::Connection(conn) => {
103                conn.begin_with_config(isolation_level, access_mode).await
104            }
105            DatabaseExecutor::Transaction(trans) => {
106                trans.begin_with_config(isolation_level, access_mode).await
107            }
108            DatabaseExecutor::OwnedTransaction(trans) => {
109                trans.begin_with_config(isolation_level, access_mode).await
110            }
111        }
112    }
113
114    async fn begin_with_options(
115        &self,
116        options: TransactionOptions,
117    ) -> Result<DatabaseTransaction, DbErr> {
118        match self {
119            DatabaseExecutor::Connection(conn) => conn.begin_with_options(options).await,
120            DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options).await,
121            DatabaseExecutor::OwnedTransaction(trans) => trans.begin_with_options(options).await,
122        }
123    }
124
125    async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
126    where
127        F: for<'c> FnOnce(
128                &'c DatabaseTransaction,
129            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
130            + Send,
131        T: Send,
132        E: std::fmt::Display + std::fmt::Debug + Send,
133    {
134        match self {
135            DatabaseExecutor::Connection(conn) => conn.transaction(callback).await,
136            DatabaseExecutor::Transaction(trans) => trans.transaction(callback).await,
137            DatabaseExecutor::OwnedTransaction(trans) => trans.transaction(callback).await,
138        }
139    }
140
141    async fn transaction_with_config<F, T, E>(
142        &self,
143        callback: F,
144        isolation_level: Option<IsolationLevel>,
145        access_mode: Option<AccessMode>,
146    ) -> Result<T, TransactionError<E>>
147    where
148        F: for<'c> FnOnce(
149                &'c DatabaseTransaction,
150            ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
151            + Send,
152        T: Send,
153        E: std::fmt::Display + std::fmt::Debug + Send,
154    {
155        match self {
156            DatabaseExecutor::Connection(conn) => {
157                conn.transaction_with_config(callback, isolation_level, access_mode)
158                    .await
159            }
160            DatabaseExecutor::Transaction(trans) => {
161                trans
162                    .transaction_with_config(callback, isolation_level, access_mode)
163                    .await
164            }
165            DatabaseExecutor::OwnedTransaction(trans) => {
166                trans
167                    .transaction_with_config(callback, isolation_level, access_mode)
168                    .await
169            }
170        }
171    }
172}
173
174/// Conversion into a [`DatabaseExecutor`]. Implemented for
175/// `&DatabaseConnection`, `&DatabaseTransaction`, and owned
176/// `DatabaseTransaction` — let users hand any of them to functions that
177/// take `impl IntoDatabaseExecutor<'_>`.
178pub trait IntoDatabaseExecutor<'c>: Send
179where
180    Self: 'c,
181{
182    /// Build the [`DatabaseExecutor`].
183    fn into_database_executor(self) -> DatabaseExecutor<'c>;
184}
185
186impl<'c> IntoDatabaseExecutor<'c> for DatabaseExecutor<'c> {
187    fn into_database_executor(self) -> DatabaseExecutor<'c> {
188        self
189    }
190}
191
192impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseConnection {
193    fn into_database_executor(self) -> DatabaseExecutor<'c> {
194        DatabaseExecutor::Connection(self)
195    }
196}
197
198impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseTransaction {
199    fn into_database_executor(self) -> DatabaseExecutor<'c> {
200        DatabaseExecutor::Transaction(self)
201    }
202}
203
204impl IntoDatabaseExecutor<'static> for DatabaseTransaction {
205    fn into_database_executor(self) -> DatabaseExecutor<'static> {
206        DatabaseExecutor::OwnedTransaction(self)
207    }
208}
209
210impl DatabaseExecutor<'_> {
211    /// Execute the function inside a transaction.
212    /// If the function returns an error, the transaction will be rolled back.
213    /// Otherwise, the transaction will be committed.
214    pub async fn transaction_async<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
215    where
216        F: for<'c> AsyncFnOnce(&'c DatabaseTransaction) -> Result<T, E> + Send,
217        T: Send,
218        E: std::fmt::Display + std::fmt::Debug + Send,
219    {
220        let transaction = self.begin().await.map_err(TransactionError::Connection)?;
221        run_async_transaction_callback(transaction, callback).await
222    }
223
224    /// Execute the function inside a transaction with isolation level and/or access mode.
225    /// If the function returns an error, the transaction will be rolled back.
226    /// Otherwise, the transaction will be committed.
227    pub async fn transaction_with_config_async<F, T, E>(
228        &self,
229        callback: F,
230        isolation_level: Option<IsolationLevel>,
231        access_mode: Option<AccessMode>,
232    ) -> Result<T, TransactionError<E>>
233    where
234        F: for<'c> AsyncFnOnce(&'c DatabaseTransaction) -> Result<T, E> + Send,
235        T: Send,
236        E: std::fmt::Display + std::fmt::Debug + Send,
237    {
238        let transaction = self
239            .begin_with_config(isolation_level, access_mode)
240            .await
241            .map_err(TransactionError::Connection)?;
242        run_async_transaction_callback(transaction, callback).await
243    }
244
245    /// Returns `true` if this executor is backed by a transaction (borrowed or owned).
246    pub fn is_transaction(&self) -> bool {
247        matches!(
248            self,
249            DatabaseExecutor::Transaction(_) | DatabaseExecutor::OwnedTransaction(_)
250        )
251    }
252
253    /// Creates a [`SchemaBuilder`] for this backend
254    pub fn get_schema_builder(&self) -> SchemaBuilder {
255        Schema::new(self.get_database_backend()).builder()
256    }
257
258    #[cfg(feature = "entity-registry")]
259    #[cfg_attr(docsrs, doc(cfg(feature = "entity-registry")))]
260    /// Builds a schema for all the entities in the given module
261    pub fn get_schema_registry(&self, prefix: &str) -> SchemaBuilder {
262        let schema = Schema::new(self.get_database_backend());
263        crate::EntityRegistry::build_schema(schema, prefix)
264    }
265}