graphile_worker_migrations 0.4.15

Migrations package for graphile_worker, a high performance Rust/PostgreSQL job queue
Documentation
use indoc::indoc;

use super::GraphileWorkerMigration;

pub const M000009_MIGRATION: GraphileWorkerMigration = GraphileWorkerMigration {
    name: "m000009",
    is_breaking: false,
    stmts: &[
        indoc! {r#"
            drop function :GRAPHILE_WORKER_SCHEMA.get_job(text, text[], interval, text[]);
        "#},
        indoc! {r#"
            create function :GRAPHILE_WORKER_SCHEMA.get_job(
                worker_id text,
                task_identifiers text[] = null,
                job_expiry interval = interval '4 hours',
                forbidden_flags text[] = null,
                now timestamptz = now()
            ) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
            declare
                v_job_id bigint;
                v_queue_name text;
                v_row :GRAPHILE_WORKER_SCHEMA.jobs;
            begin
                if worker_id is null or length(worker_id) < 10 then
                    raise exception 'invalid worker id';
                end if;

                select jobs.queue_name, jobs.id into v_queue_name, v_job_id
                    from :GRAPHILE_WORKER_SCHEMA.jobs
                    where (jobs.locked_at is null or jobs.locked_at < (now - job_expiry))
                    and (
                        jobs.queue_name is null
                    or
                        exists (
                            select 1
                            from :GRAPHILE_WORKER_SCHEMA.job_queues
                            where job_queues.queue_name = jobs.queue_name
                            and (job_queues.locked_at is null or job_queues.locked_at < (now - job_expiry))
                            for update
                            skip locked
                        )
                    )
                    and run_at <= now
                    and attempts < max_attempts
                    and (task_identifiers is null or task_identifier = any(task_identifiers))
                    and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
                    order by priority asc, run_at asc, id asc
                    limit 1
                    for update
                    skip locked;

                if v_job_id is null then
                    return null;
                end if;

                if v_queue_name is not null then
                    update :GRAPHILE_WORKER_SCHEMA.job_queues
                        set
                            locked_by = worker_id,
                            locked_at = now
                        where job_queues.queue_name = v_queue_name;
                end if;

                update :GRAPHILE_WORKER_SCHEMA.jobs
                    set
                        attempts = attempts + 1,
                        locked_by = worker_id,
                        locked_at = now
                    where id = v_job_id
                    returning * into v_row;

                return v_row;
            end;
            $$ language plpgsql volatile;
        "#},
    ],
};