Skip to main content

sea_orm/database/
mod.rs

1use std::{sync::Arc, time::Duration};
2
3#[cfg(not(feature = "sync"))]
4use futures_util::future::BoxFuture;
5#[cfg(feature = "sqlx-mysql")]
6use sqlx::mysql::MySqlConnectOptions;
7#[cfg(feature = "sqlx-postgres")]
8use sqlx::postgres::PgConnectOptions;
9#[cfg(feature = "sqlx-sqlite")]
10use sqlx::sqlite::SqliteConnectOptions;
11
12mod connection;
13mod db_connection;
14mod executor;
15#[cfg(feature = "mock")]
16#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
17mod mock;
18#[cfg(feature = "proxy")]
19#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
20mod proxy;
21#[cfg(feature = "rbac")]
22mod restricted_connection;
23#[cfg(all(feature = "schema-sync", feature = "rusqlite"))]
24mod sea_schema_rusqlite;
25#[cfg(all(feature = "schema-sync", feature = "sqlx-dep"))]
26mod sea_schema_shim;
27mod statement;
28mod stream;
29mod tracing_spans;
30mod transaction;
31
32pub use connection::*;
33pub use db_connection::*;
34pub use executor::*;
35#[cfg(feature = "mock")]
36#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
37pub use mock::*;
38#[cfg(feature = "proxy")]
39#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
40pub use proxy::*;
41#[cfg(feature = "rbac")]
42pub use restricted_connection::*;
43pub use statement::*;
44use std::borrow::Cow;
45pub use stream::*;
46use tracing::instrument;
47pub use transaction::*;
48
49use crate::error::*;
50
51/// Defines a database
52#[derive(Debug, Default)]
53pub struct Database;
54
55#[cfg(feature = "sync")]
56type BoxFuture<'a, T> = T;
57
58#[cfg(feature = "sqlx-mysql")]
59type MapMySqlPoolOptsFn = Arc<
60    dyn Fn(sqlx::pool::PoolOptions<sqlx::MySql>) -> sqlx::pool::PoolOptions<sqlx::MySql>
61        + Send
62        + Sync,
63>;
64
65#[cfg(feature = "sqlx-postgres")]
66type MapPgPoolOptsFn = Arc<
67    dyn Fn(sqlx::pool::PoolOptions<sqlx::Postgres>) -> sqlx::pool::PoolOptions<sqlx::Postgres>
68        + Send
69        + Sync,
70>;
71
72#[cfg(feature = "sqlx-sqlite")]
73type MapSqlitePoolOptsFn = Option<
74    Arc<
75        dyn Fn(sqlx::pool::PoolOptions<sqlx::Sqlite>) -> sqlx::pool::PoolOptions<sqlx::Sqlite>
76            + Send
77            + Sync,
78    >,
79>;
80
81type AfterConnectCallback = Option<
82    Arc<
83        dyn Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + Send + Sync + 'static,
84    >,
85>;
86
87/// Defines the configuration options of a database
88#[derive(derive_more::Debug, Clone)]
89pub struct ConnectOptions {
90    /// The URI of the database
91    pub(crate) url: String,
92    /// Maximum number of connections for a pool
93    pub(crate) max_connections: Option<u32>,
94    /// Minimum number of connections for a pool
95    pub(crate) min_connections: Option<u32>,
96    /// The connection timeout for a packet connection
97    pub(crate) connect_timeout: Option<Duration>,
98    /// Maximum idle time for a particular connection to prevent
99    /// network resource exhaustion
100    pub(crate) idle_timeout: Option<Option<Duration>>,
101    /// Set the maximum amount of time to spend waiting for acquiring a connection
102    pub(crate) acquire_timeout: Option<Duration>,
103    /// Set the maximum lifetime of individual connections
104    pub(crate) max_lifetime: Option<Option<Duration>>,
105    /// Enable SQLx statement logging
106    pub(crate) sqlx_logging: bool,
107    /// Record SQL statements in tracing spans
108    pub(crate) record_stmt_in_spans: bool,
109    /// SQLx statement logging level (ignored if `sqlx_logging` is false)
110    pub(crate) sqlx_logging_level: log::LevelFilter,
111    /// SQLx slow statements logging level (ignored if `sqlx_logging` is false)
112    pub(crate) sqlx_slow_statements_logging_level: log::LevelFilter,
113    /// SQLx slow statements duration threshold (ignored if `sqlx_logging` is false)
114    pub(crate) sqlx_slow_statements_logging_threshold: Duration,
115    /// set sqlcipher key
116    pub(crate) sqlcipher_key: Option<Cow<'static, str>>,
117    /// Schema search path (PostgreSQL only)
118    pub(crate) schema_search_path: Option<String>,
119    /// Application name (PostgreSQL only)
120    pub(crate) application_name: Option<String>,
121    /// Statement timeout (PostgreSQL only)
122    pub(crate) statement_timeout: Option<Duration>,
123    pub(crate) test_before_acquire: bool,
124    /// Only establish connections to the DB as needed. If set to `true`, the db connection will
125    /// be created using SQLx's [connect_lazy](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#method.connect_lazy)
126    /// method.
127    pub(crate) connect_lazy: bool,
128
129    #[debug(skip)]
130    pub(crate) after_connect: AfterConnectCallback,
131
132    #[cfg(feature = "sqlx-mysql")]
133    #[debug(skip)]
134    pub(crate) mysql_pool_opts_fn: Option<MapMySqlPoolOptsFn>,
135    #[cfg(feature = "sqlx-postgres")]
136    #[debug(skip)]
137    pub(crate) pg_pool_opts_fn: Option<MapPgPoolOptsFn>,
138    #[cfg(feature = "sqlx-sqlite")]
139    #[debug(skip)]
140    pub(crate) sqlite_pool_opts_fn: MapSqlitePoolOptsFn,
141    #[cfg(feature = "sqlx-mysql")]
142    #[debug(skip)]
143    pub(crate) mysql_opts_fn:
144        Option<Arc<dyn Fn(MySqlConnectOptions) -> MySqlConnectOptions + Send + Sync>>,
145    #[cfg(feature = "sqlx-postgres")]
146    #[debug(skip)]
147    pub(crate) pg_opts_fn: Option<Arc<dyn Fn(PgConnectOptions) -> PgConnectOptions + Send + Sync>>,
148    #[cfg(feature = "sqlx-sqlite")]
149    #[debug(skip)]
150    pub(crate) sqlite_opts_fn:
151        Option<Arc<dyn Fn(SqliteConnectOptions) -> SqliteConnectOptions + Send + Sync>>,
152}
153
154impl Database {
155    /// Method to create a [DatabaseConnection] on a database. This method will return an error
156    /// if the database is not available.
157    #[instrument(level = "trace", skip(opt))]
158    pub async fn connect<C>(opt: C) -> Result<DatabaseConnection, DbErr>
159    where
160        C: Into<ConnectOptions>,
161    {
162        let opt: ConnectOptions = opt.into();
163
164        if url::Url::parse(&opt.url).is_err() {
165            return Err(conn_err(format!(
166                "The connection string '{}' cannot be parsed.",
167                opt.url
168            )));
169        }
170
171        #[cfg(feature = "sqlx-mysql")]
172        if DbBackend::MySql.is_prefix_of(&opt.url) {
173            return crate::SqlxMySqlConnector::connect(opt).await;
174        }
175        #[cfg(feature = "sqlx-postgres")]
176        if DbBackend::Postgres.is_prefix_of(&opt.url) {
177            return crate::SqlxPostgresConnector::connect(opt).await;
178        }
179        #[cfg(feature = "sqlx-sqlite")]
180        if DbBackend::Sqlite.is_prefix_of(&opt.url) {
181            return crate::SqlxSqliteConnector::connect(opt).await;
182        }
183        #[cfg(feature = "rusqlite")]
184        if DbBackend::Sqlite.is_prefix_of(&opt.url) {
185            return crate::driver::rusqlite::RusqliteConnector::connect(opt);
186        }
187        #[cfg(feature = "mock")]
188        if crate::MockDatabaseConnector::accepts(&opt.url) {
189            return crate::MockDatabaseConnector::connect(&opt.url).await;
190        }
191
192        Err(conn_err(format!(
193            "The connection string '{}' has no supporting driver.",
194            opt.url
195        )))
196    }
197
198    /// Method to create a [DatabaseConnection] on a proxy database
199    #[cfg(feature = "proxy")]
200    #[instrument(level = "trace", skip(proxy_func_arc))]
201    pub async fn connect_proxy(
202        db_type: DbBackend,
203        proxy_func_arc: std::sync::Arc<Box<dyn ProxyDatabaseTrait>>,
204    ) -> Result<DatabaseConnection, DbErr> {
205        match db_type {
206            DbBackend::MySql => {
207                return crate::ProxyDatabaseConnector::connect(
208                    DbBackend::MySql,
209                    proxy_func_arc.to_owned(),
210                );
211            }
212            DbBackend::Postgres => {
213                return crate::ProxyDatabaseConnector::connect(
214                    DbBackend::Postgres,
215                    proxy_func_arc.to_owned(),
216                );
217            }
218            DbBackend::Sqlite => {
219                return crate::ProxyDatabaseConnector::connect(
220                    DbBackend::Sqlite,
221                    proxy_func_arc.to_owned(),
222                );
223            }
224        }
225    }
226}
227
228impl<T> From<T> for ConnectOptions
229where
230    T: Into<String>,
231{
232    fn from(s: T) -> ConnectOptions {
233        ConnectOptions::new(s.into())
234    }
235}
236
237impl ConnectOptions {
238    /// Create new [ConnectOptions] for a [Database] by passing in a URI string
239    pub fn new<T>(url: T) -> Self
240    where
241        T: Into<String>,
242    {
243        Self {
244            url: url.into(),
245            max_connections: None,
246            min_connections: None,
247            connect_timeout: None,
248            idle_timeout: None,
249            acquire_timeout: None,
250            max_lifetime: None,
251            sqlx_logging: true,
252            record_stmt_in_spans: true,
253            sqlx_logging_level: log::LevelFilter::Info,
254            sqlx_slow_statements_logging_level: log::LevelFilter::Off,
255            sqlx_slow_statements_logging_threshold: Duration::from_secs(1),
256            sqlcipher_key: None,
257            schema_search_path: None,
258            application_name: None,
259            statement_timeout: None,
260            test_before_acquire: true,
261            connect_lazy: false,
262            after_connect: None,
263            #[cfg(feature = "sqlx-mysql")]
264            mysql_pool_opts_fn: None,
265            #[cfg(feature = "sqlx-postgres")]
266            pg_pool_opts_fn: None,
267            #[cfg(feature = "sqlx-sqlite")]
268            sqlite_pool_opts_fn: None,
269            #[cfg(feature = "sqlx-mysql")]
270            mysql_opts_fn: None,
271            #[cfg(feature = "sqlx-postgres")]
272            pg_opts_fn: None,
273            #[cfg(feature = "sqlx-sqlite")]
274            sqlite_opts_fn: None,
275        }
276    }
277
278    /// Get the database URL of the pool
279    pub fn get_url(&self) -> &str {
280        &self.url
281    }
282
283    /// Set the maximum number of connections of the pool
284    pub fn max_connections(&mut self, value: u32) -> &mut Self {
285        self.max_connections = Some(value);
286        self
287    }
288
289    /// Get the maximum number of connections of the pool, if set
290    pub fn get_max_connections(&self) -> Option<u32> {
291        self.max_connections
292    }
293
294    /// Set the minimum number of connections of the pool
295    pub fn min_connections(&mut self, value: u32) -> &mut Self {
296        self.min_connections = Some(value);
297        self
298    }
299
300    /// Get the minimum number of connections of the pool, if set
301    pub fn get_min_connections(&self) -> Option<u32> {
302        self.min_connections
303    }
304
305    /// Set the timeout duration when acquiring a connection
306    pub fn connect_timeout(&mut self, value: Duration) -> &mut Self {
307        self.connect_timeout = Some(value);
308        self
309    }
310
311    /// Get the timeout duration when acquiring a connection, if set
312    pub fn get_connect_timeout(&self) -> Option<Duration> {
313        self.connect_timeout
314    }
315
316    /// Set the idle duration before closing a connection.
317    pub fn idle_timeout<T>(&mut self, value: T) -> &mut Self
318    where
319        T: Into<Option<Duration>>,
320    {
321        self.idle_timeout = Some(value.into());
322        self
323    }
324
325    /// Get the idle duration before closing a connection, if set
326    pub fn get_idle_timeout(&self) -> Option<Option<Duration>> {
327        self.idle_timeout
328    }
329
330    /// Set the maximum amount of time to spend waiting for acquiring a connection
331    pub fn acquire_timeout(&mut self, value: Duration) -> &mut Self {
332        self.acquire_timeout = Some(value);
333        self
334    }
335
336    /// Get the maximum amount of time to spend waiting for acquiring a connection
337    pub fn get_acquire_timeout(&self) -> Option<Duration> {
338        self.acquire_timeout
339    }
340
341    /// Set the maximum lifetime of individual connections.
342    pub fn max_lifetime<T>(&mut self, lifetime: T) -> &mut Self
343    where
344        T: Into<Option<Duration>>,
345    {
346        self.max_lifetime = Some(lifetime.into());
347        self
348    }
349
350    /// Get the maximum lifetime of individual connections, if set
351    pub fn get_max_lifetime(&self) -> Option<Option<Duration>> {
352        self.max_lifetime
353    }
354
355    /// Enable SQLx statement logging (default true)
356    pub fn sqlx_logging(&mut self, value: bool) -> &mut Self {
357        self.sqlx_logging = value;
358        self
359    }
360
361    /// Get whether SQLx statement logging is enabled
362    pub fn get_sqlx_logging(&self) -> bool {
363        self.sqlx_logging
364    }
365
366    /// Enable recording `db.statement` in tracing spans (default true).
367    pub fn record_stmt_in_spans(&mut self, value: bool) -> &mut Self {
368        self.record_stmt_in_spans = value;
369        self
370    }
371
372    /// Get whether `db.statement` recording in tracing spans is enabled.
373    pub fn get_record_stmt_in_spans(&self) -> bool {
374        self.record_stmt_in_spans
375    }
376
377    /// Set SQLx statement logging level (default INFO).
378    /// (ignored if `sqlx_logging` is `false`)
379    pub fn sqlx_logging_level(&mut self, level: log::LevelFilter) -> &mut Self {
380        self.sqlx_logging_level = level;
381        self
382    }
383
384    /// Set SQLx slow statements logging level and duration threshold (default `LevelFilter::Off`).
385    /// (ignored if `sqlx_logging` is `false`)
386    pub fn sqlx_slow_statements_logging_settings(
387        &mut self,
388        level: log::LevelFilter,
389        duration: Duration,
390    ) -> &mut Self {
391        self.sqlx_slow_statements_logging_level = level;
392        self.sqlx_slow_statements_logging_threshold = duration;
393        self
394    }
395
396    /// Get the level of SQLx statement logging
397    pub fn get_sqlx_logging_level(&self) -> log::LevelFilter {
398        self.sqlx_logging_level
399    }
400
401    /// Get the SQLx slow statements logging settings
402    pub fn get_sqlx_slow_statements_logging_settings(&self) -> (log::LevelFilter, Duration) {
403        (
404            self.sqlx_slow_statements_logging_level,
405            self.sqlx_slow_statements_logging_threshold,
406        )
407    }
408
409    /// set key for sqlcipher
410    pub fn sqlcipher_key<T>(&mut self, value: T) -> &mut Self
411    where
412        T: Into<Cow<'static, str>>,
413    {
414        self.sqlcipher_key = Some(value.into());
415        self
416    }
417
418    /// Set schema search path (PostgreSQL only)
419    pub fn set_schema_search_path<T>(&mut self, schema_search_path: T) -> &mut Self
420    where
421        T: Into<String>,
422    {
423        self.schema_search_path = Some(schema_search_path.into());
424        self
425    }
426
427    /// Set application name (PostgreSQL only)
428    pub fn set_application_name<T>(&mut self, application_name: T) -> &mut Self
429    where
430        T: Into<String>,
431    {
432        self.application_name = Some(application_name.into());
433        self
434    }
435
436    /// Set the statement timeout (PostgreSQL only).
437    ///
438    /// This sets the PostgreSQL `statement_timeout` parameter via the connection options,
439    /// causing the server to abort any statement that exceeds the specified duration.
440    /// The timeout is applied at connection time and does not require an extra roundtrip.
441    ///
442    /// Has no effect on MySQL or SQLite connections.
443    pub fn statement_timeout(&mut self, value: Duration) -> &mut Self {
444        self.statement_timeout = Some(value);
445        self
446    }
447
448    /// Get the statement timeout, if set
449    pub fn get_statement_timeout(&self) -> Option<Duration> {
450        self.statement_timeout
451    }
452
453    /// If true, the connection will be pinged upon acquiring from the pool (default true).
454    pub fn test_before_acquire(&mut self, value: bool) -> &mut Self {
455        self.test_before_acquire = value;
456        self
457    }
458
459    /// If set to `true`, the db connection pool will be created using SQLx's
460    /// [connect_lazy](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#method.connect_lazy) method.
461    pub fn connect_lazy(&mut self, value: bool) -> &mut Self {
462        self.connect_lazy = value;
463        self
464    }
465
466    /// Get whether DB connections will be established when the pool is created or only as needed.
467    pub fn get_connect_lazy(&self) -> bool {
468        self.connect_lazy
469    }
470
471    /// Set a callback function that will be called after a new connection is established.
472    pub fn after_connect<F>(&mut self, f: F) -> &mut Self
473    where
474        F: Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + Send + Sync + 'static,
475    {
476        self.after_connect = Some(Arc::new(f));
477
478        self
479    }
480
481    #[cfg(feature = "sqlx-mysql")]
482    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-mysql")))]
483    /// Apply a function to modify the underlying [`MySqlConnectOptions`] before
484    /// creating the connection pool.
485    pub fn map_sqlx_mysql_opts<F>(&mut self, f: F) -> &mut Self
486    where
487        F: Fn(MySqlConnectOptions) -> MySqlConnectOptions + Send + Sync + 'static,
488    {
489        self.mysql_opts_fn = Some(Arc::new(f));
490        self
491    }
492
493    #[cfg(feature = "sqlx-mysql")]
494    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-mysql")))]
495    /// Apply a function to modify the underlying [`sqlx::pool::PoolOptions<sqlx::MySql>`]
496    /// before creating the connection pool.
497    pub fn map_sqlx_mysql_pool_opts<F>(&mut self, f: F) -> &mut Self
498    where
499        F: Fn(sqlx::pool::PoolOptions<sqlx::MySql>) -> sqlx::pool::PoolOptions<sqlx::MySql>
500            + Send
501            + Sync
502            + 'static,
503    {
504        self.mysql_pool_opts_fn = Some(Arc::new(f));
505        self
506    }
507
508    #[cfg(feature = "sqlx-postgres")]
509    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-postgres")))]
510    /// Apply a function to modify the underlying [`PgConnectOptions`] before
511    /// creating the connection pool.
512    pub fn map_sqlx_postgres_opts<F>(&mut self, f: F) -> &mut Self
513    where
514        F: Fn(PgConnectOptions) -> PgConnectOptions + Send + Sync + 'static,
515    {
516        self.pg_opts_fn = Some(Arc::new(f));
517        self
518    }
519
520    #[cfg(feature = "sqlx-postgres")]
521    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-postgres")))]
522    /// Apply a function to modify the underlying [`sqlx::pool::PoolOptions<sqlx::Postgres>`]
523    /// before creating the connection pool.
524    pub fn map_sqlx_postgres_pool_opts<F>(&mut self, f: F) -> &mut Self
525    where
526        F: Fn(sqlx::pool::PoolOptions<sqlx::Postgres>) -> sqlx::pool::PoolOptions<sqlx::Postgres>
527            + Send
528            + Sync
529            + 'static,
530    {
531        self.pg_pool_opts_fn = Some(Arc::new(f));
532        self
533    }
534
535    #[cfg(feature = "sqlx-sqlite")]
536    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-sqlite")))]
537    /// Apply a function to modify the underlying [`SqliteConnectOptions`] before
538    /// creating the connection pool.
539    pub fn map_sqlx_sqlite_opts<F>(&mut self, f: F) -> &mut Self
540    where
541        F: Fn(SqliteConnectOptions) -> SqliteConnectOptions + Send + Sync + 'static,
542    {
543        self.sqlite_opts_fn = Some(Arc::new(f));
544        self
545    }
546
547    #[cfg(feature = "sqlx-sqlite")]
548    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-sqlite")))]
549    /// Apply a function to modify the underlying [`sqlx::pool::PoolOptions<sqlx::Sqlite>`]
550    /// before creating the connection pool.
551    pub fn map_sqlx_sqlite_pool_opts<F>(&mut self, f: F) -> &mut Self
552    where
553        F: Fn(sqlx::pool::PoolOptions<sqlx::Sqlite>) -> sqlx::pool::PoolOptions<sqlx::Sqlite>
554            + Send
555            + Sync
556            + 'static,
557    {
558        self.sqlite_pool_opts_fn = Some(Arc::new(f));
559        self
560    }
561}