PGMQueue

Struct PGMQueue 

Source
pub struct PGMQueue {
    pub url: String,
    pub connection: Pool<Postgres>,
}
Expand description

Main controller for interacting with a queue.

Fields§

§url: String§connection: Pool<Postgres>

Implementations§

Source§

impl PGMQueue

Source

pub async fn new(url: String) -> Result<Self, PgmqError>

Source

pub async fn new_with_pool(pool: Pool<Postgres>) -> Self

BYOP - bring your own pool initialize a PGMQ connection with your own SQLx Postgres connection pool

Source

pub async fn create(&self, queue_name: &str) -> Result<(), PgmqError>

Create a queue. This sets up the queue’s tables, indexes, and metadata. It is idempotent, but does not check if the queue already exists. Amounts to IF NOT EXISTS statements in Postgres.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
   let my_queue = "my_queue";
   queue.create(my_queue).await?;
   Ok(())
}
Source

pub async fn create_unlogged(&self, queue_name: &str) -> Result<(), PgmqError>

Create an unlogged queue

Source

pub async fn destroy(&self, queue_name: &str) -> Result<(), PgmqError>

Destroy a queue. This deletes the queue’s tables, indexes, and metadata. Does not delete any data related to adjacent queues.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue_destroy".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");
    queue.destroy("my_queue_destroy").await.expect("Failed to destroy queue!");
    Ok(())
}
Source

pub async fn send<T: Serialize>( &self, queue_name: &str, message: &T, ) -> Result<i64, PgmqError>

Send a single message to a queue. Messages can be any implementor of the serde::Serialize trait. The message id, unique to the queue, is returned. Typically, the message sender does not consume the message id but may use it for logging and tracing purposes.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let struct_message = MyMessage {
        foo: "bar".to_owned(),
    };

    let struct_message_id: i64 = queue
       .send(&my_queue, &struct_message)
       .await
       .expect("Failed to enqueue message");
    println!("Struct Message id: {}", struct_message_id);

    let json_message = serde_json::json!({
        "foo": "bar"
    });
    let json_message_id: i64 = queue
        .send(&my_queue, &json_message)
        .await
        .expect("Failed to enqueue message");
    println!("Json Message id: {}", json_message_id);
    Ok(())
}
Source

pub async fn send_delay<T: Serialize>( &self, queue_name: &str, message: &T, delay: u64, ) -> Result<i64, PgmqError>

Send a single message to a queue with a delay. Specify your delay in seconds. Messages can be any implementor of the serde::Serialize trait. The message id, unique to the queue, is returned. Typically, the message sender does not consume the message id but may use it for logging and tracing purposes.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let struct_message = MyMessage {
        foo: "bar".to_owned(),
    };

    let struct_message_id: i64 = queue
       .send_delay(&my_queue, &struct_message, 15)
       .await
       .expect("Failed to enqueue message");
    println!("Struct Message id: {}", struct_message_id);

    let json_message = serde_json::json!({
        "foo": "bar"
    });
    let json_message_id: i64 = queue
        .send_delay(&my_queue, &json_message, 15)
        .await
        .expect("Failed to enqueue message");
    println!("Json Message id: {}", json_message_id);
    Ok(())
}
Source

pub async fn send_batch<T: Serialize>( &self, queue_name: &str, messages: &[T], ) -> Result<Vec<i64>, PgmqError>

Send multiple messages to a queue. Same as send(), messages can be any implementor of the serde::Serialize trait. A vector of message ids are returned in the call. These message ids are in the same order as the messages in the input vector.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");
   let struct_message_batch = vec![
       MyMessage {foo: "bar1".to_owned()},
       MyMessage {foo: "bar2".to_owned()},
       MyMessage {foo: "bar3".to_owned()},
   ];

   let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
       .await
       .expect("Failed to enqueue messages");
    println!("Struct Message ids: {:?}", struct_message_batch_ids);
    Ok(())
}
Source

pub async fn send_batch_delay<T: Serialize>( &self, queue_name: &str, messages: &[T], delay: u64, ) -> Result<Vec<i64>, PgmqError>

