1use crate::error::AwaError;
2use sqlx::PgPool;
3use tracing::info;
4
5pub const CURRENT_VERSION: i32 = 1;
7
8const V1_UP: &str = r#"
10-- Awa schema v1: Initial schema
11
12CREATE SCHEMA IF NOT EXISTS awa;
13
14CREATE TYPE awa.job_state AS ENUM (
15 'scheduled', 'available', 'running',
16 'completed', 'retryable', 'failed', 'cancelled'
17);
18
19CREATE TABLE awa.jobs (
20 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
21 kind TEXT NOT NULL,
22 queue TEXT NOT NULL DEFAULT 'default',
23 args JSONB NOT NULL DEFAULT '{}',
24 state awa.job_state NOT NULL DEFAULT 'available',
25 priority SMALLINT NOT NULL DEFAULT 2,
26 attempt SMALLINT NOT NULL DEFAULT 0,
27 max_attempts SMALLINT NOT NULL DEFAULT 25,
28 run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
29 heartbeat_at TIMESTAMPTZ,
30 deadline_at TIMESTAMPTZ,
31 attempted_at TIMESTAMPTZ,
32 finalized_at TIMESTAMPTZ,
33 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
34 errors JSONB[] DEFAULT '{}',
35 metadata JSONB NOT NULL DEFAULT '{}',
36 tags TEXT[] NOT NULL DEFAULT '{}',
37 unique_key BYTEA,
38 unique_states BIT(8),
39
40 CONSTRAINT priority_in_range CHECK (priority BETWEEN 1 AND 4),
41 CONSTRAINT max_attempts_range CHECK (max_attempts BETWEEN 1 AND 1000),
42 CONSTRAINT queue_name_length CHECK (length(queue) <= 200),
43 CONSTRAINT kind_length CHECK (length(kind) <= 200),
44 CONSTRAINT tags_count CHECK (cardinality(tags) <= 20)
45);
46
47CREATE TABLE awa.queue_meta (
48 queue TEXT PRIMARY KEY,
49 paused BOOLEAN NOT NULL DEFAULT FALSE,
50 paused_at TIMESTAMPTZ,
51 paused_by TEXT
52);
53
54CREATE TABLE awa.schema_version (
55 version INT PRIMARY KEY,
56 description TEXT NOT NULL,
57 applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
58);
59
60-- Functions (must be created before indexes that reference them)
61
62CREATE FUNCTION awa.job_state_in_bitmask(bitmask BIT(8), state awa.job_state)
63RETURNS BOOLEAN AS $$
64 SELECT CASE state
65 WHEN 'scheduled' THEN get_bit(bitmask, 0) = 1
66 WHEN 'available' THEN get_bit(bitmask, 1) = 1
67 WHEN 'running' THEN get_bit(bitmask, 2) = 1
68 WHEN 'completed' THEN get_bit(bitmask, 3) = 1
69 WHEN 'retryable' THEN get_bit(bitmask, 4) = 1
70 WHEN 'failed' THEN get_bit(bitmask, 5) = 1
71 WHEN 'cancelled' THEN get_bit(bitmask, 6) = 1
72 ELSE FALSE
73 END;
74$$ LANGUAGE sql IMMUTABLE;
75
76CREATE FUNCTION awa.backoff_duration(attempt SMALLINT, max_attempts SMALLINT)
77RETURNS interval AS $$
78 SELECT LEAST(
79 (power(2, attempt)::int || ' seconds')::interval
80 + (random() * power(2, attempt) * 0.25 || ' seconds')::interval,
81 interval '24 hours'
82 );
83$$ LANGUAGE sql VOLATILE;
84
85CREATE FUNCTION awa.notify_new_job() RETURNS trigger AS $$
86BEGIN
87 PERFORM pg_notify('awa:' || NEW.queue, NEW.id::text);
88 RETURN NEW;
89END;
90$$ LANGUAGE plpgsql;
91
92CREATE TRIGGER trg_awa_notify
93 AFTER INSERT ON awa.jobs
94 FOR EACH ROW
95 WHEN (NEW.state = 'available' AND NEW.run_at <= now())
96 EXECUTE FUNCTION awa.notify_new_job();
97
98-- Indexes
99
100-- Dequeue hot path
101CREATE INDEX idx_awa_jobs_dequeue
102 ON awa.jobs (queue, priority, run_at, id)
103 WHERE state = 'available';
104
105-- Heartbeat staleness (crash detection)
106CREATE INDEX idx_awa_jobs_heartbeat
107 ON awa.jobs (heartbeat_at)
108 WHERE state = 'running';
109
110-- Hard deadline (runaway protection)
111CREATE INDEX idx_awa_jobs_deadline
112 ON awa.jobs (deadline_at)
113 WHERE state = 'running' AND deadline_at IS NOT NULL;
114
115-- Uniqueness enforcement
116CREATE UNIQUE INDEX idx_awa_jobs_unique
117 ON awa.jobs (unique_key)
118 WHERE unique_key IS NOT NULL
119 AND unique_states IS NOT NULL
120 AND awa.job_state_in_bitmask(unique_states, state);
121
122-- Kind-based lookups (admin, monitoring)
123CREATE INDEX idx_awa_jobs_kind_state
124 ON awa.jobs (kind, state);
125
126-- Record version
127INSERT INTO awa.schema_version (version, description) VALUES (1, 'Initial schema');
128"#;
129
130pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
134 let lock_key: i64 = 0x4157_415f_4d49_4752; sqlx::query("SELECT pg_advisory_lock($1)")
137 .bind(lock_key)
138 .execute(pool)
139 .await?;
140
141 let result = run_inner(pool).await;
142
143 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
145 .bind(lock_key)
146 .execute(pool)
147 .await;
148
149 result
150}
151
152async fn run_inner(pool: &PgPool) -> Result<(), AwaError> {
153 let has_schema: bool =
154 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
155 .fetch_one(pool)
156 .await?;
157
158 if !has_schema {
159 info!("Running initial migration (v1)");
160 sqlx::raw_sql(V1_UP).execute(pool).await?;
161 info!("Migration v1 applied successfully");
162 return Ok(());
163 }
164
165 let has_version_table: bool = sqlx::query_scalar(
167 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
168 )
169 .fetch_one(pool)
170 .await?;
171
172 if !has_version_table {
173 info!("Running initial migration (v1)");
174 sqlx::raw_sql(V1_UP).execute(pool).await?;
175 info!("Migration v1 applied successfully");
176 return Ok(());
177 }
178
179 let current: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
180 .fetch_one(pool)
181 .await?;
182
183 let current_version = current.unwrap_or(0);
184
185 if current_version >= CURRENT_VERSION {
186 info!(version = current_version, "Schema is up to date");
187 return Ok(());
188 }
189
190 if current_version == 0 {
191 info!("Running initial migration (v1)");
192 sqlx::raw_sql(V1_UP).execute(pool).await?;
193 info!("Migration v1 applied successfully");
194 }
195
196 Ok(())
197}
198
199pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
201 let has_schema: bool =
202 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
203 .fetch_one(pool)
204 .await?;
205
206 if !has_schema {
207 return Ok(0);
208 }
209
210 let has_table: bool = sqlx::query_scalar(
211 "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
212 )
213 .fetch_one(pool)
214 .await?;
215
216 if !has_table {
217 return Ok(0);
218 }
219
220 let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
221 .fetch_one(pool)
222 .await?;
223
224 Ok(version.unwrap_or(0))
225}
226
227pub fn migration_sql() -> Vec<(i32, &'static str, &'static str)> {
229 vec![(1, "Initial schema", V1_UP)]
230}