Skip to main content

awa_model/
migrations.rs

1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
6/// Current schema version.
7pub const CURRENT_VERSION: i32 = 5;
8
9/// All migrations in order. SQL lives in `awa-model/migrations/*.sql`
10/// for easy inspection by users who run their own migration tooling.
11///
12/// ## Migration policy
13///
14/// Migrations MUST be **additive only**:
15/// - Add tables, columns (with defaults), indexes, functions
16/// - Never drop columns, change types, or tighten constraints
17///
18/// This ensures running workers are not broken by a schema upgrade.
19/// For breaking schema changes, bump the major version and document
20/// the required stop-the-world upgrade procedure.
21const MIGRATIONS: &[(i32, &str, &[&str])] = &[
22    (1, "Canonical schema with UI indexes", &[V1_UP]),
23    (2, "Runtime observability snapshots", &[V2_UP]),
24    (3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
25    (4, "Admin metadata cache tables", &[V4_UP]),
26    (5, "Statement-level admin metadata triggers", &[V5_UP]),
27];
28
29const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
30const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
31const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
32const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
33const V5_UP: &str = include_str!("../migrations/v005_admin_metadata_stmt_triggers.sql");
34
35/// Old version numbers from pre-0.4 releases that used V3/V4/V5 numbering.
36/// Also tolerates the unreleased inline-V6 branch numbering used during review.
37/// Maps old max version → equivalent new version.
38fn normalize_legacy_version(old_version: i32) -> i32 {
39    match old_version {
40        v if v >= 6 => 4, // legacy/unreleased V6 admin metadata = V4 (new)
41        5 => 3,           // V5 (0.3.x) = V3 (new)
42        4 => 2,           // V4 = V2 (new)
43        3 => 1,           // V3 = V1 (new)
44        _ => 0,           // Pre-canonical or fresh
45    }
46}
47
48/// Run all pending migrations against the database.
49///
50/// Applies only migrations newer than the current schema version.
51/// V1 bootstraps the canonical schema from scratch; V2+ are incremental
52/// and use `IF NOT EXISTS` guards so they are safe to re-run.
53///
54/// by replacing the legacy `schema_version` rows with the new numbering.
55///
56/// Takes `&PgPool` for ergonomic use from Rust.
57pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
58    let lock_key: i64 = 0x4157_415f_4d49_4752; // "AWA_MIGR"
59    let mut conn = pool.acquire().await?;
60    sqlx::query("SELECT pg_advisory_lock($1)")
61        .bind(lock_key)
62        .execute(&mut *conn)
63        .await?;
64
65    let result = run_inner(&mut conn).await;
66
67    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
68        .bind(lock_key)
69        .execute(&mut *conn)
70        .await;
71
72    result
73}
74
75async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
76    let has_schema: bool =
77        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
78            .fetch_one(&mut *conn)
79            .await?;
80
81    let current = if has_schema {
82        current_version_conn(conn).await?
83    } else {
84        0
85    };
86
87    if has_schema && current == CURRENT_VERSION {
88        info!(version = current, "Schema is up to date");
89        return Ok(());
90    }
91
92    for &(version, description, steps) in MIGRATIONS {
93        if version <= current {
94            continue;
95        }
96        info!(version, description, "Applying migration");
97        for step in steps {
98            sqlx::raw_sql(step).execute(&mut *conn).await?;
99        }
100        info!(version, "Migration applied");
101    }
102
103    Ok(())
104}
105
106/// Get the current schema version.
107pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
108    let mut conn = pool.acquire().await?;
109    current_version_conn(&mut conn).await
110}
111
112async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
113    let has_schema: bool =
114        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
115            .fetch_one(&mut *conn)
116            .await?;
117
118    if !has_schema {
119        return Ok(0);
120    }
121
122    let has_table: bool = sqlx::query_scalar(
123        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
124    )
125    .fetch_one(&mut *conn)
126    .await?;
127
128    if !has_table {
129        return Ok(0);
130    }
131
132    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
133        .fetch_one(&mut *conn)
134        .await?;
135
136    let raw_version = version.unwrap_or(0);
137
138    // Detect legacy version numbering from pre-0.4 releases.
139    // Version 5/6 are always legacy. Version 4 is legacy only if the new
140    // admin metadata schema is absent.
141    let has_legacy_high: bool =
142        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version >= 6)")
143            .fetch_one(&mut *conn)
144            .await
145            .unwrap_or(false);
146
147    let has_admin_metadata: bool = sqlx::query_scalar(
148        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
149    )
150    .fetch_one(&mut *conn)
151    .await
152    .unwrap_or(false);
153
154    let is_legacy_v5_only = raw_version == 5 && !has_legacy_high && !has_admin_metadata;
155    let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;
156
157    // Also detect a single legacy V3 row (0.3.0 with only canonical schema)
158    // by checking if runtime_instances exists — if not, this is legacy V3.
159    let is_legacy_v3_only = raw_version == 3
160        && !has_legacy_high
161        && {
162            let has_runtime: bool = sqlx::query_scalar(
163            "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
164        )
165        .fetch_one(&mut *conn)
166        .await
167        .unwrap_or(false);
168            !has_runtime
169        };
170
171    if has_legacy_high || is_legacy_v5_only || is_legacy_v4_only || is_legacy_v3_only {
172        let normalized = normalize_legacy_version(raw_version);
173        info!(
174            old_version = raw_version,
175            new_version = normalized,
176            "Normalizing legacy version numbering"
177        );
178        // Replace legacy rows so future calls return the new numbering.
179        sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
180            .execute(&mut *conn)
181            .await?;
182        for &(v, desc, _) in MIGRATIONS {
183            if v <= normalized {
184                sqlx::query(
185                    "INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
186                )
187                .bind(v)
188                .bind(desc)
189                .execute(&mut *conn)
190                .await?;
191            }
192        }
193        return Ok(normalized);
194    }
195
196    Ok(raw_version)
197}
198
199/// Get the raw SQL for all migrations (for extraction / external tooling).
200pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
201    MIGRATIONS
202        .iter()
203        .map(|&(v, d, steps)| (v, d, steps.join("\n")))
204        .collect()
205}
206
207/// Get migration SQL for a version range `(from, to]` — `from` is exclusive,
208/// `to` is inclusive. Returns only migrations where `from < version <= to`.
209pub fn migration_sql_range(from: i32, to: i32) -> Vec<(i32, &'static str, String)> {
210    MIGRATIONS
211        .iter()
212        .filter(|&&(v, _, _)| v > from && v <= to)
213        .map(|&(v, d, steps)| (v, d, steps.join("\n")))
214        .collect()
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn migration_sql_range_all() {
223        let all = migration_sql_range(0, CURRENT_VERSION);
224        assert_eq!(all.len(), MIGRATIONS.len());
225        assert_eq!(all.first().unwrap().0, 1);
226        assert_eq!(all.last().unwrap().0, CURRENT_VERSION);
227    }
228
229    #[test]
230    fn migration_sql_range_subset() {
231        let subset = migration_sql_range(2, CURRENT_VERSION);
232        assert!(subset.iter().all(|(v, _, _)| *v > 2));
233        assert_eq!(subset.len(), (CURRENT_VERSION - 2) as usize);
234    }
235
236    #[test]
237    fn migration_sql_range_single() {
238        let single = migration_sql_range(2, 3);
239        assert_eq!(single.len(), 1);
240        assert_eq!(single[0].0, 3);
241        assert!(!single[0].2.is_empty());
242    }
243
244    #[test]
245    fn migration_sql_range_empty_when_equal() {
246        let empty = migration_sql_range(CURRENT_VERSION, CURRENT_VERSION);
247        assert!(empty.is_empty());
248    }
249
250    #[test]
251    fn migration_sql_range_empty_when_inverted() {
252        let empty = migration_sql_range(3, 1);
253        assert!(empty.is_empty());
254    }
255
256    #[test]
257    fn migration_sql_range_matches_full() {
258        let full = migration_sql();
259        let ranged = migration_sql_range(0, CURRENT_VERSION);
260        assert_eq!(full.len(), ranged.len());
261        for (f, r) in full.iter().zip(ranged.iter()) {
262            assert_eq!(f.0, r.0);
263            assert_eq!(f.1, r.1);
264            assert_eq!(f.2, r.2);
265        }
266    }
267}