use std::{sync::Arc, time::Duration};
#[cfg(not(feature = "sync"))]
#[cfg(feature = "sqlx-mysql")]
use sqlx::mysql::MySqlConnectOptions;
#[cfg(feature = "sqlx-postgres")]
use sqlx::postgres::PgConnectOptions;
#[cfg(feature = "sqlx-sqlite")]
use sqlx::sqlite::SqliteConnectOptions;
mod connection;
mod db_connection;
mod executor;
#[cfg(feature = "mock")]
#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
mod mock;
#[cfg(feature = "proxy")]
#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
mod proxy;
#[cfg(feature = "rbac")]
mod restricted_connection;
#[cfg(all(feature = "schema-sync", feature = "rusqlite"))]
mod sea_schema_rusqlite;
#[cfg(all(feature = "schema-sync", feature = "sqlx-dep"))]
mod sea_schema_shim;
mod statement;
mod stream;
mod tracing_spans;
mod transaction;
pub use connection::*;
pub use db_connection::*;
pub use executor::*;
#[cfg(feature = "mock")]
#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
pub use mock::*;
#[cfg(feature = "proxy")]
#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
pub use proxy::*;
#[cfg(feature = "rbac")]
pub use restricted_connection::*;
pub use statement::*;
use std::borrow::Cow;
pub use stream::*;
use tracing::instrument;
pub use transaction::*;
use crate::error::*;
/// Unit type that serves as the namespace for the connection entry points
/// (`Database::connect`, and `Database::connect_proxy` when the `proxy` feature is on).
/// It carries no state of its own.
#[derive(Debug, Default)]
pub struct Database;
/// With the `sync` feature enabled, `BoxFuture<'a, T>` is just `T`: a callback typed as
/// returning a "future" returns its output value directly. The lifetime parameter is unused
/// by this alias.
// NOTE(review): presumably the shape mirrors an async counterpart of this module so that
// signatures stay textually identical — confirm.
#[cfg(feature = "sync")]
type BoxFuture<'a, T> = T;
/// Shared closure that receives SQLx `PoolOptions` for MySQL and returns the (possibly
/// modified) options. Stored wrapped in `Option` on `ConnectOptions::mysql_pool_opts_fn`.
#[cfg(feature = "sqlx-mysql")]
type MapMySqlPoolOptsFn =
Arc<dyn Fn(sqlx::pool::PoolOptions<sqlx::MySql>) -> sqlx::pool::PoolOptions<sqlx::MySql>>;
/// Shared closure that receives SQLx `PoolOptions` for Postgres and returns the (possibly
/// modified) options. Stored wrapped in `Option` on `ConnectOptions::pg_pool_opts_fn`.
#[cfg(feature = "sqlx-postgres")]
type MapPgPoolOptsFn =
Arc<dyn Fn(sqlx::pool::PoolOptions<sqlx::Postgres>) -> sqlx::pool::PoolOptions<sqlx::Postgres>>;
/// Optional shared closure that receives SQLx `PoolOptions` for SQLite and returns the
/// (possibly modified) options.
///
/// Unlike the MySQL and Postgres aliases, this alias already includes the `Option`, so the
/// `ConnectOptions::sqlite_pool_opts_fn` field uses it without an extra wrapper.
#[cfg(feature = "sqlx-sqlite")]
type MapSqlitePoolOptsFn = Option<
Arc<dyn Fn(sqlx::pool::PoolOptions<sqlx::Sqlite>) -> sqlx::pool::PoolOptions<sqlx::Sqlite>>,
>;
/// Optional shared callback that is handed a `DatabaseConnection` and reports success or a
/// `DbErr`. Registered through `ConnectOptions::after_connect`.
// NOTE(review): when and how often the callback is invoked is decided by the connectors,
// which are not visible in this file — confirm before documenting timing guarantees.
type AfterConnectCallback =
Option<Arc<dyn Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + 'static>>;
/// Builder-style bag of settings consumed when opening a database connection.
///
/// Created with `ConnectOptions::new` (or via `From` for anything convertible into a
/// `String`) and adjusted through the chained setters on its `impl`. Callback fields are
/// excluded from the `Debug` output via `#[debug(skip)]`.
// NOTE(review): how each setting is applied is up to the connectors that read these fields;
// none of them are visible in this file, so the field notes below stay close to the names.
#[derive(derive_more::Debug, Clone)]
pub struct ConnectOptions {
/// Connection string. `Database::connect` requires it to parse as a URL and uses it to
/// pick a driver.
pub(crate) url: String,
/// Maximum number of connections, if configured.
pub(crate) max_connections: Option<u32>,
/// Minimum number of connections, if configured.
pub(crate) min_connections: Option<u32>,
/// Connect timeout, if configured.
pub(crate) connect_timeout: Option<Duration>,
/// Idle timeout. The outer `Option` is `None` until the setter is called; the inner
/// `Option` is exactly what the caller passed (which may itself be `None`).
pub(crate) idle_timeout: Option<Option<Duration>>,
/// Acquire timeout, if configured.
pub(crate) acquire_timeout: Option<Duration>,
/// Maximum lifetime. Same two-level `Option` encoding as `idle_timeout`.
pub(crate) max_lifetime: Option<Option<Duration>>,
/// SQLx logging switch; `ConnectOptions::new` starts it as `true`.
pub(crate) sqlx_logging: bool,
/// SQLx logging level; `ConnectOptions::new` starts it as `Info`.
pub(crate) sqlx_logging_level: log::LevelFilter,
/// Level for slow-statement logging; `ConnectOptions::new` starts it as `Off`.
pub(crate) sqlx_slow_statements_logging_level: log::LevelFilter,
/// Threshold paired with the slow-statement level; `ConnectOptions::new` starts it at
/// one second.
pub(crate) sqlx_slow_statements_logging_threshold: Duration,
/// SQLCipher key, if configured.
pub(crate) sqlcipher_key: Option<Cow<'static, str>>,
/// Schema search path, if configured.
pub(crate) schema_search_path: Option<String>,
/// Application name, if configured.
pub(crate) application_name: Option<String>,
/// Statement timeout, if configured.
pub(crate) statement_timeout: Option<Duration>,
/// "Test before acquire" switch; `ConnectOptions::new` starts it as `true`.
pub(crate) test_before_acquire: bool,
/// "Connect lazily" switch; `ConnectOptions::new` starts it as `false`.
pub(crate) connect_lazy: bool,
/// Optional callback taking a `DatabaseConnection`; set via `after_connect`.
#[debug(skip)]
pub(crate) after_connect: AfterConnectCallback,
/// Optional hook that maps the SQLx MySQL `PoolOptions`.
#[cfg(feature = "sqlx-mysql")]
#[debug(skip)]
pub(crate) mysql_pool_opts_fn: Option<MapMySqlPoolOptsFn>,
/// Optional hook that maps the SQLx Postgres `PoolOptions`.
#[cfg(feature = "sqlx-postgres")]
#[debug(skip)]
pub(crate) pg_pool_opts_fn: Option<MapPgPoolOptsFn>,
/// Optional hook that maps the SQLx SQLite `PoolOptions` (the alias already is an `Option`).
#[cfg(feature = "sqlx-sqlite")]
#[debug(skip)]
pub(crate) sqlite_pool_opts_fn: MapSqlitePoolOptsFn,
/// Optional hook that maps the SQLx `MySqlConnectOptions`.
#[cfg(feature = "sqlx-mysql")]
#[debug(skip)]
pub(crate) mysql_opts_fn: Option<Arc<dyn Fn(MySqlConnectOptions) -> MySqlConnectOptions>>,
/// Optional hook that maps the SQLx `PgConnectOptions`.
#[cfg(feature = "sqlx-postgres")]
#[debug(skip)]
pub(crate) pg_opts_fn: Option<Arc<dyn Fn(PgConnectOptions) -> PgConnectOptions>>,
/// Optional hook that maps the SQLx `SqliteConnectOptions`.
#[cfg(feature = "sqlx-sqlite")]
#[debug(skip)]
pub(crate) sqlite_opts_fn: Option<Arc<dyn Fn(SqliteConnectOptions) -> SqliteConnectOptions>>,
}
impl Database {
    /// Opens a [`DatabaseConnection`] with the first compiled-in driver that accepts the URL.
    ///
    /// `opt` is converted into [`ConnectOptions`]. Its URL must parse with `url::Url::parse`;
    /// it is then offered, in this order, to the SQLx MySQL, SQLx Postgres, SQLx SQLite,
    /// rusqlite and mock connectors. Each candidate exists only when its Cargo feature is
    /// enabled, and the first one whose check matches handles the connection.
    ///
    /// # Errors
    ///
    /// Returns an error built by `conn_err` when the URL cannot be parsed or when no enabled
    /// driver accepts it; otherwise returns whatever the selected connector returns.
    #[instrument(level = "trace", skip(opt))]
    pub fn connect<C>(opt: C) -> Result<DatabaseConnection, DbErr>
    where
        C: Into<ConnectOptions>,
    {
        let opt: ConnectOptions = opt.into();

        // Reject strings that are not URLs at all before trying any driver.
        if url::Url::parse(&opt.url).is_err() {
            return Err(conn_err(format!(
                "The connection string '{}' cannot be parsed.",
                opt.url
            )));
        }

        #[cfg(feature = "sqlx-mysql")]
        if DbBackend::MySql.is_prefix_of(&opt.url) {
            return crate::SqlxMySqlConnector::connect(opt);
        }
        #[cfg(feature = "sqlx-postgres")]
        if DbBackend::Postgres.is_prefix_of(&opt.url) {
            return crate::SqlxPostgresConnector::connect(opt);
        }
        #[cfg(feature = "sqlx-sqlite")]
        if DbBackend::Sqlite.is_prefix_of(&opt.url) {
            return crate::SqlxSqliteConnector::connect(opt);
        }
        // Same prefix test as the SQLx SQLite branch: if both features are enabled, the SQLx
        // connector above wins because it is checked first.
        #[cfg(feature = "rusqlite")]
        if DbBackend::Sqlite.is_prefix_of(&opt.url) {
            return crate::driver::rusqlite::RusqliteConnector::connect(opt);
        }
        #[cfg(feature = "mock")]
        if crate::MockDatabaseConnector::accepts(&opt.url) {
            return crate::MockDatabaseConnector::connect(&opt.url);
        }

        Err(conn_err(format!(
            "The connection string '{}' has no supporting driver.",
            opt.url
        )))
    }

