Crate runner_q

Crate runner_q 

Source
Expand description

Runner-Q: A pluggable activity queue and worker system for Rust

This crate provides a robust, scalable activity queue system with pluggable storage backends:

§Features

  • Priority-based activity processing with Critical, High, Normal, and Low priority levels
  • Activity scheduling with precise timestamp-based scheduling for future execution
  • Intelligent retry mechanism with exponential backoff for failed activities
  • Dead letter queue handling for activities that exceed retry limits
  • Concurrent activity processing with configurable worker pools
  • Graceful shutdown handling with proper cleanup
  • Activity orchestration enabling activities to execute other activities
  • Comprehensive error handling with retryable and non-retryable error types
  • Activity metadata support for context and tracking
  • Pluggable storage backends - Redis (default), PostgreSQL, or bring your own
  • Queue statistics and monitoring capabilities
  • Web-based observability console for real-time monitoring

§Storage Backends

Runner-Q supports multiple storage backends through the Storage trait:

BackendFeature FlagStatusDescription
Redis(default)StableHigh-performance with Lua scripts for atomicity
PostgreSQLpostgresIn DevelopmentPermanent persistence with FOR UPDATE SKIP LOCKED

§Using a Custom Backend

use runner_q::{WorkerEngine, Storage};
use std::sync::Arc;

// With default Redis backend
let engine = WorkerEngine::builder()
    .redis_url("redis://localhost:6379")
    .queue_name("my_app")
    .build()
    .await?;

// With custom backend (e.g., PostgreSQL)
#[cfg(feature = "postgres")]
{
    use runner_q::storage::PostgresBackend;
    let backend = Arc::new(
        PostgresBackend::new("postgres://localhost/mydb", "my_queue").await?
    );
    let engine = WorkerEngine::builder()
        .backend(backend)
        .build()
        .await?;
}

§Example

use runner_q::{WorkerEngine, ActivityPriority, ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::json;
use serde::{Serialize, Deserialize};
use std::time::Duration;

// Define activity types
#[derive(Debug, Clone)]
enum MyActivityType {
    SendEmail,
    ProcessPayment,
}

impl std::fmt::Display for MyActivityType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MyActivityType::SendEmail => write!(f, "send_email"),
            MyActivityType::ProcessPayment => write!(f, "process_payment"),
        }
    }
}

// Implement activity handler
pub struct SendEmailActivity;

#[async_trait]
impl ActivityHandler for SendEmailActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        // Parse the email data - use ? operator for clean error handling
        let email_data: serde_json::Map<String, serde_json::Value> = payload
            .as_object()
            .ok_or_else(|| ActivityError::NonRetry("Invalid payload format".to_string()))?
            .clone();

        let to = email_data.get("to")
            .and_then(|v| v.as_str())
            .ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;

        // Simulate sending email
        println!("Sending email to: {}", to);

        // Return success with result data
        Ok(Some(serde_json::json!({
            "message": format!("Email sent to {}", to),
            "status": "delivered"
        })))
    }

    fn activity_type(&self) -> String {
        MyActivityType::SendEmail.to_string()
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct EmailResult {
    message: String,
    status: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Improved API: Builder pattern for WorkerEngine
    let engine = WorkerEngine::builder()
        .redis_url("redis://localhost:6379")
        .queue_name("my_app")
        .max_workers(8)
        .schedule_poll_interval(Duration::from_secs(30))
        .build()
        .await?;

    // Register activity handler
    let send_email_activity = SendEmailActivity;
    engine.register_activity(MyActivityType::SendEmail.to_string(), Arc::new(send_email_activity));

    // Get activity executor for fluent activity execution
    let activity_executor = engine.get_activity_executor();

    // Improved API: Fluent activity execution
    let future = activity_executor
        .activity("send_email")
        .payload(json!({"to": "user@example.com", "subject": "Welcome!"}))
        .max_retries(5)
        .timeout(Duration::from_secs(600))
        .execute()
        .await?;

    // Schedule an activity for future execution (10 seconds from now)
    let scheduled_future = activity_executor
        .activity("send_email")
        .payload(json!({"to": "user@example.com", "subject": "Reminder"}))
        .max_retries(3)
        .timeout(Duration::from_secs(300))
        .delay(Duration::from_secs(10))
        .execute()
        .await?;

    // Execute an activity with default options
    let future2 = activity_executor
        .activity("send_email")
        .payload(json!({"to": "admin@example.com"}))
        .execute()
        .await?;

    // Spawn a task to handle the result
    tokio::spawn(async move {
        if let Ok(result) = future.get_result().await {
            match result {
                None => {}
                Some(data) => {
                    let email_result: EmailResult = serde_json::from_value(data).unwrap();
                    println!("Email result: {:?}", email_result);
                }
            }
        }
    });

    // Start the worker engine (this will run indefinitely)
    engine.start().await?;

    Ok(())
}

§Activity Orchestration

Activities can execute other activities using the ActivityExecutor available in the ActivityContext. This enables building complex workflows and activity orchestration patterns.

use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityOption, ActivityPriority, ActivityError};
use async_trait::async_trait;

