use crate::error::DbError;
use crate::mysql::query_builder::QueryBuilder;
use crate::mysql::transaction::Transaction;
use sqlx::mysql::MySqlPool;
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
pub max_connections: u32,
pub connect_timeout: u64,
pub idle_timeout: u64,
pub enable_logging: bool,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
max_connections: 10,
connect_timeout: 30,
idle_timeout: 600,
enable_logging: false,
}
}
}
pub struct Database {
pool: MySqlPool,
config: DatabaseConfig,
}
impl Database {
pub async fn connect(url: &str) -> Result<Self, DbError> {
Self::connect_with_config(url, DatabaseConfig::default()).await
}
pub async fn connect_with_config(url: &str, config: DatabaseConfig) -> Result<Self, DbError> {
use sqlx::mysql::MySqlPoolOptions;
use std::time::Duration;
let pool = MySqlPoolOptions::new()
.max_connections(config.max_connections)
.acquire_timeout(Duration::from_secs(config.connect_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.connect(url)
.await?;
Ok(Self { pool, config })
}
pub fn table(&self, table_name: &str) -> QueryBuilder<'_> {
QueryBuilder::new(&self.pool, table_name, self.config.enable_logging)
}
pub async fn query<T>(&self, sql: &str) -> Result<Vec<T>, DbError>
where
T: for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow> + Send + Unpin,
{
if self.config.enable_logging {
log::debug!("执行原生查询: {}", sql);
}
let rows = sqlx::query_as::<_, T>(sql).fetch_all(&self.pool).await?;
Ok(rows)
}
pub async fn execute(&self, sql: &str) -> Result<u64, DbError> {
if self.config.enable_logging {
log::debug!("执行原生语句: {}", sql);
}
let result = sqlx::query(sql).execute(&self.pool).await?;
Ok(result.rows_affected())
}
pub async fn transaction(&self) -> Result<Transaction, DbError> {
let tx = self.pool.begin().await?;
Ok(Transaction::new(tx, self.config.enable_logging))
}
pub async fn init(&self, sql_script: &str) -> Result<(), DbError> {
let statements: Vec<&str> = sql_script
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect();
for statement in statements {
self.execute(statement).await?;
}
Ok(())
}
pub async fn create_table(&self, create_sql: &str) -> Result<(), DbError> {
self.execute(create_sql).await?;
Ok(())
}
pub async fn drop_table(&self, table_name: &str) -> Result<(), DbError> {
let sql = format!("DROP TABLE IF EXISTS `{}`", table_name);
self.execute(&sql).await?;
Ok(())
}
pub async fn table_exists(&self, table_name: &str) -> Result<bool, DbError> {
let sql = format!(
"SELECT COUNT(*) as count FROM information_schema.tables \
WHERE table_schema = DATABASE() AND table_name = '{}'",
table_name
);
let row: (i64,) = sqlx::query_as(&sql).fetch_one(&self.pool).await?;
Ok(row.0 > 0)
}
}