use pgmq::{Message, PGMQueueExt};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Row};
#[derive(Serialize, Debug, Deserialize, Eq, PartialEq)]
struct MyMessage {
foo: String,
num: u64,
}
#[tokio::main]
async fn main() {
let db_url = "postgres://postgres:postgres@localhost:5432/postgres";
let example_queue = "example_tx_queue";
let pool = PgPool::connect(&db_url)
.await
.expect("failed to connect to postgres");
let queue: PGMQueueExt =
PGMQueueExt::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned(), 2)
.await
.expect("Failed to connect to postgres");
queue
.create(&example_queue)
.await
.expect("Failed to create queue");
let mut tx = pool.begin().await.expect("failed to start transaction");
let msg = MyMessage {
foo: "bar".to_owned(),
num: 42,
};
let sent = queue
.send_with_cxn(&example_queue, &msg, &mut *tx)
.await
.expect("failed to send message");
println!("message sent. id: {sent}, msg: {msg:?}");
let rows = sqlx::query("SELECT queue_length FROM pgmq.metrics($1)")
.bind(example_queue)
.fetch_one(&pool)
.await
.expect("failed to fetch row")
.get::<i64, usize>(0);
assert_eq!(rows, 0);
println!("queue length: {rows}");
let received: Option<Message<MyMessage>> = queue
.read(&example_queue, 10)
.await
.expect("failed to read message");
assert!(received.is_none());
tx.commit().await.expect("failed to commit transaction");
println!("transaction committed");
let rows = sqlx::query("SELECT queue_length FROM pgmq.metrics($1)")
.bind(example_queue)
.fetch_one(&pool)
.await
.expect("failed to fetch row")
.get::<i64, usize>(0);
assert_eq!(rows, 1);
println!("queue length: {rows}");
let received: Message<MyMessage> = queue
.read(&example_queue, 10)
.await
.expect("failed to read message")
.expect("expected message");
assert!(received.msg_id == sent);
assert_eq!(received.message, msg);
println!(
"message received. id: {}, msg: {:?}",
received.msg_id, received.message
);
queue
.drop_queue(&example_queue)
.await
.expect("failed to drop queue");
}