pub struct PGMQueue {
pub url: String,
pub connection: Pool<Postgres>,
}Expand description
Main controller for interacting with a queue.
Fields§
§url: String§connection: Pool<Postgres>Implementations§
Source§impl PGMQueue
impl PGMQueue
pub async fn new(url: String) -> Result<Self, PgmqError>
Sourcepub async fn new_with_pool(pool: Pool<Postgres>) -> Self
pub async fn new_with_pool(pool: Pool<Postgres>) -> Self
BYOP - bring your own pool initialize a PGMQ connection with your own SQLx Postgres connection pool
Sourcepub async fn create(&self, queue_name: &str) -> Result<(), PgmqError>
pub async fn create(&self, queue_name: &str) -> Result<(), PgmqError>
Create a queue. This sets up the queue’s tables, indexes, and metadata.
It is idempotent, but does not check if the queue already exists.
Amounts to IF NOT EXISTS statements in Postgres.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue";
queue.create(my_queue).await?;
Ok(())
}Sourcepub async fn create_unlogged(&self, queue_name: &str) -> Result<(), PgmqError>
pub async fn create_unlogged(&self, queue_name: &str) -> Result<(), PgmqError>
Create an unlogged queue
Sourcepub async fn destroy(&self, queue_name: &str) -> Result<(), PgmqError>
pub async fn destroy(&self, queue_name: &str) -> Result<(), PgmqError>
Destroy a queue. This deletes the queue’s tables, indexes, and metadata. Does not delete any data related to adjacent queues.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue_destroy".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
queue.destroy("my_queue_destroy").await.expect("Failed to destroy queue!");
Ok(())
}Sourcepub async fn send<T: Serialize>(
&self,
queue_name: &str,
message: &T,
) -> Result<i64, PgmqError>
pub async fn send<T: Serialize>( &self, queue_name: &str, message: &T, ) -> Result<i64, PgmqError>
Send a single message to a queue.
Messages can be any implementor of the serde::Serialize trait.
The message id, unique to the queue, is returned. Typically,
the message sender does not consume the message id but may use it for
logging and tracing purposes.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message = MyMessage {
foo: "bar".to_owned(),
};
let struct_message_id: i64 = queue
.send(&my_queue, &struct_message)
.await
.expect("Failed to enqueue message");
println!("Struct Message id: {}", struct_message_id);
let json_message = serde_json::json!({
"foo": "bar"
});
let json_message_id: i64 = queue
.send(&my_queue, &json_message)
.await
.expect("Failed to enqueue message");
println!("Json Message id: {}", json_message_id);
Ok(())
}Sourcepub async fn send_delay<T: Serialize>(
&self,
queue_name: &str,
message: &T,
delay: u64,
) -> Result<i64, PgmqError>
pub async fn send_delay<T: Serialize>( &self, queue_name: &str, message: &T, delay: u64, ) -> Result<i64, PgmqError>
Send a single message to a queue with a delay.
Specify your delay in seconds.
Messages can be any implementor of the serde::Serialize trait.
The message id, unique to the queue, is returned. Typically,
the message sender does not consume the message id but may use it for
logging and tracing purposes.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message = MyMessage {
foo: "bar".to_owned(),
};
let struct_message_id: i64 = queue
.send_delay(&my_queue, &struct_message, 15)
.await
.expect("Failed to enqueue message");
println!("Struct Message id: {}", struct_message_id);
let json_message = serde_json::json!({
"foo": "bar"
});
let json_message_id: i64 = queue
.send_delay(&my_queue, &json_message, 15)
.await
.expect("Failed to enqueue message");
println!("Json Message id: {}", json_message_id);
Ok(())
}Sourcepub async fn send_batch<T: Serialize>(
&self,
queue_name: &str,
messages: &[T],
) -> Result<Vec<i64>, PgmqError>
pub async fn send_batch<T: Serialize>( &self, queue_name: &str, messages: &[T], ) -> Result<Vec<i64>, PgmqError>
Send multiple messages to a queue.
Same as send(), messages can be any implementor of the serde::Serialize trait.
A vector of message ids are returned in the call. These message ids are in the
same order as the messages in the input vector.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message_batch = vec![
MyMessage {foo: "bar1".to_owned()},
MyMessage {foo: "bar2".to_owned()},
MyMessage {foo: "bar3".to_owned()},
];
let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
.await
.expect("Failed to enqueue messages");
println!("Struct Message ids: {:?}", struct_message_batch_ids);
Ok(())
}Sourcepub async fn send_batch_delay<T: Serialize>(
&self,
queue_name: &str,
messages: &[T],
delay: u64,
) -> Result<Vec<i64>, PgmqError>
pub async fn send_batch_delay<T: Serialize>( &self, queue_name: &str, messages: &[T], delay: u64, ) -> Result<Vec<i64>, PgmqError>
Send multiple messages with a delay in seconds to a queue.
Same as send_batch(), messages can be any implementor of the serde::Serialize trait.
A vector of message ids are returned in the call. These message ids are in the
same order as the messages in the input vector.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message_batch = vec![
MyMessage {foo: "bar1".to_owned()},
MyMessage {foo: "bar2".to_owned()},
MyMessage {foo: "bar3".to_owned()},
];
let struct_message_batch_ids = queue.send_batch_delay(&my_queue, &struct_message_batch, 1000)
.await
.expect("Failed to enqueue messages");
println!("Struct Message ids: {:?}", struct_message_batch_ids);
Ok(())
}Sourcepub async fn read<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
) -> Result<Option<Message<T>>, PgmqError>
pub async fn read<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, ) -> Result<Option<Message<T>>, PgmqError>
Reads a single message from the queue. If the queue is empty or all messages are invisible, Option::None is returned.
If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.
Reading a message returns a Message struct.
Typically, the application reading messages is most interested in the message body but will use the
message id in order to either delete or archive the message when it is done processing it.
Refer to the Message struct for more details.
You can specify the message structure you are expecting to read from the queue by using the type parameters.
Any implementor of the serde::Deserialize trait can be used.
If you do not know the type of the message, it will default to serde_json::Value.
Example:
use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message_batch = vec![
MyMessage {foo: "bar1".to_owned()},
MyMessage {foo: "bar2".to_owned()},
MyMessage {foo: "bar3".to_owned()},
];
let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
.await
.expect("Failed to enqueue messages");
println!("Struct Message ids: {:?}", struct_message_batch_ids);
let visibility_timeout_seconds = 30;
let known_message_structure: Message<MyMessage> = queue.read::<MyMessage>(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received known : {known_message_structure:?}");
let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received known : {unknown_message_structure:?}");
Ok(())
}Sourcepub async fn read_batch<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
num_msgs: i32,
) -> Result<Option<Vec<Message<T>>>, PgmqError>
pub async fn read_batch<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, num_msgs: i32, ) -> Result<Option<Vec<Message<T>>>, PgmqError>
Reads a specified number of messages (num_msgs) from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds.
If the queue is empty or all messages are invisible,Option::None is returned. If there are messages,
it is returned as a vector of Message structs (it will never be an empty vector).
Refer to the Message struct for more details.
You can specify the message structure you are expecting to read from the queue by using the type parameters.
Any implementor of the serde::Deserialize trait can be used.
If you do not know the type of the message, it will default to serde_json::Value.
Example:
use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message_batch = vec![
MyMessage {foo: "bar1".to_owned()},
MyMessage {foo: "bar2".to_owned()},
MyMessage {foo: "bar3".to_owned()},
];
let struct_message_batch_ids = queue.send_batch(&my_queue, &struct_message_batch)
.await
.expect("Failed to enqueue messages");
println!("Struct Message ids: {struct_message_batch_ids:?}");
let visibility_timeout_seconds = 30;
let batch_size = 1;
let batch: Vec<Message<MyMessage>> = queue.read_batch::<MyMessage>(&my_queue, Some(visibility_timeout_seconds), batch_size)
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received a batch of messages: {batch:?}");
let batch_size = 2;
let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received known : {unknown_message_structure:?}");
Ok(())
}Sourcepub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<i32>,
max_batch_size: i32,
poll_timeout: Option<Duration>,
poll_interval: Option<Duration>,
) -> Result<Option<Vec<Message<T>>>, PgmqError>
pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, vt: Option<i32>, max_batch_size: i32, poll_timeout: Option<Duration>, poll_interval: Option<Duration>, ) -> Result<Option<Vec<Message<T>>>, PgmqError>
Similar to [read_batch], but allows waiting until a message is available
You can specify a maximum duration for polling (defaults to 5 seconds), and an interval between calls (defaults to 250ms). A lower interval implies higher maximum latency, but less load on the database.
Refer to the [read_batch] function for more details.
Sourcepub async fn delete(
&self,
queue_name: &str,
msg_id: i64,
) -> Result<u64, PgmqError>
pub async fn delete( &self, queue_name: &str, msg_id: i64, ) -> Result<u64, PgmqError>
Delete a message from the queue. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the archive method.
Deletes happen by message id, so you must have the message id to delete the message.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;
#[derive(Debug, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message = MyMessage {
foo: "bar".to_owned(),
};
let message_id: i64 = queue
.send(&my_queue, &struct_message)
.await
.expect("Failed to enqueue message");
println!("Struct Message id: {message_id}");
queue.delete(&my_queue, message_id).await.expect("failed to delete message");
Ok(())
}Sourcepub async fn delete_batch(
&self,
queue_name: &str,
msg_ids: &[i64],
) -> Result<u64, PgmqError>
pub async fn delete_batch( &self, queue_name: &str, msg_ids: &[i64], ) -> Result<u64, PgmqError>
Delete multiple messages from the queue. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the archive method.
Deletes happen by message id, so you must have the message id to delete the message.
Example:
use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let msgs = vec![
serde_json::json!({"foo": "bar1"}),
serde_json::json!({"foo": "bar2"}),
serde_json::json!({"foo": "bar3"}),
];
let msg_ids = queue
.send_batch(&my_queue, &msgs)
.await
.expect("Failed to enqueue messages");
let del = queue
.delete_batch(&my_queue, &msg_ids)
.await
.expect("Failed to delete messages from queue");
Ok(())
}pub async fn purge(&self, queue_name: &str) -> Result<u64, PgmqError>
Sourcepub async fn archive(
&self,
queue_name: &str,
msg_id: i64,
) -> Result<u64, PgmqError>
pub async fn archive( &self, queue_name: &str, msg_id: i64, ) -> Result<u64, PgmqError>
Moves a message, by message id, from the queue table to archive table View messages on the archive table with sql:
SELECT * FROM pgmq_<queue_name>_archive;Example:
use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let message = serde_json::json!({"foo": "bar1"});
let message_id: i64 = queue
.send(&my_queue, &message)
.await
.expect("Failed to enqueue message");
queue.archive(&my_queue, message_id).await.expect("failed to archive message");
Ok(())
}Sourcepub async fn archive_batch(
&self,
queue_name: &str,
msg_ids: &[i64],
) -> Result<u64, PgmqError>
pub async fn archive_batch( &self, queue_name: &str, msg_ids: &[i64], ) -> Result<u64, PgmqError>
Moves multiple messages, by message id, from the queue table to archive table View messages on the archive table with sql:
SELECT * FROM pgmq_<queue_name>_archive;Example:
use pgmq::{PgmqError, PGMQueue};
use serde::Serialize;
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let msgs = vec![
serde_json::json!({"foo": "bar1"}),
serde_json::json!({"foo": "bar2"}),
serde_json::json!({"foo": "bar3"}),
];
let msg_ids: Vec<i64> = queue
.send_batch(&my_queue, &msgs)
.await
.expect("Failed to enqueue messages");
queue.archive_batch(&my_queue, &msg_ids).await.expect("failed to archive messages");
Ok(())
}Sourcepub async fn pop<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
) -> Result<Option<Message<T>>, PgmqError>
pub async fn pop<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, ) -> Result<Option<Message<T>>, PgmqError>
Reads single message from the queue and delete it at the same time.
Similar to read and read_batch,
if no messages are available, Option::None is returned. Unlike these methods,
the visibility timeout does not apply. This is because the message is immediately deleted.
Example:
use pgmq::{Message, PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let send_message = MyMessage {foo: "bar1".to_owned()};
let struct_message_batch_ids = queue.send(&my_queue, &send_message)
.await
.expect("Failed to send message");
let popped_message: Message<MyMessage> = queue.pop::<MyMessage>(&my_queue)
.await
.unwrap()
.expect("no messages in the queue!");
println!("Received popped message : {popped_message:?}");
Ok(())
}Sourcepub async fn set_vt<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
msg_id: i64,
vt: DateTime<Utc>,
) -> Result<Option<Message<T>>, PgmqError>
pub async fn set_vt<T: for<'de> Deserialize<'de>>( &self, queue_name: &str, msg_id: i64, vt: DateTime<Utc>, ) -> Result<Option<Message<T>>, PgmqError>
Set the visibility time for a single message. This is useful when you want change when a message becomes visible again (able to be read with .read() methods). For example, in task execution use cases or job scheduling.
Example:
use chrono::{Utc, DateTime, Duration};
use pgmq::{PgmqError, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Serialize)]
struct MyMessage {
foo: String,
}
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
.await
.expect("Failed to connect to postgres");
let my_queue = "my_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
let struct_message = MyMessage {
foo: "bar".to_owned(),
};
let message_id: i64 = queue
.send(&my_queue, &struct_message)
.await
.expect("Failed to enqueue message");
println!("Struct Message id: {message_id}");
let utc_24h_from_now = Utc::now() + Duration::hours(24);
queue.set_vt::<MyMessage>(&my_queue, message_id, utc_24h_from_now).await.expect("failed to set vt");
Ok(())
}Trait Implementations§
Auto Trait Implementations§
impl Freeze for PGMQueue
impl !RefUnwindSafe for PGMQueue
impl Send for PGMQueue
impl Sync for PGMQueue
impl Unpin for PGMQueue
impl !UnwindSafe for PGMQueue
Blanket Implementations§
§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
§unsafe fn clone_to_uninit(&self, dest: *mut u8)
unsafe fn clone_to_uninit(&self, dest: *mut u8)
clone_to_uninit)Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more