use crate::{extension::TomlTableExt, state::State};
use smallvec::SmallVec;
use std::sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
LazyLock,
};
// Private submodules; selected items are re-exported below.
mod accessor;
mod column;
mod executor;
mod helper;
mod manager;
mod mutation;
mod pool;
mod query;
mod schema;
mod transaction;
// Public re-exports from the private submodules above.
pub use accessor::ModelAccessor;
pub use executor::Executor;
pub use helper::ModelHelper;
pub use manager::PoolManager;
pub use pool::ConnectionPool;
pub use schema::Schema;
pub use transaction::Transaction;
// The `decode` and `scalar` modules, and their re-exports, are compiled
// only when the `orm-sqlx` feature is enabled.
#[cfg(feature = "orm-sqlx")]
mod decode;
#[cfg(feature = "orm-sqlx")]
mod scalar;
#[cfg(feature = "orm-sqlx")]
pub use decode::{decode, decode_array};
#[cfg(feature = "orm-sqlx")]
pub use scalar::ScalarQuery;
// Compile-time selection of the database backend. Exactly one branch is
// compiled: the MySQL-compatible features are checked first, then
// `orm-postgres`, and SQLite is the fallback when none of them is enabled.
// Every branch declares the same four items, so the rest of the module can
// refer to them without caring which backend was chosen.
cfg_if::cfg_if! {
if #[cfg(any(feature = "orm-mariadb", feature = "orm-mysql", feature = "orm-tidb"))] {
mod mysql;
/// Name of the driver selected at compile time.
///
/// When several MySQL-compatible features are enabled at once,
/// `orm-mariadb` wins over `orm-tidb`, which wins over plain `orm-mysql`.
static DRIVER_NAME: &str = if cfg!(feature = "orm-mariadb") {
"mariadb"
} else if cfg!(feature = "orm-tidb") {
"tidb"
} else {
"mysql"
};
/// The `sqlx` database driver type for the selected backend.
pub type DatabaseDriver = sqlx::mysql::MySql;
/// The `sqlx` connection pool type for the selected backend.
pub type DatabasePool = sqlx::mysql::MySqlPool;
/// The `sqlx` row type for the selected backend.
pub type DatabaseRow = sqlx::mysql::MySqlRow;
} else if #[cfg(feature = "orm-postgres")] {
mod postgres;
/// Name of the driver selected at compile time.
static DRIVER_NAME: &str = "postgres";
/// The `sqlx` database driver type for the selected backend.
pub type DatabaseDriver = sqlx::postgres::Postgres;
/// The `sqlx` connection pool type for the selected backend.
pub type DatabasePool = sqlx::postgres::PgPool;
/// The `sqlx` row type for the selected backend.
pub type DatabaseRow = sqlx::postgres::PgRow;
} else {
mod sqlite;
/// Name of the driver selected at compile time.
static DRIVER_NAME: &str = "sqlite";
/// The `sqlx` database driver type for the selected backend.
pub type DatabaseDriver = sqlx::sqlite::Sqlite;
/// The `sqlx` connection pool type for the selected backend.
pub type DatabasePool = sqlx::sqlite::SqlitePool;
/// The `sqlx` row type for the selected backend.
pub type DatabaseRow = sqlx::sqlite::SqliteRow;
}
}
/// An ordered list of connection pools, stored inline for up to four entries.
#[derive(Debug)]
struct ConnectionPools(SmallVec<[ConnectionPool; 4]>);

