1use crate::{
2 AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
3 ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
4 TransactionTrait,
5};
6use crate::{Schema, SchemaBuilder};
7use std::future::Future;
8use std::pin::Pin;
9
10#[derive(Debug)]
13pub enum DatabaseExecutor<'c> {
14 Connection(&'c DatabaseConnection),
16 Transaction(&'c DatabaseTransaction),
18 OwnedTransaction(DatabaseTransaction),
20}
21
22impl<'c> From<&'c DatabaseConnection> for DatabaseExecutor<'c> {
23 fn from(conn: &'c DatabaseConnection) -> Self {
24 Self::Connection(conn)
25 }
26}
27
28impl<'c> From<&'c DatabaseTransaction> for DatabaseExecutor<'c> {
29 fn from(trans: &'c DatabaseTransaction) -> Self {
30 Self::Transaction(trans)
31 }
32}
33
34#[async_trait::async_trait]
35impl ConnectionTrait for DatabaseExecutor<'_> {
36 fn get_database_backend(&self) -> DbBackend {
37 match self {
38 DatabaseExecutor::Connection(conn) => conn.get_database_backend(),
39 DatabaseExecutor::Transaction(trans) => trans.get_database_backend(),
40 DatabaseExecutor::OwnedTransaction(trans) => trans.get_database_backend(),
41 }
42 }
43
44 async fn execute_raw(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
45 match self {
46 DatabaseExecutor::Connection(conn) => conn.execute_raw(stmt).await,
47 DatabaseExecutor::Transaction(trans) => trans.execute_raw(stmt).await,
48 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_raw(stmt).await,
49 }
50 }
51
52 async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
53 match self {
54 DatabaseExecutor::Connection(conn) => conn.execute_unprepared(sql).await,
55 DatabaseExecutor::Transaction(trans) => trans.execute_unprepared(sql).await,
56 DatabaseExecutor::OwnedTransaction(trans) => trans.execute_unprepared(sql).await,
57 }
58 }
59
60 async fn query_one_raw(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
61 match self {
62 DatabaseExecutor::Connection(conn) => conn.query_one_raw(stmt).await,
63 DatabaseExecutor::Transaction(trans) => trans.query_one_raw(stmt).await,
64 DatabaseExecutor::OwnedTransaction(trans) => trans.query_one_raw(stmt).await,
65 }
66 }
67
68 async fn query_all_raw(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
69 match self {
70 DatabaseExecutor::Connection(conn) => conn.query_all_raw(stmt).await,
71 DatabaseExecutor::Transaction(trans) => trans.query_all_raw(stmt).await,
72 DatabaseExecutor::OwnedTransaction(trans) => trans.query_all_raw(stmt).await,
73 }
74 }
75}
76
77#[async_trait::async_trait]
78impl TransactionTrait for DatabaseExecutor<'_> {
79 type Transaction = DatabaseTransaction;
80
81 async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
82 match self {
83 DatabaseExecutor::Connection(conn) => conn.begin().await,
84 DatabaseExecutor::Transaction(trans) => trans.begin().await,
85 DatabaseExecutor::OwnedTransaction(trans) => trans.begin().await,
86 }
87 }
88
89 async fn begin_with_config(
90 &self,
91 isolation_level: Option<IsolationLevel>,
92 access_mode: Option<AccessMode>,
93 ) -> Result<DatabaseTransaction, DbErr> {
94 match self {
95 DatabaseExecutor::Connection(conn) => {
96 conn.begin_with_config(isolation_level, access_mode).await
97 }
98 DatabaseExecutor::Transaction(trans) => {
99 trans.begin_with_config(isolation_level, access_mode).await
100 }
101 DatabaseExecutor::OwnedTransaction(trans) => {
102 trans.begin_with_config(isolation_level, access_mode).await
103 }
104 }
105 }
106
107 async fn begin_with_options(
108 &self,
109 options: TransactionOptions,
110 ) -> Result<DatabaseTransaction, DbErr> {
111 match self {
112 DatabaseExecutor::Connection(conn) => conn.begin_with_options(options).await,
113 DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options).await,
114 DatabaseExecutor::OwnedTransaction(trans) => trans.begin_with_options(options).await,
115 }
116 }
117
118 async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
119 where
120 F: for<'c> FnOnce(
121 &'c DatabaseTransaction,
122 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
123 + Send,
124 T: Send,
125 E: std::fmt::Display + std::fmt::Debug + Send,
126 {
127 match self {
128 DatabaseExecutor::Connection(conn) => conn.transaction(callback).await,
129 DatabaseExecutor::Transaction(trans) => trans.transaction(callback).await,
130 DatabaseExecutor::OwnedTransaction(trans) => trans.transaction(callback).await,
131 }
132 }
133
134 async fn transaction_with_config<F, T, E>(
135 &self,
136 callback: F,
137 isolation_level: Option<IsolationLevel>,
138 access_mode: Option<AccessMode>,
139 ) -> Result<T, TransactionError<E>>
140 where
141 F: for<'c> FnOnce(
142 &'c DatabaseTransaction,
143 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
144 + Send,
145 T: Send,
146 E: std::fmt::Display + std::fmt::Debug + Send,
147 {
148 match self {
149 DatabaseExecutor::Connection(conn) => {
150 conn.transaction_with_config(callback, isolation_level, access_mode)
151 .await
152 }
153 DatabaseExecutor::Transaction(trans) => {
154 trans
155 .transaction_with_config(callback, isolation_level, access_mode)
156 .await
157 }
158 DatabaseExecutor::OwnedTransaction(trans) => {
159 trans
160 .transaction_with_config(callback, isolation_level, access_mode)
161 .await
162 }
163 }
164 }
165}
166
167pub trait IntoDatabaseExecutor<'c>: Send
169where
170 Self: 'c,
171{
172 fn into_database_executor(self) -> DatabaseExecutor<'c>;
174}
175
176impl<'c> IntoDatabaseExecutor<'c> for DatabaseExecutor<'c> {
177 fn into_database_executor(self) -> DatabaseExecutor<'c> {
178 self
179 }
180}
181
182impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseConnection {
183 fn into_database_executor(self) -> DatabaseExecutor<'c> {
184 DatabaseExecutor::Connection(self)
185 }
186}
187
188impl<'c> IntoDatabaseExecutor<'c> for &'c DatabaseTransaction {
189 fn into_database_executor(self) -> DatabaseExecutor<'c> {
190 DatabaseExecutor::Transaction(self)
191 }
192}
193
194impl IntoDatabaseExecutor<'static> for DatabaseTransaction {
195 fn into_database_executor(self) -> DatabaseExecutor<'static> {
196 DatabaseExecutor::OwnedTransaction(self)
197 }
198}
199
200impl DatabaseExecutor<'_> {
201 pub fn is_transaction(&self) -> bool {
203 matches!(
204 self,
205 DatabaseExecutor::Transaction(_) | DatabaseExecutor::OwnedTransaction(_)
206 )
207 }
208
209 pub fn get_schema_builder(&self) -> SchemaBuilder {
211 Schema::new(self.get_database_backend()).builder()
212 }
213
214 #[cfg(feature = "entity-registry")]
215 #[cfg_attr(docsrs, doc(cfg(feature = "entity-registry")))]
216 pub fn get_schema_registry(&self, prefix: &str) -> SchemaBuilder {
218 let schema = Schema::new(self.get_database_backend());
219 crate::EntityRegistry::build_schema(schema, prefix)
220 }
221}