1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
6pub const CURRENT_VERSION: i32 = 6;
8
9const MIGRATIONS: &[(i32, &str, &[&str])] = &[
22 (1, "Canonical schema with UI indexes", &[V1_UP]),
23 (2, "Runtime observability snapshots", &[V2_UP]),
24 (3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
25 (4, "Admin metadata cache tables", &[V4_UP]),
26 (5, "Statement-level admin metadata triggers", &[V5_UP]),
27 (
28 6,
29 "Dirty-key statement triggers for deadlock-free admin metadata",
30 &[V6_UP],
31 ),
32];
33
34const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
35const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
36const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
37const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
38const V5_UP: &str = include_str!("../migrations/v005_admin_metadata_stmt_triggers.sql");
39const V6_UP: &str = include_str!("../migrations/v006_remove_hot_table_triggers.sql");
40
41fn normalize_legacy_version(old_version: i32) -> i32 {
45 match old_version {
46 v if v >= 6 => 4, 5 => 3, 4 => 2, 3 => 1, _ => 0, }
52}
53
54pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
64 let lock_key: i64 = 0x4157_415f_4d49_4752; let mut conn = pool.acquire().await?;
66 sqlx::query("SELECT pg_advisory_lock($1)")
67 .bind(lock_key)
68 .execute(&mut *conn)
69 .await?;
70
71 let result = run_inner(&mut conn).await;
72
73 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
74 .bind(lock_key)
75 .execute(&mut *conn)
76 .await;
77
78 result
79}
80
81async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
82 let has_schema: bool =
83 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
84 .fetch_one(&mut *conn)
85 .await?;
86
87 let current = if has_schema {
88 current_version_conn(conn).await?
89 } else {
90 0
91 };
92
93 if !(has_schema && current == CURRENT_VERSION) {
94 for &(version, description, steps) in MIGRATIONS {
95 if version <= current {
96 continue;
97 }
98 info!(version, description, "Applying migration");
99 for step in steps {
100 sqlx::raw_sql(step).execute(&mut *conn).await?;
101 }
102 info!(version, "Migration applied");
103 }
104 } else {
105 info!(version = current, "Schema is up to date");
106 }
107
108 let has_refresh: bool = sqlx::query_scalar(
113 "SELECT EXISTS(SELECT 1 FROM pg_proc WHERE proname = 'refresh_admin_metadata' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'awa'))",
114 )
115 .fetch_one(&mut *conn)
116 .await?;
117 if has_refresh {
118 let _ = sqlx::raw_sql(
124 "BEGIN; SET LOCAL statement_timeout = '5s'; SELECT awa.refresh_admin_metadata(); COMMIT;",
125 )
126 .execute(&mut *conn)
127 .await;
128 }
129
130 Ok(())
131}
132
133pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
135 let mut conn = pool.acquire().await?;
136 current_version_conn(&mut conn).await
137}
138
139async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
140 let has_schema: bool =
141 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
142 .fetch_one(&mut *conn)
143 .await?;
144
145 if !has_schema {
146 return Ok(0);
147 }
148
149 let has_table: bool = sqlx::query_scalar(
150 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
151 )
152 .fetch_one(&mut *conn)
153 .await?;
154
155 if !has_table {
156 return Ok(0);
157 }
158
159 let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
160 .fetch_one(&mut *conn)
161 .await?;
162
163 let raw_version = version.unwrap_or(0);
164
165 if (1..=CURRENT_VERSION).contains(&raw_version) {
168 let has_admin_tables: bool = sqlx::query_scalar(
171 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
172 )
173 .fetch_one(&mut *conn)
174 .await
175 .unwrap_or(false);
176
177 if raw_version >= 4 && has_admin_tables {
180 return Ok(raw_version);
181 }
182 if raw_version <= 3 {
184 let has_runtime: bool = sqlx::query_scalar(
185 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
186 )
187 .fetch_one(&mut *conn)
188 .await
189 .unwrap_or(false);
190 if (raw_version >= 2 && has_runtime) || raw_version == 1 {
192 return Ok(raw_version);
193 }
194 }
195 }
196
197 let has_legacy_high: bool =
201 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version >= 6)")
202 .fetch_one(&mut *conn)
203 .await
204 .unwrap_or(false);
205
206 let has_admin_metadata: bool = sqlx::query_scalar(
207 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
208 )
209 .fetch_one(&mut *conn)
210 .await
211 .unwrap_or(false);
212
213 let is_legacy_v5_only = raw_version == 5 && !has_legacy_high && !has_admin_metadata;
214 let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;
215
216 let is_legacy_v3_only = raw_version == 3
219 && !has_legacy_high
220 && {
221 let has_runtime: bool = sqlx::query_scalar(
222 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
223 )
224 .fetch_one(&mut *conn)
225 .await
226 .unwrap_or(false);
227 !has_runtime
228 };
229
230 if has_legacy_high || is_legacy_v5_only || is_legacy_v4_only || is_legacy_v3_only {
231 let normalized = normalize_legacy_version(raw_version);
232 info!(
233 old_version = raw_version,
234 new_version = normalized,
235 "Normalizing legacy version numbering"
236 );
237 sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
239 .execute(&mut *conn)
240 .await?;
241 for &(v, desc, _) in MIGRATIONS {
242 if v <= normalized {
243 sqlx::query(
244 "INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
245 )
246 .bind(v)
247 .bind(desc)
248 .execute(&mut *conn)
249 .await?;
250 }
251 }
252 return Ok(normalized);
253 }
254
255 Ok(raw_version)
256}
257
258pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
260 MIGRATIONS
261 .iter()
262 .map(|&(v, d, steps)| (v, d, steps.join("\n")))
263 .collect()
264}
265
266pub fn migration_sql_range(from: i32, to: i32) -> Vec<(i32, &'static str, String)> {
269 MIGRATIONS
270 .iter()
271 .filter(|&&(v, _, _)| v > from && v <= to)
272 .map(|&(v, d, steps)| (v, d, steps.join("\n")))
273 .collect()
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279
280 #[test]
281 fn migration_sql_range_all() {
282 let all = migration_sql_range(0, CURRENT_VERSION);
283 assert_eq!(all.len(), MIGRATIONS.len());
284 assert_eq!(all.first().unwrap().0, 1);
285 assert_eq!(all.last().unwrap().0, CURRENT_VERSION);
286 }
287
288 #[test]
289 fn migration_sql_range_subset() {
290 let subset = migration_sql_range(2, CURRENT_VERSION);
291 assert!(subset.iter().all(|(v, _, _)| *v > 2));
292 assert_eq!(subset.len(), (CURRENT_VERSION - 2) as usize);
293 }
294
295 #[test]
296 fn migration_sql_range_single() {
297 let single = migration_sql_range(2, 3);
298 assert_eq!(single.len(), 1);
299 assert_eq!(single[0].0, 3);
300 assert!(!single[0].2.is_empty());
301 }
302
303 #[test]
304 fn migration_sql_range_empty_when_equal() {
305 let empty = migration_sql_range(CURRENT_VERSION, CURRENT_VERSION);
306 assert!(empty.is_empty());
307 }
308
309 #[test]
310 fn migration_sql_range_empty_when_inverted() {
311 let empty = migration_sql_range(3, 1);
312 assert!(empty.is_empty());
313 }
314
315 #[test]
316 fn migration_sql_range_matches_full() {
317 let full = migration_sql();
318 let ranged = migration_sql_range(0, CURRENT_VERSION);
319 assert_eq!(full.len(), ranged.len());
320 for (f, r) in full.iter().zip(ranged.iter()) {
321 assert_eq!(f.0, r.0);
322 assert_eq!(f.1, r.1);
323 assert_eq!(f.2, r.2);
324 }
325 }
326}