1use std::collections::HashMap;
2
3use chrono::{DateTime, Utc};
4use serde::Serialize;
5use tokio_postgres::Client;
6
7use crate::config::WaypointConfig;
8use crate::error::Result;
9use crate::history;
10use crate::migration::{scan_migrations, MigrationKind, MigrationVersion, ResolvedMigration};
11
/// Status of a single migration as reported by [`execute`].
///
/// The per-variant notes describe how `execute` in this file assigns each state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum MigrationState {
    /// Found on disk with no history row, and neither below the baseline nor out of order.
    /// Also used for every migration when the history table does not exist.
    Pending,
    /// Successful history row that still matches a migration on disk (by version for
    /// versioned rows; by script and checksum for rows without a version).
    Applied,
    /// Non-baseline history row whose `success` flag is false.
    Failed,
    /// Successful history row whose version (or script, for rows without a version)
    /// is not found among the migrations on disk.
    Missing,
    /// Successful history row without a version whose recorded checksum differs from
    /// the checksum of the script currently on disk.
    Outdated,
    /// Versioned migration on disk with no history row and a version lower than the
    /// highest successfully applied version.
    OutOfOrder,
    /// Versioned migration on disk with no history row and a version at or below the
    /// baseline version recorded in the history table.
    BelowBaseline,
    /// Not assigned by `execute` in this file.
    Ignored,
    /// History row whose migration type is `"BASELINE"`.
    Baseline,
}
25
26impl std::fmt::Display for MigrationState {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 MigrationState::Pending => write!(f, "Pending"),
30 MigrationState::Applied => write!(f, "Applied"),
31 MigrationState::Failed => write!(f, "Failed"),
32 MigrationState::Missing => write!(f, "Missing"),
33 MigrationState::Outdated => write!(f, "Outdated"),
34 MigrationState::OutOfOrder => write!(f, "Out of Order"),
35 MigrationState::BelowBaseline => write!(f, "Below Baseline"),
36 MigrationState::Ignored => write!(f, "Ignored"),
37 MigrationState::Baseline => write!(f, "Baseline"),
38 }
39 }
40}
41
/// One row of the migration status report produced by [`execute`].
#[derive(Debug, Clone, Serialize)]
pub struct MigrationInfo {
    /// Raw version string; `None` for repeatable migrations and for history rows
    /// that carry no version.
    pub version: Option<String>,
    /// Description taken from the history row, or from the file on disk when the
    /// migration has no history row.
    pub description: String,
    /// Migration type as text: copied from the history row, or the string form of the
    /// resolved migration's type when there is no history row.
    pub migration_type: String,
    /// Script identifier; `execute` uses it to match history rows without a version
    /// against repeatable migrations on disk.
    pub script: String,
    /// Computed status of this migration.
    pub state: MigrationState,
    /// Timestamp copied from the history row; `None` when there is no history row.
    pub installed_on: Option<DateTime<Utc>>,
    /// Execution time copied from the history row; `None` when there is no history row.
    // NOTE(review): the unit is not visible in this file (presumably milliseconds) — confirm.
    pub execution_time: Option<i32>,
    /// Checksum recorded in the history row, or the checksum of the file on disk when
    /// the migration has no history row.
    pub checksum: Option<i32>,
}
54
55pub async fn execute(client: &Client, config: &WaypointConfig) -> Result<Vec<MigrationInfo>> {
57 let schema = &config.migrations.schema;
58 let table = &config.migrations.table;
59
60 if !history::history_table_exists(client, schema, table).await? {
62 let resolved = scan_migrations(&config.migrations.locations)?;
64 return Ok(resolved
65 .into_iter()
66 .map(|m| {
67 let version = m.version().map(|v| v.raw.clone());
68 let migration_type = m.migration_type().to_string();
69 MigrationInfo {
70 version,
71 description: m.description,
72 migration_type,
73 script: m.script,
74 state: MigrationState::Pending,
75 installed_on: None,
76 execution_time: None,
77 checksum: Some(m.checksum),
78 }
79 })
80 .collect());
81 }
82
83 let resolved = scan_migrations(&config.migrations.locations)?;
84 let applied = history::get_applied_migrations(client, schema, table).await?;
85
86 let resolved_by_version: HashMap<String, &ResolvedMigration> = resolved
88 .iter()
89 .filter(|m| m.is_versioned())
90 .filter_map(|m| m.version().map(|v| (v.raw.clone(), m)))
91 .collect();
92
93 let resolved_by_script: HashMap<String, &ResolvedMigration> = resolved
94 .iter()
95 .filter(|m| !m.is_versioned())
96 .map(|m| (m.script.clone(), m))
97 .collect();
98
99 let baseline_version = applied
101 .iter()
102 .find(|a| a.migration_type == "BASELINE")
103 .and_then(|a| a.version.as_ref())
104 .map(|v| MigrationVersion::parse(v))
105 .transpose()?;
106
107 let highest_applied = applied
109 .iter()
110 .filter(|a| a.success && a.version.is_some())
111 .filter_map(|a| a.version.as_ref())
112 .filter_map(|v| MigrationVersion::parse(v).ok())
113 .max();
114
115 let mut infos: Vec<MigrationInfo> = Vec::new();
116
117 let mut seen_versions: HashMap<String, bool> = HashMap::new();
119 let mut seen_scripts: HashMap<String, bool> = HashMap::new();
120
121 for am in &applied {
122 let is_versioned = am.version.is_some();
125 let is_repeatable = am.version.is_none() && am.migration_type != "BASELINE";
126
127 let state = if am.migration_type == "BASELINE" {
128 MigrationState::Baseline
129 } else if !am.success {
130 MigrationState::Failed
131 } else if is_versioned {
132 if let Some(ref version) = am.version {
133 if resolved_by_version.contains_key(version) {
134 MigrationState::Applied
135 } else {
136 MigrationState::Missing
137 }
138 } else {
139 MigrationState::Applied
140 }
141 } else if is_repeatable {
142 if let Some(resolved) = resolved_by_script.get(&am.script) {
144 if Some(resolved.checksum) != am.checksum {
145 MigrationState::Outdated
146 } else {
147 MigrationState::Applied
148 }
149 } else {
150 MigrationState::Missing
151 }
152 } else {
153 MigrationState::Applied
154 };
155
156 if let Some(ref v) = am.version {
157 seen_versions.insert(v.clone(), true);
158 }
159 if am.version.is_none() {
160 seen_scripts.insert(am.script.clone(), true);
161 }
162
163 infos.push(MigrationInfo {
164 version: am.version.clone(),
165 description: am.description.clone(),
166 migration_type: am.migration_type.clone(),
167 script: am.script.clone(),
168 state,
169 installed_on: Some(am.installed_on),
170 execution_time: Some(am.execution_time),
171 checksum: am.checksum,
172 });
173 }
174
175 for m in &resolved {
177 match &m.kind {
178 MigrationKind::Versioned(version) => {
179 if seen_versions.contains_key(&version.raw) {
180 continue;
181 }
182
183 let state = if let Some(ref bv) = baseline_version {
184 if version <= bv {
185 MigrationState::BelowBaseline
186 } else if let Some(ref highest) = highest_applied {
187 if version < highest {
188 MigrationState::OutOfOrder
189 } else {
190 MigrationState::Pending
191 }
192 } else {
193 MigrationState::Pending
194 }
195 } else if let Some(ref highest) = highest_applied {
196 if version < highest {
197 MigrationState::OutOfOrder
198 } else {
199 MigrationState::Pending
200 }
201 } else {
202 MigrationState::Pending
203 };
204
205 infos.push(MigrationInfo {
206 version: Some(version.raw.clone()),
207 description: m.description.clone(),
208 migration_type: m.migration_type().to_string(),
209 script: m.script.clone(),
210 state,
211 installed_on: None,
212 execution_time: None,
213 checksum: Some(m.checksum),
214 });
215 }
216 MigrationKind::Repeatable => {
217 if seen_scripts.contains_key(&m.script) {
218 continue; }
220
221 infos.push(MigrationInfo {
222 version: None,
223 description: m.description.clone(),
224 migration_type: m.migration_type().to_string(),
225 script: m.script.clone(),
226 state: MigrationState::Pending,
227 installed_on: None,
228 execution_time: None,
229 checksum: Some(m.checksum),
230 });
231 }
232 }
233 }
234
235 infos.sort_by(|a, b| match (&a.version, &b.version) {
237 (Some(av), Some(bv)) => {
238 let pa = MigrationVersion::parse(av);
239 let pb = MigrationVersion::parse(bv);
240 match (pa, pb) {
241 (Ok(pa), Ok(pb)) => pa.cmp(&pb),
242 _ => av.cmp(bv),
243 }
244 }
245 (Some(_), None) => std::cmp::Ordering::Less,
246 (None, Some(_)) => std::cmp::Ordering::Greater,
247 (None, None) => a.description.cmp(&b.description),
248 });
249
250 Ok(infos)
251}