runner_q-0.2.0 has been yanked.
Runner-Q
A robust, scalable Redis-based activity queue and worker system for Rust applications.
Features
- Priority-based activity processing - Support for Critical, High, Normal, and Low priority levels
- Activity scheduling and retries - Built-in retry mechanism with exponential backoff
- Dead letter queue - Failed activities are moved to a dead letter queue for inspection
- Concurrent activity processing - Configurable number of concurrent workers
- Graceful shutdown - Proper shutdown handling with signal support
- Activity timeouts - Configurable timeout per activity type
- Activity metadata - Support for custom metadata on activities
- Redis persistence - Activities are stored in Redis for durability
Installation
Add this to your Cargo.toml:
[dependencies]
runner_q = "0.1.0"
Quick Start
use runner_q::{ActivityQueue, WorkerEngine, ActivityPriority, ActivityOption};
use runner_q::{ActivityHandler, ActivityContext, ActivityResult};
use runner_q::config::WorkerConfig;
use std::sync::Arc;
use async_trait::async_trait;
/// Example activity that "sends" an email by printing the recipient.
pub struct SendEmailActivity;

#[async_trait]
impl ActivityHandler for SendEmailActivity {
    /// Reads the `"to"` field from the payload (falling back to `"unknown"`
    /// when it is missing or not a string), prints it, and reports success
    /// with a JSON object carrying `"message"` and `"status"`.
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityResult {
        let recipient = match payload["to"].as_str() {
            Some(address) => address,
            None => "unknown",
        };
        println!("Sending email to: {}", recipient);

        let summary = serde_json::json!({
            "message": format!("Email sent to {}", recipient),
            "status": "delivered"
        });
        ActivityResult::Success(Some(summary))
    }

    /// Returns the activity type identifier `"send_email"`.
    fn activity_type(&self) -> String {
        String::from("send_email")
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a bb8 connection pool for the local Redis instance.
    let manager = bb8_redis::RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    let redis_pool = bb8_redis::bb8::Pool::builder().build(manager).await?;

    // Create the engine with the default configuration and register the handler
    // under the same activity type string that is used when enqueueing below.
    let mut worker_engine = WorkerEngine::new(redis_pool, WorkerConfig::default());
    worker_engine.register_activity("send_email".to_string(), Arc::new(SendEmailActivity));

    // Enqueue one activity with explicit options...
    let welcome_options = ActivityOption {
        priority: Some(ActivityPriority::High),
        max_retries: 5,
        timeout_seconds: 600,
    };
    let future = worker_engine
        .execute_activity(
            "send_email".to_string(),
            serde_json::json!({"to": "user@example.com", "subject": "Welcome!"}),
            Some(welcome_options),
        )
        .await?;

    // ...and one without options (`None`).
    let future2 = worker_engine
        .execute_activity(
            "send_email".to_string(),
            serde_json::json!({"to": "admin@example.com"}),
            None,
        )
        .await?;

    // Wait for the first activity's result on a separate task; an `Err` from
    // `get_result` is ignored here.
    tokio::spawn(async move {
        let outcome = future.get_result().await;
        if let Ok(result) = outcome {
            println!("Email result: {:?}", result);
        }
    });

    // Start the worker engine; any error it returns is propagated to the caller.
    worker_engine.start().await?;
    Ok(())
}
Activity Types
Activity types in Runner-Q are simple strings that identify different types of activities. You can use any string as an activity type identifier.
Examples
"send_email"
"process_payment"
"provision_card"
"update_card_status"
"process_webhook_event"
"user.registration"
"email-notification"
"background_sync"
Configuration
use runner_q::config::WorkerConfig;
// Build a WorkerConfig by hand instead of using WorkerConfig::default().
// NOTE(review): only these three fields are shown; confirm against the crate
// whether WorkerConfig has further fields that must also be set.
let config = WorkerConfig {
    // Name of the queue, chosen by the application.
    queue_name: "my_queue".to_string(),
    // Presumably the cap on activities processed at the same time (see
    // "Concurrent activity processing" under Features) -- TODO confirm.
    max_concurrent_activities: 10,
    // NOTE(review): the Quick Start passes a separately built Redis pool to
    // WorkerEngine::new; confirm how this URL relates to that pool.
    redis_url: "redis://127.0.0.1:6379".to_string(),
};
Custom Activity Handlers
You can create custom activity handlers by implementing the ActivityHandler trait:
use runner_q::{ActivityContext, ActivityHandler, ActivityResult};
use async_trait::async_trait;
use serde_json::Value;
/// Example handler for the `"process_payment"` activity type.
pub struct PaymentActivity {}

#[async_trait]
impl ActivityHandler for PaymentActivity {
    /// Reads `"amount"` (default `0.0`) and `"currency"` (default `"USD"`) from
    /// the payload, prints them, and succeeds only for a strictly positive
    /// amount; otherwise it returns a non-retryable failure.
    async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityResult {
        let currency = payload["currency"].as_str().unwrap_or("USD");
        let amount = payload["amount"].as_f64().unwrap_or(0.0);
        println!("Processing payment: {} {}", amount, currency);

        // Anything that is not strictly positive is rejected without retrying.
        let is_valid = amount > 0.0;
        if !is_valid {
            return ActivityResult::NonRetry("Invalid amount".to_string());
        }

        let receipt = serde_json::json!({
            "transaction_id": "txn_123456",
            "amount": amount,
            "currency": currency,
            "status": "completed"
        });
        ActivityResult::Success(Some(receipt))
    }

