tasker-pgmq
Tasker's PGMQ wrapper with PostgreSQL NOTIFY support for event-driven message processing.
Attribution
This crate builds upon the excellent PGMQ (PostgreSQL Message Queue) project. PGMQ provides a lightweight, durable message queue built on PostgreSQL.
We extend PGMQ with:
- PostgreSQL LISTEN/NOTIFY integration for low-latency event-driven processing
- Configurable namespace-aware channel routing
- Signal-only notifications for large messages (>7KB) with fetch-by-ID support
- Full payload notifications for small messages (<7KB) enabling direct processing
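The size-based split between the last two items can be illustrated in a few lines of Rust. This is only a sketch of the rule as described above: in the crate the decision is made by the database triggers, and the names `Notification`, `build_notification`, and `PAYLOAD_THRESHOLD_BYTES` are hypothetical. The exact byte value of the 7KB cutoff, and the handling of a payload of exactly that size, are assumptions.

```rust
/// Assumed cutoff in bytes; the text above only says "7KB".
const PAYLOAD_THRESHOLD_BYTES: usize = 7 * 1024;

/// Hypothetical notification shapes, not the crate's actual types.
#[derive(Debug, PartialEq)]
enum Notification {
    /// Small message: the payload travels inside the notification itself.
    WithPayload { msg_id: i64, payload: String },
    /// Large message: only the ID is sent and the consumer fetches by ID.
    SignalOnly { msg_id: i64 },
}

/// Picks the notification shape from the payload size.
/// A payload of exactly the threshold size is treated as large (assumption).
fn build_notification(msg_id: i64, payload: &str) -> Notification {
    if payload.len() < PAYLOAD_THRESHOLD_BYTES {
        Notification::WithPayload {
            msg_id,
            payload: payload.to_owned(),
        }
    } else {
        Notification::SignalOnly { msg_id }
    }
}
```

A consumer that receives the signal-only shape would then read the message from the queue by its ID, while the full-payload shape can be processed without a second round trip.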
Features
- Generic Design: Works with any PGMQ setup, not tied to specific applications
- Namespace Awareness: Supports namespace extraction from queue names
- Reliable Notifications: Built on sqlx::PgListener with auto-reconnection
- Minimal Payloads: Keeps notifications under the pg_notify 8KB limit
- Configurable Channels: Customizable channel naming and prefixes
- Trigger-Based Architecture: Database triggers emit notifications automatically
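To make the namespace-awareness feature concrete, here is a minimal sketch of pulling a namespace out of a queue name. It assumes a hypothetical `<namespace>_queue` naming convention and a fallback namespace. The crate's own extraction is driven by its configurable naming pattern, so `extract_namespace` is an illustration, not the crate's API.

```rust
/// Extracts a namespace from a queue name, assuming the hypothetical
/// convention `<namespace>_queue`. Falls back to `default_ns` when the
/// name does not match or the namespace part would be empty.
fn extract_namespace<'a>(queue_name: &'a str, default_ns: &'a str) -> &'a str {
    match queue_name.strip_suffix("_queue") {
        Some(ns) if !ns.is_empty() => ns,
        _ => default_ns,
    }
}
```

Under this assumed convention, `extract_namespace("orders_queue", "default")` yields `orders`, and a name without the suffix yields the default namespace.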
Usage
Trigger-Based (Recommended)
use ;
// 1. Install database triggers via migration (one-time setup)
// Run: tasker-pgmq-cli generate-migration --name pgmq_notifications
// Then apply the generated migration to your database
// 2. Create enhanced PGMQ client with trigger-based notifications
let config = new
.with_queue_naming_pattern
.with_default_namespace;
let mut client = new.await?;
// 3. Create listener for real-time event processing
let buffer_size = 1000;
let mut listener = new.await?;
listener.connect.await?;
listener.listen_message_ready_for_namespace.await?;
// 4. Queue operations automatically emit notifications via triggers
client.create_queue.await?; // Triggers queue_created notification
client.send.await?; // Triggers message_ready notification
// 5. Process real-time events
while let Some = listener.next_event.await?
CLI Tool
The tasker-pgmq-cli binary generates SQL migration files for PGMQ notification triggers:
# Generate migration with default settings
# Generate with custom configuration
# Validate configuration file
Architecture
The crate provides three main components:
- Event Types: Structured representations of PGMQ events (PgmqNotifyEvent, MessageReadyEvent, QueueCreatedEvent)
- Database Triggers: SQL functions that publish notifications via pg_notify (generated by CLI)
- Listeners: Subscribe to and receive typed events via PostgreSQL LISTEN
Event Types
- QueueCreatedEvent - Emitted when a new queue is created
- MessageReadyEvent - Signal-only for large messages (>7KB), requires fetch
- MessageWithPayloadEvent - Full payload for small messages (<7KB), direct processing
- BatchReadyEvent - Emitted for batch message operations
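One way to picture how a consumer treats the four event kinds is a single enum with a match over its variants. The sketch below is a stand-in, not the crate's definitions: `SketchEvent`, its field names, and `next_step` are all hypothetical, and the assumption that a batch event carries a list of message IDs is a guess.

```rust
/// Illustrative stand-in for the crate's event types.
/// Variant fields are assumptions, not the real struct layouts.
#[derive(Debug)]
enum SketchEvent {
    QueueCreated { queue_name: String },
    MessageReady { queue_name: String, msg_id: i64 },
    MessageWithPayload { queue_name: String, msg_id: i64, payload: String },
    BatchReady { queue_name: String, msg_ids: Vec<i64> },
}

/// Describes the follow-up work each event implies for a consumer.
fn next_step(event: &SketchEvent) -> String {
    match event {
        SketchEvent::QueueCreated { queue_name } => {
            format!("start listening on queue {}", queue_name)
        }
        // Signal-only: the payload must be fetched by ID.
        SketchEvent::MessageReady { queue_name, msg_id } => {
            format!("fetch message {} from {}", msg_id, queue_name)
        }
        // Full payload: no extra fetch is needed.
        SketchEvent::MessageWithPayload { queue_name, msg_id, payload } => {
            format!(
                "process message {} from {} inline ({} bytes)",
                msg_id,
                queue_name,
                payload.len()
            )
        }
        SketchEvent::BatchReady { queue_name, msg_ids } => {
            format!("fetch {} messages from {}", msg_ids.len(), queue_name)
        }
    }
}
```

The point of the match is that only the signal-only and batch variants cost a second round trip to the database; the full-payload variant can be handled directly from the notification.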
Configuration
use PgmqNotifyConfig;
let config = new
.with_queue_naming_pattern // Extract namespace from queue name
.with_channels_prefix // Prefix all channels: myapp.pgmq_*
.with_triggers_enabled // Enable trigger-based notifications
.with_default_namespace // Auto-listen to this namespace
.with_max_payload_size; // Stay under 8KB pg_notify limit
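The channel prefix and payload cap can be illustrated with two small helpers. This is a sketch under assumptions: the `<prefix>.pgmq_<event>.<namespace>` layout is inferred from the `myapp.pgmq_*` comment above and is not confirmed, and `channel_name` and `fits_in_notification` are hypothetical names. The 8000-byte figure comes from the PostgreSQL NOTIFY documentation, which requires the payload to be shorter than 8000 bytes in the default configuration.

```rust
/// PostgreSQL's documented default: a NOTIFY payload must be shorter than this.
const PG_NOTIFY_MAX_BYTES: usize = 8000;

/// Builds a channel name such as `myapp.pgmq_message_ready.orders`.
/// The layout is an assumption for illustration only.
fn channel_name(prefix: Option<&str>, event: &str, namespace: Option<&str>) -> String {
    let mut name = String::new();
    if let Some(p) = prefix {
        name.push_str(p);
        name.push('.');
    }
    name.push_str("pgmq_");
    name.push_str(event);
    if let Some(ns) = namespace {
        name.push('.');
        name.push_str(ns);
    }
    name
}

/// True when a payload respects both the configured cap and the server limit.
fn fits_in_notification(payload: &str, max_payload_size: usize) -> bool {
    payload.len() <= max_payload_size && payload.len() < PG_NOTIFY_MAX_BYTES
}
```

Keeping the configured cap below the server limit leaves headroom for whatever envelope fields accompany the payload in the notification.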
License
MIT License - See LICENSE file for details.
Acknowledgments
Special thanks to the PGMQ team for creating and maintaining PGMQ, which makes PostgreSQL-based message queuing simple and reliable.