impl ConnectionPools {
    /// Looks up a pool by its name.
    ///
    /// Several pools may share the same name. The first one that reports
    /// itself as available is returned immediately. If none of the matching
    /// pools is available, the last matching pool is returned instead, and
    /// `None` is returned only when no pool carries the given name.
    pub(crate) fn get_pool(&self, name: &str) -> Option<&ConnectionPool> {
        let mut fallback = None;
        for candidate in self.0.iter() {
            if candidate.name() != name {
                continue;
            }
            if candidate.is_available() {
                return Some(candidate);
            }
            // Remember the most recent unavailable match as a last resort.
            fallback = Some(candidate);
        }
        fallback
    }
}
#[derive(Debug, Clone, Copy, Default)]
pub struct GlobalPool;
impl GlobalPool {
#[inline]
pub fn get(name: &str) -> Option<&'static ConnectionPool> {
SHARED_CONNECTION_POOLS.get_pool(name)
}
#[inline]
pub async fn connect_all() {
for cp in SHARED_CONNECTION_POOLS.0.iter() {
cp.check_availability().await;
}
}
#[inline]
pub async fn close_all() {
for cp in SHARED_CONNECTION_POOLS.0.iter() {
cp.close().await;
}
}
}
static SHARED_CONNECTION_POOLS: LazyLock<ConnectionPools> = LazyLock::new(|| {
let config = State::shared().config();
let Some(database_config) = config.get_table("database") else {
return ConnectionPools(SmallVec::new());
};
let driver = DRIVER_NAME;
let database_type = database_config.get_str("type").unwrap_or(driver);
let databases = config.get_array(database_type).unwrap_or_else(|| {
panic!(
"the `{database_type}` field should be an array of tables; \
please use `[[{database_type}]]` to configure a list of database services"
)
});
let pools = databases
.iter()
.filter_map(|v| v.as_table())
.map(ConnectionPool::with_config)
.collect();
if database_type == driver {
tracing::warn!(driver, "connect to database services lazily");
} else {
tracing::error!(
driver,
"invalid database type `{database_type}` for the driver `{driver}`"
);
}
ConnectionPools(pools)
});
/// Namespace prefix derived from the `database` config, computed on first access.
///
/// When the `namespace` entry is present and non-empty the value is
/// `"<namespace>:"`; otherwise it is the empty string. The prefixed string is
/// leaked once so that it can be handed out as `&'static str`.
static NAMESPACE_PREFIX: LazyLock<&'static str> = LazyLock::new(|| {
    let namespace = State::shared()
        .get_config("database")
        .and_then(|config| config.get_str("namespace"))
        .filter(|namespace| !namespace.is_empty());
    match namespace {
        Some(namespace) => {
            let prefix: &'static str = [namespace, ":"].concat().leak();
            prefix
        }
        None => "",
    }
});
/// Table prefix derived from the `database` config, computed on first access.
///
/// When the `namespace` entry is present and non-empty the value is
/// `"<namespace>_"`; otherwise it is the empty string. The prefixed string is
/// leaked once so that it can be handed out as `&'static str`.
///
/// As a side effect, initialisation also copies the `max-rows` config entry
/// (when present) into `MAX_ROWS`.
// NOTE(review): `MAX_ROWS` is therefore only updated once this static has
// been dereferenced; a reader that loads `MAX_ROWS` earlier would see the
// built-in default. Confirm that every reader forces `TABLE_PREFIX` first.
static TABLE_PREFIX: LazyLock<&'static str> = LazyLock::new(|| {
    let Some(config) = State::shared().get_config("database") else {
        return "";
    };
    if let Some(max_rows) = config.get_usize("max-rows") {
        MAX_ROWS.store(max_rows, Relaxed);
    }
    match config.get_str("namespace").filter(|namespace| !namespace.is_empty()) {
        Some(namespace) => {
            let prefix: &'static str = [namespace, "_"].concat().leak();
            prefix
        }
        None => "",
    }
});
/// Row limit, starting at 10000.
///
/// The value is overwritten with the `max-rows` entry of the `database`
/// config when `TABLE_PREFIX` is first initialised.
// NOTE(review): presumably the cap on rows returned by a query; the readers
// are not visible in this file, so confirm against them.
static MAX_ROWS: AtomicUsize = AtomicUsize::new(10000);