1use anyhow::Result;
2use duckdb::{Connection, Result as DuckResult};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::Mutex;
7use tracing::{debug, error, warn};
8
9#[derive(Clone)]
17pub struct DatabaseManager {
18 config: Arc<DatabaseConfig>,
20}
21
22#[derive(Debug)]
23struct DatabaseConfig {
24 db_path: Option<PathBuf>,
26 memory_connection: Option<Arc<Mutex<Connection>>>,
28}
29
30impl DatabaseManager {
31 pub async fn new<P: AsRef<std::path::Path>>(db_path: P) -> Result<Self> {
33 let db_path = db_path.as_ref().to_path_buf();
34
35 if let Some(parent) = db_path.parent() {
37 tokio::fs::create_dir_all(parent).await?;
38 }
39
40 let _test_conn = Connection::open(&db_path)?;
42 debug!("Database file connection test successful: {:?}", db_path);
43
44 let manager = Self {
45 config: Arc::new(DatabaseConfig {
46 db_path: Some(db_path),
47 memory_connection: None,
48 }),
49 };
50
51 Ok(manager)
55 }
56
57 pub async fn new_memory() -> Result<Self> {
59 let connection = Arc::new(Mutex::new(Connection::open_in_memory()?));
61 debug!("In-memory database connection created successfully");
62
63 let manager = Self {
64 config: Arc::new(DatabaseConfig {
65 db_path: None,
66 memory_connection: Some(connection),
67 }),
68 };
69
70 Ok(manager)
74 }
75
76 pub async fn init_database(&self) -> Result<()> {
78 debug!("Explicitly initializing database table structure...");
79 self.initialize_schema().await?;
80 debug!("Database table structure initialization completed");
81 Ok(())
82 }
83
84 async fn create_connection(&self) -> Result<Connection> {
86 if let Some(ref path) = self.config.db_path {
87 Ok(Connection::open(path)?)
89 } else if let Some(ref memory_conn) = self.config.memory_connection {
90 let conn = memory_conn.lock().await;
92 Ok(conn.try_clone()?)
93 } else {
94 Err(anyhow::anyhow!("Invalid database configuration"))
95 }
96 }
97
98 pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
100 where
101 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
102 R: Send,
103 {
104 let mut retry_count = 0;
105 const MAX_RETRIES: usize = 3;
106
107 loop {
108 let connection = self.create_connection().await?;
109
110 match operation(&connection) {
111 Ok(result) => return Ok(result),
112 Err(e) => {
113 let error_msg = e.to_string();
114 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
115 retry_count += 1;
116 let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
118 "Database read operation failed, retrying in {}ms ({}/{}): {}",
119 delay.as_millis(),
120 retry_count,
121 MAX_RETRIES,
122 error_msg
123 );
124 tokio::time::sleep(delay).await;
125 } else {
126 error!("Database read operation ultimately failed: {}", error_msg);
127 return Err(anyhow::anyhow!(e.to_string()));
128 }
129 }
130 }
131 }
132 }
133
134 pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
136 where
137 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
138 R: Send,
139 {
140 let mut retry_count = 0;
141 const MAX_RETRIES: usize = 3;
142
143 loop {
144 if let Some(ref memory_conn) = self.config.memory_connection {
145 let conn = memory_conn.lock().await;
147 match operation(&conn) {
148 Ok(result) => return Ok(result),
149 Err(e) => {
150 let error_msg = e.to_string();
151 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
152 retry_count += 1;
153 let delay = Duration::from_millis(50 * (1 << retry_count)); warn!(
155 "In-memory database write operation failed, retrying in {}ms ({}/{}): {}",
156 delay.as_millis(),
157 retry_count,
158 MAX_RETRIES,
159 error_msg
160 );
161 drop(conn); tokio::time::sleep(delay).await;
163 } else {
164 error!("In-memory database write operation ultimately failed: {}", error_msg);
165 return Err(anyhow::anyhow!(e.to_string()));
166 }
167 }
168 }
169 } else {
170 let connection = self.create_connection().await?;
172 match operation(&connection) {
173 Ok(result) => return Ok(result),
174 Err(e) => {
175 let error_msg = e.to_string();
176 if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
177 retry_count += 1;
178 let delay = Duration::from_millis(100 * (1 << retry_count)); warn!(
180 "File database write operation failed, retrying in {}ms ({}/{}): {}",
181 delay.as_millis(),
182 retry_count,
183 MAX_RETRIES,
184 error_msg
185 );
186 tokio::time::sleep(delay).await;
187 } else {
188 error!("File database write operation ultimately failed: {}", error_msg);
189 return Err(anyhow::anyhow!(e));
190 }
191 }
192 }
193 }
194 }
195 }
196
197 pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
200 where
201 F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
202 R: Send,
203 {
204 self.write_with_retry(|conn| {
206 operations(conn)
209 })
210 .await
211 }
212
213 fn is_retryable_error(error_msg: &str) -> bool {
215 error_msg.contains("write-write conflict")
217 || error_msg.contains("database is locked")
218 || error_msg.contains("database is busy")
219 || error_msg.contains("SQLITE_BUSY")
220 || error_msg.contains("SQLITE_LOCKED")
221 }
222
223 pub async fn initialize_schema(&self) -> Result<()> {
225 debug!("Initializing database table structure...");
226
227 let schema_sql = include_str!("../migrations/init_duckdb.sql");
229
230 self.write_with_retry(|conn| {
232 let statements = self.parse_sql_statements(schema_sql);
233 for statement in statements {
234 let trimmed = statement.trim();
235 if trimmed.is_empty() {
236 continue;
237 }
238
239 let is_only_comments = trimmed
241 .lines()
242 .map(|line| line.trim())
243 .all(|line| line.is_empty() || line.starts_with("--"));
244
245 if is_only_comments {
246 continue;
247 }
248
249 debug!(
250 "Executing SQL statement: {}",
251 if trimmed.len() > 100 {
252 let mut end = 100;
254 while end > 0 && !trimmed.is_char_boundary(end) {
255 end -= 1;
256 }
257 format!("{}...", &trimmed[..end])
258 } else {
259 trimmed.to_string()
260 }
261 );
262
263 if let Err(e) = conn.execute(trimmed, []) {
264 error!("SQL statement execution failed: {}, statement: {}", e, trimmed);
265 return Err(e);
266 }
267 }
268 Ok(())
269 })
270 .await?;
271
272 debug!("Database table structure initialization completed");
273 Ok(())
274 }
275
276 fn parse_sql_statements(&self, sql: &str) -> Vec<String> {
278 let mut statements = Vec::new();
279 let mut current_statement = String::new();
280 let mut in_string = false;
281 let mut in_json = false;
282 let mut brace_count = 0;
283 let chars = sql.chars().peekable();
284
285 for ch in chars {
286 match ch {
287 '\'' | '"' => {
288 current_statement.push(ch);
289 if !in_json {
290 in_string = !in_string;
291 }
292 }
293 '{' => {
294 current_statement.push(ch);
295 if !in_string {
296 brace_count += 1;
297 in_json = true;
298 }
299 }
300 '}' => {
301 current_statement.push(ch);
302 if !in_string && in_json {
303 brace_count -= 1;
304 if brace_count == 0 {
305 in_json = false;
306 }
307 }
308 }
309 ';' => {
310 if !in_string && !in_json {
311 if !current_statement.trim().is_empty() {
313 statements.push(current_statement.trim().to_string());
314 }
315 current_statement.clear();
316 } else {
317 current_statement.push(ch);
318 }
319 }
320 _ => {
321 current_statement.push(ch);
322 }
323 }
324 }
325
326 if !current_statement.trim().is_empty() {
328 statements.push(current_statement.trim().to_string());
329 }
330
331 statements
332 }
333
334 pub fn get_connection_stats(&self) -> ConnectionStats {
336 ConnectionStats {
337 db_type: if self.config.db_path.is_some() {
338 "file".to_string()
339 } else {
340 "memory".to_string()
341 },
342 is_memory_db: self.config.memory_connection.is_some(),
343 }
344 }
345
346 pub async fn health_check(&self) -> Result<HealthStatus> {
348 let read_result = self
350 .read_with_retry(|conn| {
351 conn.query_row("SELECT 1", [], |row| {
352 let value: i32 = row.get(0)?;
353 Ok(value)
354 })
355 })
356 .await;
357
358 let write_result = self
360 .write_with_retry(|conn| {
361 conn.query_row("SELECT 1", [], |row| {
362 let value: i32 = row.get(0)?;
363 Ok(value)
364 })
365 })
366 .await;
367
368 Ok(HealthStatus {
369 read_healthy: read_result.is_ok(),
370 write_healthy: write_result.is_ok(),
371 read_error: read_result.err().map(|e| e.to_string()),
372 write_error: write_result.err().map(|e| e.to_string()),
373 })
374 }
375
376 #[cfg(test)]
378 pub async fn debug_execute_sql(&self, sql: &str) -> Result<()> {
379 self.write_with_retry(|conn| {
380 debug!("Executing debug SQL: {}", sql);
381 conn.execute(sql, [])?;
382 Ok(())
383 })
384 .await
385 }
386
387 #[cfg(test)]
389 pub async fn debug_table_exists(&self, table_name: &str) -> Result<bool> {
390 self.read_with_retry(|conn| {
391 let exists = conn.query_row(
392 "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
393 [table_name],
394 |row| {
395 let count: i64 = row.get(0)?;
396 Ok(count > 0)
397 },
398 );
399
400 match exists {
402 Ok(result) => Ok(result),
403 Err(_) => {
404 conn.query_row(
406 "SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?",
407 [table_name],
408 |row| {
409 let count: i64 = row.get(0)?;
410 Ok(count > 0)
411 },
412 )
413 }
414 }
415 })
416 .await
417 }
418
419 #[cfg(test)]
421 pub async fn debug_list_tables(&self) -> Result<Vec<String>> {
422 self.read_with_retry(|conn| {
423 let mut stmt = conn.prepare("SELECT table_name FROM duckdb_tables()")?;
424 let table_iter = stmt.query_map([], |row| {
425 let table_name: String = row.get(0)?;
426 Ok(table_name)
427 })?;
428
429 let mut tables = Vec::new();
430 for table in table_iter {
431 tables.push(table?);
432 }
433 Ok(tables)
434 })
435 .await
436 }
437}
438
439#[derive(Debug, Clone)]
441pub struct ConnectionStats {
442 pub db_type: String,
443 pub is_memory_db: bool,
444}
445
446#[derive(Debug, Clone)]
448pub struct HealthStatus {
449 pub read_healthy: bool,
450 pub write_healthy: bool,
451 pub read_error: Option<String>,
452 pub write_error: Option<String>,
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use tempfile::tempdir;
459
460 #[tokio::test]
461 async fn test_database_manager_creation() {
462 let temp_dir = tempdir().unwrap();
463 let db_path = temp_dir.path().join("test.db");
464
465 let manager = DatabaseManager::new(&db_path).await.unwrap();
466 let stats = manager.get_connection_stats();
467
468 assert_eq!(stats.db_type, "file");
469 assert!(!stats.is_memory_db);
470 }
471
472 #[tokio::test]
473 async fn test_memory_database_creation() {
474 let manager = DatabaseManager::new_memory().await.unwrap();
475 let stats = manager.get_connection_stats();
476
477 assert_eq!(stats.db_type, "memory");
478 assert!(stats.is_memory_db);
479 }
480
481 #[tokio::test]
482 async fn test_concurrent_read_operations() {
483 let manager = DatabaseManager::new_memory().await.unwrap();
484
485 let mut handles = Vec::new();
487 for i in 0..10 {
488 let manager = manager.clone();
489 let handle = tokio::spawn(async move {
490 manager
491 .read_with_retry(|conn| {
492 conn.query_row("SELECT ?", [i], |row| {
493 let value: i32 = row.get(0)?;
494 Ok(value)
495 })
496 })
497 .await
498 });
499 handles.push(handle);
500 }
501
502 for (i, handle) in handles.into_iter().enumerate() {
504 let result = handle.await.unwrap();
505 assert_eq!(result.unwrap(), i as i32);
506 }
507 }
508
509 #[tokio::test]
510 async fn test_write_operations() {
511 let manager = DatabaseManager::new_memory().await.unwrap();
512
513 let result = manager
515 .write_with_retry(|conn| {
516 conn.execute(
517 "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name TEXT)",
518 [],
519 )?;
520 conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'test')", [])?;
521 Ok(())
522 })
523 .await;
524
525 assert!(result.is_ok());
526
527 let value = manager
529 .read_with_retry(|conn| {
530 conn.query_row("SELECT name FROM test_table WHERE id = 1", [], |row| {
531 let name: String = row.get(0)?;
532 Ok(name)
533 })
534 })
535 .await;
536
537 assert_eq!(value.unwrap(), "test");
538 }
539
540 #[tokio::test]
541 async fn test_health_check() {
542 let manager = DatabaseManager::new_memory().await.unwrap();
543 let health = manager.health_check().await.unwrap();
544
545 assert!(health.read_healthy);
546 assert!(health.write_healthy);
547 assert!(health.read_error.is_none());
548 assert!(health.write_error.is_none());
549 }
550
551 #[tokio::test]
552 async fn test_batch_write_operations() {
553 let manager = DatabaseManager::new_memory().await.unwrap();
554
555 let result = manager
557 .batch_write_with_retry(|conn| {
558 conn.execute(
559 "CREATE TABLE IF NOT EXISTS batch_test (id INTEGER, value TEXT)",
560 [],
561 )?;
562 conn.execute("INSERT INTO batch_test (id, value) VALUES (1, 'a')", [])?;
563 conn.execute("INSERT INTO batch_test (id, value) VALUES (2, 'b')", [])?;
564 conn.execute("INSERT INTO batch_test (id, value) VALUES (3, 'c')", [])?;
565 Ok(())
566 })
567 .await;
568
569 assert!(result.is_ok());
570
571 let count = manager
573 .read_with_retry(|conn| {
574 conn.query_row("SELECT COUNT(*) FROM batch_test", [], |row| {
575 let count: i64 = row.get(0)?;
576 Ok(count)
577 })
578 })
579 .await;
580
581 assert_eq!(count.unwrap(), 3);
582 }
583
584 #[tokio::test]
585 async fn test_file_database_concurrent_operations() {
586 let temp_dir = tempdir().unwrap();
587 let db_path = temp_dir.path().join("concurrent_test.db");
588 let manager = DatabaseManager::new(&db_path).await.unwrap();
589
590 manager
592 .write_with_retry(|conn| {
593 conn.execute(
594 "CREATE TABLE IF NOT EXISTS concurrent_test (id INTEGER, value TEXT)",
595 [],
596 )?;
597 Ok(())
598 })
599 .await
600 .unwrap();
601
602 let mut handles = Vec::new();
604 for i in 0..5 {
605 let manager = manager.clone();
606 let handle = tokio::spawn(async move {
607 manager
608 .write_with_retry(|conn| {
609 conn.execute(
610 "INSERT INTO concurrent_test (id, value) VALUES (?, ?)",
611 [&i.to_string(), &format!("value_{i}")],
612 )?;
613 Ok(())
614 })
615 .await
616 });
617 handles.push(handle);
618 }
619
620 for handle in handles {
622 let result = handle.await.unwrap();
623 assert!(result.is_ok());
624 }
625
626 let count = manager
628 .read_with_retry(|conn| {
629 conn.query_row("SELECT COUNT(*) FROM concurrent_test", [], |row| {
630 let count: i64 = row.get(0)?;
631 Ok(count)
632 })
633 })
634 .await;
635
636 assert_eq!(count.unwrap(), 5);
637 }
638
639 #[tokio::test]
640 async fn test_debug_sql_initialization() {
641 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
643 let manager = DatabaseManager {
644 config: Arc::new(DatabaseConfig {
645 db_path: None,
646 memory_connection: Some(connection),
647 }),
648 };
649
650 println!("=== 初始化前 ===");
651 let tables = manager.debug_list_tables().await.unwrap();
653 println!("初始化前的表: {tables:?}");
654
655 println!("=== 开始初始化 ===");
656 let init_result = manager.initialize_schema().await;
658 println!("初始化结果: {init_result:?}");
659
660 if init_result.is_ok() {
661 println!("=== 初始化成功,检查表 ===");
662 let tables_after = manager.debug_list_tables().await.unwrap();
663 println!("初始化后的表: {tables_after:?}");
664
665 let app_config_exists = manager.debug_table_exists("app_config").await.unwrap();
667 println!("app_config表存在: {app_config_exists}");
668 } else {
669 println!("=== 初始化失败,尝试手动创建简单表 ===");
670 let result = manager.debug_execute_sql(
671 "CREATE TABLE app_config (config_key VARCHAR PRIMARY KEY, config_value JSON NOT NULL)"
672 ).await;
673 println!("手动创建app_config表的结果: {result:?}");
674
675 if result.is_ok() {
676 let app_config_exists_after =
677 manager.debug_table_exists("app_config").await.unwrap();
678 println!("创建后app_config表存在: {app_config_exists_after}");
679 }
680 }
681 }
682
683 #[tokio::test]
684 async fn test_debug_sql_parsing() {
685 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
687 let manager = DatabaseManager {
688 config: Arc::new(DatabaseConfig {
689 db_path: None,
690 memory_connection: Some(connection),
691 }),
692 };
693
694 let test_sql = r#"
695 CREATE TABLE test1 (id INTEGER);
696 INSERT INTO test1 VALUES (1);
697 CREATE TABLE test2 (data JSON);
698 INSERT INTO test2 VALUES ('{"key": "value; with semicolon"}');
699 "#;
700
701 let statements = manager.parse_sql_statements(test_sql);
702 println!("解析出的SQL语句数量: {}", statements.len());
703 for (i, stmt) in statements.iter().enumerate() {
704 println!("语句 {}: {}", i + 1, stmt);
705 }
706
707 let schema_sql = include_str!("../migrations/init_duckdb.sql");
709 let real_statements = manager.parse_sql_statements(schema_sql);
710 println!("真实SQL脚本解析出的语句数量: {}", real_statements.len());
711
712 for (i, stmt) in real_statements.iter().take(10).enumerate() {
714 println!("真实语句 {}: {}", i + 1, stmt);
715 }
716 }
717
718 #[tokio::test]
719 async fn test_debug_individual_sql_statements() {
720 let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
722 let manager = DatabaseManager {
723 config: Arc::new(DatabaseConfig {
724 db_path: None,
725 memory_connection: Some(connection),
726 }),
727 };
728
729 let schema_sql = include_str!("../migrations/init_duckdb.sql");
730 let statements = manager.parse_sql_statements(schema_sql);
731 println!("总共解析出 {} 个语句", statements.len());
732
733 for (i, stmt) in statements.iter().enumerate() {
735 let trimmed = stmt.trim();
736 if trimmed.is_empty() {
737 println!("跳过语句 {}: [空语句]", i + 1);
738 continue;
739 }
740
741 let is_only_comments = trimmed
743 .lines()
744 .map(|line| line.trim())
745 .all(|line| line.is_empty() || line.starts_with("--"));
746
747 if is_only_comments {
748 println!("跳过语句 {}: [仅包含注释]", i + 1);
749 continue;
750 }
751
752 println!(
753 "执行语句 {}: {}",
754 i + 1,
755 if trimmed.len() > 100 {
756 let mut end = 100;
758 while end > 0 && !trimmed.is_char_boundary(end) {
759 end -= 1;
760 }
761 format!("{}...", &trimmed[..end])
762 } else {
763 trimmed.to_string()
764 }
765 );
766
767 let result = manager
768 .write_with_retry(|conn| {
769 conn.execute(trimmed, [])?;
770 Ok(())
771 })
772 .await;
773
774 if let Err(e) = result {
775 println!("❌ 语句 {} 执行失败: {}", i + 1, e);
776 println!("失败的语句: {trimmed}");
777 break;
778 } else {
779 println!("✅ 语句 {} 执行成功", i + 1);
780 }
781 }
782 }
783}