§tasker-pgmq
Generic PostgreSQL LISTEN/NOTIFY integration for PGMQ queues.
This crate provides event-driven capabilities for PGMQ queue operations, enabling real-time notifications for queue creation, message enqueueing, and other queue lifecycle events.
§Features
- Generic Design: Works with any PGMQ setup, not tied to specific applications
- Namespace Awareness: Supports namespace extraction from queue names
- Reliable Notifications: Built on sqlx::PgListener with auto-reconnection
- Minimal Payloads: Keeps notifications under pg_notify 8KB limit
- Configurable Channels: Customizable channel naming and prefixes
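The namespace, payload-size, and channel-naming features can be illustrated with a small standard-library sketch. Nothing here is the crate's API: `extract_namespace`, `channel_name`, `fits_in_notify`, and the `pgmq_message_ready` channel format are hypothetical names chosen for illustration, and the suffix check only approximates a pattern such as `(?P<namespace>\w+)_queue`. The size constant reflects PostgreSQL's default rule that a NOTIFY payload must be shorter than 8000 bytes.

```rust
/// Largest payload accepted by pg_notify in a default PostgreSQL build
/// (the payload must be shorter than 8000 bytes).
const MAX_NOTIFY_PAYLOAD_BYTES: usize = 7999;

/// Hypothetical helper: read the namespace from a `<namespace>_queue` name,
/// falling back to `default_namespace` when the name does not match.
fn extract_namespace<'a>(queue_name: &'a str, default_namespace: &'a str) -> &'a str {
    match queue_name.strip_suffix("_queue") {
        // Accept only non-empty word-like prefixes, similar to `\w+`.
        Some(ns) if !ns.is_empty() && ns.chars().all(|c| c.is_alphanumeric() || c == '_') => ns,
        _ => default_namespace,
    }
}

/// Hypothetical helper: build a per-namespace channel name with an optional prefix.
/// The `pgmq_message_ready` segment is an assumed naming scheme, not the crate's.
fn channel_name(prefix: Option<&str>, namespace: &str) -> String {
    match prefix {
        Some(p) => format!("{p}.pgmq_message_ready.{namespace}"),
        None => format!("pgmq_message_ready.{namespace}"),
    }
}

/// Hypothetical guard: true when the payload is small enough for pg_notify.
fn fits_in_notify(payload: &str) -> bool {
    payload.len() <= MAX_NOTIFY_PAYLOAD_BYTES
}
```

Under these assumptions, `extract_namespace("orders_queue", "default")` yields `orders`, and a notification would carry identifiers such as the message ID and queue name, not the message body, so it stays well below the size limit.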
§Architecture
The crate provides four main components:
- Event Types: Structured representations of PGMQ events
- Database Triggers: Automatically publish notifications via SQL triggers (recommended)
- Listeners: Subscribe to and receive typed events via pg_notify
- Enhanced Client: PGMQ wrapper that integrates with trigger-based notifications
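To make "typed events" concrete, here is a minimal sketch of how notifications might be modelled as an enum and handled with an exhaustive `match`. `SketchEvent` and `describe` are hypothetical stand-ins, not the crate's `PgmqNotifyEvent`; only the `msg_id` and `queue_name` field names come from the usage example on this page, and the `i64` type for `msg_id` is an assumption.

```rust
/// Hypothetical stand-in for a typed notification event.
#[derive(Debug, Clone, PartialEq)]
enum SketchEvent {
    /// A new queue was created.
    QueueCreated { queue_name: String },
    /// A message is ready to be read from a queue.
    MessageReady { msg_id: i64, queue_name: String },
}

/// Turn an event into a human-readable line; the compiler checks that
/// every variant is handled.
fn describe(event: &SketchEvent) -> String {
    match event {
        SketchEvent::QueueCreated { queue_name } => {
            format!("queue {queue_name} created")
        }
        SketchEvent::MessageReady { msg_id, queue_name } => {
            format!("message {msg_id} ready in queue {queue_name}")
        }
    }
}
```

Modelling events as an enum lets consumers rely on pattern matching instead of parsing raw notification strings at every call site.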
§Usage - Trigger-Based (Recommended)
use tasker_pgmq::{PgmqNotifyClient, PgmqNotifyListener, PgmqNotifyConfig, PgmqNotifyEvent};
// 1. Install database triggers via migration (one-time setup)
// Run: tasker-pgmq-cli generate-migration --name pgmq_notifications
// Then apply the generated migration to your database
// 2. Create enhanced PGMQ client with trigger-based notifications
let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue")
    .with_default_namespace("orders");
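// `config` is reused by the listener below, so pass a clone here
// (assumes PgmqNotifyConfig implements Clone)
let mut client = PgmqNotifyClient::new(database_url, config.clone()).await?;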
// 3. Create listener for real-time event processing (TAS-51: bounded channel)
let buffer_size = 1000; // From config.mpsc_channels.*.event_listeners.pgmq_event_buffer_size
let mut listener = PgmqNotifyListener::new(pool, config, buffer_size).await?;
listener.connect().await?;
listener.listen_message_ready_for_namespace("orders").await?;
// 4. Queue operations automatically emit notifications via triggers
client.create_queue("orders_queue").await?; // Triggers queue_created notification
client.send("orders_queue", &my_message).await?; // Triggers message_ready notification
// 5. Process real-time events
while let Some(event) = listener.next_event().await? {
    match event {
        PgmqNotifyEvent::MessageReady { msg_id, queue_name, .. } => {
            println!("Message {} ready in queue {}", msg_id, queue_name);
            // Process message with <10ms latency
        }
        // Other event variants are ignored in this example
        _ => {}
    }
}
Re-exports§
pub use error::PgmqNotifyError;
pub use error::Result;
pub use listener::PgmqNotifyListener;
pub use types::ClientStatus;
pub use types::MessagingError;
pub use types::QueueMetrics;
Modules§
- error: Error types for tasker-pgmq
- listener: Event listener for PGMQ notifications using sqlx::PgListener
- types: Types for tasker-pgmq client
Structs§
- BatchReadyEvent: Event emitted when a batch of messages is ready for processing
- DbEmitter: Database-backed emitter using PostgreSQL NOTIFY
- MessageReadyEvent: Event emitted when a message is ready for processing
- MessageWithPayloadEvent: Event emitted when a message is ready with full payload included (TAS-133)
- NoopEmitter: No-operation emitter for testing and disabled scenarios
- PgmqClient: Unified PGMQ client with comprehensive functionality and notification capabilities
- PgmqNotifyClient: Unified PGMQ client with comprehensive functionality and notification capabilities
- PgmqNotifyClientFactory: Factory for creating PgmqClient instances
- PgmqNotifyConfig: Configuration for PGMQ notification behavior
- QueueCreatedEvent: Event emitted when a new PGMQ queue is created
Enums§
- PgmqNotifyEvent: Union of all possible PGMQ notification events
Traits§
- PgmqNotifyEmitter: Trait for emitting PGMQ notifications