    /// Opens a proxy-backed [`DatabaseConnection`] for the given backend.
    ///
    /// Both arguments are forwarded unchanged to `ProxyDatabaseConnector::connect`, and its
    /// result is returned as-is. The backend is passed through directly rather than being
    /// matched and re-spelled per variant, and the already-owned `Arc` is moved instead of
    /// cloned.
    #[cfg(feature = "proxy")]
    #[instrument(level = "trace", skip(proxy_func_arc))]
    pub fn connect_proxy(
        db_type: DbBackend,
        proxy_func_arc: std::sync::Arc<Box<dyn ProxyDatabaseTrait>>,
    ) -> Result<DatabaseConnection, DbErr> {
        crate::ProxyDatabaseConnector::connect(db_type, proxy_func_arc)
    }
}
impl<T> From<T> for ConnectOptions
where
T: Into<String>,
{
fn from(s: T) -> ConnectOptions {
ConnectOptions::new(s.into())
}
}
impl ConnectOptions {
    /// Creates options for `url` with every tunable at its default.
    ///
    /// All optional settings and hooks start as `None`. `sqlx_logging` and
    /// `test_before_acquire` start as `true`, `connect_lazy` as `false`, the logging level as
    /// `Info`, and slow-statement logging as `Off` with a one-second threshold.
    pub fn new<T: Into<String>>(url: T) -> Self {
        let url: String = url.into();
        Self {
            url,
            // Sizing and timeouts: nothing configured yet.
            max_connections: None,
            min_connections: None,
            connect_timeout: None,
            idle_timeout: None,
            acquire_timeout: None,
            max_lifetime: None,
            // Logging defaults.
            sqlx_logging: true,
            sqlx_logging_level: log::LevelFilter::Info,
            sqlx_slow_statements_logging_level: log::LevelFilter::Off,
            sqlx_slow_statements_logging_threshold: Duration::from_secs(1),
            // Optional per-connection settings.
            sqlcipher_key: None,
            schema_search_path: None,
            application_name: None,
            statement_timeout: None,
            test_before_acquire: true,
            connect_lazy: false,
            // Hooks: none registered.
            after_connect: None,
            #[cfg(feature = "sqlx-mysql")]
            mysql_pool_opts_fn: None,
            #[cfg(feature = "sqlx-postgres")]
            pg_pool_opts_fn: None,
            #[cfg(feature = "sqlx-sqlite")]
            sqlite_pool_opts_fn: None,
            #[cfg(feature = "sqlx-mysql")]
            mysql_opts_fn: None,
            #[cfg(feature = "sqlx-postgres")]
            pg_opts_fn: None,
            #[cfg(feature = "sqlx-sqlite")]
            sqlite_opts_fn: None,
        }
    }

