1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
6pub const CURRENT_VERSION: i32 = 5;
8
9const MIGRATIONS: &[(i32, &str, &[&str])] = &[
22 (1, "Canonical schema with UI indexes", &[V1_UP]),
23 (2, "Runtime observability snapshots", &[V2_UP]),
24 (3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
25 (4, "Admin metadata cache tables", &[V4_UP]),
26 (5, "Statement-level admin metadata triggers", &[V5_UP]),
27];
28
29const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
30const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
31const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
32const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
33const V5_UP: &str = include_str!("../migrations/v005_admin_metadata_stmt_triggers.sql");
34
35fn normalize_legacy_version(old_version: i32) -> i32 {
39 match old_version {
40 v if v >= 6 => 4, 5 => 3, 4 => 2, 3 => 1, _ => 0, }
46}
47
48pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
58 let lock_key: i64 = 0x4157_415f_4d49_4752; let mut conn = pool.acquire().await?;
60 sqlx::query("SELECT pg_advisory_lock($1)")
61 .bind(lock_key)
62 .execute(&mut *conn)
63 .await?;
64
65 let result = run_inner(&mut conn).await;
66
67 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
68 .bind(lock_key)
69 .execute(&mut *conn)
70 .await;
71
72 result
73}
74
75async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
76 let has_schema: bool =
77 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
78 .fetch_one(&mut *conn)
79 .await?;
80
81 let current = if has_schema {
82 current_version_conn(conn).await?
83 } else {
84 0
85 };
86
87 if has_schema && current == CURRENT_VERSION {
88 info!(version = current, "Schema is up to date");
89 return Ok(());
90 }
91
92 for &(version, description, steps) in MIGRATIONS {
93 if version <= current {
94 continue;
95 }
96 info!(version, description, "Applying migration");
97 for step in steps {
98 sqlx::raw_sql(step).execute(&mut *conn).await?;
99 }
100 info!(version, "Migration applied");
101 }
102
103 Ok(())
104}
105
106pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
108 let mut conn = pool.acquire().await?;
109 current_version_conn(&mut conn).await
110}
111
112async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
113 let has_schema: bool =
114 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
115 .fetch_one(&mut *conn)
116 .await?;
117
118 if !has_schema {
119 return Ok(0);
120 }
121
122 let has_table: bool = sqlx::query_scalar(
123 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
124 )
125 .fetch_one(&mut *conn)
126 .await?;
127
128 if !has_table {
129 return Ok(0);
130 }
131
132 let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
133 .fetch_one(&mut *conn)
134 .await?;
135
136 let raw_version = version.unwrap_or(0);
137
138 let has_legacy_high: bool =
142 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version >= 6)")
143 .fetch_one(&mut *conn)
144 .await
145 .unwrap_or(false);
146
147 let has_admin_metadata: bool = sqlx::query_scalar(
148 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
149 )
150 .fetch_one(&mut *conn)
151 .await
152 .unwrap_or(false);
153
154 let is_legacy_v5_only = raw_version == 5 && !has_legacy_high && !has_admin_metadata;
155 let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;
156
157 let is_legacy_v3_only = raw_version == 3
160 && !has_legacy_high
161 && {
162 let has_runtime: bool = sqlx::query_scalar(
163 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
164 )
165 .fetch_one(&mut *conn)
166 .await
167 .unwrap_or(false);
168 !has_runtime
169 };
170
171 if has_legacy_high || is_legacy_v5_only || is_legacy_v4_only || is_legacy_v3_only {
172 let normalized = normalize_legacy_version(raw_version);
173 info!(
174 old_version = raw_version,
175 new_version = normalized,
176 "Normalizing legacy version numbering"
177 );
178 sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
180 .execute(&mut *conn)
181 .await?;
182 for &(v, desc, _) in MIGRATIONS {
183 if v <= normalized {
184 sqlx::query(
185 "INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
186 )
187 .bind(v)
188 .bind(desc)
189 .execute(&mut *conn)
190 .await?;
191 }
192 }
193 return Ok(normalized);
194 }
195
196 Ok(raw_version)
197}
198
199pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
201 MIGRATIONS
202 .iter()
203 .map(|&(v, d, steps)| (v, d, steps.join("\n")))
204 .collect()
205}
206
207pub fn migration_sql_range(from: i32, to: i32) -> Vec<(i32, &'static str, String)> {
210 MIGRATIONS
211 .iter()
212 .filter(|&&(v, _, _)| v > from && v <= to)
213 .map(|&(v, d, steps)| (v, d, steps.join("\n")))
214 .collect()
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[test]
222 fn migration_sql_range_all() {
223 let all = migration_sql_range(0, CURRENT_VERSION);
224 assert_eq!(all.len(), MIGRATIONS.len());
225 assert_eq!(all.first().unwrap().0, 1);
226 assert_eq!(all.last().unwrap().0, CURRENT_VERSION);
227 }
228
229 #[test]
230 fn migration_sql_range_subset() {
231 let subset = migration_sql_range(2, CURRENT_VERSION);
232 assert!(subset.iter().all(|(v, _, _)| *v > 2));
233 assert_eq!(subset.len(), (CURRENT_VERSION - 2) as usize);
234 }
235
236 #[test]
237 fn migration_sql_range_single() {
238 let single = migration_sql_range(2, 3);
239 assert_eq!(single.len(), 1);
240 assert_eq!(single[0].0, 3);
241 assert!(!single[0].2.is_empty());
242 }
243
244 #[test]
245 fn migration_sql_range_empty_when_equal() {
246 let empty = migration_sql_range(CURRENT_VERSION, CURRENT_VERSION);
247 assert!(empty.is_empty());
248 }
249
250 #[test]
251 fn migration_sql_range_empty_when_inverted() {
252 let empty = migration_sql_range(3, 1);
253 assert!(empty.is_empty());
254 }
255
256 #[test]
257 fn migration_sql_range_matches_full() {
258 let full = migration_sql();
259 let ranged = migration_sql_range(0, CURRENT_VERSION);
260 assert_eq!(full.len(), ranged.len());
261 for (f, r) in full.iter().zip(ranged.iter()) {
262 assert_eq!(f.0, r.0);
263 assert_eq!(f.1, r.1);
264 assert_eq!(f.2, r.2);
265 }
266 }
267}