Skip to main content

awa_model/
migrations.rs

1use crate::error::AwaError;
2use sqlx::PgPool;
3use tracing::info;
4
/// Schema version this build of the crate expects the database to be at.
///
/// Must equal the highest version number listed in `MIGRATIONS`.
pub const CURRENT_VERSION: i32 = 2;
7
8/// All migrations in order.
9const MIGRATIONS: &[(i32, &str, &str)] = &[
10    (1, "Initial schema", V1_UP),
11    (2, "Periodic/cron jobs", V2_UP),
12];
13
/// SQL for schema version 1 (the initial schema).
///
/// In order, the script:
/// - creates the `awa` schema and the `awa.job_state` enum,
/// - creates the `awa.jobs`, `awa.queue_meta` and `awa.schema_version` tables,
/// - defines `awa.job_state_in_bitmask`, `awa.backoff_duration` and the
///   `awa.notify_new_job` trigger function plus the `trg_awa_notify` trigger,
/// - creates the dequeue, heartbeat, deadline, uniqueness and kind/state indexes,
/// - records version 1 in `awa.schema_version`.
///
/// The text is executed verbatim, so it must not be edited once released;
/// schema changes belong in a new migration.
///
/// NOTE(review): `awa.backoff_duration` casts `power(2, attempt)` to `int`
/// before `LEAST` caps the result, while `max_attempts` may be as large as
/// 1000. That cast looks like it would fail with "integer out of range" once
/// `attempt` reaches 31 — confirm, and if so fix it in a follow-up migration.
/// NOTE(review): the `max_attempts` parameter of `awa.backoff_duration` is not
/// referenced by the function body.
const V1_UP: &str = r#"
-- Awa schema v1: Initial schema

CREATE SCHEMA IF NOT EXISTS awa;

CREATE TYPE awa.job_state AS ENUM (
    'scheduled', 'available', 'running',
    'completed', 'retryable', 'failed', 'cancelled'
);

CREATE TABLE awa.jobs (
    id              BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    kind            TEXT        NOT NULL,
    queue           TEXT        NOT NULL DEFAULT 'default',
    args            JSONB       NOT NULL DEFAULT '{}',
    state           awa.job_state NOT NULL DEFAULT 'available',
    priority        SMALLINT    NOT NULL DEFAULT 2,
    attempt         SMALLINT    NOT NULL DEFAULT 0,
    max_attempts    SMALLINT    NOT NULL DEFAULT 25,
    run_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    heartbeat_at    TIMESTAMPTZ,
    deadline_at     TIMESTAMPTZ,
    attempted_at    TIMESTAMPTZ,
    finalized_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    errors          JSONB[]     DEFAULT '{}',
    metadata        JSONB       NOT NULL DEFAULT '{}',
    tags            TEXT[]      NOT NULL DEFAULT '{}',
    unique_key      BYTEA,
    unique_states   BIT(8),

    CONSTRAINT priority_in_range CHECK (priority BETWEEN 1 AND 4),
    CONSTRAINT max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
    CONSTRAINT queue_name_length CHECK (length(queue) <= 200),
    CONSTRAINT kind_length CHECK (length(kind) <= 200),
    CONSTRAINT tags_count CHECK (cardinality(tags) <= 20)
);

CREATE TABLE awa.queue_meta (
    queue       TEXT PRIMARY KEY,
    paused      BOOLEAN NOT NULL DEFAULT FALSE,
    paused_at   TIMESTAMPTZ,
    paused_by   TEXT
);

