1use super::transaction::run_async_transaction_callback;
2use crate::{
3 AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
4 ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
5 TransactionTrait,
6};
7use crate::{Schema, SchemaBuilder};
8use std::future::Future;
9use std::pin::Pin;
10
11#[derive(Debug)]
14pub enum DatabaseExecutor<'c> {
15 Connection(&'c DatabaseConnection),
17 Transaction(&'c DatabaseTransaction),
19 OwnedTransaction(DatabaseTransaction),
21}
22
23impl<'c> From<&'c DatabaseConnection> for DatabaseExecutor<'c> {
24 fn from(conn: &'c DatabaseConnection) -> Self {
25 Self::Connection(conn)
26 }
27}
28
29impl<'c> From<&'c DatabaseTransaction> for DatabaseExecutor<'c> {
30 fn from(trans: &'c DatabaseTransaction) -> Self {
31 Self::Transaction(trans)
32 }
33}
34
35impl ConnectionTrait for DatabaseExecutor<'_> {
36 fn get_database_backend(&self) -> DbBackend {
37 match self {
38 DatabaseExecutor::Connection(conn) => conn.get_database_backend(),
39 DatabaseExecutor::Transaction(trans) => trans.get_database_backend(),
40 DatabaseExecutor::OwnedTransaction(trans) => trans.get_database_backend(),
41 }
42 }
43
44 fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
45 match self {
46 DatabaseExecutor::Connection(conn) => conn.execute_raw(stmt),
47 DatabaseExecutor::Transaction(trans) => trans.execute_raw(stmt),
48 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_raw(stmt),
49 }
50 }
51
52 fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
53 match self {
54 DatabaseExecutor::Connection(conn) => conn.execute_unprepared(sql),
55 DatabaseExecutor::Transaction(trans) => trans.execute_unprepared(sql),
56 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_unprepared(sql),
57 }
58 }
59
60 fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
61 match self {
62 DatabaseExecutor::Connection(conn) => conn.query_one_raw(stmt),
63 DatabaseExecutor::Transaction(trans) => trans.query_one_raw(stmt),
64 DatabaseExecutor::OwnedTransaction(trans) => trans.query_one_raw(stmt),
65 }
66 }
67
68 fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
69 match self {
70 DatabaseExecutor::Connection(conn) => conn.query_all_raw(stmt),
71 DatabaseExecutor::Transaction(trans) => trans.query_all_raw(stmt),
72 DatabaseExecutor::OwnedTransaction(trans) => trans.query_all_raw(stmt),
73 }
74 }
75}
76
77impl TransactionTrait for DatabaseExecutor<'_> {
78 type Transaction = DatabaseTransaction;
79
80 fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
81 match self {
82 DatabaseExecutor::Connection(conn) => conn.begin(),
83 DatabaseExecutor::Transaction(trans) => trans.begin(),
84 DatabaseExecutor::OwnedTransaction(trans) => trans.begin(),
85 }
86 }
87
88 fn begin_with_config(
89 &self,
90 isolation_level: Option<IsolationLevel>,
91 access_mode: Option<AccessMode>,
92 ) -> Result<DatabaseTransaction, DbErr> {
93 match self {
94 DatabaseExecutor::Connection(conn) => {
95 conn.begin_with_config(isolation_level, access_mode)
96 }
97 DatabaseExecutor::Transaction(trans) => {
98 trans.begin_with_config(isolation_level, access_mode)
99 }
100 DatabaseExecutor::OwnedTransaction(trans) => {
101 trans.begin_with_config(isolation_level, access_mode)
102 }
103 }
104 }
105
106 fn begin_with_options(
107 &self,
108 options: TransactionOptions,
109 ) -> Result<DatabaseTransaction, DbErr> {
110 match self {
111 DatabaseExecutor::Connection(conn) => conn.begin_with_options(options),
112 DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options),
113 DatabaseExecutor::OwnedTransaction(trans) => trans.begin_with_options(options),
114 }
115 }
116
117 fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
118 where
119 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
120 E: std::fmt::Display + std::fmt::Debug,
121 {
122 match self {
123 DatabaseExecutor::Connection(conn) => conn.transaction(callback),
124 DatabaseExecutor::Transaction(trans) => trans.transaction(callback),
125 DatabaseExecutor::OwnedTransaction(trans) => trans.transaction(callback),
126 }
127 }
128
129 fn transaction_with_config<F, T, E>(
130 &self,
131 callback: F,
132 isolation_level: Option<IsolationLevel>,
133 access_mode: Option<AccessMode>,
134 ) -> Result<T, TransactionError<E>>
135 where
136 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
137 E: std::fmt::Display + std::fmt::Debug,
138 {
139 match self {
140 DatabaseExecutor::Connection(conn) => {
141 conn.transaction_with_config(callback, isolation_level, access_mode)
142 }
143 DatabaseExecutor::Transaction(trans) => {
144 trans.transaction_with_config(callback, isolation_level, access_mode)
145 }
146 DatabaseExecutor::OwnedTransaction(trans) => {
147 trans.transaction_with_config(callback, isolation_level, access_mode)
148 }
149 }
150 }
151}
152
153pub trait IntoDatabaseExecutor<'c>
155where
156 Self: 'c,
157{
158 fn into_database_executor(self) -> DatabaseExecutor<'c>;
160}
161
162impl<'c> IntoDatabaseExecutor<'c> for DatabaseExecutor<'c> {
163 fn into_database_executor(self) -> DatabaseExecutor<'c> {
164 self
165 }
166}
167
168impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseConnection {
169 fn into_database_executor(self) -> DatabaseExecutor<'c> {
170 DatabaseExecutor::Connection(self)
171 }
172}
173
174impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseTransaction {
175 fn into_database_executor(self) -> DatabaseExecutor<'c> {
176 DatabaseExecutor::Transaction(self)
177 }
178}
179
180impl IntoDatabaseExecutor<'static> for DatabaseTransaction {
181 fn into_database_executor(self) -> DatabaseExecutor<'static> {
182 DatabaseExecutor::OwnedTransaction(self)
183 }
184}
185
186impl DatabaseExecutor<'_> {
187 pub fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
191 where
192 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
193 E: std::fmt::Display + std::fmt::Debug,
194 {
195 let transaction = self.begin().map_err(TransactionError::Connection)?;
196 run_async_transaction_callback(transaction, callback)
197 }
198
199 pub fn transaction_with_config<F, T, E>(
203 &self,
204 callback: F,
205 isolation_level: Option<IsolationLevel>,
206 access_mode: Option<AccessMode>,
207 ) -> Result<T, TransactionError<E>>
208 where
209 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
210 E: std::fmt::Display + std::fmt::Debug,
211 {
212 let transaction = self
213 .begin_with_config(isolation_level, access_mode)
214 .map_err(TransactionError::Connection)?;
215 run_async_transaction_callback(transaction, callback)
216 }
217
218 pub fn is_transaction(&self) -> bool {
220 matches!(
221 self,
222 DatabaseExecutor::Transaction(_) | DatabaseExecutor::OwnedTransaction(_)
223 )
224 }
225
226 pub fn get_schema_builder(&self) -> SchemaBuilder {
228 Schema::new(self.get_database_backend()).builder()
229 }
230
231 #[cfg(feature = "entity-registry")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "entity-registry")))]
233 pub fn get_schema_registry(&self, prefix: &str) -> SchemaBuilder {
235 let schema = Schema::new(self.get_database_backend());
236 crate::EntityRegistry::build_schema(schema, prefix)
237 }
238}