    /// Returns the connection URL these options were created with.
    pub fn get_url(&self) -> &str {
        self.url.as_str()
    }

    /// Sets the maximum number of connections and returns `self` for chaining.
    pub fn max_connections(&mut self, value: u32) -> &mut Self {
        self.max_connections = Some(value);
        self
    }

    /// Returns the maximum number of connections, or `None` if it was never set.
    pub fn get_max_connections(&self) -> Option<u32> {
        self.max_connections
    }

    /// Sets the minimum number of connections and returns `self` for chaining.
    pub fn min_connections(&mut self, value: u32) -> &mut Self {
        self.min_connections = Some(value);
        self
    }

    /// Returns the minimum number of connections, or `None` if it was never set.
    pub fn get_min_connections(&self) -> Option<u32> {
        self.min_connections
    }

    /// Sets the connect timeout and returns `self` for chaining.
    pub fn connect_timeout(&mut self, value: Duration) -> &mut Self {
        self.connect_timeout = Some(value);
        self
    }

    /// Returns the connect timeout, or `None` if it was never set.
    pub fn get_connect_timeout(&self) -> Option<Duration> {
        self.connect_timeout
    }

    /// Sets the idle timeout and returns `self` for chaining.
    ///
    /// Accepts either a `Duration` or an `Option<Duration>`; whatever is passed is recorded
    /// inside an outer `Some`, so an explicit `None` stays distinguishable from "never set".
    pub fn idle_timeout<T: Into<Option<Duration>>>(&mut self, value: T) -> &mut Self {
        let timeout: Option<Duration> = value.into();
        self.idle_timeout = Some(timeout);
        self
    }