Send multiple messages with a delay in seconds to a queue. Same as send_batch(), messages can be any implementor of the serde::Serialize trait. A vector of message ids are returned in the call. These message ids are in the same order as the messages in the input vector.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");
   let struct_message_batch = vec![
       MyMessage {foo: "bar1".to_owned()},
       MyMessage {foo: "bar2".to_owned()},
       MyMessage {foo: "bar3".to_owned()},
   ];

   let struct_message_batch_ids = queue.send_batch_delay(&my_queue, &struct_message_batch, 1000)
       .await
       .expect("Failed to enqueue messages");
    println!("Struct Message ids: {:?}", struct_message_batch_ids);
    Ok(())
}
Source

pub async fn read<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, ) -> Result<Option<Message<T>>, PgmqError>

Reads a single message from the queue. If the queue is empty or all messages are invisible, Option::None is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.

Reading a message returns a Message struct. Typically, the application reading messages is most interested in the message body but will use the message id in order to either delete or archive the message when it is done processing it.

Refer to the Message struct for more details.

You can specify the message structure you are expecting to read from the queue by using the type parameters. Any implementor of the serde::Deserialize trait can be used. If you do not know the type of the message, it will default to serde_json::Value.

Example:

use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");
    let struct_message_batch = vec![
       MyMessage {foo: "bar1".to_owned()},
       MyMessage {foo: "bar2".to_owned()},
       MyMessage {foo: "bar3".to_owned()},
    ];

    let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
       .await
       .expect("Failed to enqueue messages");
    println!("Struct Message ids: {:?}", struct_message_batch_ids);

    let visibility_timeout_seconds = 30;
    let known_message_structure: Message<MyMessage> = queue.read::<MyMessage>(&my_queue, Some(visibility_timeout_seconds))
        .await
        .unwrap()
        .expect("no messages in the queue!");
    println!("Received known : {known_message_structure:?}");

    let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
        .await
        .unwrap()
        .expect("no messages in the queue!");
    println!("Received known : {unknown_message_structure:?}");
    Ok(())
}
Source

pub async fn read_batch<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, num_msgs: i32, ) -> Result<Option<Vec<Message<T>>>, PgmqError>

Reads a specified number of messages (num_msgs) from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds.

If the queue is empty or all messages are invisible,Option::None is returned. If there are messages, it is returned as a vector of Message structs (it will never be an empty vector).

Refer to the Message struct for more details.

You can specify the message structure you are expecting to read from the queue by using the type parameters. Any implementor of the serde::Deserialize trait can be used. If you do not know the type of the message, it will default to serde_json::Value.

Example:

use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let struct_message_batch = vec![
       MyMessage {foo: "bar1".to_owned()},
       MyMessage {foo: "bar2".to_owned()},
       MyMessage {foo: "bar3".to_owned()},
    ];

    let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
       .await
       .expect("Failed to enqueue messages");
    println!("Struct Message ids: {struct_message_batch_ids:?}");

    let visibility_timeout_seconds = 30;
    let batch_size = 1;
    let batch: Vec<Message<MyMessage>> = queue.read_batch::<MyMessage>(&my_queue, Some(visibility_timeout_seconds), batch_size)
        .await
        .unwrap()
        .expect("no messages in the queue!");
    println!("Received a batch of messages: {batch:?}");

    let batch_size = 2;
    let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
        .await
        .unwrap()
        .expect("no messages in the queue!");
    println!("Received known : {unknown_message_structure:?}");
    Ok(())
}
Source

pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, max_batch_size: i32, poll_timeout: Option<Duration>, poll_interval: Option<Duration>, ) -> Result<Option<Vec<Message<T>>>, PgmqError>

Similar to [read_batch], but allows waiting until a message is available

You can specify a maximum duration for polling (defaults to 5 seconds), and an interval between calls (defaults to 250ms). A lower interval implies higher maximum latency, but less load on the database.

Refer to the [read_batch] function for more details.

Source

pub async fn delete( &self, queue_name: &str, msg_id: i64, ) -> Result<u64, PgmqError>

Delete a message from the queue. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the archive method.

Deletes happen by message id, so you must have the message id to delete the message.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;

#[derive(Debug, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let struct_message = MyMessage {
        foo: "bar".to_owned(),
    };

    let message_id: i64 = queue
       .send(&my_queue, &struct_message)
       .await
       .expect("Failed to enqueue message");
    println!("Struct Message id: {message_id}");

    queue.delete(&my_queue, message_id).await.expect("failed to delete message");

    Ok(())
}
Source

