use anyhow::Result;
use duckdb::{Connection, Result as DuckResult};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{debug, error, warn};
#[derive(Clone)]
pub struct DatabaseManager {
config: Arc<DatabaseConfig>,
}
#[derive(Debug)]
struct DatabaseConfig {
db_path: Option<PathBuf>,
memory_connection: Option<Arc<Mutex<Connection>>>,
}
impl DatabaseManager {
pub async fn new<P: AsRef<std::path::Path>>(db_path: P) -> Result<Self> {
let db_path = db_path.as_ref().to_path_buf();
if let Some(parent) = db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let _test_conn = Connection::open(&db_path)?;
debug!("Database file connection test successful: {:?}", db_path);
let manager = Self {
config: Arc::new(DatabaseConfig {
db_path: Some(db_path),
memory_connection: None,
}),
};
Ok(manager)
}
pub async fn new_memory() -> Result<Self> {
let connection = Arc::new(Mutex::new(Connection::open_in_memory()?));
debug!("In-memory database connection created successfully");
let manager = Self {
config: Arc::new(DatabaseConfig {
db_path: None,
memory_connection: Some(connection),
}),
};
Ok(manager)
}
pub async fn init_database(&self) -> Result<()> {
debug!("Explicitly initializing database table structure...");
self.initialize_schema().await?;
debug!("Database table structure initialization completed");
Ok(())
}
async fn create_connection(&self) -> Result<Connection> {
if let Some(ref path) = self.config.db_path {
Ok(Connection::open(path)?)
} else if let Some(ref memory_conn) = self.config.memory_connection {
let conn = memory_conn.lock().await;
Ok(conn.try_clone()?)
} else {
Err(anyhow::anyhow!("Invalid database configuration"))
}
}
pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
where
F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
R: Send,
{
let mut retry_count = 0;
const MAX_RETRIES: usize = 3;
loop {
let connection = self.create_connection().await?;
match operation(&connection) {
Ok(result) => return Ok(result),
Err(e) => {
let error_msg = e.to_string();
if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
retry_count += 1;
let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
"Database read operation failed, retrying in {}ms ({}/{}): {}",
delay.as_millis(),
retry_count,
MAX_RETRIES,
error_msg
);
tokio::time::sleep(delay).await;
} else {
error!("Database read operation ultimately failed: {}", error_msg);
return Err(anyhow::anyhow!(e.to_string()));
}
}
}
}
}
pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
where
F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
R: Send,
{
let mut retry_count = 0;
const MAX_RETRIES: usize = 3;
loop {
if let Some(ref memory_conn) = self.config.memory_connection {
let conn = memory_conn.lock().await;
match operation(&conn) {
Ok(result) => return Ok(result),
Err(e) => {
let error_msg = e.to_string();
if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
retry_count += 1;
let delay = Duration::from_millis(50 * (1 << retry_count)); warn!(
"In-memory database write operation failed, retrying in {}ms ({}/{}): {}",
delay.as_millis(),
retry_count,
MAX_RETRIES,
error_msg
);
drop(conn); tokio::time::sleep(delay).await;
} else {
error!("In-memory database write operation ultimately failed: {}", error_msg);
return Err(anyhow::anyhow!(e.to_string()));
}
}
}
} else {
let connection = self.create_connection().await?;
match operation(&connection) {
Ok(result) => return Ok(result),
Err(e) => {
let error_msg = e.to_string();
if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
retry_count += 1;
let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
"File database write operation failed, retrying in {}ms ({}/{}): {}",
delay.as_millis(),
retry_count,
MAX_RETRIES,
error_msg
);
tokio::time::sleep(delay).await;
} else {
error!("File database write operation ultimately failed: {}", error_msg);
return Err(anyhow::anyhow!(e));
}
}
}
}
}
}
pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
R: Send,
{
self.write_with_retry(|conn| {
operations(conn)
})
.await
}
fn is_retryable_error(error_msg: &str) -> bool {
error_msg.contains("write-write conflict")
|| error_msg.contains("database is locked")
|| error_msg.contains("database is busy")
|| error_msg.contains("SQLITE_BUSY")
|| error_msg.contains("SQLITE_LOCKED")
}
pub async fn initialize_schema(&self) -> Result<()> {
debug!("Initializing database table structure...");
let schema_sql = include_str!("../migrations/init_duckdb.sql");
self.write_with_retry(|conn| {
let statements = self.parse_sql_statements(schema_sql);
for statement in statements {
let trimmed = statement.trim();
if trimmed.is_empty() {
continue;
}
let is_only_comments = trimmed
.lines()
.map(|line| line.trim())
.all(|line| line.is_empty() || line.starts_with("--"));
if is_only_comments {
continue;
}
debug!(
"Executing SQL statement: {}",
if trimmed.len() > 100 {
let mut end = 100;
while end > 0 && !trimmed.is_char_boundary(end) {
end -= 1;
}
format!("{}...", &trimmed[..end])
} else {
trimmed.to_string()
}
);
if let Err(e) = conn.execute(trimmed, []) {
error!("SQL statement execution failed: {}, statement: {}", e, trimmed);
return Err(e);
}
}
Ok(())
})
.await?;
debug!("Database table structure initialization completed");
Ok(())
}
fn parse_sql_statements(&self, sql: &str) -> Vec<String> {
let mut statements = Vec::new();
let mut current_statement = String::new();
let mut in_string = false;
let mut in_json = false;
let mut brace_count = 0;
let chars = sql.chars().peekable();
for ch in chars {
match ch {
'\'' | '"' => {
current_statement.push(ch);
if !in_json {
in_string = !in_string;
}
}
'{' => {
current_statement.push(ch);
if !in_string {
brace_count += 1;
in_json = true;
}
}
'}' => {
current_statement.push(ch);
if !in_string && in_json {
brace_count -= 1;
if brace_count == 0 {
in_json = false;
}
}
}
';' => {
if !in_string && !in_json {
if !current_statement.trim().is_empty() {
statements.push(current_statement.trim().to_string());
}
current_statement.clear();
} else {
current_statement.push(ch);
}
}
_ => {
current_statement.push(ch);
}
}
}
if !current_statement.trim().is_empty() {
statements.push(current_statement.trim().to_string());
}
statements
}
pub fn get_connection_stats(&self) -> ConnectionStats {
ConnectionStats {
db_type: if self.config.db_path.is_some() {
"file".to_string()
} else {
"memory".to_string()
},
is_memory_db: self.config.memory_connection.is_some(),
}
}
pub async fn health_check(&self) -> Result<HealthStatus> {
let read_result = self
.read_with_retry(|conn| {
conn.query_row("SELECT 1", [], |row| {
let value: i32 = row.get(0)?;
Ok(value)
})
})
.await;
let write_result = self
.write_with_retry(|conn| {
conn.query_row("SELECT 1", [], |row| {
let value: i32 = row.get(0)?;
Ok(value)
})
})
.await;
Ok(HealthStatus {
read_healthy: read_result.is_ok(),
write_healthy: write_result.is_ok(),
read_error: read_result.err().map(|e| e.to_string()),
write_error: write_result.err().map(|e| e.to_string()),
})
}
#[cfg(test)]
pub async fn debug_execute_sql(&self, sql: &str) -> Result<()> {
self.write_with_retry(|conn| {
debug!("Executing debug SQL: {}", sql);
conn.execute(sql, [])?;
Ok(())
})
.await
}
#[cfg(test)]
pub async fn debug_table_exists(&self, table_name: &str) -> Result<bool> {
self.read_with_retry(|conn| {
let exists = conn.query_row(
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
[table_name],
|row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
},
);
match exists {
Ok(result) => Ok(result),
Err(_) => {
conn.query_row(
"SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?",
[table_name],
|row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
},
)
}
}
})
.await
}
#[cfg(test)]
pub async fn debug_list_tables(&self) -> Result<Vec<String>> {
self.read_with_retry(|conn| {
let mut stmt = conn.prepare("SELECT table_name FROM duckdb_tables()")?;
let table_iter = stmt.query_map([], |row| {
let table_name: String = row.get(0)?;
Ok(table_name)
})?;
let mut tables = Vec::new();
for table in table_iter {
tables.push(table?);
}
Ok(tables)
})
.await
}
}
#[derive(Debug, Clone)]
pub struct ConnectionStats {
pub db_type: String,
pub is_memory_db: bool,
}
#[derive(Debug, Clone)]
pub struct HealthStatus {
pub read_healthy: bool,
pub write_healthy: bool,
pub read_error: Option<String>,
pub write_error: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn test_database_manager_creation() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let manager = DatabaseManager::new(&db_path).await.unwrap();
let stats = manager.get_connection_stats();
assert_eq!(stats.db_type, "file");
assert!(!stats.is_memory_db);
}
#[tokio::test]
async fn test_memory_database_creation() {
let manager = DatabaseManager::new_memory().await.unwrap();
let stats = manager.get_connection_stats();
assert_eq!(stats.db_type, "memory");
assert!(stats.is_memory_db);
}
#[tokio::test]
async fn test_concurrent_read_operations() {
let manager = DatabaseManager::new_memory().await.unwrap();
let mut handles = Vec::new();
for i in 0..10 {
let manager = manager.clone();
let handle = tokio::spawn(async move {
manager
.read_with_retry(|conn| {
conn.query_row("SELECT ?", [i], |row| {
let value: i32 = row.get(0)?;
Ok(value)
})
})
.await
});
handles.push(handle);
}
for (i, handle) in handles.into_iter().enumerate() {
let result = handle.await.unwrap();
assert_eq!(result.unwrap(), i as i32);
}
}
#[tokio::test]
async fn test_write_operations() {
let manager = DatabaseManager::new_memory().await.unwrap();
let result = manager
.write_with_retry(|conn| {
conn.execute(
"CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name TEXT)",
[],
)?;
conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'test')", [])?;
Ok(())
})
.await;
assert!(result.is_ok());
let value = manager
.read_with_retry(|conn| {
conn.query_row("SELECT name FROM test_table WHERE id = 1", [], |row| {
let name: String = row.get(0)?;
Ok(name)
})
})
.await;
assert_eq!(value.unwrap(), "test");
}
#[tokio::test]
async fn test_health_check() {
let manager = DatabaseManager::new_memory().await.unwrap();
let health = manager.health_check().await.unwrap();
assert!(health.read_healthy);
assert!(health.write_healthy);
assert!(health.read_error.is_none());
assert!(health.write_error.is_none());
}
#[tokio::test]
async fn test_batch_write_operations() {
let manager = DatabaseManager::new_memory().await.unwrap();
let result = manager
.batch_write_with_retry(|conn| {
conn.execute(
"CREATE TABLE IF NOT EXISTS batch_test (id INTEGER, value TEXT)",
[],
)?;
conn.execute("INSERT INTO batch_test (id, value) VALUES (1, 'a')", [])?;
conn.execute("INSERT INTO batch_test (id, value) VALUES (2, 'b')", [])?;
conn.execute("INSERT INTO batch_test (id, value) VALUES (3, 'c')", [])?;
Ok(())
})
.await;
assert!(result.is_ok());
let count = manager
.read_with_retry(|conn| {
conn.query_row("SELECT COUNT(*) FROM batch_test", [], |row| {
let count: i64 = row.get(0)?;
Ok(count)
})
})
.await;
assert_eq!(count.unwrap(), 3);
}
#[tokio::test]
async fn test_file_database_concurrent_operations() {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("concurrent_test.db");
let manager = DatabaseManager::new(&db_path).await.unwrap();
manager
.write_with_retry(|conn| {
conn.execute(
"CREATE TABLE IF NOT EXISTS concurrent_test (id INTEGER, value TEXT)",
[],
)?;
Ok(())
})
.await
.unwrap();
let mut handles = Vec::new();
for i in 0..5 {
let manager = manager.clone();
let handle = tokio::spawn(async move {
manager
.write_with_retry(|conn| {
conn.execute(
"INSERT INTO concurrent_test (id, value) VALUES (?, ?)",
[&i.to_string(), &format!("value_{i}")],
)?;
Ok(())
})
.await
});
handles.push(handle);
}
for handle in handles {
let result = handle.await.unwrap();
assert!(result.is_ok());
}
let count = manager
.read_with_retry(|conn| {
conn.query_row("SELECT COUNT(*) FROM concurrent_test", [], |row| {
let count: i64 = row.get(0)?;
Ok(count)
})
})
.await;
assert_eq!(count.unwrap(), 5);
}
#[tokio::test]
async fn test_debug_sql_initialization() {
let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
let manager = DatabaseManager {
config: Arc::new(DatabaseConfig {
db_path: None,
memory_connection: Some(connection),
}),
};
println!("=== 初始化前 ===");
let tables = manager.debug_list_tables().await.unwrap();
println!("初始化前的表: {tables:?}");
println!("=== 开始初始化 ===");
let init_result = manager.initialize_schema().await;
println!("初始化结果: {init_result:?}");
if init_result.is_ok() {
println!("=== 初始化成功,检查表 ===");
let tables_after = manager.debug_list_tables().await.unwrap();
println!("初始化后的表: {tables_after:?}");
let app_config_exists = manager.debug_table_exists("app_config").await.unwrap();
println!("app_config表存在: {app_config_exists}");
} else {
println!("=== 初始化失败,尝试手动创建简单表 ===");
let result = manager.debug_execute_sql(
"CREATE TABLE app_config (config_key VARCHAR PRIMARY KEY, config_value JSON NOT NULL)"
).await;
println!("手动创建app_config表的结果: {result:?}");
if result.is_ok() {
let app_config_exists_after =
manager.debug_table_exists("app_config").await.unwrap();
println!("创建后app_config表存在: {app_config_exists_after}");
}
}
}
#[tokio::test]
async fn test_debug_sql_parsing() {
let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
let manager = DatabaseManager {
config: Arc::new(DatabaseConfig {
db_path: None,
memory_connection: Some(connection),
}),
};
let test_sql = r#"
CREATE TABLE test1 (id INTEGER);
INSERT INTO test1 VALUES (1);
CREATE TABLE test2 (data JSON);
INSERT INTO test2 VALUES ('{"key": "value; with semicolon"}');
"#;
let statements = manager.parse_sql_statements(test_sql);
println!("解析出的SQL语句数量: {}", statements.len());
for (i, stmt) in statements.iter().enumerate() {
println!("语句 {}: {}", i + 1, stmt);
}
let schema_sql = include_str!("../migrations/init_duckdb.sql");
let real_statements = manager.parse_sql_statements(schema_sql);
println!("真实SQL脚本解析出的语句数量: {}", real_statements.len());
for (i, stmt) in real_statements.iter().take(10).enumerate() {
println!("真实语句 {}: {}", i + 1, stmt);
}
}
#[tokio::test]
async fn test_debug_individual_sql_statements() {
let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
let manager = DatabaseManager {
config: Arc::new(DatabaseConfig {
db_path: None,
memory_connection: Some(connection),
}),
};
let schema_sql = include_str!("../migrations/init_duckdb.sql");
let statements = manager.parse_sql_statements(schema_sql);
println!("总共解析出 {} 个语句", statements.len());
for (i, stmt) in statements.iter().enumerate() {
let trimmed = stmt.trim();
if trimmed.is_empty() {
println!("跳过语句 {}: [空语句]", i + 1);
continue;
}
let is_only_comments = trimmed
.lines()
.map(|line| line.trim())
.all(|line| line.is_empty() || line.starts_with("--"));
if is_only_comments {
println!("跳过语句 {}: [仅包含注释]", i + 1);
continue;
}
println!(
"执行语句 {}: {}",
i + 1,
if trimmed.len() > 100 {
let mut end = 100;
while end > 0 && !trimmed.is_char_boundary(end) {
end -= 1;
}
format!("{}...", &trimmed[..end])
} else {
trimmed.to_string()
}
);
let result = manager
.write_with_retry(|conn| {
conn.execute(trimmed, [])?;
Ok(())
})
.await;
if let Err(e) = result {
println!("❌ 语句 {} 执行失败: {}", i + 1, e);
println!("失败的语句: {trimmed}");
break;
} else {
println!("✅ 语句 {} 执行成功", i + 1);
}
}
}
}