asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# Asynq - Rust Distributed Task Queue

[English]README.md | [δΈ­ζ–‡]README.zh-CN.md

![License]https://img.shields.io/badge/license-MIT-blue.svg
![Rust]https://img.shields.io/badge/rust-1.70+-orange.svg
![Go Compatible]https://img.shields.io/badge/go%20compatible-hibiken%2Fasynq-green.svg

Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by [hibiken/asynq](https://github.com/hibiken/asynq).

**πŸ”— Fully Compatible with Go asynq**: This implementation is fully compatible with the Go version of [hibiken/asynq](https://github.com/hibiken/asynq), allowing seamless interoperation with Go services.

## 🌟 Features

- βœ… **Guaranteed at-least-once execution** - Tasks won't be lost
- ⏰ **Task scheduling** - Support for delayed and scheduled tasks
- πŸ”„ **Automatic retry** - Configurable retry policies for failed tasks
- πŸ›‘οΈ **Fault recovery** - Automatic task recovery on worker crashes
- 🎯 **Priority queues** - Support for weighted and strict priority
- ⚑ **Low latency** - Fast Redis writes with low task enqueue latency
- πŸ”’ **Task deduplication** - Support for unique task options
- ⏱️ **Timeout control** - Per-task timeout and deadline support
- πŸ“¦ **Task aggregation** - Support for batch processing of multiple tasks
- πŸ”Œ **Flexible interface** - Support for middleware and custom handlers
- ⏸️ **Queue pause** - Ability to pause/resume specific queues
- πŸ•’ **Periodic tasks** - Support for cron-style scheduled tasks
- 🏠 **High availability** - Support for Redis Cluster
- πŸ–₯️ **Web UI** - Web-based management interface for queues and tasks
- πŸ”„ **Go compatible** - Fully compatible with Go version asynq, can be deployed together
- 🎯 **Macro support** - Attribute macros for easy handler registration (optional feature)

## πŸš€ Quick Start

### Add Dependencies

Add to your `Cargo.toml`:

```toml
[dependencies]
asynq = { version = "0.1", features = ["json"] }
## Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
## or dev channel
#asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

### Basic Usage

#### Producer (Enqueue Tasks)

```rust
use asynq::{client::Client, task::Task, redis::RedisConnectionType};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Redis configuration
    let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create client
    let client = Client::new(redis_config).await?;

    // Create task
    let payload = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };

    let task = Task::new_with_json("email:send", &payload)?;

    // Enqueue task
    let task_info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", task_info.id);

    Ok(())
}
```

#### Consumer (Process Tasks)

```rust
use asynq::{server::Server,server::Handler,task::Task, redis::RedisConnectionType, config::ServerConfig};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        match task.get_type() {
            "email:send" => {
                let payload: EmailPayload = task.get_payload_with_json()?;
                println!("Sending email to: {}", payload.to);
                // Implement actual email sending logic
                Ok(())
            }
            _ => {
                Err(asynq::error::Error::other("Unknown task type"))
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Redis configuration
    let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Configure queues
    let mut queues = HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);

    // Server configuration
    let config = ServerConfig::new()
        .concurrency(4)
        .queues(queues);

    // Create server
    let mut server = Server::new(redis_config, config).await?;

    // Start server
    server.run(EmailProcessor).await?;

    Ok(())
}
```

#### Using ServeMux for Task Routing

ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:

```rust
use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConnectionType, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Create ServeMux
    let mut mux = ServeMux::new();

    // Register synchronous handler
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}",task);
        Ok(())
    });

    // Register asynchronous handler
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}",task);
        // Async processing logic
        Ok(())
    });

    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}",task);
        Ok(())
    });

    // Configure server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create and run server
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;

    // ServeMux implements Handler trait, can be passed directly to server.run()
    server.run(mux).await?;

    Ok(())
}
```

**Features:**
- 🎯 Automatically route tasks to corresponding handlers based on task type
- ⚑ Support for both synchronous (`handle_func`) and asynchronous (`handle_async_func`) handlers
- πŸ”„ Fully compatible with Go version ServeMux
- πŸ›‘οΈ Type-safe with compile-time checking
- πŸ“ Clean API, easy to use

See `examples/servemux_example.rs` for more examples.

### Task Handler Macros (Optional Feature)

When the `macros` feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:

```rust
use asynq::{
    serve_mux::ServeMux, 
    task::Task, 
    task_handler, 
    task_handler_async,
    register_handlers,
    register_async_handlers,
    redis::RedisConnectionType, 
    config::ServerConfig, 
    server::ServerBuilder
};
use std::collections::HashMap;

