burncloud_database_client/
migration.rs1use burncloud_database_core::error::{DatabaseResult, DatabaseError};
4use burncloud_database_core::{QueryExecutor, QueryContext, MigrationInfo};
5use async_trait::async_trait;
6use std::collections::HashMap;
7use chrono::{DateTime, Utc};
8
9pub struct MigrationManager {
11 query_executor: Box<dyn QueryExecutor>,
12 migrations: Vec<Migration>,
13}
14
15pub struct Migration {
17 pub version: String,
18 pub name: String,
19 pub up_sql: String,
20 pub down_sql: String,
21 pub checksum: String,
22}
23
24impl MigrationManager {
25 pub fn new(query_executor: Box<dyn QueryExecutor>) -> Self {
26 let mut manager = Self {
27 query_executor,
28 migrations: Vec::new(),
29 };
30
31 manager.register_migrations();
33 manager
34 }
35
36 fn register_migrations(&mut self) {
38 self.add_migration(Migration {
40 version: "001".to_string(),
41 name: "create_base_tables".to_string(),
42 up_sql: include_str!("../migrations/001_create_base_tables.sql").to_string(),
43 down_sql: include_str!("../migrations/001_create_base_tables_down.sql").to_string(),
44 checksum: self.calculate_checksum("001"),
45 });
46
47 self.add_migration(Migration {
49 version: "002".to_string(),
50 name: "create_ai_model_tables".to_string(),
51 up_sql: include_str!("../migrations/002_create_ai_model_tables.sql").to_string(),
52 down_sql: include_str!("../migrations/002_create_ai_model_tables_down.sql").to_string(),
53 checksum: self.calculate_checksum("002"),
54 });
55
56 self.add_migration(Migration {
58 version: "003".to_string(),
59 name: "create_monitoring_tables".to_string(),
60 up_sql: include_str!("../migrations/003_create_monitoring_tables.sql").to_string(),
61 down_sql: include_str!("../migrations/003_create_monitoring_tables_down.sql").to_string(),
62 checksum: self.calculate_checksum("003"),
63 });
64
65 self.add_migration(Migration {
67 version: "004".to_string(),
68 name: "create_user_security_tables".to_string(),
69 up_sql: include_str!("../migrations/004_create_user_security_tables.sql").to_string(),
70 down_sql: include_str!("../migrations/004_create_user_security_tables_down.sql").to_string(),
71 checksum: self.calculate_checksum("004"),
72 });
73
74 self.add_migration(Migration {
76 version: "005".to_string(),
77 name: "create_indexes".to_string(),
78 up_sql: include_str!("../migrations/005_create_indexes.sql").to_string(),
79 down_sql: include_str!("../migrations/005_create_indexes_down.sql").to_string(),
80 checksum: self.calculate_checksum("005"),
81 });
82 }
83
84 fn add_migration(&mut self, migration: Migration) {
85 self.migrations.push(migration);
86 }
87
88 fn calculate_checksum(&self, version: &str) -> String {
89 format!("checksum_{}", version)
91 }
92
93 pub async fn run_migrations(&self, context: &QueryContext) -> DatabaseResult<()> {
95 self.create_migration_table(context).await?;
97
98 let applied_migrations = self.get_applied_migrations(context).await?;
100
101 for migration in &self.migrations {
102 if !applied_migrations.iter().any(|m| m.version == migration.version) {
103 println!("Running migration: {} - {}", migration.version, migration.name);
104 self.apply_migration(migration, context).await?;
105 self.record_migration(migration, context).await?;
106 println!("✅ Migration {} completed", migration.version);
107 }
108 }
109
110 Ok(())
111 }
112
113 pub async fn rollback_migration(&self, version: &str, context: &QueryContext) -> DatabaseResult<()> {
115 if let Some(migration) = self.migrations.iter().find(|m| m.version == version) {
116 println!("Rolling back migration: {} - {}", migration.version, migration.name);
117
118 self.execute_sql(&migration.down_sql, context).await?;
120
121 self.remove_migration_record(version, context).await?;
123
124 println!("✅ Migration {} rolled back", version);
125 Ok(())
126 } else {
127 Err(DatabaseError::ConfigurationError(format!("Migration {} not found", version)))
128 }
129 }
130
131 pub async fn get_migration_status(&self, context: &QueryContext) -> DatabaseResult<Vec<MigrationInfo>> {
133 self.get_applied_migrations(context).await
134 }
135
136 async fn create_migration_table(&self, context: &QueryContext) -> DatabaseResult<()> {
138 let sql = "
139 CREATE TABLE IF NOT EXISTS schema_migrations (
140 version VARCHAR(255) PRIMARY KEY,
141 name VARCHAR(255) NOT NULL,
142 applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
143 checksum VARCHAR(255) NOT NULL
144 )
145 ";
146
147 self.execute_sql(sql, context).await
148 }
149
150 async fn apply_migration(&self, migration: &Migration, context: &QueryContext) -> DatabaseResult<()> {
152 self.execute_sql(&migration.up_sql, context).await
153 }
154
155 async fn record_migration(&self, migration: &Migration, context: &QueryContext) -> DatabaseResult<()> {
157 let sql = "
158 INSERT INTO schema_migrations (version, name, checksum)
159 VALUES ($1, $2, $3)
160 ";
161
162 let version_param = burncloud_database_impl::StringParam(migration.version.clone());
163 let name_param = burncloud_database_impl::StringParam(migration.name.clone());
164 let checksum_param = burncloud_database_impl::StringParam(migration.checksum.clone());
165 let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![&version_param, &name_param, &checksum_param];
166
167 self.query_executor.execute_query(sql, ¶ms, context).await?;
168 Ok(())
169 }
170
171 async fn remove_migration_record(&self, version: &str, context: &QueryContext) -> DatabaseResult<()> {
173 let sql = "DELETE FROM schema_migrations WHERE version = $1";
174 let version_param = burncloud_database_impl::StringParam(version.to_string());
175 let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![&version_param];
176
177 self.query_executor.execute_query(sql, ¶ms, context).await?;
178 Ok(())
179 }
180
181 async fn get_applied_migrations(&self, context: &QueryContext) -> DatabaseResult<Vec<MigrationInfo>> {
183 let sql = "SELECT version, name, applied_at, checksum FROM schema_migrations ORDER BY version";
184 let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![];
185
186 let result = self.query_executor.execute_query(sql, ¶ms, context).await?;
187
188 let mut migrations = Vec::new();
189 for row in result.rows {
190 let version = row.get("version")
191 .and_then(|v| v.as_str())
192 .ok_or_else(|| DatabaseError::SerializationError("Missing version".to_string()))?
193 .to_string();
194
195 let name = row.get("name")
196 .and_then(|v| v.as_str())
197 .ok_or_else(|| DatabaseError::SerializationError("Missing name".to_string()))?
198 .to_string();
199
200 let applied_at_str = row.get("applied_at")
201 .and_then(|v| v.as_str())
202 .ok_or_else(|| DatabaseError::SerializationError("Missing applied_at".to_string()))?;
203
204 let applied_at = DateTime::parse_from_rfc3339(applied_at_str)
205 .map_err(|e| DatabaseError::SerializationError(e.to_string()))?
206 .with_timezone(&Utc);
207
208 let checksum = row.get("checksum")
209 .and_then(|v| v.as_str())
210 .ok_or_else(|| DatabaseError::SerializationError("Missing checksum".to_string()))?
211 .to_string();
212
213 migrations.push(MigrationInfo {
214 version,
215 name,
216 applied_at,
217 checksum,
218 });
219 }
220
221 Ok(migrations)
222 }
223
224 async fn execute_sql(&self, sql: &str, context: &QueryContext) -> DatabaseResult<()> {
226 let params: Vec<&dyn burncloud_database_core::QueryParam> = vec![];
227 self.query_executor.execute_query(sql, ¶ms, context).await?;
228 Ok(())
229 }
230}
231
232#[async_trait]
233impl burncloud_database_core::MigrationManager for MigrationManager {
234 async fn run_migrations(&self) -> DatabaseResult<()> {
235 let context = QueryContext::default();
236 self.run_migrations(&context).await
237 }
238
239 async fn rollback_migration(&self, version: &str) -> DatabaseResult<()> {
240 let context = QueryContext::default();
241 self.rollback_migration(version, &context).await
242 }
243
244 async fn get_migration_status(&self) -> DatabaseResult<Vec<MigrationInfo>> {
245 let context = QueryContext::default();
246 self.get_migration_status(&context).await
247 }
248}