# tasker-pgmq
Tasker's PGMQ wrapper with PostgreSQL NOTIFY support for event-driven message processing.
## Attribution
This crate builds upon the excellent [PGMQ (PostgreSQL Message Queue)](https://github.com/pgmq/pgmq) project. PGMQ provides a lightweight, durable message queue built on PostgreSQL.
We extend PGMQ with:
- PostgreSQL `LISTEN/NOTIFY` integration for low-latency event-driven processing
- Configurable namespace-aware channel routing
- Signal-only notifications for large messages (>7KB) with fetch-by-ID support
- Full payload notifications for small messages (<7KB) enabling direct processing
## Features
- **Generic Design**: Works with any PGMQ setup, not tied to specific applications
- **Namespace Awareness**: Supports namespace extraction from queue names
- **Reliable Notifications**: Built on `sqlx::PgListener` with auto-reconnection
- **Minimal Payloads**: Keeps notifications under the `pg_notify` payload limit (8000 bytes by default)
- **Configurable Channels**: Customizable channel naming and prefixes
- **Trigger-Based Architecture**: Database triggers emit notifications automatically
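To illustrate namespace awareness, here is a minimal sketch of pulling a namespace out of a queue name. It approximates the `(?P<namespace>\w+)_queue` pattern used elsewhere in this README with plain string handling instead of a regex. The function name `extract_namespace` and the fallback to a default namespace on a non-matching name are assumptions for illustration, not the crate's API.

```rust
/// Hypothetical helper: extracts the namespace from a queue name shaped like
/// `<namespace>_queue`. Falls back to `default_namespace` when the name does
/// not match (this fallback behavior is an assumption).
fn extract_namespace<'a>(queue_name: &'a str, default_namespace: &'a str) -> &'a str {
    match queue_name.strip_suffix("_queue") {
        // Accept only non-empty, word-like namespaces (letters, digits, underscore),
        // roughly mirroring `\w+` anchored to the whole name.
        Some(ns) if !ns.is_empty() && ns.chars().all(|c| c.is_alphanumeric() || c == '_') => ns,
        _ => default_namespace,
    }
}
```

With this sketch, `extract_namespace("orders_queue", "default")` yields `"orders"`, while a name without the `_queue` suffix yields the default.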
## Usage
### Trigger-Based (Recommended)
```rust,ignore
use tasker_pgmq::{PgmqNotifyClient, PgmqNotifyListener, PgmqNotifyConfig, PgmqNotifyEvent};

// 1. Install database triggers via migration (one-time setup)
//    Run: tasker-pgmq-cli generate-migration --name pgmq_notifications
//    Then apply the generated migration to your database

// 2. Create enhanced PGMQ client with trigger-based notifications
//    (`database_url` and `pool` are supplied by your application)
let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue")
    .with_default_namespace("orders");
// `config` is reused by the listener below; the clone assumes `PgmqNotifyConfig: Clone`
let mut client = PgmqNotifyClient::new(database_url, config.clone()).await?;

// 3. Create listener for real-time event processing
let buffer_size = 1000;
let mut listener = PgmqNotifyListener::new(pool, config, buffer_size).await?;
listener.connect().await?;
listener.listen_message_ready_for_namespace("orders").await?;

// 4. Queue operations automatically emit notifications via triggers
client.create_queue("orders_queue").await?; // Triggers queue_created notification
client.send("orders_queue", &my_message).await?; // Triggers message_ready notification

// 5. Process real-time events
while let Some(event) = listener.next_event().await? {
    match event {
        PgmqNotifyEvent::MessageReady { msg_id, queue_name, .. } => {
            println!("Message {} ready in queue {}", msg_id, queue_name);
            // Process message with <10ms latency
        }
        // Ignore other event kinds in this example
        _ => {}
    }
}
```
## CLI Tool
The `tasker-pgmq-cli` binary generates SQL migration files for PGMQ notification triggers:
```bash
# Generate migration with default settings
tasker-pgmq-cli generate-migration --output-dir migrations

# Generate with custom configuration
tasker-pgmq-cli generate-migration \
  --output-dir migrations \
  --queue-pattern '(?P<namespace>\w+)_queue' \
  --channel-prefix myapp \
  --name add_pgmq_notifications

# Validate configuration file
tasker-pgmq-cli validate-config config.toml
```
## Architecture
The crate provides three main components:
1. **Event Types**: Structured representations of PGMQ events (`PgmqNotifyEvent`, `MessageReadyEvent`, `QueueCreatedEvent`)
2. **Database Triggers**: SQL functions that publish notifications via `pg_notify` (generated by CLI)
3. **Listeners**: Subscribe to and receive typed events via PostgreSQL LISTEN
### Event Types
- `QueueCreatedEvent` - Emitted when a new queue is created
- `MessageReadyEvent` - Signal-only for large messages (>7KB), requires fetch
- `MessageWithPayloadEvent` - Full payload for small messages (<7KB), direct processing
- `BatchReadyEvent` - Emitted for batch message operations
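The split between signal-only and full-payload events comes down to a size check when the notification is built. In this crate the notifications are emitted by database triggers, so the Rust below only illustrates the decision and is not the actual implementation. The `Notification` enum, `build_notification`, and the `max_inline_bytes` parameter are hypothetical names chosen for this sketch.

```rust
/// Illustrative stand-ins for the two message event shapes (hypothetical types,
/// not the crate's `MessageReadyEvent` / `MessageWithPayloadEvent`).
#[derive(Debug, PartialEq)]
enum Notification {
    /// Small message: the payload travels inside the notification itself.
    WithPayload { msg_id: i64, queue_name: String, payload: String },
    /// Large message: only identifiers are sent; the consumer fetches the body by ID.
    SignalOnly { msg_id: i64, queue_name: String },
}

/// Chooses the notification shape from the payload size.
/// `max_inline_bytes` plays the role of a configured maximum payload size.
fn build_notification(
    msg_id: i64,
    queue_name: &str,
    payload: &str,
    max_inline_bytes: usize,
) -> Notification {
    // `str::len` counts bytes, the unit the pg_notify limit is expressed in.
    if payload.len() <= max_inline_bytes {
        Notification::WithPayload {
            msg_id,
            queue_name: queue_name.to_owned(),
            payload: payload.to_owned(),
        }
    } else {
        Notification::SignalOnly {
            msg_id,
            queue_name: queue_name.to_owned(),
        }
    }
}
```

A consumer receiving the `SignalOnly` shape would then read the message from the queue by its ID, whereas the `WithPayload` shape can be processed directly.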
## Configuration
```rust
use tasker_pgmq::PgmqNotifyConfig;

let config = PgmqNotifyConfig::new()
    .with_queue_naming_pattern(r"(?P<namespace>\w+)_queue") // Extract namespace from queue name
    .with_channels_prefix("myapp")                          // Prefix all channels: myapp.pgmq_*
    .with_triggers_enabled(true)                            // Enable trigger-based notifications
    .with_default_namespace("default")                      // Auto-listen to this namespace
    .with_max_payload_size(7800);                           // Stay under the 8000-byte pg_notify limit
```
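As a rough picture of how a channel prefix and a namespace might combine into a channel name, consider the sketch below. Only the `myapp.pgmq_*` prefix shape comes from the configuration comment above. The base name `pgmq_message_ready`, the `.namespace` suffix, and the `channel_name` function are assumptions made for illustration, so check the generated migration for the names the crate actually uses.

```rust
/// Hypothetical helper: joins an optional prefix, a base channel name, and an
/// optional namespace with dots. The exact layout is an assumption.
fn channel_name(prefix: Option<&str>, base: &str, namespace: Option<&str>) -> String {
    let mut name = String::new();
    if let Some(p) = prefix {
        name.push_str(p);
        name.push('.');
    }
    name.push_str(base);
    if let Some(ns) = namespace {
        name.push('.');
        name.push_str(ns);
    }
    name
}
```

Under these assumptions, `channel_name(Some("myapp"), "pgmq_message_ready", Some("orders"))` produces `myapp.pgmq_message_ready.orders`, and omitting both the prefix and the namespace returns the base name unchanged.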
## License
MIT License - See LICENSE file for details.
## Acknowledgments
Special thanks to the [PGMQ](https://github.com/pgmq/pgmq) team for creating and maintaining PGMQ, which makes PostgreSQL-based message queuing simple and reliable.