    /// Returns the idle timeout: `None` if never set, otherwise `Some` of the stored value.
    pub fn get_idle_timeout(&self) -> Option<Option<Duration>> {
        self.idle_timeout
    }

    /// Sets the acquire timeout and returns `self` for chaining.
    pub fn acquire_timeout(&mut self, value: Duration) -> &mut Self {
        self.acquire_timeout = Some(value);
        self
    }

    /// Returns the acquire timeout, or `None` if it was never set.
    pub fn get_acquire_timeout(&self) -> Option<Duration> {
        self.acquire_timeout
    }

    /// Sets the maximum lifetime and returns `self` for chaining.
    ///
    /// Uses the same two-level encoding as [`Self::idle_timeout`].
    pub fn max_lifetime<T: Into<Option<Duration>>>(&mut self, lifetime: T) -> &mut Self {
        let lifetime: Option<Duration> = lifetime.into();
        self.max_lifetime = Some(lifetime);
        self
    }

    /// Returns the maximum lifetime: `None` if never set, otherwise `Some` of the stored value.
    pub fn get_max_lifetime(&self) -> Option<Option<Duration>> {
        self.max_lifetime
    }

    /// Sets the SQLx logging switch and returns `self` for chaining.
    pub fn sqlx_logging(&mut self, value: bool) -> &mut Self {
        self.sqlx_logging = value;
        self
    }

    /// Returns the SQLx logging switch.
    pub fn get_sqlx_logging(&self) -> bool {
        self.sqlx_logging
    }

    /// Sets the SQLx logging level and returns `self` for chaining.
    pub fn sqlx_logging_level(&mut self, level: log::LevelFilter) -> &mut Self {
        self.sqlx_logging_level = level;
        self
    }

    /// Sets the slow-statement logging level together with its threshold and returns `self`
    /// for chaining.
    pub fn sqlx_slow_statements_logging_settings(
        &mut self,
        level: log::LevelFilter,
        duration: Duration,
    ) -> &mut Self {
        self.sqlx_slow_statements_logging_level = level;
        self.sqlx_slow_statements_logging_threshold = duration;
        self
    }

