Crate tasker_pgmq

§tasker-pgmq

Generic PostgreSQL LISTEN/NOTIFY integration for PGMQ queues.

This crate provides event-driven capabilities for PGMQ queue operations, enabling real-time notifications for queue creation, message enqueueing, and other queue lifecycle events.

§Features

  • Generic Design: Works with any PGMQ setup, not tied to specific applications
  • Namespace Awareness: Supports namespace extraction from queue names
  • Reliable Notifications: Built on sqlx::PgListener with auto-reconnection
  • Minimal Payloads: Keeps notification payloads under PostgreSQL's NOTIFY limit (shorter than 8000 bytes by default)
  • Configurable Channels: Customizable channel naming and prefixes
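
The namespace and channel features can be pictured with a small standalone sketch. It does not use this crate's API: `extract_namespace` and `channel_for` are hypothetical helpers, the `<namespace>_queue` convention is handled with plain string operations rather than a regex pattern, and the `event.namespace` channel format is an assumption made only for illustration.

```rust
/// Hypothetical helper: derive a namespace from a queue name that follows
/// a `<namespace>_queue` convention, falling back to a default otherwise.
fn extract_namespace<'a>(queue_name: &'a str, default_namespace: &'a str) -> &'a str {
    match queue_name.strip_suffix("_queue") {
        // Only accept a non-empty namespace in front of the suffix
        Some(ns) if !ns.is_empty() => ns,
        _ => default_namespace,
    }
}

/// Hypothetical helper: build a notification channel name from an optional
/// prefix, an event kind, and a namespace (format assumed for this sketch).
fn channel_for(prefix: Option<&str>, event: &str, namespace: &str) -> String {
    match prefix {
        Some(p) => format!("{p}.{event}.{namespace}"),
        None => format!("{event}.{namespace}"),
    }
}
```

Under these assumptions, `extract_namespace("orders_queue", "default")` yields `orders`, and a queue name that does not match the convention falls back to the default namespace.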

§Architecture

The crate provides four main components:

  1. Event Types: Structured representations of PGMQ events
  2. Database Triggers: Automatically publish notifications via SQL triggers (recommended)
  3. Listeners: Subscribe to notification channels (LISTEN) and receive typed events published via pg_notify
  4. Enhanced Client: PGMQ wrapper that integrates with trigger-based notifications

use tasker_pgmq::{PgmqNotifyClient, PgmqNotifyListener, PgmqNotifyConfig, PgmqNotifyEvent};

// 1. Install database triggers via migration (one-time setup)
// Run: tasker-pgmq-cli generate-migration --name pgmq_notifications
// Then apply the generated migration to your database

// 2. Create enhanced PGMQ client with trigger-based notifications
let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue")
    .with_default_namespace("orders");

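// `config` is reused for the listener below, so pass a clone here
// (assumes PgmqNotifyConfig implements Clone)
let mut client = PgmqNotifyClient::new(database_url, config.clone()).await?;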

// 3. Create listener for real-time event processing (TAS-51: bounded channel)
let buffer_size = 1000; // From config.mpsc_channels.*.event_listeners.pgmq_event_buffer_size
let mut listener = PgmqNotifyListener::new(pool, config, buffer_size).await?;
listener.connect().await?;
listener.listen_message_ready_for_namespace("orders").await?;

// 4. Queue operations automatically emit notifications via triggers
client.create_queue("orders_queue").await?; // Triggers queue_created notification
client.send("orders_queue", &my_message).await?; // Triggers message_ready notification

// 5. Process real-time events
while let Some(event) = listener.next_event().await? {
    match event {
        PgmqNotifyEvent::MessageReady { msg_id, queue_name, .. } => {
            println!("Message {} ready in queue {}", msg_id, queue_name);
            // Process message with <10ms latency
        }
        // Other event kinds are ignored in this example
        _ => {}
    }
}
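
The bounded buffer mentioned in step 3 can be illustrated in isolation. The sketch below is not the crate's listener: it uses the standard library's `sync_channel` as a stand-in for whatever channel the listener uses internally, and `Event`, `fill_bounded`, and `describe` are hypothetical names. It shows that a bounded buffer rejects events once it is full instead of growing without limit.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Stand-in for a notification event; not the crate's `PgmqNotifyEvent`.
#[derive(Debug)]
enum Event {
    MessageReady { msg_id: i64, queue_name: String },
    QueueCreated { queue_name: String },
}

/// Push events into a bounded buffer without blocking. Returns the receiver
/// plus the number of events accepted and the number rejected because the
/// buffer was full.
fn fill_bounded(buffer_size: usize, events: Vec<Event>) -> (Receiver<Event>, usize, usize) {
    let (tx, rx) = sync_channel(buffer_size);
    let (mut accepted, mut rejected) = (0, 0);
    for event in events {
        match tx.try_send(event) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (rx, accepted, rejected)
}

/// Turn an event into a log line, handling every variant explicitly.
fn describe(event: &Event) -> String {
    match event {
        Event::MessageReady { msg_id, queue_name } => {
            format!("message {msg_id} ready in {queue_name}")
        }
        Event::QueueCreated { queue_name } => format!("queue {queue_name} created"),
    }
}
```

With a buffer of two and three incoming events, the third is rejected; a real consumer would have to decide whether to drop, retry, or apply backpressure in that case.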

Re-exports§

pub use error::PgmqNotifyError;
pub use error::Result;
pub use listener::PgmqNotifyListener;
pub use types::ClientStatus;
pub use types::MessagingError;
pub use types::QueueMetrics;

Modules§

error
Error types for tasker-pgmq
listener
Event listener for PGMQ notifications using sqlx::PgListener
types
Types for tasker-pgmq client

Structs§

BatchReadyEvent
Event emitted when a batch of messages is ready for processing
DbEmitter
Database-backed emitter using PostgreSQL NOTIFY
MessageReadyEvent
Event emitted when a message is ready for processing
MessageWithPayloadEvent
Event emitted when a message is ready with full payload included (TAS-133)
NoopEmitter
No-operation emitter for testing and disabled scenarios
PgmqClient
Unified PGMQ client with comprehensive functionality and notification capabilities
PgmqNotifyClient
Unified PGMQ client with comprehensive functionality and notification capabilities
PgmqNotifyClientFactory
Factory for creating PgmqClient instances
PgmqNotifyConfig
Configuration for PGMQ notification behavior
QueueCreatedEvent
Event emitted when a new PGMQ queue is created
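
The difference between an identifier-only event and a payload-carrying event can be sketched as a size check against the NOTIFY limit. This is an illustration only, not the crate's logic: `Notification`, `choose_notification`, and the `headroom` parameter are hypothetical, and the 8000-byte figure is PostgreSQL's default payload limit.

```rust
/// PostgreSQL's default NOTIFY limit: the payload must be shorter than this.
const NOTIFY_PAYLOAD_LIMIT: usize = 8000;

/// Stand-in event shapes; not the crate's event structs.
#[derive(Debug, PartialEq)]
enum Notification {
    /// Carries identifiers only; the consumer reads the body from the queue.
    Ready { msg_id: i64, queue_name: String },
    /// Carries the message body inline.
    WithPayload { msg_id: i64, queue_name: String, payload: String },
}

/// Include the payload inline only if it fits under the limit once
/// `headroom` bytes are reserved for the surrounding envelope
/// (identifiers, queue name, serialization overhead).
fn choose_notification(
    msg_id: i64,
    queue_name: &str,
    payload: &str,
    headroom: usize,
) -> Notification {
    if payload.len() + headroom < NOTIFY_PAYLOAD_LIMIT {
        Notification::WithPayload {
            msg_id,
            queue_name: queue_name.to_string(),
            payload: payload.to_string(),
        }
    } else {
        Notification::Ready {
            msg_id,
            queue_name: queue_name.to_string(),
        }
    }
}
```

Falling back to the identifier-only shape keeps every notification deliverable, at the cost of one extra queue read for large messages.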

Enums§

PgmqNotifyEvent
Union of all possible PGMQ notification events

Traits§

PgmqNotifyEmitter
Trait for emitting PGMQ notifications
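
The split between a database-backed emitter and a no-op emitter follows a common trait-object pattern, sketched below with stand-in types. Nothing here is the crate's actual signature: `Emitter`, `Noop`, `Recording`, `announce_queue_created`, the synchronous `emit` method, and the `pgmq_queue_created` channel name are all assumptions for illustration.

```rust
/// Hypothetical emitter trait; the crate's `PgmqNotifyEmitter` may differ
/// (for example, it may be async).
trait Emitter {
    fn emit(&mut self, channel: &str, payload: &str) -> Result<(), String>;
}

/// Discards every notification; useful when notifications are disabled.
struct Noop;

impl Emitter for Noop {
    fn emit(&mut self, _channel: &str, _payload: &str) -> Result<(), String> {
        Ok(())
    }
}

/// Records notifications in memory so tests can inspect them.
struct Recording {
    sent: Vec<(String, String)>,
}

impl Emitter for Recording {
    fn emit(&mut self, channel: &str, payload: &str) -> Result<(), String> {
        self.sent.push((channel.to_string(), payload.to_string()));
        Ok(())
    }
}

/// Code that announces an event depends only on the trait, so the emitter
/// can be swapped without touching the caller.
/// Note: `queue_name` is interpolated without JSON escaping in this sketch.
fn announce_queue_created(emitter: &mut dyn Emitter, queue_name: &str) -> Result<(), String> {
    let payload = format!(r#"{{"event":"queue_created","queue_name":"{queue_name}"}}"#);
    emitter.emit("pgmq_queue_created", &payload)
}
```

Passing `&mut Noop` makes the call a silent success, while `Recording` lets a test assert on exactly what would have been published.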