use std::collections::{HashMap, HashSet};

use chrono::{DateTime, Utc};
use serde::Serialize;
use tokio_postgres::Client;

use crate::config::WaypointConfig;
use crate::error::Result;
use crate::history;
use crate::migration::{scan_migrations, MigrationKind, MigrationVersion, ResolvedMigration};
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
16pub enum MigrationState {
17 Pending,
19 Applied,
21 Failed,
23 Missing,
25 Outdated,
27 OutOfOrder,
29 BelowBaseline,
31 Ignored,
33 Baseline,
35 Undone,
37}
38
39impl std::fmt::Display for MigrationState {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 MigrationState::Pending => write!(f, "Pending"),
43 MigrationState::Applied => write!(f, "Applied"),
44 MigrationState::Failed => write!(f, "Failed"),
45 MigrationState::Missing => write!(f, "Missing"),
46 MigrationState::Outdated => write!(f, "Outdated"),
47 MigrationState::OutOfOrder => write!(f, "Out of Order"),
48 MigrationState::BelowBaseline => write!(f, "Below Baseline"),
49 MigrationState::Ignored => write!(f, "Ignored"),
50 MigrationState::Baseline => write!(f, "Baseline"),
51 MigrationState::Undone => write!(f, "Undone"),
52 }
53 }
54}
55
56#[derive(Debug, Clone, Serialize)]
58pub struct MigrationInfo {
59 pub version: Option<String>,
61 pub description: String,
63 pub migration_type: String,
65 pub script: String,
67 pub state: MigrationState,
69 pub installed_on: Option<DateTime<Utc>>,
71 pub execution_time: Option<i32>,
73 pub checksum: Option<i32>,
75}
76
77pub async fn execute(client: &Client, config: &WaypointConfig) -> Result<Vec<MigrationInfo>> {
79 let schema = &config.migrations.schema;
80 let table = &config.migrations.table;
81
82 if !history::history_table_exists(client, schema, table).await? {
84 let resolved = scan_migrations(&config.migrations.locations)?;
86 return Ok(resolved
87 .into_iter()
88 .filter(|m| !m.is_undo())
89 .map(|m| {
90 let version = m.version().map(|v| v.raw.clone());
91 let migration_type = m.migration_type().to_string();
92 MigrationInfo {
93 version,
94 description: m.description,
95 migration_type,
96 script: m.script,
97 state: MigrationState::Pending,
98 installed_on: None,
99 execution_time: None,
100 checksum: Some(m.checksum),
101 }
102 })
103 .collect());
104 }
105
106 let resolved = scan_migrations(&config.migrations.locations)?;
107 let applied = history::get_applied_migrations(client, schema, table).await?;
108
109 let effective = history::effective_applied_versions(&applied);
111
112 let resolved_by_version: HashMap<String, &ResolvedMigration> = resolved
114 .iter()
115 .filter(|m| m.is_versioned())
116 .filter_map(|m| m.version().map(|v| (v.raw.clone(), m)))
117 .collect();
118
119 let resolved_by_script: HashMap<String, &ResolvedMigration> = resolved
120 .iter()
121 .filter(|m| !m.is_versioned() && !m.is_undo())
122 .map(|m| (m.script.clone(), m))
123 .collect();
124
125 let baseline_version = applied
127 .iter()
128 .find(|a| a.migration_type == "BASELINE")
129 .and_then(|a| a.version.as_ref())
130 .map(|v| MigrationVersion::parse(v))
131 .transpose()?;
132
133 let highest_applied = effective
135 .iter()
136 .filter_map(|v| MigrationVersion::parse(v).ok())
137 .max();
138
139 let mut infos: Vec<MigrationInfo> = Vec::new();
140
141 let mut seen_versions: HashMap<String, bool> = HashMap::new();
143 let mut seen_scripts: HashMap<String, bool> = HashMap::new();
144
145 for am in &applied {
146 let is_versioned = am.version.is_some();
149 let is_repeatable = am.version.is_none() && am.migration_type != "BASELINE";
150
151 let state = if am.migration_type == "BASELINE" {
152 MigrationState::Baseline
153 } else if am.migration_type == "UNDO_SQL" {
154 MigrationState::Undone
155 } else if !am.success {
156 MigrationState::Failed
157 } else if is_versioned {
158 if let Some(ref version) = am.version {
159 if !effective.contains(version) {
160 MigrationState::Undone
162 } else if resolved_by_version.contains_key(version) {
163 MigrationState::Applied
164 } else {
165 MigrationState::Missing
166 }
167 } else {
168 MigrationState::Applied
169 }
170 } else if is_repeatable {
171 if let Some(resolved) = resolved_by_script.get(&am.script) {
173 if Some(resolved.checksum) != am.checksum {
174 MigrationState::Outdated
175 } else {
176 MigrationState::Applied
177 }
178 } else {
179 MigrationState::Missing
180 }
181 } else {
182 MigrationState::Applied
183 };
184
185 if let Some(ref v) = am.version {
186 seen_versions.insert(v.clone(), true);
187 }
188 if am.version.is_none() {
189 seen_scripts.insert(am.script.clone(), true);
190 }
191
192 infos.push(MigrationInfo {
193 version: am.version.clone(),
194 description: am.description.clone(),
195 migration_type: am.migration_type.clone(),
196 script: am.script.clone(),
197 state,
198 installed_on: Some(am.installed_on),
199 execution_time: Some(am.execution_time),
200 checksum: am.checksum,
201 });
202 }
203
204 for m in &resolved {
206 if m.is_undo() {
207 continue;
208 }
209 match &m.kind {
210 MigrationKind::Versioned(version) => {
211 if seen_versions.contains_key(&version.raw) {
212 continue;
213 }
214
215 let state = if let Some(ref bv) = baseline_version {
216 if version <= bv {
217 MigrationState::BelowBaseline
218 } else if let Some(ref highest) = highest_applied {
219 if version < highest {
220 MigrationState::OutOfOrder
221 } else {
222 MigrationState::Pending
223 }
224 } else {
225 MigrationState::Pending
226 }
227 } else if let Some(ref highest) = highest_applied {
228 if version < highest {
229 MigrationState::OutOfOrder
230 } else {
231 MigrationState::Pending
232 }
233 } else {
234 MigrationState::Pending
235 };
236
237 infos.push(MigrationInfo {
238 version: Some(version.raw.clone()),
239 description: m.description.clone(),
240 migration_type: m.migration_type().to_string(),
241 script: m.script.clone(),
242 state,
243 installed_on: None,
244 execution_time: None,
245 checksum: Some(m.checksum),
246 });
247 }
248 MigrationKind::Repeatable => {
249 if seen_scripts.contains_key(&m.script) {
250 continue; }
252
253 infos.push(MigrationInfo {
254 version: None,
255 description: m.description.clone(),
256 migration_type: m.migration_type().to_string(),
257 script: m.script.clone(),
258 state: MigrationState::Pending,
259 installed_on: None,
260 execution_time: None,
261 checksum: Some(m.checksum),
262 });
263 }
264 MigrationKind::Undo(_) => unreachable!("undo files are skipped above"),
265 }
266 }
267
268 infos.sort_by(|a, b| match (&a.version, &b.version) {
270 (Some(av), Some(bv)) => {
271 let pa = MigrationVersion::parse(av);
272 let pb = MigrationVersion::parse(bv);
273 match (pa, pb) {
274 (Ok(pa), Ok(pb)) => pa.cmp(&pb),
275 _ => av.cmp(bv),
276 }
277 }
278 (Some(_), None) => std::cmp::Ordering::Less,
279 (None, Some(_)) => std::cmp::Ordering::Greater,
280 (None, None) => a.description.cmp(&b.description),
281 });
282
283 Ok(infos)
284}