// graphile_worker_migrations 0.4.17
//
// Migrations package for graphile_worker, a high performance Rust/PostgreSQL job queue.
// Documentation: see the crate's published documentation page.
use indoc::indoc;

use super::GraphileWorkerMigration;

/// Migration `m000005`: adds job revisions and job flags.
///
/// The statements run in the order listed and do the following:
///
/// 1. Add `jobs.revision` (`int`, default `0`, `not null`).
/// 2. Add `jobs.flags` (`jsonb`, default `null`).
/// 3. Drop the 7-argument `add_job` function.
/// 4. Recreate `add_job` with an extra trailing `flags text[] = null` parameter.
/// 5. Drop the 3-argument `get_job` function.
/// 6. Recreate `get_job` with an extra trailing `forbidden_flags text[] = null`
///    parameter.
///
/// Both functions are dropped before being recreated because their argument
/// lists change.
///
/// `:GRAPHILE_WORKER_SCHEMA` is a placeholder inside the SQL text; presumably
/// the migration runner substitutes the configured schema name before
/// executing each statement (the substitution is not visible in this file —
/// confirm against the runner).
pub const M000005_MIGRATION: GraphileWorkerMigration = GraphileWorkerMigration {
    name: "m000005",
    // NOTE(review): the exact meaning of `is_breaking` is defined by
    // `GraphileWorkerMigration`, which is declared outside this file.
    is_breaking: false,
    stmts: &[
        // 1. `revision` counter; incremented by `add_job` whenever an existing
        //    keyed job is updated in place (see the upsert below).
        indoc! {r#"
            alter table :GRAPHILE_WORKER_SCHEMA.jobs add column revision int default 0 not null;
        "#},
        // 2. `flags` column; stored as a jsonb object so `get_job` can test key
        //    membership with the `?|` operator.
        indoc! {r#"
            alter table :GRAPHILE_WORKER_SCHEMA.jobs add column flags jsonb default null;
        "#},
        // 3. Remove the previous `add_job` signature (no `flags` parameter).
        indoc! {r#"
            drop function :GRAPHILE_WORKER_SCHEMA.add_job(text, json, text, timestamptz, int, text, int);
        "#},
        // 4. New `add_job`.
        //
        //    Validation: raises when `identifier` or `queue_name` exceed 128
        //    characters, when `job_key` exceeds 512 characters, or when
        //    `max_attempts < 1`. A null `max_attempts` makes that comparison
        //    null, so the check is skipped and the `coalesce(..., 25)` default
        //    applies.
        //
        //    With a `job_key`: the job is upserted on `key`. An existing row is
        //    only overwritten while it is not locked (`jobs.locked_at is null`);
        //    the overwrite bumps `revision` and resets `attempts`/`last_error`.
        //    When the upsert returns no row (the existing job is locked), the old
        //    row has its `key` cleared and `attempts` set to `max_attempts`, and
        //    control falls through to the plain insert.
        //
        //    `not (v_job is null)` is used rather than `v_job is not null`: for a
        //    row-typed value `is not null` requires every field to be non-null,
        //    whereas this form is true as soon as any field is non-null.
        //
        //    Flags: the `text[]` argument is converted to a jsonb object mapping
        //    each flag name to `true` via `jsonb_object_agg`.
        indoc! {r#"
            create function :GRAPHILE_WORKER_SCHEMA.add_job(
                identifier text,
                payload json = null,
                queue_name text = null,
                run_at timestamptz = null,
                max_attempts int = null,
                job_key text = null,
                priority int = null,
                flags text[] = null
            ) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
            declare
                v_job :GRAPHILE_WORKER_SCHEMA.jobs;
            begin
                -- Apply rationality checks
                if length(identifier) > 128 then
                    raise exception 'Task identifier is too long (max length: 128).' using errcode = 'GWBID';
                end if;
                if queue_name is not null and length(queue_name) > 128 then
                    raise exception 'Job queue name is too long (max length: 128).' using errcode = 'GWBQN';
                end if;
                if job_key is not null and length(job_key) > 512 then
                    raise exception 'Job key is too long (max length: 512).' using errcode = 'GWBJK';
                end if;
                if max_attempts < 1 then
                    raise exception 'Job maximum attempts must be at least 1' using errcode = 'GWBMA';
                end if;

                if job_key is not null then
                    -- Upsert job
                    insert into :GRAPHILE_WORKER_SCHEMA.jobs (
                        task_identifier,
                        payload,
                        queue_name,
                        run_at,
                        max_attempts,
                        key,
                        priority,
                        flags
                    )
                        values(
                            identifier,
                            coalesce(payload, '{}'::json),
                            queue_name,
                            coalesce(run_at, now()),
                            coalesce(max_attempts, 25),
                            job_key,
                            coalesce(priority, 0),
                            (
                              select jsonb_object_agg(flag, true)
                              from unnest(flags) as item(flag)
                            )
                        )
                        on conflict (key) do update set
                            task_identifier=excluded.task_identifier,
                            payload=excluded.payload,
                            queue_name=excluded.queue_name,
                            max_attempts=excluded.max_attempts,
                            run_at=excluded.run_at,
                            priority=excluded.priority,
                            revision=jobs.revision + 1,
                            flags=excluded.flags,

                            -- always reset error/retry state
                            attempts=0,
                            last_error=null
                        where jobs.locked_at is null
                        returning *
                        into v_job;

                    -- If upsert succeeded (insert or update), return early
                    if not (v_job is null) then
                        return v_job;
                    end if;

                    -- Upsert failed -> there must be an existing job that is locked. Remove
                    -- existing key to allow a new one to be inserted, and prevent any
                    -- subsequent retries by bumping attempts to the max allowed.
                    update :GRAPHILE_WORKER_SCHEMA.jobs
                        set
                            key = null,
                            attempts = jobs.max_attempts
                        where key = job_key;
                end if;

                -- insert the new job. Assume no conflicts due to the update above
                insert into :GRAPHILE_WORKER_SCHEMA.jobs(
                    task_identifier,
                    payload,
                    queue_name,
                    run_at,
                    max_attempts,
                    key,
                    priority,
                    flags
                )
                values(
                    identifier,
                    coalesce(payload, '{}'::json),
                    queue_name,
                    coalesce(run_at, now()),
                    coalesce(max_attempts, 25),
                    job_key,
                    coalesce(priority, 0),
                    (
                        select jsonb_object_agg(flag, true)
                        from unnest(flags) as item(flag)
                    )
                )
                returning *
                into v_job;

                return v_job;
            end;
            $$ language plpgsql volatile;
        "#},
        // 5. Remove the previous `get_job` signature (no `forbidden_flags`
        //    parameter).
        indoc! {r#"
            drop function :GRAPHILE_WORKER_SCHEMA.get_job(text, text[], interval);
        "#},
        // 6. New `get_job`.
        //
        //    Raises 'invalid worker id' when `worker_id` is null or shorter than
        //    10 characters.
        //
        //    Picks at most one job that satisfies all of:
        //      - not locked, or its lock is older than `job_expiry`;
        //      - has no queue, or its queue row is unlocked / lock-expired
        //        (the queue row is taken `for update skip locked`);
        //      - `run_at <= now()` and `attempts < max_attempts`;
        //      - `task_identifier` is in `task_identifiers` when that is given;
        //      - carries none of `forbidden_flags` when that is given. The test is
        //        written `(flags ?| forbidden_flags) is not true` so that jobs
        //        whose `flags` is null (where `?|` yields null) still qualify.
        //    Candidates are ordered by `priority`, then `run_at`, then `id`, and
        //    the chosen row is taken `for update skip locked`.
        //
        //    Returns null when nothing qualifies. Otherwise stamps the queue row
        //    (if the job has a queue) and the job row with `locked_by`/`locked_at`,
        //    increments the job's `attempts`, and returns the updated job row.
        indoc! {r#"
            create function :GRAPHILE_WORKER_SCHEMA.get_job(
                worker_id text,
                task_identifiers text[] = null,
                job_expiry interval = interval '4 hours',
                forbidden_flags text[] = null
            ) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
            declare
                v_job_id bigint;
                v_queue_name text;
                v_row :GRAPHILE_WORKER_SCHEMA.jobs;
                v_now timestamptz = now();
            begin
                if worker_id is null or length(worker_id) < 10 then
                    raise exception 'invalid worker id';
                end if;

                select jobs.queue_name, jobs.id into v_queue_name, v_job_id
                    from :GRAPHILE_WORKER_SCHEMA.jobs
                    where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
                    and (
                        jobs.queue_name is null
                        or
                        exists (
                            select 1
                            from :GRAPHILE_WORKER_SCHEMA.job_queues
                            where job_queues.queue_name = jobs.queue_name
                            and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
                            for update
                            skip locked
                        )
                    )
                    and run_at <= v_now
                    and attempts < max_attempts
                    and (task_identifiers is null or task_identifier = any(task_identifiers))
                    and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
                    order by priority asc, run_at asc, id asc
                    limit 1
                    for update
                    skip locked;

                if v_job_id is null then
                    return null;
                end if;

                if v_queue_name is not null then
                    update :GRAPHILE_WORKER_SCHEMA.job_queues
                    set
                        locked_by = worker_id,
                        locked_at = v_now
                    where job_queues.queue_name = v_queue_name;
                end if;

                update :GRAPHILE_WORKER_SCHEMA.jobs
                    set
                        attempts = attempts + 1,
                        locked_by = worker_id,
                        locked_at = v_now
                    where id = v_job_id
                    returning * into v_row;
                return v_row;
            end;
            $$ language plpgsql volatile;
        "#},
    ],
};