CREATE TABLE awa.schema_version (
    version     INT PRIMARY KEY,
    description TEXT NOT NULL,
    applied_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Functions (must be created before indexes that reference them)

CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
RETURNS BOOLEAN AS $$
    SELECT CASE state
        WHEN 'scheduled'  THEN get_bit(bitmask, 0) = 1
        WHEN 'available'  THEN get_bit(bitmask, 1) = 1
        WHEN 'running'    THEN get_bit(bitmask, 2) = 1
        WHEN 'completed'  THEN get_bit(bitmask, 3) = 1
        WHEN 'retryable'  THEN get_bit(bitmask, 4) = 1
        WHEN 'failed'     THEN get_bit(bitmask, 5) = 1
        WHEN 'cancelled'  THEN get_bit(bitmask, 6) = 1
        ELSE FALSE
    END;
$$ LANGUAGE sql IMMUTABLE;

CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
RETURNS interval AS $$
    SELECT LEAST(
        (power(2, attempt)::int || ' seconds')::interval
            + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
        interval '24 hours'
    );
$$ LANGUAGE sql VOLATILE;

CREATE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('awa:' || NEW.queue, NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_awa_notify
    AFTER INSERT ON awa.jobs
    FOR EACH ROW
    WHEN (NEW.state = 'available' AND NEW.run_at <= now())
    EXECUTE FUNCTION awa.notify_new_job();

-- Indexes

-- Dequeue hot path
CREATE INDEX idx_awa_jobs_dequeue
    ON awa.jobs (queue, priority, run_at, id)
    WHERE state = 'available';

-- Heartbeat staleness (crash detection)
CREATE INDEX idx_awa_jobs_heartbeat
    ON awa.jobs (heartbeat_at)
    WHERE state = 'running';

-- Hard deadline (runaway protection)
CREATE INDEX idx_awa_jobs_deadline
    ON awa.jobs (deadline_at)
    WHERE state = 'running' AND deadline_at IS NOT NULL;

-- Uniqueness enforcement
CREATE UNIQUE INDEX idx_awa_jobs_unique
    ON awa.jobs (unique_key)
    WHERE unique_key IS NOT NULL
      AND unique_states IS NOT NULL
      AND awa.job_state_in_bitmask(unique_states, state);

-- Kind-based lookups (admin, monitoring)
CREATE INDEX idx_awa_jobs_kind_state
    ON awa.jobs (kind, state);

-- Record version
INSERT INTO awa.schema_version (version, description) VALUES (1, 'Initial schema');
"#;
135
/// SQL for schema version 2 (periodic/cron jobs).
///
/// Creates the `awa.cron_jobs` table, keyed by `name`, and records version 2
/// in `awa.schema_version`. The text is executed verbatim, so it must not be
/// edited once released; later changes belong in a new migration.
const V2_UP: &str = r#"
-- Awa schema v2: Periodic/cron jobs

CREATE TABLE awa.cron_jobs (
    name            TEXT PRIMARY KEY,
    cron_expr       TEXT        NOT NULL,
    timezone        TEXT        NOT NULL DEFAULT 'UTC',
    kind            TEXT        NOT NULL,
    queue           TEXT        NOT NULL DEFAULT 'default',
    args            JSONB       NOT NULL DEFAULT '{}',
    priority        SMALLINT    NOT NULL DEFAULT 2,
    max_attempts    SMALLINT    NOT NULL DEFAULT 25,
    tags            TEXT[]      NOT NULL DEFAULT '{}',
    metadata        JSONB       NOT NULL DEFAULT '{}',
    last_enqueued_at TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

INSERT INTO awa.schema_version (version, description)
VALUES (2, 'Periodic/cron jobs');
"#;
159
160/// Run all pending migrations against the database.
161///
162/// Uses an advisory lock to prevent concurrent migration attempts.
163pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
164    // Acquire advisory lock to serialize migration (key chosen to not collide with maintenance)
165    let lock_key: i64 = 0x4157_415f_4d49_4752; // "AWA_MIGR"
166    sqlx::query("SELECT pg_advisory_lock($1)")
167        .bind(lock_key)
168        .execute(pool)
169        .await?;
170
171    let result = run_inner(pool).await;
172
173    // Always release the lock
174    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
175        .bind(lock_key)
176        .execute(pool)
177        .await;
178
179    result
180}
181
182async fn run_inner(pool: &PgPool) -> Result<(), AwaError> {
183    let has_schema: bool =
184        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
185            .fetch_one(pool)
186            .await?;
187
188    if !has_schema {
189        // Fresh database — apply all migrations in order
190        for &(version, description, sql) in MIGRATIONS {
191            info!(version, description, "Applying migration");
192            sqlx::raw_sql(sql).execute(pool).await?;
193            info!(version, "Migration applied");
194        }
195        return Ok(());
196    }
197
198    // Check if schema_version table exists
199    let has_version_table: bool = sqlx::query_scalar(
200        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
201    )
202    .fetch_one(pool)
203    .await?;
204
205    if !has_version_table {
206        // Schema exists but no version table — apply all from v1
207        for &(version, description, sql) in MIGRATIONS {
208            info!(version, description, "Applying migration");
209            sqlx::raw_sql(sql).execute(pool).await?;
210            info!(version, "Migration applied");
211        }
212        return Ok(());
213    }
214
215    let current: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
216        .fetch_one(pool)
217        .await?;
218
219    let current_version = current.unwrap_or(0);
220
221    if current_version >= CURRENT_VERSION {
222        info!(version = current_version, "Schema is up to date");
223        return Ok(());
224    }
225
226    // Walk pending migrations
227    for &(version, description, sql) in MIGRATIONS {
228        if version > current_version {
229            info!(version, description, "Applying migration");
230            sqlx::raw_sql(sql).execute(pool).await?;
231            info!(version, "Migration applied");
232        }
233    }
234
235    Ok(())
236}
237
238/// Get the current schema version.
239pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
240    let has_schema: bool =
241        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
242            .fetch_one(pool)
243            .await?;
244
245    if !has_schema {
246        return Ok(0);
247    }
248
249    let has_table: bool = sqlx::query_scalar(
250        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
251    )
252    .fetch_one(pool)
253    .await?;
254
255    if !has_table {
256        return Ok(0);
257    }
258
259    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
260        .fetch_one(pool)
261        .await?;
262
263    Ok(version.unwrap_or(0))
264}
265
266/// Get the raw SQL for all migrations (for extraction / external tooling).
267pub fn migration_sql() -> Vec<(i32, &'static str, &'static str)> {
268    MIGRATIONS.iter().map(|&(v, d, s)| (v, d, s)).collect()
269}