Skip to main content

client_core/
database_manager.rs

1use anyhow::Result;
2use duckdb::{Connection, Result as DuckResult};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::Mutex;
7use tracing::{debug, error, warn};
8
/// DuckDB database manager, designed around DuckDB's concurrency characteristics.
///
/// Design principles:
/// - File database: every operation opens a new connection, which is intended
///   to allow concurrent reads.
/// - In-memory database: a single shared connection behind a `Mutex`, to keep
///   the data consistent.
/// - Writes: intended to run serially to avoid write-write conflicts.
/// - Retry: conflicts are detected and retried with exponential backoff.
#[derive(Clone)]
pub struct DatabaseManager {
    /// Database configuration, shared between clones of the manager via `Arc`.
    config: Arc<DatabaseConfig>,
}
21
/// Describes which database backs a `DatabaseManager`.
#[derive(Debug)]
struct DatabaseConfig {
    /// Path of the database file (`None` means an in-memory database).
    db_path: Option<PathBuf>,
    /// Shared connection of the in-memory database (only set for in-memory databases).
    memory_connection: Option<Arc<Mutex<Connection>>>,
}
29
30impl DatabaseManager {
31    /// 创建新的数据库管理器(文件数据库)
32    pub async fn new<P: AsRef<std::path::Path>>(db_path: P) -> Result<Self> {
33        let db_path = db_path.as_ref().to_path_buf();
34
35        // 确保数据库目录存在
36        if let Some(parent) = db_path.parent() {
37            tokio::fs::create_dir_all(parent).await?;
38        }
39
40        // 测试连接是否可以创建
41        let _test_conn = Connection::open(&db_path)?;
42        debug!("Database file connection test successful: {:?}", db_path);
43
44        let manager = Self {
45            config: Arc::new(DatabaseConfig {
46                db_path: Some(db_path),
47                memory_connection: None,
48            }),
49        };
50
51        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
52        // manager.initialize_schema().await?;
53
54        Ok(manager)
55    }
56
57    /// 创建内存数据库管理器(主要用于测试)
58    pub async fn new_memory() -> Result<Self> {
59        // 对于内存数据库,我们需要保持一个共享连接
60        let connection = Arc::new(Mutex::new(Connection::open_in_memory()?));
61        debug!("In-memory database connection created successfully");
62
63        let manager = Self {
64            config: Arc::new(DatabaseConfig {
65                db_path: None,
66                memory_connection: Some(connection),
67            }),
68        };
69
70        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
71        // manager.initialize_schema().await?;
72
73        Ok(manager)
74    }
75
76    /// 显式初始化数据库(只应在 nuwax-cli init 时调用)
77    pub async fn init_database(&self) -> Result<()> {
78        debug!("Explicitly initializing database table structure...");
79        self.initialize_schema().await?;
80        debug!("Database table structure initialization completed");
81        Ok(())
82    }
83
84    /// 创建数据库连接
85    async fn create_connection(&self) -> Result<Connection> {
86        if let Some(ref path) = self.config.db_path {
87            // 文件数据库:创建新连接
88            Ok(Connection::open(path)?)
89        } else if let Some(ref memory_conn) = self.config.memory_connection {
90            // 内存数据库:克隆共享连接
91            let conn = memory_conn.lock().await;
92            Ok(conn.try_clone()?)
93        } else {
94            Err(anyhow::anyhow!("Invalid database configuration"))
95        }
96    }
97
98    /// 并发读操作(文件数据库支持真正的并发)
99    pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
100    where
101        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
102        R: Send,
103    {
104        let mut retry_count = 0;
105        const MAX_RETRIES: usize = 3;
106
107        loop {
108            let connection = self.create_connection().await?;
109
110            match operation(&connection) {
111                Ok(result) => return Ok(result),
112                Err(e) => {
113                    let error_msg = e.to_string();
114                    if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
115                        retry_count += 1;
116                        let delay = Duration::from_millis(100 * (1 << retry_count)); // 指数退避
117                        warn!(
118                            "Database read operation failed, retrying in {}ms ({}/{}): {}",
119                            delay.as_millis(),
120                            retry_count,
121                            MAX_RETRIES,
122                            error_msg
123                        );
124                        tokio::time::sleep(delay).await;
125                    } else {
126                        error!("Database read operation ultimately failed: {}", error_msg);
127                        return Err(anyhow::anyhow!(e.to_string()));
128                    }
129                }
130            }
131        }
132    }
133
134    /// 并发写操作(确保写入一致性)
135    pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
136    where
137        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
138        R: Send,
139    {
140        let mut retry_count = 0;
141        const MAX_RETRIES: usize = 3;
142
143        loop {
144            if let Some(ref memory_conn) = self.config.memory_connection {
145                // 内存数据库:确保独占访问
146                let conn = memory_conn.lock().await;
147                match operation(&conn) {
148                    Ok(result) => return Ok(result),
149                    Err(e) => {
150                        let error_msg = e.to_string();
151                        if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
152                            retry_count += 1;
153                            let delay = Duration::from_millis(50 * (1 << retry_count)); // 较短的重试间隔
154                            warn!(
155                                "In-memory database write operation failed, retrying in {}ms ({}/{}): {}",
156                                delay.as_millis(),
157                                retry_count,
158                                MAX_RETRIES,
159                                error_msg
160                            );
161                            drop(conn); // 释放锁
162                            tokio::time::sleep(delay).await;
163                        } else {
164                            error!("In-memory database write operation ultimately failed: {}", error_msg);
165                            return Err(anyhow::anyhow!(e.to_string()));
166                        }
167                    }
168                }
169            } else {
170                // 文件数据库:创建新连接
171                let connection = self.create_connection().await?;
172                match operation(&connection) {
173                    Ok(result) => return Ok(result),
174                    Err(e) => {
175                        let error_msg = e.to_string();
176                        if retry_count < MAX_RETRIES && Self::is_retryable_error(&error_msg) {
177                            retry_count += 1;
178                            let delay = Duration::from_millis(100 * (1 << retry_count)); // 指数退避
179                            warn!(
180                                "File database write operation failed, retrying in {}ms ({}/{}): {}",
181                                delay.as_millis(),
182                                retry_count,
183                                MAX_RETRIES,
184                                error_msg
185                            );
186                            tokio::time::sleep(delay).await;
187                        } else {
188                            error!("File database write operation ultimately failed: {}", error_msg);
189                            return Err(anyhow::anyhow!(e));
190                        }
191                    }
192                }
193            }
194        }
195    }
196
197    /// 批量写操作(事务中执行多个写操作)
198    /// 注意:这个方法接受一个闭包,该闭包会在事务上下文中执行
199    pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
200    where
201        F: Fn(&Connection) -> DuckResult<R> + Send + Sync,
202        R: Send,
203    {
204        // 将事务逻辑封装到普通的写操作中
205        self.write_with_retry(|conn| {
206            // 注意:这里我们不使用事务,因为 Connection 的借用问题
207            // 如果需要事务,可以在 operations 闭包内部处理
208            operations(conn)
209        })
210        .await
211    }
212
213    /// 检查错误是否可重试
214    fn is_retryable_error(error_msg: &str) -> bool {
215        // DuckDB 的 write-write conflict 错误模式
216        error_msg.contains("write-write conflict")
217            || error_msg.contains("database is locked")
218            || error_msg.contains("database is busy")
219            || error_msg.contains("SQLITE_BUSY")
220            || error_msg.contains("SQLITE_LOCKED")
221    }
222
223    /// 初始化数据库表结构
224    pub async fn initialize_schema(&self) -> Result<()> {
225        debug!("Initializing database table structure...");
226
227        // 读取SQL初始化脚本
228        let schema_sql = include_str!("../migrations/init_duckdb.sql");
229
230        // 使用更智能的SQL语句分割方式
231        self.write_with_retry(|conn| {
232            let statements = self.parse_sql_statements(schema_sql);
233            for statement in statements {
234                let trimmed = statement.trim();
235                if trimmed.is_empty() {
236                    continue;
237                }
238
239                // 检查是否只包含注释
240                let is_only_comments = trimmed
241                    .lines()
242                    .map(|line| line.trim())
243                    .all(|line| line.is_empty() || line.starts_with("--"));
244
245                if is_only_comments {
246                    continue;
247                }
248
249                debug!(
250                    "Executing SQL statement: {}",
251                    if trimmed.len() > 100 {
252                        // 安全地截取字符串,避免切断多字节字符
253                        let mut end = 100;
254                        while end > 0 && !trimmed.is_char_boundary(end) {
255                            end -= 1;
256                        }
257                        format!("{}...", &trimmed[..end])
258                    } else {
259                        trimmed.to_string()
260                    }
261                );
262
263                if let Err(e) = conn.execute(trimmed, []) {
264                    error!("SQL statement execution failed: {}, statement: {}", e, trimmed);
265                    return Err(e);
266                }
267            }
268            Ok(())
269        })
270        .await?;
271
272        debug!("Database table structure initialization completed");
273        Ok(())
274    }
275
276    /// 智能解析SQL语句(处理JSON中的分号)
277    fn parse_sql_statements(&self, sql: &str) -> Vec<String> {
278        let mut statements = Vec::new();
279        let mut current_statement = String::new();
280        let mut in_string = false;
281        let mut in_json = false;
282        let mut brace_count = 0;
283        let chars = sql.chars().peekable();
284
285        for ch in chars {
286            match ch {
287                '\'' | '"' => {
288                    current_statement.push(ch);
289                    if !in_json {
290                        in_string = !in_string;
291                    }
292                }
293                '{' => {
294                    current_statement.push(ch);
295                    if !in_string {
296                        brace_count += 1;
297                        in_json = true;
298                    }
299                }
300                '}' => {
301                    current_statement.push(ch);
302                    if !in_string && in_json {
303                        brace_count -= 1;
304                        if brace_count == 0 {
305                            in_json = false;
306                        }
307                    }
308                }
309                ';' => {
310                    if !in_string && !in_json {
311                        // 这是一个真正的语句结束符
312                        if !current_statement.trim().is_empty() {
313                            statements.push(current_statement.trim().to_string());
314                        }
315                        current_statement.clear();
316                    } else {
317                        current_statement.push(ch);
318                    }
319                }
320                _ => {
321                    current_statement.push(ch);
322                }
323            }
324        }
325
326        // 添加最后一个语句
327        if !current_statement.trim().is_empty() {
328            statements.push(current_statement.trim().to_string());
329        }
330
331        statements
332    }
333
334    /// 获取连接统计信息
335    pub fn get_connection_stats(&self) -> ConnectionStats {
336        ConnectionStats {
337            db_type: if self.config.db_path.is_some() {
338                "file".to_string()
339            } else {
340                "memory".to_string()
341            },
342            is_memory_db: self.config.memory_connection.is_some(),
343        }
344    }
345
346    /// 健康检查
347    pub async fn health_check(&self) -> Result<HealthStatus> {
348        // 检查读连接
349        let read_result = self
350            .read_with_retry(|conn| {
351                conn.query_row("SELECT 1", [], |row| {
352                    let value: i32 = row.get(0)?;
353                    Ok(value)
354                })
355            })
356            .await;
357
358        // 检查写连接
359        let write_result = self
360            .write_with_retry(|conn| {
361                conn.query_row("SELECT 1", [], |row| {
362                    let value: i32 = row.get(0)?;
363                    Ok(value)
364                })
365            })
366            .await;
367
368        Ok(HealthStatus {
369            read_healthy: read_result.is_ok(),
370            write_healthy: write_result.is_ok(),
371            read_error: read_result.err().map(|e| e.to_string()),
372            write_error: write_result.err().map(|e| e.to_string()),
373        })
374    }
375
376    /// 调试:执行单个SQL语句并返回结果
377    #[cfg(test)]
378    pub async fn debug_execute_sql(&self, sql: &str) -> Result<()> {
379        self.write_with_retry(|conn| {
380            debug!("Executing debug SQL: {}", sql);
381            conn.execute(sql, [])?;
382            Ok(())
383        })
384        .await
385    }
386
387    /// 调试:检查表是否存在
388    #[cfg(test)]
389    pub async fn debug_table_exists(&self, table_name: &str) -> Result<bool> {
390        self.read_with_retry(|conn| {
391            let exists = conn.query_row(
392                "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
393                [table_name],
394                |row| {
395                    let count: i64 = row.get(0)?;
396                    Ok(count > 0)
397                },
398            );
399
400            // 如果information_schema不存在,尝试SQLite方式
401            match exists {
402                Ok(result) => Ok(result),
403                Err(_) => {
404                    // 尝试DuckDB的系统表
405                    conn.query_row(
406                        "SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = ?",
407                        [table_name],
408                        |row| {
409                            let count: i64 = row.get(0)?;
410                            Ok(count > 0)
411                        },
412                    )
413                }
414            }
415        })
416        .await
417    }
418
419    /// 调试:获取所有表名
420    #[cfg(test)]
421    pub async fn debug_list_tables(&self) -> Result<Vec<String>> {
422        self.read_with_retry(|conn| {
423            let mut stmt = conn.prepare("SELECT table_name FROM duckdb_tables()")?;
424            let table_iter = stmt.query_map([], |row| {
425                let table_name: String = row.get(0)?;
426                Ok(table_name)
427            })?;
428
429            let mut tables = Vec::new();
430            for table in table_iter {
431                tables.push(table?);
432            }
433            Ok(tables)
434        })
435        .await
436    }
437}
438
/// Connection statistics, as reported by `DatabaseManager::get_connection_stats`.
#[derive(Debug, Clone)]
pub struct ConnectionStats {
    /// `"file"` when a database path is configured, `"memory"` otherwise.
    pub db_type: String,
    /// `true` when the manager holds a shared in-memory connection.
    pub is_memory_db: bool,
}
445
/// Health check status, as produced by `DatabaseManager::health_check`.
#[derive(Debug, Clone)]
pub struct HealthStatus {
    /// Whether the read-path probe succeeded.
    pub read_healthy: bool,
    /// Whether the write-path probe succeeded.
    pub write_healthy: bool,
    /// Error message of the read-path probe, if it failed.
    pub read_error: Option<String>,
    /// Error message of the write-path probe, if it failed.
    pub write_error: Option<String>,
}
454
455#[cfg(test)]
456mod tests {
457    use super::*;
458    use tempfile::tempdir;
459
460    #[tokio::test]
461    async fn test_database_manager_creation() {
462        let temp_dir = tempdir().unwrap();
463        let db_path = temp_dir.path().join("test.db");
464
465        let manager = DatabaseManager::new(&db_path).await.unwrap();
466        let stats = manager.get_connection_stats();
467
468        assert_eq!(stats.db_type, "file");
469        assert!(!stats.is_memory_db);
470    }
471
472    #[tokio::test]
473    async fn test_memory_database_creation() {
474        let manager = DatabaseManager::new_memory().await.unwrap();
475        let stats = manager.get_connection_stats();
476
477        assert_eq!(stats.db_type, "memory");
478        assert!(stats.is_memory_db);
479    }
480
481    #[tokio::test]
482    async fn test_concurrent_read_operations() {
483        let manager = DatabaseManager::new_memory().await.unwrap();
484
485        // 并发执行多个读操作
486        let mut handles = Vec::new();
487        for i in 0..10 {
488            let manager = manager.clone();
489            let handle = tokio::spawn(async move {
490                manager
491                    .read_with_retry(|conn| {
492                        conn.query_row("SELECT ?", [i], |row| {
493                            let value: i32 = row.get(0)?;
494                            Ok(value)
495                        })
496                    })
497                    .await
498            });
499            handles.push(handle);
500        }
501
502        // 等待所有操作完成
503        for (i, handle) in handles.into_iter().enumerate() {
504            let result = handle.await.unwrap();
505            assert_eq!(result.unwrap(), i as i32);
506        }
507    }
508
509    #[tokio::test]
510    async fn test_write_operations() {
511        let manager = DatabaseManager::new_memory().await.unwrap();
512
513        // 测试写操作
514        let result = manager
515            .write_with_retry(|conn| {
516                conn.execute(
517                    "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name TEXT)",
518                    [],
519                )?;
520                conn.execute("INSERT INTO test_table (id, name) VALUES (1, 'test')", [])?;
521                Ok(())
522            })
523            .await;
524
525        assert!(result.is_ok());
526
527        // 验证数据是否写入
528        let value = manager
529            .read_with_retry(|conn| {
530                conn.query_row("SELECT name FROM test_table WHERE id = 1", [], |row| {
531                    let name: String = row.get(0)?;
532                    Ok(name)
533                })
534            })
535            .await;
536
537        assert_eq!(value.unwrap(), "test");
538    }
539
540    #[tokio::test]
541    async fn test_health_check() {
542        let manager = DatabaseManager::new_memory().await.unwrap();
543        let health = manager.health_check().await.unwrap();
544
545        assert!(health.read_healthy);
546        assert!(health.write_healthy);
547        assert!(health.read_error.is_none());
548        assert!(health.write_error.is_none());
549    }
550
551    #[tokio::test]
552    async fn test_batch_write_operations() {
553        let manager = DatabaseManager::new_memory().await.unwrap();
554
555        // 测试批量写操作
556        let result = manager
557            .batch_write_with_retry(|conn| {
558                conn.execute(
559                    "CREATE TABLE IF NOT EXISTS batch_test (id INTEGER, value TEXT)",
560                    [],
561                )?;
562                conn.execute("INSERT INTO batch_test (id, value) VALUES (1, 'a')", [])?;
563                conn.execute("INSERT INTO batch_test (id, value) VALUES (2, 'b')", [])?;
564                conn.execute("INSERT INTO batch_test (id, value) VALUES (3, 'c')", [])?;
565                Ok(())
566            })
567            .await;
568
569        assert!(result.is_ok());
570
571        // 验证所有数据都已写入
572        let count = manager
573            .read_with_retry(|conn| {
574                conn.query_row("SELECT COUNT(*) FROM batch_test", [], |row| {
575                    let count: i64 = row.get(0)?;
576                    Ok(count)
577                })
578            })
579            .await;
580
581        assert_eq!(count.unwrap(), 3);
582    }
583
584    #[tokio::test]
585    async fn test_file_database_concurrent_operations() {
586        let temp_dir = tempdir().unwrap();
587        let db_path = temp_dir.path().join("concurrent_test.db");
588        let manager = DatabaseManager::new(&db_path).await.unwrap();
589
590        // 创建测试表
591        manager
592            .write_with_retry(|conn| {
593                conn.execute(
594                    "CREATE TABLE IF NOT EXISTS concurrent_test (id INTEGER, value TEXT)",
595                    [],
596                )?;
597                Ok(())
598            })
599            .await
600            .unwrap();
601
602        // 并发写入测试
603        let mut handles = Vec::new();
604        for i in 0..5 {
605            let manager = manager.clone();
606            let handle = tokio::spawn(async move {
607                manager
608                    .write_with_retry(|conn| {
609                        conn.execute(
610                            "INSERT INTO concurrent_test (id, value) VALUES (?, ?)",
611                            [&i.to_string(), &format!("value_{i}")],
612                        )?;
613                        Ok(())
614                    })
615                    .await
616            });
617            handles.push(handle);
618        }
619
620        // 等待所有写操作完成
621        for handle in handles {
622            let result = handle.await.unwrap();
623            assert!(result.is_ok());
624        }
625
626        // 验证数据完整性
627        let count = manager
628            .read_with_retry(|conn| {
629                conn.query_row("SELECT COUNT(*) FROM concurrent_test", [], |row| {
630                    let count: i64 = row.get(0)?;
631                    Ok(count)
632                })
633            })
634            .await;
635
636        assert_eq!(count.unwrap(), 5);
637    }
638
639    #[tokio::test]
640    async fn test_debug_sql_initialization() {
641        // 创建一个不初始化的数据库管理器
642        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
643        let manager = DatabaseManager {
644            config: Arc::new(DatabaseConfig {
645                db_path: None,
646                memory_connection: Some(connection),
647            }),
648        };
649
650        println!("=== 初始化前 ===");
651        // 检查表是否存在
652        let tables = manager.debug_list_tables().await.unwrap();
653        println!("初始化前的表: {tables:?}");
654
655        println!("=== 开始初始化 ===");
656        // 现在手动初始化
657        let init_result = manager.initialize_schema().await;
658        println!("初始化结果: {init_result:?}");
659
660        if init_result.is_ok() {
661            println!("=== 初始化成功,检查表 ===");
662            let tables_after = manager.debug_list_tables().await.unwrap();
663            println!("初始化后的表: {tables_after:?}");
664
665            // 检查app_config表是否存在
666            let app_config_exists = manager.debug_table_exists("app_config").await.unwrap();
667            println!("app_config表存在: {app_config_exists}");
668        } else {
669            println!("=== 初始化失败,尝试手动创建简单表 ===");
670            let result = manager.debug_execute_sql(
671                "CREATE TABLE app_config (config_key VARCHAR PRIMARY KEY, config_value JSON NOT NULL)"
672            ).await;
673            println!("手动创建app_config表的结果: {result:?}");
674
675            if result.is_ok() {
676                let app_config_exists_after =
677                    manager.debug_table_exists("app_config").await.unwrap();
678                println!("创建后app_config表存在: {app_config_exists_after}");
679            }
680        }
681    }
682
683    #[tokio::test]
684    async fn test_debug_sql_parsing() {
685        // 创建一个不初始化的数据库管理器
686        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
687        let manager = DatabaseManager {
688            config: Arc::new(DatabaseConfig {
689                db_path: None,
690                memory_connection: Some(connection),
691            }),
692        };
693
694        let test_sql = r#"
695        CREATE TABLE test1 (id INTEGER);
696        INSERT INTO test1 VALUES (1);
697        CREATE TABLE test2 (data JSON);
698        INSERT INTO test2 VALUES ('{"key": "value; with semicolon"}');
699        "#;
700
701        let statements = manager.parse_sql_statements(test_sql);
702        println!("解析出的SQL语句数量: {}", statements.len());
703        for (i, stmt) in statements.iter().enumerate() {
704            println!("语句 {}: {}", i + 1, stmt);
705        }
706
707        // 测试我们的真实SQL脚本
708        let schema_sql = include_str!("../migrations/init_duckdb.sql");
709        let real_statements = manager.parse_sql_statements(schema_sql);
710        println!("真实SQL脚本解析出的语句数量: {}", real_statements.len());
711
712        // 打印前几个语句
713        for (i, stmt) in real_statements.iter().take(10).enumerate() {
714            println!("真实语句 {}: {}", i + 1, stmt);
715        }
716    }
717
718    #[tokio::test]
719    async fn test_debug_individual_sql_statements() {
720        // 创建一个不初始化的数据库管理器
721        let connection = Arc::new(Mutex::new(Connection::open_in_memory().unwrap()));
722        let manager = DatabaseManager {
723            config: Arc::new(DatabaseConfig {
724                db_path: None,
725                memory_connection: Some(connection),
726            }),
727        };
728
729        let schema_sql = include_str!("../migrations/init_duckdb.sql");
730        let statements = manager.parse_sql_statements(schema_sql);
731        println!("总共解析出 {} 个语句", statements.len());
732
733        // 逐个执行SQL语句
734        for (i, stmt) in statements.iter().enumerate() {
735            let trimmed = stmt.trim();
736            if trimmed.is_empty() {
737                println!("跳过语句 {}: [空语句]", i + 1);
738                continue;
739            }
740
741            // 检查是否只包含注释
742            let is_only_comments = trimmed
743                .lines()
744                .map(|line| line.trim())
745                .all(|line| line.is_empty() || line.starts_with("--"));
746
747            if is_only_comments {
748                println!("跳过语句 {}: [仅包含注释]", i + 1);
749                continue;
750            }
751
752            println!(
753                "执行语句 {}: {}",
754                i + 1,
755                if trimmed.len() > 100 {
756                    // 安全地截取字符串,避免切断多字节字符
757                    let mut end = 100;
758                    while end > 0 && !trimmed.is_char_boundary(end) {
759                        end -= 1;
760                    }
761                    format!("{}...", &trimmed[..end])
762                } else {
763                    trimmed.to_string()
764                }
765            );
766
767            let result = manager
768                .write_with_retry(|conn| {
769                    conn.execute(trimmed, [])?;
770                    Ok(())
771                })
772                .await;
773
774            if let Err(e) = result {
775                println!("❌ 语句 {} 执行失败: {}", i + 1, e);
776                println!("失败的语句: {trimmed}");
777                break;
778            } else {
779                println!("✅ 语句 {} 执行成功", i + 1);
780            }
781        }
782    }
783}