1use sqlx::{Acquire, PgConnection, Pool, Postgres, Result};
2
/// Supplies the names of the database objects used by the task queue.
///
/// Implementors return the bare (unqualified) names; the provided methods
/// derive the schema-qualified names and the `_done` queue name from them.
pub trait TaskTableProvider: Send + Sync + 'static {
    /// Name of the schema that the other names are qualified with.
    fn schema_name(&self) -> &str;
    /// Unqualified name of the tasks table.
    fn tasks_table(&self) -> &str;
    /// Unqualified name of the ready view.
    fn tasks_ready_view(&self) -> &str;
    /// Unqualified name of the notify function.
    fn tasks_notify_fn(&self) -> &str;
    /// Unqualified name of the notify-done function.
    fn tasks_notify_done_fn(&self) -> &str;

    /// Name of the task queue.
    fn tasks_queue_name(&self) -> &str;

    /// Returns the queue name with `_done` appended.
    fn tasks_queue_done_name(&self) -> String {
        let queue = self.tasks_queue_name();
        format!("{queue}_done")
    }

    /// Returns the tasks table name as `schema.table`.
    fn tasks_table_full_name(&self) -> String {
        let (schema, table) = (self.schema_name(), self.tasks_table());
        format!("{schema}.{table}")
    }

    /// Returns the ready view name as `schema.view`.
    fn tasks_ready_view_full_name(&self) -> String {
        let (schema, view) = (self.schema_name(), self.tasks_ready_view());
        format!("{schema}.{view}")
    }

    /// Returns the notify function name as `schema.function`.
    fn tasks_notify_fn_full_name(&self) -> String {
        let (schema, function) = (self.schema_name(), self.tasks_notify_fn());
        format!("{schema}.{function}")
    }

    /// Returns the notify-done function name as `schema.function`.
    fn tasks_notify_done_fn_full_name(&self) -> String {
        let (schema, function) = (self.schema_name(), self.tasks_notify_done_fn());
        format!("{schema}.{function}")
    }
}
32
33impl TaskTableProvider for TaskTables {
34 fn schema_name(&self) -> &str {
35 &self.schema
36 }
37
38 fn tasks_table(&self) -> &str {
39 &self.tasks_table.name
40 }
41
42 fn tasks_ready_view(&self) -> &str {
43 &self.tasks_ready.name
44 }
45
46 fn tasks_notify_fn(&self) -> &str {
47 &self.tasks_notify.name
48 }
49
50 fn tasks_notify_done_fn(&self) -> &str {
51 &self.tasks_notify_done.name
52 }
53
54 fn tasks_queue_name(&self) -> &str {
55 &self.tasks_queue_name
56 }
57}
58
/// Kind of database object a [`TaskTableEntity`] describes.
///
/// The kind selects which catalog is queried for existence checks and which
/// `DROP` statement is issued.
// Fieldless enum: `Copy` avoids needless clones and `PartialEq`/`Eq` allow
// direct comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntityType {
    /// A function (looked up in `information_schema.routines`).
    Function,
    /// A view (looked up in `information_schema.tables`).
    View,
    /// A table (looked up in `information_schema.tables`).
    Table,
}
69
/// One database object (table, view, or function) together with the SQL
/// statement used to create it.
#[derive(Debug, Clone)]
pub struct TaskTableEntity {
    /// Schema the object lives in.
    schema: String,
    /// Unqualified name of the object.
    name: String,
    /// SQL statement that is executed to create the object.
    definition: String,
    /// Kind of object; selects the existence query and the `DROP` statement.
    entity_type: EntityType,
}
77
78impl TaskTableEntity {
79 async fn exists(&self, tx: &mut PgConnection) -> Result<bool> {
80 let Self {
81 schema,
82 name,
83 entity_type,
84 ..
85 } = self;
86
87 let query = match entity_type {
88 EntityType::Function => {
89 "
90SELECT count(*)
91FROM information_schema.routines
92WHERE routine_schema = $1 AND routine_name = $2
93"
94 }
95 EntityType::View | EntityType::Table => {
96 "
97SELECT count(*)
98FROM information_schema.tables
99WHERE table_schema = $1 AND table_name = $2
100"
101 }
102 };
103
104 let n = sqlx::query_scalar::<_, i64>(query)
105 .bind(schema)
106 .bind(name)
107 .fetch_one(tx)
108 .await?;
109
110 let exists = n > 0;
111
112 debug!("{schema}.{name} exists={exists}");
113
114 Ok(exists)
115 }
116
117 async fn create(&self, tx: &mut PgConnection) -> Result<()> {
118 if self.exists(&mut *tx).await? {
119 return Ok(());
120 }
121 sqlx::query(&self.definition).execute(tx).await?;
122 info!("{}.{} created", self.schema, self.name);
123 Ok(())
124 }
125
126 async fn drop(&self, tx: &mut PgConnection) -> Result<()> {
127 if !self.exists(tx).await? {
128 return Ok(());
129 }
130
131 let Self {
132 schema,
133 name,
134 entity_type,
135 ..
136 } = self;
137
138 let query = match entity_type {
139 EntityType::Function => {
140 format!("DROP FUNCTION {schema}.{name} CASCADE")
141 }
142 EntityType::View => {
143 format!("DROP VIEW {schema}.{name} CASCADE")
144 }
145 EntityType::Table => format!("DROP TABLE {schema}.{name} CASCADE"),
146 };
147
148 sqlx::query(&query).execute(tx).await?;
149
150 info!("{schema}.{name} dropped");
151
152 Ok(())
153 }
154}
155
/// Builder for [`TaskTables`].
///
/// Both settings are optional; unset values fall back to defaults when the
/// builder is consumed by `build`.
// Public type: derive `Debug` and `Clone` so it can be logged and reused.
#[derive(Debug, Clone, Default)]
pub struct TaskTableBuilder {
    /// Schema to create the objects in; `None` until set.
    schema_name: Option<String>,
    /// Base name that every object name is derived from; `None` until set.
    base_name: Option<String>,
}
164
165impl TaskTableBuilder {
166 pub fn new() -> Self {
167 Self::default()
168 }
169
170 #[must_use]
172 pub fn schema_name(mut self, schema_name: impl ToString) -> Self {
173 self.schema_name = Some(schema_name.to_string());
174 self
175 }
176
177 #[must_use]
180 pub fn base_name(mut self, base_name: impl ToString) -> Self {
181 self.base_name = Some(base_name.to_string());
182 self
183 }
184
185 pub fn build(self) -> TaskTables {
186 let schema = self.schema_name.unwrap_or_else(|| "public".to_string());
187 let base_name = self.base_name.unwrap_or_else(|| "tasks".to_string());
188
189 let fullname = |name: Option<&str>| -> String {
190 if let Some(name) = name {
191 format!("{}_{}", base_name, name)
192 } else {
193 base_name.to_string()
194 }
195 };
196
197 let tasks_table_name = fullname(None);
198 let tasks_notify_name = fullname(Some("notify"));
199 let tasks_notify_done_name = fullname(Some("notify_done"));
200 let tasks_ready_name = fullname(Some("ready"));
201 let tasks_queue_name: String = fullname(Some("queue"));
202
203 let tasks_table_def = format!(
204 "
205CREATE TABLE {schema}.{tasks_table_name} (
206 id UUID PRIMARY KEY,
207 parent UUID REFERENCES {schema}.{tasks_table_name}(id) ON DELETE CASCADE,
208 created_at TIMESTAMPTZ DEFAULT NOW(),
209 updated_at TIMESTAMPTZ DEFAULT NOW(),
210 task_type TEXT NOT NULL,
211 request JSONB DEFAULT NULL,
212 result JSONB DEFAULT NULL,
213 error JSONB DEFAULT NULL,
214 in_progress BOOLEAN DEFAULT FALSE,
215 done BOOLEAN DEFAULT FALSE
216);
217"
218 );
219
220 let tasks_notify_fn_def = format!(
221 "
222CREATE FUNCTION {schema}.{tasks_notify_name}(task_id uuid)
223RETURNS VOID AS $$
224BEGIN
225PERFORM pg_notify('{tasks_queue_name}', task_id::text);
226END;
227$$ LANGUAGE plpgsql;
228"
229 );
230
231 let tasks_notify_done_fn_def = format!(
233 "
234CREATE FUNCTION {schema}.{tasks_notify_done_name}(task_id uuid)
235RETURNS VOID AS $$
236BEGIN
237PERFORM pg_notify('{tasks_queue_name}_done', task_id::text);
238END;
239$$ LANGUAGE plpgsql;
240"
241 );
242
243 let tasks_ready_view_def = format!(
244 "
245CREATE VIEW {schema}.{tasks_ready_name} AS (
246 -- Select tasks with children. The in_progress field will be false if the task
247 -- itself has in_progress set to false and all children tasks are done.
248 WITH tasks_with_children as (
249 SELECT parent.id,
250 parent.parent,
251 parent.created_at,
252 parent.updated_at,
253 parent.task_type,
254 parent.request,
255 parent.result,
256 parent.error,
257 (parent.in_progress OR bool_or(not dep_tasks.done)) AS in_progress,
258 parent.done
259 FROM {schema}.{tasks_table_name} dep_tasks
260 JOIN {schema}.{tasks_table_name} parent ON parent.id = dep_tasks.parent
261 GROUP BY parent.id
262 ),
263 -- any tasks that is not a tasks_with_children
264 leaf_tasks AS (
265 SELECT *
266 FROM {schema}.{tasks_table_name} t
267 WHERE t.id NOT IN (SELECT id FROM tasks_with_children)
268 )
269 SELECT tasks.*
270 FROM (SELECT * FROM leaf_tasks UNION SELECT * FROM tasks_with_children) AS tasks
271 WHERE in_progress = false AND done = false AND error IS NULL);
272"
273 );
274
275 TaskTables {
276 tasks_table: TaskTableEntity {
277 schema: schema.clone(),
278 name: tasks_table_name,
279 definition: tasks_table_def,
280 entity_type: EntityType::Table,
281 },
282 tasks_notify: TaskTableEntity {
283 schema: schema.clone(),
284 name: tasks_notify_name,
285 definition: tasks_notify_fn_def,
286 entity_type: EntityType::Function,
287 },
288 tasks_notify_done: TaskTableEntity {
289 schema: schema.clone(),
290 name: tasks_notify_done_name,
291 definition: tasks_notify_done_fn_def,
292 entity_type: EntityType::Function,
293 },
294 tasks_ready: TaskTableEntity {
295 schema: schema.clone(),
296 name: tasks_ready_name,
297 definition: tasks_ready_view_def,
298 entity_type: EntityType::View,
299 },
300 schema,
301 tasks_queue_name,
302 }
303 }
304}
305
/// Description of all database objects that make up one task queue: the
/// tasks table, the two notify functions and the ready view, plus the schema
/// and queue name they share.
#[derive(Debug, Clone)]
pub struct TaskTables {
    /// Schema that contains every object below.
    schema: String,
    /// Queue name; the generated notify functions pass it (and its `_done`
    /// variant) to `pg_notify`.
    tasks_queue_name: String,
    /// The tasks table.
    tasks_table: TaskTableEntity,
    /// Function that notifies on the queue name.
    tasks_notify: TaskTableEntity,
    /// Function that notifies on the `_done` queue name.
    tasks_notify_done: TaskTableEntity,
    /// View over the tasks table listing tasks that are not in progress,
    /// not done and have no error.
    tasks_ready: TaskTableEntity,
}
330
331impl Default for TaskTables {
332 fn default() -> Self {
333 TaskTableBuilder::new().build()
334 }
335}
336
337impl TaskTables {
338 pub async fn exists(&self, pool: &Pool<Postgres>) -> Result<bool> {
340 let mut tx = pool.begin().await?;
341
342 if !self.tasks_table.exists(&mut tx).await? {
343 return Ok(false);
344 };
345 if !self.tasks_notify.exists(&mut tx).await? {
346 return Ok(false);
347 };
348 if !self.tasks_notify_done.exists(&mut tx).await? {
349 return Ok(false);
350 };
351 if !self.tasks_ready.exists(&mut tx).await? {
352 return Ok(false);
353 };
354
355 Ok(true)
356 }
357
358 pub async fn create(&self, pool: &Pool<Postgres>) -> Result<()> {
362 let mut tx = pool.begin().await?;
363 let con = tx.acquire().await?;
364 self.tasks_table.create(&mut *con).await?;
365 self.tasks_notify.create(&mut *con).await?;
366 self.tasks_notify_done.create(&mut *con).await?;
367 self.tasks_ready.create(&mut *con).await?;
368 tx.commit().await?;
369
370 Ok(())
371 }
372
373 pub async fn drop(self, pool: &Pool<Postgres>) -> Result<()> {
376 let mut tx = pool.begin().await?;
377 self.tasks_ready.drop(&mut tx).await?;
378 self.tasks_notify.drop(&mut tx).await?;
379 self.tasks_notify_done.drop(&mut tx).await?;
380 self.tasks_table.drop(&mut tx).await?;
381 tx.commit().await?;
382
383 debug!("cleanup for task setup done");
384
385 Ok(())
386 }
387}