1use crate::error::AwaError;
2use sqlx::postgres::PgConnection;
3use sqlx::PgPool;
4use tracing::info;
5
/// Schema version this module installs.
///
/// [`run`] treats a database whose recorded `awa.schema_version` equals this
/// value as already up to date and applies nothing.
pub const CURRENT_VERSION: i32 = 3;
8
/// Migrations applied by [`run`], in order.
///
/// Each entry is `(version, description, steps)`, where `steps` are SQL scripts
/// executed one after another for that version.
const MIGRATIONS: &[(i32, &str, &[&str])] = &[(3, "Canonical schema with UI indexes", &[V3_UP])];
11
12const V3_UP: &str = r#"
14-- Awa schema v2: Canonical hot/deferred schema with structured progress
15
16CREATE SCHEMA IF NOT EXISTS awa;
17
18CREATE TYPE awa.job_state AS ENUM (
19 'scheduled', 'available', 'running',
20 'completed', 'retryable', 'failed', 'cancelled', 'waiting_external'
21);
22
23CREATE SEQUENCE awa.jobs_id_seq;
24
25CREATE TABLE awa.jobs_hot (
26 id BIGINT NOT NULL DEFAULT nextval('awa.jobs_id_seq') PRIMARY KEY,
27 kind TEXT NOT NULL,
28 queue TEXT NOT NULL DEFAULT 'default',
29 args JSONB NOT NULL DEFAULT '{}',
30 state awa.job_state NOT NULL DEFAULT 'available',
31 priority SMALLINT NOT NULL DEFAULT 2,
32 attempt SMALLINT NOT NULL DEFAULT 0,
33 max_attempts SMALLINT NOT NULL DEFAULT 25,
34 run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
35 heartbeat_at TIMESTAMPTZ,
36 deadline_at TIMESTAMPTZ,
37 attempted_at TIMESTAMPTZ,
38 finalized_at TIMESTAMPTZ,
39 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
40 errors JSONB[] DEFAULT '{}',
41 metadata JSONB NOT NULL DEFAULT '{}',
42 tags TEXT[] NOT NULL DEFAULT '{}',
43 unique_key BYTEA,
44 unique_states BIT(8),
45 callback_id UUID,
46 callback_timeout_at TIMESTAMPTZ,
47 callback_filter TEXT,
48 callback_on_complete TEXT,
49 callback_on_fail TEXT,
50 callback_transform TEXT,
51 run_lease BIGINT NOT NULL DEFAULT 0,
52 progress JSONB,
53
54 CONSTRAINT jobs_hot_state_check CHECK (state NOT IN ('scheduled', 'retryable')),
55 CONSTRAINT jobs_hot_priority_in_range CHECK (priority BETWEEN 1 AND 4),
56 CONSTRAINT jobs_hot_max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
57 CONSTRAINT jobs_hot_queue_name_length CHECK (length(queue) <= 200),
58 CONSTRAINT jobs_hot_kind_length CHECK (length(kind) <= 200),
59 CONSTRAINT jobs_hot_tags_count CHECK (cardinality(tags) <= 20)
60);
61
62CREATE TABLE awa.scheduled_jobs (
63 id BIGINT NOT NULL DEFAULT nextval('awa.jobs_id_seq') PRIMARY KEY,
64 kind TEXT NOT NULL,
65 queue TEXT NOT NULL DEFAULT 'default',
66 args JSONB NOT NULL DEFAULT '{}',
67 state awa.job_state NOT NULL DEFAULT 'scheduled',
68 priority SMALLINT NOT NULL DEFAULT 2,
69 attempt SMALLINT NOT NULL DEFAULT 0,
70 max_attempts SMALLINT NOT NULL DEFAULT 25,
71 run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
72 heartbeat_at TIMESTAMPTZ,
73 deadline_at TIMESTAMPTZ,
74 attempted_at TIMESTAMPTZ,
75 finalized_at TIMESTAMPTZ,
76 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
77 errors JSONB[] DEFAULT '{}',
78 metadata JSONB NOT NULL DEFAULT '{}',
79 tags TEXT[] NOT NULL DEFAULT '{}',
80 unique_key BYTEA,
81 unique_states BIT(8),
82 callback_id UUID,
83 callback_timeout_at TIMESTAMPTZ,
84 callback_filter TEXT,
85 callback_on_complete TEXT,
86 callback_on_fail TEXT,
87 callback_transform TEXT,
88 run_lease BIGINT NOT NULL DEFAULT 0,
89 progress JSONB,
90
91 CONSTRAINT scheduled_jobs_state_check CHECK (state IN ('scheduled', 'retryable')),
92 CONSTRAINT scheduled_jobs_priority_in_range CHECK (priority BETWEEN 1 AND 4),
93 CONSTRAINT scheduled_jobs_max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
94 CONSTRAINT scheduled_jobs_queue_name_length CHECK (length(queue) <= 200),
95 CONSTRAINT scheduled_jobs_kind_length CHECK (length(kind) <= 200),
96 CONSTRAINT scheduled_jobs_tags_count CHECK (cardinality(tags) <= 20)
97);
98
99CREATE TABLE awa.queue_meta (
100 queue TEXT PRIMARY KEY,
101 paused BOOLEAN NOT NULL DEFAULT FALSE,
102 paused_at TIMESTAMPTZ,
103 paused_by TEXT
104);
105
106CREATE TABLE awa.job_unique_claims (
107 unique_key BYTEA NOT NULL,
108 job_id BIGINT NOT NULL
109);
110
111CREATE TABLE awa.cron_jobs (
112 name TEXT PRIMARY KEY,
113 cron_expr TEXT NOT NULL,
114 timezone TEXT NOT NULL DEFAULT 'UTC',
115 kind TEXT NOT NULL,
116 queue TEXT NOT NULL DEFAULT 'default',
117 args JSONB NOT NULL DEFAULT '{}',
118 priority SMALLINT NOT NULL DEFAULT 2,
119 max_attempts SMALLINT NOT NULL DEFAULT 25,
120 tags TEXT[] NOT NULL DEFAULT '{}',
121 metadata JSONB NOT NULL DEFAULT '{}',
122 last_enqueued_at TIMESTAMPTZ,
123 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
124 updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
125);
126
127CREATE TABLE awa.schema_version (
128 version INT PRIMARY KEY,
129 description TEXT NOT NULL,
130 applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
131);
132
133CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
134RETURNS BOOLEAN AS $$
135 SELECT CASE state
136 WHEN 'scheduled' THEN get_bit(bitmask, 0) = 1
137 WHEN 'available' THEN get_bit(bitmask, 1) = 1
138 WHEN 'running' THEN get_bit(bitmask, 2) = 1
139 WHEN 'completed' THEN get_bit(bitmask, 3) = 1
140 WHEN 'retryable' THEN get_bit(bitmask, 4) = 1
141 WHEN 'failed' THEN get_bit(bitmask, 5) = 1
142 WHEN 'cancelled' THEN get_bit(bitmask, 6) = 1
143 WHEN 'waiting_external' THEN get_bit(bitmask, 7) = 1
144 ELSE FALSE
145 END;
146$$ LANGUAGE sql IMMUTABLE;
147
148CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
149RETURNS interval AS $$
150 SELECT LEAST(
151 (power(2, attempt)::int || ' seconds')::interval
152 + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
153 interval '24 hours'
154 );
155$$ LANGUAGE sql VOLATILE;
156
157CREATE OR REPLACE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
158BEGIN
159 PERFORM pg_notify('awa:' || NEW.queue, '');
160 RETURN NEW;
161END;
162$$ LANGUAGE plpgsql;
163
164CREATE OR REPLACE FUNCTION awa.sync_job_unique_claims() RETURNS trigger AS $$
165DECLARE
166 old_claim BOOLEAN := FALSE;
167 new_claim BOOLEAN := FALSE;
168 existing_job_id BIGINT;
169BEGIN
170 IF TG_OP <> 'INSERT' THEN
171 old_claim := OLD.unique_key IS NOT NULL
172 AND OLD.unique_states IS NOT NULL
173 AND awa.job_state_in_bitmask(OLD.unique_states, OLD.state);
174 END IF;
175
176 IF TG_OP <> 'DELETE' THEN
177 new_claim := NEW.unique_key IS NOT NULL
178 AND NEW.unique_states IS NOT NULL
179 AND awa.job_state_in_bitmask(NEW.unique_states, NEW.state);
180 END IF;
181
182 IF old_claim AND (
183 NOT new_claim
184 OR OLD.unique_key IS DISTINCT FROM NEW.unique_key
185 OR OLD.id IS DISTINCT FROM NEW.id
186 ) THEN
187 DELETE FROM awa.job_unique_claims
188 WHERE unique_key = OLD.unique_key
189 AND job_id = OLD.id;
190 END IF;
191
192 IF new_claim AND (
193 NOT old_claim
194 OR OLD.unique_key IS DISTINCT FROM NEW.unique_key
195 OR OLD.id IS DISTINCT FROM NEW.id
196 ) THEN
197 BEGIN
198 INSERT INTO awa.job_unique_claims (unique_key, job_id)
199 VALUES (NEW.unique_key, NEW.id);
200 EXCEPTION
201 WHEN unique_violation THEN
202 SELECT job_id
203 INTO existing_job_id
204 FROM awa.job_unique_claims
205 WHERE unique_key = NEW.unique_key;
206
207 IF existing_job_id IS DISTINCT FROM NEW.id THEN
208 RAISE unique_violation
209 USING CONSTRAINT = 'idx_awa_jobs_unique',
210 MESSAGE = 'duplicate key value violates unique constraint "idx_awa_jobs_unique"';
211 END IF;
212 END;
213 END IF;
214
215 IF TG_OP = 'DELETE' THEN
216 RETURN OLD;
217 END IF;
218 RETURN NEW;
219END;
220$$ LANGUAGE plpgsql;
221
222CREATE VIEW awa.jobs AS
223SELECT * FROM awa.jobs_hot
224UNION ALL
225SELECT * FROM awa.scheduled_jobs;
226
227CREATE OR REPLACE FUNCTION awa.write_jobs_view() RETURNS trigger AS $$
228DECLARE
229 target_table TEXT;
230 source_table TEXT;
231BEGIN
232 IF TG_OP = 'INSERT' THEN
233 NEW.id := COALESCE(NEW.id, nextval('awa.jobs_id_seq'));
234 NEW.queue := COALESCE(NEW.queue, 'default');
235 NEW.args := COALESCE(NEW.args, '{}'::jsonb);
236 NEW.state := COALESCE(NEW.state, 'available'::awa.job_state);
237 NEW.priority := COALESCE(NEW.priority, 2);
238 NEW.attempt := COALESCE(NEW.attempt, 0);
239 NEW.max_attempts := COALESCE(NEW.max_attempts, 25);
240 NEW.run_at := COALESCE(NEW.run_at, now());
241 NEW.created_at := COALESCE(NEW.created_at, now());
242 NEW.errors := COALESCE(NEW.errors, '{}'::jsonb[]);
243 NEW.metadata := COALESCE(NEW.metadata, '{}'::jsonb);
244 NEW.tags := COALESCE(NEW.tags, '{}'::text[]);
245 NEW.run_lease := COALESCE(NEW.run_lease, 0);
246 END IF;
247
248 IF TG_OP = 'DELETE' THEN
249 source_table := CASE
250 WHEN OLD.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
251 THEN 'awa.scheduled_jobs'
252 ELSE 'awa.jobs_hot'
253 END;
254 EXECUTE format('DELETE FROM %s WHERE id = $1', source_table) USING OLD.id;
255 RETURN OLD;
256 END IF;
257
258 IF TG_OP = 'UPDATE' THEN
259 source_table := CASE
260 WHEN OLD.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
261 THEN 'awa.scheduled_jobs'
262 ELSE 'awa.jobs_hot'
263 END;
264 EXECUTE format('DELETE FROM %s WHERE id = $1', source_table) USING OLD.id;
265 END IF;
266
267 target_table := CASE
268 WHEN NEW.state IN ('scheduled'::awa.job_state, 'retryable'::awa.job_state)
269 THEN 'awa.scheduled_jobs'
270 ELSE 'awa.jobs_hot'
271 END;
272
273 EXECUTE format(
274 'INSERT INTO %s (
275 id, kind, queue, args, state, priority, attempt, max_attempts,
276 run_at, heartbeat_at, deadline_at, attempted_at, finalized_at,
277 created_at, errors, metadata, tags, unique_key, unique_states,
278 callback_id, callback_timeout_at, callback_filter, callback_on_complete,
279 callback_on_fail, callback_transform, run_lease, progress
280 ) VALUES (
281 $1, $2, $3, $4, $5, $6, $7, $8,
282 $9, $10, $11, $12, $13,
283 $14, $15, $16, $17, $18, $19,
284 $20, $21, $22, $23,
285 $24, $25, $26, $27
286 )',
287 target_table
288 )
289 USING
290 NEW.id, NEW.kind, NEW.queue, NEW.args, NEW.state, NEW.priority, NEW.attempt,
291 NEW.max_attempts, NEW.run_at, NEW.heartbeat_at, NEW.deadline_at, NEW.attempted_at,
292 NEW.finalized_at, NEW.created_at, NEW.errors, NEW.metadata, NEW.tags,
293 NEW.unique_key, NEW.unique_states, NEW.callback_id, NEW.callback_timeout_at,
294 NEW.callback_filter, NEW.callback_on_complete, NEW.callback_on_fail,
295 NEW.callback_transform, NEW.run_lease, NEW.progress;
296
297 RETURN NEW;
298END;
299$$ LANGUAGE plpgsql;
300
301CREATE TRIGGER trg_awa_notify
302 AFTER INSERT ON awa.jobs_hot
303 FOR EACH ROW
304 WHEN (NEW.state = 'available' AND NEW.run_at <= now())
305 EXECUTE FUNCTION awa.notify_new_job();
306
307CREATE TRIGGER trg_jobs_hot_unique_claims_insert
308 AFTER INSERT ON awa.jobs_hot
309 FOR EACH ROW
310 WHEN (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
311 EXECUTE FUNCTION awa.sync_job_unique_claims();
312
313CREATE TRIGGER trg_jobs_hot_unique_claims_update
314 AFTER UPDATE ON awa.jobs_hot
315 FOR EACH ROW
316 WHEN (
317 (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
318 OR (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
319 )
320 EXECUTE FUNCTION awa.sync_job_unique_claims();
321
322CREATE TRIGGER trg_jobs_hot_unique_claims_delete
323 AFTER DELETE ON awa.jobs_hot
324 FOR EACH ROW
325 WHEN (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
326 EXECUTE FUNCTION awa.sync_job_unique_claims();
327
328CREATE TRIGGER trg_scheduled_jobs_unique_claims_insert
329 AFTER INSERT ON awa.scheduled_jobs
330 FOR EACH ROW
331 WHEN (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
332 EXECUTE FUNCTION awa.sync_job_unique_claims();
333
334CREATE TRIGGER trg_scheduled_jobs_unique_claims_update
335 AFTER UPDATE ON awa.scheduled_jobs
336 FOR EACH ROW
337 WHEN (
338 (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
339 OR (NEW.unique_key IS NOT NULL AND NEW.unique_states IS NOT NULL)
340 )
341 EXECUTE FUNCTION awa.sync_job_unique_claims();
342
343CREATE TRIGGER trg_scheduled_jobs_unique_claims_delete
344 AFTER DELETE ON awa.scheduled_jobs
345 FOR EACH ROW
346 WHEN (OLD.unique_key IS NOT NULL AND OLD.unique_states IS NOT NULL)
347 EXECUTE FUNCTION awa.sync_job_unique_claims();
348
349CREATE TRIGGER trg_awa_jobs_view_insert
350 INSTEAD OF INSERT ON awa.jobs
351 FOR EACH ROW
352 EXECUTE FUNCTION awa.write_jobs_view();
353
354CREATE TRIGGER trg_awa_jobs_view_update
355 INSTEAD OF UPDATE ON awa.jobs
356 FOR EACH ROW
357 EXECUTE FUNCTION awa.write_jobs_view();
358
359CREATE TRIGGER trg_awa_jobs_view_delete
360 INSTEAD OF DELETE ON awa.jobs
361 FOR EACH ROW
362 EXECUTE FUNCTION awa.write_jobs_view();
363
364CREATE INDEX idx_awa_jobs_hot_dequeue
365 ON awa.jobs_hot (queue, priority, run_at, id)
366 WHERE state = 'available';
367
368CREATE INDEX idx_awa_jobs_hot_heartbeat
369 ON awa.jobs_hot (heartbeat_at)
370 WHERE state = 'running';
371
372CREATE INDEX idx_awa_jobs_hot_deadline
373 ON awa.jobs_hot (deadline_at)
374 WHERE state = 'running' AND deadline_at IS NOT NULL;
375
376CREATE INDEX idx_awa_jobs_hot_kind_state
377 ON awa.jobs_hot (kind, state);
378
379CREATE UNIQUE INDEX idx_awa_jobs_hot_callback_id
380 ON awa.jobs_hot (callback_id)
381 WHERE callback_id IS NOT NULL;
382
383CREATE INDEX idx_awa_jobs_hot_callback_timeout
384 ON awa.jobs_hot (callback_timeout_at)
385 WHERE state = 'waiting_external' AND callback_timeout_at IS NOT NULL;
386
387CREATE INDEX idx_awa_scheduled_jobs_run_at_scheduled
388 ON awa.scheduled_jobs (run_at, id, queue)
389 WHERE state = 'scheduled';
390
391CREATE INDEX idx_awa_scheduled_jobs_run_at_retryable
392 ON awa.scheduled_jobs (run_at, id, queue)
393 WHERE state = 'retryable';
394
395CREATE INDEX idx_awa_scheduled_jobs_kind_state
396 ON awa.scheduled_jobs (kind, state);
397
398CREATE UNIQUE INDEX idx_awa_jobs_unique
399 ON awa.job_unique_claims (unique_key);
400
401-- BRIN indexes on created_at for time-range queries (UI dashboard, timeseries)
402CREATE INDEX idx_awa_jobs_hot_created_at
403 ON awa.jobs_hot USING BRIN (created_at) WITH (pages_per_range = 32);
404CREATE INDEX idx_awa_scheduled_jobs_created_at
405 ON awa.scheduled_jobs USING BRIN (created_at) WITH (pages_per_range = 32);
406
407-- GIN indexes on tags for array containment queries (tag filtering)
408CREATE INDEX idx_awa_jobs_hot_tags
409 ON awa.jobs_hot USING GIN (tags) WHERE tags IS NOT NULL AND tags != '{}';
410CREATE INDEX idx_awa_scheduled_jobs_tags
411 ON awa.scheduled_jobs USING GIN (tags) WHERE tags IS NOT NULL AND tags != '{}';
412
413INSERT INTO awa.schema_version (version, description)
414VALUES (3, 'Canonical schema with UI indexes');
415"#;
416
417pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
426 let lock_key: i64 = 0x4157_415f_4d49_4752; let mut conn = pool.acquire().await?;
428 sqlx::query("SELECT pg_advisory_lock($1)")
429 .bind(lock_key)
430 .execute(&mut *conn)
431 .await?;
432
433 let result = run_inner(&mut conn).await;
434
435 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
436 .bind(lock_key)
437 .execute(&mut *conn)
438 .await;
439
440 result
441}
442
443async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
444 let has_schema: bool =
445 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
446 .fetch_one(&mut *conn)
447 .await?;
448
449 let current = if has_schema {
450 current_version_conn(conn).await?
451 } else {
452 0
453 };
454
455 if has_schema && current == CURRENT_VERSION {
456 info!(version = current, "Schema is up to date");
457 return Ok(());
458 }
459
460 if has_schema {
461 info!(
462 existing_version = current,
463 "Replacing existing awa schema with canonical schema"
464 );
465 sqlx::raw_sql("DROP SCHEMA awa CASCADE")
466 .execute(&mut *conn)
467 .await?;
468 }
469
470 for &(version, description, steps) in MIGRATIONS {
471 info!(version, description, "Applying migration");
472 for step in steps {
473 sqlx::raw_sql(step).execute(&mut *conn).await?;
474 }
475 info!(version, "Migration applied");
476 }
477
478 Ok(())
479}
480
481pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
483 let mut conn = pool.acquire().await?;
484 current_version_conn(&mut conn).await
485}
486
487async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
488 let has_schema: bool =
489 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
490 .fetch_one(&mut *conn)
491 .await?;
492
493 if !has_schema {
494 return Ok(0);
495 }
496
497 let has_table: bool = sqlx::query_scalar(
498 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
499 )
500 .fetch_one(&mut *conn)
501 .await?;
502
503 if !has_table {
504 return Ok(0);
505 }
506
507 let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
508 .fetch_one(&mut *conn)
509 .await?;
510
511 Ok(version.unwrap_or(0))
512}
513
514pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
516 MIGRATIONS
517 .iter()
518 .map(|&(v, d, steps)| (v, d, steps.join("\n")))
519 .collect()
520}