pgmq_core/
query.rs

1//! Query constructors
2
3use crate::{
4    errors::PgmqError,
5    types::{ARCHIVE_PREFIX, PGMQ_SCHEMA, QUEUE_PREFIX},
6    util::check_input,
7    util::CheckedName,
8};
9
10use sqlx::types::chrono::Utc;
11
12pub fn init_queue(name: &str, is_unlogged: bool) -> Result<Vec<String>, PgmqError> {
13    let name = CheckedName::new(name)?;
14    Ok(vec![
15        create_queue(name, is_unlogged)?,
16        assign_queue(name)?,
17        create_index(name)?,
18        create_archive(name)?,
19        assign_archive(name)?,
20        create_archive_index(name)?,
21        insert_meta(name, false, is_unlogged)?,
22    ])
23}
24
25pub fn destroy_queue(name: &str) -> Result<Vec<String>, PgmqError> {
26    let name = CheckedName::new(name)?;
27    Ok(vec![
28        unassign_queue(name)?,
29        unassign_archive(name)?,
30        drop_queue(name)?,
31        drop_queue_archive(name)?,
32        delete_queue_metadata(name)?,
33    ])
34}
35
36pub fn create_queue(name: CheckedName<'_>, is_unlogged: bool) -> Result<String, PgmqError> {
37    let maybe_unlogged = if is_unlogged { "UNLOGGED" } else { "" };
38
39    Ok(format!(
40        "
41        CREATE {maybe_unlogged} TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (
42            msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
43            read_ct INT DEFAULT 0 NOT NULL,
44            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
45            vt TIMESTAMP WITH TIME ZONE NOT NULL,
46            message JSONB
47        );
48        "
49    ))
50}
51
52pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
53    Ok(format!(
54        "
55        CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (
56            msg_id BIGINT PRIMARY KEY,
57            read_ct INT DEFAULT 0 NOT NULL,
58            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
59            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
60            vt TIMESTAMP WITH TIME ZONE NOT NULL,
61            message JSONB
62        );
63        "
64    ))
65}
66
67pub fn create_schema() -> String {
68    format!("CREATE SCHEMA IF NOT EXISTS {PGMQ_SCHEMA}")
69}
70
71pub fn create_meta() -> String {
72    format!(
73        "
74        CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.meta (
75            queue_name VARCHAR UNIQUE NOT NULL,
76            is_partitioned BOOLEAN NOT NULL,
77            is_unlogged BOOLEAN NOT NULL,
78            created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
79        );
80        "
81    )
82}
83
84pub fn drop_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
85    Ok(format!(
86        "
87        DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name};
88        "
89    ))
90}
91
92pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result<String, PgmqError> {
93    Ok(format!(
94        "
95        DO $$
96        BEGIN
97           IF EXISTS (
98                SELECT 1
99                FROM information_schema.tables
100                WHERE table_name = 'meta' and table_schema = 'pgmq')
101            THEN
102              DELETE
103              FROM {PGMQ_SCHEMA}.meta
104              WHERE queue_name = '{name}';
105           END IF;
106        END $$;
107        "
108    ))
109}
110
111pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
112    Ok(format!(
113        "
114        DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name};
115        "
116    ))
117}
118
119pub fn insert_meta(
120    name: CheckedName<'_>,
121    is_partitioned: bool,
122    is_unlogged: bool,
123) -> Result<String, PgmqError> {
124    Ok(format!(
125        "
126        INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned, is_unlogged)
127        VALUES ('{name}', {is_partitioned}, {is_unlogged})
128        ON CONFLICT
129        DO NOTHING;
130        ",
131    ))
132}
133
134pub fn create_archive_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
135    Ok(format!(
136        "
137        CREATE INDEX IF NOT EXISTS archived_at_idx_{name} ON {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (archived_at);
138        "
139    ))
140}
141
142// indexes are created ascending to support FIFO
143pub fn create_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
144    Ok(format!(
145        "
146        CREATE INDEX IF NOT EXISTS {QUEUE_PREFIX}_{name}_vt_idx ON {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt ASC);
147        "
148    ))
149}
150
151pub fn purge_queue(name: &str) -> Result<String, PgmqError> {
152    check_input(name)?;
153    Ok(format!("DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name};"))
154}
155
156pub fn enqueue(name: &str, messages_num: usize, delay: &u64) -> Result<String, PgmqError> {
157    // construct string of comma separated messages
158    check_input(name)?;
159    let mut values = "".to_owned();
160
161    for i in 0..messages_num {
162        let full_msg = format!("((now() + interval '{delay} seconds'), ${}::json),", i + 1);
163        values.push_str(&full_msg);
164    }
165    // drop trailing comma from constructed string
166    values.pop();
167    Ok(format!(
168        "
169        INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message)
170        VALUES {values}
171        RETURNING msg_id;
172        "
173    ))
174}
175
176pub fn read(name: &str, vt: i32, limit: i32) -> Result<String, PgmqError> {
177    check_input(name)?;
178    Ok(format!(
179        "
180    WITH cte AS
181        (
182            SELECT msg_id
183            FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
184            WHERE vt <= clock_timestamp()
185            ORDER BY msg_id ASC
186            LIMIT {limit}
187            FOR UPDATE SKIP LOCKED
188        )
189    UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t
190    SET
191        vt = clock_timestamp() + interval '{vt} seconds',
192        read_ct = read_ct + 1
193    FROM cte
194    WHERE t.msg_id=cte.msg_id
195    RETURNING *;
196    "
197    ))
198}
199
200pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime<Utc>) -> Result<String, PgmqError> {
201    check_input(name)?;
202    Ok(format!(
203        "
204        UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
205        SET vt = '{t}'::timestamp
206        WHERE msg_id = {msg_id}
207        RETURNING *;
208        ",
209        t = vt.format("%Y-%m-%d %H:%M:%S%.3f %z")
210    ))
211}
212
213pub fn delete_batch(name: &str) -> Result<String, PgmqError> {
214    // construct string of comma separated msg_id
215    check_input(name)?;
216
217    Ok(format!(
218        "
219        DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
220        WHERE msg_id = ANY($1)
221        RETURNING msg_id;
222        "
223    ))
224}
225
226pub fn archive_batch(name: &str) -> Result<String, PgmqError> {
227    check_input(name)?;
228
229    Ok(format!(
230        "
231        WITH archived AS (
232            DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
233            WHERE msg_id = ANY($1)
234            RETURNING msg_id, vt, read_ct, enqueued_at, message
235        )
236        INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message)
237        SELECT msg_id, vt, read_ct, enqueued_at, message
238        FROM archived
239        RETURNING msg_id;
240        "
241    ))
242}
243
244pub fn pop(name: &str) -> Result<String, PgmqError> {
245    check_input(name)?;
246    Ok(format!(
247        "
248        WITH cte AS
249            (
250                SELECT msg_id
251                FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
252                WHERE vt <= now()
253                ORDER BY msg_id ASC
254                LIMIT 1
255                FOR UPDATE SKIP LOCKED
256            )
257        DELETE from {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
258        WHERE msg_id = (select msg_id from cte)
259        RETURNING *;
260        "
261    ))
262}
263
264pub fn assign_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
265    Ok(assign(&format!("{QUEUE_PREFIX}_{name}")))
266}
267
268pub fn assign_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
269    Ok(assign(&format!("{ARCHIVE_PREFIX}_{name}")))
270}
271
272pub fn unassign_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
273    Ok(format!(
274        "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}; "
275    ))
276}
277
278pub fn unassign_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
279    Ok(format!(
280        "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name}; "
281    ))
282}
283
284// assign a table to pgmq extension, only if its not already assigned
285pub fn assign(table_name: &str) -> String {
286    format!(
287        "
288    DO $$
289        BEGIN
290        -- Check if the table is not yet associated with the extension
291        IF NOT EXISTS (
292            SELECT 1
293            FROM pg_depend
294            WHERE refobjid = (SELECT oid FROM pg_extension WHERE extname = 'pgmq')
295            AND objid = (
296                SELECT oid
297                FROM pg_class
298                WHERE relname = '{table_name}'
299            )
300        ) THEN
301            EXECUTE 'ALTER EXTENSION pgmq ADD TABLE {PGMQ_SCHEMA}.{table_name}';
302        END IF;
303    END $$;
304    "
305    )
306}
307
/// Builds an anonymous `DO` block that grants `SELECT` on `table` to the
/// `pg_monitor` role when the role does not already hold that privilege.
///
/// For every table except the `meta` table, a second grant on the table's
/// `<table>_msg_id_seq` sequence is issued inside the same guard.
///
/// `table` may be schema-qualified. The meta table is recognised by comparing
/// the unqualified relation name with `meta` exactly; a substring check would
/// wrongly skip the sequence grant for queues whose name merely contains
/// "meta" (for example `pgmq.q_metadata`).
fn grant_stmt(table: &str) -> String {
    // `rsplit` always yields at least one item, so the fallback is never hit.
    let relation = table.rsplit('.').next().unwrap_or(table);
    let grant_seq = if relation == "meta" {
        String::new()
    } else {
        format!("\n    EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';")
    };
    format!(
        "
DO $$
BEGIN
  IF NOT EXISTS (
    SELECT 1
    WHERE has_table_privilege('pg_monitor', '{table}', 'SELECT')
  ) THEN
    EXECUTE 'GRANT SELECT ON {table} TO pg_monitor';{grant_seq}
  END IF;
END;
$$ LANGUAGE plpgsql;
"
    )
}
330
331// pg_monitor needs to query queue metadata
332pub fn grant_pgmon_meta() -> String {
333    let table = format!("{PGMQ_SCHEMA}.meta");
334    grant_stmt(&table)
335}
336
337// pg_monitor needs to query queue tables
338pub fn grant_pgmon_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
339    let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}");
340    Ok(grant_stmt(&table))
341}
342
343pub fn grant_pgmon_queue_seq(name: CheckedName<'_>) -> Result<String, PgmqError> {
344    let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}_msg_id_seq");
345    Ok(grant_stmt(&table))
346}
347
#[cfg(test)]
mod tests {
    //! Unit tests for the query constructors and for name validation.