pub struct OrderProcessingActivity;

#[async_trait]
impl ActivityHandler for OrderProcessingActivity {
    async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
        let order_id = payload["order_id"]
            .as_str()
            .ok_or_else(|| ActivityError::NonRetry("Missing order_id".to_string()))?;

        // Step 1: Validate payment
        let _payment_future = context.activity_executor
            .activity("validate_payment")
            .payload(serde_json::json!({"order_id": order_id}))
            .priority(ActivityPriority::High)
            .max_retries(3)
            .timeout(std::time::Duration::from_secs(120))
            .execute()
            .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue payment validation: {}", e)))?;

        // Step 2: Update inventory
        let _inventory_future = context.activity_executor
            .activity("update_inventory")
            .payload(serde_json::json!({"order_id": order_id}))
            .execute()
            .await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue inventory update: {}", e)))?;

        // Step 3: Schedule delivery notification for later
        context.activity_executor
            .activity("send_delivery_notification")
            .payload(serde_json::json!({"order_id": order_id, "customer_email": payload["customer_email"]}))
            .max_retries(5)
            .timeout(std::time::Duration::from_secs(300))
            .delay(std::time::Duration::from_secs(3600)) // 1 hour
            .execute()
            .await.map_err(|e| ActivityError::Retry(format!("Failed to schedule notification: {}", e)))?;

        Ok(Some(serde_json::json!({
            "order_id": order_id,
            "status": "processing",
            "steps_initiated": ["payment_validation", "inventory_update", "delivery_notification"]
        })))
    }

    fn activity_type(&self) -> String {
        "process_order".to_string()
    }
}

Re-exports§

pub use crate::config::WorkerConfig;
pub use crate::observability::QueueInspector;
pub use crate::observability::observability_api;
pub use crate::observability::runnerq_ui;
pub use crate::observability::ActivityEvent;
pub use crate::observability::ActivityEventType;
pub use crate::observability::ActivitySnapshot;
pub use crate::observability::DeadLetterRecord;
pub use crate::observability::QueueStats;
pub use crate::observability::CONSOLE_HTML;
pub use crate::runner::error::WorkerError;
pub use crate::runner::runner::ActivityBuilder;
pub use crate::runner::runner::ActivityExecutor;
pub use crate::runner::runner::MetricsSink;
pub use crate::runner::runner::WorkerEngine;
pub use crate::runner::runner::WorkerEngineBuilder;
pub use activity::activity::ActivityContext;
pub use activity::activity::ActivityFuture;
pub use activity::activity::ActivityHandler;
pub use activity::activity::ActivityHandlerResult;
pub use activity::activity::ActivityPriority;
pub use activity::activity::ActivityStatus;
pub use activity::activity::OnDuplicate;
pub use activity::error::ActivityError;
pub use activity::error::RetryableError;
pub use crate::storage::InspectionStorage;
pub use crate::storage::QueueStorage;
pub use crate::storage::Storage;
pub use crate::storage::StorageError;
pub use crate::storage::redis::RedisConfig;
pub use crate::storage::RedisBackend;

Modules§

activity
config
observability
Observability module for queue inspection and monitoring.
queue
runner
storage
Backend abstraction layer for RunnerQ.