pgqrs
A PostgreSQL-backed job queue for Rust applications.
Features
- Lightweight: Use `pgqrs` as a library in your Rust applications.
- Compatible with connection poolers: Use with PgBouncer or pgcat to scale connections.
- Efficient: Uses PostgreSQL's `SKIP LOCKED` for concurrent job fetching.
- Exactly-once delivery: Guarantees exactly-once delivery within the configured lock-time window.
- Message Archiving: Built-in archiving system for audit trails and historical data retention
Architecture
pgqrs is a distributed job queue system built around PostgreSQL with a clean separation of concerns. The architecture consists of Producer/Consumer roles and a unified table interface for reliable, scalable background job processing.
System Overview
```mermaid
graph TB
    subgraph "Application Layer"
        P["Producer Services<br/>Your Rust Apps"]
        W["Worker/Consumer Services<br/>Your Rust Apps"]
        A["Admin Server<br/>Optional Monitoring"]
    end
    subgraph "pgqrs APIs"
        PROD["Producer API<br/>enqueue()<br/>batch_enqueue()<br/>extend_visibility()"]
        CONS["Consumer API<br/>dequeue()<br/>archive()<br/>delete()"]
        TABLES["Table APIs<br/>PgqrsQueues<br/>PgqrsWorkers<br/>PgqrsMessages<br/>PgqrsArchiveTable"]
        AA["PgqrsAdmin API<br/>queue_metrics()<br/>create_queue()<br/>worker_management()"]
    end
    CLI["pgqrs CLI<br/>install<br/>queue create<br/>message send<br/>metrics"]
    subgraph "Connection Layer"
        CP["Connection Pooler<br/>PgBouncer/pgcat<br/>Optional"]
    end
    subgraph "PostgreSQL Database"
        subgraph "pgqrs Schema"
            QT["pgqrs_queues<br/>Queue definitions"]
            WT["pgqrs_workers<br/>Worker registrations"]
            MT["pgqrs_messages<br/>Active messages"]
            AT["pgqrs_archive<br/>Processed messages"]
            META["Metadata & Indexes<br/>Foreign keys & constraints"]
        end
    end

    %% Connections
    P --> PROD
    W --> CONS
    A --> AA
    A --> TABLES
    PROD --> CP
    CONS --> CP
    TABLES --> CP
    AA --> CP
    CLI --> CP
    CP --> QT
    CP --> WT
    CP --> MT
    CP --> AT
    CP --> META

    %% Alternative direct connection (without pooler)
    PROD -.-> QT
    PROD -.-> MT
    CONS -.-> MT
    CONS -.-> AT
    TABLES -.-> QT
    TABLES -.-> WT
    TABLES -.-> MT
    TABLES -.-> AT
    AA -.-> QT
    AA -.-> WT
    AA -.-> MT
    AA -.-> AT
    CLI -.-> QT
    CLI -.-> WT
    CLI -.-> MT
    CLI -.-> AT

    %% Styling
    classDef userApp fill:#e1f5fe
    classDef pgqrsLib fill:#f3e5f5
    classDef database fill:#e8f5e8
    classDef optional fill:#fff3e0
    class P,W,A userApp
    class PROD,CONS,TABLES,AA,CLI pgqrsLib
    class QT,WT,MT,AT,META database
    class CP optional
```
Component Details
1. PostgreSQL Database
- Central storage for all queue data with proper relational design
- Four core tables with foreign key relationships:
  - `pgqrs_queues`: Queue definitions and metadata
  - `pgqrs_workers`: Worker registrations linked to queues
  - `pgqrs_messages`: Active messages awaiting processing
  - `pgqrs_archive`: Processed messages for audit trails
- ACID compliance ensures message durability and exactly-once processing
- PostgreSQL's `SKIP LOCKED` clause enables efficient concurrent message processing
- Referential integrity maintains data consistency across tables
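The `SKIP LOCKED` dequeue pattern mentioned above can be sketched as follows. The helper only assembles an illustrative SQL string: the table name, the `vt` (visibility-timeout) column, and the ordering are assumptions for illustration, not necessarily the exact schema pgqrs uses.

```rust
/// Build an illustrative SKIP LOCKED dequeue query: lock up to `limit`
/// visible messages, push their visibility timeout into the future, and
/// return them. Table and column names are assumptions, not pgqrs's
/// actual schema.
fn dequeue_query(limit: u32, lock_seconds: u32) -> String {
    format!(
        "UPDATE pgqrs_messages \
         SET vt = now() + interval '{lock_seconds} seconds' \
         WHERE id IN ( \
             SELECT id FROM pgqrs_messages \
             WHERE vt <= now() \
             ORDER BY id \
             LIMIT {limit} \
             FOR UPDATE SKIP LOCKED \
         ) RETURNING *"
    )
}
```

Because competing consumers skip rows that are already row-locked instead of blocking on them, many workers can poll the same table concurrently without lock contention.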
2. Producer Services - Message Creation
- Your Rust applications that create and enqueue jobs
- Dedicated Producer API for message creation operations
- Key operations:
  - `producer.enqueue(payload)` - Add a single job to the queue
  - `producer.batch_enqueue(payloads)` - Add multiple jobs efficiently
  - `producer.enqueue_delayed(payload, delay)` - Schedule future jobs
  - `producer.extend_visibility(msg_id, duration)` - Extend job processing time
- Validation & rate limiting built-in for message integrity
- Queue-specific instances for type safety and performance
3. Consumer Services - Message Processing
- Your Rust applications that process jobs from queues
- Dedicated Consumer API for message consumption operations
- Key operations:
  - `consumer.dequeue()` - Fetch a single job with automatic locking
  - `consumer.dequeue_batch(size)` - Fetch multiple jobs efficiently
  - `consumer.archive(msg_id)` - Archive a processed message for the audit trail
  - `consumer.delete(msg_id)` - Remove a completed message
  - `consumer.delete_batch(msg_ids)` - Remove multiple completed messages
- Automatic visibility timeouts prevent stuck jobs
- Queue-specific instances for focused processing
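The automatic visibility timeout mentioned above can be modeled as below. This is a minimal sketch with hypothetical types, not pgqrs's internal representation: dequeuing pushes a message's visibility into the future so other consumers skip it; if the worker crashes before deleting or archiving the message, the timestamp passes and the job becomes fetchable again.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical message record: visible to consumers only once
/// `visible_at` has passed.
struct Message {
    visible_at: SystemTime,
}

impl Message {
    /// A message is eligible for dequeue only when its visibility
    /// timestamp is in the past.
    fn is_visible(&self, now: SystemTime) -> bool {
        self.visible_at <= now
    }

    /// Locking a message pushes its visibility into the future,
    /// hiding it from other consumers for `lock_time`.
    fn lock(&mut self, now: SystemTime, lock_time: Duration) {
        self.visible_at = now + lock_time;
    }
}
```

If the consumer never completes the job, no explicit "unlock" is needed: once `lock_time` elapses, `is_visible` becomes true again and another worker picks the message up.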
4. Table APIs - Direct Data Access
- Unified Table trait interface for consistent CRUD operations
- Four table implementations:
  - `PgqrsQueues`: Manage queue definitions (`insert`, `get`, `list`, `delete`)
  - `PgqrsWorkers`: Manage worker registrations with queue relationships
  - `PgqrsMessages`: Direct message table access (typically used by Producer/Consumer)
  - `PgqrsArchiveTable`: Archive management with full CRUD support
- Common interface methods:
  - `count()` - Count total records
  - `count_by_fk(id)` - Count records by foreign key relationship
  - `filter_by_fk(id)` - List records by foreign key relationship
- Type-safe operations with proper error handling
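The unified interface might look roughly like the trait below, shown with a toy in-memory implementation. This is a sketch only: the real `Table` trait is presumably async and returns `Result`s, and its exact signatures may differ.

```rust
/// Illustrative shape of a unified table interface; the actual pgqrs
/// `Table` trait may differ in signatures and error handling.
trait Table {
    type Row;
    fn count(&self) -> usize;
    fn count_by_fk(&self, fk: i64) -> usize;
    fn filter_by_fk(&self, fk: i64) -> Vec<&Self::Row>;
}

/// Toy in-memory stand-in for a table whose rows reference a parent
/// record via a foreign key (e.g. workers referencing a queue).
struct InMemory {
    rows: Vec<(i64, String)>,
}

impl Table for InMemory {
    type Row = (i64, String);

    fn count(&self) -> usize {
        self.rows.len()
    }

    fn count_by_fk(&self, fk: i64) -> usize {
        self.rows.iter().filter(|(k, _)| *k == fk).count()
    }

    fn filter_by_fk(&self, fk: i64) -> Vec<&Self::Row> {
        self.rows.iter().filter(|(k, _)| *k == fk).collect()
    }
}
```

The benefit of a shared trait is that callers (metrics, admin tooling) can be written once against the interface and reused across all four tables.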
5. Admin Server - System Management
- Your monitoring/admin service using the `PgqrsAdmin` API
- Cross-cutting concerns for system health and management
- Key operations:
  - `admin.queue_metrics(name)` - Get queue health metrics
  - `admin.all_queues_metrics()` - System-wide monitoring
  - `admin.create_queue(name)` - Queue lifecycle management
  - `admin.worker_management()` - Worker registration and health
  - `admin.purge_archive(name)` - Archive cleanup operations
  - `admin.install()` / `admin.uninstall()` - Schema management
6. Connection Pooler (Optional)
- PgBouncer or pgcat for connection management and scaling
- Connection multiplexing allows more workers than database connections
- Transaction-mode compatibility for queue operations
7. pgqrs CLI
- Command-line tool for administrative operations
- Direct database access for debugging and management
- Key commands:
  - `pgqrs install` - Set up the database schema with all tables
  - `pgqrs queue create <name>` - Create new queues
  - `pgqrs message send <queue> <payload>` - Manual job creation
  - `pgqrs queue metrics <name>` - Inspect queue health
  - `pgqrs message read <queue> --archive` - View processed message history
Data Flow
- Job Creation: Producer services use dedicated `Producer` instances to add jobs to the `pgqrs_messages` table
- Worker Registration: Worker services register in the `pgqrs_workers` table, linking to specific queues
- Job Processing: Consumer services use `Consumer` instances to fetch jobs with automatic locking
- Job Completion: Consumers mark jobs as completed by deleting them from `pgqrs_messages`
- Job Archiving: Optionally, consumers use `archive()` to move completed jobs to the `pgqrs_archive` table
- System Monitoring: Admin services query across all tables for metrics and health monitoring
Role Separation Benefits
- Producer Focus: Dedicated to message creation, validation, and rate limiting
- Consumer Focus: Optimized for job fetching, processing, and completion
- Clear Boundaries: Producers never read jobs, Consumers never create jobs
- Independent Scaling: Scale producers and consumers independently
- Type Safety: Queue-specific instances prevent cross-queue contamination
Scalability Patterns
- Horizontal Workers: Run multiple consumer instances for increased throughput
- Producer Scaling: Scale producer services independently based on job creation load
- Queue Partitioning: Use multiple queues to distribute load and isolate workloads
- Connection Pooling: PgBouncer enables more workers than database connections
- Batch Processing: Process multiple jobs per database transaction for efficiency
- Worker Registration: Track and monitor worker health via the `pgqrs_workers` table
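Batch processing pays off because each database round trip is amortized over many jobs. A consumer sizing its `dequeue_batch()` calls against the configured maximum (default 100) might chunk a backlog like this (illustrative helper, not part of the pgqrs API):

```rust
/// Split `total` pending jobs into batches no larger than `max_batch`,
/// mirroring how a consumer might size successive dequeue_batch() calls.
fn batch_sizes(total: usize, max_batch: usize) -> Vec<usize> {
    if max_batch == 0 {
        return Vec::new(); // avoid an infinite loop on a nonsense limit
    }
    let mut sizes = Vec::new();
    let mut remaining = total;
    while remaining > 0 {
        let n = remaining.min(max_batch);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}
```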
Deployment Considerations
- Database: Single PostgreSQL instance or managed service (RDS, Cloud SQL)
- Workers: Deploy as separate services/containers, scale independently
- Producers: Integrate pgqrs library into existing application services
- Admin/CLI: Use for operational management and debugging
Architecture Benefits
The current pgqrs architecture provides several key advantages:
Clean Separation of Concerns
- Producers: Focus solely on message creation, validation, and rate limiting
- Consumers: Optimized for job fetching, processing, and completion tracking
- Admin: System-wide management and monitoring operations
- Tables: Direct database access for advanced use cases
Unified Data Model
- Single schema with four core tables linked by foreign keys
- Referential integrity prevents orphaned records and data corruption
- Consistent interface via Table trait across all database operations
- Predictable scaling with clear table relationships
Type Safety & Performance
- Queue-specific instances prevent cross-queue contamination
- Compile-time verification of message types and operations
- Optimized queries with proper indexing and constraints
- Connection pooling support for high-concurrency scenarios
Operational Excellence
- Built-in monitoring via metrics and admin APIs
- Archive system for compliance and audit requirements
- Worker tracking for health monitoring and load balancing
- CLI tools for debugging and administrative tasks
Getting Started
Install the binary
```shell
cargo install pgqrs
```
Start a PostgreSQL database or get the DSN of an existing one
You'll need a PostgreSQL database to use pgqrs. Here are your options:
Option 1: Using Docker (Recommended for development)
```shell
# Start a PostgreSQL container (image tag is an example)
docker run -d --name pgqrs-postgres \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 postgres:16

# Your DSN will be:
# postgresql://postgres:postgres@localhost:5432/postgres
```
Option 2: Using an existing PostgreSQL database
Get your database connection string (DSN) in this format:
```
postgresql://username:password@hostname:port/database
```
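If you assemble the DSN from separate settings, the format maps directly onto string formatting. A trivial helper for illustration only; note it does no percent-encoding of special characters in the password:

```rust
/// Assemble a PostgreSQL DSN from its parts. Illustrative only:
/// special characters in the password would need percent-encoding.
fn dsn(user: &str, pass: &str, host: &str, port: u16, db: &str) -> String {
    format!("postgresql://{user}:{pass}@{host}:{port}/{db}")
}
```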
Option 3: Using a cloud PostgreSQL service
- AWS RDS: Get the connection string from the RDS console
- Google Cloud SQL: Get the connection string from the Cloud Console
- Azure Database: Get the connection string from the Azure portal
- Heroku Postgres: Use the `DATABASE_URL` from your Heroku config
Configure pgqrs
Set your database connection using one of these methods (in order of priority):
```shell
# Method 1: Command line argument (highest priority)
pgqrs --dsn "postgresql://postgres:postgres@localhost:5432/postgres" queue list

# Method 2: Environment variable
export PGQRS_DSN="postgresql://postgres:postgres@localhost:5432/postgres"
pgqrs queue list
```
Create a pgqrs.yaml file:
```yaml
dsn: "postgresql://postgres:postgres@localhost:5432/postgres"
```
Then run:
```shell
# Method 3: Use a YAML config file
pgqrs --config pgqrs.yaml queue list
```
Install the pgqrs schema
pgqrs requires a few tables to store metadata. It creates these tables as well as queue tables in the specified schema.
**Important:** You must create the schema before running `pgqrs install`.
Step 1: Create the schema
Connect to your PostgreSQL database and create the schema:
```sql
-- For the default 'public' schema, no action is needed.
-- For a custom schema:
CREATE SCHEMA IF NOT EXISTS pgqrs;
```
Step 2: Install pgqrs
Once you have your database configured and schema created, install the pgqrs schema:
```shell
# Install in the default 'public' schema
pgqrs install

# Install in a custom schema
PGQRS_SCHEMA=pgqrs pgqrs install

# Verify the installation
pgqrs verify

# Or verify a custom schema
PGQRS_SCHEMA=pgqrs pgqrs verify
```
Test queue commands from the CLI
Items can be enqueued and dequeued from the CLI; this is intended for testing and experiments only.
```shell
# Create a test queue
pgqrs queue create test_queue

# Send a message to the queue
pgqrs message send test_queue '{"task": "example"}'

# Send a delayed message (available after 30 seconds)
pgqrs message send test_queue '{"task": "later"}' --delay 30

# Read and immediately consume one message
pgqrs message dequeue test_queue

# Delete a specific message by ID
pgqrs message delete test_queue 1
```
Queue API
Add to your Cargo.toml:
```toml
[dependencies]
pgqrs = "0.3.0"
```
Core APIs
pgqrs provides three main API layers for different use cases:
1. Producer/Consumer APIs (Recommended)
High-level, role-specific APIs for most queue operations:
- Producer: Message creation, validation, rate limiting
- Consumer: Message processing, archiving, deletion
2. Table APIs (Advanced)
Low-level, unified CRUD interface for direct database operations:
- PgqrsQueues: Queue management
- PgqrsWorkers: Worker registration and health
- PgqrsMessages: Direct message table access
- PgqrsArchiveTable: Archive management
3. Admin API (Management)
System-wide operations for monitoring and administration:
- PgqrsAdmin: Schema management, metrics, cross-table operations
Table Trait Interface
All table types implement a unified Table trait providing consistent CRUD operations:
```rust
// Sketch only; import paths and signatures are illustrative and may
// differ from the released API.
use pgqrs::{Config, Table};

async fn example(config: &Config) {
    // Every table type (PgqrsQueues, PgqrsWorkers, PgqrsMessages,
    // PgqrsArchiveTable) exposes count(), count_by_fk(id), filter_by_fk(id).
}
```
Producer/Consumer Usage
See examples/basic_usage.rs for a full example. The Producer/Consumer APIs provide clean separation of concerns:
Producer Example (Message Creation Service)
```rust
// Sketch only; import paths and signatures are illustrative.
// See examples/basic_usage.rs for the full, working example.
use pgqrs::{Config, PgqrsAdmin, Producer};
use serde_json::json;

async fn produce(producer: &Producer) {
    // Documented Producer operations:
    let _ = producer.enqueue(json!({ "task": "send_email" })).await;
    let _ = producer.batch_enqueue(vec![json!({ "task": "resize" })]).await;
}
```
Consumer Example (Message Processing Service)
```rust
// Sketch only; import paths and signatures are illustrative.
// See examples/basic_usage.rs for the full, working example.
use pgqrs::{Config, Consumer, PgqrsAdmin};

async fn consume(consumer: &Consumer) {
    // Fetch with automatic locking, process, then archive or delete:
    let _msg = consumer.dequeue().await;
    // consumer.archive(msg_id).await or consumer.delete(msg_id).await
}
```
Full Application Example
```rust
// Sketch only; see examples/basic_usage.rs for the complete program.
// The async runtime (tokio here) is an assumption.
use pgqrs::{Config, Consumer, PgqrsAdmin, Producer};
use serde_json::json;

#[tokio::main]
async fn main() {
    // Typical flow: load Config, install the schema via PgqrsAdmin,
    // create a queue, then run a Producer and a Consumer against it.
}
```
Configuration
pgqrs uses a prioritized configuration system. Configuration is loaded in the following order (highest priority first):
1. Command Line Arguments (Highest Priority)
```shell
# Override DSN via command line
pgqrs --dsn "postgresql://user:pass@localhost/db" queue list

# Override config file location
pgqrs --config /path/to/pgqrs.yaml queue list
```
2. Environment Variables
```shell
# Required: Database connection string
export PGQRS_DSN="postgresql://user:pass@localhost/db"

# Optional: Schema name for pgqrs tables (default: public)
export PGQRS_SCHEMA="pgqrs"

# Optional: Connection pool settings
export PGQRS_MAX_CONNECTIONS=16
export PGQRS_CONNECTION_TIMEOUT=30

# Optional: Default job settings
export PGQRS_DEFAULT_LOCK_TIME=5
export PGQRS_DEFAULT_BATCH_SIZE=100
```
3. Configuration File
Create a YAML configuration file (default locations: pgqrs.yaml, pgqrs.yml):
```yaml
# Required: Database connection string
dsn: "postgresql://user:pass@localhost/db"

# Optional: Schema name for pgqrs tables (default: public)
schema: "pgqrs"

# Optional: Connection pool settings (defaults shown)
max_connections: 16
connection_timeout_seconds: 30

# Optional: Default job settings (defaults shown)
default_lock_time_seconds: 5
default_max_batch_size: 100
```
4. Programmatic Configuration
```rust
// Argument lists below are illustrative; check the crate docs for the
// exact signatures.
use pgqrs::Config;

// Create from an explicit DSN (uses the 'public' schema by default)
let config = Config::from_dsn("postgresql://user:pass@localhost/db");

// Create with a custom schema
let config = Config::from_dsn_with_schema("postgresql://user:pass@localhost/db", "pgqrs")?;

// Load from environment variables
let config = Config::from_env()?;

// Load from a specific file
let config = Config::from_file("pgqrs.yaml")?;

// Load automatically with the priority order
let config = Config::load()?;

// Load with explicit overrides (for CLI tools)
let config = Config::load_with_options(dsn_override, config_path_override)?;
```
Configuration Reference
| Field | Environment Variable | Description | Default |
|---|---|---|---|
| `dsn` | `PGQRS_DSN` | PostgreSQL connection string | Required |
| `schema` | `PGQRS_SCHEMA` | Schema name for pgqrs tables | `public` |
| `max_connections` | `PGQRS_MAX_CONNECTIONS` | Maximum database connections | 16 |
| `connection_timeout_seconds` | `PGQRS_CONNECTION_TIMEOUT` | Connection timeout in seconds | 30 |
| `default_lock_time_seconds` | `PGQRS_DEFAULT_LOCK_TIME` | Default job lock time in seconds | 5 |
| `default_max_batch_size` | `PGQRS_DEFAULT_BATCH_SIZE` | Default batch size for operations | 100 |
CLI Usage
The CLI is defined in `src/main.rs` and supports the following commands:
Top-level commands
- `install` — Install the pgqrs schema
- `uninstall` — Uninstall the pgqrs schema
- `verify` — Verify the installation
- `queue <subcommand>` — Queue management
- `message <subcommand>` — Message management
Queue commands
- `queue create <name>` — Create a new queue
- `queue list` — List all queues
- `queue delete <name>` — Delete a queue
- `queue purge <name>` — Purge all messages from a queue
- `queue metrics [<name>]` — Show metrics for a queue or all queues
Message commands
- `message send <queue> <payload> [--delay <seconds>]` — Send a message (payload is JSON)
- `message read <queue> [--count <n>] [--lock-time <seconds>] [--message-type <type>]` — Read messages
- `message dequeue <queue>` — Read and return one message
- `message delete <queue> <id>` — Delete a message by ID
- `message count <queue>` — Show pending message count
- `message show <queue> <id>` — Show message details by ID
- `message show <queue> <id> --archive` — Show archived message details by ID
Archive commands
pgqrs provides message archiving functionality to maintain a historical record of processed messages while keeping the active queue performant. Archive functionality is accessed through existing commands with the `--archive` flag.
- `message show <queue> <id> --archive` — Show archived message details by ID
- `message count <queue> --archive` — Show archived message count
- `message read <queue> --archive` — Read/list archived messages
Archive System Overview
The archive system automatically creates archive tables (`archive_<queue_name>`) when queues are created. Archived messages retain all original data plus additional metadata:
- `archived_at` — Timestamp when the message was archived
- `processing_duration` — Time taken to process the message (in milliseconds)
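The `processing_duration` value can be derived from the dequeue and archive timestamps; a sketch (where those timestamps come from is an assumption for illustration):

```rust
use std::time::{Duration, SystemTime};

/// Compute a processing duration in milliseconds between the moment a
/// message was dequeued and the moment it was archived. Returns 0 if
/// the clock went backwards.
fn processing_duration_ms(dequeued_at: SystemTime, archived_at: SystemTime) -> u128 {
    archived_at
        .duration_since(dequeued_at)
        .unwrap_or(Duration::ZERO)
        .as_millis()
}
```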
Archive Usage Examples
```shell
# Show details of an archived message
pgqrs message show my_queue 42 --archive

# Check how many messages are archived
pgqrs message count my_queue --archive

# List archived messages (use the read command with --archive)
pgqrs message read my_queue --archive

# Note: message archiving is done programmatically via the Rust API;
# the CLI only provides viewing of archived messages.
```
Programmatic Archive API
The archive functionality is available through the Consumer API and Table APIs:
```rust
// Sketch only; import paths and signatures are illustrative.
use pgqrs::{Config, Consumer, PgqrsAdmin};

async fn archive_after_processing(consumer: &Consumer, msg_id: i64) {
    // archive() moves the message from pgqrs_messages to pgqrs_archive:
    let _ = consumer.archive(msg_id).await;
}
```
Archive Best Practices
- Archive after processing: Archive messages only after successful processing
- Regular cleanup: Periodically purge old archived messages to manage database size
- Monitoring: Track archive growth as part of your queue metrics
- Retention policy: Establish how long to keep archived messages based on your compliance needs
Output and Logging Options
All commands support global flags:
- `-d, --dsn <DSN>` — Database URL (highest priority, overrides all other config sources)
- `-c, --config <CONFIG>` — Config file path (overrides environment variables and defaults)
- `--log-dest <stderr|file>` — Log destination
- `--log-level <error|warn|info|debug|trace>` — Log level
- `--format <json|table>` — Output format
- `--out <stdout|file>` — Output destination
Contributing
Development
- Clone the repository
- Run tests: `cargo test`
- Run with PostgreSQL: Set `PGQRS_TEST_DSN=postgresql://user:pass@localhost:5432/db`
Releases
This project uses release-plz for automated releases:
- Automatic releases: When changes are merged to `main`, release-plz automatically:
  - Updates the version in `Cargo.toml` based on conventional commits
  - Updates the `CHANGELOG.md`
  - Creates a Git tag and GitHub release
  - Publishes to crates.io via trusted publishing
- Commit format: Use conventional commits for automatic version bumping:
  - `feat:` for new features (minor version bump)
  - `fix:` for bug fixes (patch version bump)
  - `feat!:` or `BREAKING CHANGE:` for breaking changes (major version bump)
  - `docs:`, `refactor:`, `perf:`, etc. for other changes
- Manual releases: Run the "Release-plz" workflow manually from the GitHub Actions tab
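The version-bump rules above can be sketched as a small classifier (simplified: it only inspects the subject line, whereas release-plz also honors `BREAKING CHANGE:` footers in commit bodies):

```rust
#[derive(Debug, PartialEq)]
enum Bump {
    Major,
    Minor,
    Patch,
    None,
}

/// Map a conventional-commit subject line to a semver bump, following
/// the rules listed above.
fn bump_for(subject: &str) -> Bump {
    if subject.starts_with("feat!:") || subject.contains("BREAKING CHANGE") {
        Bump::Major
    } else if subject.starts_with("feat:") {
        Bump::Minor
    } else if subject.starts_with("fix:") {
        Bump::Patch
    } else {
        // docs:, refactor:, perf:, etc. do not bump the version.
        Bump::None
    }
}
```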
License
Licensed under either of:
- Apache License, Version 2.0
- MIT license
at your option.