Asynq - Rust Distributed Task Queue
Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.
Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.
Features
- Guaranteed at-least-once execution - Tasks won't be lost
- Task scheduling - Support for delayed and scheduled tasks
- Automatic retry - Configurable retry policies for failed tasks
- Fault recovery - Automatic task recovery on worker crashes
- Priority queues - Support for weighted and strict priority
- Low latency - Fast Redis writes with low task enqueue latency
- Task deduplication - Support for unique task options
- Timeout control - Per-task timeout and deadline support
- Task aggregation - Support for batch processing of multiple tasks
- Flexible interface - Support for middleware and custom handlers
- Queue pause - Ability to pause/resume specific queues
- Periodic tasks - Support for cron-style scheduled tasks
- High availability - Support for Redis Cluster
- Web UI - Web-based management interface for queues and tasks
- Go compatible - Fully compatible with Go version asynq, can be deployed together
- Macro support - Attribute macros for easy handler registration (optional feature)
Quick Start
Add Dependencies
Add to your Cargo.toml:
[dependencies]
asynq = { version = "0.1", features = ["json"] }
# Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
# Or use the development branch
# asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Basic Usage
Producer (Enqueue Tasks)
A producer creates a Client and uses it to enqueue tasks. A complete producer is in examples/producer.rs.
Consumer (Process Tasks)
A consumer implements the Handler trait (using async_trait) and runs it on a Server. A complete consumer is in examples/consumer.rs.
Using ServeMux for Task Routing
ServeMux provides Go-like task routing: it automatically routes each task to a handler based on the task type.
Features:
- Automatically route tasks to corresponding handlers based on task type
- Support for both synchronous (handle_func) and asynchronous (handle_async_func) handlers
- Fully compatible with Go version ServeMux
- Type-safe with compile-time checking
- Clean API, easy to use
See examples/servemux_example.rs for more examples.
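The routing idea can be illustrated without Redis or the crate itself. The sketch below is a standard-library stand-in, not the crate's ServeMux: the names MiniMux and dispatch are hypothetical, and the matching rule (exact match first, then the longest registered pattern that is a prefix of the task type) is an assumption modelled on how the Go servemux resolves patterns.

```rust
use std::collections::HashMap;

/// Result type returned by the illustrative handlers.
type HandlerResult = Result<String, String>;
/// A boxed handler that receives the raw task payload.
type Handler = Box<dyn Fn(&[u8]) -> HandlerResult>;

/// Hypothetical stand-in for a task router; not the crate's `ServeMux`.
struct MiniMux {
    handlers: HashMap<String, Handler>,
}

impl MiniMux {
    fn new() -> Self {
        MiniMux { handlers: HashMap::new() }
    }

    /// Register a handler for a task-type pattern such as "email:".
    fn handle_func<F>(&mut self, pattern: &str, f: F)
    where
        F: Fn(&[u8]) -> HandlerResult + 'static,
    {
        self.handlers.insert(pattern.to_string(), Box::new(f));
    }

    /// Dispatch by exact match first, then by the longest pattern
    /// that is a prefix of the task type.
    fn dispatch(&self, task_type: &str, payload: &[u8]) -> HandlerResult {
        let handler = self.handlers.get(task_type).or_else(|| {
            self.handlers
                .iter()
                .filter(|(pattern, _)| task_type.starts_with(pattern.as_str()))
                .max_by_key(|(pattern, _)| pattern.len())
                .map(|(_, h)| h)
        });
        match handler {
            Some(h) => h(payload),
            None => Err(format!("no handler registered for task type {task_type}")),
        }
    }
}
```

With handlers registered for "email:" and "email:send", a task of type "email:welcome" falls through to the "email:" handler, while "email:send" reaches its own handler.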
Task Handler Macros (Optional Feature)
When the macros feature is enabled, you can define handlers with attribute macros similar to actix-web's routing macros, which keeps handler definitions concise.
Macro Features:
- Declarative syntax: Define handlers with clean attribute syntax
- Reduced boilerplate: Pattern strings are stored with the function
- Convenient registration: Use the register_handlers! and register_async_handlers! macros
- Familiar pattern: Similar to actix-web's #[get("/path")] routing macros
See examples/macro_example.rs for a complete example.
Advanced Usage
Delayed Tasks
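use std::time::Duration;
// Execute after a 5-minute delay (`task` is a previously built Task; the argument order is assumed)
client.enqueue_in(task, Duration::from_secs(5 * 60)).await?;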
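Conceptually, a delayed task waits in a time-ordered set until its run-at time passes, and only then becomes eligible for processing. The following is a minimal in-memory sketch of that idea under stated assumptions: the Scheduled type and its methods are hypothetical, time is passed in as plain Unix seconds, and nothing here reflects how the crate stores delayed tasks in Redis.

```rust
use std::collections::BTreeMap;

/// Hypothetical scheduler: tasks keyed by (run-at unix seconds, sequence number).
struct Scheduled {
    next_seq: u64,
    entries: BTreeMap<(u64, u64), String>,
}

impl Scheduled {
    fn new() -> Self {
        Scheduled { next_seq: 0, entries: BTreeMap::new() }
    }

    /// Record a task that becomes due `delay_secs` after `now`.
    fn enqueue_in(&mut self, task: &str, now: u64, delay_secs: u64) {
        let key = (now + delay_secs, self.next_seq);
        self.next_seq += 1;
        self.entries.insert(key, task.to_string());
    }

    /// Remove and return every task whose run-at time is at or before `now`,
    /// ordered by run-at time.
    fn take_due(&mut self, now: u64) -> Vec<String> {
        // split_off returns the entries with run-at > now; the due ones stay behind.
        let later = self.entries.split_off(&(now + 1, 0));
        let due = std::mem::replace(&mut self.entries, later);
        due.into_values().collect()
    }
}
```

A periodic check (compare delayed_task_check_interval in the server configuration) would call something like take_due and move the returned tasks to the pending queue.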
Unique Tasks (Deduplication)
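use std::time::Duration;
// Keep unique within 1 hour ("report:daily" and `payload` are placeholders; the argument order is assumed)
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(60 * 60)).await?;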
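Deduplication with a TTL boils down to a lock per task identity that expires on its own. The sketch below shows that rule in memory only; UniqueLocks and try_acquire are hypothetical names, the key format is an assumption, and a Redis-backed queue would typically keep the lock in Redis with an expiry rather than in a HashMap.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory uniqueness lock table keyed by task identity.
struct UniqueLocks {
    expires_at: HashMap<String, Instant>,
}

impl UniqueLocks {
    fn new() -> Self {
        UniqueLocks { expires_at: HashMap::new() }
    }

    /// Returns true if the task may be enqueued, false if a live lock exists.
    /// On success the lock is (re)created and expires `ttl` after `now`.
    fn try_acquire(&mut self, key: &str, ttl: Duration, now: Instant) -> bool {
        let locked = self
            .expires_at
            .get(key)
            .map_or(false, |&deadline| deadline > now);
        if locked {
            return false;
        }
        self.expires_at.insert(key.to_string(), now + ttl);
        true
    }
}
```

A second enqueue of the same key inside the TTL window is rejected; once the window has elapsed, the same key can be enqueued again.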
Task Groups (Batch Processing)
// Add tasks to a group for aggregation
for i in 1..=10 {
    // Build the task for item `i`, set its group label with with_group(), then enqueue it
}
Task Options
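// The task type, queue name, and numeric values below are placeholders
let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")              // Specify queue
    .with_max_retry(5)                           // Maximum retry attempts
    .with_timeout(Duration::from_secs(300))      // Timeout
    .with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL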
Priority Queues
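// Queue names and weights are placeholders; a larger weight means higher priority
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6); // Highest priority
queues.insert("default".to_string(), 3);  // Medium priority
queues.insert("low".to_string(), 1);      // Low priority
let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // Strict priority mode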
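The difference between strict and weighted priority is easiest to see side by side. The sketch below is an illustration only: strict_order and pick_weighted are hypothetical helpers, the queue names and weights are made up, and the ticket-based selection is one simple way to realise weighted picking, not necessarily what the crate does internally.

```rust
/// Queue names ordered for strict priority: highest weight first.
/// In strict mode a lower queue is only consulted when every
/// higher-priority queue is empty.
fn strict_order(queues: &[(&str, u32)]) -> Vec<String> {
    let mut sorted: Vec<(&str, u32)> = queues.to_vec();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));
    sorted.into_iter().map(|(name, _)| name.to_string()).collect()
}

/// Weighted pick: a queue with weight w owns w of the total tickets.
/// `ticket` can be any number, for example a random draw.
fn pick_weighted<'a>(queues: &[(&'a str, u32)], ticket: u32) -> Option<&'a str> {
    let total: u32 = queues.iter().map(|(_, w)| *w).sum();
    if total == 0 {
        return None;
    }
    let mut remaining = ticket % total;
    for (name, weight) in queues {
        if remaining < *weight {
            return Some(*name);
        }
        remaining -= *weight;
    }
    None
}
```

With weights 6, 3, and 1, weighted mode gives the three queues roughly 60%, 30%, and 10% of the picks, so low-priority work still makes progress. Strict mode always drains the highest-priority queue first.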
Architecture Design
Asynq uses a modular design with the following main components:
asynq/
├── src/
│   ├── lib.rs              # Library entry and public API
│   ├── client.rs           # Client implementation
│   ├── server.rs           # Server implementation
│   ├── serve_mux.rs        # ServeMux routing (compatible with Go servemux.go)
│   ├── processor.rs        # Processor implementation (compatible with Go processor.go)
│   ├── task.rs             # Task data structures
│   ├── error.rs            # Error handling
│   ├── config.rs           # Configuration management
│   ├── redis.rs            # Redis connection management
│   ├── inspector.rs        # Queue inspector
│   └── broker/             # Storage backend abstraction
│       ├── mod.rs          # Broker trait definition
│       └── redis_broker.rs # Redis implementation
├── proto/
│   └── asynq.proto         # Protocol Buffer definitions
└── examples/
    ├── producer.rs         # Producer example
    ├── consumer.rs         # Consumer example
    ├── servemux_example.rs # ServeMux usage example
    └── processor_example.rs # Processor example
Core Components
- Client: Responsible for enqueueing tasks
- Server: Responsible for dequeuing and processing tasks
- ServeMux: Task routing multiplexer, routes tasks to different handlers by type (compatible with Go servemux.go)
- Processor: Task processor core, handles concurrency control and task execution (compatible with Go asynq processor.go)
- Aggregator: Task aggregator, aggregates tasks from the same group into batch tasks (compatible with Go asynq aggregator.go)
- Broker: Storage backend abstraction, currently supports Redis
- Task: Task data structure containing type, payload, and options
- Handler: Task handler trait that users need to implement
- Inspector: Queue and task inspection and management tool
Processor Features
The Processor module implements task processing architecture compatible with Go asynq processor.go:
- Semaphore concurrency control: Uses Tokio Semaphore for precise control of concurrent workers
- Queue priority: Supports both strict priority and weighted priority modes
- Task timeout: Supports task-level and global timeout settings
- Graceful shutdown: Waits for all active workers to complete before shutdown
- Automatic retry: Failed tasks automatically retry with exponential backoff
- Task archiving: Tasks automatically archived after reaching max retry count
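The retry and archiving rules above can be sketched as a small decision function. This is an illustration under stated assumptions: the doubling formula, the 1-second base, and the 1-hour cap are common choices picked for the example, and the Outcome type is hypothetical. The crate's actual delay function may differ.

```rust
use std::time::Duration;

/// What to do with a task after a failed attempt.
#[derive(Debug, PartialEq)]
enum Outcome {
    Retry { after: Duration },
    Archive,
}

/// Exponential backoff: base * 2^retried, capped at `max_delay`.
fn backoff_delay(retried: u32, base: Duration, max_delay: Duration) -> Duration {
    let factor = 2u32.saturating_pow(retried);
    base.checked_mul(factor)
        .map_or(max_delay, |d| d.min(max_delay))
}

/// Retry with backoff until `max_retry` attempts have been used, then archive.
fn on_failure(retried: u32, max_retry: u32) -> Outcome {
    if retried >= max_retry {
        Outcome::Archive
    } else {
        // Base and cap are illustrative values, not the crate's defaults.
        let after = backoff_delay(retried, Duration::from_secs(1), Duration::from_secs(3600));
        Outcome::Retry { after }
    }
}
```

Capping the delay keeps a task with a large retry budget from being postponed for days, and saturating arithmetic avoids overflow for high retry counts.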
GroupAggregator Features
The GroupAggregator module implements task aggregation functionality compatible with Go asynq aggregator.go:
- Task grouping: Set group label for tasks using with_group()
- Batch aggregation: Automatically aggregate tasks from the same group into a single batch task
- Flexible triggers: Supports three trigger conditions: grace period, max group size, max delay
- Custom aggregation: Customize aggregation logic via the GroupAggregator trait
- Functional interface: Quickly create aggregators using GroupAggregatorFunc
Example usage:
use GroupAggregatorFunc;
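// Define the aggregation function (the closure body is elided; see GROUP_AGGREGATOR.md)
let aggregator = GroupAggregatorFunc::new(/* aggregation closure */);
// Set on server
server.set_group_aggregator(aggregator);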
See GROUP_AGGREGATOR.md for more details.
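The three trigger conditions can be read as a single predicate over a pending group. The sketch below assumes the semantics used by Go asynq (grace period measured from the most recently added task, max delay measured from the first task). GroupPolicy, PendingGroup, should_flush, and aggregate are hypothetical names, and joining payloads with newlines is just one possible aggregation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical trigger settings mirroring the three conditions named above.
struct GroupPolicy {
    grace_period: Duration, // flush if no new task arrived for this long
    max_delay: Duration,    // flush if the oldest task has waited this long
    max_size: usize,        // flush as soon as the group holds this many tasks
}

/// Hypothetical bookkeeping for one pending group.
struct PendingGroup {
    payloads: Vec<String>,
    first_added: Instant,
    last_added: Instant,
}

/// True when any of the three trigger conditions is met.
fn should_flush(policy: &GroupPolicy, group: &PendingGroup, now: Instant) -> bool {
    group.payloads.len() >= policy.max_size
        || now.duration_since(group.last_added) >= policy.grace_period
        || now.duration_since(group.first_added) >= policy.max_delay
}

/// Example aggregation: join every payload into one batch payload.
fn aggregate(group: &PendingGroup) -> String {
    group.payloads.join("\n")
}
```

The grace period lets a burst of related tasks settle before the batch is built, while max delay and max size bound how long and how large a group can grow when tasks keep arriving.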
Configuration Options
Server Configuration
use ServerConfig;
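use std::time::Duration;
// All argument values below are illustrative placeholders
let config = ServerConfig::new()
    .concurrency(8)                                      // Number of concurrent workers
    .task_check_interval(Duration::from_secs(1))         // Task check interval
    .delayed_task_check_interval(Duration::from_secs(5)) // Delayed task check interval
    .shutdown_timeout(Duration::from_secs(10))           // Shutdown timeout
    .health_check_interval(Duration::from_secs(15))      // Health check interval
    .group_grace_period(Duration::from_secs(60))?        // Group aggregation grace period
    .group_max_delay(Duration::from_secs(300))           // Group max delay
    .group_max_size(100)                                 // Group max size
    .janitor_interval(Duration::from_secs(8))            // Janitor interval
    .janitor_batch_size(50);                             // Janitor batch size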
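The `?` after group_grace_period in the chain above suggests that this setter validates its argument and returns a Result, while the other setters return the config directly. The sketch below shows how such a mixed builder can work. MiniConfig is a hypothetical type, and the 1-second minimum is an assumption chosen for the example, not a documented rule of the crate.

```rust
use std::time::Duration;

/// Hypothetical two-field config used to illustrate the builder style.
#[derive(Debug)]
struct MiniConfig {
    concurrency: usize,
    group_grace_period: Duration,
}

impl MiniConfig {
    fn new() -> Self {
        MiniConfig { concurrency: 1, group_grace_period: Duration::from_secs(60) }
    }

    /// Infallible setter: takes and returns the config by value.
    fn concurrency(mut self, n: usize) -> Self {
        self.concurrency = n;
        self
    }

    /// Fallible setter: rejects values below an assumed 1-second minimum.
    fn group_grace_period(mut self, d: Duration) -> Result<Self, String> {
        if d < Duration::from_secs(1) {
            return Err("grace period must be at least 1 second".to_string());
        }
        self.group_grace_period = d;
        Ok(self)
    }
}

/// The `?` unwraps the validated config so the chain can continue.
fn build() -> Result<MiniConfig, String> {
    let config = MiniConfig::new()
        .concurrency(8)
        .group_grace_period(Duration::from_secs(30))?
        .concurrency(4);
    Ok(config)
}
```

Because `?` propagates the error, a chain like this has to live inside a function that itself returns a Result.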
Redis Configuration
use ;
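use std::time::Duration;
// Basic configuration (the RedisConnectionType name and the URLs are assumptions)
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Cluster configuration
let nodes = vec!["redis://127.0.0.1:7000", "redis://127.0.0.1:7001"];
let redis_config = RedisConnectionType::cluster(nodes)?;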
Monitoring and Management
Queue Inspector
The Inspector, together with the TaskState type, is the tool for inspecting and managing queues and tasks.
Development Guide
Local Development
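- Clone the repository: git clone https://github.com/emo-crab/asynq
- Install dependencies: cargo build
- Start Redis: start a Redis server that the examples can reach
- Run examples:
# Terminal 1: Start consumer
cargo run --example consumer
# Terminal 2: Run producer
cargo run --example producer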
Run Tests
# Unit tests
# Integration tests (requires Redis)
Contributing
We welcome contributions of all kinds! Please read CONTRIBUTING.md for details.
Development Principles
- Use Rust features and best practices
- Keep the API simple and easy to use
- Provide comprehensive documentation
- Ensure code quality and test coverage
- Follow semantic versioning
License
This project is licensed under the MIT License OR GPL License.
Acknowledgments
- Thanks to hibiken/asynq for design inspiration
- Thanks to the Rust community for excellent library support
Contact
If you have any questions or suggestions, please:
- Submit an Issue
- Create a Pull Request
- Join our discussions
If this project helps you, please give us a star!