    /// Returns the SQLx logging level.
    pub fn get_sqlx_logging_level(&self) -> log::LevelFilter {
        self.sqlx_logging_level
    }

    /// Returns the slow-statement logging settings as `(level, threshold)`.
    pub fn get_sqlx_slow_statements_logging_settings(&self) -> (log::LevelFilter, Duration) {
        let level = self.sqlx_slow_statements_logging_level;
        let threshold = self.sqlx_slow_statements_logging_threshold;
        (level, threshold)
    }

    /// Sets the SQLCipher key and returns `self` for chaining.
    pub fn sqlcipher_key<T: Into<Cow<'static, str>>>(&mut self, value: T) -> &mut Self {
        let key: Cow<'static, str> = value.into();
        self.sqlcipher_key = Some(key);
        self
    }

    /// Sets the schema search path and returns `self` for chaining.
    pub fn set_schema_search_path<T: Into<String>>(&mut self, schema_search_path: T) -> &mut Self {
        let path: String = schema_search_path.into();
        self.schema_search_path = Some(path);
        self
    }

    /// Sets the application name and returns `self` for chaining.
    pub fn set_application_name<T: Into<String>>(&mut self, application_name: T) -> &mut Self {
        let name: String = application_name.into();
        self.application_name = Some(name);
        self
    }

    /// Sets the statement timeout and returns `self` for chaining.
    pub fn statement_timeout(&mut self, value: Duration) -> &mut Self {
        self.statement_timeout = Some(value);
        self
    }

    /// Returns the statement timeout, or `None` if it was never set.
    pub fn get_statement_timeout(&self) -> Option<Duration> {
        self.statement_timeout
    }

    /// Sets the "test before acquire" switch and returns `self` for chaining.
    pub fn test_before_acquire(&mut self, value: bool) -> &mut Self {
        self.test_before_acquire = value;
        self
    }

    /// Sets the "connect lazily" switch and returns `self` for chaining.
    pub fn connect_lazy(&mut self, value: bool) -> &mut Self {
        self.connect_lazy = value;
        self
    }

    /// Returns the "connect lazily" switch.
    pub fn get_connect_lazy(&self) -> bool {
        self.connect_lazy
    }

    /// Registers `f` as the after-connect callback, replacing any previous one, and returns
    /// `self` for chaining. The closure is stored behind an `Arc`.
    pub fn after_connect<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(DatabaseConnection) -> BoxFuture<'static, Result<(), DbErr>> + 'static,
    {
        self.after_connect = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx `MySqlConnectOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-mysql")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-mysql")))]
    pub fn map_sqlx_mysql_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(MySqlConnectOptions) -> MySqlConnectOptions + 'static,
    {
        self.mysql_opts_fn = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx MySQL `PoolOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-mysql")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-mysql")))]
    pub fn map_sqlx_mysql_pool_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(sqlx::pool::PoolOptions<sqlx::MySql>) -> sqlx::pool::PoolOptions<sqlx::MySql>
            + 'static,
    {
        self.mysql_pool_opts_fn = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx `PgConnectOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-postgres")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-postgres")))]
    pub fn map_sqlx_postgres_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(PgConnectOptions) -> PgConnectOptions + 'static,
    {
        self.pg_opts_fn = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx Postgres `PoolOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-postgres")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-postgres")))]
    pub fn map_sqlx_postgres_pool_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(sqlx::pool::PoolOptions<sqlx::Postgres>) -> sqlx::pool::PoolOptions<sqlx::Postgres>
            + 'static,
    {
        self.pg_pool_opts_fn = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx `SqliteConnectOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-sqlite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-sqlite")))]
    pub fn map_sqlx_sqlite_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(SqliteConnectOptions) -> SqliteConnectOptions + 'static,
    {
        self.sqlite_opts_fn = Some(Arc::new(f));
        self
    }

    /// Registers a hook that maps the SQLx SQLite `PoolOptions`, replacing any previous one.
    #[cfg(feature = "sqlx-sqlite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlx-sqlite")))]
    pub fn map_sqlx_sqlite_pool_opts<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn(sqlx::pool::PoolOptions<sqlx::Sqlite>) -> sqlx::pool::PoolOptions<sqlx::Sqlite>
            + 'static,
    {
        self.sqlite_pool_opts_fn = Some(Arc::new(f));
        self
    }
}