# ChainMQ

A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.

This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).

## Features

- 🚀 Redis-Powered: Built on Redis for reliable job persistence and distribution
- 🔄 Background Jobs: Process jobs asynchronously in the background
- 🏗️ Job Registry: Simple, type-safe job registration and execution
- 🔧 Worker Management: Configurable workers with lifecycle management
- ⚡ Async/Await: Full async support throughout the system
- ⏰ Delayed jobs: Schedule jobs for future execution with atomic operations
- 🗄️ Backoff strategies: Configurable retry logic for failed jobs
- 📊 Application Context: Share application state across jobs

## Quick Start

Add ChainMQ to your `Cargo.toml`:

```toml
[dependencies]
chainmq = "0.2.0"
actix-web = "4.0"
redis = "0.23"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

## Basic Usage

### 1. Define Your Job

```rust
use chainmq::{Job, JobContext};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

#[async_trait]
impl Job for EmailJob {
    async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
        if let Some(app_ctx) = ctx.app::<AppState>() {
            let response = app_ctx
                .mail_client
                .send_email(&self.to, &self.subject, &self.body)
                .await;

            match response {
                Ok(result) => println!("Email sent successfully: {:#?}", result),
                Err(error) => println!("Failed to send email: {}", error),
            }
        }

        Ok(())
    }

    fn name() -> &'static str {
        "EmailJob"
    }

    fn queue_name() -> &'static str {
        "emails"
    }
}
```

### 2. Set Up Application Context

```rust
use chainmq::AppContext;
use std::sync::Arc;

#[derive(Clone)]
pub struct AppState {
    pub mail_client: Arc<MailClient>,
    pub redis_client: Arc<redis::Client>,
}

impl AppContext for AppState {
    fn clone_context(&self) -> Arc<dyn AppContext> {
        Arc::new(self.clone())
    }
}
```

### 3. Configure Workers and Your Preferred Web Server

```rust
use chainmq::{JobRegistry, WorkerBuilder};
use actix_web::{web::Data, App, HttpServer};
use redis::Client as RedisClient;
use std::sync::Arc;

async fn setup_application() -> Result<(), anyhow::Error> {
    // Initialize Redis connection
    let redis_client = RedisClient::open("redis://127.0.0.1/")?;

    // Create application state
    let app_state = Arc::new(AppState {
        mail_client: Arc::new(MailClient::new()),
        redis_client: Arc::new(redis_client.clone()),
    });

    // Set up job registry
    let mut registry = JobRegistry::new();
    registry.register::<EmailJob>();


    // Start background workers; clone the state first so the web server
    // below can keep its own copy
    let worker_state = app_state.clone();
    tokio::spawn(async move {
        let mut worker = WorkerBuilder::new_with_redis_instance(redis_client.clone(), registry)
            .with_app_context(worker_state)
            .with_queue_name(EmailJob::queue_name())
            .spawn()
            .await
            .expect("Failed to initialize workers");

        let _ = worker.start().await;
    });

    // Start web server
    HttpServer::new(move || {
        App::new()
            .app_data(Data::new(app_state.clone()))
            .service(your_routes)
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await?;

    Ok(())
}
```

### 4. Enqueue Jobs from Anywhere

```rust
use chainmq::{Queue, QueueOptions};

async fn enqueue_email_job(app_state: &AppState) -> chainmq::Result<()> {
    let email_job = EmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Thank you for signing up!".to_string(),
    };

    let options = QueueOptions {
        redis_instance: Some(app_state.redis_client.clone()),
        ..Default::default()
    };

    let queue = Queue::new(options).await?;

    // Enqueue the job
    match queue.enqueue(email_job).await {
        Ok(_) => println!("Email job enqueued successfully"),
        Err(error) => eprintln!("Failed to enqueue email job: {}", error),
    }

    Ok(())
}
```

## Examples

This repo provides runnable examples. Build them all:

```bash
cargo build --examples
```

Run Redis first, then in separate terminals run workers/enqueuers:

```bash
# Single worker for emails queue
cargo run --example worker_main

# Enqueue email jobs (normal + delayed/high priority)
cargo run --example enqueue_email

# One worker handling multiple job types on a single queue
cargo run --example multi_jobs_single_worker

# Two workers on different queues (emails + reports)
cargo run --example multi_workers

# Failure and retry with backoff demonstration
cargo run --example failure_retry

# Delayed jobs demonstration
cargo run --example delayed_jobs
```

**Notes:**

- You can enqueue before or after workers start. Jobs persist in Redis until claimed.
- Ensure both worker and enqueuer use the same Redis URL and queue name.
- Some examples default to `redis://localhost:6379`. Adjust to your setup.

## Core Concepts

- **Job**: Defines work to be done. Implements `trait Job { async fn perform(&self, ctx: &JobContext) -> Result<()>; fn name() -> &'static str; fn queue_name() -> &'static str; }`
- **Queue**: Persists job metadata and manages wait/delayed/active/failed job lists in Redis
- **Worker**: Polls queues, claims jobs atomically via Lua scripts, executes jobs via JobRegistry
- **Registry**: Maps job type names to executors for deserialization and dispatch
- **JobContext**: Provides access to application state and job metadata during execution

## Configuration

### Redis Configuration

ChainMQ works with any Redis instance:

```rust
// Local Redis
let redis_client = redis::Client::open("redis://127.0.0.1:6379/")?;

// Redis with authentication
let redis_client = redis::Client::open("redis://:password@127.0.0.1:6379/")?;

// Redis with database selection
let redis_client = redis::Client::open("redis://127.0.0.1:6379/1")?;
```

### Worker Configuration

```rust
// Using Redis instance
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
    .with_app_context(app_state)
    .with_queue_name("priority_queue")
    .with_concurrency(10)                           // Number of concurrent jobs
    .with_poll_interval(Duration::from_secs(5))     // How often to check for jobs
    .spawn()
    .await?;

// Using Redis URI
let worker = WorkerBuilder::new_with_redis_uri("redis://127.0.0.1:6379/", registry)
    .with_app_context(app_state)
    .with_queue_name("background_tasks")
    .with_concurrency(5)
    .spawn()
    .await?;
```

### Queue Configuration

```rust
let options = QueueOptions {
    name: "default".to_string(),
    redis_url: "redis://127.0.0.1:6379".to_string(),
    redis_instance: None,
    key_prefix: "rbq".to_string(),
    default_concurrency: 10,
    max_stalled_interval: 30000, // 30 seconds
};

let queue = Queue::new(options).await?;
```

### Job Configuration

```rust
let job = EmailJob {
    to: "user@example.com".into(),
    subject: "Urgent".into(),
    body: "Please read".into(),
};

let opts = JobOptions {
    delay_secs: Some(60),
    priority: Priority::High,
    attempts: 5,
    backoff: BackoffStrategy::Exponential { base: 2, cap: 10 },
    timeout_secs: Some(60),
    rate_limit_key: Some("i_rater".to_string()),
};

let job_id = queue.enqueue_with_options(job, opts).await?;
```
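For intuition, assuming `Exponential { base, cap }` means the retry delay grows as `base^attempt` capped at `cap` seconds (an assumption for illustration; check the crate docs for the exact formula), the schedule produced by the options above can be sketched as:

```rust
// Sketch of an exponential backoff schedule, assuming the semantics
// delay = min(base^attempt, cap) in seconds; chainmq's actual formula
// may differ (e.g. it may add jitter or use milliseconds).
fn backoff_delay_secs(base: u64, cap: u64, attempt: u32) -> u64 {
    base.saturating_pow(attempt).min(cap)
}

fn main() {
    // base = 2, cap = 10, attempts = 5 (as in the JobOptions above)
    for attempt in 1..=5 {
        println!("attempt {attempt}: retry after {}s", backoff_delay_secs(2, 10, attempt));
    }
}
```

With these numbers the delays are 2s, 4s, 8s, then pinned at the 10s cap for the remaining attempts.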

## Advanced Usage

### Service Injection with AppContext

Inject your own services (database pools, HTTP clients, caches, etc.) via `AppContext`. The worker holds an `Arc<dyn AppContext>` and each job receives it through `JobContext`.

```rust
use chainmq::AppContext;
use std::sync::Arc;

#[derive(Clone)]
struct AppState {
    pub database: sqlx::PgPool,
    pub http_client: reqwest::Client,
    pub cache: Arc<RedisCache>,
    pub mail_client: Arc<MailClient>,
}

impl AppContext for AppState {
    fn clone_context(&self) -> Arc<dyn AppContext> {
        Arc::new(self.clone())
    }
}
```

Use it inside jobs via the helper `ctx.app::<T>()`:

```rust
#[async_trait]
impl Job for DatabaseJob {
    async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
        if let Some(app) = ctx.app::<AppState>() {
            // Use database
            let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", self.user_id)
                .fetch_one(&app.database)
                .await?;

            // Use HTTP client
            let response = app.http_client
                .get("https://api.example.com/data")
                .send()
                .await?;

            // Use cache
            app.cache.set(&format!("user:{}", user.id), &user).await?;
        }

        Ok(())
    }

    fn name() -> &'static str { "DatabaseJob" }
    fn queue_name() -> &'static str { "database" }
}
```

### Multiple Job Types

Register multiple job types in a single registry:

```rust
let mut registry = JobRegistry::new();
registry.register::<EmailJob>();
registry.register::<ImageProcessingJob>();
registry.register::<ReportGenerationJob>();
registry.register::<CleanupJob>();

// Single worker can handle all job types
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
    .with_queue_name("mixed_jobs")
    .spawn()
    .await?;
```

### Delayed Jobs

Schedule jobs for future execution:

```rust
use chainmq::JobOptions;
use std::time::Duration;

let delayed_job = EmailJob {
    to: "user@example.com".to_string(),
    subject: "Reminder".to_string(),
    body: "Don't forget about your appointment!".to_string(),
};

let options = JobOptions {
    delay_secs: Some(3600), // 1 hour delay
    ..Default::default()
};

queue.enqueue_with_options(delayed_job, options).await?;
```
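The `delay_secs` option translates into a due timestamp; delayed jobs sit in a Redis sorted set scored by that time (consistent with the `delayed` key described under Internals). A hypothetical sketch of the score computation, shown only to illustrate the idea:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper: compute the unix-time score at which a delayed
// job becomes due, given `delay_secs` as in JobOptions. chainmq does
// the equivalent internally.
fn due_at(delay_secs: u64) -> u64 {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before unix epoch")
        .as_secs();
    now + delay_secs
}

fn main() {
    println!("job due at unix time {}", due_at(3600));
}
```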

### Error Handling and Retries

Jobs that fail are automatically retried with configurable backoff:

```rust
use rand::random; // requires the `rand` crate

#[async_trait]
impl Job for RiskyJob {
    async fn perform(&self, _ctx: &JobContext) -> chainmq::Result<()> {
        // This job might fail; failed runs are retried with backoff
        if random::<f32>() < 0.3 {
            return Err("Random failure".into());
        }

        println!("Job succeeded!");
        Ok(())
    }

    fn name() -> &'static str { "RiskyJob" }
    fn queue_name() -> &'static str { "risky" }
}
```

### Passing AppContext to Workers

The worker holds an `Arc<dyn AppContext>` and hands it to each job through `JobContext`. Pass your state when building the worker:

```rust
// `app_state` is the Arc<AppState> built from your services (see above)
let mut worker = WorkerBuilder::new_with_redis_uri("redis://localhost:6379", registry)
    .with_app_context(app_state)
    .with_queue_name("default")
    .spawn()
    .await?;
```

Inside jobs, prefer the typed helper `ctx.app::<AppState>()`; a manual downcast via `ctx.app_context.as_ref().as_any().downcast_ref::<AppState>()` also works if you need it.

## Internals (high level)

ChainMQ uses Lua scripts to ensure atomic operations:

- **`move_delayed.lua`**: Moves due jobs from delayed sorted set to wait list
- **`claim_job.lua`**: Atomically pops from wait list and adds to active list

Redis keys use a configurable prefix (default `rbq`):

- `rbq:queue:{name}:wait` - Jobs waiting to be processed
- `rbq:queue:{name}:active` - Jobs currently being processed
- `rbq:queue:{name}:delayed` - Jobs scheduled for future execution
- `rbq:queue:{name}:failed` - Jobs that have failed processing
- `rbq:job:{id}` - Individual job metadata and payload
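The keys above follow one pattern; a hypothetical helper mirroring it (chainmq builds these keys internally, the function name here is invented):

```rust
// Hypothetical helper mirroring the key layout above; `prefix` is the
// configurable key_prefix (default "rbq").
fn queue_key(prefix: &str, queue: &str, state: &str) -> String {
    format!("{prefix}:queue:{queue}:{state}")
}

fn main() {
    println!("{}", queue_key("rbq", "emails", "wait"));   // rbq:queue:emails:wait
    println!("{}", queue_key("rbq", "emails", "failed")); // rbq:queue:emails:failed
}
```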

## Troubleshooting

**Jobs not being processed:**

- Ensure worker `.with_queue_name()` matches `Job::queue_name()`
- Verify same Redis URL for both worker and enqueuer
- Check jobs are enqueued: `redis-cli LRANGE rbq:queue:{queue}:wait 0 -1`

**Connection issues:**

- Verify Redis server is running and accessible
- Check Redis URL format and credentials
- Test connection with `redis-cli ping`

**Jobs failing silently:**

- Check Redis logs and failed job queue: `LRANGE rbq:queue:{queue}:failed 0 -1`
- Add logging/tracing to your job implementations
- Ensure job payload can be properly serialized/deserialized

**Performance issues:**

- Increase worker concurrency with `.with_concurrency(n)`
- Reduce poll interval with `.with_poll_interval(duration)`
- Monitor Redis memory usage and job queue lengths

## Development

```bash
# Build the library
cargo build

# Run examples (requires Redis)
cargo run --example worker_main
```

## License

MIT

## Acknowledgements

Inspired by existing Redis-backed job queues; built for ergonomic, type-safe Rust applications.