tasker-pgmq 0.1.2

Tasker's PGMQ wrapper with PostgreSQL NOTIFY support for event-driven message processing.

Attribution

This crate builds upon the excellent PGMQ (PostgreSQL Message Queue) project. PGMQ provides a lightweight, durable message queue built on PostgreSQL.

We extend PGMQ with:

  • PostgreSQL LISTEN/NOTIFY integration for low-latency event-driven processing
  • Configurable namespace-aware channel routing
  • Signal-only notifications for large messages (>7KB) with fetch-by-ID support
  • Full payload notifications for small messages (<7KB) enabling direct processing
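The split between signal-only and full-payload notifications can be sketched as a size check on the message body. This is a minimal illustration, not the crate's implementation: `Notification`, `build_notification`, and `PAYLOAD_THRESHOLD_BYTES` are hypothetical names, and treating a message of exactly the threshold size as signal-only is an assumption, since the list above does not say which side that case falls on.

```rust
/// Hypothetical threshold; the text only says "7KB".
const PAYLOAD_THRESHOLD_BYTES: usize = 7 * 1024;

/// Illustrative notification shapes, not the crate's actual types.
#[derive(Debug, PartialEq)]
enum Notification {
    /// Large message: only the ID is sent; the consumer fetches the body by ID.
    SignalOnly { msg_id: i64 },
    /// Small message: the body is embedded so the consumer can process it directly.
    WithPayload { msg_id: i64, payload: String },
}

/// Chooses the notification shape from the payload size.
fn build_notification(msg_id: i64, payload: &str) -> Notification {
    // `str::len` returns the length in bytes, the unit the NOTIFY limit is measured in.
    if payload.len() < PAYLOAD_THRESHOLD_BYTES {
        Notification::WithPayload {
            msg_id,
            payload: payload.to_owned(),
        }
    } else {
        // Assumption: the boundary case is handled conservatively as signal-only.
        Notification::SignalOnly { msg_id }
    }
}
```

A consumer receiving `SignalOnly` would read the message from the queue by its ID, while `WithPayload` can be handled without a second round trip to the database.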

Features

  • Generic Design: Works with any PGMQ setup, not tied to specific applications
  • Namespace Awareness: Supports namespace extraction from queue names
  • Reliable Notifications: Built on sqlx::PgListener with auto-reconnection
  • Minimal Payloads: Keeps notifications under the pg_notify payload limit (8000 bytes in a default PostgreSQL configuration)
  • Configurable Channels: Customizable channel naming and prefixes
  • Trigger-Based Architecture: Database triggers emit notifications automatically
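Namespace extraction from queue names can be illustrated with a standard-library approximation of the `(?P<namespace>\w+)_queue` pattern used elsewhere in this README. The function name `extract_namespace` is hypothetical, the suffix match is a simplification of a real regex match, and falling back to a default namespace when the name does not match is an assumption about the intended behavior.

```rust
/// Returns the namespace part of a queue name such as "orders_queue".
/// Approximates the pattern `(?P<namespace>\w+)_queue` without a regex crate.
fn extract_namespace<'a>(queue_name: &'a str, default_namespace: &'a str) -> &'a str {
    match queue_name.strip_suffix("_queue") {
        // Accept only non-empty prefixes made of word-like characters.
        Some(ns)
            if !ns.is_empty()
                && ns.chars().all(|c| c.is_alphanumeric() || c == '_') =>
        {
            ns
        }
        // Assumption: names that do not match fall back to the default namespace.
        _ => default_namespace,
    }
}
```

With this sketch, `extract_namespace("orders_queue", "default")` yields `"orders"`, while a name without the `_queue` suffix yields the default.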

Usage

Trigger-Based (Recommended)

use tasker_pgmq::{PgmqNotifyClient, PgmqNotifyListener, PgmqNotifyConfig, PgmqNotifyEvent};

// 1. Install database triggers via migration (one-time setup)
// Run: tasker-pgmq-cli generate-migration --name pgmq_notifications
// Then apply the generated migration to your database

// 2. Create enhanced PGMQ client with trigger-based notifications
let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue")
    .with_default_namespace("orders");

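// `database_url`, `pool`, and `my_message` are assumed to be provided by the caller.
// The config is cloned because it is reused for the listener below
// (this assumes PgmqNotifyConfig implements Clone).
let mut client = PgmqNotifyClient::new(database_url, config.clone()).await?;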

// 3. Create listener for real-time event processing
let buffer_size = 1000;
let mut listener = PgmqNotifyListener::new(pool, config, buffer_size).await?;
listener.connect().await?;
listener.listen_message_ready_for_namespace("orders").await?;

// 4. Queue operations automatically emit notifications via triggers
client.create_queue("orders_queue").await?; // Triggers queue_created notification
client.send("orders_queue", &my_message).await?; // Triggers message_ready notification

// 5. Process real-time events
while let Some(event) = listener.next_event().await? {
    match event {
        PgmqNotifyEvent::MessageReady { msg_id, queue_name, .. } => {
            println!("Message {} ready in queue {}", msg_id, queue_name);
            // Process message with <10ms latency
        }
        // Other event kinds (e.g. queue creation) are ignored in this example
        _ => {}
    }
}

CLI Tool

The tasker-pgmq-cli binary generates SQL migration files for PGMQ notification triggers:

# Generate migration with default settings
tasker-pgmq-cli generate-migration --output-dir migrations

# Generate with custom configuration
tasker-pgmq-cli generate-migration \
    --output-dir migrations \
    --queue-pattern '(?P<namespace>\w+)_queue' \
    --channel-prefix myapp \
    --name add_pgmq_notifications

# Validate configuration file
tasker-pgmq-cli validate-config config.toml

Architecture

The crate provides three main components:

  1. Event Types: Structured representations of PGMQ events (PgmqNotifyEvent, MessageReadyEvent, QueueCreatedEvent)
  2. Database Triggers: SQL functions that publish notifications via pg_notify (generated by CLI)
  3. Listeners: Subscribe to and receive typed events via PostgreSQL LISTEN

Event Types

  • QueueCreatedEvent - Emitted when a new queue is created
  • MessageReadyEvent - Signal-only for large messages (>7KB), requires fetch
  • MessageWithPayloadEvent - Full payload for small messages (<7KB), direct processing
  • BatchReadyEvent - Emitted for batch message operations
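How a consumer might react to each of the four event kinds can be sketched with a plain enum and a dispatch function. The `Event` and `Action` types and their fields below are hypothetical stand-ins chosen for illustration; they are not the crate's actual type definitions.

```rust
/// Illustrative stand-in for the four event kinds listed above.
#[derive(Debug)]
enum Event {
    QueueCreated { queue_name: String },
    MessageReady { queue_name: String, msg_id: i64 },
    MessageWithPayload { queue_name: String, msg_id: i64, payload: String },
    BatchReady { queue_name: String, msg_ids: Vec<i64> },
}

/// What the consumer decides to do in response to an event.
#[derive(Debug, PartialEq)]
enum Action {
    RegisterQueue(String),
    FetchById { queue_name: String, msg_id: i64 },
    ProcessInline { msg_id: i64, payload: String },
    FetchMany { queue_name: String, msg_ids: Vec<i64> },
}

/// Maps each event kind to a follow-up action.
fn plan(event: Event) -> Action {
    match event {
        Event::QueueCreated { queue_name } => Action::RegisterQueue(queue_name),
        // Signal-only: the body must be read from the queue by ID.
        Event::MessageReady { queue_name, msg_id } => Action::FetchById { queue_name, msg_id },
        // Full payload: no extra fetch is needed.
        Event::MessageWithPayload { msg_id, payload, .. } => {
            Action::ProcessInline { msg_id, payload }
        }
        Event::BatchReady { queue_name, msg_ids } => Action::FetchMany { queue_name, msg_ids },
    }
}
```

Matching exhaustively, without a wildcard arm, means the compiler flags any event kind added later that the consumer does not yet handle.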

Configuration

use tasker_pgmq::PgmqNotifyConfig;

let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue")  // Extract namespace from queue name
    .with_channels_prefix("myapp")                           // Prefix all channels: myapp.pgmq_*
    .with_triggers_enabled(true)                             // Enable trigger-based notifications
    .with_default_namespace("default")                       // Auto-listen to this namespace
    .with_max_payload_size(7800);                            // Stay under 8KB pg_notify limit
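To make the effect of a channel prefix concrete, the sketch below assembles a channel name from a prefix, a base name, and a namespace. The comment above only shows the form `myapp.pgmq_*`, so the base name `pgmq_message_ready`, the trailing `.namespace` segment, and the `channel_name` helper itself are assumptions for illustration. The 63-byte cap reflects PostgreSQL's default identifier length limit, which also applies to channel names.

```rust
/// Default PostgreSQL identifier limit (NAMEDATALEN - 1), which channel names must respect.
const MAX_CHANNEL_BYTES: usize = 63;

/// Builds a channel name such as "myapp.pgmq_message_ready.orders".
/// The layout is hypothetical; only the "prefix.pgmq_*" form comes from the README.
fn channel_name(
    prefix: Option<&str>,
    base: &str,
    namespace: Option<&str>,
) -> Result<String, String> {
    let mut name = String::new();
    if let Some(p) = prefix {
        name.push_str(p);
        name.push('.');
    }
    name.push_str(base);
    if let Some(ns) = namespace {
        name.push('.');
        name.push_str(ns);
    }
    // Reject names PostgreSQL would not accept as a channel.
    if name.len() > MAX_CHANNEL_BYTES {
        return Err(format!(
            "channel name is {} bytes; limit is {}",
            name.len(),
            MAX_CHANNEL_BYTES
        ));
    }
    Ok(name)
}
```

Checking the length up front surfaces an overly long prefix or namespace at configuration time rather than as a database error when the listener subscribes.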

License

MIT License - See LICENSE file for details.

Acknowledgments

Special thanks to the PGMQ team for creating and maintaining PGMQ, which makes PostgreSQL-based message queuing simple and reliable.