1use std::collections::HashMap;
5
6use serde::Serialize;
7use tokio_postgres::Client;
8
9use crate::config::WaypointConfig;
10use crate::db;
11use crate::error::{Result, WaypointError};
12use crate::history;
13use crate::migration::{scan_migrations, MigrationVersion, ResolvedMigration};
14use crate::placeholder::{build_placeholders, replace_placeholders};
15
16#[derive(Debug, Clone)]
18pub enum UndoTarget {
19 Last,
21 Version(MigrationVersion),
23 Count(usize),
25}
26
27#[derive(Debug, Serialize)]
29pub struct UndoReport {
30 pub migrations_undone: usize,
32 pub total_time_ms: i32,
34 pub details: Vec<UndoDetail>,
36}
37
38#[derive(Debug, Serialize)]
40pub struct UndoDetail {
41 pub version: String,
43 pub description: String,
45 pub script: String,
47 pub execution_time_ms: i32,
49 pub auto_reversal: bool,
51}
52
53#[allow(clippy::too_many_arguments)]
59async fn execute_undo_sql(
60 client: &Client,
61 schema: &str,
62 table: &str,
63 version: &str,
64 description: &str,
65 script: &str,
66 checksum: Option<i32>,
67 installed_by: &str,
68 sql: &str,
69) -> Result<i32> {
70 let start = std::time::Instant::now();
71 client.batch_execute("BEGIN").await?;
72
73 match client.batch_execute(sql).await {
74 Ok(()) => {
75 let exec_time = start.elapsed().as_millis() as i32;
76 match history::insert_applied_migration(
77 client,
78 schema,
79 table,
80 Some(version),
81 description,
82 "UNDO_SQL",
83 script,
84 checksum,
85 installed_by,
86 exec_time,
87 true,
88 )
89 .await
90 {
91 Ok(()) => {
92 client.batch_execute("COMMIT").await?;
93 Ok(exec_time)
94 }
95 Err(e) => {
96 if let Err(rb) = client.batch_execute("ROLLBACK").await {
97 log::error!("Failed to rollback undo transaction: {}", rb);
98 }
99 Err(e)
100 }
101 }
102 }
103 Err(e) => {
104 if let Err(rollback_err) = client.batch_execute("ROLLBACK").await {
105 log::error!("Failed to rollback undo transaction: {}", rollback_err);
106 }
107
108 if let Err(record_err) = history::insert_applied_migration(
110 client,
111 schema,
112 table,
113 Some(version),
114 description,
115 "UNDO_SQL",
116 script,
117 checksum,
118 installed_by,
119 0,
120 false,
121 )
122 .await
123 {
124 log::warn!(
125 "Failed to record undo failure; script={}, error={}",
126 script,
127 record_err
128 );
129 }
130
131 let reason = crate::error::format_db_error(&e);
132 Err(WaypointError::UndoFailed {
133 script: script.to_string(),
134 reason,
135 })
136 }
137 }
138}
139
140pub async fn execute(
142 client: &Client,
143 config: &WaypointConfig,
144 target: UndoTarget,
145) -> Result<UndoReport> {
146 let table = &config.migrations.table;
147
148 db::acquire_advisory_lock(client, table).await?;
150
151 let result = run_undo(client, config, target).await;
152
153 if let Err(e) = db::release_advisory_lock(client, table).await {
155 log::error!("Failed to release advisory lock: {}", e);
156 }
157
158 match &result {
159 Ok(report) => {
160 log::info!(
161 "Undo completed; migrations_undone={}, total_time_ms={}",
162 report.migrations_undone,
163 report.total_time_ms
164 );
165 }
166 Err(e) => {
167 log::error!("Undo failed: {}", e);
168 }
169 }
170
171 result
172}
173
174async fn run_undo(
175 client: &Client,
176 config: &WaypointConfig,
177 target: UndoTarget,
178) -> Result<UndoReport> {
179 let schema = &config.migrations.schema;
180 let table = &config.migrations.table;
181
182 history::create_history_table(client, schema, table).await?;
184
185 let resolved = scan_migrations(&config.migrations.locations)?;
187 let undo_by_version: HashMap<String, &ResolvedMigration> = resolved
188 .iter()
189 .filter(|m| m.is_undo())
190 .filter_map(|m| m.version().map(|v| (v.raw.clone(), m)))
191 .collect();
192
193 let applied = history::get_applied_migrations(client, schema, table).await?;
195 let effective = history::effective_applied_versions(&applied);
196
197 let mut applied_versions: Vec<MigrationVersion> = effective
199 .iter()
200 .filter_map(|v| MigrationVersion::parse(v).ok())
201 .collect();
202 applied_versions.sort();
203 applied_versions.reverse(); let versions_to_undo: Vec<MigrationVersion> = match target {
207 UndoTarget::Last => applied_versions.into_iter().take(1).collect(),
208 UndoTarget::Count(n) => applied_versions.into_iter().take(n).collect(),
209 UndoTarget::Version(ref target_ver) => applied_versions
210 .into_iter()
211 .filter(|v| v > target_ver)
212 .collect(),
213 };
214
215 let db_user = db::get_current_user(client)
217 .await
218 .unwrap_or_else(|_| "unknown".to_string());
219 let db_name = db::get_current_database(client)
220 .await
221 .unwrap_or_else(|_| "unknown".to_string());
222 let installed_by = config
223 .migrations
224 .installed_by
225 .as_deref()
226 .unwrap_or(&db_user);
227
228 let mut report = UndoReport {
229 migrations_undone: 0,
230 total_time_ms: 0,
231 details: Vec::new(),
232 };
233
234 for version in &versions_to_undo {
236 if let Some(undo_migration) = undo_by_version.get(&version.raw) {
238 log::info!(
240 "Undoing migration (manual); migration={}, schema={}",
241 undo_migration.script,
242 schema
243 );
244
245 let placeholders = build_placeholders(
246 &config.placeholders,
247 schema,
248 &db_user,
249 &db_name,
250 &undo_migration.script,
251 );
252 let sql = replace_placeholders(&undo_migration.sql, &placeholders)?;
253
254 let exec_time = execute_undo_sql(
255 client,
256 schema,
257 table,
258 &version.raw,
259 &undo_migration.description,
260 &undo_migration.script,
261 Some(undo_migration.checksum),
262 installed_by,
263 &sql,
264 )
265 .await?;
266
267 report.migrations_undone += 1;
268 report.total_time_ms += exec_time;
269 report.details.push(UndoDetail {
270 version: version.raw.clone(),
271 description: undo_migration.description.clone(),
272 script: undo_migration.script.clone(),
273 execution_time_ms: exec_time,
274 auto_reversal: false,
275 });
276 } else if config.reversals.enabled {
277 match crate::reversal::get_reversal(client, schema, table, &version.raw).await? {
279 Some(reversal_sql) => {
280 let script = format!("auto-reversal:V{}", version.raw);
281 log::info!(
282 "Undoing migration (auto-reversal); version={}, schema={}",
283 version.raw,
284 schema
285 );
286
287 let exec_time = execute_undo_sql(
288 client,
289 schema,
290 table,
291 &version.raw,
292 "Auto-generated reversal",
293 &script,
294 None,
295 installed_by,
296 &reversal_sql,
297 )
298 .await?;
299
300 report.migrations_undone += 1;
301 report.total_time_ms += exec_time;
302 report.details.push(UndoDetail {
303 version: version.raw.clone(),
304 description: "Auto-generated reversal".to_string(),
305 script,
306 execution_time_ms: exec_time,
307 auto_reversal: true,
308 });
309 }
310 None => {
311 return Err(WaypointError::UndoMissing {
312 version: version.raw.clone(),
313 });
314 }
315 }
316 } else {
317 return Err(WaypointError::UndoMissing {
318 version: version.raw.clone(),
319 });
320 }
321 }
322
323 Ok(report)
324}