use super::schema::{ColumnDefinition, ColumnType, TableSchema};
use super::version::{MigrationScriptType, SchemaVersion, VersionManager};
use crate::adapter::DatabaseAdapter;
use crate::error::{QuickDbError, QuickDbResult};
use crate::manager::PoolManager;
use rat_logger::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Coordinates table-level operations (existence checks, creation, removal,
/// schema lookup and migration) on top of a shared [`PoolManager`], keeping
/// in-memory caches of what it has learned.
#[derive(Debug)]
pub struct TableManager {
/// Pool manager through which every database access in this type goes.
pool_manager: Arc<PoolManager>,
/// Registry of schema versions and migrations; also handed out by
/// `get_version_manager`.
version_manager: Arc<RwLock<VersionManager>>,
/// Table name -> schema. Filled by `create_table`, `register_schema` and
/// `get_table_schema`; entries are evicted by `drop_table` and `clear_cache`.
schema_cache: Arc<RwLock<HashMap<String, TableSchema>>>,
/// Table name -> cached existence flag. Written by `check_table_exists` and
/// `create_table`; entries are evicted by `drop_table` and `clear_cache`.
existence_cache: Arc<RwLock<HashMap<String, bool>>>,
/// When `false`, `ensure_table_exists` returns immediately without doing anything.
auto_create_tables: bool,
/// When `true`, `ensure_table_exists` also migrates an existing table whose
/// status reports `needs_migration`.
auto_migrate: bool,
}
/// Construction-time settings for [`TableManager`].
///
/// NOTE(review): within this file `TableManager::new` only reads
/// `auto_create_tables` and `auto_migrate`; `cache_ttl_seconds` and
/// `enable_schema_validation` are accepted but not consumed here.
#[derive(Debug, Clone)]
pub struct TableManagerConfig {
    /// Let `ensure_table_exists` create tables that are missing.
    pub auto_create_tables: bool,
    /// Let `ensure_table_exists` migrate tables that need it.
    pub auto_migrate: bool,
    /// Intended cache lifetime in seconds.
    pub cache_ttl_seconds: u64,
    /// Intended switch for schema validation.
    pub enable_schema_validation: bool,
}

impl Default for TableManagerConfig {
    /// Defaults: auto-create on, auto-migrate off, a five-minute cache TTL and
    /// schema validation on.
    fn default() -> Self {
        TableManagerConfig {
            enable_schema_validation: true,
            // Five minutes, expressed in seconds.
            cache_ttl_seconds: 5 * 60,
            auto_migrate: false,
            auto_create_tables: true,
        }
    }
}
/// Per-call options accepted by `TableManager::create_table`.
///
/// NOTE(review): in this file `create_table` only consults `if_not_exists`;
/// the remaining fields are carried but not read there.
#[derive(Debug, Clone)]
pub struct TableCreateOptions {
    /// Skip creation (successfully) when the table is already known to exist.
    pub if_not_exists: bool,
    /// Intended switch for creating indexes alongside the table.
    pub create_indexes: bool,
    /// Intended switch for adding constraints alongside the table.
    pub add_constraints: bool,
    /// Free-form key/value table options, if any.
    pub table_options: Option<HashMap<String, String>>,
}

impl Default for TableCreateOptions {
    /// Defaults: every boolean switch enabled and no extra table options.
    fn default() -> Self {
        TableCreateOptions {
            table_options: None,
            add_constraints: true,
            create_indexes: true,
            if_not_exists: true,
        }
    }
}
/// Outcome of `TableManager::check_table_status` for a single table.
#[derive(Debug, Clone)]
pub struct TableCheckResult {
/// Whether the existence check reported the table as present.
pub exists: bool,
/// Version currently applied to the table. `check_table_status` in this file
/// always leaves this as `None`.
pub current_version: Option<u32>,
/// Version reported by the version manager's `get_current_version` for the
/// table, if one is registered. `None` when the table does not exist.
pub expected_version: Option<u32>,
/// `true` only when both versions are known and the current one is lower
/// than the expected one.
pub needs_migration: bool,
/// Structural differences, when computed. `check_table_status` in this file
/// always leaves this as `None`.
pub schema_diff: Option<SchemaDiff>,
}
/// Column and index differences between two table schemas.
///
/// NOTE(review): nothing in this file constructs a `SchemaDiff`; the field
/// descriptions below are inferred from the names and types only — confirm
/// against whichever code produces it.
#[derive(Debug, Clone)]
pub struct SchemaDiff {
/// Definitions of columns that were added (presumably present only in the newer schema).
pub added_columns: Vec<ColumnDefinition>,
/// Names of columns that were removed.
pub removed_columns: Vec<String>,
/// Changed columns; presumably (column name, new definition) — TODO confirm.
pub modified_columns: Vec<(String, ColumnDefinition)>,
/// Names of indexes that were added.
pub added_indexes: Vec<String>,
/// Names of indexes that were removed.
pub removed_indexes: Vec<String>,
}
impl TableManager {
pub fn new(pool_manager: Arc<PoolManager>, config: TableManagerConfig) -> Self {
Self {
pool_manager,
version_manager: Arc::new(RwLock::new(VersionManager::new())),
schema_cache: Arc::new(RwLock::new(HashMap::new())),
existence_cache: Arc::new(RwLock::new(HashMap::new())),
auto_create_tables: config.auto_create_tables,
auto_migrate: config.auto_migrate,
}
}
/// Reports whether `table_name` exists, consulting the existence cache first.
///
/// On a cache miss the `"default"` database type is resolved, an adapter is
/// built for it, a connection is borrowed from the pool and handed back, and
/// the outcome is cached.
///
/// NOTE(review): the actual database query is not implemented yet — a cache
/// miss always yields (and caches) `false`. A table only shows as existing
/// after `create_table` has recorded it in the cache.
///
/// # Errors
/// Propagates failures from resolving the database type, building the
/// adapter, acquiring the connection or releasing it.
pub async fn check_table_exists(&self, table_name: &str) -> QuickDbResult<bool> {
    // Fast path: answer from the cache without touching the pool.
    if let Some(&cached) = self.existence_cache.read().await.get(table_name) {
        return Ok(cached);
    }

    // Resolve the adapter *before* borrowing a connection. Previously the
    // connection was acquired first, so an error from either lookup returned
    // early without calling `release_connection`.
    let db_type = self.pool_manager.get_database_type("default")?;
    // Kept for the pending existence query; building it also surfaces adapter
    // construction errors exactly as before.
    let _adapter = crate::adapter::create_adapter(&db_type)?;

    let connection = self.pool_manager.get_connection(Some("default")).await?;
    // TODO: ask the adapter whether the table exists instead of assuming not.
    let exists = false;
    self.pool_manager.release_connection(&connection).await?;

    self.existence_cache
        .write()
        .await
        .insert(table_name.to_string(), exists);
    Ok(exists)
}
pub async fn create_table(
&self,
schema: &TableSchema,
options: Option<TableCreateOptions>,
) -> QuickDbResult<()> {
let options = options.unwrap_or_default();
if options.if_not_exists {
let exists = self.check_table_exists(&schema.name).await?;
if exists {
info!("表 {} 已存在,跳过创建", schema.name);
return Ok(());
}
}
let pools = self.pool_manager.get_connection_pools();
let pool = pools
.get("default")
.ok_or_else(|| QuickDbError::ConfigError {
message: "无法获取默认连接池".to_string(),
})?
.clone();
let mut fields = std::collections::HashMap::new();
for column in &schema.columns {
let field_type = match &column.column_type {
crate::table::schema::ColumnType::Integer => crate::model::FieldType::Integer {
min_value: None,
max_value: None,
},
crate::table::schema::ColumnType::BigInteger => crate::model::FieldType::BigInteger,
crate::table::schema::ColumnType::Float => crate::model::FieldType::Float {
min_value: None,
max_value: None,
},
crate::table::schema::ColumnType::Double => crate::model::FieldType::Double,
crate::table::schema::ColumnType::String { length } => {
crate::model::FieldType::String {
max_length: length.map(|l| l as usize),
min_length: None,
regex: None,
}
}
crate::table::schema::ColumnType::Text => crate::model::FieldType::Text,
crate::table::schema::ColumnType::Boolean => crate::model::FieldType::Boolean,
crate::table::schema::ColumnType::DateTime => crate::model::FieldType::DateTime,
crate::table::schema::ColumnType::Date => crate::model::FieldType::Date,
crate::table::schema::ColumnType::Time => crate::model::FieldType::Time,
crate::table::schema::ColumnType::Json => crate::model::FieldType::Json,
crate::table::schema::ColumnType::Binary { length: _ } => {
crate::model::FieldType::Binary
}
crate::table::schema::ColumnType::Uuid => crate::model::FieldType::Uuid,
crate::table::schema::ColumnType::Decimal { precision, scale } => {
crate::model::FieldType::Decimal {
precision: *precision as u8,
scale: *scale as u8,
}
}
crate::table::schema::ColumnType::SmallInteger => {
crate::model::FieldType::Integer {
min_value: None,
max_value: None,
}
}
crate::table::schema::ColumnType::LongText => crate::model::FieldType::Text,
crate::table::schema::ColumnType::Timestamp => crate::model::FieldType::DateTime,
crate::table::schema::ColumnType::Blob => crate::model::FieldType::Binary,
crate::table::schema::ColumnType::Enum { values: _ } => {
crate::model::FieldType::String {
max_length: None,
min_length: None,
regex: None,
}
}
crate::table::schema::ColumnType::Custom { type_name: _ } => {
crate::model::FieldType::String {
max_length: None,
min_length: None,
regex: None,
}
}
};
fields.insert(
column.name.clone(),
crate::model::FieldDefinition::new(field_type),
);
}
let result = pool
.create_table(&schema.name, &fields, &pool.db_config.id_strategy)
.await;
if result.is_ok() {
{
let mut cache = self.existence_cache.write().await;
cache.insert(schema.name.clone(), true);
}
{
let mut cache = self.schema_cache.write().await;
cache.insert(schema.name.clone(), schema.clone());
}
{
let mut version_manager = self.version_manager.write().await;
version_manager.register_version(
schema.name.clone(),
schema.clone(),
Some("初始版本".to_string()),
)?;
}
info!("成功创建表: {}", schema.name);
}
result
}
/// Drops `table_name` through the `"default"` connection pool.
///
/// On success the table's entries are evicted from the existence cache and
/// the schema cache. On failure the caches are left untouched and the pool's
/// error is returned.
///
/// # Errors
/// `QuickDbError::ConfigError` when no `"default"` pool is configured, or any
/// error from the pool's `drop_table`.
pub async fn drop_table(&self, table_name: &str) -> QuickDbResult<()> {
    let pools = self.pool_manager.get_connection_pools();
    let missing_pool = || QuickDbError::ConfigError {
        message: "无法获取默认连接池".to_string(),
    };
    let pool = pools.get("default").ok_or_else(missing_pool)?.clone();

    // Bail out before touching the caches if the drop itself failed.
    pool.drop_table(table_name).await?;

    self.existence_cache.write().await.remove(table_name);
    self.schema_cache.write().await.remove(table_name);
    info!("成功删除表: {}", table_name);
    Ok(())
}
/// Drops the table named by `schema` if it is reported as existing, then
/// creates it again from `schema` with the given `options`.
///
/// # Errors
/// Propagates the first failure from the existence check, the drop, or the
/// creation.
pub async fn drop_and_recreate_table(
    &self,
    schema: &TableSchema,
    options: Option<TableCreateOptions>,
) -> QuickDbResult<()> {
    let name = &schema.name;
    info!("开始删除并重建表: {}", name);

    if self.check_table_exists(name).await? {
        info!("表 {} 存在,正在删除...", name);
        self.drop_table(name).await?;
    } else {
        info!("表 {} 不存在,直接创建", name);
    }

    info!("正在重新创建表: {}", name);
    self.create_table(schema, options).await?;

    info!("成功删除并重建表: {}", name);
    Ok(())
}
pub async fn get_table_schema(&self, table_name: &str) -> QuickDbResult<Option<TableSchema>> {
{
let cache = self.schema_cache.read().await;
if let Some(schema) = cache.get(table_name) {
return Ok(Some(schema.clone()));
}
}
let connection = self.pool_manager.get_connection(Some("default")).await?;
let db_type = self.pool_manager.get_database_type("default")?;
let adapter = crate::adapter::create_adapter(&db_type)?;
let schema: Option<TableSchema> = None;
self.pool_manager.release_connection(&connection).await?;
if let Some(ref schema) = schema {
let mut cache = self.schema_cache.write().await;
cache.insert(table_name.to_string(), schema.clone());
}
Ok(schema)
}
/// Lists the tables of the `"default"` database.
///
/// The `"default"` database type is resolved, an adapter is built for it, and
/// a connection is borrowed from the pool and handed back.
///
/// NOTE(review): the actual listing query is not implemented yet — the
/// returned vector is always empty.
///
/// # Errors
/// Propagates failures from resolving the database type, building the
/// adapter, acquiring the connection or releasing it.
pub async fn list_tables(&self) -> QuickDbResult<Vec<String>> {
    // Resolve the adapter *before* borrowing a connection. Previously the
    // connection was acquired first, so an error from either lookup returned
    // early without calling `release_connection`.
    let db_type = self.pool_manager.get_database_type("default")?;
    // Kept for the pending listing query; building it also surfaces adapter
    // construction errors exactly as before.
    let _adapter = crate::adapter::create_adapter(&db_type)?;

    // The connection is only ever borrowed immutably, so no `mut` is needed.
    let connection = self.pool_manager.get_connection(Some("default")).await?;
    // TODO: query the table names through the adapter.
    let tables = Vec::new();
    self.pool_manager.release_connection(&connection).await?;
    Ok(tables)
}
/// Summarises the state of `table_name`: whether it exists, which versions
/// are known, and whether a migration is required.
///
/// For a table that is not reported as existing, every other field of the
/// result is empty/false. Otherwise the expected version is read from the
/// version manager.
///
/// NOTE(review): the current version is not read from the database yet and is
/// always `None`, so `needs_migration` is always `false` and `schema_diff` is
/// always `None` for now.
///
/// # Errors
/// Propagates any error from the existence check.
pub async fn check_table_status(&self, table_name: &str) -> QuickDbResult<TableCheckResult> {
    if !self.check_table_exists(table_name).await? {
        return Ok(TableCheckResult {
            exists: false,
            current_version: None,
            expected_version: None,
            needs_migration: false,
            schema_diff: None,
        });
    }

    let versions = self.version_manager.read().await;
    let expected_version = versions
        .get_current_version(table_name)
        .map(|v| v.version);
    // Placeholder until the applied version can be read from the database.
    let current_version: Option<u32> = None;

    // A migration is needed only when both versions are known and the table lags behind.
    let needs_migration = match (current_version, expected_version) {
        (Some(current), Some(expected)) => current < expected,
        _ => false,
    };

    Ok(TableCheckResult {
        exists: true,
        current_version,
        expected_version,
        needs_migration,
        schema_diff: None,
    })
}
/// Migrates `table_name` to `target_version`, or to the version manager's
/// current version for that table when `target_version` is `None`.
///
/// The migration path is computed under a read lock, the lock is released,
/// and each migration is then executed in order under its own write lock.
///
/// NOTE(review): the table's current version is hard-coded to `0`, so the
/// path always starts from version 0 and the "already up to date" shortcut
/// only triggers for a target of 0.
///
/// # Errors
/// - `QuickDbError::ValidationError` when no target is given and the table
///   has no registered version.
/// - Any error from computing the migration path or executing a migration;
///   execution stops at the first failing migration.
pub async fn migrate_table(
    &self,
    table_name: &str,
    target_version: Option<u32>,
) -> QuickDbResult<()> {
    let target_version = {
        let versions = self.version_manager.read().await;
        match target_version {
            Some(requested) => requested,
            None => versions
                .get_current_version(table_name)
                .map(|v| v.version)
                .ok_or_else(|| QuickDbError::ValidationError {
                    field: "table_name".to_string(),
                    message: format!("表 {} 没有注册版本", table_name),
                })?,
        }
    };

    // Placeholder until the applied version can be read from the database.
    let current_version = 0u32;
    if current_version == target_version {
        info!("表 {} 已是最新版本 {}", table_name, target_version);
        return Ok(());
    }

    // Snapshot the migration ids, then let go of the read lock so the write
    // lock taken per migration below cannot block on it.
    let versions = self.version_manager.read().await;
    let migration_ids: Vec<String> = versions
        .get_migration_path(table_name, current_version, target_version)?
        .iter()
        .map(|m| m.id.clone())
        .collect();
    drop(versions);

    for migration_id in migration_ids {
        info!("执行迁移: {}", migration_id);
        // The write guard lives for this statement only, i.e. for one migration.
        self.version_manager
            .write()
            .await
            .execute_migration(table_name, &migration_id)
            .await?;
    }

    info!("表 {} 迁移完成,版本: {}", table_name, target_version);
    Ok(())
}
/// Makes sure the table described by `schema` is usable, according to the
/// manager's automation flags.
///
/// - With auto-creation disabled this does nothing.
/// - A table that is not reported as existing is created with default options.
/// - An existing table is migrated to its registered version only when
///   auto-migration is enabled and its status reports `needs_migration`.
///
/// # Errors
/// Propagates the first failure from the existence check, creation, status
/// check or migration.
pub async fn ensure_table_exists(&self, schema: &TableSchema) -> QuickDbResult<()> {
    if !self.auto_create_tables {
        return Ok(());
    }

    if !self.check_table_exists(&schema.name).await? {
        info!("表 {} 不存在,自动创建", schema.name);
        return self.create_table(schema, None).await;
    }

    // The status check is skipped entirely unless auto-migration is enabled.
    if self.auto_migrate && self.check_table_status(&schema.name).await?.needs_migration {
        info!("表 {} 需要迁移", schema.name);
        self.migrate_table(&schema.name, None).await?;
    }
    Ok(())
}
/// Registers `schema` as a new version with the version manager and stores it
/// in the schema cache, returning the version number assigned.
///
/// The version manager's write lock stays held while the cache is updated.
///
/// # Errors
/// Propagates any error from `register_version`; the cache is not touched in
/// that case.
pub async fn register_schema(
    &self,
    schema: TableSchema,
    description: Option<String>,
) -> QuickDbResult<u32> {
    let table_name = schema.name.clone();

    let mut versions = self.version_manager.write().await;
    let assigned = versions.register_version(table_name.clone(), schema.clone(), description)?;

    self.schema_cache.write().await.insert(table_name, schema);
    Ok(assigned)
}
/// Creates a migration for `table_name` from `from_version` to `to_version`
/// by delegating to the version manager under its write lock.
///
/// Returns whatever string the version manager's `create_migration` yields
/// (presumably the new migration's id — confirm against `VersionManager`).
///
/// # Errors
/// Propagates any error from `create_migration`.
pub async fn create_migration_script(
    &self,
    table_name: &str,
    from_version: u32,
    to_version: u32,
    up_script: String,
    down_script: Option<String>,
    script_type: MigrationScriptType,
) -> QuickDbResult<String> {
    self.version_manager.write().await.create_migration(
        table_name,
        from_version,
        to_version,
        up_script,
        down_script,
        script_type,
    )
}
/// Empties the existence cache and then the schema cache, and logs that the
/// caches were cleared. The version manager is not affected.
pub async fn clear_cache(&self) {
    // Each write guard is a temporary, released at the end of its statement.
    self.existence_cache.write().await.clear();
    self.schema_cache.write().await.clear();
    info!("表管理器缓存已清除");
}
/// Returns another handle to the shared, lock-protected version manager.
pub fn get_version_manager(&self) -> Arc<RwLock<VersionManager>> {
    Arc::clone(&self.version_manager)
}
/// Returns a snapshot of manager statistics as JSON values.
///
/// Keys: `existence_cache_size` and `schema_cache_size` (entry counts), plus
/// `auto_create_tables` and `auto_migrate` (the configured flags).
pub async fn get_stats(&self) -> HashMap<String, serde_json::Value> {
    // Read each size under its own short-lived read lock, existence cache first.
    let existence_entries = self.existence_cache.read().await.len();
    let schema_entries = self.schema_cache.read().await.len();

    let mut stats = HashMap::with_capacity(4);
    stats.insert(
        "existence_cache_size".to_string(),
        serde_json::Value::from(existence_entries),
    );
    stats.insert(
        "schema_cache_size".to_string(),
        serde_json::Value::from(schema_entries),
    );
    stats.insert(
        "auto_create_tables".to_string(),
        serde_json::Value::from(self.auto_create_tables),
    );
    stats.insert(
        "auto_migrate".to_string(),
        serde_json::Value::from(self.auto_migrate),
    );
    stats
}
}