runner_q 0.4.3

Redis-based activity queue and worker system
Documentation
# Runner-Q

A robust, scalable Redis-based activity queue and worker system for Rust applications.

## Features

- **Priority-based activity processing** - Support for Critical, High, Normal, and Low priority levels
- **Activity scheduling and retries** - Built-in retry mechanism with exponential backoff
- **Dead letter queue** - Failed activities are moved to a dead letter queue for inspection
- **Concurrent activity processing** - Configurable number of concurrent workers
- **Graceful shutdown** - Proper shutdown handling with signal support
- **Activity timeouts** - Configurable timeout per activity type
- **Activity metadata** - Support for custom metadata on activities
- **Redis persistence** - Activities are stored in Redis for durability

## Installation

```sh
cargo add runner_q
```

## Quick Start

```rust
use runner_q::{ActivityQueue, WorkerEngine, ActivityPriority, ActivityOption};
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use runner_q::config::WorkerConfig;
use std::sync::Arc;
use async_trait::async_trait;

// Implement activity handler
pub struct SendEmailActivity;

#[async_trait]
impl ActivityHandler for SendEmailActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Process the email activity - use ? operator for clean error handling
        let to = payload["to"]
            .as_str()
            .ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;

        println!("Sending email to: {}", to);

        Ok(Some(serde_json::json!({
            "message": format!("Email sent to {}", to),
            "status": "delivered"
        })))
    }

    fn activity_type(&self) -> String {
        "send_email".to_string()
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Redis connection pool
    let redis_pool = bb8_redis::bb8::Pool::builder()
        .build(bb8_redis::RedisConnectionManager::new("redis://127.0.0.1:6379")?)
        .await?;

    // Create worker engine
    let config = WorkerConfig::default();
    let mut worker_engine = WorkerEngine::new(redis_pool, config);

    // Register activity handler
    let send_email_activity = SendEmailActivity;
    worker_engine.register_activity("send_email".to_string(), Arc::new(send_email_activity));

    // Execute an activity with custom options
    let future = worker_engine.execute_activity(
        "send_email".to_string(),
        serde_json::json!({"to": "user@example.com", "subject": "Welcome!"}),
        Some(ActivityOption {
            priority: Some(ActivityPriority::High),
            max_retries: 5,
            timeout_seconds: 600, // 10 minutes
        })
    ).await?;

    // Execute an activity with default options
    let future2 = worker_engine.execute_activity(
        "send_email".to_string(),
        serde_json::json!({"to": "admin@example.com"}),
        None // Uses default priority (Normal), 3 retries, 300s timeout
    ).await?;

    // Spawn a task to handle the result
    tokio::spawn(async move {
        if let Ok(result) = future.get_result().await {
            match result {
                Some(data) => {
                    println!("Email result: {:?}", data);
                }
                _ => {}
            }
        }
    });

    // Start the worker engine (this will run indefinitely)
    worker_engine.start().await?;

    Ok(())
}
```

## Activity Types

Activity types in Runner-Q are simple strings that identify different types of activities. You can use any string as an activity type identifier.

### Examples

```rust
// Common activity types
"send_email"
"process_payment"
"provision_card"
"update_card_status"
"process_webhook_event"

// You can use any string format you prefer
"user.registration"
"email-notification"
"background_sync"
```

## Configuration

```rust
use runner_q::config::WorkerConfig;

let config = WorkerConfig {
    queue_name: "my_queue".to_string(),
    max_concurrent_activities: 10,
    redis_url: "redis://127.0.0.1:6379".to_string(),
};
```

## Custom Activity Handlers

You can create custom activity handlers by implementing the `ActivityHandler` trait:

```rust
use runner_q::{ActivityContext, ActivityHandler, ActivityResult};
use async_trait::async_trait;
use serde_json::Value;

pub struct PaymentActivity {
    // Add your dependencies here (database connections, external APIs, etc.)
}

#[async_trait]
impl ActivityHandler for PaymentActivity {
    async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityHandlerResult {
        // Parse the payment data using ? operator
        let amount = payload["amount"]
            .as_f64()
            .ok_or_else(|| ActivityError::NonRetry("Missing or invalid amount".to_string()))?;

        let currency = payload["currency"]
            .as_str()
            .unwrap_or("USD");

        println!("Processing payment: {} {}", amount, currency);

        // Validate amount
        if amount <= 0.0 {
            return Err(ActivityError::NonRetry("Invalid amount".to_string()));
        }

        // Simulate payment processing
        Ok(Some(serde_json::json!({
            "transaction_id": "txn_123456",
            "amount": amount,
            "currency": currency,
            "status": "completed"
        })))
    }

    fn activity_type(&self) -> String {
        "process_payment".to_string()
    }
}

// Register the handler
worker_engine.register_activity("process_payment".to_string(), Arc::new(PaymentActivity {}));
```

## Activity Priority and Options

Activities can be configured using the `ActivityOption` struct:

```rust
use runner_q::{ActivityPriority, ActivityOption};

// High priority with custom retry and timeout settings
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    Some(ActivityOption {
        priority: Some(ActivityPriority::Critical), // Highest priority
        max_retries: 10,                            // Retry up to 10 times
        timeout_seconds: 900,                       // 15 minute timeout
    })
).await?;

// Use default options (Normal priority, 3 retries, 300s timeout)
let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;
```

Available priorities:
- `ActivityPriority::Critical` - Highest priority (processed first)
- `ActivityPriority::High` - High priority
- `ActivityPriority::Normal` - Default priority
- `ActivityPriority::Low` - Lowest priority

## Getting Activity Results

Activities can return results that can be retrieved asynchronously:

```rust
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
struct EmailResult {
    message: String,
    status: String,
}

let future = worker_engine.execute_activity(
    "send_email".to_string(),
    serde_json::json!({"to": "user@example.com"}),
    None
).await?;

// Get the result (this will wait until the activity completes)
let result_value = future.get_result().await?;
let email_result: EmailResult = serde_json::from_value(result_value)?;
println!("Email result: {:?}", email_result);
```

## Nested Activities (Activity Orchestration)

Activities can execute other activities using the `ActivityExecutor` available in the `ActivityContext`. This enables powerful workflow orchestration:

```rust
use runner_q::{ActivityExecutor, ActivityOption, ActivityHandlerResult, ActivityError, ActivityPriority};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct OrderData {
    id: String,
    customer_email: String,
    items: Vec<String>,
}

#[async_trait]
impl ActivityHandler for ProcessOrderActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Parse order data using ? operator for clean error handling
        let order: OrderData = serde_json::from_value(payload)?;

        // Execute sub-activities using the context's worker engine

        // 1. Update inventory
        let _inventory_future = context.worker_engine.execute_activity(
            "update_inventory".to_string(),
            serde_json::json!({"item": "product_123", "quantity": -1}),
            Some(ActivityOption {
                priority: Some(ActivityPriority::High),
                max_retries: 3,
                timeout_seconds: 60,
                scheduled_at: None,
            })
        ).await.map_err(|e| ActivityError::Retry(format!("Failed to update inventory: {}", e)))?;

        // 2. Send confirmation email
        let _email_future = context.worker_engine.execute_activity(
            "send_email".to_string(),
            serde_json::json!({"to": order.customer_email, "template": "order_confirmation"}),
            None
        ).await.map_err(|e| ActivityError::Retry(format!("Failed to send email: {}", e)))?;

        // 3. Log the transaction
        context.worker_engine.execute_activity(
            "log_transaction".to_string(),
            serde_json::json!({"order_id": order.id, "status": "processed"}),
            None
        ).await.map_err(|e| ActivityError::NonRetry(format!("Failed to log transaction: {}", e)))?;

        // Return success with result data
        Ok(Some(serde_json::json!({
            "order_id": order.id,
            "status": "completed"
        })))
    }

    fn activity_type(&self) -> String {
        "process_order".to_string()
    }
}
```

### Benefits of Nested Activities

- **Modularity**: Break complex workflows into smaller, reusable activities
- **Reliability**: Each sub-activity has its own retry logic and error handling
- **Monitoring**: Track progress of individual workflow steps
- **Scalability**: Sub-activities can be processed by different workers
- **Flexibility**: Different priority levels and timeouts for different steps

## Error Handling

The library provides comprehensive error handling:

### Activity Handler Results

In your activity handlers, you can use the convenient `ActivityHandlerResult` type with the `?` operator for clean error handling:

```rust
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};

#[async_trait]
impl ActivityHandler for MyActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Use ? operator for automatic error conversion
        let data: MyData = serde_json::from_value(payload)?;

        // Validate data
        if data.is_invalid() {
            return Err(ActivityError::NonRetry("Invalid data format".to_string()));
        }

        // Perform operation that might temporarily fail
        let result = external_api_call(&data)
            .await
            .map_err(|e| ActivityError::Retry(format!("API call failed: {}", e)))?;

        // Return success with result data
        Ok(Some(serde_json::json!({"result": result})))
    }

    fn activity_type(&self) -> String {
        "my_activity".to_string()
    }
}
```

**Error Types:**
- `ActivityError::Retry(message)` - Will be retried with exponential backoff
- `ActivityError::NonRetry(message)` - Will not be retried, goes to dead letter queue
- Any error implementing `Into<ActivityError>` can be used with `?`

### Worker Engine Errors

```rust
use runner_q::WorkerError;

match worker_engine.execute_activity(activity_type.to_string(), payload, options).await {
    Ok(future) => {
        // Activity was successfully enqueued
        match future.get_result().await {
            Ok(result) => match result {
                Some(data) => println!("Activity completed: {:?}", result),
                None => {}
            },
            Err(WorkerError::Timeout) => println!("Activity timed out"),
            Err(e) => println!("Activity failed: {}", e),
        }
    }
    Err(e) => println!("Failed to enqueue activity: {}", e),
}
```

## License

This project is licensed under the MIT License - see the LICENSE file for details.