use super::DatabaseAdapter;
use crate::cache::CacheManager;
use crate::error::QuickDbResult;
use crate::model::FieldDefinition;
use crate::pool::DatabaseConnection;
use crate::types::*;
use async_trait::async_trait;
use rat_logger::{debug, warn};
use std::collections::HashMap;
use std::sync::Arc;
pub struct CachedDatabaseAdapter {
inner: Box<dyn DatabaseAdapter>,
cache_manager: Arc<CacheManager>,
}
impl CachedDatabaseAdapter {
pub fn new(inner: Box<dyn DatabaseAdapter>, cache_manager: Arc<CacheManager>) -> Self {
Self {
inner,
cache_manager,
}
}
}
#[async_trait]
impl DatabaseAdapter for CachedDatabaseAdapter {
async fn create(
&self,
connection: &DatabaseConnection,
table: &str,
data: &HashMap<String, DataValue>,
id_strategy: &IdStrategy,
alias: &str,
) -> QuickDbResult<DataValue> {
let result = self
.inner
.create(connection, table, data, id_strategy, alias)
.await;
if result.is_ok() {
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
debug!("已清理表查询缓存: table={}", table);
}
result
}
async fn find_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
alias: &str,
) -> QuickDbResult<Option<DataValue>> {
let id_type = match id {
DataValue::Int(n) => IdType::Number(*n),
DataValue::String(s) => IdType::String(s.clone()),
_ => {
warn!("无法将DataValue转换为IdType: {:?}", id);
return self.inner.find_by_id(connection, table, id, alias).await;
}
};
match self.cache_manager.get_cached_record(table, &id_type).await {
Ok(Some(cached_result)) => {
debug!("缓存命中: 表={}, ID={:?}", table, id);
return Ok(Some(cached_result));
}
Ok(None) => {
debug!("缓存未命中: 表={}, ID={:?}", table, id);
}
Err(e) => {
warn!("缓存查询失败: {}, 继续查询数据库", e);
}
}
let result = self.inner.find_by_id(connection, table, id, alias).await;
if let Ok(Some(ref record)) = result {
if let Err(e) = self
.cache_manager
.cache_record(table, &id_type, record)
.await
{
warn!("缓存记录失败: {}", e);
}
}
result
}
async fn find_with_cache_control(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let condition_groups = if conditions.is_empty() {
vec![]
} else {
let group_conditions: Vec<QueryConditionGroupWithConfig> = conditions
.iter()
.map(|c| QueryConditionGroupWithConfig::Single(c.clone()))
.collect();
vec![QueryConditionGroupWithConfig::GroupWithConfig {
operator: LogicalOperator::And,
conditions: group_conditions,
}]
};
self.find_with_groups_with_cache_control_and_config(connection, table, &condition_groups, options, alias, bypass_cache)
.await
}
async fn find_with_groups_with_cache_control(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroup],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let condition_groups_with_config: Vec<QueryConditionGroupWithConfig> = condition_groups
.iter()
.map(|g| g.clone().into())
.collect();
self.find_with_groups_with_cache_control_and_config(
connection,
table,
&condition_groups_with_config,
options,
alias,
bypass_cache,
)
.await
}
async fn find_with_groups_with_cache_control_and_config(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroupWithConfig],
options: &QueryOptions,
alias: &str,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let cache_key = self.cache_manager.generate_condition_groups_with_config_cache_key(
table,
condition_groups,
options,
);
if !bypass_cache {
match self
.cache_manager
.get_cached_condition_groups_with_config_result(table, condition_groups, options)
.await
{
Ok(Some(cached_result)) => {
debug!("条件组合查询缓存命中: 表={}, 键={}", table, cache_key);
return Ok(cached_result);
}
Ok(None) => {
debug!("条件组合查询缓存未命中: 表={}, 键={}", table, cache_key);
}
Err(e) => {
warn!("获取条件组合查询缓存失败: {}", e);
}
}
} else {
debug!("强制跳过缓存: 表={}, 键={}", table, cache_key);
}
let result = self
.inner
.find_with_groups_with_config(connection, table, condition_groups, options, alias)
.await?;
if !bypass_cache {
if let Err(e) = self
.cache_manager
.cache_condition_groups_with_config_result(table, condition_groups, options, &result)
.await
{
warn!("缓存条件组合查询结果失败: {}", e);
} else {
debug!(
"已缓存条件组合查询结果: 表={}, 键={}, 结果数量={}",
table,
cache_key,
result.len()
);
}
}
Ok(result)
}
async fn update(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
data: &HashMap<String, DataValue>,
alias: &str,
) -> QuickDbResult<u64> {
let result = self
.inner
.update(connection, table, conditions, data, alias)
.await;
if let Ok(updated_count) = result {
if updated_count > 0 {
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
debug!(
"已清理表查询缓存: table={}, updated_count={}",
table, updated_count
);
}
}
result
}
async fn update_with_operations(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
operations: &[crate::types::UpdateOperation],
alias: &str,
) -> QuickDbResult<u64> {
let result = self
.inner
.update_with_operations(connection, table, conditions, operations, alias)
.await;
if let Ok(updated_count) = result {
if updated_count > 0 {
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
debug!(
"已清理表查询缓存: table={}, updated_count={}",
table, updated_count
);
}
}
result
}
async fn update_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
data: &HashMap<String, DataValue>,
alias: &str,
) -> QuickDbResult<bool> {
let result = self
.inner
.update_by_id(connection, table, id, data, alias)
.await;
if let Ok(true) = result {
let id_value = match id {
DataValue::Int(n) => IdType::Number(*n),
DataValue::String(s) => IdType::String(s.clone()),
_ => {
warn!("无法将DataValue转换为IdType: {:?}", id);
return result;
}
};
if let Err(e) = self.cache_manager.invalidate_record(table, &id_value).await {
warn!("清理记录缓存失败: {}", e);
}
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
debug!("已清理记录和查询缓存: table={}, id={:?}", table, id);
}
result
}
async fn delete(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
let result = self
.inner
.delete(connection, table, conditions, alias)
.await;
if let Ok(deleted_count) = result {
if deleted_count > 0 {
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
if let Err(e) = self.cache_manager.clear_table_record_cache(table).await {
warn!("清理表记录缓存失败: {}", e);
}
debug!(
"已清理表缓存: table={}, deleted_count={}",
table, deleted_count
);
}
}
result
}
async fn delete_by_id(
&self,
connection: &DatabaseConnection,
table: &str,
id: &DataValue,
alias: &str,
) -> QuickDbResult<bool> {
let result = self.inner.delete_by_id(connection, table, id, alias).await;
if let Ok(true) = result {
let id_value = match id {
DataValue::Int(n) => IdType::Number(*n),
DataValue::String(s) => IdType::String(s.clone()),
_ => {
warn!("无法将DataValue转换为IdType: {:?}", id);
return result;
}
};
if let Err(e) = self.cache_manager.invalidate_record(table, &id_value).await {
warn!("清理记录缓存失败: {}", e);
}
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表查询缓存失败: {}", e);
}
debug!("已清理记录和查询缓存: table={}, id={:?}", table, id);
}
result
}
async fn count(
&self,
connection: &DatabaseConnection,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
self.inner.count(connection, table, conditions, alias).await
}
async fn count_with_groups(
&self,
connection: &DatabaseConnection,
table: &str,
condition_groups: &[QueryConditionGroupWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
self.inner.count_with_groups(connection, table, condition_groups, alias).await
}
async fn create_table(
&self,
connection: &DatabaseConnection,
table: &str,
fields: &HashMap<String, FieldDefinition>,
id_strategy: &IdStrategy,
alias: &str,
) -> QuickDbResult<()> {
self.inner
.create_table(connection, table, fields, id_strategy, alias)
.await
}
async fn create_index(
&self,
connection: &DatabaseConnection,
table: &str,
index_name: &str,
fields: &[String],
unique: bool,
) -> QuickDbResult<()> {
self.inner
.create_index(connection, table, index_name, fields, unique)
.await
}
async fn table_exists(
&self,
connection: &DatabaseConnection,
table: &str,
) -> QuickDbResult<bool> {
self.inner.table_exists(connection, table).await
}
async fn drop_table(&self, connection: &DatabaseConnection, table: &str) -> QuickDbResult<()> {
let result = self.inner.drop_table(connection, table).await;
if result.is_ok() {
if let Err(e) = self.cache_manager.clear_table_query_cache(table).await {
warn!("清理表缓存失败: {}", e);
}
debug!("已清理表缓存: table={}", table);
}
result
}
async fn get_server_version(&self, connection: &DatabaseConnection) -> QuickDbResult<String> {
self.inner.get_server_version(connection).await
}
async fn create_stored_procedure(
&self,
connection: &DatabaseConnection,
config: &crate::stored_procedure::StoredProcedureConfig,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureCreateResult> {
self.inner.create_stored_procedure(connection, config).await
}
async fn execute_stored_procedure(
&self,
connection: &DatabaseConnection,
procedure_name: &str,
database: &str,
params: Option<std::collections::HashMap<String, crate::types::DataValue>>,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureQueryResult> {
self.inner
.execute_stored_procedure(connection, procedure_name, database, params)
.await
}
}