Asynq - Rust Distributed Task Queue
Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.
Compatible with Go asynq: this implementation is fully compatible with the Go version, hibiken/asynq, so it can interoperate with Go services.
Features
- Guaranteed at-least-once execution - Tasks won't be lost
- Task scheduling - Support for delayed and scheduled tasks
- Automatic retry - Configurable retry policies for failed tasks
- Fault recovery - Automatic task recovery when a worker crashes
- Priority queues - Support for weighted and strict priority
- Low latency - Fast Redis writes with low task enqueue latency
- Task deduplication - Support for unique task options
- Timeout control - Per-task timeout and deadline support
- Task aggregation - Support for batch processing of multiple tasks
- Flexible interface - Support for middleware and custom handlers
- Queue pause - Ability to pause/resume specific queues
- Periodic tasks - Support for cron-style scheduled tasks
- High availability - Support for Redis Cluster
- Web UI - Web-based management interface for queues and tasks
- Go compatible - Fully compatible with the Go version of asynq; the two can be deployed together
- Macro support - Attribute macros for easy handler registration (optional feature)
Quick Start
Add Dependencies
Add to your Cargo.toml:
[dependencies]
asynq = { version = "0.1", features = ["json"] }
## Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
## or dev channel
#asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Basic Usage
Producer (Enqueue Tasks)
use ;
use ;
async 
Consumer (Process Tasks)
use ;
use async_trait::async_trait;
use ;
use std::collections::HashMap;
;
async 
Using ServeMux for Task Routing
ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:
use ;
use std::collections::HashMap;
async 
Features:
- Automatically route tasks to corresponding handlers based on task type
- Support for both synchronous (handle_func) and asynchronous (handle_async_func) handlers
- Fully compatible with the Go version of ServeMux
- Type-safe with compile-time checking
- Clean API, easy to use
See examples/servemux_example.rs for more examples.
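The routing idea can be illustrated without the library. The following is a minimal sketch, not the crate's ServeMux: `Task`, `Mux`, and the handler signature are hypothetical stand-ins, and the longest-prefix matching rule is an assumption modelled on how Go-style multiplexers usually resolve patterns.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a task: a type name plus raw payload bytes.
pub struct Task {
    pub task_type: String,
    pub payload: Vec<u8>,
}

/// Handler signature used by this sketch only.
type Handler = Box<dyn Fn(&Task) -> Result<(), String> + Send + Sync>;

/// Minimal multiplexer that maps a pattern to a handler.
pub struct Mux {
    handlers: HashMap<String, Handler>,
}

impl Mux {
    pub fn new() -> Self {
        Mux { handlers: HashMap::new() }
    }

    /// Register a synchronous handler for every task type starting with `pattern`.
    pub fn handle_func<F>(&mut self, pattern: &str, f: F)
    where
        F: Fn(&Task) -> Result<(), String> + Send + Sync + 'static,
    {
        self.handlers.insert(pattern.to_string(), Box::new(f));
    }

    /// Dispatch to the handler whose pattern is the longest prefix of the task type.
    pub fn process(&self, task: &Task) -> Result<(), String> {
        let best = self
            .handlers
            .iter()
            .filter(|(pattern, _)| task.task_type.starts_with(pattern.as_str()))
            .max_by_key(|(pattern, _)| pattern.len());
        match best {
            Some((_, handler)) => handler(task),
            None => Err(format!("no handler for task type {:?}", task.task_type)),
        }
    }
}
```

With patterns "email:" and "email:welcome" registered, a task of type "email:welcome" reaches the more specific handler, while "email:reminder" falls back to the "email:" handler.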
Task Handler Macros (Optional Feature)
When the macros feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:
use ;
use std::collections::HashMap;
// Define handlers with attribute macros
async 
async 
Macro Features:
- Declarative syntax: Define handlers with clean attribute syntax
- Reduced boilerplate: Pattern strings are stored with the function
- Convenient registration: Use the register_handlers! and register_async_handlers! macros
- Familiar pattern: Similar to actix-web's #[get("/path")] routing macros
See examples/macro_example.rs for a complete example.
Advanced Usage
Delayed Tasks
use std::time::Duration;
// Execute after 5 minutes delay
client.enqueue_in.await?;
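Conceptually, a delayed task waits in a time-ordered set until its process-at time passes, and a periodic check moves due tasks to the pending queue. The sketch below models that with an in-memory `BTreeMap`; the `Scheduled` type, its methods, and the use of plain seconds as timestamps are all assumptions for illustration, not the library's storage layout.

```rust
use std::collections::BTreeMap;

/// In-memory model of a scheduled set, keyed by (process-at second, insertion id).
pub struct Scheduled {
    by_time: BTreeMap<(u64, u64), String>,
    next_id: u64,
}

impl Scheduled {
    pub fn new() -> Self {
        Scheduled { by_time: BTreeMap::new(), next_id: 0 }
    }

    /// Schedule `task` to become due `delay_secs` after `now`.
    pub fn schedule_in(&mut self, task: &str, now: u64, delay_secs: u64) {
        let key = (now + delay_secs, self.next_id);
        self.next_id += 1;
        self.by_time.insert(key, task.to_string());
    }

    /// Remove and return every task whose process-at time is <= `now`, earliest first.
    pub fn pop_due(&mut self, now: u64) -> Vec<String> {
        // split_off keeps every key >= (now + 1, 0) in `later`; the rest is due.
        let later = self.by_time.split_off(&(now + 1, 0));
        let due = std::mem::replace(&mut self.by_time, later);
        due.into_values().collect()
    }
}
```

A task scheduled with a 300-second delay at time 0 is not returned by `pop_due(299)` but is returned by `pop_due(300)`.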
Unique Tasks (Deduplication)
use std::time::Duration;
// Keep unique within 1 hour
let unique_task = new_with_json?;
client.enqueue_unique.await?;
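Uniqueness with a TTL behaves like a lock that expires: while the lock for a key is live, a second enqueue with the same key is rejected. The following is a sketch of that semantics only, assuming an in-memory map in place of Redis; `UniqueLocks`, `try_acquire`, and the caller-supplied string key are hypothetical.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for the uniqueness lock (hypothetical; a real lock would live in Redis).
pub struct UniqueLocks {
    expires_at: HashMap<String, Instant>,
}

impl UniqueLocks {
    pub fn new() -> Self {
        UniqueLocks { expires_at: HashMap::new() }
    }

    /// Returns true if the key was acquired, false if an unexpired lock already exists.
    pub fn try_acquire(&mut self, key: &str, ttl: Duration, now: Instant) -> bool {
        let locked = self.expires_at.get(key).map_or(false, |&exp| exp > now);
        if locked {
            return false;
        }
        // Either no lock exists or it has expired: take it for another TTL.
        self.expires_at.insert(key.to_string(), now + ttl);
        true
    }
}
```

With a one-hour TTL, a second attempt 30 minutes later is rejected, and an attempt at or after the one-hour mark succeeds again.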
Task Groups (Batch Processing)
// Add tasks to group for aggregation
for i in 1..=10 
Task Options
let task = new_with_json?
    .with_queue     // Specify queue
    .with_max_retry                  // Maximum retry attempts
    .with_timeout // Timeout
    .with_unique_ttl; // Uniqueness TTL
Priority Queues
let mut queues = new;
queues.insert;  // Highest priority
queues.insert;   // Medium priority
queues.insert;       // Low priority
let config = new
    .queues
    .strict_priority; // Strict priority mode
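The difference between the two priority modes can be sketched in plain Rust. This is an illustration under assumptions, not the crate's scheduler: `strict_order` and `pick_weighted` are hypothetical helpers, and `roll` stands in for a random number that a real implementation would draw uniformly from 0 up to (but excluding) the total weight.

```rust
use std::collections::HashMap;

/// Strict mode: always poll higher-priority queues first.
pub fn strict_order(queues: &HashMap<String, u32>) -> Vec<String> {
    let mut names: Vec<(&String, &u32)> = queues.iter().collect();
    // Sort by priority descending, then by name so the result is deterministic.
    names.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0)));
    names.into_iter().map(|(name, _)| name.clone()).collect()
}

/// Weighted mode: choose the first queue to poll with probability
/// proportional to its weight. Returns None if `roll` is out of range.
pub fn pick_weighted(queues: &HashMap<String, u32>, roll: u32) -> Option<String> {
    let mut remaining = roll;
    for name in strict_order(queues) {
        let weight = queues[&name];
        if remaining < weight {
            return Some(name);
        }
        remaining -= weight;
    }
    None
}
```

With illustrative weights of 6, 3, and 1 for "critical", "default", and "low", strict mode always yields the order critical, default, low, so lower queues are served only when higher ones are empty. In weighted mode "low" is still picked first for 1 roll in 10, which keeps it from starving.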
Architecture Design
Asynq uses a modular design. The main components are laid out as follows:
asynq/
├── src/
│   ├── lib.rs              # Library entry and public API
│   ├── client.rs           # Client implementation
│   ├── server.rs           # Server implementation
│   ├── serve_mux.rs        # ServeMux routing (compatible with Go servemux.go)
│   ├── processor.rs        # Processor implementation (compatible with Go processor.go)
│   ├── task.rs             # Task data structures
│   ├── error.rs            # Error handling
│   ├── config.rs           # Configuration management
│   ├── redis.rs            # Redis connection management
│   ├── inspector.rs        # Queue inspector
│   └── broker/             # Storage backend abstraction
│       ├── mod.rs          # Broker trait definition
│       └── redis_broker.rs # Redis implementation
├── proto/
│   └── asynq.proto         # Protocol Buffer definitions
└── examples/
    ├── producer.rs         # Producer example
    ├── consumer.rs         # Consumer example
    ├── servemux_example.rs # ServeMux usage example
    └── processor_example.rs # Processor example
Core Components
- Client: Responsible for enqueueing tasks
- Server: Responsible for dequeuing and processing tasks
- ServeMux: Task routing multiplexer, routes tasks to different handlers by type (compatible with Go servemux.go)
- Processor: Task processor core, handles concurrency control and task execution (compatible with Go asynq processor.go)
- Aggregator: Task aggregator, aggregates tasks from the same group into batch tasks (compatible with Go asynq aggregator.go)
- Broker: Storage backend abstraction, currently supports Redis
- Task: Task data structure containing type, payload, and options
- Handler: Task handler trait that users need to implement
- Inspector: Queue and task inspection and management tool
Processor Features
The Processor module implements a task processing architecture compatible with processor.go in Go asynq:
- Semaphore concurrency control: Uses Tokio Semaphore for precise control of concurrent workers
- Queue priority: Supports both strict priority and weighted priority modes
- Task timeout: Supports task-level and global timeout settings
- Graceful shutdown: Waits for all active workers to complete before shutdown
- Automatic retry: Failed tasks automatically retry with exponential backoff
- Task archiving: Tasks automatically archived after reaching max retry count
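The retry and archiving rules in the list above can be sketched as a small decision function. This is a generic capped exponential backoff under assumed parameters (a 1-second base and a 1-hour cap); it is not the delay formula the library or Go asynq actually uses, and `retry_delay`, `on_failure`, and `Outcome` are hypothetical names.

```rust
use std::time::Duration;

/// What to do with a task after a failed attempt (sketch only).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Retry(Duration),
    Archive,
}

/// Capped exponential backoff: base * 2^retried, never more than `cap`.
pub fn retry_delay(retried: u32, base: Duration, cap: Duration) -> Duration {
    // Saturate instead of overflowing for very large retry counts.
    let factor = 2u32.checked_pow(retried).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(cap, |delay| delay.min(cap))
}

/// Archive once the retry budget is spent, otherwise schedule the next attempt.
pub fn on_failure(retried: u32, max_retry: u32) -> Outcome {
    if retried >= max_retry {
        Outcome::Archive
    } else {
        let delay = retry_delay(retried, Duration::from_secs(1), Duration::from_secs(3600));
        Outcome::Retry(delay)
    }
}
```

Under these assumptions a task that has already been retried twice with a budget of three waits 4 seconds before the next attempt, and a task that has used all three retries is archived.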
GroupAggregator Features
The GroupAggregator module implements task aggregation functionality compatible with Go asynq aggregator.go:
- Task grouping: Set group label for tasks using with_group()
- Batch aggregation: Automatically aggregate tasks from the same group into a single batch task
- Flexible triggers: Supports three trigger conditions: grace period, max group size, max delay
- Custom aggregation: Customize aggregation logic via the GroupAggregator trait
- Functional interface: Quickly create aggregators using GroupAggregatorFunc
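The three trigger conditions can be read as a single predicate over a group's state. The sketch below assumes the usual meaning of each setting: the grace period counts from the most recent task, the max delay counts from the first task, and the max size is a task count. `GroupState`, `GroupLimits`, and `should_aggregate` are hypothetical names, not the crate's types.

```rust
use std::time::{Duration, Instant};

/// Snapshot of one group (sketch only).
pub struct GroupState {
    pub size: usize,
    pub first_added: Instant,
    pub last_added: Instant,
}

/// Trigger settings; `None` disables that trigger.
pub struct GroupLimits {
    pub grace_period: Duration,
    pub max_delay: Option<Duration>,
    pub max_size: Option<usize>,
}

/// True when any of the three triggers fires for a non-empty group.
pub fn should_aggregate(group: &GroupState, limits: &GroupLimits, now: Instant) -> bool {
    if group.size == 0 {
        return false;
    }
    // Max size: the group has collected enough tasks.
    let size_hit = limits.max_size.map_or(false, |max| group.size >= max);
    // Max delay: the oldest task has waited long enough, even if new tasks keep arriving.
    let delay_hit = limits
        .max_delay
        .map_or(false, |max| now.saturating_duration_since(group.first_added) >= max);
    // Grace period: no new task has arrived for this long.
    let grace_hit = now.saturating_duration_since(group.last_added) >= limits.grace_period;
    size_hit || delay_hit || grace_hit
}
```

The max delay matters for busy groups: if tasks keep arriving faster than the grace period, the grace trigger never fires, and only the size or delay limit forces a batch out.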
Example usage:
use GroupAggregatorFunc;
// Define aggregation function
let aggregator = new;
// Set on server
server.set_group_aggregator;
See GROUP_AGGREGATOR.md for more details.
Configuration Options
Server Configuration
use ServerConfig;
use std::time::Duration;
let config = new
    .concurrency                                          // Number of concurrent workers
    .task_check_interval            // Task check interval
    .delayed_task_check_interval    // Delayed task check interval
    .shutdown_timeout              // Shutdown timeout
    .health_check_interval         // Health check interval
    .group_grace_period?           // Group aggregation grace period
    .group_max_delay              // Group max delay
    .group_max_size                                    // Group max size
    .janitor_interval               // Janitor interval
    .janitor_batch_size;                                // Janitor batch size
Redis Configuration
use ;
use std::time::Duration;
// Basic configuration
let redis_config = single?;
let nodes = vec!;
let redis_config = cluster?;
Monitoring and Management
Queue Inspector
use TaskState;
use Inspector;
async 
Development Guide
Local Development
- Clone the repository:
- Install dependencies:
- Start Redis:
- Run examples:
# Terminal 1: Start consumer
# Terminal 2: Run producer
Run Tests
# Unit tests
# Integration tests (requires Redis)
Contributing
We welcome contributions of all kinds! Please read CONTRIBUTING.md for details.
Development Principles
- Use Rust features and best practices
- Keep the API simple and easy to use
- Provide comprehensive documentation
- Ensure code quality and test coverage
- Follow semantic versioning
License
This project is licensed under either the MIT License or the GPL License.
Acknowledgments
- Thanks to hibiken/asynq for design inspiration
- Thanks to the Rust community for excellent library support
Contact
If you have any questions or suggestions, please:
- Submit an Issue
- Create a Pull Request
- Join our discussions
If this project helps you, please give us a star!