celers-broker-sql
MySQL database broker implementation for CeleRS - a high-performance Celery-compatible task queue framework for Rust.
Version: 0.2.0 | Status: [Alpha] | Tests: 68 | Updated: 2026-03-27
Features
- Reliable Task Queue: MySQL-based task queue with FOR UPDATE SKIP LOCKED for distributed workers
- Priority Queues: Tasks can be prioritized for execution order
- Dead Letter Queue (DLQ): Automatic handling of permanently failed tasks
- Delayed Execution: Schedule tasks for future execution with enqueue_at and enqueue_after
- Batch Operations: High-throughput batch enqueue/dequeue/ack operations
- Queue Control: Pause/resume queue processing at runtime
- Task Inspection: Query task status, statistics, and worker assignments
- Result Storage: Store and retrieve task execution results
- Worker Tracking: Monitor which workers are processing which tasks
- Health Monitoring: Database health checks and table size monitoring
- Maintenance Tools: Task archiving, stuck task recovery, and selective purging
- Prometheus Metrics: Optional metrics integration (with the metrics feature)
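The following is a minimal in-memory sketch of what FOR UPDATE SKIP LOCKED gives distributed workers: each worker claims the best available row and skips rows another worker has already locked, instead of blocking on them. The Row type, the claim_next function, the column names in the SQL string, and the "higher number means higher priority" ordering are all assumptions made for illustration. The actual schema and query are defined by the crate's migration files and source.

```rust
/// Hypothetical in-memory stand-in for a row in the task table.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub id: u32,
    /// Assumption: a larger value means the task should run earlier.
    pub priority: i32,
    /// True while another worker's transaction holds the row lock.
    pub locked: bool,
}

/// Illustrative query shape only; the column names are assumptions.
pub const DEQUEUE_SQL_SKETCH: &str = "SELECT id FROM celers_tasks \
    WHERE state = 'pending' ORDER BY priority DESC LIMIT 1 \
    FOR UPDATE SKIP LOCKED";

/// Claim the highest-priority row that no other worker has locked.
/// Locked rows are skipped rather than waited on, which is the
/// behavior SKIP LOCKED provides inside the database.
pub fn claim_next(rows: &mut [Row]) -> Option<u32> {
    let row = rows
        .iter_mut()
        .filter(|r| !r.locked)
        .max_by_key(|r| r.priority)?;
    row.locked = true;
    Some(row.id)
}
```

With three rows where the highest-priority one is already locked, successive calls to claim_next return the two unlocked rows in priority order and then None, so no two workers ever receive the same task.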
Requirements
- MySQL 8.0+ (requires FOR UPDATE SKIP LOCKED support)
- Rust 2021 edition
Installation
Add to your Cargo.toml:
[dependencies]
celers-broker-sql = "0.2"
= "0.2"
# Optional: Enable Prometheus metrics
# celers-broker-sql = { version = "0.2", features = ["metrics"] }
Quick Start
1. Create a MySQL Database
;
2. Initialize the Broker
use MysqlBroker;
use Broker;
async
3. Enqueue Tasks
use SerializedTask;
// Create a task
let task = new;
// Enqueue it
let task_id = broker.enqueue.await?;
println!;
4. Dequeue and Process Tasks
// Dequeue a task
if let Some = broker.dequeue.await?
Advanced Usage
Delayed Task Execution
use std::time::{SystemTime, UNIX_EPOCH};
// Schedule for specific timestamp (Unix seconds)
let execute_at = SystemTime::now()
    .duration_since(UNIX_EPOCH)?
    .as_secs() as i64 + 3600; // 1 hour from now
broker.enqueue_at.await?;
// Or schedule after a delay (seconds)
broker.enqueue_after.await?; // 5 minutes
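The relationship between an absolute timestamp (as used with enqueue_at) and a relative delay (as used with enqueue_after) can be sketched with the standard library alone. The helper name unix_secs_after is hypothetical and is not part of the crate. How the broker converts delays internally is not stated here, so treat this as an illustration of the arithmetic only.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Unix timestamp in seconds for the instant that lies `delay` after `now`.
/// Passing `now` explicitly keeps the function deterministic and testable.
pub fn unix_secs_after(now: SystemTime, delay: Duration) -> i64 {
    let since_epoch = (now + delay)
        .duration_since(UNIX_EPOCH)
        .expect("target time is before the Unix epoch");
    since_epoch.as_secs() as i64
}
```

Calling unix_secs_after(SystemTime::now(), Duration::from_secs(3600)) yields the same value as the "1 hour from now" computation above, while a 300 second delay corresponds to the 5 minute case.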
Batch Operations
// Batch enqueue (high throughput)
let tasks = vec!;
let task_ids = broker.enqueue_batch.await?;
// Batch dequeue
let messages = broker.dequeue_batch.await?;
// Batch ack
let tasks_to_ack: = messages.iter
.map
.collect;
broker.ack_batch.await?;
Queue Control
// Pause queue (dequeue returns None)
broker.pause;
assert!;
// Resume queue
broker.resume;
assert!;
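The pause semantics described above (a paused queue returns None from dequeue even when tasks are waiting) can be modeled in a few lines. PausableQueue is a hypothetical in-memory type written for this sketch. It assumes an atomic flag guards dequeue, which may or may not match how the broker tracks its paused state.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical in-memory queue with a runtime pause switch.
pub struct PausableQueue {
    paused: AtomicBool,
    items: VecDeque<String>,
}

impl PausableQueue {
    pub fn new() -> Self {
        Self { paused: AtomicBool::new(false), items: VecDeque::new() }
    }

    pub fn push(&mut self, item: &str) {
        self.items.push_back(item.to_string());
    }

    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
    }

    pub fn resume(&self) {
        self.paused.store(false, Ordering::SeqCst);
    }

    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }

    /// Returns None while paused; queued items are kept, not dropped.
    pub fn dequeue(&mut self) -> Option<String> {
        if self.is_paused() {
            return None;
        }
        self.items.pop_front()
    }
}
```

Pausing only stops consumption in this model. Tasks pushed before or during the pause become available again as soon as resume is called.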
Task Inspection
use DbTaskState;
// Get task details
if let Some = broker.get_task.await?
// List tasks by state
let pending_tasks = broker.list_tasks.await?;
// Get queue statistics
let stats = broker.get_statistics.await?;
println!;
// Count by task name
let counts = broker.count_by_task_name.await?;
for count in counts
// List scheduled tasks
let scheduled = broker.list_scheduled_tasks.await?;
for task in scheduled
Worker Tracking
// Dequeue with worker ID
let worker_id = "worker-001";
if let Some = broker.dequeue_with_worker_id.await?
// Get tasks by worker
let worker_tasks = broker.get_tasks_by_worker.await?;
Task Result Storage
use TaskResultStatus;
use serde_json::json;
// Store result
broker.store_result.await?;
// Retrieve result
if let Some = broker.get_result.await?
Dead Letter Queue (DLQ)
// List DLQ tasks
let dlq_tasks = broker.list_dlq.await?;
// Requeue from DLQ
if let Some = dlq_tasks.first
// Purge DLQ
let purged = broker.purge_all_dlq.await?;
println!;
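The rule that decides when a failed task goes to the DLQ instead of being retried can be sketched as a small pure function. The names Outcome and on_failure are hypothetical, and the exact comparison the broker uses against its retry limit is an assumption. The sketch only illustrates the idea that a task is dead-lettered once its retries are exhausted.

```rust
/// What happens to a task after a failed execution (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Put the task back on the queue; `attempt` is the retry number.
    Retry { attempt: u32 },
    /// Retries are exhausted; move the task to the dead letter queue.
    DeadLetter,
}

/// Decide the outcome from the number of retries already used.
pub fn on_failure(retries_so_far: u32, max_retries: u32) -> Outcome {
    if retries_so_far < max_retries {
        Outcome::Retry { attempt: retries_so_far + 1 }
    } else {
        Outcome::DeadLetter
    }
}
```

With max_retries set to 3, the first three failures produce Retry outcomes and the fourth failure dead-letters the task, where it stays until it is requeued or purged.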
Health Checks and Maintenance
use Duration;
// Health check
let health = broker.check_health.await?;
println!;
println!;
// Archive old completed tasks (older than 7 days)
let archived = broker.archive_completed_tasks.await?;
println!;
// Recover stuck tasks (processing > 1 hour)
let recovered = broker.recover_stuck_tasks.await?;
println!;
// Archive old results (older than 30 days)
let archived_results = broker.archive_results.await?;
println!;
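The thresholds named in the comments above (7 days, 1 hour, 30 days) can be expressed as std::time::Duration constants, and the stuck-task test reduces to a timestamp comparison. The constant names and the is_stuck helper are hypothetical. Whether the broker's maintenance methods take a Duration or some other type is not shown here, so this is only a sketch of the arithmetic.

```rust
use std::time::Duration;

/// Completed tasks older than this are candidates for archiving.
pub const ARCHIVE_TASKS_AFTER: Duration = Duration::from_secs(7 * 24 * 60 * 60);
/// Tasks in processing longer than this are considered stuck.
pub const STUCK_AFTER: Duration = Duration::from_secs(60 * 60);
/// Results older than this are candidates for archiving.
pub const ARCHIVE_RESULTS_AFTER: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// True when a task has been processing for strictly longer than `threshold`.
/// Both timestamps are Unix seconds.
pub fn is_stuck(started_at_secs: i64, now_secs: i64, threshold: Duration) -> bool {
    now_secs - started_at_secs > threshold.as_secs() as i64
}
```

Keeping the thresholds in named constants makes it easier to keep the ad hoc calls and any scheduled maintenance job consistent with each other.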
Database Monitoring
// Get table sizes
let table_sizes = broker.get_table_sizes.await?;
for table in table_sizes
// Optimize tables (run periodically)
broker.optimize_tables.await?;
// Analyze tables (update index statistics)
broker.analyze_tables.await?;
Prometheus Metrics (with metrics feature)
// Update metrics (call periodically, e.g., every 10 seconds)
broker.update_metrics.await?;
// Metrics exposed:
// - celers_tasks_enqueued_total
// - celers_tasks_enqueued_by_type
// - celers_queue_size (pending tasks)
// - celers_processing_queue_size
// - celers_dlq_size
Multi-tenant Queues
// Create broker with specific queue name
let queue_a = with_queue.await?;
let queue_b = with_queue.await?;
// Each queue is logically separated (stored in metadata)
queue_a.enqueue.await?;
queue_b.enqueue.await?;
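Logical separation means that several queues share one set of tables and are told apart by name. A rough in-memory analogy, assuming a map keyed by queue name, looks like the sketch below. SharedStore is a hypothetical type, and the way the broker actually records the queue name (described above only as "stored in metadata") is not modeled.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical shared store: one backing structure, many named queues.
#[derive(Default)]
pub struct SharedStore {
    queues: HashMap<String, VecDeque<String>>,
}

impl SharedStore {
    /// Append a payload to the named queue, creating it on first use.
    pub fn enqueue(&mut self, queue: &str, payload: &str) {
        self.queues
            .entry(queue.to_string())
            .or_default()
            .push_back(payload.to_string());
    }

    /// Pop the oldest payload from the named queue only.
    pub fn dequeue(&mut self, queue: &str) -> Option<String> {
        self.queues.get_mut(queue)?.pop_front()
    }
}
```

A worker that dequeues from one name never sees payloads enqueued under another name, even though both live in the same store.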
Database Schema
Tables
- celers_tasks - Main task queue
- celers_dead_letter_queue - Failed tasks that exceeded max retries
- celers_task_results - Task execution results
- celers_task_history - Task audit trail (future)
Key Indexes
- idx_tasks_state_priority - Efficient dequeue by state and priority
- idx_tasks_scheduled - Scheduled task processing
- idx_tasks_worker - Worker tracking
- idx_tasks_task_name - Task name lookups
- idx_results_task_name - Result queries by task type
- See migration files for complete index strategy
Performance Tuning
Connection Pool
// Default: 20 connections, 5s timeout
// For high throughput, increase max_connections:
// Edit MysqlBroker::new() or MysqlBroker::with_queue()
Batch Operations
Use batch operations for high throughput:
- enqueue_batch() - Up to 10x faster than individual enqueues
- dequeue_batch() - Fetch multiple tasks in one transaction
- ack_batch() - Acknowledge multiple tasks at once
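When the number of tasks is large, it is common to split them into fixed-size groups before handing each group to a batch call, so that a single statement or transaction does not grow without bound. The helper below is a minimal sketch using only the standard library. The name into_batches is hypothetical, and a suitable batch size for this broker is not stated here and would need to be measured.

```rust
/// Split `items` into consecutive batches of at most `batch_size` elements.
/// The final batch may be smaller than `batch_size`.
pub fn into_batches<T: Clone>(items: &[T], batch_size: usize) -> Vec<Vec<T>> {
    assert!(batch_size > 0, "batch_size must be greater than zero");
    items
        .chunks(batch_size)
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

Each inner vector can then be passed to a batch enqueue call in turn, which keeps round trips low while bounding the size of any single request.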
MySQL Configuration
Recommended my.cnf settings:
[mysqld]
# Connection settings
max_connections = 500
connect_timeout = 10
wait_timeout = 28800
# Performance
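innodb_buffer_pool_size = 2G # example value; size to 70-80% of RAM on a dedicated database host
innodb_log_file_size = 512M # deprecated since MySQL 8.0.30 in favor of innodb_redo_log_capacity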
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
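# Query cache: removed in MySQL 8.0. Set these only on MySQL 5.7;
# on 8.0+ they are unknown variables and the server will refuse to start.
# query_cache_type = 0
# query_cache_size = 0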
Maintenance Schedule
Run these operations periodically:
// Daily: Archive old tasks
broker.archive_completed_tasks.await?;
// Daily: Recover stuck tasks
broker.recover_stuck_tasks.await?;
// Weekly: Optimize tables
broker.optimize_tables.await?;
// Weekly: Analyze tables
broker.analyze_tables.await?;
// Monthly: Archive old results
broker.archive_results.await?;
Comparison with PostgreSQL Broker
Similarities
- Same FOR UPDATE SKIP LOCKED pattern
- Same API (implements Broker trait)
- Same performance characteristics
- Same safety guarantees
Differences
- MySQL uses ? placeholders vs PostgreSQL $1, $2
- MySQL stores UUIDs as CHAR(36) vs native UUID type
- MySQL uses stored procedures vs PostgreSQL functions
- MySQL DATE_ADD() vs PostgreSQL INTERVAL syntax
- MySQL ON DUPLICATE KEY UPDATE vs PostgreSQL ON CONFLICT
Error Handling
All operations return Result<T, CelersError>:
match broker.enqueue.await
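A typical way to act on such a Result is to match on the error and decide whether the operation is worth retrying. The variants of CelersError are not listed here, so the sketch below uses a stand-in enum, SketchError, with two made-up variants. Both the enum and the decide function are hypothetical and only illustrate the matching pattern.

```rust
/// Stand-in error type; the real CelersError variants may differ.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    /// Assumed transient failure, e.g. the database was unreachable.
    Connection(String),
    /// Assumed permanent failure, e.g. the payload could not be encoded.
    Serialization(String),
}

/// What the caller should do next (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Action {
    Done,
    RetryLater,
    GiveUp,
}

/// Map the result of an enqueue-like call to a follow-up action.
pub fn decide(result: Result<String, SketchError>) -> Action {
    match result {
        Ok(_task_id) => Action::Done,
        Err(SketchError::Connection(_)) => Action::RetryLater,
        Err(SketchError::Serialization(_)) => Action::GiveUp,
    }
}
```

Separating transient from permanent failures in one place keeps retry policy out of the call sites that enqueue tasks.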
Migration
Migrations are embedded in the binary and run via broker.migrate():
- 001_init.sql - Initial schema (tasks, DLQ, history tables)
- 002_results.sql - Results table
- 003_performance_indexes.sql - Additional performance indexes
To run migrations:
broker.migrate.await?;
Migrations are idempotent and can be run multiple times safely.
Backup and Restore Procedures
Database Backup
Use mysqldump for backing up CeleRS tables:
# Backup all CeleRS tables
# Backup with compression
|
# Include routines (stored procedures)
Selective Backup Strategies
# Backup only pending and processing tasks (for migration)
# Backup DLQ for analysis
# Backup results for auditing
Database Restore
# Restore from backup
# Restore from compressed backup
|
# Restore specific table
Point-in-Time Recovery
Enable binary logging in MySQL for PITR:
[mysqld]
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
binlog_expire_logs_seconds = 604800 # 7 days; expire_logs_days is deprecated in MySQL 8.0
Recovery procedure:
# 1. Restore from last full backup
# 2. Apply binary logs up to specific point
| \
Automated Backup Script
#!/bin/bash
# celers_backup.sh - Automated CeleRS backup script
BACKUP_DIR="/backups/celers"
DB_NAME="your_database"
DB_USER="backup_user"
RETENTION_DAYS=30
TIMESTAMP=
# Create backup directory
# Perform backup
|
# Remove old backups
# Verify backup
if [; then
# Optional: Upload to S3 or other storage
# aws s3 cp "$BACKUP_DIR/celers_$TIMESTAMP.sql.gz" s3://my-bucket/celers-backups/
else
fi
Disaster Recovery Checklist
- Before Disaster:
  - Regular automated backups (daily minimum)
  - Test restore procedures monthly
  - Store backups offsite (S3, GCS, etc.)
  - Monitor backup success/failure
  - Document recovery procedures
- During Recovery:
  - Stop all workers to prevent new tasks
  - Assess data loss window
  - Restore from most recent backup
  - Apply binary logs if available
  - Verify data integrity
  - Resume workers gradually
- After Recovery:
  - Check for lost tasks (compare with application logs)
  - Verify DLQ items
  - Monitor for anomalies
  - Document incident for post-mortem
Data Migration Between Environments
# Export from production
# Import to staging
# Or use programmatic approach
// Programmatic migration example
async
Examples
See the examples directory for complete working examples:
- task_producer.rs - Comprehensive task enqueueing with different patterns (single, batch, scheduled, priority)
- worker_pool.rs - Production-ready worker pool with health monitoring and graceful shutdown
- circuit_breaker.rs - Circuit breaker pattern for resilient database operations
- bulk_import_export.rs - Data migration and backup utilities using JSON format
- recurring_tasks.rs - Scheduled periodic task execution (cron-like functionality)
- advanced_retry.rs - Sophisticated retry strategies with exponential backoff and jitter
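Each example includes detailed documentation and can be run with Cargo's standard example runner, using the file name without the .rs extension, for instance:
cargo run --example task_producer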
For detailed usage instructions, see examples/README.md.
License
Apache-2.0
Contributing
Contributions welcome! Please ensure:
- All tests pass: cargo test
- No warnings: cargo clippy
- Code is formatted: cargo fmt