use indoc::indoc;
use super::GraphileWorkerMigration;
pub const M000018_MIGRATION: GraphileWorkerMigration = GraphileWorkerMigration {
name: "m000018",
is_breaking: false,
stmts: &[
indoc! {r#"
DROP TRIGGER _900_after_insert ON :GRAPHILE_WORKER_SCHEMA._private_jobs;
"#},
indoc! {r#"
DROP FUNCTION :GRAPHILE_WORKER_SCHEMA.tg_jobs__after_insert;
"#},
indoc! {r#"
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.add_job(identifier text, payload json DEFAULT NULL::json, queue_name text DEFAULT NULL::text, run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, max_attempts integer DEFAULT NULL::integer, job_key text DEFAULT NULL::text, priority integer DEFAULT NULL::integer, flags text[] DEFAULT NULL::text[], job_key_mode text DEFAULT 'replace'::text) RETURNS :GRAPHILE_WORKER_SCHEMA._private_jobs
LANGUAGE plpgsql
AS $$
declare
v_job :GRAPHILE_WORKER_SCHEMA._private_jobs;
begin
if (job_key is null or job_key_mode is null or job_key_mode in ('replace', 'preserve_run_at')) then
select * into v_job
from :GRAPHILE_WORKER_SCHEMA.add_jobs(
ARRAY[(
identifier,
payload,
queue_name,
run_at,
max_attempts::smallint,
job_key,
priority::smallint,
flags
):::GRAPHILE_WORKER_SCHEMA.job_spec],
(job_key_mode = 'preserve_run_at')
)
limit 1;
return v_job;
elsif job_key_mode = 'unsafe_dedupe' then
-- Ensure all the tasks exist
insert into :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks (identifier)
values (identifier)
on conflict do nothing;
-- Ensure all the queues exist
if queue_name is not null then
insert into :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues (queue_name)
values (queue_name)
on conflict do nothing;
end if;
-- Insert job, but if one already exists then do nothing
insert into :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs (
job_queue_id,
task_id,
payload,
run_at,
max_attempts,
key,
priority,
flags
)
select
job_queues.id,
tasks.id,
coalesce(payload, '{}'::json),
coalesce(run_at, now()),
coalesce(max_attempts::smallint, 25::smallint),
job_key,
coalesce(priority::smallint, 0::smallint),
(
select jsonb_object_agg(flag, true)
from unnest(flags) as item(flag)
)
from :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks
left join :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues
on job_queues.queue_name = add_job.queue_name
where tasks.identifier = add_job.identifier
on conflict (key)
do update set
revision = jobs.revision + 1,
updated_at = now()
returning *
into v_job;
if v_job.revision = 0 then
perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":1}');
end if;
return v_job;
else
raise exception 'Invalid job_key_mode value, expected ''replace'', ''preserve_run_at'' or ''unsafe_dedupe''.' using errcode = 'GWBKM';
end if;
end;
$$;
"#},
indoc! {r#"
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.add_jobs(specs :GRAPHILE_WORKER_SCHEMA.job_spec[], job_key_preserve_run_at boolean DEFAULT false) RETURNS SETOF :GRAPHILE_WORKER_SCHEMA._private_jobs
LANGUAGE plpgsql
AS $$
begin
-- Ensure all the tasks exist
insert into :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks (identifier)
select distinct spec.identifier
from unnest(specs) spec
on conflict do nothing;
-- Ensure all the queues exist
insert into :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues (queue_name)
select distinct spec.queue_name
from unnest(specs) spec
where spec.queue_name is not null
on conflict do nothing;
-- Ensure any locked jobs have their key cleared - in the case of locked
-- existing job create a new job instead as it must have already started
-- executing (i.e., it's world state is out of date, and the fact add_job
-- has been called again implies there's new information that needs to be
-- acted upon).
update :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs
set
key = null,
attempts = jobs.max_attempts,
updated_at = now()
from unnest(specs) spec
where spec.job_key is not null
and jobs.key = spec.job_key
and is_available is not true;
-- WARNING: this count is not 100% accurate; 'on conflict' clause will cause it to be an overestimate
perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":' || array_length(specs, 1)::text || '}');
-- TODO: is there a risk that a conflict could occur depending on the
-- isolation level?
return query insert into :GRAPHILE_WORKER_SCHEMA._private_jobs as jobs (
job_queue_id,
task_id,
payload,
run_at,
max_attempts,
key,
priority,
flags
)
select
job_queues.id,
tasks.id,
coalesce(spec.payload, '{}'::json),
coalesce(spec.run_at, now()),
coalesce(spec.max_attempts, 25),
spec.job_key,
coalesce(spec.priority, 0),
(
select jsonb_object_agg(flag, true)
from unnest(spec.flags) as item(flag)
)
from unnest(specs) spec
inner join :GRAPHILE_WORKER_SCHEMA._private_tasks as tasks
on tasks.identifier = spec.identifier
left join :GRAPHILE_WORKER_SCHEMA._private_job_queues as job_queues
on job_queues.queue_name = spec.queue_name
on conflict (key) do update set
job_queue_id = excluded.job_queue_id,
task_id = excluded.task_id,
payload =
case
when json_typeof(jobs.payload) = 'array' and json_typeof(excluded.payload) = 'array' then
(jobs.payload::jsonb || excluded.payload::jsonb)::json
else
excluded.payload
end,
max_attempts = excluded.max_attempts,
run_at = (case
when job_key_preserve_run_at is true and jobs.attempts = 0 then jobs.run_at
else excluded.run_at
end),
priority = excluded.priority,
revision = jobs.revision + 1,
flags = excluded.flags,
-- always reset error/retry state
attempts = 0,
last_error = null,
updated_at = now()
where jobs.locked_at is null
returning *;
end;
$$;
"#},
indoc! {r#"
CREATE OR REPLACE FUNCTION :GRAPHILE_WORKER_SCHEMA.remove_job(job_key text) RETURNS :GRAPHILE_WORKER_SCHEMA._private_jobs
LANGUAGE plpgsql STRICT
AS $$
declare
v_job :GRAPHILE_WORKER_SCHEMA._private_jobs;
begin
-- Delete job if not locked
DELETE FROM :GRAPHILE_WORKER_SCHEMA._private_jobs AS jobs
WHERE key = job_key
AND (
locked_at IS NULL
OR
locked_at < NOW() - interval '4 hours'
)
RETURNING * INTO v_job;
IF NOT (v_job IS NULL) THEN
perform pg_notify('jobs:insert', '{"r":' || random()::text || ',"count":-1}');
RETURN v_job;
END IF;
-- Otherwise prevent job from retrying, and clear the key
UPDATE :GRAPHILE_WORKER_SCHEMA._private_jobs AS jobs
SET
key = NULL,
attempts = jobs.max_attempts,
updated_at = now()
WHERE key = job_key
RETURNING * INTO v_job;
RETURN v_job;
end;
$$;
"#},
],
};