Skip to main content

awa_model/
migrations.rs

1use crate::error::AwaError;
2use sqlx::PgPool;
3use tracing::info;
4
5/// Current schema version.
6pub const CURRENT_VERSION: i32 = 1;
7
8/// The initial migration SQL.
9const V1_UP: &str = r#"
10-- Awa schema v1: Initial schema
11
12CREATE SCHEMA IF NOT EXISTS awa;
13
14CREATE TYPE awa.job_state AS ENUM (
15    'scheduled', 'available', 'running',
16    'completed', 'retryable', 'failed', 'cancelled'
17);
18
19CREATE TABLE awa.jobs (
20    id              BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
21    kind            TEXT        NOT NULL,
22    queue           TEXT        NOT NULL DEFAULT 'default',
23    args            JSONB       NOT NULL DEFAULT '{}',
24    state           awa.job_state NOT NULL DEFAULT 'available',
25    priority        SMALLINT    NOT NULL DEFAULT 2,
26    attempt         SMALLINT    NOT NULL DEFAULT 0,
27    max_attempts    SMALLINT    NOT NULL DEFAULT 25,
28    run_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
29    heartbeat_at    TIMESTAMPTZ,
30    deadline_at     TIMESTAMPTZ,
31    attempted_at    TIMESTAMPTZ,
32    finalized_at    TIMESTAMPTZ,
33    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
34    errors          JSONB[]     DEFAULT '{}',
35    metadata        JSONB       NOT NULL DEFAULT '{}',
36    tags            TEXT[]      NOT NULL DEFAULT '{}',
37    unique_key      BYTEA,
38    unique_states   BIT(8),
39
40    CONSTRAINT priority_in_range CHECK (priority BETWEEN 1 AND 4),
41    CONSTRAINT max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
42    CONSTRAINT queue_name_length CHECK (length(queue) <= 200),
43    CONSTRAINT kind_length CHECK (length(kind) <= 200),
44    CONSTRAINT tags_count CHECK (cardinality(tags) <= 20)
45);
46
47CREATE TABLE awa.queue_meta (
48    queue       TEXT PRIMARY KEY,
49    paused      BOOLEAN NOT NULL DEFAULT FALSE,
50    paused_at   TIMESTAMPTZ,
51    paused_by   TEXT
52);
53
54CREATE TABLE awa.schema_version (
55    version     INT PRIMARY KEY,
56    description TEXT NOT NULL,
57    applied_at  TIMESTAMPTZ NOT NULL DEFAULT now()
58);
59
60-- Functions (must be created before indexes that reference them)
61
62CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
63RETURNS BOOLEAN AS $$
64    SELECT CASE state
65        WHEN 'scheduled'  THEN get_bit(bitmask, 0) = 1
66        WHEN 'available'  THEN get_bit(bitmask, 1) = 1
67        WHEN 'running'    THEN get_bit(bitmask, 2) = 1
68        WHEN 'completed'  THEN get_bit(bitmask, 3) = 1
69        WHEN 'retryable'  THEN get_bit(bitmask, 4) = 1
70        WHEN 'failed'     THEN get_bit(bitmask, 5) = 1
71        WHEN 'cancelled'  THEN get_bit(bitmask, 6) = 1
72        ELSE FALSE
73    END;
74$$ LANGUAGE sql IMMUTABLE;
75
76CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
77RETURNS interval AS $$
78    SELECT LEAST(
79        (power(2, attempt)::int || ' seconds')::interval
80            + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
81        interval '24 hours'
82    );
83$$ LANGUAGE sql VOLATILE;
84
85CREATE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
86BEGIN
87    PERFORM pg_notify('awa:' || NEW.queue, NEW.id::text);
88    RETURN NEW;
89END;
90$$ LANGUAGE plpgsql;
91
92CREATE TRIGGER trg_awa_notify
93    AFTER INSERT ON awa.jobs
94    FOR EACH ROW
95    WHEN (NEW.state = 'available' AND NEW.run_at <= now())
96    EXECUTE FUNCTION awa.notify_new_job();
97
98-- Indexes
99
100-- Dequeue hot path
101CREATE INDEX idx_awa_jobs_dequeue
102    ON awa.jobs (queue, priority, run_at, id)
103    WHERE state = 'available';
104
105-- Heartbeat staleness (crash detection)
106CREATE INDEX idx_awa_jobs_heartbeat
107    ON awa.jobs (heartbeat_at)
108    WHERE state = 'running';
109
110-- Hard deadline (runaway protection)
111CREATE INDEX idx_awa_jobs_deadline
112    ON awa.jobs (deadline_at)
113    WHERE state = 'running' AND deadline_at IS NOT NULL;
114
115-- Uniqueness enforcement
116CREATE UNIQUE INDEX idx_awa_jobs_unique
117    ON awa.jobs (unique_key)
118    WHERE unique_key IS NOT NULL
119      AND unique_states IS NOT NULL
120      AND awa.job_state_in_bitmask(unique_states, state);
121
122-- Kind-based lookups (admin, monitoring)
123CREATE INDEX idx_awa_jobs_kind_state
124    ON awa.jobs (kind, state);
125
126-- Record version
127INSERT INTO awa.schema_version (version, description) VALUES (1, 'Initial schema');
128"#;
129
130/// Run all pending migrations against the database.
131///
132/// Uses an advisory lock to prevent concurrent migration attempts.
133pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
134    // Acquire advisory lock to serialize migration (key chosen to not collide with maintenance)
135    let lock_key: i64 = 0x4157_415f_4d49_4752; // "AWA_MIGR"
136    sqlx::query("SELECT pg_advisory_lock($1)")
137        .bind(lock_key)
138        .execute(pool)
139        .await?;
140
141    let result = run_inner(pool).await;
142
143    // Always release the lock
144    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
145        .bind(lock_key)
146        .execute(pool)
147        .await;
148
149    result
150}
151
152async fn run_inner(pool: &PgPool) -> Result<(), AwaError> {
153    let has_schema: bool =
154        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
155            .fetch_one(pool)
156            .await?;
157
158    if !has_schema {
159        info!("Running initial migration (v1)");
160        sqlx::raw_sql(V1_UP).execute(pool).await?;
161        info!("Migration v1 applied successfully");
162        return Ok(());
163    }
164
165    // Check if schema_version table exists
166    let has_version_table: bool = sqlx::query_scalar(
167        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
168    )
169    .fetch_one(pool)
170    .await?;
171
172    if !has_version_table {
173        info!("Running initial migration (v1)");
174        sqlx::raw_sql(V1_UP).execute(pool).await?;
175        info!("Migration v1 applied successfully");
176        return Ok(());
177    }
178
179    let current: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
180        .fetch_one(pool)
181        .await?;
182
183    let current_version = current.unwrap_or(0);
184
185    if current_version >= CURRENT_VERSION {
186        info!(version = current_version, "Schema is up to date");
187        return Ok(());
188    }
189
190    if current_version == 0 {
191        info!("Running initial migration (v1)");
192        sqlx::raw_sql(V1_UP).execute(pool).await?;
193        info!("Migration v1 applied successfully");
194    }
195
196    Ok(())
197}
198
199/// Get the current schema version.
200pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
201    let has_schema: bool =
202        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
203            .fetch_one(pool)
204            .await?;
205
206    if !has_schema {
207        return Ok(0);
208    }
209
210    let has_table: bool = sqlx::query_scalar(
211        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
212    )
213    .fetch_one(pool)
214    .await?;
215
216    if !has_table {
217        return Ok(0);
218    }
219
220    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
221        .fetch_one(pool)
222        .await?;
223
224    Ok(version.unwrap_or(0))
225}
226
227/// Get the raw SQL for all migrations (for extraction / external tooling).
228pub fn migration_sql() -> Vec<(i32, &'static str, &'static str)> {
229    vec![(1, "Initial schema", V1_UP)]
230}