// Define handlers with attribute macros
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;
    
    // Create ServeMux and register handlers with convenience macros
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);
    
    // Configure and run server
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);
    
    let mut server = ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(config)
        .build()
        .await?;
    
    server.run(mux).await?;
    Ok(())
}
```

**Macro Features:**
- 🎯 **Declarative syntax**: Define handlers with clean attribute syntax
- πŸ“ **Reduced boilerplate**: Pattern strings are stored with the function
- πŸ”§ **Convenient registration**: Use `register_handlers!` and `register_async_handlers!` macros
- 🌐 **Familiar pattern**: Similar to actix-web's `#[get("/path")]` routing macros

See `examples/macro_example.rs` for a complete example.

## πŸ“š Advanced Usage

### Delayed Tasks

```rust
use std::time::Duration;
// Execute after 5 minutes delay
client.enqueue_in(task, Duration::from_secs(300)).await?;
```

### Unique Tasks (Deduplication)

```rust
use std::time::Duration;

// Keep unique within 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;
```

### Task Groups (Batch Processing)

```rust
// Add tasks to group for aggregation
for i in 1..=10 {
    let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
    client.add_to_group(item_task, "daily_batch").await?;
}
```

### Task Options

```rust
let task = Task::new_with_json("image:resize", &payload)?
    .with_queue("image_processing")     // Specify queue
    .with_max_retry(5)                  // Maximum retry attempts
    .with_timeout(Duration::from_secs(300)) // Timeout
    .with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL
```

### Priority Queues

```rust
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6);  // Highest priority
queues.insert("default".to_string(), 3);   // Medium priority
queues.insert("low".to_string(), 1);       // Low priority

let config = ServerConfig::new()
    .queues(queues)
    .strict_priority(true); // Strict priority mode
```

## πŸ—οΈ Architecture Design

Asynq uses a modular design with main components:

```
asynq/
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ lib.rs              # Library entry and public API
β”‚   β”œβ”€β”€ client.rs           # Client implementation
β”‚   β”œβ”€β”€ server.rs           # Server implementation
β”‚   β”œβ”€β”€ serve_mux.rs         # ServeMux routing (compatible with Go serve_mux.go)
β”‚   β”œβ”€β”€ processor.rs        # Processor implementation (compatible with Go processor.go)
β”‚   β”œβ”€β”€ task.rs             # Task data structures
β”‚   β”œβ”€β”€ error.rs            # Error handling
β”‚   β”œβ”€β”€ config.rs           # Configuration management
β”‚   β”œβ”€β”€ redis.rs            # Redis connection management
β”‚   β”œβ”€β”€ inspector.rs        # Queue inspector
β”‚   └── broker/             # Storage backend abstraction
β”‚       β”œβ”€β”€ mod.rs          # Broker trait definition
β”‚       └── redis_broker.rs # Redis implementation
β”œβ”€β”€ proto/
β”‚   └── asynq.proto         # Protocol Buffer definitions
└── examples/
    β”œβ”€β”€ producer.rs         # Producer example
    β”œβ”€β”€ consumer.rs         # Consumer example
    β”œβ”€β”€ servemux_example.rs # ServeMux usage example
    └── processor_example.rs # Processor example
```

### Core Components

- **Client**: Responsible for enqueueing tasks
- **Server**: Responsible for dequeuing and processing tasks
- **ServeMux**: Task routing multiplexer, routes tasks to different handlers by type (compatible with Go servemux.go)
- **Processor**: Task processor core, handles concurrency control and task execution (compatible with Go asynq processor.go)
- **Aggregator**: Task aggregator, aggregates tasks from the same group into batch tasks (compatible with Go asynq aggregator.go)
- **Broker**: Storage backend abstraction, currently supports Redis
- **Task**: Task data structure containing type, payload, and options
- **Handler**: Task handler trait that users need to implement
- **Inspector**: Queue and task inspection and management tool

### Processor Features

The Processor module implements task processing architecture compatible with Go asynq processor.go:

- βœ… **Semaphore concurrency control**: Uses Tokio Semaphore for precise control of concurrent workers
- βœ… **Queue priority**: Supports both strict priority and weighted priority modes
- βœ… **Task timeout**: Supports task-level and global timeout settings
- βœ… **Graceful shutdown**: Waits for all active workers to complete before shutdown
- βœ… **Automatic retry**: Failed tasks automatically retry with exponential backoff
- βœ… **Task archiving**: Tasks automatically archived after reaching max retry count

