pg_taskq/
tables.rs

1use sqlx::{Acquire, PgConnection, Pool, Postgres, Result};
2
/// Supplies the names of the postgres table, view and functions a task queue
/// works with, so that they can be swapped out. In most cases the ready-made
/// implementor [`TaskTables`] is what you want; it derives every name from a
/// configurable schema and name prefix.
pub trait TaskTableProvider: Send + Sync + 'static {
    /// Name of the schema that holds all entities.
    fn schema_name(&self) -> &str;
    /// Unqualified name of the tasks table.
    fn tasks_table(&self) -> &str;
    /// Unqualified name of the view listing ready tasks.
    fn tasks_ready_view(&self) -> &str;
    /// Unqualified name of the notify function.
    fn tasks_notify_fn(&self) -> &str;
    /// Unqualified name of the "done" notify function.
    fn tasks_notify_done_fn(&self) -> &str;

    /// Name of the task queue. [`TaskTables`] uses it as the `pg_notify`
    /// channel inside the notify function it installs.
    fn tasks_queue_name(&self) -> &str;

    /// The queue name with `_done` appended.
    fn tasks_queue_done_name(&self) -> String {
        let queue = self.tasks_queue_name();
        format!("{queue}_done")
    }

    /// Schema-qualified (`schema.name`) name of the tasks table.
    fn tasks_table_full_name(&self) -> String {
        let (schema, table) = (self.schema_name(), self.tasks_table());
        format!("{schema}.{table}")
    }

    /// Schema-qualified (`schema.name`) name of the ready view.
    fn tasks_ready_view_full_name(&self) -> String {
        let (schema, view) = (self.schema_name(), self.tasks_ready_view());
        format!("{schema}.{view}")
    }

    /// Schema-qualified (`schema.name`) name of the notify function.
    fn tasks_notify_fn_full_name(&self) -> String {
        let (schema, function) = (self.schema_name(), self.tasks_notify_fn());
        format!("{schema}.{function}")
    }

    /// Schema-qualified (`schema.name`) name of the "done" notify function.
    fn tasks_notify_done_fn_full_name(&self) -> String {
        let (schema, function) = (self.schema_name(), self.tasks_notify_done_fn());
        format!("{schema}.{function}")
    }
}
32
33impl TaskTableProvider for TaskTables {
34    fn schema_name(&self) -> &str {
35        &self.schema
36    }
37
38    fn tasks_table(&self) -> &str {
39        &self.tasks_table.name
40    }
41
42    fn tasks_ready_view(&self) -> &str {
43        &self.tasks_ready.name
44    }
45
46    fn tasks_notify_fn(&self) -> &str {
47        &self.tasks_notify.name
48    }
49
50    fn tasks_notify_done_fn(&self) -> &str {
51        &self.tasks_notify_done.name
52    }
53
54    fn tasks_queue_name(&self) -> &str {
55        &self.tasks_queue_name
56    }
57}
58
59// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
60
/// The kind of postgres entity a [`TaskTableEntity`] stands for. The kind
/// decides which `information_schema` relation is queried to test for
/// existence and which `DROP` statement is issued on removal.
#[derive(Debug, Clone)]
enum EntityType {
    /// A function; looked up in `information_schema.routines`.
    Function,
    /// A view; looked up in `information_schema.tables`.
    View,
    /// A table; looked up in `information_schema.tables`.
    Table,
}
69
70#[derive(Debug, Clone)]
71pub struct TaskTableEntity {
72    schema: String,
73    name: String,
74    definition: String,
75    entity_type: EntityType,
76}
77
78impl TaskTableEntity {
79    async fn exists(&self, tx: &mut PgConnection) -> Result<bool> {
80        let Self {
81            schema,
82            name,
83            entity_type,
84            ..
85        } = self;
86
87        let query = match entity_type {
88            EntityType::Function => {
89                "
90SELECT count(*)
91FROM information_schema.routines
92WHERE routine_schema = $1 AND routine_name = $2
93"
94            }
95            EntityType::View | EntityType::Table => {
96                "
97SELECT count(*)
98FROM information_schema.tables
99WHERE table_schema = $1 AND table_name = $2
100"
101            }
102        };
103
104        let n = sqlx::query_scalar::<_, i64>(query)
105            .bind(schema)
106            .bind(name)
107            .fetch_one(tx)
108            .await?;
109
110        let exists = n > 0;
111
112        debug!("{schema}.{name} exists={exists}");
113
114        Ok(exists)
115    }
116
117    async fn create(&self, tx: &mut PgConnection) -> Result<()> {
118        if self.exists(&mut *tx).await? {
119            return Ok(());
120        }
121        sqlx::query(&self.definition).execute(tx).await?;
122        info!("{}.{} created", self.schema, self.name);
123        Ok(())
124    }
125
126    async fn drop(&self, tx: &mut PgConnection) -> Result<()> {
127        if !self.exists(tx).await? {
128            return Ok(());
129        }
130
131        let Self {
132            schema,
133            name,
134            entity_type,
135            ..
136        } = self;
137
138        let query = match entity_type {
139            EntityType::Function => {
140                format!("DROP FUNCTION {schema}.{name} CASCADE")
141            }
142            EntityType::View => {
143                format!("DROP VIEW {schema}.{name} CASCADE")
144            }
145            EntityType::Table => format!("DROP TABLE {schema}.{name} CASCADE"),
146        };
147
148        sqlx::query(&query).execute(tx).await?;
149
150        info!("{schema}.{name} dropped");
151
152        Ok(())
153    }
154}
155
156// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
157
/// Builder for [`TaskTables`]. Both settings are optional; whatever is left
/// unset falls back to the schema `public` and the base name `tasks` when the
/// builder is turned into a [`TaskTables`].
#[derive(Default)]
pub struct TaskTableBuilder {
    /// Schema to install the entities into, if one was chosen.
    schema_name: Option<String>,
    /// Name prefix for the entities, if one was chosen.
    base_name: Option<String>,
}
164
165impl TaskTableBuilder {
166    pub fn new() -> Self {
167        Self::default()
168    }
169
170    /// The postgres schema name to install the task queue table into.
171    #[must_use]
172    pub fn schema_name(mut self, schema_name: impl ToString) -> Self {
173        self.schema_name = Some(schema_name.to_string());
174        self
175    }
176
177    /// Prefix name used for the postgres entities. Useful if you want to install
178    /// multiple independent task queues into the same schema.
179    #[must_use]
180    pub fn base_name(mut self, base_name: impl ToString) -> Self {
181        self.base_name = Some(base_name.to_string());
182        self
183    }
184
185    pub fn build(self) -> TaskTables {
186        let schema = self.schema_name.unwrap_or_else(|| "public".to_string());
187        let base_name = self.base_name.unwrap_or_else(|| "tasks".to_string());
188
189        let fullname = |name: Option<&str>| -> String {
190            if let Some(name) = name {
191                format!("{}_{}", base_name, name)
192            } else {
193                base_name.to_string()
194            }
195        };
196
197        let tasks_table_name = fullname(None);
198        let tasks_notify_name = fullname(Some("notify"));
199        let tasks_notify_done_name = fullname(Some("notify_done"));
200        let tasks_ready_name = fullname(Some("ready"));
201        let tasks_queue_name: String = fullname(Some("queue"));
202
203        let tasks_table_def = format!(
204            "
205CREATE TABLE {schema}.{tasks_table_name} (
206    id UUID PRIMARY KEY,
207    parent UUID REFERENCES {schema}.{tasks_table_name}(id) ON DELETE CASCADE,
208    created_at TIMESTAMPTZ DEFAULT NOW(),
209    updated_at TIMESTAMPTZ DEFAULT NOW(),
210    task_type TEXT NOT NULL,
211    request JSONB DEFAULT NULL,
212    result JSONB DEFAULT NULL,
213    error JSONB DEFAULT NULL,
214    in_progress BOOLEAN DEFAULT FALSE,
215    done BOOLEAN DEFAULT FALSE
216);
217"
218        );
219
220        let tasks_notify_fn_def = format!(
221            "
222CREATE FUNCTION {schema}.{tasks_notify_name}(task_id uuid)
223RETURNS VOID AS $$
224BEGIN
225PERFORM pg_notify('{tasks_queue_name}', task_id::text);
226END;
227$$ LANGUAGE plpgsql;
228"
229        );
230
231        // TODO get rid of this? unused currently...
232        let tasks_notify_done_fn_def = format!(
233            "
234CREATE FUNCTION {schema}.{tasks_notify_done_name}(task_id uuid)
235RETURNS VOID AS $$
236BEGIN
237PERFORM pg_notify('{tasks_queue_name}_done', task_id::text);
238END;
239$$ LANGUAGE plpgsql;
240"
241        );
242
243        let tasks_ready_view_def = format!(
244            "
245CREATE VIEW {schema}.{tasks_ready_name} AS (
246  -- Select tasks with children. The in_progress field will be false if the task
247  -- itself has in_progress set to false and all children tasks are done.
248  WITH tasks_with_children as (
249    SELECT parent.id,
250           parent.parent,
251           parent.created_at,
252           parent.updated_at,
253           parent.task_type,
254           parent.request,
255           parent.result,
256           parent.error,
257           (parent.in_progress OR bool_or(not dep_tasks.done)) AS in_progress,
258           parent.done
259    FROM {schema}.{tasks_table_name} dep_tasks
260    JOIN {schema}.{tasks_table_name} parent ON parent.id = dep_tasks.parent
261    GROUP BY parent.id
262  ),
263  -- any tasks that is not a tasks_with_children
264  leaf_tasks AS (
265    SELECT *
266    FROM {schema}.{tasks_table_name} t
267    WHERE t.id NOT IN (SELECT id FROM tasks_with_children)
268  )
269  SELECT tasks.*
270  FROM (SELECT * FROM leaf_tasks UNION SELECT * FROM tasks_with_children) AS tasks
271  WHERE in_progress = false AND done = false AND error IS NULL);
272"
273        );
274
275        TaskTables {
276            tasks_table: TaskTableEntity {
277                schema: schema.clone(),
278                name: tasks_table_name,
279                definition: tasks_table_def,
280                entity_type: EntityType::Table,
281            },
282            tasks_notify: TaskTableEntity {
283                schema: schema.clone(),
284                name: tasks_notify_name,
285                definition: tasks_notify_fn_def,
286                entity_type: EntityType::Function,
287            },
288            tasks_notify_done: TaskTableEntity {
289                schema: schema.clone(),
290                name: tasks_notify_done_name,
291                definition: tasks_notify_done_fn_def,
292                entity_type: EntityType::Function,
293            },
294            tasks_ready: TaskTableEntity {
295                schema: schema.clone(),
296                name: tasks_ready_name,
297                definition: tasks_ready_view_def,
298                entity_type: EntityType::View,
299            },
300            schema,
301            tasks_queue_name,
302        }
303    }
304}
305
306// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
307
308/// [`TaskTables`] is used for creating the necessary tables/views/functions in
309/// a postgres database. It implements [`TaskTableProvider`] and can be passed
310/// to [`crate::Task`] and [`crate::Worker`] that use it for finding out what
311/// postgres entities to use.
312///
313/// Use [`TaskTableBuilder`] to create it or use `TaskTables::default()` if you
314/// are OK with default set of postgres entities.
315///
316/// That default will create:
317/// - table `public.tasks`
318/// - view `public.tasks_ready`
319/// - function `public.notify`
320/// - function `public.notify_done`
321#[derive(Debug, Clone)]
322pub struct TaskTables {
323    schema: String,
324    tasks_queue_name: String,
325    tasks_table: TaskTableEntity,
326    tasks_notify: TaskTableEntity,
327    tasks_notify_done: TaskTableEntity,
328    tasks_ready: TaskTableEntity,
329}
330
331impl Default for TaskTables {
332    fn default() -> Self {
333        TaskTableBuilder::new().build()
334    }
335}
336
337impl TaskTables {
338    /// Check if the necessary postgres entities have been installed already.
339    pub async fn exists(&self, pool: &Pool<Postgres>) -> Result<bool> {
340        let mut tx = pool.begin().await?;
341
342        if !self.tasks_table.exists(&mut tx).await? {
343            return Ok(false);
344        };
345        if !self.tasks_notify.exists(&mut tx).await? {
346            return Ok(false);
347        };
348        if !self.tasks_notify_done.exists(&mut tx).await? {
349            return Ok(false);
350        };
351        if !self.tasks_ready.exists(&mut tx).await? {
352            return Ok(false);
353        };
354
355        Ok(true)
356    }
357
358    /// Create the necessary postgres entities for the task queue in the
359    /// connected postgres database. This function is idempotent and it will not
360    /// error if the entities already exist.
361    pub async fn create(&self, pool: &Pool<Postgres>) -> Result<()> {
362        let mut tx = pool.begin().await?;
363        let con = tx.acquire().await?;
364        self.tasks_table.create(&mut *con).await?;
365        self.tasks_notify.create(&mut *con).await?;
366        self.tasks_notify_done.create(&mut *con).await?;
367        self.tasks_ready.create(&mut *con).await?;
368        tx.commit().await?;
369
370        Ok(())
371    }
372
373    /// Delete all postgres entities used by the task queue. This function is
374    /// idempotent and will not error if the entities don't exist.
375    pub async fn drop(self, pool: &Pool<Postgres>) -> Result<()> {
376        let mut tx = pool.begin().await?;
377        self.tasks_ready.drop(&mut tx).await?;
378        self.tasks_notify.drop(&mut tx).await?;
379        self.tasks_notify_done.drop(&mut tx).await?;
380        self.tasks_table.drop(&mut tx).await?;
381        tx.commit().await?;
382
383        debug!("cleanup for task setup done");
384
385        Ok(())
386    }
387}