db-cores 0.1.0

Database core utilities
Documentation
// #![allow(unused_variables)]
// #![allow(unused_imports)]

mod common;
pub use common::*;

pub mod ast;
pub mod plugin;
pub mod utlis;
pub mod to_json;
pub mod verify;
pub mod query;
pub mod builder;






use regex::Regex;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
pub use sql_builder::SqlBuilder;

#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
use std::{collections::HashMap, path::Path, sync::{Arc, LazyLock}};

#[cfg(feature = "postgres")]
use sqlx::{PgPool, Pool, Postgres};
#[cfg(feature = "mysql")]
use sqlx::{MySql, MySqlPool};
#[cfg(feature = "sqlite")]
use sqlx::{sqlite::SqliteConnectOptions, Sqlite, SqlitePool};

pub use serde;
pub use serde_json;
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
pub use sqlx;
pub use db_proc_macro;

/// Enum wrapper around the connection pools of the supported database backends.
///
/// A variant is only compiled in when the Cargo feature of the same backend
/// is enabled, and the whole type only exists when at least one of them is.
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
#[derive(Clone, Debug)]
pub enum DbPool {
    /// Pool of MySQL connections.
    #[cfg(feature = "mysql")]
    MySql(MySqlPool),
    /// Pool of PostgreSQL connections.
    #[cfg(feature = "postgres")]
    PgSql(PgPool),
    /// Pool of SQLite connections.
    #[cfg(feature = "sqlite")]
    Sqlite(SqlitePool),
}

/// Serializable container with a list of JSON values, a list of JSON column
/// descriptors and a count.
///
/// NOTE(review): presumably the JSON form of a query result (rows, column
/// metadata, total row count) — confirm against the `to_json` module.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToJsonResult {
    /// JSON values making up the payload.
    pub data: Vec<JsonValue>,
    /// JSON values describing the columns.
    pub columns: Vec<JsonValue>,
    /// Count associated with the result.
    pub count: i64,
}



/// Parameters required to create a new connection, one variant per kind of
/// database.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum NewConnectParams {
    /// Parameters for a MySQL connection.
    MySql {
        id: String,
        connect_name: String,
        host: String,
        port: i64,
        username: String,
        password: String,
        default_db_name: Option<String>,
        charset: Option<String>,
    },

    /// Parameters for a PostgreSQL connection.
    Postgres {
        id: String,
        connect_name: String,
        host: String,
        port: i64,
        username: String,
        password: String,
        default_db_name: Option<String>,
        schema: Option<String>,
    },

    /// Parameters for a SQL Server connection.
    SqlServer {
        id: String,
        connect_name: String,
        host: String,
        port: i64,
        username: String,
        password: String,
        default_db_name: Option<String>,
        instance: Option<String>,
    },

    /// Parameters for a file-backed database.
    FileDB {
        id: String,
        kind: DatabaseKind, // Kind of file-based database (Sqlite / DuckDB, etc.)
        connect_name: String,
        file_dir: String,
        is_memory: bool,
    },
}


/// Type of the global connection-pool cache: a map from connection key to
/// [`DbPool`], wrapped in a tokio `RwLock` and shared through an `Arc`.
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
type PoolCache = Arc<tokio::sync::RwLock<HashMap<String, DbPool>>>;

/// Fixed column name used for a count column.
///
/// NOTE(review): presumably the alias of an injected row-count column, with a
/// random-looking value chosen to avoid clashing with real column names —
/// confirm against the query/`to_json` code that uses it.
pub const COUNT_COLUMN_NAME: &str = "r8Bz1ae9BxYqe";

// pub static RE_SELECT: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)(SELECT\s+.*?\s+FROM)").unwrap());

// pub static RE_SELECT_FIELDS: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)^SELECT\s+(.*?)\s+FROM").unwrap());

// pub static RE_LIMIT_OFFSET: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)(LIMIT\s+\d+(\s+OFFSET\s+\d+)?)").unwrap());

// pub static RE_ORDER_BY: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)\s*ORDER BY.*").unwrap());

// // static RE_TABLE_NAME: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)\bFROM\s+(\w+)").unwrap());

// static RE_WHERE_CLAUSE: LazyLock<Regex> =
//     LazyLock::new(|| Regex::new(r"(?i)\s+WHERE\s+(.*?)(?:\s+LIMIT|\s+OFFSET|$)").unwrap());

/// Process-wide cache of connection pools, created empty on first access.
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
pub static DB_POOL_CACHE: LazyLock<PoolCache> = LazyLock::new(PoolCache::default);

