runner_q 0.3.0

Redis-based activity queue and worker system
Documentation
runner_q-0.3.0 has been yanked.

Runner-Q

A robust, scalable Redis-based activity queue and worker system for Rust applications.

Features

  • Priority-based activity processing - Support for Critical, High, Normal, and Low priority levels
  • Activity scheduling and retries - Built-in retry mechanism with exponential backoff
  • Dead letter queue - Failed activities are moved to a dead letter queue for inspection
  • Concurrent activity processing - Configurable number of concurrent workers
  • Graceful shutdown - Proper shutdown handling with signal support
  • Activity timeouts - Configurable timeout per activity type
  • Activity metadata - Support for custom metadata on activities
  • Redis persistence - Activities are stored in Redis for durability

Installation

Add this to your Cargo.toml:

[dependencies]
runner_q = "0.1.0"

Quick Start

use runner_q::{ActivityQueue, WorkerEngine, ActivityPriority, ActivityOption};
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use runner_q::config::WorkerConfig;
use std::sync::Arc;
use async_trait::async_trait;

// Implement activity handler
pub struct SendEmailActivity;

#[async_trait]
impl ActivityHandler for SendEmailActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Process the email activity - use ? operator for clean error handling
        let to = payload["to"]
            .as_str()
            .ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;
        
        println!("Sending email to: {}", to);
        
        Ok(Some(serde_json::json!({
            "message": format!("Email sent to {}", to),
            "status": "delivered"
        })))
    }

    fn activity_type(&self) -> String {
        "send_email".to_string()
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Redis connection pool
    let redis_pool = bb8_redis::bb8::Pool::builder()
        .build(bb8_redis::RedisConnectionManager::new("redis://127.0.0.1:6379")?)
        .await?;

    // Create worker engine
    let config = WorkerConfig::default();
    let mut worker_engine = WorkerEngine::new(redis_pool, config);

    // Register activity handler
    let send_email_activity = SendEmailActivity;
    worker_engine.register_activity("send_email".to_string(), Arc::new(send_email_activity));

    // Execute an activity with custom options
    let future = worker_engine.execute_activity(
        "send_email".to_string(),
        serde_json::json!({"to": "user@example.com", "subject": "Welcome!"}),
        Some(ActivityOption {
            priority: Some(ActivityPriority::High),
            max_retries: 5,
            timeout_seconds: 600, // 10 minutes
        })
    ).await?;

    // Execute an activity with default options
    let future2 = worker_engine.execute_activity(
        "send_email".to_string(),
        serde_json::json!({"to": "admin@example.com"}),
        None // Uses default priority (Normal), 3 retries, 300s timeout
    ).await?;

    // Spawn a task to handle the result
    tokio::spawn(async move {
        if let Ok(result) = future.get_result().await {
            println!("Email result: {:?}", result);
        }
    });

    // Start the worker engine (this will run indefinitely)
    worker_engine.start().await?;

    Ok(())
}

Activity Types

Activity types in Runner-Q are simple strings that identify different types of activities. You can use any string as an activity type identifier.

Examples

// Common activity types
"send_email"
"process_payment"
"provision_card"
"update_card_status"
"process_webhook_event"

// You can use any string format you prefer
"user.registration"
"email-notification"
"background_sync"

Configuration

use runner_q::config::WorkerConfig;

let config = WorkerConfig {
    queue_name: "my_queue".to_string(),
    max_concurrent_activities: 10,
    redis_url: "redis://127.0.0.1:6379".to_string(),
};

Custom Activity Handlers

You can create custom activity handlers by implementing the ActivityHandler trait:

use runner_q::{ActivityContext, ActivityHandler, ActivityResult};
use async_trait::async_trait;
use serde_json::Value;

pub struct PaymentActivity {
    // Add your dependencies here (database connections, external APIs, etc.)
}

#[async_trait]
impl ActivityHandler for PaymentActivity {
    async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityHandlerResult {
        // Parse the payment data using ? operator
        let amount = payload["amount"]
            .as_f64()
            .ok_or_else(|| ActivityError::NonRetry("Missing or invalid amount".to_string()))?;
        
        let currency = payload["currency"]
            .as_str()
            .unwrap_or("USD");
        
        println!("Processing payment: {} {}", amount, currency);
        
        // Validate amount
        if amount <= 0.0 {
            return Err(ActivityError::NonRetry("Invalid amount".to_string()));
        }
        
        // Simulate payment processing
        Ok(Some(serde_json::json!({
            "transaction_id": "txn_123456",
            "amount": amount,
            "currency": currency,
            "status": "completed"
        })))
    }