    use super::*;

    /// Queue tables get a table grant plus a sequence grant; `meta` gets only
    /// the table grant.
    #[test]
    fn test_grant() {
        let for_table = grant_stmt("my_table");
        let expected_for_table = "
DO $$
BEGIN
  IF NOT EXISTS (
    SELECT 1
    WHERE has_table_privilege('pg_monitor', 'my_table', 'SELECT')
  ) THEN
    EXECUTE 'GRANT SELECT ON my_table TO pg_monitor';
    EXECUTE 'GRANT SELECT ON SEQUENCE my_table_msg_id_seq TO pg_monitor';
  END IF;
END;
$$ LANGUAGE plpgsql;
";
        assert_eq!(for_table, expected_for_table);

        let for_meta = grant_stmt("meta");
        let expected_for_meta = "
DO $$
BEGIN
  IF NOT EXISTS (
    SELECT 1
    WHERE has_table_privilege('pg_monitor', 'meta', 'SELECT')
  ) THEN
    EXECUTE 'GRANT SELECT ON meta TO pg_monitor';
  END IF;
END;
$$ LANGUAGE plpgsql;
";
        assert_eq!(for_meta, expected_for_meta);
    }

    /// The extension-assignment block looks the table up by its relname.
    #[test]
    fn test_assign() {
        let sql = assign("a_my_queue_archive");
        assert!(sql.contains("WHERE relname = 'a_my_queue_archive'"));
    }

