Skip to main content

client_core/
database_manager.rs

1use anyhow::Result;
2use duckdb::{Connection, Result as DuckResult};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::Mutex;
7use tracing::{debug, error, warn};
8
9/// DuckDB 数据库管理器 - 针对并发特性优化
10///
11/// 设计原则:
12/// - 文件数据库:每个操作创建新连接,天然支持并发读
13/// - 内存数据库:使用单一连接+Mutex,确保数据一致性
14/// - 写操作:串行执行,避免write-write conflict
15/// - 重试机制:检测冲突并实现指数退避重试
16#[derive(Clone)]
17pub struct DatabaseManager {
18    /// 数据库配置
19    config: Arc<DatabaseConfig>,
20}
21
22#[derive(Debug)]
23struct DatabaseConfig {
24    /// 数据库路径(None表示内存数据库)
25    db_path: Option<PathBuf>,
26    /// 内存数据库的共享连接(仅用于内存数据库)
27    memory_connection: Option<Arc<Mutex<Connection>>>,
28}
29
30impl DatabaseManager {
31    /// 创建新的数据库管理器(文件数据库)
32    pub async fn new<P: AsRef<std::path::Path>>(db_path: P) -> Result<Self> {
33        let db_path = db_path.as_ref().to_path_buf();
34
35        // 确保数据库目录存在
36        if let Some(parent) = db_path.parent() {
37            tokio::fs::create_dir_all(parent).await?;
38        }
39
40        // 测试连接是否可以创建
41        let _test_conn = Connection::open(&db_path)?;
42        debug!("Database file connection test successful: {:?}", db_path);
43
44        let manager = Self {
45            config: Arc::new(DatabaseConfig {
46                db_path: Some(db_path),
47                memory_connection: None,
48            }),
49        };
50
51        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
52        // manager.initialize_schema().await?;
53
54        Ok(manager)
55    }
56
57    /// 创建内存数据库管理器(主要用于测试)
58    pub async fn new_memory() -> Result<Self> {
59        // 对于内存数据库,我们需要保持一个共享连接
60        let connection = Arc::new(Mutex::new(Connection::open_in_memory()?));
61        debug!("In-memory database connection created successfully");
62
63        let manager = Self {
64            config: Arc::new(DatabaseConfig {
65                db_path: None,
66                memory_connection: Some(connection),
67            }),
68        };
69
70        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
71        // manager.initialize_schema().await?;
72
73        Ok(manager)
74    }
75
76    /// 显式初始化数据库(只应在 nuwax-cli init 时调用)
77    pub async fn init_database(&self) -> Result<()> {
78        debug!("Explicitly initializing database table structure...");
79        self.initialize_schema().await?;
80        debug!("Database table structure initialization completed");
81        Ok(())
82    }
83
84    /// 创建数据库连接
85    async fn create_connection(&self) -> Result<Connection> {
86        if let Some(ref path) = self.config.db_path {
87            // 文件数据库:创建新连接
88            Ok(Connection::open(path)?)
89        } else if let Some(ref memory_conn) = self.config.memory_connection {
90            // 内存数据库:克隆共享连接
91            let conn = memory_conn.lock().await;
92            Ok(conn.try_clone()?)
93        } else {
94            Err(anyhow::anyhow!("Invalid database configuration"))
95        }
96    }
97
98    /// 并发读操作(文件数据库支持真正的并发)
99    pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
100    where
101        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
102        R: Send,
103    {
104        let mut retry_count = 0;
105        const MAX_RETRIES: usize = 3;
106
107        loop {
108            let connection = self.create_connection().await?;
109
110            match operation(&connection) {
111                Ok(result) => return Ok(result),
112                Err(e) => {
113                    let error_msg = e.to_string();
114                    if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
115                        retry_count += 1;
116                        let delay = Duration::from_millis(100 * (1 << retry_count)); // 指数退避
117                        warn!(
118                            "Database read operation failed, retrying in {}ms ({}/{}): {}",
119                            delay.as_millis(),
120                            retry_count,
121                            MAX_RETRIES,
122                            error_msg
123                        );
124                        tokio::time::sleep(delay).await;
125                    } else {
126                        error!("Database read operation ultimately failed: {}", error_msg);
127                        return Err(anyhow::anyhow!(e.to_string()));
128                    }
129                }
130            }
131        }
132    }
133
134    /// 并发写操作(确保写入一致性)
135    pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
136    where
137        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
138        R: Send,
139    {
140        let mut retry_count = 0;
141        const MAX_RETRIES: usize = 3;
142
143        loop {
144            if let Some(ref memory_conn) = self.config.memory_connection {
145                // 内存数据库:确保独占访问
146                let conn = memory_conn.lock().await;
147                match operation(&conn) {
148                    Ok(result) => return Ok(result),
149                    Err(e) => {
150                        let error_msg = e.to_string();
151                        if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
152                            retry_count += 1;
153                            let delay = Duration::from_millis(50 * (1 << retry_count)); // 较短的重试间隔
154                            warn!(
155                                "In-memory database write operation failed, retrying in {}ms ({}/{}): {}",
156                                delay.as_millis(),
157                                retry_count,
158                                MAX_RETRIES,
159                                error_msg
160                            );
161                            drop(conn); // 释放锁
162                            tokio::time::sleep(delay).await;
163                        } else {
164                            error!(
165                                "In-memory database write operation ultimately failed: {}",
166                                error_msg
167                            );
168                            return Err(anyhow::anyhow!(e.to_string()));
169                        }
170                    }
171                }
172            } else {
173                // 文件数据库:创建新连接
174                let connection = self.create_connection().await?;
175                match operation(&connection) {
176                    Ok(result) => return Ok(result),
177                    Err(e) => {
178                        let error_msg = e.to_string();
179                        if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
180                            retry_count += 1;
181                            let delay = Duration::from_millis(100 * (1 << retry_count)); // 指数退避
182                            warn!(
183                                "File database write operation failed, retrying in {}ms ({}/{}): {}",
184                                delay.as_millis(),
185                                retry_count,
186                                MAX_RETRIES,
187                                error_msg
188                            );
189                            tokio::time::sleep(delay).await;
190                        } else {
191                            error!(
192                                "File database write operation ultimately failed: {}",
193                                error_msg
194                            );
195                            return Err(anyhow::anyhow!(e));
196                        }
197                    }
198                }
199            }
200        }
201    }
202
203    /// 批量写操作(事务中执行多个写操作)
204    /// 注意:这个方法接受一个闭包,该闭包会在事务上下文中执行
205    pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
206    where
207        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
208        R: Send,
209    {
210        // 将事务逻辑封装到普通的写操作中
211        self.write_with_retry(|conn| {
212            // 注意:这里我们不使用事务,因为 Connection 的借用问题
213            // 如果需要事务,可以在 operations 闭包内部处理
214            operations(conn)
215        })
216        .await
217    }
218
219    /// 检查错误是否可重试
220    fn is_retryable_error(error_msg: &str) -> bool {
221        // DuckDB 的 write-write conflict 错误模式
222        error_msg.contains("write-write conflict")
223            || error_msg.contains("database is locked")
224            || error_msg.contains("database is busy")
225            || error_msg.contains("SQLITE_BUSY")
226            || error_msg.contains("SQLITE_LOCKED")
227    }
228
229    /// 初始化数据库表结构
230    pub async fn initialize_schema(&self) -> Result<()> {
231        debug!("Initializing database table structure...");
232
233        // 读取SQL初始化脚本
234        let schema_sql = include_str!("../migrations/init_duckdb.sql");
235
236        // 使用更智能的SQL语句分割方式
237        self.write_with_retry(|conn| {
238            let statements = self.parse_sql_statements(schema_sql);
239            for statement in statements {
240                let trimmed = statement.trim();
241                if trimmed.is_empty() {
242                    continue;
243                }
244
245                // 检查是否只包含注释
246                let is_only_comments = trimmed
247                    .lines()
248                    .map(|line| line.trim())
249                    .all(|line| line.is_empty() || line.starts_with("--"));
250
251                if is_only_comments {
252                    continue;
253                }
254
255                debug!(
256                    "Executing SQL statement: {}",
257                    if trimmed.len() > 100 {
258                        // 安全地截取字符串,避免切断多字节字符
259                        let mut end = 100;
260                        while end > 0 && !trimmed.is_char_boundary(end) {
261                            end -= 1;
262                        }
263                        format!("{}...", &trimmed[..end])
264                    } else {
265                        trimmed.to_string()
266                    }
267                );
268
269                if let Err(e) = conn.execute(trimmed, []) {
270                    error!(
271                        "SQL statement execution failed: {}, statement: {}",
272                        e, trimmed
273                    );
274                    return Err(e);
275                }
276            }
277            Ok(())
278        })
279        .await?;
280
281        debug!("Database table structure initialization completed");
282        Ok(())
283    }
284
285    /// 智能解析SQL语句(处理JSON中的分号)
286    fn parse_sql_statements(&self, sql: &str) -> Vec<String> {
287        let mut statements = Vec::new();
288        let mut current_statement = String::new();
289        let mut in_string = false;
290        let mut in_json = false;
291        let mut brace_count = 0;
292        let chars = sql.chars().peekable();
293
294        for ch in chars {
295            match ch {
296                '\'' | '"' => {
297                    current_statement.push(ch);
298                    if !in_json {
299                        in_string = !in_string;
300                    }
301                }
302                '{' => {
303                    current_statement.push(ch);
304                    if !in_string {
305                        brace_count += 1;
306                        in_json = true;
307                    }
308                }
309                '}' => {
310                    current_statement.push(ch);
311                    if !in_string && in_json {
312                        brace_count -= 1;
313                        if brace_count == 0 {
314                            in_json = false;
315                        }
316                    }
317                }
318                ';' if !in_string && !in_json => {
319                    // 这是一个真正的语句结束符
320                    if !current_statement.trim().is_empty() {
321                        statements.push(current_statement.trim().to_string());
322                    }
323                    current_statement.clear();
324                }
325                _ => {
326                    current_statement.push(ch);
327                }
328            }
329        }
330
331        // 添加最后一个语句
332        if !current_statement.trim().is_empty() {
333            statements.push(current_statement.trim().to_string());
334        }
335
336        statements
337    }
338
339    /// 获取连接统计信息
340    pub fn get_connection_stats(&self) -> ConnectionStats {
341        ConnectionStats {
342            db_type: if self.config.db_path.is_some() {
343                "file".to_string()
344            } else {
345                "memory".to_string()
346            },
347            is_memory_db: self.config.memory_connection.is_some(),
348        }
349    }
350
351    /// 健康检查
352    pub async fn health_check(&self) -> Result<HealthStatus> {
353        // 检查读连接
354        let read_result = self
355            .read_with_retry(|conn| {
356                conn.query_row("SELECT 1", [], |row| {
357                    let value: i32 = row.get(0)?;
358                    Ok(value)
359                })
360            })
361            .await;
362
363        // 检查写连接
364        let write_result = self
365            .write_with_retry(|conn| {
366                conn.query_row("SELECT 1", [], |row| {
367                    let value: i32 = row.get(0)?;
368                    Ok(value)
369                })
370            })
371            .await;
372
373        Ok(HealthStatus {
374            read_healthy: read_result.is_ok(),
375            write_healthy: write_result.is_ok(),
376            read_error: read_result.err().map(|e| e.to_string()),
377            write_error: write_result.err().map(|e| e.to_string()),
378        })
379    }
380
381    /// 调试:执行单个SQL语句并返回结果
382    #[cfg(test)]
383    pub async fn debug_execute_sql(&self, sql: &str) -> Result<()> {
384        self.write_with_retry(|conn| {
385            debug!("Executing debug SQL: {}", sql);
386            conn.execute(sql, [])?;
387            Ok(())
388        })
389        .await
390    }
391
392    /// 调试:检查表是否存在
393    #[cfg(test)]
394    pub async fn debug_table_exists(&self, table_name: &str) -> Result<bool> {
395        self.read_with_retry(|conn| {
396            let exists = conn.query_row(
397                "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
398                [table_name],
399                |row| {
400                    let count: i64 = row.get(0)?;
401                    Ok(count > 0)
402                },
403            );
404
405            // 如果information_schema不存在,尝试SQLite方式
406            match exists {
407                Ok(result) => Ok(result),
408                Err(_) => {
409                    // 尝试DuckDB的系统表
410                    conn.query_row(
411                        "SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?",
412                        [table_name],
413                        |row| {
414                            let count: i64 = row.get(0)?;
415                            Ok(count > 0)
416                        },
417                    )
418                }
419            }
420        })
421        .await
422    }
423
424    /// 调试:获取所有表名
425    #[cfg(test)]
426    pub async fn debug_list_tables(&self) -> Result<Vec<String>> {
427        self.read_with_retry(|conn| {
428            let mut stmt = conn.prepare("SELECT table_name FROM duckdb_tables()")?;
429            let table_iter = stmt.query_map([], |row| {
430                let table_name: String = row.get(0)?;
431                Ok(table_name)
432            })?;
433
434            let mut tables = Vec::new();
435            for table in table_iter {
436                tables.push(table?);
437            }
438            Ok(tables)
439        })
440        .await
441    }
442}
443
444/// 连接统计信息
445#[derive(Debug, Clone)]
446pub struct ConnectionStats {
447    pub db_type: String,
448    pub is_memory_db: bool,
449}
450
451/// 健康检查状态
452#[derive(Debug, Clone)]
453pub struct HealthStatus {
454    pub read_healthy: bool,
455    pub write_healthy: bool,
456    pub read_error: Option<String>,
457    pub write_error: Option<String>,
458}
459
460#[cfg(test)]
461mod tests {
462    use super::*;
463    use tempfile::tempdir;
464
465    #[tokio::test]
466    async fn test_database_manager_creation() {
467        let temp_dir = tempdir().unwrap();
468        let db_path = temp_dir.path().join("test.db");
469
470        let manager = DatabaseManager::new(&db_path).await.unwrap();
471        let stats = manager.get_connection_stats();
472
473        assert_eq!(stats.db_type, "file");
474        assert!(!stats.is_memory_db);
475    }
476
477    #[tokio::test]
478    async fn test_memory_database_creation() {
479        let manager = DatabaseManager::new_memory().await.unwrap();
480        let stats = manager.get_connection_stats();
481
482        assert_eq!(stats.db_type, "memory");
483        assert!(stats.is_memory_db);
484    }
485
486    #[tokio::test]
487    async fn test_concurrent_read_operations() {
488        let manager = DatabaseManager::new_memory().await.unwrap();
489
490        // 并发执行多个读操作
491        let mut handles = Vec::new();
492        for i in 0..10 {
493            let manager = manager.clone();
494            let handle = tokio::spawn(async move {
495                manager
496                    .read_with_retry(|conn| {
497                        conn.query_row("SELECT ?", [i], |row| {
498                            let value: i32 = row.get(0)?;
499                            Ok(value)
500                        })
501                    })
502                    .await
503            });
504            handles.push(handle);
505        }
506
507        // 等待所有操作完成
508        for (i, handle) in handles.into_iter().enumerate() {
509            let result = handle.await.unwrap();
510            assert_eq!(result.unwrap(), i as i32);
511        }
512    }
513
514    #[tokio::test]
515    async fn test_write_operations() {
516        let manager = DatabaseManager::new_memory().await.unwrap();
517
518        // 测试写操作
519        let result = manager
520            .write_with_retry(|conn| {
521                conn.execute(
522                    "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name TEXT)",
523                    [],
524                )?;
525                conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'test')", [])?;
526                Ok(())
527            })
528            .await;
529
530        assert!(result.is_ok());
531
532        // 验证数据是否写入
533        let value = manager
534            .read_with_retry(|conn| {
535                conn.query_row("SELECT name FROM test_table WHERE id = 1", [], |row| {
536                    let name: String = row.get(0)?;
537                    Ok(name)
538                })
539            })
540            .await;
541
542        assert_eq!(value.unwrap(), "test");
543    }
544
545    #[tokio::test]
546    async fn test_health_check() {
547        let manager = DatabaseManager::new_memory().await.unwrap();
548        let health = manager.health_check().await.unwrap();
549
550        assert!(health.read_healthy);
551        assert!(health.write_healthy);
552        assert!(health.read_error.is_none());
553        assert!(health.write_error.is_none());
554    }
555
556    #[tokio::test]
557    async fn test_batch_write_operations() {
558        let manager = DatabaseManager::new_memory().await.unwrap();
559
560        // 测试批量写操作
561        let result = manager
562            .batch_write_with_retry(|conn| {
563                conn.execute(
564                    "CREATE TABLE IF NOT EXISTS batch_test (id INTEGER, value TEXT)",
565                    [],
566                )?;
567                conn.execute("INSERT INTO batch_test (id, value) VALUES (1, 'a')", [])?;
568                conn.execute("INSERT INTO batch_test (id, value) VALUES (2, 'b')", [])?;
569                conn.execute("INSERT INTO batch_test (id, value) VALUES (3, 'c')", [])?;
570                Ok(())
571            })
572            .await;
573
574        assert!(result.is_ok());
575
576        // 验证所有数据都已写入
577        let count = manager
578            .read_with_retry(|conn| {
579                conn.query_row("SELECT COUNT(*) FROM batch_test", [], |row| {
580                    let count: i64 = row.get(0)?;
581                    Ok(count)
582                })
583            })
584            .await;
585
586        assert_eq!(count.unwrap(), 3);
587    }
588
589    #[tokio::test]
590    async fn test_file_database_concurrent_operations() {
591        let temp_dir = tempdir().unwrap();
592        let db_path = temp_dir.path().join("concurrent_test.db");
593        let manager = DatabaseManager::new(&db_path).await.unwrap();
594
595        // 创建测试表
596        manager
597            .write_with_retry(|conn| {
598                conn.execute(
599                    "CREATE TABLE IF NOT EXISTS concurrent_test (id INTEGER, value TEXT)",
600                    [],
601                )?;
602                Ok(())
603            })
604            .await
605            .unwrap();
606
607        // 并发写入测试
608        let mut handles = Vec::new();
609        for i in 0..5 {
610            let manager = manager.clone();
611            let handle = tokio::spawn(async move {
612                manager
613                    .write_with_retry(|conn| {
614                        conn.execute(
615                            "INSERT INTO concurrent_test (id, value) VALUES (?, ?)",
616                            [&i.to_string(), &format!("value_{i}")],
617                        )?;
618                        Ok(())
619                    })
620                    .await
621            });
622            handles.push(handle);
623        }
624
625        // 等待所有写操作完成
626        for handle in handles {
627            let result = handle.await.unwrap();
628            assert!(result.is_ok());
629        }
630
631        // 验证数据完整性
632        let count = manager
633            .read_with_retry(|conn| {
634                conn.query_row("SELECT COUNT(*) FROM concurrent_test", [], |row| {
635                    let count: i64 = row.get(0)?;
636                    Ok(count)
637                })
638            })
639            .await;
640
641        assert_eq!(count.unwrap(), 5);
642    }
643
644    #[tokio::test]
645    async fn test_debug_sql_initialization() {
646        // 创建一个不初始化的数据库管理器
647        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
648        let manager = DatabaseManager {
649            config: Arc::new(DatabaseConfig {
650                db_path: None,
651                memory_connection: Some(connection),
652            }),
653        };
654
655        println!("=== 初始化前 ===");
656        // 检查表是否存在
657        let tables = manager.debug_list_tables().await.unwrap();
658        println!("初始化前的表: {tables:?}");
659
660        println!("=== 开始初始化 ===");
661        // 现在手动初始化
662        let init_result = manager.initialize_schema().await;
663        println!("初始化结果: {init_result:?}");
664
665        if init_result.is_ok() {
666            println!("=== 初始化成功,检查表 ===");
667            let tables_after = manager.debug_list_tables().await.unwrap();
668            println!("初始化后的表: {tables_after:?}");
669
670            // 检查app_config表是否存在
671            let app_config_exists = manager.debug_table_exists("app_config").await.unwrap();
672            println!("app_config表存在: {app_config_exists}");
673        } else {
674            println!("=== 初始化失败,尝试手动创建简单表 ===");
675            let result = manager.debug_execute_sql(
676                "CREATE TABLE app_config (config_key VARCHAR PRIMARY KEY, config_value JSON NOT NULL)"
677            ).await;
678            println!("手动创建app_config表的结果: {result:?}");
679
680            if result.is_ok() {
681                let app_config_exists_after =
682                    manager.debug_table_exists("app_config").await.unwrap();
683                println!("创建后app_config表存在: {app_config_exists_after}");
684            }
685        }
686    }
687
688    #[tokio::test]
689    async fn test_debug_sql_parsing() {
690        // 创建一个不初始化的数据库管理器
691        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
692        let manager = DatabaseManager {
693            config: Arc::new(DatabaseConfig {
694                db_path: None,
695                memory_connection: Some(connection),
696            }),
697        };
698
699        let test_sql = r#"
700        CREATE TABLE test1 (id INTEGER);
701        INSERT INTO test1 VALUES (1);
702        CREATE TABLE test2 (data JSON);
703        INSERT INTO test2 VALUES ('{"key": "value; with semicolon"}');
704        "#;
705
706        let statements = manager.parse_sql_statements(test_sql);
707        println!("解析出的SQL语句数量: {}", statements.len());
708        for (i, stmt) in statements.iter().enumerate() {
709            println!("语句 {}: {}", i + 1, stmt);
710        }
711
712        // 测试我们的真实SQL脚本
713        let schema_sql = include_str!("../migrations/init_duckdb.sql");
714        let real_statements = manager.parse_sql_statements(schema_sql);
715        println!("真实SQL脚本解析出的语句数量: {}", real_statements.len());
716
717        // 打印前几个语句
718        for (i, stmt) in real_statements.iter().take(10).enumerate() {
719            println!("真实语句 {}: {}", i + 1, stmt);
720        }
721    }
722
723    #[tokio::test]
724    async fn test_debug_individual_sql_statements() {
725        // 创建一个不初始化的数据库管理器
726        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
727        let manager = DatabaseManager {
728            config: Arc::new(DatabaseConfig {
729                db_path: None,
730                memory_connection: Some(connection),
731            }),
732        };
733
734        let schema_sql = include_str!("../migrations/init_duckdb.sql");
735        let statements = manager.parse_sql_statements(schema_sql);
736        println!("总共解析出 {} 个语句", statements.len());
737
738        // 逐个执行SQL语句
739        for (i, stmt) in statements.iter().enumerate() {
740            let trimmed = stmt.trim();
741            if trimmed.is_empty() {
742                println!("跳过语句 {}: [空语句]", i + 1);
743                continue;
744            }
745
746            // 检查是否只包含注释
747            let is_only_comments = trimmed
748                .lines()
749                .map(|line| line.trim())
750                .all(|line| line.is_empty() || line.starts_with("--"));
751
752            if is_only_comments {
753                println!("跳过语句 {}: [仅包含注释]", i + 1);
754                continue;
755            }
756
757            println!(
758                "执行语句 {}: {}",
759                i + 1,
760                if trimmed.len() > 100 {
761                    // 安全地截取字符串,避免切断多字节字符
762                    let mut end = 100;
763                    while end > 0 && !trimmed.is_char_boundary(end) {
764                        end -= 1;
765                    }
766                    format!("{}...", &trimmed[..end])
767                } else {
768                    trimmed.to_string()
769                }
770            );
771
772            let result = manager
773                .write_with_retry(|conn| {
774                    conn.execute(trimmed, [])?;
775                    Ok(())
776                })
777                .await;
778
779            if let Err(e) = result {
780                println!("❌ 语句 {} 执行失败: {}", i + 1, e);
781                println!("失败的语句: {trimmed}");
782                break;
783            } else {
784                println!("✅ 语句 {} 执行成功", i + 1);
785            }
786        }
787    }
788}