elif_orm/migrations/
runner.rs1use sqlx::{PgPool, Row};
7use std::collections::HashSet;
8
9use super::definitions::{Migration, MigrationRecord, MigrationRunResult};
10use super::manager::MigrationManager;
11use crate::error::{OrmError, OrmResult};
12
/// Executes migrations against a PostgreSQL database.
///
/// Pairs a [`MigrationManager`] — used in this file to load migrations, split
/// migration SQL into statements and supply configuration such as the
/// tracking-table name — with the connection pool the statements run on.
pub struct MigrationRunner {
    /// Source of migration definitions and configuration.
    manager: MigrationManager,
    /// Connection pool used for every query issued by the runner.
    pool: PgPool,
}
18
19impl MigrationRunner {
20 pub fn new(manager: MigrationManager, pool: PgPool) -> Self {
22 Self { manager, pool }
23 }
24
25 pub async fn from_url(manager: MigrationManager, database_url: &str) -> OrmResult<Self> {
27 let pool = PgPool::connect(database_url)
28 .await
29 .map_err(|e| OrmError::Migration(format!("Failed to connect to database: {}", e)))?;
30
31 Ok(Self::new(manager, pool))
32 }
33
34 pub fn pool(&self) -> &PgPool {
36 &self.pool
37 }
38
39 pub fn manager(&self) -> &MigrationManager {
41 &self.manager
42 }
43
44 pub async fn run_migrations(&self) -> OrmResult<MigrationRunResult> {
46 let start_time = std::time::Instant::now();
47
48 self.ensure_migrations_table().await?;
50
51 let all_migrations = self.manager.load_migrations().await?;
53
54 let applied_migrations = self.get_applied_migrations().await?;
56 let applied_ids: HashSet<String> = applied_migrations.into_iter().map(|m| m.id).collect();
57
58 let pending_migrations: Vec<_> = all_migrations
60 .into_iter()
61 .filter(|m| !applied_ids.contains(&m.id))
62 .collect();
63
64 if pending_migrations.is_empty() {
65 return Ok(MigrationRunResult {
66 applied_count: 0,
67 applied_migrations: Vec::new(),
68 skipped_count: applied_ids.len(),
69 execution_time_ms: start_time.elapsed().as_millis(),
70 });
71 }
72
73 let next_batch = self.get_next_batch_number().await?;
75
76 let mut applied_migration_ids = Vec::new();
78
79 for migration in &pending_migrations {
80 println!("Applying migration: {} - {}", migration.id, migration.name);
81
82 let mut transaction =
84 self.pool.begin().await.map_err(|e| {
85 OrmError::Migration(format!("Failed to start transaction: {}", e))
86 })?;
87
88 if !migration.up_sql.trim().is_empty() {
90 for statement in self.manager.split_sql_statements(&migration.up_sql)? {
91 if !statement.trim().is_empty() {
92 sqlx::query(&statement)
93 .execute(&mut *transaction)
94 .await
95 .map_err(|e| {
96 OrmError::Migration(format!(
97 "Failed to execute migration {}: {}",
98 migration.id, e
99 ))
100 })?;
101 }
102 }
103 }
104
105 let (record_sql, params) = self.record_migration_sql(&migration.id, next_batch);
107 let mut query = sqlx::query(&record_sql);
108 for param in params {
109 query = query.bind(param);
110 }
111 query
112 .execute(&mut *transaction)
113 .await
114 .map_err(|e| OrmError::Migration(format!("Failed to record migration: {}", e)))?;
115
116 transaction
118 .commit()
119 .await
120 .map_err(|e| OrmError::Migration(format!("Failed to commit migration: {}", e)))?;
121
122 applied_migration_ids.push(migration.id.clone());
123 }
124
125 Ok(MigrationRunResult {
126 applied_count: applied_migration_ids.len(),
127 applied_migrations: applied_migration_ids,
128 skipped_count: applied_ids.len(),
129 execution_time_ms: start_time.elapsed().as_millis(),
130 })
131 }
132
133 pub async fn run_migration(&self, migration_id: &str) -> OrmResult<()> {
135 let migrations = self.manager.load_migrations().await?;
137 let migration = migrations
138 .iter()
139 .find(|m| m.id == migration_id)
140 .ok_or_else(|| OrmError::Migration(format!("Migration {} not found", migration_id)))?;
141
142 if self.is_migration_applied(migration_id).await? {
144 return Err(OrmError::Migration(format!(
145 "Migration {} is already applied",
146 migration_id
147 )));
148 }
149
150 let next_batch = self.get_next_batch_number().await?;
152
153 self.apply_migration(migration, next_batch).await?;
155
156 Ok(())
157 }
158
159 async fn apply_migration(&self, migration: &Migration, batch: i32) -> OrmResult<()> {
161 let mut transaction = self
162 .pool
163 .begin()
164 .await
165 .map_err(|e| OrmError::Migration(format!("Failed to start transaction: {}", e)))?;
166
167 if !migration.up_sql.trim().is_empty() {
169 for statement in self.manager.split_sql_statements(&migration.up_sql)? {
170 if !statement.trim().is_empty() {
171 sqlx::query(&statement)
172 .execute(&mut *transaction)
173 .await
174 .map_err(|e| {
175 OrmError::Migration(format!(
176 "Failed to execute migration {}: {}",
177 migration.id, e
178 ))
179 })?;
180 }
181 }
182 }
183
184 let (record_sql, params) = self.record_migration_sql(&migration.id, batch);
186 let mut query = sqlx::query(&record_sql);
187 for param in params {
188 query = query.bind(param);
189 }
190 query
191 .execute(&mut *transaction)
192 .await
193 .map_err(|e| OrmError::Migration(format!("Failed to record migration: {}", e)))?;
194
195 transaction
197 .commit()
198 .await
199 .map_err(|e| OrmError::Migration(format!("Failed to commit migration: {}", e)))?;
200
201 Ok(())
202 }
203
204 async fn ensure_migrations_table(&self) -> OrmResult<()> {
206 let sql = self.create_migrations_table_sql();
207 sqlx::query(&sql).execute(&self.pool).await.map_err(|e| {
208 OrmError::Migration(format!("Failed to create migrations table: {}", e))
209 })?;
210 Ok(())
211 }
212
213 async fn get_applied_migrations(&self) -> OrmResult<Vec<MigrationRecord>> {
215 let sql = self.get_applied_migrations_sql();
216 let rows = sqlx::query(&sql).fetch_all(&self.pool).await.map_err(|e| {
217 OrmError::Migration(format!("Failed to query applied migrations: {}", e))
218 })?;
219
220 let mut records = Vec::new();
221 for row in rows {
222 let id: String = row
223 .try_get("id")
224 .map_err(|e| OrmError::Migration(format!("Failed to get migration id: {}", e)))?;
225 let applied_at: chrono::DateTime<chrono::Utc> = row
226 .try_get("applied_at")
227 .map_err(|e| OrmError::Migration(format!("Failed to get applied_at: {}", e)))?;
228 let batch: i32 = row
229 .try_get("batch")
230 .map_err(|e| OrmError::Migration(format!("Failed to get batch: {}", e)))?;
231
232 records.push(MigrationRecord {
233 id,
234 applied_at,
235 batch,
236 });
237 }
238
239 Ok(records)
240 }
241
242 async fn is_migration_applied(&self, migration_id: &str) -> OrmResult<bool> {
244 let (sql, params) = self.check_migration_sql(migration_id);
245 let mut query = sqlx::query(&sql);
246 for param in params {
247 query = query.bind(param);
248 }
249
250 let result = query
251 .fetch_optional(&self.pool)
252 .await
253 .map_err(|e| OrmError::Migration(format!("Failed to check migration status: {}", e)))?;
254
255 Ok(result.is_some())
256 }
257
258 async fn get_next_batch_number(&self) -> OrmResult<i32> {
260 let sql = self.get_latest_batch_sql();
261 let row = sqlx::query(&sql)
262 .fetch_one(&self.pool)
263 .await
264 .map_err(|e| OrmError::Migration(format!("Failed to get latest batch: {}", e)))?;
265
266 let latest_batch: i32 = row.try_get(0).unwrap_or(0);
267 Ok(latest_batch + 1)
268 }
269
270 fn create_migrations_table_sql(&self) -> String {
272 format!(
273 "CREATE TABLE IF NOT EXISTS {} (\n \
274 id VARCHAR(255) PRIMARY KEY,\n \
275 applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n \
276 batch INTEGER NOT NULL\n\
277 );",
278 self.manager.config().migrations_table
279 )
280 }
281
282 fn check_migration_sql(&self, migration_id: &str) -> (String, Vec<String>) {
284 (
285 format!(
286 "SELECT id FROM {} WHERE id = $1",
287 self.manager.config().migrations_table
288 ),
289 vec![migration_id.to_string()],
290 )
291 }
292
293 fn record_migration_sql(&self, migration_id: &str, batch: i32) -> (String, Vec<String>) {
295 (
296 format!(
297 "INSERT INTO {} (id, applied_at, batch) VALUES ($1, $2::timestamp, $3::integer)",
298 self.manager.config().migrations_table
299 ),
300 vec![
301 migration_id.to_string(),
302 chrono::Utc::now().to_rfc3339(),
303 batch.to_string(),
304 ],
305 )
306 }
307
308 fn get_latest_batch_sql(&self) -> String {
310 format!(
311 "SELECT COALESCE(MAX(batch), 0) FROM {}",
312 self.manager.config().migrations_table
313 )
314 }
315
316 fn get_applied_migrations_sql(&self) -> String {
318 format!(
319 "SELECT id, applied_at, batch FROM {} ORDER BY batch DESC, applied_at DESC",
320 self.manager.config().migrations_table
321 )
322 }
323
324 pub async fn get_migration_status(&self) -> OrmResult<Vec<(Migration, bool)>> {
326 let all_migrations = self.manager.load_migrations().await?;
328
329 let applied_migrations = self.get_applied_migrations().await?;
331 let applied_ids: HashSet<String> = applied_migrations.into_iter().map(|m| m.id).collect();
332
333 let mut status_list = Vec::new();
335 for migration in all_migrations {
336 let is_applied = applied_ids.contains(&migration.id);
337 status_list.push((migration, is_applied));
338 }
339
340 Ok(status_list)
341 }
342}