1use anyhow::Result;
2use duckdb::{Connection, Result as DuckResult};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::Mutex;
7use tracing::{debug, error, warn};
8
9#[derive(Clone)]
17pub struct DatabaseManager {
18 config: Arc<DatabaseConfig>,
20}
21
/// Connection source shared by every clone of `DatabaseManager`.
///
/// `DatabaseManager::new` fills `db_path` and leaves `memory_connection`
/// empty; `DatabaseManager::new_memory` does the opposite.
#[derive(Debug)]
struct DatabaseConfig {
    /// Path of the on-disk database. When set, `create_connection` opens a
    /// fresh connection to this file for each operation.
    db_path: Option<PathBuf>,
    /// Root in-memory connection behind an async mutex. Reads clone it via
    /// `try_clone`; writes run directly on it while holding the lock.
    memory_connection: Option<Arc<Mutex<Connection>>>,
}
29
30impl DatabaseManager {
31 pub async fn new<P: AsRef<std::path::Path>>(db_path: P) -> Result<Self> {
33 let db_path = db_path.as_ref().to_path_buf();
34
35 if let Some(parent) = db_path.parent() {
37 tokio::fs::create_dir_all(parent).await?;
38 }
39
40 let _test_conn = Connection::open(&db_path)?;
42 debug!("Database file connection test successful: {:?}", db_path);
43
44 let manager = Self {
45 config: Arc::new(DatabaseConfig {
46 db_path: Some(db_path),
47 memory_connection: None,
48 }),
49 };
50
51 Ok(manager)
55 }
56
57 pub async fn new_memory() -> Result<Self> {
59 let connection = Arc::new(Mutex::new(Connection::open_in_memory()?));
61 debug!("In-memory database connection created successfully");
62
63 let manager = Self {
64 config: Arc::new(DatabaseConfig {
65 db_path: None,
66 memory_connection: Some(connection),
67 }),
68 };
69
70 Ok(manager)
74 }
75
76 pub async fn init_database(&self) -> Result<()> {
78 debug!("Explicitly initializing database table structure...");
79 self.initialize_schema().await?;
80 debug!("Database table structure initialization completed");
81 Ok(())
82 }
83
84 async fn create_connection(&self) -> Result<Connection> {
86 if let Some(ref path) = self.config.db_path {
87 Ok(Connection::open(path)?)
89 } else if let Some(ref memory_conn) = self.config.memory_connection {
90 let conn = memory_conn.lock().await;
92 Ok(conn.try_clone()?)
93 } else {
94 Err(anyhow::anyhow!("Invalid database configuration"))
95 }
96 }
97
98 pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
100 where
101 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
102 R: Send,
103 {
104 let mut retry_count = 0;
105 const MAX_RETRIES: usize = 3;
106
107 loop {
108 let connection = self.create_connection().await?;
109
110 match operation(&connection) {
111 Ok(result) => return Ok(result),
112 Err(e) => {
113 let error_msg = e.to_string();
114 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
115 retry_count += 1;
116 let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
118 "Database read operation failed, retrying in {}ms ({}/{}): {}",
119 delay.as_millis(),
120 retry_count,
121 MAX_RETRIES,
122 error_msg
123 );
124 tokio::time::sleep(delay).await;
125 } else {
126 error!("Database read operation ultimately failed: {}", error_msg);
127 return Err(anyhow::anyhow!(e.to_string()));
128 }
129 }
130 }
131 }
132 }
133
134 pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
136 where
137 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
138 R: Send,
139 {
140 let mut retry_count = 0;
141 const MAX_RETRIES: usize = 3;
142
143 loop {
144 if let Some(ref memory_conn) = self.config.memory_connection {
145 let conn = memory_conn.lock().await;
147 match operation(&conn) {
148 Ok(result) => return Ok(result),
149 Err(e) => {
150 let error_msg = e.to_string();
151 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
152 retry_count += 1;
153 let delay = Duration::from_millis(50 * (1 << retry_count)); warn!(
155 "In-memory database write operation failed, retrying in {}ms ({}/{}): {}",
156 delay.as_millis(),
157 retry_count,
158 MAX_RETRIES,
159 error_msg
160 );
161 drop(conn); tokio::time::sleep(delay).await;
163 } else {
164 error!(
165 "In-memory database write operation ultimately failed: {}",
166 error_msg
167 );
168 return Err(anyhow::anyhow!(e.to_string()));
169 }
170 }
171 }
172 } else {
173 let connection = self.create_connection().await?;
175 match operation(&connection) {
176 Ok(result) => return Ok(result),
177 Err(e) => {
178 let error_msg = e.to_string();
179 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
180 retry_count += 1;
181 let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
183 "File database write operation failed, retrying in {}ms ({}/{}): {}",
184 delay.as_millis(),
185 retry_count,
186 MAX_RETRIES,
187 error_msg
188 );
189 tokio::time::sleep(delay).await;
190 } else {
191 error!(
192 "File database write operation ultimately failed: {}",
193 error_msg
194 );
195 return Err(anyhow::anyhow!(e));
196 }
197 }
198 }
199 }
200 }
201 }
202
203 pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
206 where
207 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
208 R: Send,
209 {
210 self.write_with_retry(|conn| {
212 operations(conn)
215 })
216 .await
217 }
218
/// Returns `true` when `error_msg` contains one of the known transient
/// conflict/lock markers (case-sensitive substring match).
fn is_retryable_error(error_msg: &str) -> bool {
    const RETRYABLE_MARKERS: [&str; 5] = [
        "write-write conflict",
        "database is locked",
        "database is busy",
        "SQLITE_BUSY",
        "SQLITE_LOCKED",
    ];

    RETRYABLE_MARKERS
        .iter()
        .any(|&marker| error_msg.contains(marker))
}
228
229 pub async fn initialize_schema(&self) -> Result<()> {
231 debug!("Initializing database table structure...");
232
233 let schema_sql = include_str!("../migrations/init_duckdb.sql");
235
236 self.write_with_retry(|conn| {
238 let statements = self.parse_sql_statements(schema_sql);
239 for statement in statements {
240 let trimmed = statement.trim();
241 if trimmed.is_empty() {
242 continue;
243 }
244
245 let is_only_comments = trimmed
247 .lines()
248 .map(|line| line.trim())
249 .all(|line| line.is_empty() || line.starts_with("--"));
250
251 if is_only_comments {
252 continue;
253 }
254
255 debug!(
256 "Executing SQL statement: {}",
257 if trimmed.len() > 100 {
258 let mut end = 100;
260 while end > 0 && !trimmed.is_char_boundary(end) {
261 end -= 1;
262 }
263 format!("{}...", &trimmed[..end])
264 } else {
265 trimmed.to_string()
266 }
267 );
268
269 if let Err(e) = conn.execute(trimmed, []) {
270 error!(
271 "SQL statement execution failed: {}, statement: {}",
272 e, trimmed
273 );
274 return Err(e);
275 }
276 }
277 Ok(())
278 })
279 .await?;
280
281 debug!("Database table structure initialization completed");
282 Ok(())
283 }
284
285 fn parse_sql_statements(&self, sql: &str) -> Vec<String> {
287 let mut statements = Vec::new();
288 let mut current_statement = String::new();
289 let mut in_string = false;
290 let mut in_json = false;
291 let mut brace_count = 0;
292 let chars = sql.chars().peekable();
293
294 for ch in chars {
295 match ch {
296 '\'' | '"' => {
297 current_statement.push(ch);
298 if !in_json {
299 in_string = !in_string;
300 }
301 }
302 '{' => {
303 current_statement.push(ch);
304 if !in_string {
305 brace_count += 1;
306 in_json = true;
307 }
308 }
309 '}' => {
310 current_statement.push(ch);
311 if !in_string && in_json {
312 brace_count -= 1;
313 if brace_count == 0 {
314 in_json = false;
315 }
316 }
317 }
318 ';' if !in_string && !in_json => {
319 if !current_statement.trim().is_empty() {
321 statements.push(current_statement.trim().to_string());
322 }
323 current_statement.clear();
324 }
325 _ => {
326 current_statement.push(ch);
327 }
328 }
329 }
330
331 if !current_statement.trim().is_empty() {
333 statements.push(current_statement.trim().to_string());
334 }
335
336 statements
337 }
338
339 pub fn get_connection_stats(&self) -> ConnectionStats {
341 ConnectionStats {
342 db_type: if self.config.db_path.is_some() {
343 "file".to_string()
344 } else {
345 "memory".to_string()
346 },
347 is_memory_db: self.config.memory_connection.is_some(),
348 }
349 }
350
351 pub async fn health_check(&self) -> Result<HealthStatus> {
353 let read_result = self
355 .read_with_retry(|conn| {
356 conn.query_row("SELECT 1", [], |row| {
357 let value: i32 = row.get(0)?;
358 Ok(value)
359 })
360 })
361 .await;
362
363 let write_result = self
365 .write_with_retry(|conn| {
366 conn.query_row("SELECT 1", [], |row| {
367 let value: i32 = row.get(0)?;
368 Ok(value)
369 })
370 })
371 .await;
372
373 Ok(HealthStatus {
374 read_healthy: read_result.is_ok(),
375 write_healthy: write_result.is_ok(),
376 read_error: read_result.err().map(|e| e.to_string()),
377 write_error: write_result.err().map(|e| e.to_string()),
378 })
379 }
380
/// Test-only helper: executes one parameterless SQL statement through
/// `write_with_retry` and discards the affected-row count.
#[cfg(test)]
pub async fn debug_execute_sql(&self, sql: &str) -> Result<()> {
    self.write_with_retry(|conn| {
        debug!("Executing debug SQL: {}", sql);
        conn.execute(sql, []).map(|_affected| ())
    })
    .await
}
391
/// Test-only helper: reports whether a table named `table_name` exists.
///
/// `information_schema.tables` is queried first; if that query fails, the
/// same check is repeated against `duckdb_tables()`.
#[cfg(test)]
pub async fn debug_table_exists(&self, table_name: &str) -> Result<bool> {
    self.read_with_retry(|conn| {
        // Runs a `COUNT(*)` lookup and turns it into "at least one row".
        let any_match = |sql: &str| -> DuckResult<bool> {
            conn.query_row(sql, [table_name], |row| {
                row.get::<_, i64>(0).map(|count| count > 0)
            })
        };

        any_match("SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?")
            .or_else(|_| {
                any_match("SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?")
            })
    })
    .await
}
423
/// Test-only helper: returns the `table_name` column of `duckdb_tables()`.
#[cfg(test)]
pub async fn debug_list_tables(&self) -> Result<Vec<String>> {
    self.read_with_retry(|conn| {
        let mut stmt = conn.prepare("SELECT table_name FROM duckdb_tables()")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;

        // Stops at the first row that fails to decode.
        let names = rows.collect::<DuckResult<Vec<String>>>()?;
        Ok(names)
    })
    .await
}
442}
443
/// Snapshot of how a `DatabaseManager` is configured.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, e.g. with
/// `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionStats {
    /// `"file"` when a database path is configured, `"memory"` otherwise.
    pub db_type: String,
    /// `true` when the manager holds a shared in-memory connection.
    pub is_memory_db: bool,
}
450
/// Result of `DatabaseManager::health_check`.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, e.g. with
/// `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthStatus {
    /// `true` when the read probe succeeded.
    pub read_healthy: bool,
    /// `true` when the write probe succeeded.
    pub write_healthy: bool,
    /// Message of the read probe failure, if any.
    pub read_error: Option<String>,
    /// Message of the write probe failure, if any.
    pub write_error: Option<String>,
}
459
460#[cfg(test)]
461mod tests {
462 use super::*;
463 use tempfile::tempdir;
464
465 #[tokio::test]
466 async fn test_database_manager_creation() {
467 let temp_dir = tempdir().unwrap();
468 let db_path = temp_dir.path().join("test.db");
469
470 let manager = DatabaseManager::new(&db_path).await.unwrap();
471 let stats = manager.get_connection_stats();
472
473 assert_eq!(stats.db_type, "file");
474 assert!(!stats.is_memory_db);
475 }
476
477 #[tokio::test]
478 async fn test_memory_database_creation() {
479 let manager = DatabaseManager::new_memory().await.unwrap();
480 let stats = manager.get_connection_stats();
481
482 assert_eq!(stats.db_type, "memory");
483 assert!(stats.is_memory_db);
484 }
485
486 #[tokio::test]
487 async fn test_concurrent_read_operations() {
488 let manager = DatabaseManager::new_memory().await.unwrap();
489
490 let mut handles = Vec::new();
492 for i in 0..10 {
493 let manager = manager.clone();
494 let handle = tokio::spawn(async move {
495 manager
496 .read_with_retry(|conn| {
497 conn.query_row("SELECT ?", [i], |row| {
498 let value: i32 = row.get(0)?;
499 Ok(value)
500 })
501 })
502 .await
503 });
504 handles.push(handle);
505 }
506
507 for (i, handle) in handles.into_iter().enumerate() {
509 let result = handle.await.unwrap();
510 assert_eq!(result.unwrap(), i as i32);
511 }
512 }
513
514 #[tokio::test]
515 async fn test_write_operations() {
516 let manager = DatabaseManager::new_memory().await.unwrap();
517
518 let result = manager
520 .write_with_retry(|conn| {
521 conn.execute(
522 "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name TEXT)",
523 [],
524 )?;
525 conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'test')", [])?;
526 Ok(())
527 })
528 .await;
529
530 assert!(result.is_ok());
531
532 let value = manager
534 .read_with_retry(|conn| {
535 conn.query_row("SELECT name FROM test_table WHERE id = 1", [], |row| {
536 let name: String = row.get(0)?;
537 Ok(name)
538 })
539 })
540 .await;
541
542 assert_eq!(value.unwrap(), "test");
543 }
544
545 #[tokio::test]
546 async fn test_health_check() {
547 let manager = DatabaseManager::new_memory().await.unwrap();
548 let health = manager.health_check().await.unwrap();
549
550 assert!(health.read_healthy);
551 assert!(health.write_healthy);
552 assert!(health.read_error.is_none());
553 assert!(health.write_error.is_none());
554 }
555
556 #[tokio::test]
557 async fn test_batch_write_operations() {
558 let manager = DatabaseManager::new_memory().await.unwrap();
559
560 let result = manager
562 .batch_write_with_retry(|conn| {
563 conn.execute(
564 "CREATE TABLE IF NOT EXISTS batch_test (id INTEGER, value TEXT)",
565 [],
566 )?;
567 conn.execute("INSERT INTO batch_test (id, value) VALUES (1, 'a')", [])?;
568 conn.execute("INSERT INTO batch_test (id, value) VALUES (2, 'b')", [])?;
569 conn.execute("INSERT INTO batch_test (id, value) VALUES (3, 'c')", [])?;
570 Ok(())
571 })
572 .await;
573
574 assert!(result.is_ok());
575
576 let count = manager
578 .read_with_retry(|conn| {
579 conn.query_row("SELECT COUNT(*) FROM batch_test", [], |row| {
580 let count: i64 = row.get(0)?;
581 Ok(count)
582 })
583 })
584 .await;
585
586 assert_eq!(count.unwrap(), 3);
587 }
588
589 #[tokio::test]
590 async fn test_file_database_concurrent_operations() {
591 let temp_dir = tempdir().unwrap();
592 let db_path = temp_dir.path().join("concurrent_test.db");
593 let manager = DatabaseManager::new(&db_path).await.unwrap();
594
595 manager
597 .write_with_retry(|conn| {
598 conn.execute(
599 "CREATE TABLE IF NOT EXISTS concurrent_test (id INTEGER, value TEXT)",
600 [],
601 )?;
602 Ok(())
603 })
604 .await
605 .unwrap();
606
607 let mut handles = Vec::new();
609 for i in 0..5 {
610 let manager = manager.clone();
611 let handle = tokio::spawn(async move {
612 manager
613 .write_with_retry(|conn| {
614 conn.execute(
615 "INSERT INTO concurrent_test (id, value) VALUES (?, ?)",
616 [&i.to_string(), &format!("value_{i}")],
617 )?;
618 Ok(())
619 })
620 .await
621 });
622 handles.push(handle);
623 }
624
625 for handle in handles {
627 let result = handle.await.unwrap();
628 assert!(result.is_ok());
629 }
630
631 let count = manager
633 .read_with_retry(|conn| {
634 conn.query_row("SELECT COUNT(*) FROM concurrent_test", [], |row| {
635 let count: i64 = row.get(0)?;
636 Ok(count)
637 })
638 })
639 .await;
640
641 assert_eq!(count.unwrap(), 5);
642 }
643
644 #[tokio::test]
645 async fn test_debug_sql_initialization() {
646 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
648 let manager = DatabaseManager {
649 config: Arc::new(DatabaseConfig {
650 db_path: None,
651 memory_connection: Some(connection),
652 }),
653 };
654
655 println!("=== 初始化前 ===");
656 let tables = manager.debug_list_tables().await.unwrap();
658 println!("初始化前的表: {tables:?}");
659
660 println!("=== 开始初始化 ===");
661 let init_result = manager.initialize_schema().await;
663 println!("初始化结果: {init_result:?}");
664
665 if init_result.is_ok() {
666 println!("=== 初始化成功,检查表 ===");
667 let tables_after = manager.debug_list_tables().await.unwrap();
668 println!("初始化后的表: {tables_after:?}");
669
670 let app_config_exists = manager.debug_table_exists("app_config").await.unwrap();
672 println!("app_config表存在: {app_config_exists}");
673 } else {
674 println!("=== 初始化失败,尝试手动创建简单表 ===");
675 let result = manager.debug_execute_sql(
676 "CREATE TABLE app_config (config_key VARCHAR PRIMARY KEY, config_value JSON NOT NULL)"
677 ).await;
678 println!("手动创建app_config表的结果: {result:?}");
679
680 if result.is_ok() {
681 let app_config_exists_after =
682 manager.debug_table_exists("app_config").await.unwrap();
683 println!("创建后app_config表存在: {app_config_exists_after}");
684 }
685 }
686 }
687
688 #[tokio::test]
689 async fn test_debug_sql_parsing() {
690 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
692 let manager = DatabaseManager {
693 config: Arc::new(DatabaseConfig {
694 db_path: None,
695 memory_connection: Some(connection),
696 }),
697 };
698
699 let test_sql = r#"
700 CREATE TABLE test1 (id INTEGER);
701 INSERT INTO test1 VALUES (1);
702 CREATE TABLE test2 (data JSON);
703 INSERT INTO test2 VALUES ('{"key": "value; with semicolon"}');
704 "#;
705
706 let statements = manager.parse_sql_statements(test_sql);
707 println!("解析出的SQL语句数量: {}", statements.len());
708 for (i, stmt) in statements.iter().enumerate() {
709 println!("语句 {}: {}", i + 1, stmt);
710 }
711
712 let schema_sql = include_str!("../migrations/init_duckdb.sql");
714 let real_statements = manager.parse_sql_statements(schema_sql);
715 println!("真实SQL脚本解析出的语句数量: {}", real_statements.len());
716
717 for (i, stmt) in real_statements.iter().take(10).enumerate() {
719 println!("真实语句 {}: {}", i + 1, stmt);
720 }
721 }
722
723 #[tokio::test]
724 async fn test_debug_individual_sql_statements() {
725 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
727 let manager = DatabaseManager {
728 config: Arc::new(DatabaseConfig {
729 db_path: None,
730 memory_connection: Some(connection),
731 }),
732 };
733
734 let schema_sql = include_str!("../migrations/init_duckdb.sql");
735 let statements = manager.parse_sql_statements(schema_sql);
736 println!("总共解析出 {} 个语句", statements.len());
737
738 for (i, stmt) in statements.iter().enumerate() {
740 let trimmed = stmt.trim();
741 if trimmed.is_empty() {
742 println!("跳过语句 {}: [空语句]", i + 1);
743 continue;
744 }
745
746 let is_only_comments = trimmed
748 .lines()
749 .map(|line| line.trim())
750 .all(|line| line.is_empty() || line.starts_with("--"));
751
752 if is_only_comments {
753 println!("跳过语句 {}: [仅包含注释]", i + 1);
754 continue;
755 }
756
757 println!(
758 "执行语句 {}: {}",
759 i + 1,
760 if trimmed.len() > 100 {
761 let mut end = 100;
763 while end > 0 && !trimmed.is_char_boundary(end) {
764 end -= 1;
765 }
766 format!("{}...", &trimmed[..end])
767 } else {
768 trimmed.to_string()
769 }
770 );
771
772 let result = manager
773 .write_with_retry(|conn| {
774 conn.execute(trimmed, [])?;
775 Ok(())
776 })
777 .await;
778
779 if let Err(e) = result {
780 println!("❌ 语句 {} 执行失败: {}", i + 1, e);
781 println!("失败的语句: {trimmed}");
782 break;
783 } else {
784 println!("✅ 语句 {} 执行成功", i + 1);
785 }
786 }
787 }
788}