use crate::error::{QuickDbError, QuickDbResult};
use crate::manager::get_global_pool_manager;
use crate::odm::types::OdmRequest;
use crate::types::*;
use rat_logger::{debug, error, info, warn};
use tokio::sync::{mpsc, oneshot};
/// Asynchronous ODM manager.
///
/// Owns the sending half of an unbounded request channel together with the
/// join handle of the background task that consumes those requests.
pub struct AsyncOdmManager {
/// Sending half of the request channel; the receiving half is handed to the
/// background task spawned in `new`.
pub(crate) request_sender: mpsc::UnboundedSender<OdmRequest>,
/// Alias used when a caller does not name one explicitly. Starts out as
/// `"default"` and can be changed with `set_default_alias`.
default_alias: String,
/// Join handle of the background processing task. It is taken in `Drop`,
/// where a task that has not finished yet is aborted.
_task_handle: Option<tokio::task::JoinHandle<()>>,
}
impl AsyncOdmManager {
/// Creates a manager and starts its background request-processing task.
///
/// The receiving half of a fresh unbounded channel is handed to
/// `process_requests`, which is spawned onto the current Tokio runtime.
/// The default alias is initialised to `"default"`.
///
/// # Panics
///
/// `tokio::spawn` panics when it is called outside the context of a Tokio
/// runtime, so this constructor must be invoked from within one.
pub fn new() -> Self {
    let (request_sender, request_receiver) = mpsc::unbounded_channel();
    let worker = tokio::spawn(Self::process_requests(request_receiver));
    info!("创建异步ODM管理器");

    Self {
        request_sender,
        default_alias: String::from("default"),
        _task_handle: Some(worker),
    }
}
/// Replaces the alias that is used when a caller does not supply one.
///
/// The new alias is logged at info level before it is stored.
pub fn set_default_alias(&mut self, alias: &str) {
    info!("设置默认别名: {}", alias);
    self.default_alias = String::from(alias);
}
/// Resolves the alias to operate on.
///
/// Returns an owned copy of `alias` when one is given, otherwise an owned
/// copy of this manager's default alias.
fn get_actual_alias(&self, alias: Option<&str>) -> String {
    match alias {
        Some(explicit) => explicit.to_owned(),
        None => self.default_alias.clone(),
    }
}
async fn process_requests(mut receiver: mpsc::UnboundedReceiver<OdmRequest>) {
info!("启动ODM后台处理任务");
while let Some(request) = receiver.recv().await {
match request {
OdmRequest::Create {
collection,
data,
alias,
response,
} => {
let result = Self::handle_create(&collection, data, alias).await;
let _ = response.send(result);
}
OdmRequest::FindById {
collection,
id,
alias,
response,
} => {
let result = Self::handle_find_by_id(&collection, &id, alias).await;
let _ = response.send(result);
}
OdmRequest::Find {
collection,
conditions,
options,
alias,
response,
} => {
let result = Self::handle_find_with_cache_control(&collection, conditions, options, alias, false).await;
let _ = response.send(result);
}
OdmRequest::FindWithCacheControl {
collection,
conditions,
options,
alias,
bypass_cache,
response,
} => {
let result = Self::handle_find_with_cache_control(&collection, conditions, options, alias, bypass_cache).await;
let _ = response.send(result);
}
OdmRequest::FindWithGroups {
collection,
condition_groups,
options,
alias,
response,
} => {
let result = Self::handle_find_with_groups(&collection, condition_groups, options, alias).await;
let _ = response.send(result);
}
OdmRequest::FindWithGroupsWithCacheControl {
collection,
condition_groups,
options,
alias,
bypass_cache,
response,
} => {
let result = Self::handle_find_with_groups_with_cache_control(&collection, condition_groups, options, alias, bypass_cache).await;
let _ = response.send(result);
}
OdmRequest::Update {
collection,
conditions,
updates,
alias,
response,
} => {
let result = Self::handle_update(&collection, conditions, updates, alias).await;
let _ = response.send(result);
}
OdmRequest::UpdateWithOperations {
collection,
conditions,
operations,
alias,
response,
} => {
let result = Self::handle_update_with_operations(
&collection,
conditions,
operations,
alias,
)
.await;
let _ = response.send(result);
}
OdmRequest::UpdateById {
collection,
id,
updates,
alias,
response,
} => {
let result = Self::handle_update_by_id(&collection, &id, updates, alias).await;
let _ = response.send(result);
}
OdmRequest::Delete {
collection,
conditions,
alias,
response,
} => {
let result = Self::handle_delete(&collection, conditions, alias).await;
let _ = response.send(result);
}
OdmRequest::DeleteById {
collection,
id,
alias,
response,
} => {
let result = Self::handle_delete_by_id(&collection, &id, alias).await;
let _ = response.send(result);
}
OdmRequest::Count {
collection,
conditions,
alias,
response,
} => {
let result = Self::handle_count(&collection, conditions, alias).await;
let _ = response.send(result);
}
OdmRequest::CountWithGroups {
collection,
condition_groups,
alias,
response,
} => {
let result = Self::handle_count_with_groups(&collection, condition_groups, alias).await;
let _ = response.send(result);
}
OdmRequest::GetServerVersion { alias, response } => {
let result = Self::handle_get_server_version(alias).await;
let _ = response.send(result);
}
OdmRequest::CreateStoredProcedure { config, response } => {
let result = Self::handle_create_stored_procedure(config).await;
let _ = response.send(result);
}
OdmRequest::ExecuteStoredProcedure {
procedure_name,
database_alias,
params,
response,
} => {
let result = Self::handle_execute_stored_procedure(
&procedure_name,
database_alias.as_deref(),
params,
)
.await;
let _ = response.send(result);
}
}
}
warn!("ODM后台处理任务结束");
}
/// Forwards a stored-procedure creation request to the connection pool
/// registered under `config.database` and waits for the pool's reply.
///
/// # Errors
///
/// * [`QuickDbError::AliasNotFound`] when no pool is registered under
///   `config.database`.
/// * [`QuickDbError::ConnectionError`] when the pool's operation channel is
///   closed, or when the response channel is closed before a reply arrives.
/// * Whatever error the pool itself reports for the operation.
#[doc(hidden)]
pub async fn handle_create_stored_procedure(
    config: crate::stored_procedure::StoredProcedureConfig,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureCreateResult> {
    let database_alias = &config.database;
    debug!(
        "处理存储过程创建请求: procedure={}, database={}",
        config.procedure_name, database_alias
    );

    // Look the pool up and enqueue the operation inside a dedicated scope, so
    // that whatever the lookup hands back (a plain reference or a map guard)
    // is released *before* this task suspends on the response below. Keeping
    // it alive across the `.await` would pin the map entry for as long as the
    // procedure takes to create.
    let response_rx = {
        let manager = get_global_pool_manager();
        let connection_pools = manager.get_connection_pools();
        let connection_pool =
            connection_pools
                .get(database_alias)
                .ok_or_else(|| QuickDbError::AliasNotFound {
                    alias: database_alias.clone(),
                })?;

        let (response_tx, response_rx) = oneshot::channel();
        let operation = crate::pool::DatabaseOperation::CreateStoredProcedure {
            config,
            response: response_tx,
        };
        connection_pool
            .operation_sender
            .send(operation)
            .map_err(|_| QuickDbError::ConnectionError {
                message: "连接池操作通道已关闭".to_string(),
            })?;

        response_rx
    };

    // NOTE: no timeout is applied here. Despite the wording of the message,
    // this error is produced when the sending half is dropped without a reply.
    let result = response_rx
        .await
        .map_err(|_| QuickDbError::ConnectionError {
            message: "等待连接池响应超时".to_string(),
        })??;
    Ok(result)
}
/// Executes the stored procedure `procedure_name` through the connection pool
/// of the selected database and waits for the pool's reply.
///
/// When `alias` is `None`, the pool manager's default alias is used, falling
/// back to the literal `"default"` if the manager has none. `params` is
/// forwarded to the pool unchanged.
///
/// # Errors
///
/// * [`QuickDbError::AliasNotFound`] when no pool is registered under the
///   resolved alias.
/// * [`QuickDbError::ConnectionError`] when the pool's operation channel is
///   closed, or when the response channel is closed before a reply arrives.
/// * Whatever error the pool itself reports for the operation.
#[doc(hidden)]
pub async fn handle_execute_stored_procedure(
    procedure_name: &str,
    alias: Option<&str>,
    params: Option<std::collections::HashMap<String, crate::types::DataValue>>,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureQueryResult> {
    // Fetched once and reused for both alias resolution and the pool lookup.
    let manager = get_global_pool_manager();

    let database_alias = match alias {
        Some(explicit) => explicit.to_string(),
        None => manager
            .get_default_alias()
            .await
            .unwrap_or_else(|| "default".to_string()),
    };
    debug!(
        "处理存储过程执行请求: procedure={}, database={}",
        procedure_name, database_alias
    );

    // Look the pool up and enqueue the operation inside a dedicated scope, so
    // that whatever the lookup hands back (a plain reference or a map guard)
    // is released *before* this task suspends on the response below. Keeping
    // it alive across the `.await` would pin the map entry for as long as the
    // procedure takes to run.
    let response_rx = {
        let connection_pools = manager.get_connection_pools();
        let connection_pool =
            connection_pools
                .get(&database_alias)
                .ok_or_else(|| QuickDbError::AliasNotFound {
                    alias: database_alias.clone(),
                })?;

        let (response_tx, response_rx) = oneshot::channel();
        let operation = crate::pool::DatabaseOperation::ExecuteStoredProcedure {
            procedure_name: procedure_name.to_string(),
            // The alias is not needed afterwards, so it is moved, not cloned.
            database: database_alias,
            params,
            response: response_tx,
        };
        connection_pool
            .operation_sender
            .send(operation)
            .map_err(|_| QuickDbError::ConnectionError {
                message: "连接池操作通道已关闭".to_string(),
            })?;

        response_rx
    };

    // NOTE: no timeout is applied here. Despite the wording of the message,
    // this error is produced when the sending half is dropped without a reply.
    let result = response_rx
        .await
        .map_err(|_| QuickDbError::ConnectionError {
            message: "等待连接池响应超时".to_string(),
        })??;
    Ok(result)
}
}
/// Stops the background processing task when the manager goes away.
impl Drop for AsyncOdmManager {
    /// Takes the stored join handle (if any) and aborts the task unless it has
    /// already finished; requests still queued at that point are not served.
    ///
    /// This manager's own `request_sender` is still alive while `drop` runs,
    /// so the channel cannot have been closed by this manager yet.
    fn drop(&mut self) {
        info!("开始清理AsyncOdmManager资源");

        match self._task_handle.take() {
            Some(worker) if worker.is_finished() => {
                info!("ODM后台任务已正常结束");
            }
            Some(worker) => {
                warn!("ODM后台任务仍在运行,将被取消");
                worker.abort();
            }
            // The handle was already taken; nothing to stop.
            None => {}
        }

        info!("AsyncOdmManager资源清理完成");
    }
}