use crate::error::{QuickDbError, QuickDbResult};
use crate::manager::get_global_pool_manager;
use crate::odm::manager_core::AsyncOdmManager;
use crate::pool::DatabaseOperation;
use crate::types::*;
use rat_logger::{debug, info, warn};
use tokio::sync::oneshot;
impl AsyncOdmManager {
#[doc(hidden)]
pub async fn handle_delete(
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
alias: Option<String>,
) -> QuickDbResult<u64> {
let manager = get_global_pool_manager();
let actual_alias = match alias {
Some(a) => a,
None => manager
.get_default_alias()
.await
.unwrap_or_else(|| "default".to_string()),
};
debug!(
"处理删除请求: collection={}, alias={}",
collection, actual_alias
);
let manager = get_global_pool_manager();
let connection_pools = manager.get_connection_pools();
let connection_pool =
connection_pools
.get(&actual_alias)
.ok_or_else(|| QuickDbError::AliasNotFound {
alias: actual_alias.clone(),
})?;
let (response_tx, response_rx) = oneshot::channel();
let operation = DatabaseOperation::Delete {
table: collection.to_string(),
conditions,
alias: actual_alias.clone(),
response: response_tx,
};
connection_pool
.operation_sender
.send(operation)
.map_err(|_| QuickDbError::ConnectionError {
message: "连接池操作通道已关闭".to_string(),
})?;
let affected_rows = response_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: "等待连接池响应超时".to_string(),
})??;
Ok(affected_rows)
}
#[doc(hidden)]
pub async fn handle_delete_by_id(
collection: &str,
id: &str,
alias: Option<String>,
) -> QuickDbResult<bool> {
let manager = get_global_pool_manager();
let actual_alias = match alias {
Some(a) => a,
None => manager
.get_default_alias()
.await
.unwrap_or_else(|| "default".to_string()),
};
debug!(
"处理根据ID删除请求: collection={}, id={}, alias={}",
collection, id, actual_alias
);
let manager = get_global_pool_manager();
let connection_pools = manager.get_connection_pools();
let connection_pool =
connection_pools
.get(&actual_alias)
.ok_or_else(|| QuickDbError::AliasNotFound {
alias: actual_alias.clone(),
})?;
let (response_tx, response_rx) = oneshot::channel();
let operation = DatabaseOperation::DeleteById {
table: collection.to_string(),
id: DataValue::String(id.to_string()),
alias: actual_alias.clone(),
response: response_tx,
};
connection_pool
.operation_sender
.send(operation)
.map_err(|_| QuickDbError::ConnectionError {
message: "连接池操作通道已关闭".to_string(),
})?;
let result = response_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: "等待连接池响应超时".to_string(),
})??;
Ok(result)
}
#[doc(hidden)]
pub async fn handle_count(
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
alias: Option<String>,
) -> QuickDbResult<u64> {
let manager = get_global_pool_manager();
let actual_alias = match alias {
Some(a) => a,
None => manager
.get_default_alias()
.await
.unwrap_or_else(|| "default".to_string()),
};
debug!(
"处理计数请求: collection={}, alias={}",
collection, actual_alias
);
let manager = get_global_pool_manager();
let connection_pools = manager.get_connection_pools();
let connection_pool =
connection_pools
.get(&actual_alias)
.ok_or_else(|| QuickDbError::AliasNotFound {
alias: actual_alias.clone(),
})?;
let (response_tx, response_rx) = oneshot::channel();
let operation = DatabaseOperation::Count {
table: collection.to_string(),
conditions,
alias: actual_alias.clone(),
response: response_tx,
};
connection_pool
.operation_sender
.send(operation)
.map_err(|_| QuickDbError::ConnectionError {
message: "连接池操作通道已关闭".to_string(),
})?;
let count = response_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: "等待连接池响应超时".to_string(),
})??;
Ok(count)
}
#[doc(hidden)]
pub async fn handle_count_with_groups(
collection: &str,
condition_groups: Vec<QueryConditionGroupWithConfig>,
alias: Option<String>,
) -> QuickDbResult<u64> {
let manager = get_global_pool_manager();
let actual_alias = match alias {
Some(a) => a,
None => manager
.get_default_alias()
.await
.unwrap_or_else(|| "default".to_string()),
};
debug!(
"处理条件组合计数请求: collection={}, alias={}",
collection, actual_alias
);
let manager = get_global_pool_manager();
let connection_pools = manager.get_connection_pools();
let connection_pool =
connection_pools
.get(&actual_alias)
.ok_or_else(|| QuickDbError::AliasNotFound {
alias: actual_alias.clone(),
})?;
let (response_tx, response_rx) = oneshot::channel();
let operation = DatabaseOperation::CountWithGroups {
table: collection.to_string(),
condition_groups,
alias: actual_alias.clone(),
response: response_tx,
};
connection_pool
.operation_sender
.send(operation)
.map_err(|_| QuickDbError::ConnectionError {
message: "连接池操作通道已关闭".to_string(),
})?;
let count = response_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: "等待连接池响应超时".to_string(),
})??;
Ok(count)
}
#[doc(hidden)]
pub async fn handle_get_server_version(alias: Option<String>) -> QuickDbResult<String> {
let manager = get_global_pool_manager();
let actual_alias = match alias {
Some(a) => a,
None => manager
.get_default_alias()
.await
.unwrap_or_else(|| "default".to_string()),
};
debug!("处理版本查询请求: alias={}", actual_alias);
let manager = get_global_pool_manager();
let connection_pools = manager.get_connection_pools();
let connection_pool =
connection_pools
.get(&actual_alias)
.ok_or_else(|| QuickDbError::AliasNotFound {
alias: actual_alias.clone(),
})?;
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let operation = DatabaseOperation::GetServerVersion {
response: response_tx,
};
connection_pool
.operation_sender
.send(operation)
.map_err(|_| QuickDbError::ConnectionError {
message: "连接池操作通道已关闭".to_string(),
})?;
let result = response_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: "等待数据库操作结果超时".to_string(),
})??;
Ok(result)
}
}