pub async fn delete_batch( &self, queue_name: &str, msg_ids: &[i64], ) -> Result<u64, PgmqError>

Delete multiple messages from the queue. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the archive method.

Deletes happen by message id, so you must have the message id to delete the message.

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;


#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let msgs = vec![
        serde_json::json!({"foo": "bar1"}),
        serde_json::json!({"foo": "bar2"}),
        serde_json::json!({"foo": "bar3"}),
    ];
    let msg_ids = queue
       .send_batch(&my_queue, &msgs)
       .await
       .expect("Failed to enqueue messages");
    let del = queue
        .delete_batch(&my_queue, &msg_ids)
        .await
        .expect("Failed to delete messages from queue");

    Ok(())
}
Source

pub async fn purge(&self, queue_name: &str) -> Result<u64, PgmqError>

Source

pub async fn archive( &self, queue_name: &str, msg_id: i64, ) -> Result<u64, PgmqError>

Moves a message, by message id, from the queue table to archive table View messages on the archive table with sql:

SELECT * FROM pgmq_<queue_name>_archive;

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let message = serde_json::json!({"foo": "bar1"});

    let message_id: i64 = queue
       .send(&my_queue, &message)
       .await
       .expect("Failed to enqueue message");

    queue.archive(&my_queue, message_id).await.expect("failed to archive message");

    Ok(())
}
Source

pub async fn archive_batch( &self, queue_name: &str, msg_ids: &[i64], ) -> Result<u64, PgmqError>

Moves multiple messages, by message id, from the queue table to archive table View messages on the archive table with sql:

SELECT * FROM pgmq_<queue_name>_archive;

Example:

use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let msgs = vec![
        serde_json::json!({"foo": "bar1"}),
        serde_json::json!({"foo": "bar2"}),
        serde_json::json!({"foo": "bar3"}),
    ];

    let msg_ids: Vec<i64> = queue
       .send_batch(&my_queue, &msgs)
       .await
       .expect("Failed to enqueue messages");

    queue.archive_batch(&my_queue, &msg_ids).await.expect("failed to archive messages");

    Ok(())
}
Source

pub async fn pop<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, ) -> Result<Option<Message<T>>, PgmqError>

Reads single message from the queue and delete it at the same time. Similar to read and read_batch, if no messages are available, Option::None is returned. Unlike these methods, the visibility timeout does not apply. This is because the message is immediately deleted.

Example:

use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");
    let send_message = MyMessage {foo: "bar1".to_owned()};

    let struct_message_batch_ids = queue.send(&my_queue, &send_message)
       .await
       .expect("Failed to send message");

    let popped_message: Message<MyMessage> = queue.pop::<MyMessage>(&my_queue)
        .await
        .unwrap()
        .expect("no messages in the queue!");
    println!("Received popped message : {popped_message:?}");

    Ok(())
}
Source

pub async fn set_vt<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, msg_id: i64, vt: DateTime<Utc>, ) -> Result<Option<Message<T>>, PgmqError>

Set the visibility time for a single message. This is useful when you want change when a message becomes visible again (able to be read with .read() methods). For example, in task execution use cases or job scheduling.

Example:

use chrono::{Utc, DateTime, Duration};
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
   foo: String,
}

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");
    let my_queue = "my_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    let struct_message = MyMessage {
        foo: "bar".to_owned(),
    };

    let message_id: i64 = queue
       .send(&my_queue, &struct_message)
       .await
       .expect("Failed to enqueue message");
    println!("Struct Message id: {message_id}");

    let utc_24h_from_now = Utc::now() + Duration::hours(24);

    queue.set_vt::<MyMessage>(&my_queue, message_id, utc_24h_from_now).await.expect("failed to set vt");

    Ok(())
}

Trait Implementations§

Source§

impl Clone for PGMQueue

Source§

fn clone(&self) -> PGMQueue

Returns a duplicate of the value. Read more
1.0.0§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for PGMQueue

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

§

impl<T> Any for T
where T: 'static + ?Sized,

§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> Borrow<T> for T
where T: ?Sized,

§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
§

impl<T> BorrowMut<T> for T
where T: ?Sized,

§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> CloneToUninit for T
where T: Clone,

§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
§

impl<T> From<T> for T

§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
§

impl<T, U> Into<U> for T
where U: From<T>,

§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more