1use std::collections::HashMap;
4
5use serde::Serialize;
6use tokio_postgres::Client;
7
8use crate::config::WaypointConfig;
9use crate::db;
10use crate::error::{Result, WaypointError};
11use crate::history;
12use crate::hooks::{self, HookType, ResolvedHook};
13use crate::migration::{scan_migrations, MigrationVersion, ResolvedMigration};
14use crate::placeholder::{build_placeholders, replace_placeholders};
15
16#[derive(Debug, Serialize)]
18pub struct MigrateReport {
19 pub migrations_applied: usize,
20 pub total_time_ms: i32,
21 pub details: Vec<MigrateDetail>,
22 pub hooks_executed: usize,
23 pub hooks_time_ms: i32,
24}
25
26#[derive(Debug, Serialize)]
28pub struct MigrateDetail {
29 pub version: Option<String>,
30 pub description: String,
31 pub script: String,
32 pub execution_time_ms: i32,
33}
34
35pub async fn execute(
37 client: &Client,
38 config: &WaypointConfig,
39 target_version: Option<&str>,
40) -> Result<MigrateReport> {
41 let table = &config.migrations.table;
42
43 db::acquire_advisory_lock(client, table).await?;
45
46 let result = run_migrate(client, config, target_version).await;
47
48 if let Err(e) = db::release_advisory_lock(client, table).await {
50 tracing::warn!(error = %e, "Failed to release advisory lock");
51 }
52
53 match &result {
54 Ok(report) => {
55 tracing::info!(
56 migrations_applied = report.migrations_applied,
57 total_time_ms = report.total_time_ms,
58 hooks_executed = report.hooks_executed,
59 "Migrate completed"
60 );
61 }
62 Err(e) => {
63 tracing::error!(error = %e, "Migrate failed");
64 }
65 }
66
67 result
68}
69
70async fn run_migrate(
71 client: &Client,
72 config: &WaypointConfig,
73 target_version: Option<&str>,
74) -> Result<MigrateReport> {
75 let schema = &config.migrations.schema;
76 let table = &config.migrations.table;
77
78 history::create_history_table(client, schema, table).await?;
80
81 if config.migrations.validate_on_migrate {
83 if let Err(e) = super::validate::execute(client, config).await {
84 match &e {
86 WaypointError::ValidationFailed(_) => return Err(e),
87 _ => {
88 tracing::debug!("Validation skipped: {}", e);
89 }
90 }
91 }
92 }
93
94 let resolved = scan_migrations(&config.migrations.locations)?;
96
97 let mut all_hooks: Vec<ResolvedHook> = hooks::scan_hooks(&config.migrations.locations)?;
99 let config_hooks = hooks::load_config_hooks(&config.hooks)?;
100 all_hooks.extend(config_hooks);
101
102 let applied = history::get_applied_migrations(client, schema, table).await?;
104
105 let db_user = db::get_current_user(client)
107 .await
108 .unwrap_or_else(|_| "unknown".to_string());
109 let db_name = db::get_current_database(client)
110 .await
111 .unwrap_or_else(|_| "unknown".to_string());
112 let installed_by = config
113 .migrations
114 .installed_by
115 .as_deref()
116 .unwrap_or(&db_user);
117
118 let target = target_version.map(MigrationVersion::parse).transpose()?;
120
121 let baseline_version = applied
123 .iter()
124 .find(|a| a.migration_type == "BASELINE")
125 .and_then(|a| a.version.as_ref())
126 .map(|v| MigrationVersion::parse(v))
127 .transpose()?;
128
129 let highest_applied = applied
132 .iter()
133 .filter(|a| a.success && a.version.is_some())
134 .filter_map(|a| a.version.as_ref())
135 .filter_map(|v| MigrationVersion::parse(v).ok())
136 .max();
137
138 let applied_versions: HashMap<String, &crate::history::AppliedMigration> = applied
140 .iter()
141 .filter(|a| a.success)
142 .filter_map(|a| a.version.as_ref().map(|v| (v.clone(), a)))
143 .collect();
144
145 let applied_scripts: HashMap<String, &crate::history::AppliedMigration> = applied
146 .iter()
147 .filter(|a| a.success && a.version.is_none())
148 .map(|a| (a.script.clone(), a))
149 .collect();
150
151 let mut report = MigrateReport {
152 migrations_applied: 0,
153 total_time_ms: 0,
154 details: Vec::new(),
155 hooks_executed: 0,
156 hooks_time_ms: 0,
157 };
158
159 let before_placeholders = build_placeholders(
161 &config.placeholders,
162 schema,
163 &db_user,
164 &db_name,
165 "beforeMigrate",
166 );
167 let (count, ms) = hooks::run_hooks(
168 client,
169 config,
170 &all_hooks,
171 &HookType::BeforeMigrate,
172 &before_placeholders,
173 )
174 .await?;
175 report.hooks_executed += count;
176 report.hooks_time_ms += ms;
177
178 let versioned: Vec<&ResolvedMigration> = resolved.iter().filter(|m| m.is_versioned()).collect();
180
181 for migration in &versioned {
182 let version = migration.version().unwrap();
183
184 if applied_versions.contains_key(&version.raw) {
186 continue;
187 }
188
189 if let Some(ref bv) = baseline_version {
191 if version <= bv {
192 tracing::debug!("Skipping {} (below baseline)", migration.script);
193 continue;
194 }
195 }
196
197 if let Some(ref tv) = target {
199 if version > tv {
200 tracing::debug!("Skipping {} (above target {})", migration.script, tv);
201 break;
202 }
203 }
204
205 if !config.migrations.out_of_order {
207 if let Some(ref highest) = highest_applied {
208 if version < highest {
209 return Err(WaypointError::OutOfOrder {
210 version: version.raw.clone(),
211 highest: highest.raw.clone(),
212 });
213 }
214 }
215 }
216
217 let each_placeholders = build_placeholders(
219 &config.placeholders,
220 schema,
221 &db_user,
222 &db_name,
223 &migration.script,
224 );
225 let (count, ms) = hooks::run_hooks(
226 client,
227 config,
228 &all_hooks,
229 &HookType::BeforeEachMigrate,
230 &each_placeholders,
231 )
232 .await?;
233 report.hooks_executed += count;
234 report.hooks_time_ms += ms;
235
236 let exec_time = apply_migration(
238 client,
239 config,
240 migration,
241 schema,
242 table,
243 installed_by,
244 &db_user,
245 &db_name,
246 )
247 .await?;
248
249 let (count, ms) = hooks::run_hooks(
251 client,
252 config,
253 &all_hooks,
254 &HookType::AfterEachMigrate,
255 &each_placeholders,
256 )
257 .await?;
258 report.hooks_executed += count;
259 report.hooks_time_ms += ms;
260
261 report.migrations_applied += 1;
262 report.total_time_ms += exec_time;
263 report.details.push(MigrateDetail {
264 version: Some(version.raw.clone()),
265 description: migration.description.clone(),
266 script: migration.script.clone(),
267 execution_time_ms: exec_time,
268 });
269 }
270
271 let repeatables: Vec<&ResolvedMigration> =
273 resolved.iter().filter(|m| !m.is_versioned()).collect();
274
275 for migration in &repeatables {
276 if let Some(applied_entry) = applied_scripts.get(&migration.script) {
278 if applied_entry.checksum == Some(migration.checksum) {
279 continue; }
281 tracing::info!(migration = %migration.script, "Re-applying changed repeatable migration");
283 }
284
285 let each_placeholders = build_placeholders(
287 &config.placeholders,
288 schema,
289 &db_user,
290 &db_name,
291 &migration.script,
292 );
293 let (count, ms) = hooks::run_hooks(
294 client,
295 config,
296 &all_hooks,
297 &HookType::BeforeEachMigrate,
298 &each_placeholders,
299 )
300 .await?;
301 report.hooks_executed += count;
302 report.hooks_time_ms += ms;
303
304 let exec_time = apply_migration(
305 client,
306 config,
307 migration,
308 schema,
309 table,
310 installed_by,
311 &db_user,
312 &db_name,
313 )
314 .await?;
315
316 let (count, ms) = hooks::run_hooks(
318 client,
319 config,
320 &all_hooks,
321 &HookType::AfterEachMigrate,
322 &each_placeholders,
323 )
324 .await?;
325 report.hooks_executed += count;
326 report.hooks_time_ms += ms;
327
328 report.migrations_applied += 1;
329 report.total_time_ms += exec_time;
330 report.details.push(MigrateDetail {
331 version: None,
332 description: migration.description.clone(),
333 script: migration.script.clone(),
334 execution_time_ms: exec_time,
335 });
336 }
337
338 let after_placeholders = build_placeholders(
340 &config.placeholders,
341 schema,
342 &db_user,
343 &db_name,
344 "afterMigrate",
345 );
346 let (count, ms) = hooks::run_hooks(
347 client,
348 config,
349 &all_hooks,
350 &HookType::AfterMigrate,
351 &after_placeholders,
352 )
353 .await?;
354 report.hooks_executed += count;
355 report.hooks_time_ms += ms;
356
357 Ok(report)
358}
359
360#[allow(clippy::too_many_arguments)]
361async fn apply_migration(
362 client: &Client,
363 config: &WaypointConfig,
364 migration: &ResolvedMigration,
365 schema: &str,
366 table: &str,
367 installed_by: &str,
368 db_user: &str,
369 db_name: &str,
370) -> Result<i32> {
371 tracing::info!(migration = %migration.script, schema = %schema, "Applying migration");
372
373 let placeholders = build_placeholders(
375 &config.placeholders,
376 schema,
377 db_user,
378 db_name,
379 &migration.script,
380 );
381
382 let sql = replace_placeholders(&migration.sql, &placeholders)?;
384
385 let version_str = migration.version().map(|v| v.raw.as_str());
386 let type_str = migration.migration_type().to_string();
387
388 match db::execute_in_transaction(client, &sql).await {
390 Ok(exec_time) => {
391 history::insert_applied_migration(
393 client,
394 schema,
395 table,
396 version_str,
397 &migration.description,
398 &type_str,
399 &migration.script,
400 Some(migration.checksum),
401 installed_by,
402 exec_time,
403 true,
404 )
405 .await?;
406
407 Ok(exec_time)
408 }
409 Err(e) => {
410 if let Err(record_err) = history::insert_applied_migration(
412 client,
413 schema,
414 table,
415 version_str,
416 &migration.description,
417 &type_str,
418 &migration.script,
419 Some(migration.checksum),
420 installed_by,
421 0,
422 false,
423 )
424 .await
425 {
426 tracing::warn!(script = %migration.script, error = %record_err, "Failed to record migration failure in history table");
427 }
428
429 let reason = match &e {
431 WaypointError::DatabaseError(db_err) => crate::error::format_db_error(db_err),
432 other => other.to_string(),
433 };
434 tracing::error!(script = %migration.script, reason = %reason, "Migration failed");
435 Err(WaypointError::MigrationFailed {
436 script: migration.script.clone(),
437 reason,
438 })
439 }
440 }
441}