Hammerwork
A high-performance, database-driven job queue for Rust with support for both PostgreSQL and MySQL, featuring job prioritization, cron scheduling, timeouts, dead job management, and comprehensive statistics collection.
Features
- Multi-database support: Works with both PostgreSQL and MySQL
- Job prioritization: Five priority levels (Background, Low, Normal, High, Critical) with weighted and strict scheduling
- Cron scheduling: Full cron expression support with timezone-aware recurring jobs
- Job timeouts: Per-job and worker-level timeout configuration with automatic timeout detection
- Statistics & monitoring: Comprehensive job statistics, dead job management, and performance tracking
- Async/await: Built on Tokio for high concurrency
- Reliable: Uses database transactions for job processing
- Retries: Configurable retry logic with exponential backoff
- Delayed jobs: Schedule jobs to run at specific times
- Worker pools: Multiple workers can process jobs concurrently
- Type-safe: Leverages Rust's type system for safety
Installation
Add this to your Cargo.toml:
[]
= "0.4"
# Choose your database
= { = "0.4", = ["postgres"] }
# or
= { = "0.4", = ["mysql"] }
Quick Start
PostgreSQL Example
use ;
use json;
use PgPool;
use Arc;
async
MySQL Example
use ;
use json;
use MySqlPool;
use Arc;
async
Job Types
Basic Job
use Job;
use json;
let job = new;
Delayed Job
use Duration;
let delayed_job = with_delay;
Job with Priority and Timeout
use ;
use Duration;
// High priority job with timeout
let job = new
.as_high_priority
.with_timeout
.with_max_attempts;
// Critical priority job
let urgent_job = new
.as_critical;
Recurring Job with Cron Schedule
use CronSchedule;
use USEastern;
// Daily backup at 2 AM Eastern time
let backup_job = new
.with_cron? // seconds, minutes, hours, day, month, weekday
.with_timezone
.as_recurring;
// Weekly report every Monday at 9 AM
let report_job = new
.with_cron_schedule
.as_recurring;
Worker Configuration
Basic Worker
let worker = new
.with_poll_interval // How often to check for jobs
.with_max_retries // Max retry attempts
.with_retry_delay; // Delay between retries
Worker with Priority Configuration
use ;
// Worker with custom priority weights
let priority_weights = builder
.critical
.high
.normal
.low
.background
.fairness_factor // Prevent starvation
.build;
let worker = new
.with_priority_weights
.with_default_timeout; // 10 minute default timeout
// Worker with strict priority (highest priority jobs always first)
let strict_worker = new
.with_strict_priority;
Worker with Statistics
use InMemoryStatsCollector;
use Arc;
let stats_collector = new;
let worker = new
.with_stats_collector;
// View statistics
let queue_stats = stats_collector.get_queue_stats.await?;
println!;
println!;
println!;
Job Prioritization
Hammerwork provides a comprehensive job prioritization system with five priority levels and multiple scheduling algorithms.
Priority Levels
use ;
// Five priority levels available
let background_job = new.as_background; // Lowest priority
let low_job = new.as_low_priority; // Low priority
let normal_job = new; // Default priority
let high_job = new.as_high_priority; // High priority
let critical_job = new.as_critical; // Highest priority
// Set priority explicitly
let custom_job = new
.with_priority;
Priority Scheduling Algorithms
Weighted Priority Scheduling (Default)
use PriorityWeights;
let weights = builder
.critical // Critical jobs are 50x more likely to be selected
.high // High jobs are 20x more likely
.normal // Normal jobs baseline
.low // Low jobs are 2x less likely
.background // Background jobs are 10x less likely
.fairness_factor // 10% chance to select from lower priorities (prevents starvation)
.build;
let worker = new
.with_priority_weights;
Strict Priority Scheduling
// Highest priority jobs are always processed first
let worker = new
.with_strict_priority;
Priority Statistics
use InMemoryStatsCollector;
let stats = stats_collector.get_priority_stats.await?;
println!;
println!;
println!;
// Detect priority starvation
if stats.has_starvation_risk
Cron Scheduling
Hammerwork supports comprehensive cron-based job scheduling with timezone awareness for recurring jobs.
Basic Cron Jobs
use CronSchedule;
use USEastern;
// Using cron expressions (6-field format: seconds, minutes, hours, day, month, weekday)
let daily_backup = new
.with_cron? // Every day at 2:00 AM
.with_timezone
.as_recurring;
// Using built-in presets
let hourly_cleanup = new
.with_cron_schedule
.as_recurring;
let weekday_reports = new
.with_cron_schedule
.as_recurring;
Advanced Cron Scheduling
// Custom schedules with timezone support
let monthly_billing = new
.with_cron? // 1st of every month at 9 AM
.with_timezone
.as_recurring;
// Complex schedules
let business_hours_sync = new
.with_cron? // Every 15 minutes, 9 AM to 5 PM, weekdays
.as_recurring;
Cron Job Management
use DatabaseQueue;
// Enqueue recurring jobs
queue.enqueue_cron_job.await?;
// Get jobs ready for execution
let due_jobs = queue.get_due_cron_jobs.await?; // All queues
let queue_jobs = queue.get_due_cron_jobs.await?; // Specific queue
// Manage recurring jobs
let recurring_jobs = queue.get_recurring_jobs.await?;
queue.disable_recurring_job.await?;
queue.enable_recurring_job.await?;
// Manual rescheduling
queue.reschedule_cron_job.await?;
Job Timeouts
Configure timeouts at the job level or worker level to prevent jobs from running indefinitely.
Job-Level Timeouts
use Duration;
// Set timeout when creating jobs
let timeout_job = new
.with_timeout; // 5 minute timeout
// Timeout takes precedence over worker defaults
let priority_timeout = new
.as_critical
.with_timeout; // 1 minute for urgent jobs
Worker-Level Default Timeouts
// Set default timeout for all jobs processed by this worker
let worker = new
.with_default_timeout; // 10 minute default
// Job-specific timeouts override worker defaults
// Job without timeout uses worker default (600s)
// Job with timeout uses its own setting
Timeout Handling
// Jobs that exceed their timeout are automatically marked as TimedOut
// Timeout events are recorded in statistics
let stats = stats_collector.get_queue_stats.await?;
println!;
println!;
// Timed out jobs can be retried or marked as dead
if job.status == TimedOut
Statistics and Dead Job Management
Comprehensive monitoring and management capabilities for job queues.
Statistics Collection
use ;
use Arc;
let stats_collector = new;
// Configure worker with statistics
let worker = new
.with_stats_collector;
// View comprehensive statistics
let stats = stats_collector.get_job_statistics.await?;
println!;
println!;
println!;
println!;
// Queue-specific statistics
let queue_stats = stats_collector.get_queue_stats.await?;
println!;
println!;
println!;
Dead Job Management
use DatabaseQueue;
// Get dead jobs for analysis
let dead_jobs = queue.get_dead_jobs.await?; // 50 jobs, offset 0
let queue_dead_jobs = queue.get_dead_jobs_by_queue.await?;
// Analyze dead job patterns
let dead_summary = queue.get_dead_job_summary.await?;
println!;
println!;
for in dead_summary.error_frequency
// Retry dead jobs
for dead_job in dead_jobs
// Clean up old dead jobs
let cutoff = now - days;
let purged_count = queue.purge_dead_jobs.await?;
println!;
Advanced Monitoring
// Real-time monitoring
use ;
let mut monitor_interval = interval;
loop
Database Schema
Hammerwork creates a single table hammerwork_jobs with the following structure:
Core Fields
id: Unique job identifier (UUID)queue_name: Name of the queuepayload: JSON payload datastatus: Current job status (pending, running, completed, failed, retrying, dead, timed_out)attempts: Number of processing attemptsmax_attempts: Maximum allowed attempts
Priority System
priority: Job priority level (0=Background, 1=Low, 2=Normal, 3=High, 4=Critical)
Timing & Scheduling
created_at: When the job was createdscheduled_at: When the job should be processedstarted_at: When processing begancompleted_at: When processing finishedfailed_at: When the job failed permanentlytimed_out_at: When the job timed out
Timeout Configuration
timeout_seconds: Job-specific timeout in seconds
Cron Scheduling
cron_schedule: Cron expression for recurring jobsnext_run_at: Next scheduled execution timerecurring: Whether this is a recurring jobtimezone: Timezone for cron calculations
Error Handling
error_message: Error details if job failed
Optimized Indexes
idx_hammerwork_jobs_queue_status_priority_scheduled: Priority-aware job pollingidx_recurring_next_run: Efficient recurring job queriesidx_cron_schedule: Cron job management
Development & Testing
Local Development Setup
Hammerwork includes comprehensive integration testing with Docker containers:
# Clone the repository
# Start database containers and run all tests
# Or run specific database tests
# Run unit tests only
# Performance benchmarks
Prerequisites
- Docker & Docker Compose: For database containers
- Rust 1.75+: For building and running tests
- Make (optional): For convenient commands
Available Commands
# Database management
# Testing
# Development
# Cleanup
Integration Testing
The project includes comprehensive integration tests that validate:
- ✅ Job queue functionality with real databases
- ✅ PostgreSQL and MySQL compatibility
- ✅ Worker pool management and job processing
- ✅ Performance benchmarks and regression testing
- ✅ Containerized deployment scenarios
- ✅ End-to-end job lifecycle management
See docs/integration-testing.md for detailed testing documentation.
CI/CD Pipeline
GitHub Actions automatically runs:
- Unit Tests: Fast validation without dependencies
- PostgreSQL Integration: Full database testing with PostgreSQL 16
- MySQL Integration: Full database testing with MySQL 8.0
- Docker Tests: Containerized deployment validation
- Security Audit: Vulnerability scanning
- Performance Tests: Benchmark regression detection (scheduled daily)
Examples
Complete working examples are available in the examples/ directory:
postgres_example.rs: PostgreSQL job processing with timeouts and statisticsmysql_example.rs: MySQL job processing with multiple workers and priority configurationcron_example.rs: Comprehensive cron scheduling with timezone supportpriority_example.rs: Job prioritization with weighted and strict scheduling
Run examples with:
# PostgreSQL example (requires running PostgreSQL)
# MySQL example (requires running MySQL)
# Cron scheduling example (requires running PostgreSQL)
# Priority system example (requires running PostgreSQL)
Contributing
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch:
git checkout -b feature/amazing-feature - Run tests:
make integration-all - Make your changes with appropriate tests
- Ensure all tests pass:
make ci - Commit your changes:
git commit -m 'Add amazing feature' - Push to the branch:
git push origin feature/amazing-feature - Create a Pull Request
Please ensure your code:
- ✅ Includes appropriate tests
- ✅ Follows Rust formatting (
cargo fmt) - ✅ Passes all linting (
cargo clippy) - ✅ Maintains or improves test coverage
- ✅ Includes documentation for new features
License
This project is licensed under the MIT License - see the LICENSE-MIT file for details.