Skip to main content

waypoint_core/commands/
migrate.rs

1//! Apply pending migrations to the database.
2
3use std::collections::HashMap;
4
5use serde::Serialize;
6use tokio_postgres::Client;
7
8use crate::config::WaypointConfig;
9use crate::db;
10use crate::error::{Result, WaypointError};
11use crate::history;
12use crate::hooks::{self, HookType, ResolvedHook};
13use crate::migration::{scan_migrations, MigrationVersion, ResolvedMigration};
14use crate::placeholder::{build_placeholders, replace_placeholders};
15
16/// Report returned after a migrate operation.
17#[derive(Debug, Serialize)]
18pub struct MigrateReport {
19    pub migrations_applied: usize,
20    pub total_time_ms: i32,
21    pub details: Vec<MigrateDetail>,
22    pub hooks_executed: usize,
23    pub hooks_time_ms: i32,
24}
25
26/// Details of a single applied migration within a migrate run.
27#[derive(Debug, Serialize)]
28pub struct MigrateDetail {
29    pub version: Option<String>,
30    pub description: String,
31    pub script: String,
32    pub execution_time_ms: i32,
33}
34
35/// Execute the migrate command.
36pub async fn execute(
37    client: &Client,
38    config: &WaypointConfig,
39    target_version: Option<&str>,
40) -> Result<MigrateReport> {
41    let table = &config.migrations.table;
42
43    // Acquire advisory lock
44    db::acquire_advisory_lock(client, table).await?;
45
46    let result = run_migrate(client, config, target_version).await;
47
48    // Always release the advisory lock
49    if let Err(e) = db::release_advisory_lock(client, table).await {
50        tracing::warn!(error = %e, "Failed to release advisory lock");
51    }
52
53    match &result {
54        Ok(report) => {
55            tracing::info!(
56                migrations_applied = report.migrations_applied,
57                total_time_ms = report.total_time_ms,
58                hooks_executed = report.hooks_executed,
59                "Migrate completed"
60            );
61        }
62        Err(e) => {
63            tracing::error!(error = %e, "Migrate failed");
64        }
65    }
66
67    result
68}
69
70async fn run_migrate(
71    client: &Client,
72    config: &WaypointConfig,
73    target_version: Option<&str>,
74) -> Result<MigrateReport> {
75    let schema = &config.migrations.schema;
76    let table = &config.migrations.table;
77
78    // Create history table if not exists
79    history::create_history_table(client, schema, table).await?;
80
81    // Validate on migrate if enabled
82    if config.migrations.validate_on_migrate {
83        if let Err(e) = super::validate::execute(client, config).await {
84            // Only fail on actual validation errors, not if there's nothing to validate
85            match &e {
86                WaypointError::ValidationFailed(_) => return Err(e),
87                _ => {
88                    tracing::debug!("Validation skipped: {}", e);
89                }
90            }
91        }
92    }
93
94    // Scan migration files
95    let resolved = scan_migrations(&config.migrations.locations)?;
96
97    // Scan and load hooks
98    let mut all_hooks: Vec<ResolvedHook> = hooks::scan_hooks(&config.migrations.locations)?;
99    let config_hooks = hooks::load_config_hooks(&config.hooks)?;
100    all_hooks.extend(config_hooks);
101
102    // Get applied migrations
103    let applied = history::get_applied_migrations(client, schema, table).await?;
104
105    // Get database user info for placeholders
106    let db_user = db::get_current_user(client)
107        .await
108        .unwrap_or_else(|_| "unknown".to_string());
109    let db_name = db::get_current_database(client)
110        .await
111        .unwrap_or_else(|_| "unknown".to_string());
112    let installed_by = config
113        .migrations
114        .installed_by
115        .as_deref()
116        .unwrap_or(&db_user);
117
118    // Parse target version if provided
119    let target = target_version.map(MigrationVersion::parse).transpose()?;
120
121    // Find the baseline version if any
122    let baseline_version = applied
123        .iter()
124        .find(|a| a.migration_type == "BASELINE")
125        .and_then(|a| a.version.as_ref())
126        .map(|v| MigrationVersion::parse(v))
127        .transpose()?;
128
129    // Find highest applied versioned migration (version presence, not type string,
130    // for Flyway compatibility)
131    let highest_applied = applied
132        .iter()
133        .filter(|a| a.success && a.version.is_some())
134        .filter_map(|a| a.version.as_ref())
135        .filter_map(|v| MigrationVersion::parse(v).ok())
136        .max();
137
138    // Build set of applied versions and scripts for quick lookup
139    let applied_versions: HashMap<String, &crate::history::AppliedMigration> = applied
140        .iter()
141        .filter(|a| a.success)
142        .filter_map(|a| a.version.as_ref().map(|v| (v.clone(), a)))
143        .collect();
144
145    let applied_scripts: HashMap<String, &crate::history::AppliedMigration> = applied
146        .iter()
147        .filter(|a| a.success && a.version.is_none())
148        .map(|a| (a.script.clone(), a))
149        .collect();
150
151    let mut report = MigrateReport {
152        migrations_applied: 0,
153        total_time_ms: 0,
154        details: Vec::new(),
155        hooks_executed: 0,
156        hooks_time_ms: 0,
157    };
158
159    // ── beforeMigrate hooks ──
160    let before_placeholders = build_placeholders(
161        &config.placeholders,
162        schema,
163        &db_user,
164        &db_name,
165        "beforeMigrate",
166    );
167    let (count, ms) = hooks::run_hooks(
168        client,
169        config,
170        &all_hooks,
171        &HookType::BeforeMigrate,
172        &before_placeholders,
173    )
174    .await?;
175    report.hooks_executed += count;
176    report.hooks_time_ms += ms;
177
178    // ── Apply versioned migrations ──
179    let versioned: Vec<&ResolvedMigration> = resolved.iter().filter(|m| m.is_versioned()).collect();
180
181    for migration in &versioned {
182        let version = migration.version().unwrap();
183
184        // Skip if already applied successfully
185        if applied_versions.contains_key(&version.raw) {
186            continue;
187        }
188
189        // Skip if below baseline
190        if let Some(ref bv) = baseline_version {
191            if version <= bv {
192                tracing::debug!("Skipping {} (below baseline)", migration.script);
193                continue;
194            }
195        }
196
197        // Check target version
198        if let Some(ref tv) = target {
199            if version > tv {
200                tracing::debug!("Skipping {} (above target {})", migration.script, tv);
201                break;
202            }
203        }
204
205        // Check out-of-order
206        if !config.migrations.out_of_order {
207            if let Some(ref highest) = highest_applied {
208                if version < highest {
209                    return Err(WaypointError::OutOfOrder {
210                        version: version.raw.clone(),
211                        highest: highest.raw.clone(),
212                    });
213                }
214            }
215        }
216
217        // beforeEachMigrate hooks
218        let each_placeholders = build_placeholders(
219            &config.placeholders,
220            schema,
221            &db_user,
222            &db_name,
223            &migration.script,
224        );
225        let (count, ms) = hooks::run_hooks(
226            client,
227            config,
228            &all_hooks,
229            &HookType::BeforeEachMigrate,
230            &each_placeholders,
231        )
232        .await?;
233        report.hooks_executed += count;
234        report.hooks_time_ms += ms;
235
236        // Apply migration
237        let exec_time = apply_migration(
238            client,
239            config,
240            migration,
241            schema,
242            table,
243            installed_by,
244            &db_user,
245            &db_name,
246        )
247        .await?;
248
249        // afterEachMigrate hooks
250        let (count, ms) = hooks::run_hooks(
251            client,
252            config,
253            &all_hooks,
254            &HookType::AfterEachMigrate,
255            &each_placeholders,
256        )
257        .await?;
258        report.hooks_executed += count;
259        report.hooks_time_ms += ms;
260
261        report.migrations_applied += 1;
262        report.total_time_ms += exec_time;
263        report.details.push(MigrateDetail {
264            version: Some(version.raw.clone()),
265            description: migration.description.clone(),
266            script: migration.script.clone(),
267            execution_time_ms: exec_time,
268        });
269    }
270
271    // ── Apply repeatable migrations ──
272    let repeatables: Vec<&ResolvedMigration> =
273        resolved.iter().filter(|m| !m.is_versioned()).collect();
274
275    for migration in &repeatables {
276        // Check if already applied with same checksum
277        if let Some(applied_entry) = applied_scripts.get(&migration.script) {
278            if applied_entry.checksum == Some(migration.checksum) {
279                continue; // Unchanged, skip
280            }
281            // Checksum differs — re-apply (outdated)
282            tracing::info!(migration = %migration.script, "Re-applying changed repeatable migration");
283        }
284
285        // beforeEachMigrate hooks
286        let each_placeholders = build_placeholders(
287            &config.placeholders,
288            schema,
289            &db_user,
290            &db_name,
291            &migration.script,
292        );
293        let (count, ms) = hooks::run_hooks(
294            client,
295            config,
296            &all_hooks,
297            &HookType::BeforeEachMigrate,
298            &each_placeholders,
299        )
300        .await?;
301        report.hooks_executed += count;
302        report.hooks_time_ms += ms;
303
304        let exec_time = apply_migration(
305            client,
306            config,
307            migration,
308            schema,
309            table,
310            installed_by,
311            &db_user,
312            &db_name,
313        )
314        .await?;
315
316        // afterEachMigrate hooks
317        let (count, ms) = hooks::run_hooks(
318            client,
319            config,
320            &all_hooks,
321            &HookType::AfterEachMigrate,
322            &each_placeholders,
323        )
324        .await?;
325        report.hooks_executed += count;
326        report.hooks_time_ms += ms;
327
328        report.migrations_applied += 1;
329        report.total_time_ms += exec_time;
330        report.details.push(MigrateDetail {
331            version: None,
332            description: migration.description.clone(),
333            script: migration.script.clone(),
334            execution_time_ms: exec_time,
335        });
336    }
337
338    // ── afterMigrate hooks ──
339    let after_placeholders = build_placeholders(
340        &config.placeholders,
341        schema,
342        &db_user,
343        &db_name,
344        "afterMigrate",
345    );
346    let (count, ms) = hooks::run_hooks(
347        client,
348        config,
349        &all_hooks,
350        &HookType::AfterMigrate,
351        &after_placeholders,
352    )
353    .await?;
354    report.hooks_executed += count;
355    report.hooks_time_ms += ms;
356
357    Ok(report)
358}
359
360#[allow(clippy::too_many_arguments)]
361async fn apply_migration(
362    client: &Client,
363    config: &WaypointConfig,
364    migration: &ResolvedMigration,
365    schema: &str,
366    table: &str,
367    installed_by: &str,
368    db_user: &str,
369    db_name: &str,
370) -> Result<i32> {
371    tracing::info!(migration = %migration.script, schema = %schema, "Applying migration");
372
373    // Build placeholders
374    let placeholders = build_placeholders(
375        &config.placeholders,
376        schema,
377        db_user,
378        db_name,
379        &migration.script,
380    );
381
382    // Replace placeholders in SQL
383    let sql = replace_placeholders(&migration.sql, &placeholders)?;
384
385    let version_str = migration.version().map(|v| v.raw.as_str());
386    let type_str = migration.migration_type().to_string();
387
388    // Execute in transaction
389    match db::execute_in_transaction(client, &sql).await {
390        Ok(exec_time) => {
391            // Record success (rank is assigned atomically in the INSERT)
392            history::insert_applied_migration(
393                client,
394                schema,
395                table,
396                version_str,
397                &migration.description,
398                &type_str,
399                &migration.script,
400                Some(migration.checksum),
401                installed_by,
402                exec_time,
403                true,
404            )
405            .await?;
406
407            Ok(exec_time)
408        }
409        Err(e) => {
410            // Record failure — we try to insert the failure record, but don't fail if that also fails
411            if let Err(record_err) = history::insert_applied_migration(
412                client,
413                schema,
414                table,
415                version_str,
416                &migration.description,
417                &type_str,
418                &migration.script,
419                Some(migration.checksum),
420                installed_by,
421                0,
422                false,
423            )
424            .await
425            {
426                tracing::warn!(script = %migration.script, error = %record_err, "Failed to record migration failure in history table");
427            }
428
429            // Extract detailed error message
430            let reason = match &e {
431                WaypointError::DatabaseError(db_err) => crate::error::format_db_error(db_err),
432                other => other.to_string(),
433            };
434            tracing::error!(script = %migration.script, reason = %reason, "Migration failed");
435            Err(WaypointError::MigrationFailed {
436                script: migration.script.clone(),
437                reason,
438            })
439        }
440    }
441}