use crate::error::{QuickDbError, QuickDbResult};
use crate::odm::manager_core::AsyncOdmManager;
use crate::odm::traits::OdmOperations;
use crate::odm::types::OdmRequest;
use crate::types::*;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::sync::oneshot;
#[async_trait]
impl OdmOperations for AsyncOdmManager {
async fn create(
&self,
collection: &str,
data: HashMap<String, DataValue>,
alias: Option<&str>,
) -> QuickDbResult<DataValue> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::Create {
collection: collection.to_string(),
data,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn find_by_id(
&self,
collection: &str,
id: &str,
alias: Option<&str>,
) -> QuickDbResult<Option<DataValue>> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::FindById {
collection: collection.to_string(),
id: id.to_string(),
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn find(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
options: Option<QueryOptions>,
alias: Option<&str>,
) -> QuickDbResult<Vec<DataValue>> {
self.find_with_cache_control(collection, conditions, options, alias, false).await
}
async fn find_with_cache_control(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
options: Option<QueryOptions>,
alias: Option<&str>,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::FindWithCacheControl {
collection: collection.to_string(),
conditions,
options,
alias: alias.map(|s| s.to_string()),
bypass_cache,
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn find_with_groups(
&self,
collection: &str,
condition_groups: Vec<QueryConditionGroup>,
options: Option<QueryOptions>,
alias: Option<&str>,
) -> QuickDbResult<Vec<DataValue>> {
self.find_with_groups_with_cache_control(collection, condition_groups, options, alias, false).await
}
async fn find_with_groups_with_cache_control(
&self,
collection: &str,
condition_groups: Vec<QueryConditionGroup>,
options: Option<QueryOptions>,
alias: Option<&str>,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let condition_groups_with_config: Vec<crate::types::QueryConditionGroupWithConfig> = condition_groups
.into_iter()
.map(|g| g.into())
.collect();
self.find_with_groups_with_cache_control_and_config(collection, condition_groups_with_config, options, alias, bypass_cache).await
}
async fn find_with_groups_with_config(
&self,
collection: &str,
condition_groups: Vec<crate::types::QueryConditionGroupWithConfig>,
options: Option<QueryOptions>,
alias: Option<&str>,
) -> QuickDbResult<Vec<DataValue>> {
self.find_with_groups_with_cache_control_and_config(collection, condition_groups, options, alias, false).await
}
async fn find_with_groups_with_cache_control_and_config(
&self,
collection: &str,
condition_groups: Vec<crate::types::QueryConditionGroupWithConfig>,
options: Option<QueryOptions>,
alias: Option<&str>,
bypass_cache: bool,
) -> QuickDbResult<Vec<DataValue>> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::FindWithGroupsWithCacheControl {
collection: collection.to_string(),
condition_groups,
options,
alias: alias.map(|s| s.to_string()),
bypass_cache,
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn update(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
updates: HashMap<String, DataValue>,
alias: Option<&str>,
) -> QuickDbResult<u64> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::Update {
collection: collection.to_string(),
conditions,
updates,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn update_with_operations(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
operations: Vec<crate::types::UpdateOperation>,
alias: Option<&str>,
) -> QuickDbResult<u64> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::UpdateWithOperations {
collection: collection.to_string(),
conditions,
operations,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn update_by_id(
&self,
collection: &str,
id: &str,
updates: HashMap<String, DataValue>,
alias: Option<&str>,
) -> QuickDbResult<bool> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::UpdateById {
collection: collection.to_string(),
id: id.to_string(),
updates,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn delete(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
alias: Option<&str>,
) -> QuickDbResult<u64> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::Delete {
collection: collection.to_string(),
conditions,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn delete_by_id(
&self,
collection: &str,
id: &str,
alias: Option<&str>,
) -> QuickDbResult<bool> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::DeleteById {
collection: collection.to_string(),
id: id.to_string(),
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn count(
&self,
collection: &str,
conditions: Vec<QueryConditionWithConfig>,
alias: Option<&str>,
) -> QuickDbResult<u64> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::Count {
collection: collection.to_string(),
conditions,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn count_with_groups_with_config(
&self,
collection: &str,
condition_groups: Vec<crate::types::QueryConditionGroupWithConfig>,
alias: Option<&str>,
) -> QuickDbResult<u64> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::CountWithGroups {
collection: collection.to_string(),
condition_groups,
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn get_server_version(&self, alias: Option<&str>) -> QuickDbResult<String> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::GetServerVersion {
alias: alias.map(|s| s.to_string()),
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn create_stored_procedure(
&self,
config: crate::stored_procedure::StoredProcedureConfig,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureCreateResult> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::CreateStoredProcedure {
config,
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
async fn execute_stored_procedure(
&self,
procedure_name: &str,
database_alias: Option<&str>,
params: Option<std::collections::HashMap<String, crate::types::DataValue>>,
) -> QuickDbResult<crate::stored_procedure::StoredProcedureQueryResult> {
let (sender, receiver) = oneshot::channel();
let request = OdmRequest::ExecuteStoredProcedure {
procedure_name: procedure_name.to_string(),
database_alias: database_alias.map(|s| s.to_string()),
params,
response: sender,
};
self.request_sender
.send(request)
.map_err(|_| QuickDbError::ConnectionError {
message: "ODM后台任务已停止".to_string(),
})?;
receiver.await.map_err(|_| QuickDbError::ConnectionError {
message: "ODM请求处理失败".to_string(),
})?
}
}