1use crate::{
4 errors::PgmqError,
5 types::{ARCHIVE_PREFIX, PGMQ_SCHEMA, QUEUE_PREFIX},
6 util::check_input,
7 util::CheckedName,
8};
9
10use sqlx::types::chrono::Utc;
11
12pub fn init_queue(name: &str, is_unlogged: bool) -> Result<Vec<String>, PgmqError> {
13 let name = CheckedName::new(name)?;
14 Ok(vec![
15 create_queue(name, is_unlogged)?,
16 assign_queue(name)?,
17 create_index(name)?,
18 create_archive(name)?,
19 assign_archive(name)?,
20 create_archive_index(name)?,
21 insert_meta(name, false, is_unlogged)?,
22 ])
23}
24
25pub fn destroy_queue(name: &str) -> Result<Vec<String>, PgmqError> {
26 let name = CheckedName::new(name)?;
27 Ok(vec![
28 unassign_queue(name)?,
29 unassign_archive(name)?,
30 drop_queue(name)?,
31 drop_queue_archive(name)?,
32 delete_queue_metadata(name)?,
33 ])
34}
35
36pub fn create_queue(name: CheckedName<'_>, is_unlogged: bool) -> Result<String, PgmqError> {
37 let maybe_unlogged = if is_unlogged { "UNLOGGED" } else { "" };
38
39 Ok(format!(
40 "
41 CREATE {maybe_unlogged} TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (
42 msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
43 read_ct INT DEFAULT 0 NOT NULL,
44 enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
45 vt TIMESTAMP WITH TIME ZONE NOT NULL,
46 message JSONB
47 );
48 "
49 ))
50}
51
52pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
53 Ok(format!(
54 "
55 CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (
56 msg_id BIGINT PRIMARY KEY,
57 read_ct INT DEFAULT 0 NOT NULL,
58 enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
59 archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
60 vt TIMESTAMP WITH TIME ZONE NOT NULL,
61 message JSONB
62 );
63 "
64 ))
65}
66
67pub fn create_schema() -> String {
68 format!("CREATE SCHEMA IF NOT EXISTS {PGMQ_SCHEMA}")
69}
70
71pub fn create_meta() -> String {
72 format!(
73 "
74 CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.meta (
75 queue_name VARCHAR UNIQUE NOT NULL,
76 is_partitioned BOOLEAN NOT NULL,
77 is_unlogged BOOLEAN NOT NULL,
78 created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
79 );
80 "
81 )
82}
83
84pub fn drop_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
85 Ok(format!(
86 "
87 DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name};
88 "
89 ))
90}
91
92pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result<String, PgmqError> {
93 Ok(format!(
94 "
95 DO $$
96 BEGIN
97 IF EXISTS (
98 SELECT 1
99 FROM information_schema.tables
100 WHERE table_name = 'meta' and table_schema = 'pgmq')
101 THEN
102 DELETE
103 FROM {PGMQ_SCHEMA}.meta
104 WHERE queue_name = '{name}';
105 END IF;
106 END $$;
107 "
108 ))
109}
110
111pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
112 Ok(format!(
113 "
114 DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name};
115 "
116 ))
117}
118
119pub fn insert_meta(
120 name: CheckedName<'_>,
121 is_partitioned: bool,
122 is_unlogged: bool,
123) -> Result<String, PgmqError> {
124 Ok(format!(
125 "
126 INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned, is_unlogged)
127 VALUES ('{name}', {is_partitioned}, {is_unlogged})
128 ON CONFLICT
129 DO NOTHING;
130 ",
131 ))
132}
133
134pub fn create_archive_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
135 Ok(format!(
136 "
137 CREATE INDEX IF NOT EXISTS archived_at_idx_{name} ON {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (archived_at);
138 "
139 ))
140}
141
142pub fn create_index(name: CheckedName<'_>) -> Result<String, PgmqError> {
144 Ok(format!(
145 "
146 CREATE INDEX IF NOT EXISTS {QUEUE_PREFIX}_{name}_vt_idx ON {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt ASC);
147 "
148 ))
149}
150
151pub fn purge_queue(name: &str) -> Result<String, PgmqError> {
152 check_input(name)?;
153 Ok(format!("DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name};"))
154}
155
156pub fn enqueue(name: &str, messages_num: usize, delay: &u64) -> Result<String, PgmqError> {
157 check_input(name)?;
159 let mut values = "".to_owned();
160
161 for i in 0..messages_num {
162 let full_msg = format!("((now() + interval '{delay} seconds'), ${}::json),", i + 1);
163 values.push_str(&full_msg);
164 }
165 values.pop();
167 Ok(format!(
168 "
169 INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message)
170 VALUES {values}
171 RETURNING msg_id;
172 "
173 ))
174}
175
176pub fn read(name: &str, vt: i32, limit: i32) -> Result<String, PgmqError> {
177 check_input(name)?;
178 Ok(format!(
179 "
180 WITH cte AS
181 (
182 SELECT msg_id
183 FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
184 WHERE vt <= clock_timestamp()
185 ORDER BY msg_id ASC
186 LIMIT {limit}
187 FOR UPDATE SKIP LOCKED
188 )
189 UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t
190 SET
191 vt = clock_timestamp() + interval '{vt} seconds',
192 read_ct = read_ct + 1
193 FROM cte
194 WHERE t.msg_id=cte.msg_id
195 RETURNING *;
196 "
197 ))
198}
199
200pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime<Utc>) -> Result<String, PgmqError> {
201 check_input(name)?;
202 Ok(format!(
203 "
204 UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
205 SET vt = '{t}'::timestamp
206 WHERE msg_id = {msg_id}
207 RETURNING *;
208 ",
209 t = vt.format("%Y-%m-%d %H:%M:%S%.3f %z")
210 ))
211}
212
213pub fn delete_batch(name: &str) -> Result<String, PgmqError> {
214 check_input(name)?;
216
217 Ok(format!(
218 "
219 DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
220 WHERE msg_id = ANY($1)
221 RETURNING msg_id;
222 "
223 ))
224}
225
226pub fn archive_batch(name: &str) -> Result<String, PgmqError> {
227 check_input(name)?;
228
229 Ok(format!(
230 "
231 WITH archived AS (
232 DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
233 WHERE msg_id = ANY($1)
234 RETURNING msg_id, vt, read_ct, enqueued_at, message
235 )
236 INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message)
237 SELECT msg_id, vt, read_ct, enqueued_at, message
238 FROM archived
239 RETURNING msg_id;
240 "
241 ))
242}
243
244pub fn pop(name: &str) -> Result<String, PgmqError> {
245 check_input(name)?;
246 Ok(format!(
247 "
248 WITH cte AS
249 (
250 SELECT msg_id
251 FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
252 WHERE vt <= now()
253 ORDER BY msg_id ASC
254 LIMIT 1
255 FOR UPDATE SKIP LOCKED
256 )
257 DELETE from {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
258 WHERE msg_id = (select msg_id from cte)
259 RETURNING *;
260 "
261 ))
262}
263
264pub fn assign_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
265 Ok(assign(&format!("{QUEUE_PREFIX}_{name}")))
266}
267
268pub fn assign_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
269 Ok(assign(&format!("{ARCHIVE_PREFIX}_{name}")))
270}
271
272pub fn unassign_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
273 Ok(format!(
274 "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}; "
275 ))
276}
277
278pub fn unassign_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
279 Ok(format!(
280 "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name}; "
281 ))
282}
283
284pub fn assign(table_name: &str) -> String {
286 format!(
287 "
288 DO $$
289 BEGIN
290 -- Check if the table is not yet associated with the extension
291 IF NOT EXISTS (
292 SELECT 1
293 FROM pg_depend
294 WHERE refobjid = (SELECT oid FROM pg_extension WHERE extname = 'pgmq')
295 AND objid = (
296 SELECT oid
297 FROM pg_class
298 WHERE relname = '{table_name}'
299 )
300 ) THEN
301 EXECUTE 'ALTER EXTENSION pgmq ADD TABLE {PGMQ_SCHEMA}.{table_name}';
302 END IF;
303 END $$;
304 "
305 )
306}
307
/// Builds an anonymous PL/pgSQL block that grants `SELECT` on `table` to
/// `pg_monitor` when that role does not already have it.
///
/// For every table except the metadata table, the block also grants `SELECT`
/// on the table's `<table>_msg_id_seq` sequence. The metadata table is
/// recognised by its exact name (`meta`, optionally schema-qualified), so a
/// queue whose name merely contains "meta" still receives its sequence grant.
fn grant_stmt(table: &str) -> String {
    let is_meta_table = table == "meta" || table.ends_with(".meta");
    let grant_seq = if is_meta_table {
        String::new()
    } else {
        format!("\n EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';")
    };
    format!(
        "
DO $$
BEGIN
 IF NOT EXISTS (
 SELECT 1
 WHERE has_table_privilege('pg_monitor', '{table}', 'SELECT')
 ) THEN
 EXECUTE 'GRANT SELECT ON {table} TO pg_monitor';{grant_seq}
 END IF;
END;
$$ LANGUAGE plpgsql;
"
    )
}
330
331pub fn grant_pgmon_meta() -> String {
333 let table = format!("{PGMQ_SCHEMA}.meta");
334 grant_stmt(&table)
335}
336
337pub fn grant_pgmon_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
339 let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}");
340 Ok(grant_stmt(&table))
341}
342
343pub fn grant_pgmon_queue_seq(name: CheckedName<'_>) -> Result<String, PgmqError> {
344 let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}_msg_id_seq");
345 Ok(grant_stmt(&table))
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// The grant block includes the sequence grant for an ordinary table and
    /// omits it for `meta`.
    #[test]
    fn test_grant() {
        let with_sequence = "
DO $$
BEGIN
 IF NOT EXISTS (
 SELECT 1
 WHERE has_table_privilege('pg_monitor', 'my_table', 'SELECT')
 ) THEN
 EXECUTE 'GRANT SELECT ON my_table TO pg_monitor';
 EXECUTE 'GRANT SELECT ON SEQUENCE my_table_msg_id_seq TO pg_monitor';
 END IF;
END;
$$ LANGUAGE plpgsql;
";
        assert_eq!(grant_stmt("my_table"), with_sequence);

        let without_sequence = "
DO $$
BEGIN
 IF NOT EXISTS (
 SELECT 1
 WHERE has_table_privilege('pg_monitor', 'meta', 'SELECT')
 ) THEN
 EXECUTE 'GRANT SELECT ON meta TO pg_monitor';
 END IF;
END;
$$ LANGUAGE plpgsql;
";
        assert_eq!(grant_stmt("meta"), without_sequence);
    }

    /// The table name is interpolated into the `pg_class` lookup.
    #[test]
    fn test_assign() {
        let statement = assign("a_my_queue_archive");
        assert!(statement.contains("WHERE relname = 'a_my_queue_archive'"));
    }

    /// The queue table name is the queue prefix followed by the queue name.
    #[test]
    fn test_create() {
        let checked = CheckedName::new("yolo").unwrap();
        let statement = create_queue(checked, false).unwrap();
        assert!(statement.contains("q_yolo"));
    }

    /// Requesting an unlogged queue emits the `UNLOGGED` keyword.
    #[test]
    fn create_unlogged() {
        let checked = CheckedName::new("yolo").unwrap();
        let statement = create_queue(checked, true).unwrap();
        assert!(statement.contains("CREATE UNLOGGED TABLE"));
    }

    /// A single message yields one tuple bound to `$1` with the given delay.
    #[test]
    fn test_enqueue() {
        let statement = enqueue("yolo", 1, &0).unwrap();
        assert!(statement.contains("q_yolo"));
        assert!(statement.contains("(now() + interval '0 seconds'), $1::json)"));
    }

    /// The read statement mentions both the queue name and the timeout.
    #[test]
    fn test_read() {
        let queue = "myqueue";
        let timeout: i32 = 20;
        let batch_size: i32 = 1;

        let statement = read(queue, timeout, batch_size).unwrap();

        assert!(statement.contains(queue));
        assert!(statement.contains(&timeout.to_string()));
    }

    /// Names of up to 47 characters are accepted; longer ones are rejected.
    #[test]
    fn check_input_rejects_names_too_large() {
        assert!(check_input("my_valid_table_name").is_ok());
        for (length, accepted) in [(47, true), (48, false), (70, false)] {
            assert_eq!(check_input(&"a".repeat(length)).is_ok(), accepted);
        }
    }

    /// Names with `;`, spaces or `--` are rejected; plain identifiers pass.
    #[test]
    fn test_check_input() {
        for rejected in ["bad;queue_name", "bad name", "bad--name"] {
            assert!(check_input(rejected).is_err());
        }
        for accepted in ["good_queue", "greatqueue", "my_great_queue"] {
            assert!(check_input(accepted).is_ok());
        }
    }
}