    fn activity_type(&self) -> String {
        "process_payment".to_string()
    }
}

// Register the handler
worker_engine.register_activity("process_payment".to_string(), Arc::new(PaymentActivity {}));

Activity Priority and Options

Activities can be configured using the ActivityOption struct:

use runner_q::{ActivityPriority, ActivityOption};

// High priority with custom retry and timeout settings
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    Some(ActivityOption {
        priority: Some(ActivityPriority::Critical), // Highest priority
        max_retries: 10,                            // Retry up to 10 times
        timeout_seconds: 900,                       // 15 minute timeout
    })
).await?;

// Use default options (Normal priority, 3 retries, 300s timeout)
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;

Available priorities:

  • ActivityPriority::Critical - Highest priority (processed first)
  • ActivityPriority::High - High priority
  • ActivityPriority::Normal - Default priority
  • ActivityPriority::Low - Lowest priority

Getting Activity Results

Activities can return results that can be retrieved asynchronously:

use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
struct EmailResult {
    message: String,
    status: String,
}

let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;

// Get the result (this will wait until the activity completes)
let result_value = future.get_result().await?;
let email_result: EmailResult = serde_json::from_value(result_value)?;
println!("Email result: {:?}", email_result);

Nested Activities (Activity Orchestration)

Activities can execute other activities using the ActivityExecutor available in the ActivityContext. This enables powerful workflow orchestration:

use runner_q::{ActivityExecutor, ActivityOption};

#[async_trait]
impl ActivityHandler for ProcessOrderActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityResult {
        let order: OrderData = serde_json::from_value(payload)?;
        
        // Execute sub-activities using the context's worker engine
        
        // 1. Update inventory
        let inventory_future = context.worker_engine.execute_activity(
            "update_inventory".to_string(),
            serde_json::json!({"item": "product_123", "quantity": -1}),
            Some(ActivityOption {
                priority: Some(ActivityPriority::High),
                max_retries: 3,
                timeout_seconds: 60,
            })
        ).await?;
        
        // 2. Send confirmation email
        let email_future = context.worker_engine.execute_activity(
            "send_email".to_string(),
            serde_json::json!({"to": order.customer_email, "template": "order_confirmation"}),
            None
        ).await?;
        
        // 3. Log the transaction
        context.worker_engine.execute_activity(
            "log_transaction".to_string(),
            serde_json::json!({"order_id": order.id, "status": "processed"}),
            None
        ).await?;
        
        ActivityResult::Success(Some(serde_json::json!({
            "order_id": order.id,
            "status": "completed"
        })))
    }
    
    fn activity_type(&self) -> String {
        "process_order".to_string()
    }
}

Benefits of Nested Activities

  • Modularity: Break complex workflows into smaller, reusable activities
  • Reliability: Each sub-activity has its own retry logic and error handling
  • Monitoring: Track progress of individual workflow steps
  • Scalability: Sub-activities can be processed by different workers
  • Flexibility: Different priority levels and timeouts for different steps

Error Handling

The library provides comprehensive error handling:

Activity Handler Results

In your activity handlers, you can use the convenient ActivityHandlerResult type with the ? operator for clean error handling:

use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};

#[async_trait]
impl ActivityHandler for MyActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Use ? operator for automatic error conversion
        let data: MyData = serde_json::from_value(payload)?;
        
        // Validate data
        if data.is_invalid() {
            return Err(ActivityError::NonRetry("Invalid data format".to_string()));
        }
        
        // Perform operation that might temporarily fail
        let result = external_api_call(&data)
            .await
            .map_err(|e| ActivityError::Retry(format!("API call failed: {}", e)))?;
        
        // Return success with result data
        Ok(Some(serde_json::json!({"result": result})))
    }

    fn activity_type(&self) -> String {
        "my_activity".to_string()
    }
}

Error Types:

  • ActivityError::Retry(message) - Will be retried with exponential backoff
  • ActivityError::NonRetry(message) - Will not be retried, goes to dead letter queue
  • Any error implementing Into<ActivityError> can be used with ?

Worker Engine Errors

use runner_q::WorkerError;

match worker_engine.execute_activity(activity_type.to_string(), payload, options).await {
    Ok(future) => {
        // Activity was successfully enqueued
        match future.get_result().await {
            Ok(result) => println!("Activity completed: {:?}", result),
            Err(WorkerError::Timeout) => println!("Activity timed out"),
            Err(e) => println!("Activity failed: {}", e),
        }
    }
    Err(e) => println!("Failed to enqueue activity: {}", e),
}

License

This project is licensed under the MIT License - see the LICENSE file for details.