Skip to main content

awa_model/
migrations.rs

1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
/// Schema version this build of the crate expects to find in the database.
///
/// The migration runner compares this value against the highest version
/// recorded in `awa.schema_version` to decide whether any work is needed.
pub const CURRENT_VERSION: i32 = 3;
8
9/// All migrations in order.
10const MIGRATIONS: &[(i32, &str, &[&str])] = &[(3, "Canonical schema with UI indexes", &[V3_UP])];
11
12/// The canonical schema (V3: V2 + BRIN on created_at + GIN on tags).
13const V3_UP: &str = r#"
14-- Awa schema v2: Canonical hot/deferred schema with structured progress
15
16CREATE SCHEMA IF NOT EXISTS awa;
17
18CREATE TYPE awa.job_state AS ENUM (
19    'scheduled', 'available', 'running',
20    'completed', 'retryable', 'failed', 'cancelled', 'waiting_external'
21);
22
23CREATE SEQUENCE awa.jobs_id_seq;
24
25CREATE TABLE awa.jobs_hot (
26    id                  BIGINT      NOT NULL DEFAULT nextval('awa.jobs_id_seq') PRIMARY KEY,
27    kind                TEXT        NOT NULL,
28    queue               TEXT        NOT NULL DEFAULT 'default',
29    args                JSONB       NOT NULL DEFAULT '{}',
30    state               awa.job_state NOT NULL DEFAULT 'available',
31    priority            SMALLINT    NOT NULL DEFAULT 2,
32    attempt             SMALLINT    NOT NULL DEFAULT 0,
33    max_attempts        SMALLINT    NOT NULL DEFAULT 25,
34    run_at              TIMESTAMPTZ NOT NULL DEFAULT now(),
35    heartbeat_at        TIMESTAMPTZ,
36    deadline_at         TIMESTAMPTZ,
37    attempted_at        TIMESTAMPTZ,
38    finalized_at        TIMESTAMPTZ,
39    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
40    errors              JSONB[]     DEFAULT '{}',
41    metadata            JSONB       NOT NULL DEFAULT '{}',
42    tags                TEXT[]      NOT NULL DEFAULT '{}',
43    unique_key          BYTEA,
44    unique_states       BIT(8),
45    callback_id         UUID,
46    callback_timeout_at TIMESTAMPTZ,
47    callback_filter     TEXT,
48    callback_on_complete TEXT,
49    callback_on_fail    TEXT,
50    callback_transform  TEXT,
51    run_lease           BIGINT      NOT NULL DEFAULT 0,
52    progress            JSONB,
53
54    CONSTRAINT jobs_hot_state_check CHECK (state NOT IN ('scheduled', 'retryable')),
55    CONSTRAINT jobs_hot_priority_in_range CHECK (priority BETWEEN 1 AND 4),
56    CONSTRAINT jobs_hot_max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
57    CONSTRAINT jobs_hot_queue_name_length CHECK (length(queue) <= 200),
58    CONSTRAINT jobs_hot_kind_length CHECK (length(kind) <= 200),
59    CONSTRAINT jobs_hot_tags_count CHECK (cardinality(tags) <= 20)
60);
61
62CREATE TABLE awa.scheduled_jobs (
63    id                  BIGINT      NOT NULL DEFAULT nextval('awa.jobs_id_seq') PRIMARY KEY,
64    kind                TEXT        NOT NULL,
65    queue               TEXT        NOT NULL DEFAULT 'default',
66    args                JSONB       NOT NULL DEFAULT '{}',
67    state               awa.job_state NOT NULL DEFAULT 'scheduled',
68    priority            SMALLINT    NOT NULL DEFAULT 2,
69    attempt             SMALLINT    NOT NULL DEFAULT 0,
70    max_attempts        SMALLINT    NOT NULL DEFAULT 25,
71    run_at              TIMESTAMPTZ NOT NULL DEFAULT now(),
72    heartbeat_at        TIMESTAMPTZ,
73    deadline_at         TIMESTAMPTZ,
74    attempted_at        TIMESTAMPTZ,
75    finalized_at        TIMESTAMPTZ,
76    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
77    errors              JSONB[]     DEFAULT '{}',
78    metadata            JSONB       NOT NULL DEFAULT '{}',
79    tags                TEXT[]      NOT NULL DEFAULT '{}',
80    unique_key          BYTEA,
81    unique_states       BIT(8),
82    callback_id         UUID,
83    callback_timeout_at TIMESTAMPTZ,
84    callback_filter     TEXT,
85    callback_on_complete TEXT,
86    callback_on_fail    TEXT,
87    callback_transform  TEXT,
88    run_lease           BIGINT      NOT NULL DEFAULT 0,
89    progress            JSONB,
90
91    CONSTRAINT scheduled_jobs_state_check CHECK (state IN ('scheduled', 'retryable')),
92    CONSTRAINT scheduled_jobs_priority_in_range CHECK (priority BETWEEN 1 AND 4),
93    CONSTRAINT scheduled_jobs_max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
94    CONSTRAINT scheduled_jobs_queue_name_length CHECK (length(queue) <= 200),
95    CONSTRAINT scheduled_jobs_kind_length CHECK (length(kind) <= 200),
96    CONSTRAINT scheduled_jobs_tags_count CHECK (cardinality(tags) <= 20)
97);
98
99CREATE TABLE awa.queue_meta (
100    queue       TEXT PRIMARY KEY,
101    paused      BOOLEAN NOT NULL DEFAULT FALSE,
102    paused_at   TIMESTAMPTZ,
103    paused_by   TEXT
104);
105
106CREATE TABLE awa.job_unique_claims (
107    unique_key  BYTEA NOT NULL,
108    job_id      BIGINT NOT NULL
109);
110
111CREATE TABLE awa.cron_jobs (
112    name             TEXT PRIMARY KEY,
113    cron_expr        TEXT        NOT NULL,
114    timezone         TEXT        NOT NULL DEFAULT 'UTC',
115    kind             TEXT        NOT NULL,
116    queue            TEXT        NOT NULL DEFAULT 'default',
117    args             JSONB       NOT NULL DEFAULT '{}',
118    priority         SMALLINT    NOT NULL DEFAULT 2,
119    max_attempts     SMALLINT    NOT NULL DEFAULT 25,
120    tags             TEXT[]      NOT NULL DEFAULT '{}',
121    metadata         JSONB       NOT NULL DEFAULT '{}',
122    last_enqueued_at TIMESTAMPTZ,
123    created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
124    updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
125);
126
127CREATE TABLE awa.schema_version (
128    version     INT PRIMARY KEY,
129    description TEXT NOT NULL,
130    applied_at  TIMESTAMPTZ NOT NULL DEFAULT now()
131);
132
133CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
134RETURNS BOOLEAN AS $$
135    SELECT CASE state
136        WHEN 'scheduled'         THEN get_bit(bitmask, 0) = 1
137        WHEN 'available'         THEN get_bit(bitmask, 1) = 1
138        WHEN 'running'           THEN get_bit(bitmask, 2) = 1
139        WHEN 'completed'         THEN get_bit(bitmask, 3) = 1
140        WHEN 'retryable'         THEN get_bit(bitmask, 4) = 1
141        WHEN 'failed'            THEN get_bit(bitmask, 5) = 1
142        WHEN 'cancelled'         THEN get_bit(bitmask, 6) = 1
143        WHEN 'waiting_external'  THEN get_bit(bitmask, 7) = 1
144        ELSE FALSE
145    END;
146$$ LANGUAGE sql IMMUTABLE;
147
148CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
149RETURNS interval AS $$
150    SELECT LEAST(
151        (power(2, attempt)::int || ' seconds')::interval
152            + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
153        interval '24 hours'
154    );
155$$ LANGUAGE sql VOLATILE;
156
157CREATE OR REPLACE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
158BEGIN
159    PERFORM pg_notify('awa:' || NEW.queue, '');
160    RETURN NEW;
161END;
162$$ LANGUAGE plpgsql;
163
164CREATE OR REPLACE FUNCTION awa.sync_job_unique_claims() RETURNS trigger AS $$
165DECLARE
166    old_claim BOOLEAN := FALSE;
167    new_claim BOOLEAN := FALSE;
168    existing_job_id BIGINT;
169BEGIN
170    IF TG_OP <> 'INSERT' THEN
171        old_claim := OLD.unique_key IS NOT NULL
172            AND OLD.unique_states IS NOT NULL
173            AND awa.job_state_in_bitmask(OLD.unique_states, OLD.state);
174    END IF;
175
176    IF TG_OP <> 'DELETE' THEN
177        new_claim := NEW.unique_key IS NOT NULL
178            AND NEW.unique_states IS NOT NULL
179            AND awa.job_state_in_bitmask(NEW.unique_states, NEW.state);
180    END IF;
181
182    IF old_claim AND (
183        NOT new_claim
184        OR OLD.unique_key IS DISTINCT FROM NEW.unique_key
185        OR OLD.id IS DISTINCT FROM NEW.id
186    ) THEN
187        DELETE FROM awa.job_unique_claims
188        WHERE unique_key = OLD.unique_key
189          AND job_id = OLD.id;
190    END IF;
191
192    IF new_claim AND (
193        NOT old_claim
194        OR OLD.unique_key IS DISTINCT FROM NEW.unique_key
195        OR OLD.id IS DISTINCT FROM NEW.id
196    ) THEN
197        BEGIN
198            INSERT INTO awa.job_unique_claims (unique_key, job_id)
199            VALUES (NEW.unique_key, NEW.id);
200        EXCEPTION
201            WHEN unique_violation THEN
202                SELECT job_id
203                INTO existing_job_id
204                FROM awa.job_unique_claims
205                WHERE unique_key = NEW.unique_key;
206
207                IF existing_job_id IS DISTINCT FROM NEW.id THEN
208                    RAISE unique_violation
209                        USING CONSTRAINT = 'idx_awa_jobs_unique',
210                              MESSAGE = 'duplicate key value violates unique constraint "idx_awa_jobs_unique"';
211                END IF;
212        END;
213    END IF;
214
215    IF TG_OP = 'DELETE' THEN
216        RETURN OLD;
217    END IF;
218    RETURN NEW;
219END;
220$$ LANGUAGE plpgsql;
221
222CREATE VIEW awa.jobs AS
223SELECT * FROM awa.jobs_hot
224UNION ALL
225SELECT * FROM awa.scheduled_jobs;
226
227CREATE OR REPLACE FUNCTION awa.write_jobs_view() RETURNS trigger AS $$
228DECLARE
229    target_table TEXT;
230    source_table TEXT;
231BEGIN
232    IF TG_OP = 'INSERT' THEN
233        NEW.id := COALESCE(NEW.id, nextval('awa.jobs_id_seq'));
234        NEW.queue := COALESCE(NEW.queue, 'default');
235        NEW.args := COALESCE(NEW.args, '{}'::jsonb);
236        NEW.state := COALESCE(NEW.state, 'available'::awa.job_state);
237        NEW.priority := COALESCE(NEW.priority, 2);
238        NEW.attempt := COALESCE(NEW.attempt, 0);
239        NEW.max_attempts := COALESCE(NEW.max_attempts, 25);
240        NEW.run_at := COALESCE(NEW.run_at, now());
241        NEW.created_at := COALESCE(NEW.created_at, now());
242        NEW.errors := COALESCE(NEW.errors, '{}'::jsonb[]);
243        NEW.metadata := COALESCE(NEW.metadata, '{}'::jsonb);
244        NEW.tags := COALESCE(NEW.tags, '{}'::text[]);
245        NEW.run_lease := COALESCE(NEW.run_lease, 0);
246    END IF;
247
248    IF TG_OP = 'DELETE' THEN
249        source_table := CASE
250            WHEN OLD.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
251                THEN 'awa.scheduled_jobs'
252            ELSE 'awa.jobs_hot'
253        END;
254        EXECUTE format('DELETE FROM %s WHERE id = $1', source_table) USING OLD.id;
255        RETURN OLD;
256    END IF;
257
258    IF TG_OP = 'UPDATE' THEN
259        source_table := CASE
260            WHEN OLD.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
261                THEN 'awa.scheduled_jobs'
262            ELSE 'awa.jobs_hot'
263        END;
264        EXECUTE format('DELETE FROM %s WHERE id = $1', source_table) USING OLD.id;
265    END IF;
266
267    target_table := CASE
268        WHEN NEW.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
269            THEN 'awa.scheduled_jobs'
270        ELSE 'awa.jobs_hot'
271    END;
272
273    EXECUTE format(
274        'INSERT INTO %s (
275            id, kind, queue, args, state, priority, attempt, max_attempts,
276            run_at, heartbeat_at, deadline_at, attempted_at, finalized_at,
277            created_at, errors, metadata, tags, unique_key, unique_states,
278            callback_id, callback_timeout_at, callback_filter, callback_on_complete,
279            callback_on_fail, callback_transform, run_lease, progress
280        ) VALUES (
281            $1, $2, $3, $4, $5, $6, $7, $8,
282            $9, $10, $11, $12, $13,
283            $14, $15, $16, $17, $18, $19,
284            $20, $21, $22, $23,
285            $24, $25, $26, $27
286        )',
287        target_table
288    )
289    USING
290        NEW.id, NEW.kind, NEW.queue, NEW.args, NEW.state, NEW.priority, NEW.attempt,
291        NEW.max_attempts, NEW.run_at, NEW.heartbeat_at, NEW.deadline_at, NEW.attempted_at,
292        NEW.finalized_at, NEW.created_at, NEW.errors, NEW.metadata, NEW.tags,
293        NEW.unique_key, NEW.unique_states, NEW.callback_id, NEW.callback_timeout_at,
294        NEW.callback_filter, NEW.callback_on_complete, NEW.callback_on_fail,
295        NEW.callback_transform, NEW.run_lease, NEW.progress;
296
297    RETURN NEW;
298END;
299$$ LANGUAGE plpgsql;
300
301CREATE TRIGGER trg_awa_notify
302    AFTER INSERT ON awa.jobs_hot
303    FOR EACH ROW
304    WHEN (NEW.state = 'available' AND NEW.run_at <= now())
305    EXECUTE FUNCTION awa.notify_new_job();
306
307CREATE TRIGGER trg_jobs_hot_unique_claims_insert
308    AFTER INSERT ON awa.jobs_hot
309    FOR EACH ROW
310    WHEN (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
311    EXECUTE FUNCTION awa.sync_job_unique_claims();
312
313CREATE TRIGGER trg_jobs_hot_unique_claims_update
314    AFTER UPDATE ON awa.jobs_hot
315    FOR EACH ROW
316    WHEN (
317        (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
318        OR (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
319    )
320    EXECUTE FUNCTION awa.sync_job_unique_claims();
321
322CREATE TRIGGER trg_jobs_hot_unique_claims_delete
323    AFTER DELETE ON awa.jobs_hot
324    FOR EACH ROW
325    WHEN (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
326    EXECUTE FUNCTION awa.sync_job_unique_claims();
327
328CREATE TRIGGER trg_scheduled_jobs_unique_claims_insert
329    AFTER INSERT ON awa.scheduled_jobs
330    FOR EACH ROW
331    WHEN (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
332    EXECUTE FUNCTION awa.sync_job_unique_claims();
333
334CREATE TRIGGER trg_scheduled_jobs_unique_claims_update
335    AFTER UPDATE ON awa.scheduled_jobs
336    FOR EACH ROW
337    WHEN (
338        (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
339        OR (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
340    )
341    EXECUTE FUNCTION awa.sync_job_unique_claims();
342
343CREATE TRIGGER trg_scheduled_jobs_unique_claims_delete
344    AFTER DELETE ON awa.scheduled_jobs
345    FOR EACH ROW
346    WHEN (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
347    EXECUTE FUNCTION awa.sync_job_unique_claims();
348
349CREATE TRIGGER trg_awa_jobs_view_insert
350    INSTEAD OF INSERT ON awa.jobs
351    FOR EACH ROW
352    EXECUTE FUNCTION awa.write_jobs_view();
353
354CREATE TRIGGER trg_awa_jobs_view_update
355    INSTEAD OF UPDATE ON awa.jobs
356    FOR EACH ROW
357    EXECUTE FUNCTION awa.write_jobs_view();
358
359CREATE TRIGGER trg_awa_jobs_view_delete
360    INSTEAD OF DELETE ON awa.jobs
361    FOR EACH ROW
362    EXECUTE FUNCTION awa.write_jobs_view();
363
364CREATE INDEX idx_awa_jobs_hot_dequeue
365    ON awa.jobs_hot (queue, priority, run_at, id)
366    WHERE state = 'available';
367
368CREATE INDEX idx_awa_jobs_hot_heartbeat
369    ON awa.jobs_hot (heartbeat_at)
370    WHERE state = 'running';
371
372CREATE INDEX idx_awa_jobs_hot_deadline
373    ON awa.jobs_hot (deadline_at)
374    WHERE state = 'running' AND deadline_at IS NOT NULL;
375
376CREATE INDEX idx_awa_jobs_hot_kind_state
377    ON awa.jobs_hot (kind, state);
378
379CREATE UNIQUE INDEX idx_awa_jobs_hot_callback_id
380    ON awa.jobs_hot (callback_id)
381    WHERE callback_id IS NOT NULL;
382
383CREATE INDEX idx_awa_jobs_hot_callback_timeout
384    ON awa.jobs_hot (callback_timeout_at)
385    WHERE state = 'waiting_external' AND callback_timeout_at IS NOT NULL;
386
387CREATE INDEX idx_awa_scheduled_jobs_run_at_scheduled
388    ON awa.scheduled_jobs (run_at, id, queue)
389    WHERE state = 'scheduled';
390
391CREATE INDEX idx_awa_scheduled_jobs_run_at_retryable
392    ON awa.scheduled_jobs (run_at, id, queue)
393    WHERE state = 'retryable';
394
395CREATE INDEX idx_awa_scheduled_jobs_kind_state
396    ON awa.scheduled_jobs (kind, state);
397
398CREATE UNIQUE INDEX idx_awa_jobs_unique
399    ON awa.job_unique_claims (unique_key);
400
401-- BRIN indexes on created_at for time-range queries (UI dashboard, timeseries)
402CREATE INDEX idx_awa_jobs_hot_created_at
403    ON awa.jobs_hot USING BRIN (created_at) WITH (pages_per_range = 32);
404CREATE INDEX idx_awa_scheduled_jobs_created_at
405    ON awa.scheduled_jobs USING BRIN (created_at) WITH (pages_per_range = 32);
406
407-- GIN indexes on tags for array containment queries (tag filtering)
408CREATE INDEX idx_awa_jobs_hot_tags
409    ON awa.jobs_hot USING GIN (tags) WHERE tags IS NOT NULL AND tags != '{}';
410CREATE INDEX idx_awa_scheduled_jobs_tags
411    ON awa.scheduled_jobs USING GIN (tags) WHERE tags IS NOT NULL AND tags != '{}';
412
413INSERT INTO awa.schema_version (version, description)
414VALUES (3, 'Canonical schema with UI indexes');
415"#;
416
417/// Run all pending migrations against the database.
418///
419/// Because Awa does not have external users yet, any pre-canonical `awa`
420/// schema is replaced with the canonical schema rather than upgraded through a
421/// historical chain.
422///
423/// Takes `&PgPool` for ergonomic use from Rust. For a `Send`-safe variant
424/// that takes the pool by value, see [`run_owned`].
425pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
426    let lock_key: i64 = 0x4157_415f_4d49_4752; // "AWA_MIGR"
427    let mut conn = pool.acquire().await?;
428    sqlx::query("SELECT pg_advisory_lock($1)")
429        .bind(lock_key)
430        .execute(&mut *conn)
431        .await?;
432
433    let result = run_inner(&mut conn).await;
434
435    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
436        .bind(lock_key)
437        .execute(&mut *conn)
438        .await;
439
440    result
441}
442
443async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
444    let has_schema: bool =
445        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
446            .fetch_one(&mut *conn)
447            .await?;
448
449    let current = if has_schema {
450        current_version_conn(conn).await?
451    } else {
452        0
453    };
454
455    if has_schema && current == CURRENT_VERSION {
456        info!(version = current, "Schema is up to date");
457        return Ok(());
458    }
459
460    if has_schema {
461        info!(
462            existing_version = current,
463            "Replacing existing awa schema with canonical schema"
464        );
465        sqlx::raw_sql("DROP SCHEMA awa CASCADE")
466            .execute(&mut *conn)
467            .await?;
468    }
469
470    for &(version, description, steps) in MIGRATIONS {
471        info!(version, description, "Applying migration");
472        for step in steps {
473            sqlx::raw_sql(step).execute(&mut *conn).await?;
474        }
475        info!(version, "Migration applied");
476    }
477
478    Ok(())
479}
480
481/// Get the current schema version.
482pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
483    let mut conn = pool.acquire().await?;
484    current_version_conn(&mut conn).await
485}
486
487async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
488    let has_schema: bool =
489        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
490            .fetch_one(&mut *conn)
491            .await?;
492
493    if !has_schema {
494        return Ok(0);
495    }
496
497    let has_table: bool = sqlx::query_scalar(
498        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
499    )
500    .fetch_one(&mut *conn)
501    .await?;
502
503    if !has_table {
504        return Ok(0);
505    }
506
507    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
508        .fetch_one(&mut *conn)
509        .await?;
510
511    Ok(version.unwrap_or(0))
512}
513
514/// Get the raw SQL for all migrations (for extraction / external tooling).
515pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
516    MIGRATIONS
517        .iter()
518        .map(|&(v, d, steps)| (v, d, steps.join("\n")))
519        .collect()
520}