use crossbeam_queue::SegQueue;
use rat_logger::{debug, error, info, warn};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "sqlite-support")]
use super::SqliteWorker;
use super::{
DatabaseConnection, DatabaseOperation, ExtendedPoolConfig, MultiConnectionManager,
PooledConnection,
};
use crate::adapter::DatabaseAdapter;
use crate::error::{QuickDbError, QuickDbResult};
use crate::model::FieldDefinition;
use crate::types::*;
#[derive(Debug)]
pub struct ConnectionPool {
pub db_config: DatabaseConfig,
pub config: ExtendedPoolConfig,
pub operation_sender: mpsc::UnboundedSender<DatabaseOperation>,
pub db_type: DatabaseType,
pub cache_manager: Option<Arc<crate::cache::CacheManager>>,
}
impl ConnectionPool {
pub async fn with_config(
db_config: DatabaseConfig,
config: ExtendedPoolConfig,
) -> QuickDbResult<Self> {
Self::with_config_and_cache(db_config, config, None).await
}
pub async fn with_config_and_cache(
db_config: DatabaseConfig,
config: ExtendedPoolConfig,
cache_manager: Option<Arc<crate::cache::CacheManager>>,
) -> QuickDbResult<Self> {
let (operation_sender, operation_receiver) = mpsc::unbounded_channel();
let pool = Self {
db_type: db_config.db_type.clone(),
db_config: db_config.clone(),
config: config.clone(),
operation_sender,
cache_manager: cache_manager.clone(),
};
match &db_config.db_type {
#[cfg(feature = "sqlite-support")]
DatabaseType::SQLite => {
pool.start_sqlite_worker(operation_receiver, db_config, config)
.await?;
}
#[cfg(feature = "postgres-support")]
DatabaseType::PostgreSQL => {
pool.start_multi_connection_manager(operation_receiver, db_config, config)
.await?;
}
#[cfg(feature = "mysql-support")]
DatabaseType::MySQL => {
pool.start_multi_connection_manager(operation_receiver, db_config, config)
.await?;
}
#[cfg(feature = "mongodb-support")]
DatabaseType::MongoDB => {
pool.start_multi_connection_manager(operation_receiver, db_config, config)
.await?;
}
#[allow(unreachable_patterns)]
_ => Err(QuickDbError::ConfigError {
message: "不支持的数据库类型(可能需要启用相应的feature)".to_string(),
})?,
}
Ok(pool)
}
pub fn set_cache_manager(&mut self, cache_manager: Arc<crate::cache::CacheManager>) {
self.cache_manager = Some(cache_manager);
}
#[cfg(feature = "sqlite-support")]
async fn start_sqlite_worker(
&self,
operation_receiver: mpsc::UnboundedReceiver<DatabaseOperation>,
db_config: DatabaseConfig,
config: ExtendedPoolConfig,
) -> QuickDbResult<()> {
let connection = self.create_sqlite_connection().await?;
let (startup_tx, startup_rx) = oneshot::channel();
use crate::adapter::{create_adapter, create_adapter_with_cache};
let (adapter, adapter_type) = if let Some(cache_manager) = &self.cache_manager {
let adapter = create_adapter_with_cache(&db_config.db_type, cache_manager.clone())?;
(adapter, "缓存适配器")
} else {
let adapter = create_adapter(&db_config.db_type)?;
(adapter, "普通适配器")
};
info!("数据库 '{}' 使用 {}", db_config.alias, adapter_type);
let worker = SqliteWorker {
connection,
operation_receiver,
db_config: db_config.clone(),
retry_count: 0,
max_retries: config.max_retries,
retry_interval_ms: config.retry_interval_ms,
health_check_interval_sec: config.health_check_timeout_sec, last_health_check: Instant::now(),
is_healthy: true,
cache_manager: self.cache_manager.clone(),
adapter,
};
tokio::spawn(async move {
let _ = startup_tx.send(());
worker.run().await;
});
startup_rx
.await
.map_err(|_| QuickDbError::ConnectionError {
message: crate::i18n::tf(
"error.sqlite_worker_startup",
&[("alias", &db_config.alias)],
),
})?;
info!("SQLite工作器启动完成: 别名={}", db_config.alias);
Ok(())
}
async fn start_multi_connection_manager(
&self,
operation_receiver: mpsc::UnboundedReceiver<DatabaseOperation>,
db_config: DatabaseConfig,
config: ExtendedPoolConfig,
) -> QuickDbResult<()> {
let manager = MultiConnectionManager {
workers: Vec::new(),
available_workers: SegQueue::new(),
operation_receiver,
db_config,
config,
keepalive_handle: None,
cache_manager: self.cache_manager.clone(),
};
tokio::spawn(async move {
manager.run().await;
});
Ok(())
}
#[cfg(feature = "sqlite-support")]
async fn create_sqlite_connection(&self) -> QuickDbResult<DatabaseConnection> {
let (path, create_if_missing) = match &self.db_config.connection {
crate::types::ConnectionConfig::SQLite {
path,
create_if_missing,
} => (path.clone(), *create_if_missing),
_ => {
return Err(QuickDbError::ConfigError {
message: crate::i18n::t("error.sqlite_config_mismatch"),
});
}
};
if path == ":memory:" {
info!("连接SQLite内存数据库: 别名={}", self.db_config.alias);
let pool = sqlx::SqlitePool::connect(&path).await.map_err(|e| {
QuickDbError::ConnectionError {
message: crate::i18n::tf("error.sqlite_memory", &[("message", &e.to_string())]),
}
})?;
return Ok(DatabaseConnection::SQLite(pool));
}
let file_exists = std::path::Path::new(&path).exists();
if !file_exists && !create_if_missing {
return Err(QuickDbError::ConnectionError {
message: crate::i18n::tf("error.sqlite_file_not_found", &[("path", &path)]),
});
}
if create_if_missing && !file_exists {
if let Some(parent) = std::path::Path::new(&path).parent() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
QuickDbError::ConnectionError {
message: crate::i18n::tf(
"error.sqlite_dir_create",
&[("message", &e.to_string())],
),
}
})?;
}
tokio::fs::File::create(&path)
.await
.map_err(|e| QuickDbError::ConnectionError {
message: crate::i18n::tf(
"error.sqlite_file_create",
&[("message", &e.to_string())],
),
})?;
}
let pool =
sqlx::SqlitePool::connect(&path)
.await
.map_err(|e| QuickDbError::ConnectionError {
message: crate::i18n::tf(
"error.sqlite_connection",
&[("message", &e.to_string())],
),
})?;
Ok(DatabaseConnection::SQLite(pool))
}
async fn send_operation<T>(&self, operation: DatabaseOperation) -> QuickDbResult<T>
where
T: Send + 'static,
{
Err(QuickDbError::QueryError {
message: "操作发送未实现".to_string(),
})
}
pub async fn create(
&self,
table: &str,
data: &HashMap<String, DataValue>,
id_strategy: &IdStrategy,
) -> QuickDbResult<DataValue> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::Create {
table: table.to_string(),
data: data.clone(),
id_strategy: id_strategy.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn find_by_id(
&self,
table: &str,
id: &DataValue,
) -> QuickDbResult<Option<DataValue>> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::FindById {
table: table.to_string(),
id: id.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn find(
&self,
table: &str,
conditions: &[QueryConditionWithConfig],
options: &QueryOptions,
) -> QuickDbResult<Vec<DataValue>> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::Find {
table: table.to_string(),
conditions: conditions.to_vec(),
options: options.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn update(
&self,
table: &str,
conditions: &[QueryConditionWithConfig],
data: &HashMap<String, DataValue>,
) -> QuickDbResult<u64> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::Update {
table: table.to_string(),
conditions: conditions.to_vec(),
data: data.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn update_by_id(
&self,
table: &str,
id: &DataValue,
data: &HashMap<String, DataValue>,
) -> QuickDbResult<bool> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::UpdateById {
table: table.to_string(),
id: id.clone(),
data: data.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn delete(
&self,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::Delete {
table: table.to_string(),
conditions: conditions.to_vec(),
alias: alias.to_string(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn delete_by_id(
&self,
table: &str,
id: &DataValue,
alias: &str,
) -> QuickDbResult<bool> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::DeleteById {
table: table.to_string(),
id: id.clone(),
alias: alias.to_string(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn count(
&self,
table: &str,
conditions: &[QueryConditionWithConfig],
alias: &str,
) -> QuickDbResult<u64> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::Count {
table: table.to_string(),
conditions: conditions.to_vec(),
alias: alias.to_string(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn create_table(
&self,
table: &str,
fields: &HashMap<String, FieldDefinition>,
id_strategy: &IdStrategy,
) -> QuickDbResult<()> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::CreateTable {
table: table.to_string(),
fields: fields.clone(),
id_strategy: id_strategy.clone(),
alias: self.db_config.alias.clone(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn create_index(
&self,
table: &str,
index_name: &str,
fields: &[String],
unique: bool,
) -> QuickDbResult<()> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::CreateIndex {
table: table.to_string(),
index_name: index_name.to_string(),
fields: fields.to_vec(),
unique,
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn table_exists(&self, table: &str) -> QuickDbResult<bool> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::TableExists {
table: table.to_string(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub async fn drop_table(&self, table: &str) -> QuickDbResult<()> {
let (response_sender, response_receiver) = oneshot::channel();
let operation = DatabaseOperation::DropTable {
table: table.to_string(),
response: response_sender,
};
self.operation_sender
.send(operation)
.map_err(|_| QuickDbError::QueryError {
message: "发送操作失败".to_string(),
})?;
response_receiver
.await
.map_err(|_| QuickDbError::QueryError {
message: "接收响应失败".to_string(),
})?
}
pub fn get_database_type(&self) -> &DatabaseType {
&self.db_config.db_type
}
pub async fn get_connection(&self) -> QuickDbResult<PooledConnection> {
Ok(PooledConnection {
id: format!("{}-virtual", self.db_config.alias),
db_type: self.db_config.db_type.clone(),
alias: self.db_config.alias.clone(),
})
}
pub async fn release_connection(&self, _connection_id: &str) -> QuickDbResult<()> {
Ok(())
}
pub async fn cleanup_expired_connections(&self) {
debug!("清理过期连接(新架构中自动管理)");
}
}