### GroupAggregator Features

The GroupAggregator module implements task aggregation functionality compatible with Go asynq aggregator.go:

- βœ… **Task grouping**: Set group label for tasks using `with_group()`
- βœ… **Batch aggregation**: Automatically aggregate tasks from the same group into a single batch task
- βœ… **Flexible triggers**: Supports three trigger conditions: grace period, max group size, max delay
- βœ… **Custom aggregation**: Customize aggregation logic via `GroupAggregator` trait
- βœ… **Functional interface**: Quickly create aggregators using `GroupAggregatorFunc`

Example usage:

```rust
use asynq::components::aggregator::GroupAggregatorFunc;

// Define aggregation function
let aggregator = GroupAggregatorFunc::new(|group, tasks| {
    // Merge multiple tasks into a single batch task
    let combined = tasks.iter()
        .map(|t| t.get_payload())
        .collect::<Vec<_>>()
        .join(&b"\n"[..]);
    Task::new("batch:process", &combined)
});

// Set on server
server.set_group_aggregator(aggregator);
```


## πŸ› οΈ Configuration Options

### Server Configuration

```rust
use asynq::config::ServerConfig;
use std::time::Duration;

let config = ServerConfig::new()
    .concurrency(8)                                          // Number of concurrent workers
    .task_check_interval(Duration::from_secs(1))            // Task check interval
    .delayed_task_check_interval(Duration::from_secs(5))    // Delayed task check interval
    .shutdown_timeout(Duration::from_secs(10))              // Shutdown timeout
    .health_check_interval(Duration::from_secs(15))         // Health check interval
    .group_grace_period(Duration::from_secs(60))?           // Group aggregation grace period
    .group_max_delay(Duration::from_secs(300))              // Group max delay
    .group_max_size(100)                                    // Group max size
    .janitor_interval(Duration::from_secs(8))               // Janitor interval
    .janitor_batch_size(50);                                // Janitor batch size
```

### Redis Configuration

```rust
use asynq::redis::{RedisConnectionType};
use std::time::Duration;

// Basic configuration
let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;
let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
let redis_config = asynq::backend::RedisConnectionType::cluster(nodes)?;
```

## πŸ“Š Monitoring and Management

### Queue Inspector

```rust
use asynq::base::keys::TaskState;
use asynq::inspector::Inspector;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let inspector = Inspector::new(broker);

    // Get queue statistics
    let stats = inspector.get_queue_stats("default").await?;
    println!("Pending: {}, Active: {}", stats.pending, stats.active);

    // List tasks
    let tasks = inspector.list_tasks("default", TaskState::Pending, 1, 10).await?;

    // Requeue archived task
    inspector.requeue_archived_task("default", "task-id").await?;

    // Pause queue
    inspector.pause_queue("default").await?;

    Ok(())
}
```

## πŸ”§ Development Guide

### Local Development

1. Clone the repository:
```bash
git clone https://github.com/emo-crab/asynq.git
cd asynq
```

2. Install dependencies:
```bash
cargo build
```

3. Start Redis:
```bash
docker run -d -p 6379:6379 redis:alpine
```

4. Run examples:
```bash
# Terminal 1: Start consumer
cargo run --example consumer

# Terminal 2: Run producer
cargo run --example producer
```

### Run Tests

```bash
# Unit tests
cargo test

# Integration tests (requires Redis)
cargo test --features integration-tests
```

## 🀝 Contributing

We welcome contributions of all kinds! Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details.

### Development Principles

- Use Rust features and best practices
- Keep the API simple and easy to use
- Provide comprehensive documentation
- Ensure code quality and test coverage
- Follow semantic versioning

## πŸ“ License

This project is licensed under the [MIT License](LICENSE-MIT) OR [GPL License](LICENSE-GPL).

## πŸ™ Acknowledgments

- Thanks to [hibiken/asynq]https://github.com/hibiken/asynq for design inspiration
- Thanks to the Rust community for excellent library support

## πŸ“ž Contact

If you have any questions or suggestions, please:

- Submit an [Issue]https://github.com/emo-crab/asynq/issues
- Create a [Pull Request]https://github.com/emo-crab/asynq/pulls
- Join our discussions

---

⭐ If this project helps you, please give us a star!