use crate::adapter::DatabaseAdapter;
use crate::adapter::MysqlAdapter;
use crate::adapter::mysql::query_builder::SqlQueryBuilder;
use crate::error::{QuickDbError, QuickDbResult};
use crate::manager;
use crate::model::{FieldDefinition, FieldType};
use crate::pool::DatabaseConnection;
use crate::types::*;
use async_trait::async_trait;
use rat_logger::debug;
use sqlx::Row;
use std::collections::HashMap;
use super::query as mysql_query;
use super::schema as mysql_schema;
#[async_trait]
impl DatabaseAdapter for MysqlAdapter {
async fn create(
&self,
connection: &DatabaseConnection,
table: &str,
data: &HashMap<String, DataValue>,
id_strategy: &IdStrategy,
alias: &str,
) -> QuickDbResult<DataValue> {
if let DatabaseConnection::MySQL(pool) = connection {
if !self.table_exists(connection, table).await? {
let _lock = self.acquire_table_lock(table).await;
if !self.table_exists(connection, table).await? {
if let Some(model_meta) = manager::get_model_with_alias(table, alias) {
debug!("表 {} 不存在,使用预定义模型元数据创建", table);
self.create_table(
connection,
table,
&model_meta.fields,
id_strategy,
alias,
)
.await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
debug!("⏱️ 等待100ms确保表 '{}' 创建完成", table);
} else {
return Err(QuickDbError::ValidationError {
field: "table_creation".to_string(),
message: format!(
"表 '{}' 不存在,且没有预定义的模型元数据。请先定义模型并使用 define_model! 宏明确指定字段类型。",
table
),
});
}
} else {
debug!("表 {} 已存在,跳过创建", table);
}
}
let (sql, params) = SqlQueryBuilder::new()
.insert(data.clone())
.build(table, alias)?;
debug!("生成的INSERT SQL: {}", sql);
debug!("绑定参数: {:?}", params);
let mut tx = pool.begin().await.map_err(|e| QuickDbError::QueryError {
message: format!("开始事务失败: {}", e),
})?;
let affected_rows = {
let mut query = sqlx::query::<sqlx::MySql>(&sql);
for param in ¶ms {
query = match param {
DataValue::String(s) => query.bind(s),
DataValue::Int(i) => query.bind(i),
DataValue::UInt(u) => {
if *u <= i64::MAX as u64 {
query.bind(*u as i64)
} else {
query.bind(u.to_string())
}
}
DataValue::Float(f) => query.bind(f),
DataValue::Bool(b) => query.bind(b),
DataValue::DateTime(dt) => query.bind(dt.naive_utc().and_utc()),
DataValue::DateTimeUTC(dt) => query.bind(dt.naive_utc()),
DataValue::Uuid(uuid) => query.bind(uuid),
DataValue::Json(json) => query.bind(json.to_string()),
DataValue::Bytes(bytes) => query.bind(bytes.as_slice()),
DataValue::Null => query.bind(Option::<String>::None),
DataValue::Array(arr) => {
let json_values: Vec<serde_json::Value> =
arr.iter().map(|v| v.to_json_value()).collect();
query.bind(serde_json::to_string(&json_values).unwrap_or_default())
}
DataValue::Object(obj) => {
let json_map: serde_json::Map<String, serde_json::Value> = obj
.iter()
.map(|(k, v)| (k.clone(), v.to_json_value()))
.collect();
query.bind(serde_json::to_string(&json_map).unwrap_or_default())
}
};
}
let execute_result = query.execute(&mut *tx).await;
match execute_result {
Ok(result) => {
let rows = result.rows_affected();
debug!("✅ SQL执行成功,影响的行数: {}", rows);
rows
}
Err(e) => {
debug!("❌ SQL执行失败: {}", e);
return Err(QuickDbError::QueryError {
message: format!("执行插入失败: {}", e),
});
}
}
};
debug!("插入操作最终影响的行数: {}", affected_rows);
let id_value = match id_strategy {
IdStrategy::AutoIncrement => {
let last_id_row = sqlx::query::<sqlx::MySql>("SELECT LAST_INSERT_ID()")
.fetch_one(&mut *tx)
.await
.map_err(|e| QuickDbError::QueryError {
message: format!("获取LAST_INSERT_ID失败: {}", e),
})?;
let last_id: u64 =
last_id_row
.try_get(0)
.map_err(|e| QuickDbError::QueryError {
message: format!("解析LAST_INSERT_ID失败: {}", e),
})?;
debug!("在事务中获取到的LAST_INSERT_ID: {}", last_id);
DataValue::Int(last_id as i64)
}
_ => {
if let Some(id_data) = data.get("id") {
debug!("使用数据中的ID字段: {:?}", id_data);
id_data.clone()
} else {
debug!("数据中没有ID字段,返回默认值0");
DataValue::Int(0)
}
}
};
let commit_result = tx.commit().await;
match commit_result {
Ok(_) => debug!("✅ 事务提交成功"),
Err(e) => {
debug!("❌ 事务提交失败: {}", e);
return Err(QuickDbError::QueryError {
message: format!("提交事务失败: {}", e),
});
}
}
let mut result_map = std::collections::HashMap::new();
result_map.insert("id".to_string(), id_value.clone());
result_map.insert(
"affected_rows".to_string(),
DataValue::Int(affected_rows as i64),
);
debug!(
"最终返回的DataValue: {:?}",
DataValue::Object(result_map.clone())
);
Ok(DataValue::Object(result_map))
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn find_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
alias: &str,
) -> QuickDbResult<Option<DataValue>> {
if let DatabaseConnection::MySQL(pool) = connection {
let condition = QueryConditionWithConfig {
field: "id".to_string(),
operator: QueryOperator::Eq,
value: id.clone(),
case_insensitive: false,
};
let (sql, params) = SqlQueryBuilder::new()
.select(&["*"])
.where_condition(condition)
.limit(1)
.build(table, alias)?;
let results = self.execute_query(pool, &sql, ¶ms, table).await?;
Ok(results.into_iter().next())
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn find_with_cache_control(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let condition_groups = if conditions.is_empty() {
vec![]
} else {
let group_conditions = conditions
.iter()
.map(|c| QueryConditionGroupWithConfig::Single(c.clone()))
.collect();
vec![QueryConditionGroupWithConfig::GroupWithConfig {
operator: crate::types::LogicalOperator::And,
conditions: group_conditions,
}]
};
self.find_with_groups_with_cache_control_and_config(connection, table, &condition_groups, options, alias, bypass_cache)
.await
}
async fn find_with_groups_with_cache_control(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroup],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
fn convert_group(group: &QueryConditionGroup) -> QueryConditionGroupWithConfig {
match group {
QueryConditionGroup::Single(c) => {
QueryConditionGroupWithConfig::Single(QueryConditionWithConfig {
field: c.field.clone(),
operator: c.operator.clone(),
value: c.value.clone(),
case_insensitive: false,
})
}
QueryConditionGroup::Group { operator, conditions } => {
QueryConditionGroupWithConfig::GroupWithConfig {
operator: operator.clone(),
conditions: conditions.iter().map(convert_group).collect(),
}
}
}
}
let condition_groups_with_config: Vec<QueryConditionGroupWithConfig> =
condition_groups.iter().map(convert_group).collect();
if let DatabaseConnection::MySQL(pool) = connection {
let mut builder = SqlQueryBuilder::new()
.select(&["*"])
.where_condition_groups(&condition_groups_with_config);
for sort_field in &options.sort {
builder = builder.order_by(&sort_field.field, sort_field.direction.clone());
}
if let Some(pagination) = &options.pagination {
builder = builder.limit(pagination.limit).offset(pagination.skip);
}
let (sql, params) = builder.build(table, alias)?;
debug!("执行MySQL条件组合查询: {}", sql);
self.execute_query(pool, &sql, ¶ms, table).await
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn find_with_groups_with_cache_control_and_config(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroupWithConfig],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
fn convert_group(group: &QueryConditionGroupWithConfig) -> QueryConditionGroup {
match group {
QueryConditionGroupWithConfig::Single(c) => {
QueryConditionGroup::Single(QueryCondition {
field: c.field.clone(),
operator: c.operator.clone(),
value: c.value.clone(),
})
}
QueryConditionGroupWithConfig::GroupWithConfig { operator, conditions } => {
QueryConditionGroup::Group {
operator: operator.clone(),
conditions: conditions.iter().map(convert_group).collect(),
}
}
}
}
let simple_groups: Vec<QueryConditionGroup> =
condition_groups.iter().map(convert_group).collect();
self.find_with_groups_with_cache_control(connection, table, &simple_groups, options, alias, bypass_cache).await
}
async fn find(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
options: &QueryOptions,
alias: &str,
) -> QuickDbResult<Vec<DataValue>> {
self.find_with_cache_control(connection, table, conditions, options, alias, false).await
}
async fn find_with_groups(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroup],
options: &QueryOptions,
alias: &str,
) -> QuickDbResult<Vec<DataValue>> {
self.find_with_groups_with_cache_control(connection, table, condition_groups, options, alias, false).await
}
async fn update(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
data: &HashMap<String, DataValue>,
alias: &str,
) -> QuickDbResult<u64> {
if let DatabaseConnection::MySQL(pool) = connection {
let model_meta =
crate::manager::get_model_with_alias(table, alias).ok_or_else(|| {
QuickDbError::ValidationError {
field: "model".to_string(),
message: format!("模型 '{}' 不存在", table),
}
})?;
let field_map: std::collections::HashMap<String, crate::model::FieldDefinition> =
model_meta
.fields
.iter()
.map(|(name, f)| (name.clone(), f.clone()))
.collect();
let mut validated_data = HashMap::new();
for (field_name, data_value) in data {
if let Some(field_def) = field_map.get(field_name) {
if matches!(
field_def.field_type,
crate::model::FieldType::DateTimeWithTz { .. }
) {
let converted = match data_value {
DataValue::String(s) => chrono::DateTime::parse_from_rfc3339(s)
.map(|dt| {
DataValue::DateTime(
dt.with_timezone(&chrono::FixedOffset::east(0)),
)
})
.unwrap_or(data_value.clone()),
DataValue::DateTimeUTC(dt) => {
DataValue::DateTime(dt.with_timezone(&chrono::FixedOffset::east(0)))
}
_ => data_value.clone(),
};
validated_data.insert(field_name.clone(), converted);
} else {
validated_data.insert(field_name.clone(), data_value.clone());
}
} else {
return Err(QuickDbError::ValidationError {
field: field_name.clone(),
message: format!("字段 '{}' 在模型中不存在", field_name),
});
}
}
let (sql, params) = SqlQueryBuilder::new()
.update(validated_data)
.where_conditions(conditions)
.build(table, alias)?;
self.execute_update(pool, &sql, ¶ms, table).await
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn update_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
data: &HashMap<String, DataValue>,
alias: &str,
) -> QuickDbResult<bool> {
if let DatabaseConnection::MySQL(pool) = connection {
let condition = QueryConditionWithConfig {
field: "id".to_string(),
operator: QueryOperator::Eq,
value: id.clone(),
case_insensitive: false,
};
let (sql, params) = SqlQueryBuilder::new()
.update(data.clone())
.where_condition(condition)
.build(table, alias)?;
let affected_rows = self.execute_update(pool, &sql, ¶ms, table).await?;
Ok(affected_rows > 0)
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn update_with_operations(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
operations: &[crate::types::UpdateOperation],
alias: &str,
) -> QuickDbResult<u64> {
if let DatabaseConnection::MySQL(pool) = connection {
let mut set_clauses = Vec::new();
let mut params = Vec::new();
for operation in operations {
match &operation.operation {
crate::types::UpdateOperator::Set => {
set_clauses.push(format!("{} = ?", operation.field));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::Increment => {
set_clauses.push(format!("{} = {} + ?", operation.field, operation.field));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::Decrement => {
set_clauses.push(format!("{} = {} - ?", operation.field, operation.field));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::Multiply => {
set_clauses.push(format!("{} = {} * ?", operation.field, operation.field));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::Divide => {
set_clauses.push(format!("{} = {} / ?", operation.field, operation.field));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::PercentIncrease => {
set_clauses.push(format!(
"{} = {} * (1.0 + ?/100.0)",
operation.field, operation.field
));
params.push(operation.value.clone());
}
crate::types::UpdateOperator::PercentDecrease => {
set_clauses.push(format!(
"{} = {} * (1.0 - ?/100.0)",
operation.field, operation.field
));
params.push(operation.value.clone());
}
}
}
if set_clauses.is_empty() {
return Err(QuickDbError::ValidationError {
field: "operations".to_string(),
message: "更新操作不能为空".to_string(),
});
}
let mut sql = format!("UPDATE {} SET {}", table, set_clauses.join(", "));
if !conditions.is_empty() {
let (where_clause, mut where_params) = SqlQueryBuilder::new()
.build_where_clause_with_offset(conditions, params.len() + 1, table, alias)?;
sql.push_str(&format!(" WHERE {}", where_clause));
params.extend(where_params);
}
debug!("执行MySQL操作更新: {}", sql);
self.execute_update(pool, &sql, ¶ms, table).await
} else {
Err(QuickDbError::ConnectionError {
message: "连接类型不匹配,期望MySQL连接".to_string(),
})
}
}
async fn delete(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
mysql_query::delete(self, connection, table, conditions, alias).await
}
async fn delete_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
alias: &str,
) -> QuickDbResult<bool> {
mysql_query::delete_by_id(self, connection, table, id, alias).await
}
async fn count(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
mysql_query::count(self, connection, table, conditions, alias).await
}
async fn count_with_groups(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroupWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
mysql_query::count_with_groups(self, connection, table, condition_groups, alias).await
}
async fn create_table(
&self,
connection: &DatabaseConnection,
table: &str,
fields: &HashMap<String, FieldDefinition>,
id_strategy: &IdStrategy,
alias: &str,
) -> QuickDbResult<()> {
mysql_schema::create_table(self, connection, table, fields, id_strategy, alias).await
}
async fn create_index(
&self,
connection: &DatabaseConnection,
table: &str,
index_name: &str,
fields: &[String],
unique: bool,
) -> QuickDbResult<()> {
mysql_schema::create_index(self, connection, table, index_name, fields, unique).await
}
async fn table_exists(
&self,
connection: &DatabaseConnection,
table: &str,
) -> QuickDbResult<bool> {
mysql_schema::table_exists(self, connection, table).await
}
async fn drop_table(&self, connection: &DatabaseConnection, table: &str) -> QuickDbResult<()> {
mysql_schema::drop_table(self, connection, table).await
}
async fn get_server_version(&self, connection: &DatabaseConnection) -> QuickDbResult<String> {
mysql_schema::get_server_version(self, connection).await
}
async fn create_stored_procedure(
&self,
connection: &DatabaseConnection,
config: &crate::stored_procedure::StoredProcedureConfig,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureCreateResult> {
use crate::stored_procedure::StoredProcedureCreateResult;
use crate::types::id_types::IdStrategy;
debug!("开始创建MySQL存储过程: {}", config.procedure_name);
config
.validate()
.map_err(|e| crate::error::QuickDbError::ValidationError {
field: "config".to_string(),
message: format!("存储过程配置验证失败: {}", e),
})?;
for model_meta in &config.dependencies {
let table_name = &model_meta.collection_name;
if !self.table_exists(connection, table_name).await? {
debug!("依赖表 {} 不存在,尝试创建", table_name);
let id_strategy = crate::manager::get_id_strategy(&config.database)
.unwrap_or(IdStrategy::AutoIncrement);
self.create_table(
connection,
table_name,
&model_meta.fields,
&id_strategy,
&config.database,
)
.await?;
}
}
let sql_template = self.generate_stored_procedure_sql(&config).await?;
debug!("生成MySQL存储过程SQL模板: {}", sql_template);
let procedure_info = crate::stored_procedure::StoredProcedureInfo {
config: config.clone(),
template: sql_template.clone(),
db_type: "MySQL".to_string(),
created_at: chrono::Utc::now(),
};
let mut procedures = self.stored_procedures.lock().await;
procedures.insert(config.procedure_name.clone(), procedure_info);
debug!(
"✅ MySQL存储过程 {} 模板已存储到适配器映射表",
config.procedure_name
);
Ok(StoredProcedureCreateResult {
success: true,
procedure_name: config.procedure_name.clone(),
error: None,
})
}
async fn execute_stored_procedure(
&self,
connection: &DatabaseConnection,
procedure_name: &str,
database: &str,
params: Option<std::collections::HashMap<String, crate::types::DataValue>>,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureQueryResult> {
use crate::adapter::mysql::adapter::MysqlAdapter;
let procedures = self.stored_procedures.lock().await;
let procedure_info = procedures.get(procedure_name).ok_or_else(|| {
crate::error::QuickDbError::ValidationError {
field: "procedure_name".to_string(),
message: format!("存储过程 '{}' 不存在", procedure_name),
}
})?;
let sql_template = procedure_info.template.clone();
drop(procedures);
debug!(
"执行存储过程查询: {}, 模板: {}",
procedure_name, sql_template
);
let final_sql = self
.build_final_query_from_template(&sql_template, params)
.await?;
let pool = match connection {
DatabaseConnection::MySQL(pool) => pool,
_ => {
return Err(QuickDbError::ConnectionError {
message: "Invalid connection type for MySQL".to_string(),
});
}
};
debug!("执行存储过程查询SQL: {}", final_sql);
let rows = sqlx::query::<sqlx::MySql>(&final_sql)
.fetch_all(pool)
.await
.map_err(|e| QuickDbError::QueryError {
message: format!("执行存储过程查询失败: {}", e),
})?;
let mut query_result = Vec::new();
for row in rows {
let data_map = self.row_to_data_map(&row)?;
query_result.push(data_map);
}
let mut result = Vec::new();
for row_data in query_result {
let mut row_map = std::collections::HashMap::new();
for (key, value) in row_data {
row_map.insert(key, value);
}
result.push(row_map);
}
debug!(
"存储过程 {} 执行完成,返回 {} 条记录",
procedure_name,
result.len()
);
Ok(result)
}
}
impl MysqlAdapter {
async fn build_final_query_from_template(
&self,
template: &str,
params: Option<std::collections::HashMap<String, crate::types::DataValue>>,
) -> QuickDbResult<String> {
let mut final_sql = template.to_string();
if let Some(param_map) = params {
if let Some(where_clause) = param_map.get("WHERE") {
let where_str = match where_clause {
crate::types::DataValue::String(s) => s.clone(),
_ => where_clause.to_string(),
};
final_sql = final_sql.replace("{WHERE}", &format!(" WHERE {}", where_str));
} else {
final_sql = final_sql.replace("{WHERE}", "");
}
if let Some(group_by) = param_map.get("GROUP_BY") {
let group_by_str = match group_by {
crate::types::DataValue::String(s) => s.clone(),
_ => group_by.to_string(),
};
final_sql = final_sql.replace("{GROUP_BY}", &format!(" GROUP BY {}", group_by_str));
} else {
final_sql = final_sql.replace("{GROUP_BY}", "");
}
if let Some(having) = param_map.get("HAVING") {
let having_str = match having {
crate::types::DataValue::String(s) => s.clone(),
_ => having.to_string(),
};
final_sql = final_sql.replace("{HAVING}", &format!(" HAVING {}", having_str));
} else {
final_sql = final_sql.replace("{HAVING}", "");
}
if let Some(order_by) = param_map.get("ORDER_BY") {
let order_by_str = match order_by {
crate::types::DataValue::String(s) => s.clone(),
_ => order_by.to_string(),
};
final_sql = final_sql.replace("{ORDER_BY}", &format!(" ORDER BY {}", order_by_str));
} else {
final_sql = final_sql.replace("{ORDER_BY}", "");
}
if let Some(limit) = param_map.get("LIMIT") {
let limit_str = match limit {
crate::types::DataValue::Int(i) => i.to_string(),
_ => limit.to_string(),
};
final_sql = final_sql.replace("{LIMIT}", &format!(" LIMIT {}", limit_str));
} else {
final_sql = final_sql.replace("{LIMIT}", "");
}
if let Some(offset) = param_map.get("OFFSET") {
let offset_str = match offset {
crate::types::DataValue::Int(i) => i.to_string(),
_ => offset.to_string(),
};
final_sql = final_sql.replace("{OFFSET}", &format!(" OFFSET {}", offset_str));
} else {
final_sql = final_sql.replace("{OFFSET}", "");
}
} else {
final_sql = final_sql
.replace("{WHERE}", "")
.replace("{GROUP_BY}", "")
.replace("{HAVING}", "")
.replace("{ORDER_BY}", "")
.replace("{LIMIT}", "")
.replace("{OFFSET}", "");
}
final_sql = final_sql
.replace(" ", " ")
.replace(" ,", ",")
.replace(", ", ", ")
.replace(" WHERE ", "")
.replace(" HAVING ", "")
.replace(" ORDER BY ", "")
.replace(" LIMIT ", "")
.replace(" OFFSET ", "");
debug!("构建的最终SQL: {}", final_sql);
Ok(final_sql)
}
}