    /// Returns the activity type identifier `"process_payment"`.
    fn activity_type(&self) -> String {
        String::from("process_payment")
    }
}
// Register the handler under the "process_payment" activity type, the same
// string that PaymentActivity::activity_type() returns.
worker_engine.register_activity("process_payment".to_string(), Arc::new(PaymentActivity {}));
Activity Priority and Options
Activities can be configured using the ActivityOption struct:
use runner_q::{ActivityPriority, ActivityOption};
// Enqueue with explicit options: Critical priority, max_retries of 10 and
// timeout_seconds of 900.
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    Some(ActivityOption {
        priority: Some(ActivityPriority::Critical), max_retries: 10, timeout_seconds: 900, })
).await?;
// Enqueue without options. Presumably the library's defaults apply when `None`
// is passed -- TODO confirm. This `let` shadows the previous `future`.
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;
Available priorities:
ActivityPriority::Critical - Highest priority (processed first)
ActivityPriority::High - High priority
ActivityPriority::Normal - Default priority
ActivityPriority::Low - Lowest priority
Getting Activity Results
Activities can return results that can be retrieved asynchronously:
use serde::{Serialize, Deserialize};
/// Typed view of the JSON object returned by the Quick Start `send_email`
/// handler, which produces the keys `"message"` and `"status"`.
#[derive(Debug, Serialize, Deserialize)]
struct EmailResult {
    // Deserialized from the "message" key.
    message: String,
    // Deserialized from the "status" key.
    status: String,
}
// Enqueue the activity; the awaited call yields a handle used to fetch the result.
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;
// Wait for the activity's result; errors are propagated with `?`.
// NOTE(review): what get_result yields when a handler returns Success(None)
// is not shown here -- confirm before relying on it.
let result_value = future.get_result().await?;
// Convert the untyped JSON value into the typed EmailResult struct.
let email_result: EmailResult = serde_json::from_value(result_value)?;
println!("Email result: {:?}", email_result);
Nested Activities (Activity Orchestration)
Activities can execute other activities using the ActivityExecutor available in the ActivityContext (accessed as `context.worker_engine` in the example below). This enables powerful workflow orchestration:
use runner_q::{ActivityExecutor, ActivityOption};
#[async_trait]
impl ActivityHandler for ProcessOrderActivity {
    /// Orchestrates an order by enqueueing three follow-up activities
    /// (`update_inventory`, `send_email`, `log_transaction`) through
    /// `context.worker_engine`, then reports success with the order id.
    ///
    /// `handle` returns an `ActivityResult`, not a `Result`, so the `?`
    /// operator cannot be used here. Every failure is mapped to an explicit
    /// `ActivityResult` variant instead.
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityResult {
        // A payload that does not deserialize will not get better on retry.
        let order: OrderData = match serde_json::from_value(payload) {
            Ok(order) => order,
            Err(e) => {
                return ActivityResult::NonRetry(format!("Invalid order payload: {}", e));
            }
        };

        // Enqueue failures are reported as retryable. Note that a retry re-runs
        // this handler from the top, so sub-activities enqueued before the
        // failure will be enqueued again; keep them idempotent.
        let _inventory_future = match context.worker_engine.execute_activity(
            "update_inventory".to_string(),
            serde_json::json!({"item": "product_123", "quantity": -1}),
            Some(ActivityOption {
                priority: Some(ActivityPriority::High),
                max_retries: 3,
                timeout_seconds: 60,
            }),
        ).await {
            Ok(future) => future,
            Err(e) => {
                return ActivityResult::Retry(format!("Failed to enqueue update_inventory: {}", e));
            }
        };

        let _email_future = match context.worker_engine.execute_activity(
            "send_email".to_string(),
            serde_json::json!({"to": order.customer_email, "template": "order_confirmation"}),
            None,
        ).await {
            Ok(future) => future,
            Err(e) => {
                return ActivityResult::Retry(format!("Failed to enqueue send_email: {}", e));
            }
        };

        // Fire-and-forget: only the enqueue outcome matters for this step.
        if let Err(e) = context.worker_engine.execute_activity(
            "log_transaction".to_string(),
            serde_json::json!({"order_id": order.id, "status": "processed"}),
            None,
        ).await {
            return ActivityResult::Retry(format!("Failed to enqueue log_transaction: {}", e));
        }

        ActivityResult::Success(Some(serde_json::json!({
            "order_id": order.id,
            "status": "completed"
        })))
    }

    /// Returns the activity type identifier `"process_order"`.
    fn activity_type(&self) -> String {
        "process_order".to_string()
    }
}
Benefits of Nested Activities
- Modularity: Break complex workflows into smaller, reusable activities
- Reliability: Each sub-activity has its own retry logic and error handling
- Monitoring: Track progress of individual workflow steps
- Scalability: Sub-activities can be processed by different workers
- Flexibility: Different priority levels and timeouts for different steps
Error Handling
The library provides comprehensive error handling:
Activity Handler Results
In your activity handlers, you can return different variants of `ActivityResult` (`Success`, `Retry`, or `NonRetry`):
#[async_trait]
impl ActivityHandler for MyActivity {
async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityResult {
match some_operation() {
Ok(data) => {
ActivityResult::Success(Some(serde_json::json!({"result": data})))
}
Err(retryable_error) => {
ActivityResult::Retry(format!("Temporary error: {}", retryable_error))
}
Err(permanent_error) => {
ActivityResult::NonRetry(format!("Permanent error: {}", permanent_error))
}
}
}
}
Worker Engine Errors
use runner_q::WorkerError;
match worker_engine.execute_activity(activity_type.to_string(), payload, options).await {
Ok(future) => {
match future.get_result().await {
Ok(result) => println!("Activity completed: {:?}", result),
Err(WorkerError::Timeout) => println!("Activity timed out"),
Err(e) => println!("Activity failed: {}", e),
}
}
Err(e) => println!("Failed to enqueue activity: {}", e),
}
License
This project is licensed under the MIT License - see the LICENSE file for details.