1use crate::error::AwaError;
2use sqlx::PgPool;
3use tracing::info;
4
5pub const CURRENT_VERSION: i32 = 2;
7
8const MIGRATIONS: &[(i32, &str, &str)] = &[
10 (1, "Initial schema", V1_UP),
11 (2, "Periodic/cron jobs", V2_UP),
12];
13
14const V1_UP: &str = r#"
16-- Awa schema v1: Initial schema
17
18CREATE SCHEMA IF NOT EXISTS awa;
19
20CREATE TYPE awa.job_state AS ENUM (
21 'scheduled', 'available', 'running',
22 'completed', 'retryable', 'failed', 'cancelled'
23);
24
25CREATE TABLE awa.jobs (
26 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
27 kind TEXT NOT NULL,
28 queue TEXT NOT NULL DEFAULT 'default',
29 args JSONB NOT NULL DEFAULT '{}',
30 state awa.job_state NOT NULL DEFAULT 'available',
31 priority SMALLINT NOT NULL DEFAULT 2,
32 attempt SMALLINT NOT NULL DEFAULT 0,
33 max_attempts SMALLINT NOT NULL DEFAULT 25,
34 run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
35 heartbeat_at TIMESTAMPTZ,
36 deadline_at TIMESTAMPTZ,
37 attempted_at TIMESTAMPTZ,
38 finalized_at TIMESTAMPTZ,
39 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
40 errors JSONB[] DEFAULT '{}',
41 metadata JSONB NOT NULL DEFAULT '{}',
42 tags TEXT[] NOT NULL DEFAULT '{}',
43 unique_key BYTEA,
44 unique_states BIT(8),
45
46 CONSTRAINT priority_in_range CHECK (priority BETWEEN 1 AND 4),
47 CONSTRAINT max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
48 CONSTRAINT queue_name_length CHECK (length(queue) <= 200),
49 CONSTRAINT kind_length CHECK (length(kind) <= 200),
50 CONSTRAINT tags_count CHECK (cardinality(tags) <= 20)
51);
52
53CREATE TABLE awa.queue_meta (
54 queue TEXT PRIMARY KEY,
55 paused BOOLEAN NOT NULL DEFAULT FALSE,
56 paused_at TIMESTAMPTZ,
57 paused_by TEXT
58);
59
60CREATE TABLE awa.schema_version (
61 version INT PRIMARY KEY,
62 description TEXT NOT NULL,
63 applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
64);
65
66-- Functions (must be created before indexes that reference them)
67
68CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
69RETURNS BOOLEAN AS $$
70 SELECT CASE state
71 WHEN 'scheduled' THEN get_bit(bitmask, 0) = 1
72 WHEN 'available' THEN get_bit(bitmask, 1) = 1
73 WHEN 'running' THEN get_bit(bitmask, 2) = 1
74 WHEN 'completed' THEN get_bit(bitmask, 3) = 1
75 WHEN 'retryable' THEN get_bit(bitmask, 4) = 1
76 WHEN 'failed' THEN get_bit(bitmask, 5) = 1
77 WHEN 'cancelled' THEN get_bit(bitmask, 6) = 1
78 ELSE FALSE
79 END;
80$$ LANGUAGE sql IMMUTABLE;
81
82CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
83RETURNS interval AS $$
84 SELECT LEAST(
85 (power(2, attempt)::int || ' seconds')::interval
86 + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
87 interval '24 hours'
88 );
89$$ LANGUAGE sql VOLATILE;
90
91CREATE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
92BEGIN
93 PERFORM pg_notify('awa:' || NEW.queue, NEW.id::text);
94 RETURN NEW;
95END;
96$$ LANGUAGE plpgsql;
97
98CREATE TRIGGER trg_awa_notify
99 AFTER INSERT ON awa.jobs
100 FOR EACH ROW
101 WHEN (NEW.state = 'available' AND NEW.run_at <= now())
102 EXECUTE FUNCTION awa.notify_new_job();
103
104-- Indexes
105
106-- Dequeue hot path
107CREATE INDEX idx_awa_jobs_dequeue
108 ON awa.jobs (queue, priority, run_at, id)
109 WHERE state = 'available';
110
111-- Heartbeat staleness (crash detection)
112CREATE INDEX idx_awa_jobs_heartbeat
113 ON awa.jobs (heartbeat_at)
114 WHERE state = 'running';
115
116-- Hard deadline (runaway protection)
117CREATE INDEX idx_awa_jobs_deadline
118 ON awa.jobs (deadline_at)
119 WHERE state = 'running' AND deadline_at IS NOT NULL;
120
121-- Uniqueness enforcement
122CREATE UNIQUE INDEX idx_awa_jobs_unique
123 ON awa.jobs (unique_key)
124 WHERE unique_key IS NOT NULL
125 AND unique_states IS NOT NULL
126 AND awa.job_state_in_bitmask(unique_states, state);
127
128-- Kind-based lookups (admin, monitoring)
129CREATE INDEX idx_awa_jobs_kind_state
130 ON awa.jobs (kind, state);
131
132-- Record version
133INSERT INTO awa.schema_version (version, description) VALUES (1, 'Initial schema');
134"#;
135
136const V2_UP: &str = r#"
138-- Awa schema v2: Periodic/cron jobs
139
140CREATE TABLE awa.cron_jobs (
141 name TEXT PRIMARY KEY,
142 cron_expr TEXT NOT NULL,
143 timezone TEXT NOT NULL DEFAULT 'UTC',
144 kind TEXT NOT NULL,
145 queue TEXT NOT NULL DEFAULT 'default',
146 args JSONB NOT NULL DEFAULT '{}',
147 priority SMALLINT NOT NULL DEFAULT 2,
148 max_attempts SMALLINT NOT NULL DEFAULT 25,
149 tags TEXT[] NOT NULL DEFAULT '{}',
150 metadata JSONB NOT NULL DEFAULT '{}',
151 last_enqueued_at TIMESTAMPTZ,
152 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
153 updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
154);
155
156INSERT INTO awa.schema_version (version, description)
157VALUES (2, 'Periodic/cron jobs');
158"#;
159
160pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
164 let lock_key: i64 = 0x4157_415f_4d49_4752; sqlx::query("SELECT pg_advisory_lock($1)")
167 .bind(lock_key)
168 .execute(pool)
169 .await?;
170
171 let result = run_inner(pool).await;
172
173 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
175 .bind(lock_key)
176 .execute(pool)
177 .await;
178
179 result
180}
181
182async fn run_inner(pool: &PgPool) -> Result<(), AwaError> {
183 let has_schema: bool =
184 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
185 .fetch_one(pool)
186 .await?;
187
188 if !has_schema {
189 for &(version, description, sql) in MIGRATIONS {
191 info!(version, description, "Applying migration");
192 sqlx::raw_sql(sql).execute(pool).await?;
193 info!(version, "Migration applied");
194 }
195 return Ok(());
196 }
197
198 let has_version_table: bool = sqlx::query_scalar(
200 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
201 )
202 .fetch_one(pool)
203 .await?;
204
205 if !has_version_table {
206 for &(version, description, sql) in MIGRATIONS {
208 info!(version, description, "Applying migration");
209 sqlx::raw_sql(sql).execute(pool).await?;
210 info!(version, "Migration applied");
211 }
212 return Ok(());
213 }
214
215 let current: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
216 .fetch_one(pool)
217 .await?;
218
219 let current_version = current.unwrap_or(0);
220
221 if current_version >= CURRENT_VERSION {
222 info!(version = current_version, "Schema is up to date");
223 return Ok(());
224 }
225
226 for &(version, description, sql) in MIGRATIONS {
228 if version > current_version {
229 info!(version, description, "Applying migration");
230 sqlx::raw_sql(sql).execute(pool).await?;
231 info!(version, "Migration applied");
232 }
233 }
234
235 Ok(())
236}
237
238pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
240 let has_schema: bool =
241 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
242 .fetch_one(pool)
243 .await?;
244
245 if !has_schema {
246 return Ok(0);
247 }
248
249 let has_table: bool = sqlx::query_scalar(
250 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
251 )
252 .fetch_one(pool)
253 .await?;
254
255 if !has_table {
256 return Ok(0);
257 }
258
259 let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
260 .fetch_one(pool)
261 .await?;
262
263 Ok(version.unwrap_or(0))
264}
265
266pub fn migration_sql() -> Vec<(i32, &'static str, &'static str)> {
268 MIGRATIONS.iter().map(|&(v, d, s)| (v, d, s)).collect()
269}