/// Returns the pool cached under `connect_key`, creating and caching it with
/// `create_pool` on a cache miss.
///
/// `create_pool` runs without any cache lock held, so slow connection setup
/// does not block other cache users. Two tasks that miss on the same key at
/// the same time may therefore both build a pool; the first one stored wins,
/// and the later task drops its own pool and returns the stored one, so every
/// caller ends up sharing the same cached pool.
///
/// # Errors
///
/// Propagates any error returned by `create_pool`; nothing is cached in that
/// case.
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
async fn get_or_create_pool<F, Fut>(connect_key: &str, create_pool: F) -> anyhow::Result<DbPool>
where
    F: FnOnce() -> Fut, // Factory invoked only on a cache miss; returns a future.
    Fut: std::future::Future<Output = anyhow::Result<DbPool>>, // The future resolves to `Result<DbPool>`.
{
    // Fast path: the read guard is a temporary released at the end of this statement.
    let cached = DB_POOL_CACHE.read().await.get(connect_key).cloned();
    if let Some(pool) = cached {
        return Ok(pool);
    }

    // Cache miss: build the pool without holding any lock.
    let pool = create_pool().await?;

    // Another task may have stored a pool for this key while ours was being
    // built. Keep the existing entry instead of overwriting it; in that case
    // `pool` is dropped here.
    let mut cache = DB_POOL_CACHE.write().await;
    let stored = cache.entry(connect_key.to_string()).or_insert(pool);
    Ok(stored.clone())
}

/// Removes the pool cached under the key derived from `database_path`.
///
/// The key is the lossy string form of the path, which is the key
/// `get_sqlite_pool` stores its pools under.
///
/// Returns `true` if a pool was cached and removed, `false` otherwise. When a
/// pool was removed, the function waits 100 ms after dropping it to give the
/// underlying handles time to be released.
///
/// NOTE(review): only the cache's handle is dropped here; if callers still
/// hold clones of the pool, its connections presumably stay open — confirm
/// whether an explicit `close()` is wanted.
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
pub async fn remove_pool_from_cache(database_path: impl AsRef<Path>) -> bool {
    use tokio::time::{sleep, Duration};

    let connect_key = database_path.as_ref().to_string_lossy().into_owned();

    // The write guard is a temporary released at the end of this statement,
    // so the cache is not locked while we sleep below.
    let removed = DB_POOL_CACHE.write().await.remove(&connect_key);

    match removed {
        Some(pool) => {
            drop(pool); // Explicitly release the cache's handle to the pool.
            sleep(Duration::from_millis(100)).await; // Wait for the handles to actually be released.
            true
        }
        None => false,
    }
}

/// Returns the cached SQLite pool for `database_path`, opening the database
/// (and creating the file if it is missing) on a cache miss.
///
/// The cache key is the lossy string form of the path as given.
///
/// # Errors
///
/// Returns an error if the SQLite pool cannot be opened.
#[cfg(feature = "sqlite")]
async fn get_sqlite_pool(database_path: impl AsRef<Path>) -> anyhow::Result<DbPool> {
    let path = database_path.as_ref();
    let connect_key = path.to_string_lossy().into_owned();
    get_or_create_pool(&connect_key, || async {
        let options = SqliteConnectOptions::new()
            .filename(path)
            .create_if_missing(true);
        // `options` is not used afterwards, so it is moved in rather than cloned.
        let pool = SqlitePool::connect_with(options).await?;
        Ok(DbPool::Sqlite(pool))
    })
    .await
}

/// Percent-encodes `raw` for use in the user-info part of a connection URL.
///
/// Every byte outside the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`)
/// is written as `%XX`, so characters such as `@`, `:`, `/`, `#` and `%`
/// cannot be mistaken for URL delimiters.
#[cfg(any(feature = "mysql", feature = "postgres"))]
fn percent_encode_userinfo(raw: impl std::fmt::Display) -> String {
    use std::fmt::Write as _;

    let raw = raw.to_string();
    let mut encoded = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
            encoded.push(char::from(byte));
        } else {
            // Writing into a `String` never fails, so the result can be ignored.
            let _ = write!(encoded, "%{byte:02X}");
        }
    }
    encoded
}