    /// The queue table name carries the queue prefix.
    #[test]
    fn test_create() {
        let checked = CheckedName::new("yolo").unwrap();
        let sql = create_queue(checked, false).unwrap();
        assert!(sql.contains("q_yolo"));
    }

    /// Requesting an unlogged queue emits the `UNLOGGED` keyword.
    #[test]
    fn create_unlogged() {
        let checked = CheckedName::new("yolo").unwrap();
        let sql = create_queue(checked, true).unwrap();
        assert!(sql.contains("CREATE UNLOGGED TABLE"));
    }

    /// A single message becomes one row bound to `$1` with the given delay.
    #[test]
    fn test_enqueue() {
        let sql = enqueue("yolo", 1, &0).unwrap();
        assert!(sql.contains("q_yolo"));
        assert!(sql.contains("(now() + interval '0 seconds'), $1::json)"));
    }

    /// The read statement mentions the queue and the visibility timeout.
    #[test]
    fn test_read() {
        let queue = "myqueue";
        let timeout: i32 = 20;
        let batch: i32 = 1;

        let sql = read(queue, timeout, batch).unwrap();

        assert!(sql.contains(queue));
        assert!(sql.contains(&timeout.to_string()));
    }

    /// Names of up to 47 characters pass validation; longer ones are rejected.
    #[test]
    fn check_input_rejects_names_too_large() {
        assert!(check_input("my_valid_table_name").is_ok());

        let longest_allowed = "a".repeat(47);
        assert!(check_input(&longest_allowed).is_ok());

        for too_long in [48, 70] {
            assert!(check_input(&"a".repeat(too_long)).is_err());
        }
    }

    /// Names with separators or comment markers are rejected; plain
    /// snake_case names are accepted.
    #[test]
    fn test_check_input() {
        for rejected in ["bad;queue_name", "bad name", "bad--name"] {
            assert!(check_input(rejected).is_err());
        }
        for accepted in ["good_queue", "greatqueue", "my_great_queue"] {
            assert!(check_input(accepted).is_ok());
        }
    }
}