// baichun-framework-db 0.1.0
//
// Database module for Baichun-Rust framework
// Documentation
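//! Database executor module.
//!
//! This module provides the core functionality for running database queries and
//! transactions, including:
//! - Basic query operations (execute, fetch a single row, fetch multiple rows, etc.)
//! - Transaction management
//! - A unified interface over connection pools and transactions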

use crate::error::{Error, Result};
use async_trait::async_trait;
use sqlx::{MySql, Transaction};
use std::future::Future;

/// Database executor trait.
///
/// Provides the basic database operations: executing statements and fetching
/// query results. Every method takes the SQL text as a plain `&str`; no bind
/// parameters are accepted through this interface.
///
/// # Examples
///
/// Illustrative only: `User` and the one-argument `Result` alias are not defined
/// inside this snippet, so it is not compiled as a doctest.
///
/// ```rust,ignore
/// use baichun_framework_db::{Executor, DbPool};
///
/// async fn example(pool: &mut DbPool) -> Result<()> {
///     // Run an INSERT statement
///     pool.execute("INSERT INTO users (name) VALUES ('Alice')").await?;
///
///     // Fetch a single record
///     let user: User = pool.fetch_one("SELECT * FROM users WHERE id = 1").await?;
///
///     // Fetch multiple records
///     let users: Vec<User> = pool.fetch_all("SELECT * FROM users").await?;
///
///     Ok(())
/// }
/// ```
#[async_trait]
pub trait Executor {
    /// Executes a statement that does not return rows.
    ///
    /// Intended for INSERT, UPDATE, DELETE and similar statements.
    async fn execute(&mut self, query: &str) -> Result<sqlx::mysql::MySqlQueryResult>;

    /// Executes a query that returns multiple rows.
    ///
    /// The rows are converted into a vector of the requested type `T`.
    async fn fetch_all<'q, T>(&mut self, query: &'q str) -> Result<Vec<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;

    /// Executes a query that returns a single row.
    ///
    /// Returns an error if the query yields no rows.
    async fn fetch_one<'q, T>(&mut self, query: &'q str) -> Result<T>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;

    /// Executes a query that returns an optional single row.
    ///
    /// Returns `None` if the query yields no rows.
    async fn fetch_optional<'q, T>(&mut self, query: &'q str) -> Result<Option<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;
}

/// Internal database executor trait.
///
/// Gives connection pools and transactions one common execution interface.
/// In this file it is implemented for `sqlx::Pool<MySql>` and for
/// `Transaction<'c, MySql>`; the lifetime parameter `'c` matches the lifetime
/// carried by the transaction type in the latter implementation.
#[async_trait]
pub trait DbExecutor<'c>: Send + Sync {
    /// Executes a statement and returns the MySQL query result.
    async fn execute_query(&mut self, query: &str) -> Result<sqlx::mysql::MySqlQueryResult>;

    /// Fetches all rows of a query, converting each one into `T`.
    async fn fetch_all_query<'q, T>(&mut self, query: &'q str) -> Result<Vec<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;

    /// Fetches a single row of a query, converted into `T`.
    async fn fetch_one_query<'q, T>(&mut self, query: &'q str) -> Result<T>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;

    /// Fetches an optional single row of a query, converted into `T`.
    async fn fetch_optional_query<'q, T>(&mut self, query: &'q str) -> Result<Option<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>;
}

#[async_trait]
impl<'c> DbExecutor<'c> for sqlx::Pool<MySql> {
    async fn execute_query(&mut self, query: &str) -> Result<sqlx::mysql::MySqlQueryResult> {
        sqlx::query(query)
            .execute(&*self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_all_query<'q, T>(&mut self, query: &'q str) -> Result<Vec<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_all(&*self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_one_query<'q, T>(&mut self, query: &'q str) -> Result<T>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_one(&*self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_optional_query<'q, T>(&mut self, query: &'q str) -> Result<Option<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_optional(&*self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }
}

#[async_trait]
impl<'c> DbExecutor<'c> for Transaction<'c, MySql> {
    async fn execute_query(&mut self, query: &str) -> Result<sqlx::mysql::MySqlQueryResult> {
        sqlx::query(query)
            .execute(&mut **self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_all_query<'q, T>(&mut self, query: &'q str) -> Result<Vec<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_all(&mut **self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_one_query<'q, T>(&mut self, query: &'q str) -> Result<T>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_one(&mut **self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }

    async fn fetch_optional_query<'q, T>(&mut self, query: &'q str) -> Result<Option<T>>
    where
        T: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        sqlx::query_as::<_, T>(query)
            .fetch_optional(&mut **self)
            .await
            .map_err(|e| Error::Query(e.to_string()))
    }
}

#[async_trait]
impl<'c, T> Executor for T
where
    T: DbExecutor<'c> + Send + Sync,
{
    async fn execute(&mut self, query: &str) -> Result<sqlx::mysql::MySqlQueryResult> {
        self.execute_query(query).await
    }

    async fn fetch_all<'q, U>(&mut self, query: &'q str) -> Result<Vec<U>>
    where
        U: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        self.fetch_all_query(query).await
    }

    async fn fetch_one<'q, U>(&mut self, query: &'q str) -> Result<U>
    where
        U: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        self.fetch_one_query(query).await
    }

    async fn fetch_optional<'q, U>(&mut self, query: &'q str) -> Result<Option<U>>
    where
        U: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::mysql::MySqlRow>,
    {
        self.fetch_optional_query(query).await
    }
}

/// Transaction manager.
///
/// Provides automatic commit and rollback for a transaction.
///
/// # Examples
///
/// Illustrative only: the one-argument `Result` alias is not imported inside
/// this snippet, so it is not compiled as a doctest.
///
/// ```rust,ignore
/// use baichun_framework_db::{DbPool, TransactionManager};
///
/// async fn example(pool: &DbPool) -> Result<()> {
///     let tx = pool.begin().await?;
///     let mut tm = TransactionManager::new(tx);
///
///     tm.execute(|tx| Box::pin(async move {
///         tx.execute("INSERT INTO users (name) VALUES ('Alice')").await?;
///         tx.execute("UPDATE users SET status = 'active' WHERE name = 'Alice'").await?;
///         Ok(())
///     })).await?;
///
///     Ok(())
/// }
/// ```
pub struct TransactionManager<'c> {
    // `Some` from construction until `execute` takes the transaction out;
    // `None` afterwards, which is how a second `execute` call is detected.
    tx: Option<Transaction<'c, MySql>>,
}

impl<'c> TransactionManager<'c> {
    /// 创建新的事务管理器
    pub fn new(tx: Transaction<'c, MySql>) -> Self {
        Self { tx: Some(tx) }
    }

    /// 在事务中执行操作
    ///
    /// 如果操作成功,事务将被提交;如果操作失败,事务将被回滚。
    pub async fn execute<F, T, E>(&mut self, f: F) -> Result<T>
    where
        F: for<'a> FnOnce(
            &'a mut Transaction<'c, MySql>,
        ) -> std::pin::Pin<
            Box<dyn Future<Output = std::result::Result<T, E>> + Send + 'a>,
        >,
        E: Into<Error>,
        T: Send,
    {
        let tx = self
            .tx
            .take()
            .ok_or_else(|| Error::Transaction("Transaction already used".to_string()))?;
        let mut tx = tx;

        match f(&mut tx).await {
            Ok(value) => {
                tx.commit()
                    .await
                    .map_err(|e| Error::Transaction(e.to_string()))?;
                Ok(value)
            }
            Err(e) => {
                if let Err(e) = tx.rollback().await {
                    return Err(Error::Transaction(e.to_string()));
                }
                Err(e.into())
            }
        }
    }
}