/// Returns the cached MySQL/PostgreSQL pool for the server described by `op`,
/// connecting on a cache miss.
///
/// The cache key is built from host, port, database name, username and
/// password. The username and password are percent-encoded before being
/// placed in the connection URL, so credentials containing reserved
/// characters (`@`, `:`, `/`, `#`, ...) no longer produce a malformed URL.
/// Callers must pass the raw, un-encoded credentials.
///
/// # Errors
///
/// Returns an error if `op.kind` is not a server database enabled through
/// Cargo features, or if the connection attempt fails.
#[cfg(any(feature = "mysql", feature = "postgres"))]
async fn get_server_pool(op: &DbConnect) -> anyhow::Result<DbPool> {
    let connect_key = format!(
        "{}_{}_{}_{}_{}",
        op.host, op.port, op.db_name, op.username, op.password
    );
    get_or_create_pool(&connect_key, || async {
        let username = percent_encode_userinfo(&op.username);
        let password = percent_encode_userinfo(&op.password);
        let db_pool = match op.kind {
            #[cfg(feature = "mysql")]
            DatabaseKind::MySql => {
                let connection_string = format!(
                    "mysql://{}:{}@{}:{}/{}",
                    username, password, op.host, op.port, op.db_name
                );
                let pool = MySqlPool::connect(&connection_string).await?;
                DbPool::MySql(pool)
            }
            #[cfg(feature = "postgres")]
            DatabaseKind::Postgres => {
                let connection_string = format!(
                    "postgres://{}:{}@{}:{}/{}",
                    username, password, op.host, op.port, op.db_name
                );
                let pool: Pool<Postgres> = PgPool::connect(&connection_string).await?;
                DbPool::PgSql(pool)
            }
            _ => return Err(anyhow::anyhow!("Unsupported database type")),
        };
        Ok(db_pool)
    })
    .await
}

/// Returns the (cached) SQLite pool for the database file at `database_path`.
///
/// # Errors
///
/// Fails if the pool cannot be obtained, or if the pool returned for this
/// path is not a SQLite pool.
#[cfg(feature = "sqlite")]
pub async fn sqlite_pool(database_path: impl AsRef<Path>) -> anyhow::Result<sqlx::Pool<Sqlite>> {
    match get_sqlite_pool(database_path).await? {
        DbPool::Sqlite(sqlite) => Ok(sqlite),
        _ => Err(anyhow::anyhow!("Unsupported sqlite_pool database type")),
    }
}

/// Returns the (cached) MySQL pool for the server described by `op`.
///
/// # Errors
///
/// Fails if the pool cannot be obtained, or if the pool returned for `op` is
/// not a MySQL pool.
#[cfg(feature = "mysql")]
pub async fn mysql_pool(op: &DbConnect) -> anyhow::Result<sqlx::Pool<MySql>> {
    match get_server_pool(op).await? {
        DbPool::MySql(mysql) => Ok(mysql),
        _ => Err(anyhow::anyhow!("Invalid database type for MySql")),
    }
}

/// Returns the (cached) PostgreSQL pool for the server described by `op`.
///
/// # Errors
///
/// Fails if the pool cannot be obtained, or if the pool returned for `op` is
/// not a PostgreSQL pool.
#[cfg(feature = "postgres")]
pub async fn postgres_pool(op: &DbConnect) -> anyhow::Result<Pool<Postgres>> {
    match get_server_pool(op).await? {
        DbPool::PgSql(postgres) => Ok(postgres),
        _ => Err(anyhow::Error::msg("Invalid database type for PostgreSQL")),
    }
}



// Test module; only built for test runs with at least one database feature enabled.
#[cfg(test)]
#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
mod tests {
    use super::*;
    use crate::define_model; // Bring the `define_model` macro into scope.
    // Invokes `define_model!` with the name `Code2`, the string "TABLE_CODE" and a field list.
    // NOTE(review): presumably generates a model type `Code2` mapped to table "TABLE_CODE" — confirm against the macro.
    define_model!(Code2 , "TABLE_CODE",
        id: String,
        name:String,
        kind: String, // "api" = API request, "code" = code, "commend" = command
        content: String,
        params: Option<serde_json::Value>,
        req: Option<String>,
        res: Option<String>,
        created_at: Option<i64>,
        updated_at : Option<i64>,
        is_active: bool // bool,
    );


    // Placeholder test: it only binds a sample SQL string; the calls that used it are commented out,
    // so nothing is asserted.
    #[test]
    pub fn run() {
        let sql = "SELECT id FROM sample WHERE a=0 AND B>1 ORDER BY id LIMIT 1000 OFFSET 0";
        // let r = select_add_count(sql);
        // println!("{}", r);
    }
}