burncloud_database_client/
migration.rs

1// 数据库迁移管理系统
2
3use burncloud_database_core::error::{DatabaseResult, DatabaseError};
4use burncloud_database_core::{QueryExecutor, QueryContext, MigrationInfo};
5use async_trait::async_trait;
6use std::collections::HashMap;
7use chrono::{DateTime, Utc};
8
/// Database migration manager.
///
/// Bundles the executor used to run SQL with the list of migrations that were
/// registered when the manager was constructed.
pub struct MigrationManager {
    /// Executor through which every SQL statement issued by this manager is run.
    query_executor: Box<dyn QueryExecutor>,
    /// Registered migrations, kept in the order they were added.
    migrations: Vec<Migration>,
}
14
/// Definition of a single schema migration.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that a migration can be
/// logged, copied and compared; every field is a plain `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Migration {
    /// Version identifier; written to the `version` column of `schema_migrations`.
    pub version: String,
    /// Human-readable migration name.
    pub name: String,
    /// SQL executed when the migration is applied.
    pub up_sql: String,
    /// SQL executed when the migration is rolled back.
    pub down_sql: String,
    /// Checksum written to `schema_migrations` together with the version.
    pub checksum: String,
}
23
24impl MigrationManager {
25    pub fn new(query_executor: Box<dyn QueryExecutor>) -> Self {
26        let mut manager = Self {
27            query_executor,
28            migrations: Vec::new(),
29        };
30
31        // 注册所有迁移
32        manager.register_migrations();
33        manager
34    }
35
36    /// 注册所有数据库迁移
37    fn register_migrations(&mut self) {
38        // 1. 创建基础表结构
39        self.add_migration(Migration {
40            version: "001".to_string(),
41            name: "create_base_tables".to_string(),
42            up_sql: include_str!("../migrations/001_create_base_tables.sql").to_string(),
43            down_sql: include_str!("../migrations/001_create_base_tables_down.sql").to_string(),
44            checksum: self.calculate_checksum("001"),
45        });
46
47        // 2. AI模型管理表
48        self.add_migration(Migration {
49            version: "002".to_string(),
50            name: "create_ai_model_tables".to_string(),
51            up_sql: include_str!("../migrations/002_create_ai_model_tables.sql").to_string(),
52            down_sql: include_str!("../migrations/002_create_ai_model_tables_down.sql").to_string(),
53            checksum: self.calculate_checksum("002"),
54        });
55
56        // 3. 监控和指标表
57        self.add_migration(Migration {
58            version: "003".to_string(),
59            name: "create_monitoring_tables".to_string(),
60            up_sql: include_str!("../migrations/003_create_monitoring_tables.sql").to_string(),
61            down_sql: include_str!("../migrations/003_create_monitoring_tables_down.sql").to_string(),
62            checksum: self.calculate_checksum("003"),
63        });
64
65        // 4. 用户设置和安全表
66        self.add_migration(Migration {
67            version: "004".to_string(),
68            name: "create_user_security_tables".to_string(),
69            up_sql: include_str!("../migrations/004_create_user_security_tables.sql").to_string(),
70            down_sql: include_str!("../migrations/004_create_user_security_tables_down.sql").to_string(),
71            checksum: self.calculate_checksum("004"),
72        });
73
74        // 5. 索引和性能优化
75        self.add_migration(Migration {
76            version: "005".to_string(),
77            name: "create_indexes".to_string(),
78            up_sql: include_str!("../migrations/005_create_indexes.sql").to_string(),
79            down_sql: include_str!("../migrations/005_create_indexes_down.sql").to_string(),
80            checksum: self.calculate_checksum("005"),
81        });
82    }
83
84    fn add_migration(&mut self, migration: Migration) {
85        self.migrations.push(migration);
86    }
87
88    fn calculate_checksum(&self, version: &str) -> String {
89        // 简单的校验和计算,实际应用中应使用更复杂的算法
90        format!("checksum_{}", version)
91    }
92
93    /// 运行所有待执行的迁移
94    pub async fn run_migrations(&self, context: &QueryContext) -> DatabaseResult<()> {
95        // 确保迁移表存在
96        self.create_migration_table(context).await?;
97
98        // 获取已执行的迁移
99        let applied_migrations = self.get_applied_migrations(context).await?;
100
101        for migration in &self.migrations {
102            if !applied_migrations.iter().any(|m| m.version == migration.version) {
103                println!("Running migration: {} - {}", migration.version, migration.name);
104                self.apply_migration(migration, context).await?;
105                self.record_migration(migration, context).await?;
106                println!("✅ Migration {} completed", migration.version);
107            }
108        }
109
110        Ok(())
111    }
112
113    /// 回滚指定版本的迁移
114    pub async fn rollback_migration(&self, version: &str, context: &QueryContext) -> DatabaseResult<()> {
115        if let Some(migration) = self.migrations.iter().find(|m| m.version == version) {
116            println!("Rolling back migration: {} - {}", migration.version, migration.name);
117
118            // 执行回滚SQL
119            self.execute_sql(&migration.down_sql, context).await?;
120
121            // 从迁移记录中删除
122            self.remove_migration_record(version, context).await?;
123
124            println!("✅ Migration {} rolled back", version);
125            Ok(())
126        } else {
127            Err(DatabaseError::ConfigurationError(format!("Migration {} not found", version)))
128        }
129    }
130
131    /// 获取迁移状态
132    pub async fn get_migration_status(&self, context: &QueryContext) -> DatabaseResult<Vec<MigrationInfo>> {
133        self.get_applied_migrations(context).await
134    }
135
136    /// 创建迁移记录表
137    async fn create_migration_table(&self, context: &QueryContext) -> DatabaseResult<()> {
138        let sql = "
139            CREATE TABLE IF NOT EXISTS schema_migrations (
140                version VARCHAR(255) PRIMARY KEY,
141                name VARCHAR(255) NOT NULL,
142                applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
143                checksum VARCHAR(255) NOT NULL
144            )
145        ";
146
147        self.execute_sql(sql, context).await
148    }
149
150    /// 应用单个迁移
151    async fn apply_migration(&self, migration: &Migration, context: &QueryContext) -> DatabaseResult<()> {
152        self.execute_sql(&migration.up_sql, context).await
153    }
154
155    /// 记录已应用的迁移
156    async fn record_migration(&self, migration: &Migration, context: &QueryContext) -> DatabaseResult<()> {
157        let sql = "
158            INSERT INTO schema_migrations (version, name, checksum)
159            VALUES ($1, $2, $3)
160        ";
161
162        let version_param = burncloud_database_impl::StringParam(migration.version.clone());
163        let name_param = burncloud_database_impl::StringParam(migration.name.clone());
164        let checksum_param = burncloud_database_impl::StringParam(migration.checksum.clone());
165        let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![&version_param, &name_param, &checksum_param];
166
167        self.query_executor.execute_query(sql, &params, context).await?;
168        Ok(())
169    }
170
171    /// 删除迁移记录
172    async fn remove_migration_record(&self, version: &str, context: &QueryContext) -> DatabaseResult<()> {
173        let sql = "DELETE FROM schema_migrations WHERE version = $1";
174        let version_param = burncloud_database_impl::StringParam(version.to_string());
175        let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![&version_param];
176
177        self.query_executor.execute_query(sql, &params, context).await?;
178        Ok(())
179    }
180
181    /// 获取已应用的迁移
182    async fn get_applied_migrations(&self, context: &QueryContext) -> DatabaseResult<Vec<MigrationInfo>> {
183        let sql = "SELECT version, name, applied_at, checksum FROM schema_migrations ORDER BY version";
184        let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![];
185
186        let result = self.query_executor.execute_query(sql, &params, context).await?;
187
188        let mut migrations = Vec::new();
189        for row in result.rows {
190            let version = row.get("version")
191                .and_then(|v| v.as_str())
192                .ok_or_else(|| DatabaseError::SerializationError("Missing version".to_string()))?
193                .to_string();
194
195            let name = row.get("name")
196                .and_then(|v| v.as_str())
197                .ok_or_else(|| DatabaseError::SerializationError("Missing name".to_string()))?
198                .to_string();
199
200            let applied_at_str = row.get("applied_at")
201                .and_then(|v| v.as_str())
202                .ok_or_else(|| DatabaseError::SerializationError("Missing applied_at".to_string()))?;
203
204            let applied_at = DateTime::parse_from_rfc3339(applied_at_str)
205                .map_err(|e| DatabaseError::SerializationError(e.to_string()))?
206                .with_timezone(&Utc);
207
208            let checksum = row.get("checksum")
209                .and_then(|v| v.as_str())
210                .ok_or_else(|| DatabaseError::SerializationError("Missing checksum".to_string()))?
211                .to_string();
212
213            migrations.push(MigrationInfo {
214                version,
215                name,
216                applied_at,
217                checksum,
218            });
219        }
220
221        Ok(migrations)
222    }
223
224    /// 执行SQL语句
225    async fn execute_sql(&self, sql: &str, context: &QueryContext) -> DatabaseResult<()> {
226        let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![];
227        self.query_executor.execute_query(sql, &params, context).await?;
228        Ok(())
229    }
230}
231
232#[async_trait]
233impl burncloud_database_core::MigrationManager for MigrationManager {
234    async fn run_migrations(&self) -> DatabaseResult<()> {
235        let context = QueryContext::default();
236        self.run_migrations(&context).await
237    }
238
239    async fn rollback_migration(&self, version: &str) -> DatabaseResult<()> {
240        let context = QueryContext::default();
241        self.rollback_migration(version, &context).await
242    }
243
244    async fn get_migration_status(&self) -> DatabaseResult<Vec<MigrationInfo>> {
245        let context = QueryContext::default();
246        self.get_migration_status(&context).await
247    }
248}