Skip to main content

awa_model/
migrations.rs

1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
6/// Current schema version.
7pub const CURRENT_VERSION: i32 = 6;
8
9/// All migrations in order. SQL lives in `awa-model/migrations/*.sql`
10/// for easy inspection by users who run their own migration tooling.
11///
12/// ## Migration policy
13///
14/// Migrations MUST be **additive only**:
15/// - Add tables, columns (with defaults), indexes, functions
16/// - Never drop columns, change types, or tighten constraints
17///
18/// This ensures running workers are not broken by a schema upgrade.
19/// For breaking schema changes, bump the major version and document
20/// the required stop-the-world upgrade procedure.
21const MIGRATIONS: &[(i32, &str, &[&str])] = &[
22    (1, "Canonical schema with UI indexes", &[V1_UP]),
23    (2, "Runtime observability snapshots", &[V2_UP]),
24    (3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
25    (4, "Admin metadata cache tables", &[V4_UP]),
26    (5, "Statement-level admin metadata triggers", &[V5_UP]),
27    (
28        6,
29        "Dirty-key statement triggers for deadlock-free admin metadata",
30        &[V6_UP],
31    ),
32];
33
34const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
35const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
36const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
37const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
38const V5_UP: &str = include_str!("../migrations/v005_admin_metadata_stmt_triggers.sql");
39const V6_UP: &str = include_str!("../migrations/v006_remove_hot_table_triggers.sql");
40
/// Translate a `schema_version` maximum written under the pre-0.4 numbering
/// (legacy V3/V4/V5, plus the unreleased inline-V6 review branch) into the
/// equivalent version of the current numbering.
///
/// Legacy V3..=V5 sit exactly two numbers above their current equivalents.
/// Legacy V6 and anything higher maps to the admin-metadata schema, which is
/// current V4. Anything below legacy V3 predates the canonical schema and
/// maps to 0, meaning nothing has been applied.
fn normalize_legacy_version(old_version: i32) -> i32 {
    match old_version {
        // Pre-canonical or fresh database.
        i32::MIN..=2 => 0,
        // Legacy V3 -> V1, V4 -> V2, V5 (0.3.x) -> V3.
        3..=5 => old_version - 2,
        // Legacy/unreleased V6 admin metadata == current V4.
        _ => 4,
    }
}
53
54/// Run all pending migrations against the database.
55///
56/// Applies only migrations newer than the current schema version.
57/// V1 bootstraps the canonical schema from scratch; V2+ are incremental
58/// and use `IF NOT EXISTS` guards so they are safe to re-run.
59///
60/// by replacing the legacy `schema_version` rows with the new numbering.
61///
62/// Takes `&PgPool` for ergonomic use from Rust.
63pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
64    let lock_key: i64 = 0x4157_415f_4d49_4752; // "AWA_MIGR"
65    let mut conn = pool.acquire().await?;
66    sqlx::query("SELECT pg_advisory_lock($1)")
67        .bind(lock_key)
68        .execute(&mut *conn)
69        .await?;
70
71    let result = run_inner(&mut conn).await;
72
73    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
74        .bind(lock_key)
75        .execute(&mut *conn)
76        .await;
77
78    result
79}
80
81async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
82    let has_schema: bool =
83        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
84            .fetch_one(&mut *conn)
85            .await?;
86
87    let current = if has_schema {
88        current_version_conn(conn).await?
89    } else {
90        0
91    };
92
93    if !(has_schema && current == CURRENT_VERSION) {
94        for &(version, description, steps) in MIGRATIONS {
95            if version <= current {
96                continue;
97            }
98            info!(version, description, "Applying migration");
99            for step in steps {
100                sqlx::raw_sql(step).execute(&mut *conn).await?;
101            }
102            info!(version, "Migration applied");
103        }
104    } else {
105        info!(version = current, "Schema is up to date");
106    }
107
108    // Ensure the admin metadata cache is warm. Since v006 removed the
109    // synchronous triggers on jobs_hot, the cache is only updated by the
110    // maintenance leader. Refreshing here guarantees queue_stats() and
111    // state_counts() return accurate data immediately after migrate().
112    let has_refresh: bool = sqlx::query_scalar(
113        "SELECT EXISTS(SELECT 1 FROM pg_proc WHERE proname = 'refresh_admin_metadata' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'awa'))",
114    )
115    .fetch_one(&mut *conn)
116    .await?;
117    if has_refresh {
118        // Best-effort cache warmup. Uses a short statement timeout to avoid
119        // blocking if a previous runtime's maintenance leader is still
120        // holding the cache advisory lock during a slow shutdown.
121        // Wrapped in an explicit transaction because SET LOCAL is only
122        // effective inside a transaction block (not in autocommit mode).
123        let _ = sqlx::raw_sql(
124            "BEGIN; SET LOCAL statement_timeout = '5s'; SELECT awa.refresh_admin_metadata(); COMMIT;",
125        )
126        .execute(&mut *conn)
127        .await;
128    }
129
130    Ok(())
131}
132
133/// Get the current schema version.
134pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
135    let mut conn = pool.acquire().await?;
136    current_version_conn(&mut conn).await
137}
138
139async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
140    let has_schema: bool =
141        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
142            .fetch_one(&mut *conn)
143            .await?;
144
145    if !has_schema {
146        return Ok(0);
147    }
148
149    let has_table: bool = sqlx::query_scalar(
150        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
151    )
152    .fetch_one(&mut *conn)
153    .await?;
154
155    if !has_table {
156        return Ok(0);
157    }
158
159    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
160        .fetch_one(&mut *conn)
161        .await?;
162
163    let raw_version = version.unwrap_or(0);
164
165    // If max version is within the current MIGRATIONS range and the expected
166    // tables exist, this is a current install — skip legacy detection.
167    if (1..=CURRENT_VERSION).contains(&raw_version) {
168        // Quick check: does the schema match what we expect at this version?
169        // If queue_state_counts exists, we're past v4 in the current numbering.
170        let has_admin_tables: bool = sqlx::query_scalar(
171            "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
172        )
173        .fetch_one(&mut *conn)
174        .await
175        .unwrap_or(false);
176
177        // Current v4+ has queue_state_counts. If we're at v4+ and have
178        // the table, this is definitely a current install.
179        if raw_version >= 4 && has_admin_tables {
180            return Ok(raw_version);
181        }
182        // Current v1-v3 don't have queue_state_counts.
183        if raw_version <= 3 {
184            let has_runtime: bool = sqlx::query_scalar(
185                "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
186            )
187            .fetch_one(&mut *conn)
188            .await
189            .unwrap_or(false);
190            // v2+ has runtime_instances. If present, current install.
191            if (raw_version >= 2 && has_runtime) || raw_version == 1 {
192                return Ok(raw_version);
193            }
194        }
195    }
196
197    // Detect legacy version numbering from pre-0.4 releases.
198    // Legacy installs used a different numbering scheme where v3-v6 mapped
199    // to what is now v1-v4.
200    let has_legacy_high: bool =
201        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version >= 6)")
202            .fetch_one(&mut *conn)
203            .await
204            .unwrap_or(false);
205
206    let has_admin_metadata: bool = sqlx::query_scalar(
207        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
208    )
209    .fetch_one(&mut *conn)
210    .await
211    .unwrap_or(false);
212
213    let is_legacy_v5_only = raw_version == 5 && !has_legacy_high && !has_admin_metadata;
214    let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;
215
216    // Also detect a single legacy V3 row (0.3.0 with only canonical schema)
217    // by checking if runtime_instances exists — if not, this is legacy V3.
218    let is_legacy_v3_only = raw_version == 3
219        && !has_legacy_high
220        && {
221            let has_runtime: bool = sqlx::query_scalar(
222            "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
223        )
224        .fetch_one(&mut *conn)
225        .await
226        .unwrap_or(false);
227            !has_runtime
228        };
229
230    if has_legacy_high || is_legacy_v5_only || is_legacy_v4_only || is_legacy_v3_only {
231        let normalized = normalize_legacy_version(raw_version);
232        info!(
233            old_version = raw_version,
234            new_version = normalized,
235            "Normalizing legacy version numbering"
236        );
237        // Replace legacy rows so future calls return the new numbering.
238        sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
239            .execute(&mut *conn)
240            .await?;
241        for &(v, desc, _) in MIGRATIONS {
242            if v <= normalized {
243                sqlx::query(
244                    "INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
245                )
246                .bind(v)
247                .bind(desc)
248                .execute(&mut *conn)
249                .await?;
250            }
251        }
252        return Ok(normalized);
253    }
254
255    Ok(raw_version)
256}
257
258/// Get the raw SQL for all migrations (for extraction / external tooling).
259pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
260    MIGRATIONS
261        .iter()
262        .map(|&(v, d, steps)| (v, d, steps.join("\n")))
263        .collect()
264}
265
266/// Get migration SQL for a version range `(from, to]` — `from` is exclusive,
267/// `to` is inclusive. Returns only migrations where `from < version <= to`.
268pub fn migration_sql_range(from: i32, to: i32) -> Vec<(i32, &'static str, String)> {
269    MIGRATIONS
270        .iter()
271        .filter(|&&(v, _, _)| v > from && v <= to)
272        .map(|&(v, d, steps)| (v, d, steps.join("\n")))
273        .collect()
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn migration_sql_range_all() {
        let all = migration_sql_range(0, CURRENT_VERSION);
        assert_eq!(all.len(), MIGRATIONS.len());
        assert_eq!(all.first().unwrap().0, 1);
        assert_eq!(all.last().unwrap().0, CURRENT_VERSION);
    }

    #[test]
    fn migration_sql_range_subset() {
        let subset = migration_sql_range(2, CURRENT_VERSION);
        assert!(subset.iter().all(|(v, _, _)| *v > 2));
        assert_eq!(subset.len(), (CURRENT_VERSION - 2) as usize);
    }

    #[test]
    fn migration_sql_range_single() {
        let single = migration_sql_range(2, 3);
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].0, 3);
        assert!(!single[0].2.is_empty());
    }

    #[test]
    fn migration_sql_range_empty_when_equal() {
        let empty = migration_sql_range(CURRENT_VERSION, CURRENT_VERSION);
        assert!(empty.is_empty());
    }

    #[test]
    fn migration_sql_range_empty_when_inverted() {
        let empty = migration_sql_range(3, 1);
        assert!(empty.is_empty());
    }

    #[test]
    fn migration_sql_range_matches_full() {
        let full = migration_sql();
        let ranged = migration_sql_range(0, CURRENT_VERSION);
        assert_eq!(full.len(), ranged.len());
        for (f, r) in full.iter().zip(ranged.iter()) {
            assert_eq!(f.0, r.0);
            assert_eq!(f.1, r.1);
            assert_eq!(f.2, r.2);
        }
    }

    /// `run_inner` applies migrations in table order and uses
    /// `CURRENT_VERSION` for its "up to date" fast path. The range tests above
    /// also assume versions run 1..=N without gaps.
    #[test]
    fn migrations_are_contiguous_and_end_at_current_version() {
        for (index, &(version, description, steps)) in MIGRATIONS.iter().enumerate() {
            assert_eq!(
                version,
                index as i32 + 1,
                "migration versions must be 1..=N in order with no gaps"
            );
            assert!(!description.is_empty(), "migration {version} has no description");
            assert!(!steps.is_empty(), "migration {version} has no SQL steps");
            assert!(
                steps.iter().all(|step| !step.trim().is_empty()),
                "migration {version} has an empty SQL step"
            );
        }
        assert_eq!(MIGRATIONS.last().map(|&(v, _, _)| v), Some(CURRENT_VERSION));
    }

    /// Legacy (pre-0.4) numbering is offset by two, with legacy V6 == V4.
    #[test]
    fn normalize_legacy_version_maps_old_numbering() {
        assert_eq!(normalize_legacy_version(6), 4);
        assert_eq!(normalize_legacy_version(5), 3);
        assert_eq!(normalize_legacy_version(4), 2);
        assert_eq!(normalize_legacy_version(3), 1);
        assert_eq!(normalize_legacy_version(2), 0);
        assert_eq!(normalize_legacy_version(0), 0);
    }
}