use indoc::indoc;
use super::GraphileWorkerMigration;
pub const M000005_MIGRATION: GraphileWorkerMigration = GraphileWorkerMigration {
name: "m000005",
is_breaking: false,
stmts: &[
indoc! {r#"
alter table :GRAPHILE_WORKER_SCHEMA.jobs add column revision int default 0 not null;
"#},
indoc! {r#"
alter table :GRAPHILE_WORKER_SCHEMA.jobs add column flags jsonb default null;
"#},
indoc! {r#"
drop function :GRAPHILE_WORKER_SCHEMA.add_job(text, json, text, timestamptz, int, text, int);
"#},
indoc! {r#"
create function :GRAPHILE_WORKER_SCHEMA.add_job(
identifier text,
payload json = null,
queue_name text = null,
run_at timestamptz = null,
max_attempts int = null,
job_key text = null,
priority int = null,
flags text[] = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
declare
v_job :GRAPHILE_WORKER_SCHEMA.jobs;
begin
-- Apply rationality checks
if length(identifier) > 128 then
raise exception 'Task identifier is too long (max length: 128).' using errcode = 'GWBID';
end if;
if queue_name is not null and length(queue_name) > 128 then
raise exception 'Job queue name is too long (max length: 128).' using errcode = 'GWBQN';
end if;
if job_key is not null and length(job_key) > 512 then
raise exception 'Job key is too long (max length: 512).' using errcode = 'GWBJK';
end if;
if max_attempts < 1 then
raise exception 'Job maximum attempts must be at least 1' using errcode = 'GWBMA';
end if;
if job_key is not null then
-- Upsert job
insert into :GRAPHILE_WORKER_SCHEMA.jobs (
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
key,
priority,
flags
)
values(
identifier,
coalesce(payload, '{}'::json),
queue_name,
coalesce(run_at, now()),
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0),
(
select jsonb_object_agg(flag, true)
from unnest(flags) as item(flag)
)
)
on conflict (key) do update set
task_identifier=excluded.task_identifier,
payload=excluded.payload,
queue_name=excluded.queue_name,
max_attempts=excluded.max_attempts,
run_at=excluded.run_at,
priority=excluded.priority,
revision=jobs.revision + 1,
flags=excluded.flags,
-- always reset error/retry state
attempts=0,
last_error=null
where jobs.locked_at is null
returning *
into v_job;
-- If upsert succeeded (insert or update), return early
if not (v_job is null) then
return v_job;
end if;
-- Upsert failed -> there must be an existing job that is locked. Remove
-- existing key to allow a new one to be inserted, and prevent any
-- subsequent retries by bumping attempts to the max allowed.
update :GRAPHILE_WORKER_SCHEMA.jobs
set
key = null,
attempts = jobs.max_attempts
where key = job_key;
end if;
-- insert the new job. Assume no conflicts due to the update above
insert into :GRAPHILE_WORKER_SCHEMA.jobs(
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
key,
priority,
flags
)
values(
identifier,
coalesce(payload, '{}'::json),
queue_name,
coalesce(run_at, now()),
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0),
(
select jsonb_object_agg(flag, true)
from unnest(flags) as item(flag)
)
)
returning *
into v_job;
return v_job;
end;
$$ language plpgsql volatile;
"#},
indoc! {r#"
drop function :GRAPHILE_WORKER_SCHEMA.get_job(text, text[], interval);
"#},
indoc! {r#"
create function :GRAPHILE_WORKER_SCHEMA.get_job(
worker_id text,
task_identifiers text[] = null,
job_expiry interval = interval '4 hours',
forbidden_flags text[] = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
declare
v_job_id bigint;
v_queue_name text;
v_row :GRAPHILE_WORKER_SCHEMA.jobs;
v_now timestamptz = now();
begin
if worker_id is null or length(worker_id) < 10 then
raise exception 'invalid worker id';
end if;
select jobs.queue_name, jobs.id into v_queue_name, v_job_id
from :GRAPHILE_WORKER_SCHEMA.jobs
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
and (
jobs.queue_name is null
or
exists (
select 1
from :GRAPHILE_WORKER_SCHEMA.job_queues
where job_queues.queue_name = jobs.queue_name
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
for update
skip locked
)
)
and run_at <= v_now
and attempts < max_attempts
and (task_identifiers is null or task_identifier = any(task_identifiers))
and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
order by priority asc, run_at asc, id asc
limit 1
for update
skip locked;
if v_job_id is null then
return null;
end if;
if v_queue_name is not null then
update :GRAPHILE_WORKER_SCHEMA.job_queues
set
locked_by = worker_id,
locked_at = v_now
where job_queues.queue_name = v_queue_name;
end if;
update :GRAPHILE_WORKER_SCHEMA.jobs
set
attempts = attempts + 1,
locked_by = worker_id,
locked_at = v_now
where id = v_job_id
returning * into v_row;
return v_row;
end;
$$ language plpgsql volatile;
"#},
],
};