asynq 0.1.2

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq

Asynq - Rust Distributed Task Queue

English | δΈ­ζ–‡


Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.

πŸ”— Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.

🌟 Features

  • βœ… Guaranteed at-least-once execution - Tasks won't be lost
  • ⏰ Task scheduling - Support for delayed and scheduled tasks
  • πŸ”„ Automatic retry - Configurable retry policies for failed tasks
  • πŸ›‘οΈ Fault recovery - Automatic task recovery on worker crashes
  • 🎯 Priority queues - Support for weighted and strict priority
  • ⚡ Low latency - Enqueueing a task is a fast Redis write
  • πŸ”’ Task deduplication - Support for unique task options
  • ⏱️ Timeout control - Per-task timeout and deadline support
  • πŸ“¦ Task aggregation - Support for batch processing of multiple tasks
  • πŸ”Œ Flexible interface - Support for middleware and custom handlers
  • ⏸️ Queue pause - Ability to pause/resume specific queues
  • πŸ•’ Periodic tasks - Support for cron-style scheduled tasks
  • 🏠 High availability - Support for Redis Cluster
  • πŸ–₯️ Web UI - Web-based management interface for queues and tasks
  • πŸ”„ Go compatible - Fully compatible with Go version asynq, can be deployed together
  • 🎯 Macro support - Attribute macros for easy handler registration (optional feature)

πŸš€ Quick Start

Add Dependencies

Add to your Cargo.toml:

[dependencies]
asynq = { version = "0.1", features = ["json"] }
# Enable macro support (optional):
# asynq = { version = "0.1", features = ["json", "macros"] }
# Or use the development branch:
# asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

Basic Usage

Producer (Enqueue Tasks)

use asynq::{client::Client, task::Task, redis::RedisConnectionConfig};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Redis configuration
    let redis_config = RedisConnectionConfig::single("redis://127.0.0.1:6379")?;

    // Create client
    let client = Client::new(redis_config).await?;

    // Create task
    let payload = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };

    let task = Task::new_with_json("email:send", &payload)?;

    // Enqueue task
    let task_info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", task_info.id);

    Ok(())
}

Consumer (Process Tasks)

use asynq::{server::{Server, Handler}, task::Task, redis::RedisConnectionConfig, config::ServerConfig};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        match task.get_type() {
            "email:send" => {
                let payload: EmailPayload = task.get_payload_with_json()?;
                println!("Sending email to: {}", payload.to);
                // Implement actual email sending logic
                Ok(())
            }
            _ => {
                Err(asynq::error::Error::other("Unknown task type"))
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Redis configuration
    let redis_config = RedisConnectionConfig::single("redis://127.0.0.1:6379")?;

    // Configure queues
    let mut queues = HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);

    // Server configuration
    let config = ServerConfig::new()
        .concurrency(4)
        .queues(queues);

    // Create server
    let mut server = Server::new(redis_config, config).await?;

    // Start server
    server.run(EmailProcessor).await?;

    Ok(())
}

Using ServeMux for Task Routing

ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:

use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConnectionConfig, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionConfig::single("redis://127.0.0.1:6379")?;

    // Create ServeMux
    let mut mux = ServeMux::new();

    // Register synchronous handler
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}", task);
        Ok(())
    });

    // Register asynchronous handler
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}", task);
        // Async processing logic
        Ok(())
    });

    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}", task);
        Ok(())
    });

    // Configure server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create and run server
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;

    // ServeMux implements Handler trait, can be passed directly to server.run()
    server.run(mux).await?;

    Ok(())
}

Features:

  • 🎯 Automatically route tasks to corresponding handlers based on task type
  • ⚑ Support for both synchronous (handle_func) and asynchronous (handle_async_func) handlers
  • πŸ”„ Fully compatible with the Go version's ServeMux
  • πŸ›‘οΈ Type-safe with compile-time checking
  • πŸ“ Clean API, easy to use

See examples/servemux_example.rs for more examples.
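Conceptually, this kind of routing is just a map from task type to handler. The self-contained sketch below illustrates the idea (it is not the crate's actual implementation, and it dispatches on exact type match only):

```rust
use std::collections::HashMap;

// Boxed handler closure: takes a payload, returns a result.
type HandlerFn = Box<dyn Fn(&str) -> Result<String, String>>;

struct MiniMux {
    handlers: HashMap<String, HandlerFn>,
}

impl MiniMux {
    fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    // Register a handler under a task-type pattern.
    fn handle_func<F>(&mut self, pattern: &str, f: F)
    where
        F: Fn(&str) -> Result<String, String> + 'static,
    {
        self.handlers.insert(pattern.to_string(), Box::new(f));
    }

    // Look up the handler by exact task type and invoke it.
    fn process(&self, task_type: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(task_type) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for {task_type}")),
        }
    }
}

fn main() {
    let mut mux = MiniMux::new();
    mux.handle_func("email:send", |payload| Ok(format!("sent: {payload}")));
    println!("{:?}", mux.process("email:send", "hello"));
    println!("{:?}", mux.process("unknown:type", "x"));
}
```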

Task Handler Macros (Optional Feature)

When the macros feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:

use asynq::{
    serve_mux::ServeMux, 
    task::Task, 
    task_handler, 
    task_handler_async,
    register_handlers,
    register_async_handlers,
    redis::RedisConnectionConfig, 
    config::ServerConfig, 
    server::ServerBuilder
};
use std::collections::HashMap;

// Define handlers with attribute macros
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = RedisConnectionConfig::single("redis://127.0.0.1:6379")?;
    
    // Create ServeMux and register handlers with convenience macros
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);
    
    // Configure and run server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);
    
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;
    
    server.run(mux).await?;
    Ok(())
}

Macro Features:

  • 🎯 Declarative syntax: Define handlers with clean attribute syntax
  • πŸ“ Reduced boilerplate: Pattern strings are stored with the function
  • πŸ”§ Convenient registration: Use register_handlers! and register_async_handlers! macros
  • 🌐 Familiar pattern: Similar to actix-web's #[get("/path")] routing macros

See examples/macro_example.rs for a complete example.

πŸ“š Advanced Usage

Delayed Tasks

use std::time::Duration;
// Execute after 5 minutes delay
client.enqueue_in(task, Duration::from_secs(300)).await?;

Unique Tasks (Deduplication)

use std::time::Duration;

// Keep unique within 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;

Task Groups (Batch Processing)

// Add tasks to group for aggregation
for i in 1..=10 {
    let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
    client.add_to_group(item_task, "daily_batch").await?;
}

Task Options

let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")     // Specify queue
    .with_max_retry(5)                  // Maximum retry attempts
    .with_timeout(Duration::from_secs(300)) // Timeout
    .with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL

Priority Queues

let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);  // Highest priority
queues.insert("default".to_string(), 3);   // Medium priority
queues.insert("low".to_string(), 1);       // Low priority

let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // Strict priority mode
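To make the weight semantics concrete, here is a small self-contained sketch (not the crate's actual scheduler) of the two modes: with weights 6/3/1, roughly 6 of every 10 polls go to `critical`, while strict mode always drains higher-priority queues before touching lower ones. A real scheduler interleaves the polls; only the ratio matters here.

```rust
// Illustrative only — shows the selection semantics, not the crate's internals.

// Weighted mode: each queue gets `weight` polls per cycle of sum(weights) polls.
fn weighted_cycle(queues: &[(&str, u32)]) -> Vec<String> {
    let mut sorted = queues.to_vec();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));
    let mut order = Vec::new();
    for (name, weight) in sorted {
        for _ in 0..weight {
            order.push(name.to_string());
        }
    }
    order
}

// Strict mode: always pick the highest-priority queue that has tasks.
fn strict_pick<'a>(queues: &[(&'a str, u32)], non_empty: &[&str]) -> Option<&'a str> {
    let mut sorted = queues.to_vec();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));
    sorted.into_iter().map(|(name, _)| name).find(|name| non_empty.contains(name))
}

fn main() {
    let queues = [("critical", 6), ("default", 3), ("low", 1)];
    let cycle = weighted_cycle(&queues);
    println!(
        "{} polls per cycle; critical gets {}",
        cycle.len(),
        cycle.iter().filter(|q| q.as_str() == "critical").count()
    );
    // Strict mode: "default" is only chosen once "critical" has no tasks
    println!("{:?}", strict_pick(&queues, &["default", "low"]));
}
```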

πŸ—οΈ Architecture Design

Asynq uses a modular design built from the following main components:

asynq/
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ lib.rs              # Library entry and public API
β”‚   β”œβ”€β”€ client.rs           # Client implementation
β”‚   β”œβ”€β”€ server.rs           # Server implementation
β”‚   β”œβ”€β”€ serve_mux.rs        # ServeMux routing (compatible with Go servemux.go)
β”‚   β”œβ”€β”€ processor.rs        # Processor implementation (compatible with Go processor.go)
β”‚   β”œβ”€β”€ task.rs             # Task data structures
β”‚   β”œβ”€β”€ error.rs            # Error handling
β”‚   β”œβ”€β”€ config.rs           # Configuration management
β”‚   β”œβ”€β”€ redis.rs            # Redis connection management
β”‚   β”œβ”€β”€ inspector.rs        # Queue inspector
β”‚   └── broker/             # Storage backend abstraction
β”‚       β”œβ”€β”€ mod.rs          # Broker trait definition
β”‚       └── redis_broker.rs # Redis implementation
β”œβ”€β”€ proto/
β”‚   └── asynq.proto         # Protocol Buffer definitions
└── examples/
    β”œβ”€β”€ producer.rs         # Producer example
    β”œβ”€β”€ consumer.rs         # Consumer example
    β”œβ”€β”€ servemux_example.rs # ServeMux usage example
    └── processor_example.rs # Processor example

Core Components

  • Client: Responsible for enqueueing tasks
  • Server: Responsible for dequeuing and processing tasks
  • ServeMux: Task routing multiplexer, routes tasks to different handlers by type (compatible with Go servemux.go)
  • Processor: Task processor core, handles concurrency control and task execution (compatible with Go asynq processor.go)
  • Aggregator: Task aggregator, aggregates tasks from the same group into batch tasks (compatible with Go asynq aggregator.go)
  • Broker: Storage backend abstraction, currently supports Redis
  • Task: Task data structure containing type, payload, and options
  • Handler: Task handler trait that users need to implement
  • Inspector: Queue and task inspection and management tool

Processor Features

The Processor module implements task processing architecture compatible with Go asynq processor.go:

  • βœ… Semaphore concurrency control: Uses Tokio Semaphore for precise control of concurrent workers
  • βœ… Queue priority: Supports both strict priority and weighted priority modes
  • βœ… Task timeout: Supports task-level and global timeout settings
  • βœ… Graceful shutdown: Waits for all active workers to complete before shutdown
  • βœ… Automatic retry: Failed tasks automatically retry with exponential backoff
  • βœ… Task archiving: Tasks automatically archived after reaching max retry count
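The retry behavior can be illustrated with a capped exponential delay. The formula below is a generic sketch, not necessarily the exact schedule this crate uses; production schedulers typically also add random jitter:

```rust
use std::time::Duration;

// Generic capped exponential backoff: 2^n seconds, capped at 10 minutes.
// Illustrative only — the crate's actual retry schedule may differ.
fn retry_delay(retried: u32) -> Duration {
    // checked_shl avoids overflow for large retry counts; cap at 600s.
    let secs = 1u64.checked_shl(retried).unwrap_or(u64::MAX).min(600);
    Duration::from_secs(secs)
}

fn main() {
    for n in [0u32, 1, 4, 10, 20] {
        println!("retry #{n}: wait {:?}", retry_delay(n));
    }
}
```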

GroupAggregator Features

The GroupAggregator module implements task aggregation functionality compatible with Go asynq aggregator.go:

  • βœ… Task grouping: Set group label for tasks using with_group()
  • βœ… Batch aggregation: Automatically aggregate tasks from the same group into a single batch task
  • βœ… Flexible triggers: Supports three trigger conditions: grace period, max group size, max delay
  • βœ… Custom aggregation: Customize aggregation logic via GroupAggregator trait
  • βœ… Functional interface: Quickly create aggregators using GroupAggregatorFunc

Example usage:

use asynq::components::aggregator::GroupAggregatorFunc;
use asynq::task::Task;

// Define an aggregation function (the group name is not needed here)
let aggregator = GroupAggregatorFunc::new(|_group, tasks| {
    // Merge the payloads of multiple tasks into a single batch task
    let combined = tasks.iter()
        .map(|t| t.get_payload())
        .collect::<Vec<_>>()
        .join(&b"\n"[..]);
    Task::new("batch:process", &combined)
});

// Set on server
server.set_group_aggregator(aggregator);

See GROUP_AGGREGATOR.md for more details.

πŸ› οΈ Configuration Options

Server Configuration

use asynq::config::ServerConfig;
use std::time::Duration;

let config = ServerConfig::new()
    .concurrency(8)                                          // Number of concurrent workers
    .task_check_interval(Duration::from_secs(1))            // Task check interval
    .delayed_task_check_interval(Duration::from_secs(5))    // Delayed task check interval
    .shutdown_timeout(Duration::from_secs(10))              // Shutdown timeout
    .health_check_interval(Duration::from_secs(15))         // Health check interval
    .group_grace_period(Duration::from_secs(60))?           // Group aggregation grace period
    .group_max_delay(Duration::from_secs(300))              // Group max delay
    .group_max_size(100)                                    // Group max size
    .janitor_interval(Duration::from_secs(8))               // Janitor interval
    .janitor_batch_size(50);                                // Janitor batch size

Redis Configuration

use asynq::redis::{RedisConnectionConfig};
use std::time::Duration;

// Single-node configuration
let redis_config = RedisConnectionConfig::single("redis://127.0.0.1:6379")?;

// Cluster configuration
let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
let redis_config = RedisConnectionConfig::cluster(nodes)?;

πŸ“Š Monitoring and Management

Queue Inspector

use asynq::base::keys::TaskState;
use asynq::inspector::Inspector;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // `broker` is a storage backend handle (e.g. the Redis broker),
    // created beforehand from your Redis configuration
    let inspector = Inspector::new(broker);

    // Get queue statistics
    let stats = inspector.get_queue_stats("default").await?;
    println!("Pending: {}, Active: {}", stats.pending, stats.active);

    // List tasks
    let tasks = inspector.list_tasks("default", TaskState::Pending, 1, 10).await?;

    // Requeue archived task
    inspector.requeue_archived_task("default", "task-id").await?;

    // Pause queue
    inspector.pause_queue("default").await?;

    Ok(())
}

πŸ”§ Development Guide

Local Development

  1. Clone the repository:
git clone https://github.com/emo-crab/asynq.git
cd asynq
  2. Build the project:
cargo build
  3. Start Redis:
docker run -d -p 6379:6379 redis:alpine
  4. Run the examples:
# Terminal 1: Start the consumer
cargo run --example consumer

# Terminal 2: Run the producer
cargo run --example producer

Run Tests

# Unit tests
cargo test

# Integration tests (requires Redis)
cargo test --features integration-tests

🀝 Contributing

We welcome contributions of all kinds! Please read CONTRIBUTING.md for details.

Development Principles

  • Use Rust features and best practices
  • Keep the API simple and easy to use
  • Provide comprehensive documentation
  • Ensure code quality and test coverage
  • Follow semantic versioning

πŸ“ License

This project is dual-licensed under either the MIT License or the GPL.

πŸ™ Acknowledgments

  • Thanks to hibiken/asynq for design inspiration
  • Thanks to the Rust community for excellent library support

πŸ“ž Contact

If you have any questions or suggestions, please open an issue on GitHub.


⭐ If this project helps you, please give us a star!