1use super::transaction::run_async_transaction_callback;
2use crate::{
3 AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
4 ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
5 TransactionTrait,
6};
7use crate::{Schema, SchemaBuilder};
8use std::future::Future;
9use std::pin::Pin;
10
11#[derive(Debug)]
19pub enum DatabaseExecutor<'c> {
20 Connection(&'c DatabaseConnection),
22 Transaction(&'c DatabaseTransaction),
24 OwnedTransaction(DatabaseTransaction),
27}
28
29impl<'c> From<&'c DatabaseConnection> for DatabaseExecutor<'c> {
30 fn from(conn: &'c DatabaseConnection) -> Self {
31 Self::Connection(conn)
32 }
33}
34
35impl<'c> From<&'c DatabaseTransaction> for DatabaseExecutor<'c> {
36 fn from(trans: &'c DatabaseTransaction) -> Self {
37 Self::Transaction(trans)
38 }
39}
40
41#[async_trait::async_trait]
42impl ConnectionTrait for DatabaseExecutor<'_> {
43 fn get_database_backend(&self) -> DbBackend {
44 match self {
45 DatabaseExecutor::Connection(conn) => conn.get_database_backend(),
46 DatabaseExecutor::Transaction(trans) => trans.get_database_backend(),
47 DatabaseExecutor::OwnedTransaction(trans) => trans.get_database_backend(),
48 }
49 }
50
51 async fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
52 match self {
53 DatabaseExecutor::Connection(conn) => conn.execute_raw(stmt).await,
54 DatabaseExecutor::Transaction(trans) => trans.execute_raw(stmt).await,
55 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_raw(stmt).await,
56 }
57 }
58
59 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
60 match self {
61 DatabaseExecutor::Connection(conn) => conn.execute_unprepared(sql).await,
62 DatabaseExecutor::Transaction(trans) => trans.execute_unprepared(sql).await,
63 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_unprepared(sql).await,
64 }
65 }
66
67 async fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
68 match self {
69 DatabaseExecutor::Connection(conn) => conn.query_one_raw(stmt).await,
70 DatabaseExecutor::Transaction(trans) => trans.query_one_raw(stmt).await,
71 DatabaseExecutor::OwnedTransaction(trans) => trans.query_one_raw(stmt).await,
72 }
73 }
74
75 async fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
76 match self {
77 DatabaseExecutor::Connection(conn) => conn.query_all_raw(stmt).await,
78 DatabaseExecutor::Transaction(trans) => trans.query_all_raw(stmt).await,
79 DatabaseExecutor::OwnedTransaction(trans) => trans.query_all_raw(stmt).await,
80 }
81 }
82}
83
84#[async_trait::async_trait]
85impl TransactionTrait for DatabaseExecutor<'_> {
86 type Transaction = DatabaseTransaction;
87
88 async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
89 match self {
90 DatabaseExecutor::Connection(conn) => conn.begin().await,
91 DatabaseExecutor::Transaction(trans) => trans.begin().await,
92 DatabaseExecutor::OwnedTransaction(trans) => trans.begin().await,
93 }
94 }
95
96 async fn begin_with_config(
97 &self,
98 isolation_level: Option<IsolationLevel>,
99 access_mode: Option<AccessMode>,
100 ) -> Result<DatabaseTransaction, DbErr> {
101 match self {
102 DatabaseExecutor::Connection(conn) => {
103 conn.begin_with_config(isolation_level, access_mode).await
104 }
105 DatabaseExecutor::Transaction(trans) => {
106 trans.begin_with_config(isolation_level, access_mode).await
107 }
108 DatabaseExecutor::OwnedTransaction(trans) => {
109 trans.begin_with_config(isolation_level, access_mode).await
110 }
111 }
112 }
113
114 async fn begin_with_options(
115 &self,
116 options: TransactionOptions,
117 ) -> Result<DatabaseTransaction, DbErr> {
118 match self {
119 DatabaseExecutor::Connection(conn) => conn.begin_with_options(options).await,
120 DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options).await,
121 DatabaseExecutor::OwnedTransaction(trans) => trans.begin_with_options(options).await,
122 }
123 }
124
125 async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
126 where
127 F: for<'c> FnOnce(
128 &'c DatabaseTransaction,
129 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
130 + Send,
131 T: Send,
132 E: std::fmt::Display + std::fmt::Debug + Send,
133 {
134 match self {
135 DatabaseExecutor::Connection(conn) => conn.transaction(callback).await,
136 DatabaseExecutor::Transaction(trans) => trans.transaction(callback).await,
137 DatabaseExecutor::OwnedTransaction(trans) => trans.transaction(callback).await,
138 }
139 }
140
141 async fn transaction_with_config<F, T, E>(
142 &self,
143 callback: F,
144 isolation_level: Option<IsolationLevel>,
145 access_mode: Option<AccessMode>,
146 ) -> Result<T, TransactionError<E>>
147 where
148 F: for<'c> FnOnce(
149 &'c DatabaseTransaction,
150 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
151 + Send,
152 T: Send,
153 E: std::fmt::Display + std::fmt::Debug + Send,
154 {
155 match self {
156 DatabaseExecutor::Connection(conn) => {
157 conn.transaction_with_config(callback, isolation_level, access_mode)
158 .await
159 }
160 DatabaseExecutor::Transaction(trans) => {
161 trans
162 .transaction_with_config(callback, isolation_level, access_mode)
163 .await
164 }
165 DatabaseExecutor::OwnedTransaction(trans) => {
166 trans
167 .transaction_with_config(callback, isolation_level, access_mode)
168 .await
169 }
170 }
171 }
172}
173
174pub trait IntoDatabaseExecutor<'c>: Send
179where
180 Self: 'c,
181{
182 fn into_database_executor(self) -> DatabaseExecutor<'c>;
184}
185
186impl<'c> IntoDatabaseExecutor<'c> for DatabaseExecutor<'c> {
187 fn into_database_executor(self) -> DatabaseExecutor<'c> {
188 self
189 }
190}
191
192impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseConnection {
193 fn into_database_executor(self) -> DatabaseExecutor<'c> {
194 DatabaseExecutor::Connection(self)
195 }
196}
197
198impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseTransaction {
199 fn into_database_executor(self) -> DatabaseExecutor<'c> {
200 DatabaseExecutor::Transaction(self)
201 }
202}
203
204impl IntoDatabaseExecutor<'static> for DatabaseTransaction {
205 fn into_database_executor(self) -> DatabaseExecutor<'static> {
206 DatabaseExecutor::OwnedTransaction(self)
207 }
208}
209
210impl DatabaseExecutor<'_> {
211 pub async fn transaction_async<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
215 where
216 F: for<'c> AsyncFnOnce(&'c DatabaseTransaction) -> Result<T, E> + Send,
217 T: Send,
218 E: std::fmt::Display + std::fmt::Debug + Send,
219 {
220 let transaction = self.begin().await.map_err(TransactionError::Connection)?;
221 run_async_transaction_callback(transaction, callback).await
222 }
223
224 pub async fn transaction_with_config_async<F, T, E>(
228 &self,
229 callback: F,
230 isolation_level: Option<IsolationLevel>,
231 access_mode: Option<AccessMode>,
232 ) -> Result<T, TransactionError<E>>
233 where
234 F: for<'c> AsyncFnOnce(&'c DatabaseTransaction) -> Result<T, E> + Send,
235 T: Send,
236 E: std::fmt::Display + std::fmt::Debug + Send,
237 {
238 let transaction = self
239 .begin_with_config(isolation_level, access_mode)
240 .await
241 .map_err(TransactionError::Connection)?;
242 run_async_transaction_callback(transaction, callback).await
243 }
244
245 pub fn is_transaction(&self) -> bool {
247 matches!(
248 self,
249 DatabaseExecutor::Transaction(_) | DatabaseExecutor::OwnedTransaction(_)
250 )
251 }
252
253 pub fn get_schema_builder(&self) -> SchemaBuilder {
255 Schema::new(self.get_database_backend()).builder()
256 }
257
258 #[cfg(feature = "entity-registry")]
259 #[cfg_attr(docsrs, doc(cfg(feature = "entity-registry")))]
260 pub fn get_schema_registry(&self, prefix: &str) -> SchemaBuilder {
262 let schema = Schema::new(self.get_database_backend());
263 crate::EntityRegistry::build_schema(schema, prefix)
264 }
265}