1use std::{sync::Arc, time::Duration};
2
3use futures_util::future::BoxFuture;
4#[cfg(feature = "sqlx-mysql")]
5use sqlx::mysql::MySqlConnectOptions;
6#[cfg(feature = "sqlx-postgres")]
7use sqlx::postgres::PgConnectOptions;
8#[cfg(feature = "sqlx-sqlite")]
9use sqlx::sqlite::SqliteConnectOptions;
10
11mod connection;
12mod db_connection;
13#[cfg(feature = "mock")]
14#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
15mod mock;
16#[cfg(feature = "proxy")]
17#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
18mod proxy;
19#[cfg(feature = "rbac")]
20mod restricted_connection;
21#[cfg(all(feature = "schema-sync", feature = "sqlx-dep"))]
22mod sea_schema_shim;
23mod statement;
24mod stream;
25mod transaction;
26
27pub use connection::*;
28pub use db_connection::*;
29#[cfg(feature = "mock")]
30#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
31pub use mock::*;
32#[cfg(feature = "proxy")]
33#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
34pub use proxy::*;
35#[cfg(feature = "rbac")]
36pub use restricted_connection::*;
37pub use statement::*;
38use std::borrow::Cow;
39pub use stream::*;
40use tracing::instrument;
41pub use transaction::*;
42
43use crate::error::*;
44
45#[derive(Debug, Default)]
47pub struct Database;
48
49type AfterConnectCallback = Option<
50 Arc<
51 dyn Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + Send + Sync + 'static,
52 >,
53>;
54
55#[derive(derive_more::Debug, Clone)]
57pub struct ConnectOptions {
58 pub(crate) url: String,
60 pub(crate) max_connections: Option<u32>,
62 pub(crate) min_connections: Option<u32>,
64 pub(crate) connect_timeout: Option<Duration>,
66 pub(crate) idle_timeout: Option<Option<Duration>>,
69 pub(crate) acquire_timeout: Option<Duration>,
71 pub(crate) max_lifetime: Option<Option<Duration>>,
73 pub(crate) sqlx_logging: bool,
75 pub(crate) sqlx_logging_level: log::LevelFilter,
77 pub(crate) sqlx_slow_statements_logging_level: log::LevelFilter,
79 pub(crate) sqlx_slow_statements_logging_threshold: Duration,
81 pub(crate) sqlcipher_key: Option<Cow<'static, str>>,
83 pub(crate) schema_search_path: Option<String>,
85 pub(crate) test_before_acquire: bool,
86 pub(crate) connect_lazy: bool,
90
91 #[debug(skip)]
92 pub(crate) after_connect: AfterConnectCallback,
93
94 #[cfg(feature = "sqlx-mysql")]
95 #[debug(skip)]
96 pub(crate) mysql_opts_fn:
97 Option<Arc<dyn Fn(MySqlConnectOptions) -> MySqlConnectOptions + Send + Sync>>,
98 #[cfg(feature = "sqlx-postgres")]
99 #[debug(skip)]
100 pub(crate) pg_opts_fn: Option<Arc<dyn Fn(PgConnectOptions) -> PgConnectOptions + Send + Sync>>,
101 #[cfg(feature = "sqlx-sqlite")]
102 #[debug(skip)]
103 pub(crate) sqlite_opts_fn:
104 Option<Arc<dyn Fn(SqliteConnectOptions) -> SqliteConnectOptions + Send + Sync>>,
105}
106
107impl Database {
108 #[instrument(level = "trace", skip(opt))]
111 pub async fn connect<C>(opt: C) -> Result<DatabaseConnection, DbErr>
112 where
113 C: Into<ConnectOptions>,
114 {
115 let opt: ConnectOptions = opt.into();
116
117 if url::Url::parse(&opt.url).is_err() {
118 return Err(conn_err(format!(
119 "The connection string '{}' cannot be parsed.",
120 opt.url
121 )));
122 }
123
124 #[cfg(feature = "sqlx-mysql")]
125 if DbBackend::MySql.is_prefix_of(&opt.url) {
126 return crate::SqlxMySqlConnector::connect(opt).await;
127 }
128 #[cfg(feature = "sqlx-postgres")]
129 if DbBackend::Postgres.is_prefix_of(&opt.url) {
130 return crate::SqlxPostgresConnector::connect(opt).await;
131 }
132 #[cfg(feature = "sqlx-sqlite")]
133 if DbBackend::Sqlite.is_prefix_of(&opt.url) {
134 return crate::SqlxSqliteConnector::connect(opt).await;
135 }
136 #[cfg(feature = "mock")]
137 if crate::MockDatabaseConnector::accepts(&opt.url) {
138 return crate::MockDatabaseConnector::connect(&opt.url).await;
139 }
140
141 Err(conn_err(format!(
142 "The connection string '{}' has no supporting driver.",
143 opt.url
144 )))
145 }
146
147 #[cfg(feature = "proxy")]
149 #[instrument(level = "trace", skip(proxy_func_arc))]
150 pub async fn connect_proxy(
151 db_type: DbBackend,
152 proxy_func_arc: std::sync::Arc<Box<dyn ProxyDatabaseTrait>>,
153 ) -> Result<DatabaseConnection, DbErr> {
154 match db_type {
155 DbBackend::MySql => {
156 return crate::ProxyDatabaseConnector::connect(
157 DbBackend::MySql,
158 proxy_func_arc.to_owned(),
159 );
160 }
161 DbBackend::Postgres => {
162 return crate::ProxyDatabaseConnector::connect(
163 DbBackend::Postgres,
164 proxy_func_arc.to_owned(),
165 );
166 }
167 DbBackend::Sqlite => {
168 return crate::ProxyDatabaseConnector::connect(
169 DbBackend::Sqlite,
170 proxy_func_arc.to_owned(),
171 );
172 }
173 }
174 }
175}
176
177impl<T> From<T> for ConnectOptions
178where
179 T: Into<String>,
180{
181 fn from(s: T) -> ConnectOptions {
182 ConnectOptions::new(s.into())
183 }
184}
185
186impl ConnectOptions {
187 pub fn new<T>(url: T) -> Self
189 where
190 T: Into<String>,
191 {
192 Self {
193 url: url.into(),
194 max_connections: None,
195 min_connections: None,
196 connect_timeout: None,
197 idle_timeout: None,
198 acquire_timeout: None,
199 max_lifetime: None,
200 sqlx_logging: true,
201 sqlx_logging_level: log::LevelFilter::Info,
202 sqlx_slow_statements_logging_level: log::LevelFilter::Off,
203 sqlx_slow_statements_logging_threshold: Duration::from_secs(1),
204 sqlcipher_key: None,
205 schema_search_path: None,
206 test_before_acquire: true,
207 connect_lazy: false,
208 after_connect: None,
209 #[cfg(feature = "sqlx-mysql")]
210 mysql_opts_fn: None,
211 #[cfg(feature = "sqlx-postgres")]
212 pg_opts_fn: None,
213 #[cfg(feature = "sqlx-sqlite")]
214 sqlite_opts_fn: None,
215 }
216 }
217
218 pub fn get_url(&self) -> &str {
220 &self.url
221 }
222
223 pub fn max_connections(&mut self, value: u32) -> &mut Self {
225 self.max_connections = Some(value);
226 self
227 }
228
229 pub fn get_max_connections(&self) -> Option<u32> {
231 self.max_connections
232 }
233
234 pub fn min_connections(&mut self, value: u32) -> &mut Self {
236 self.min_connections = Some(value);
237 self
238 }
239
240 pub fn get_min_connections(&self) -> Option<u32> {
242 self.min_connections
243 }
244
245 pub fn connect_timeout(&mut self, value: Duration) -> &mut Self {
247 self.connect_timeout = Some(value);
248 self
249 }
250
251 pub fn get_connect_timeout(&self) -> Option<Duration> {
253 self.connect_timeout
254 }
255
256 pub fn idle_timeout<T>(&mut self, value: T) -> &mut Self
258 where
259 T: Into<Option<Duration>>,
260 {
261 self.idle_timeout = Some(value.into());
262 self
263 }
264
265 pub fn get_idle_timeout(&self) -> Option<Option<Duration>> {
267 self.idle_timeout
268 }
269
270 pub fn acquire_timeout(&mut self, value: Duration) -> &mut Self {
272 self.acquire_timeout = Some(value);
273 self
274 }
275
276 pub fn get_acquire_timeout(&self) -> Option<Duration> {
278 self.acquire_timeout
279 }
280
281 pub fn max_lifetime<T>(&mut self, lifetime: T) -> &mut Self
283 where
284 T: Into<Option<Duration>>,
285 {
286 self.max_lifetime = Some(lifetime.into());
287 self
288 }
289
290 pub fn get_max_lifetime(&self) -> Option<Option<Duration>> {
292 self.max_lifetime
293 }
294
295 pub fn sqlx_logging(&mut self, value: bool) -> &mut Self {
297 self.sqlx_logging = value;
298 self
299 }
300
301 pub fn get_sqlx_logging(&self) -> bool {
303 self.sqlx_logging
304 }
305
306 pub fn sqlx_logging_level(&mut self, level: log::LevelFilter) -> &mut Self {
309 self.sqlx_logging_level = level;
310 self
311 }
312
313 pub fn sqlx_slow_statements_logging_settings(
316 &mut self,
317 level: log::LevelFilter,
318 duration: Duration,
319 ) -> &mut Self {
320 self.sqlx_slow_statements_logging_level = level;
321 self.sqlx_slow_statements_logging_threshold = duration;
322 self
323 }
324
325 pub fn get_sqlx_logging_level(&self) -> log::LevelFilter {
327 self.sqlx_logging_level
328 }
329
330 pub fn get_sqlx_slow_statements_logging_settings(&self) -> (log::LevelFilter, Duration) {
332 (
333 self.sqlx_slow_statements_logging_level,
334 self.sqlx_slow_statements_logging_threshold,
335 )
336 }
337
338 pub fn sqlcipher_key<T>(&mut self, value: T) -> &mut Self
340 where
341 T: Into<Cow<'static, str>>,
342 {
343 self.sqlcipher_key = Some(value.into());
344 self
345 }
346
347 pub fn set_schema_search_path<T>(&mut self, schema_search_path: T) -> &mut Self
349 where
350 T: Into<String>,
351 {
352 self.schema_search_path = Some(schema_search_path.into());
353 self
354 }
355
356 pub fn test_before_acquire(&mut self, value: bool) -> &mut Self {
358 self.test_before_acquire = value;
359 self
360 }
361
362 pub fn connect_lazy(&mut self, value: bool) -> &mut Self {
365 self.connect_lazy = value;
366 self
367 }
368
369 pub fn get_connect_lazy(&self) -> bool {
371 self.connect_lazy
372 }
373
374 pub fn after_connect<F>(&mut self, f: F) -> &mut Self
376 where
377 F: Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + Send + Sync + 'static,
378 {
379 self.after_connect = Some(Arc::new(f));
380
381 self
382 }
383
384 #[cfg(feature = "sqlx-mysql")]
385 #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-mysql")))]
386 pub fn map_sqlx_mysql_opts<F>(&mut self, f: F) -> &mut Self
389 where
390 F: Fn(MySqlConnectOptions) -> MySqlConnectOptions + Send + Sync + 'static,
391 {
392 self.mysql_opts_fn = Some(Arc::new(f));
393 self
394 }
395
396 #[cfg(feature = "sqlx-postgres")]
397 #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-postgres")))]
398 pub fn map_sqlx_postgres_opts<F>(&mut self, f: F) -> &mut Self
401 where
402 F: Fn(PgConnectOptions) -> PgConnectOptions + Send + Sync + 'static,
403 {
404 self.pg_opts_fn = Some(Arc::new(f));
405 self
406 }
407
408 #[cfg(feature = "sqlx-sqlite")]
409 #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-sqlite")))]
410 pub fn map_sqlx_sqlite_opts<F>(&mut self, f: F) -> &mut Self
413 where
414 F: Fn(SqliteConnectOptions) -> SqliteConnectOptions + Send + Sync + 'static,
415 {
416 self.sqlite_opts_fn = Some(Arc::new(f));
417 self
418 }
419}