Runner-Q
A robust, scalable Redis-based activity queue and worker system for Rust applications.
Features
- Priority-based activity processing - Support for Critical, High, Normal, and Low priority levels
- Activity scheduling - Precise timestamp-based scheduling for future execution
- Intelligent retry mechanism - Automatic retries with exponential backoff
- Dead letter queue - Failed activities are moved to a dead letter queue for inspection
- Concurrent activity processing - Configurable number of concurrent workers
- Graceful shutdown - Proper shutdown handling with signal support
- Activity orchestration - Activities can execute other activities for complex workflows
- Comprehensive error handling - Retryable and non-retryable error types
- Activity metadata - Support for custom metadata on activities
- Redis persistence - Activities are stored in Redis for durability
- Built-in observability console - Real-time web UI for monitoring and managing activities
- Queue statistics - Monitoring capabilities and metrics collection
Installation
cargo add runner_q
Quick Start
use runner_q::{WorkerEngine, ActivityPriority, ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::json;
use serde::{Serialize, Deserialize};
use std::time::Duration;
#[derive(Debug, Clone)]
enum MyActivityType {
SendEmail,
ProcessPayment,
}
impl std::fmt::Display for MyActivityType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MyActivityType::SendEmail => write!(f, "send_email"),
MyActivityType::ProcessPayment => write!(f, "process_payment"),
}
}
}
pub struct SendEmailActivity;
#[async_trait]
impl ActivityHandler for SendEmailActivity {
async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
let email_data: serde_json::Map<String, serde_json::Value> = payload
.as_object()
.ok_or_else(|| ActivityError::NonRetry("Invalid payload format".to_string()))?
.clone();
let to = email_data.get("to")
.and_then(|v| v.as_str())
.ok_or_else(|| ActivityError::NonRetry("Missing 'to' field".to_string()))?;
println!("Sending email to: {}", to);
Ok(Some(serde_json::json!({
"message": format!("Email sent to {}", to),
"status": "delivered"
})))
}
fn activity_type(&self) -> String {
MyActivityType::SendEmail.to_string()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EmailResult {
message: String,
status: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.max_workers(8)
.schedule_poll_interval(Duration::from_secs(30))
.build()
.await?;
let send_email_activity = SendEmailActivity;
engine.register_activity(MyActivityType::SendEmail.to_string(), Arc::new(send_email_activity));
let activity_executor = engine.get_activity_executor();
let future = activity_executor
.activity("send_email")
.payload(json!({"to": "user@example.com", "subject": "Welcome!"}))
.max_retries(5)
.timeout(Duration::from_secs(600))
.execute()
.await?;
let _scheduled_future = activity_executor
.activity("send_email")
.payload(json!({"to": "user@example.com", "subject": "Reminder"}))
.max_retries(3)
.timeout(Duration::from_secs(300))
.delay(Duration::from_secs(10))
.execute()
.await?;
let _future2 = activity_executor
.activity("send_email")
.payload(json!({"to": "admin@example.com"}))
.execute()
.await?;
tokio::spawn(async move {
if let Ok(result) = future.get_result().await {
match result {
None => {}
Some(data) => {
let email_result: EmailResult = serde_json::from_value(data).unwrap();
println!("Email result: {:?}", email_result);
}
}
}
});
engine.start().await?;
Ok(())
}
Builder Pattern API
Runner-Q provides a fluent builder pattern for both WorkerEngine configuration and activity execution, making the API more ergonomic and easier to use.
WorkerEngine Builder
use runner_q::WorkerEngine;
use std::time::Duration;
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.max_workers(8)
.schedule_poll_interval(Duration::from_secs(30))
.build()
.await?;
use runner_q::{RedisConfig, MetricsSink};
use std::sync::Arc;
let redis_config = RedisConfig {
max_size: 100,
min_idle: 10,
conn_timeout: Duration::from_secs(60),
idle_timeout: Duration::from_secs(600),
max_lifetime: Duration::from_secs(3600),
};
struct PrometheusMetrics;
impl MetricsSink for PrometheusMetrics {
fn inc_counter(&self, name: &str, value: u64) {
println!("Counter {}: {}", name, value);
}
fn observe_duration(&self, name: &str, duration: Duration) {
println!("Duration {}: {:?}", name, duration);
}
}
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.max_workers(8)
.redis_config(redis_config)
.metrics(Arc::new(PrometheusMetrics))
.build()
.await?;
Activity Builder
use runner_q::{WorkerEngine, ActivityPriority};
use serde_json::json;
use std::time::Duration;
let activity_executor = engine.get_activity_executor();
let future = activity_executor
.activity("send_email")
.payload(json!({"to": "user@example.com", "subject": "Hello"}))
.max_retries(5)
.timeout(Duration::from_secs(600))
.execute()
.await?;
let scheduled_future = activity_executor
.activity("send_reminder")
.payload(json!({"user_id": 123}))
.delay(Duration::from_secs(3600))
.execute()
.await?;
let simple_future = activity_executor
.activity("process_data")
.payload(json!({"data": "example"}))
.execute()
.await?;
Activity Types
Activity types in Runner-Q are simple strings that identify different types of activities. You can use any string as an activity type identifier.
Examples
"send_email"
"process_payment"
"provision_card"
"update_card_status"
"process_webhook_event"
"user.registration"
"email-notification"
"background_sync"
Custom Activity Handlers
You can create custom activity handlers by implementing the ActivityHandler trait:
use runner_q::{ActivityContext, ActivityHandler, ActivityHandlerResult, ActivityError};
use async_trait::async_trait;
use serde_json::Value;
use std::sync::Arc;
pub struct PaymentActivity;
#[async_trait]
impl ActivityHandler for PaymentActivity {
async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityHandlerResult {
let amount = payload["amount"]
.as_f64()
.ok_or_else(|| ActivityError::NonRetry("Missing or invalid amount".to_string()))?;
let currency = payload["currency"]
.as_str()
.unwrap_or("USD");
if amount <= 0.0 {
return Err(ActivityError::NonRetry("Invalid amount".to_string()));
}
println!("Processing payment: {} {}", amount, currency);
Ok(Some(serde_json::json!({
"transaction_id": "txn_123456",
"amount": amount,
"currency": currency,
"status": "completed"
})))
}
fn activity_type(&self) -> String {
"process_payment".to_string()
}
}
worker_engine.register_activity("process_payment".to_string(), Arc::new(PaymentActivity {}));
Activity Priority and Options
Activities can be configured using the ActivityOption struct:
use runner_q::{ActivityPriority, ActivityOption};
let future = worker_engine.execute_activity(
"send_email".to_string(),
serde_json::json!({"to": "user@example.com"}),
Some(ActivityOption {
priority: Some(ActivityPriority::Critical),
max_retries: 10,
timeout_seconds: 900,
})
).await?;
let future = worker_engine.execute_activity(
"send_email".to_string(),
serde_json::json!({"to": "user@example.com"}),
None
).await?;
Available priorities:
ActivityPriority::Critical - Highest priority (processed first)
ActivityPriority::High - High priority
ActivityPriority::Normal - Default priority
ActivityPriority::Low - Lowest priority
Getting Activity Results
Activities can return results that can be retrieved asynchronously:
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize)]
struct EmailResult {
message: String,
status: String,
}
let future = worker_engine.execute_activity(
"send_email".to_string(),
serde_json::json!({"to": "user@example.com"}),
None
).await?;
if let Some(result_value) = future.get_result().await? {
let email_result: EmailResult = serde_json::from_value(result_value)?;
println!("Email result: {:?}", email_result);
}
Activity Orchestration
Activities can execute other activities using the ActivityExecutor available in the ActivityContext. This enables powerful workflow orchestration through the fluent API:
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityPriority, ActivityError};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
struct OrderData {
id: String,
customer_email: String,
items: Vec<String>,
}
pub struct OrderProcessingActivity;
#[async_trait]
impl ActivityHandler for OrderProcessingActivity {
async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
let order_id = payload["order_id"]
.as_str()
.ok_or_else(|| ActivityError::NonRetry("Missing order_id".to_string()))?;
let _payment_future = context.activity_executor
.activity("validate_payment")
.payload(serde_json::json!({"order_id": order_id}))
.priority(ActivityPriority::High)
.max_retries(3)
.timeout(std::time::Duration::from_secs(120))
.execute()
.await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue payment validation: {}", e)))?;
let _inventory_future = context.activity_executor
.activity("update_inventory")
.payload(serde_json::json!({"order_id": order_id}))
.execute()
.await.map_err(|e| ActivityError::Retry(format!("Failed to enqueue inventory update: {}", e)))?;
context.activity_executor
.activity("send_delivery_notification")
.payload(serde_json::json!({"order_id": order_id, "customer_email": payload["customer_email"]}))
.priority(ActivityPriority::Normal)
.max_retries(5)
.timeout(std::time::Duration::from_secs(300))
.delay(std::time::Duration::from_secs(3600))
.execute()
.await.map_err(|e| ActivityError::Retry(format!("Failed to schedule notification: {}", e)))?;
Ok(Some(serde_json::json!({
"order_id": order_id,
"status": "processing",
"steps_initiated": ["payment_validation", "inventory_update", "delivery_notification"]
})))
}
fn activity_type(&self) -> String {
"process_order".to_string()
}
}
Benefits of Activity Orchestration
- Modularity: Break complex workflows into smaller, reusable activities
- Reliability: Each sub-activity has its own retry logic and error handling
- Monitoring: Track progress of individual workflow steps
- Scalability: Sub-activities can be processed by different workers
- Flexibility: Different priority levels and timeouts for different steps
- Scheduling: Schedule activities for future execution
- Fluent API: Clean, readable activity execution with method chaining
Metrics and Monitoring
Runner-Q provides comprehensive metrics collection through the MetricsSink trait, allowing you to integrate with your preferred monitoring system.
Basic Metrics Implementation
use runner_q::{MetricsSink, WorkerEngine};
use std::time::Duration;
use std::sync::Arc;
struct LoggingMetrics;
impl MetricsSink for LoggingMetrics {
fn inc_counter(&self, name: &str, value: u64) {
println!("METRIC: {} += {}", name, value);
}
fn observe_duration(&self, name: &str, duration: Duration) {
println!("METRIC: {} = {:?}", name, duration);
}
}
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.metrics(Arc::new(LoggingMetrics))
.build()
.await?;
Prometheus Integration
use runner_q::{MetricsSink, WorkerEngine};
use std::time::Duration;
use std::sync::Arc;
use std::collections::HashMap;
struct PrometheusMetrics {
counters: HashMap<String, prometheus::Counter>,
histograms: HashMap<String, prometheus::Histogram>,
}
impl MetricsSink for PrometheusMetrics {
fn inc_counter(&self, name: &str, value: u64) {
if let Some(counter) = self.counters.get(name) {
counter.inc_by(value as f64);
}
}
fn observe_duration(&self, name: &str, duration: Duration) {
if let Some(histogram) = self.histograms.get(name) {
histogram.observe(duration.as_secs_f64());
}
}
}
use std::sync::atomic::{AtomicU64, Ordering};
struct CustomMetrics {
activity_completed: AtomicU64,
activity_failed_non_retry: AtomicU64,
activity_retry: AtomicU64,
}
impl MetricsSink for CustomMetrics {
fn inc_counter(&self, name: &str, value: u64) {
match name {
"activity_completed" => {
self.activity_completed.fetch_add(value, Ordering::Relaxed);
}
"activity_failed_non_retry" => {
self.activity_failed_non_retry.fetch_add(value, Ordering::Relaxed);
}
"activity_retry" => {
self.activity_retry.fetch_add(value, Ordering::Relaxed);
}
_ => {}
}
}
fn observe_duration(&self, name: &str, duration: Duration) {
match name {
"activity_execution" => {
println!("Activity execution time: {:?}", duration);
}
_ => {}
}
}
}
Available Metrics
The library automatically collects the following metrics:
activity_completed - Number of activities completed successfully
activity_retry - Number of activities that requested retry
activity_failed_non_retry - Number of activities that failed permanently
activity_timeout - Number of activities that timed out
No-op Metrics
If you don't need metrics collection, you can use the built-in NoopMetrics:
use runner_q::{NoopMetrics, MetricsSink};
use std::time::Duration;
let metrics = NoopMetrics;
metrics.inc_counter("activity_completed", 1);
metrics.observe_duration("activity_execution", Duration::from_secs(5));
Advanced Features
Activity Scheduling
Runner-Q supports scheduling activities for future execution with precise timestamp-based scheduling:
use runner_q::{WorkerEngine, ActivityPriority};
use serde_json::json;
use std::time::Duration;
let activity_executor = engine.get_activity_executor();
let future = activity_executor
.activity("send_reminder")
.payload(json!({"user_id": 123, "message": "Don't forget!"}))
.delay(Duration::from_secs(3600))
.execute()
.await?;
let future = activity_executor
.activity("process_report")
.payload(json!({"report_type": "monthly"}))
.delay(Duration::from_secs(7200))
.execute()
.await?;
Redis Configuration
Fine-tune Redis connection behavior for your specific needs:
use runner_q::{WorkerEngine, RedisConfig};
use std::time::Duration;
let redis_config = RedisConfig {
max_size: 100,
min_idle: 10,
conn_timeout: Duration::from_secs(60),
idle_timeout: Duration::from_secs(600),
max_lifetime: Duration::from_secs(3600),
};
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.redis_config(redis_config)
.build()
.await?;
Graceful Shutdown
The worker engine supports graceful shutdown with proper cleanup:
use runner_q::WorkerEngine;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
let engine = Arc::new(engine);
let engine_clone = engine.clone();
let engine_handle = tokio::spawn(async move {
engine_clone.start().await
});
sleep(Duration::from_secs(10)).await;
engine.stop().await;
engine_handle.await??;
Activity Context and Metadata
Access rich context information in your activity handlers:
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use async_trait::async_trait;
#[async_trait]
impl ActivityHandler for MyActivity {
async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
println!("Processing activity {} of type {}", context.activity_id, context.activity_type);
println!("This is retry attempt #{}", context.retry_count);
if context.cancel_token.is_cancelled() {
return Err(ActivityError::NonRetry("Activity was cancelled".to_string()));
}
if let Some(correlation_id) = context.metadata.get("correlation_id") {
println!("Correlation ID: {}", correlation_id);
}
Ok(Some(serde_json::json!({"status": "processed"})))
}
fn activity_type(&self) -> String {
"my_activity".to_string()
}
}
Queue Statistics
Monitor queue performance and health using the inspector:
use runner_q::{WorkerEngine, QueueStats};
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.build()
.await?;
let inspector = engine.inspector();
let stats: QueueStats = inspector.stats().await?;
println!("Queue stats:");
println!(" Pending activities: {}", stats.pending_activities);
println!(" Processing activities: {}", stats.processing_activities);
println!(" Scheduled activities: {}", stats.scheduled_activities);
println!(" Dead letter queue size: {}", stats.dead_letter_activities);
println!("Priority distribution:");
println!(" Critical: {}", stats.critical_priority);
println!(" High: {}", stats.high_priority);
println!(" Normal: {}", stats.normal_priority);
println!(" Low: {}", stats.low_priority);
For a visual dashboard with real-time updates, see the Observability Console section.
Error Handling
The library provides comprehensive error handling with clear separation between retryable and non-retryable errors.
Activity Handler Results
In your activity handlers, you can use the convenient ActivityHandlerResult type with the ? operator for clean error handling:
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use async_trait::async_trait;
use serde_json::Value;
#[derive(serde::Deserialize)]
struct MyData {
id: String,
value: String,
}
#[async_trait]
impl ActivityHandler for MyActivity {
async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityHandlerResult {
let data: MyData = serde_json::from_value(payload)?;
if data.id.is_empty() {
return Err(ActivityError::NonRetry("Invalid data format".to_string()));
}
let result = external_api_call(&data)
.await
.map_err(|e| ActivityError::Retry(format!("API call failed: {}", e)))?;
Ok(Some(serde_json::json!({"result": result})))
}
fn activity_type(&self) -> String {
"my_activity".to_string()
}
}
Error Types:
ActivityError::Retry(message) - Will be retried with exponential backoff
ActivityError::NonRetry(message) - Will not be retried, goes to dead letter queue
Any error implementing Into<ActivityError> can be used with the ? operator
Worker Engine Errors
use runner_q::{WorkerEngine, WorkerError, ActivityPriority};
use serde_json::json;
use std::time::Duration;
let activity_executor = engine.get_activity_executor();
match activity_executor
.activity("my_activity")
.payload(json!({"id": "123", "value": "test"}))
.execute()
.await
{
Ok(future) => {
match future.get_result().await {
Ok(result) => match result {
Some(data) => println!("Activity completed: {:?}", data),
None => println!("Activity completed with no result"),
},
Err(WorkerError::Timeout) => println!("Activity timed out"),
Err(WorkerError::CustomError(msg)) => println!("Activity failed: {}", msg),
Err(e) => println!("Activity failed: {}", e),
}
}
Err(e) => println!("Failed to enqueue activity: {}", e),
}
Error Recovery Patterns
use runner_q::{ActivityHandler, ActivityContext, ActivityHandlerResult, ActivityError};
use async_trait::async_trait;
#[async_trait]
impl ActivityHandler for ResilientActivity {
async fn handle(&self, payload: serde_json::Value, context: ActivityContext) -> ActivityHandlerResult {
match context.retry_count {
0..=2 => {
self.process_with_retry(payload).await
}
3..=5 => {
self.process_conservative(payload).await
}
_ => {
self.process_final_attempt(payload).await
}
}
}
fn activity_type(&self) -> String {
"resilient_activity".to_string()
}
}
impl ResilientActivity {
async fn process_with_retry(&self, payload: serde_json::Value) -> ActivityHandlerResult {
Ok(Some(serde_json::json!({"status": "processed"})))
}
async fn process_conservative(&self, payload: serde_json::Value) -> ActivityHandlerResult {
Ok(Some(serde_json::json!({"status": "processed_conservative"})))
}
async fn process_final_attempt(&self, payload: serde_json::Value) -> ActivityHandlerResult {
Ok(Some(serde_json::json!({"status": "processed_final"})))
}
}
Default Values
When using the builder pattern, sensible defaults are provided:
use runner_q::WorkerEngine;
let engine = WorkerEngine::builder().build().await?;
Observability Console
Runner-Q includes a built-in web-based observability console for monitoring and managing your activity queues in real-time.
Features
- Real-time Updates - Server-Sent Events (SSE) for instant activity updates
- Live Statistics - Monitor queue health with processing, pending, scheduled, and dead-letter counts
- Priority Distribution - See activity breakdown by priority level (Critical, High, Normal, Low)
- Activity Management - Browse and search activities across all queues (pending, processing, scheduled, completed, dead-letter)
- Activity Results - View execution results and outputs for completed activities
- Event Timeline - Detailed activity lifecycle events with multiple view modes
- 7-Day History - Query completed activities for up to 7 days
- Zero Setup - No build tools, npm, or dependencies required
Dashboard Preview

Quick Start
The console is designed to work just like Swagger UI - simply pass an inspector instance:
use runner_q::{runnerq_ui, WorkerEngine};
use axum::{serve, Router};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let engine = WorkerEngine::builder()
.redis_url("redis://127.0.0.1:6379")
.queue_name("my_app")
.build()
.await?;
let inspector = engine.inspector();
let app = Router::new()
.nest("/console", runnerq_ui(inspector));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;
println!("✨ RunnerQ Console: http://localhost:8081/console");
serve(listener, app).await?;
Ok(())
}
Integration with Existing Apps
You can easily integrate the console into your existing Axum application:
use runner_q::{runnerq_ui, WorkerEngine};
use axum::{routing::get, Router};
let engine = WorkerEngine::builder()
.redis_url("redis://localhost:6379")
.queue_name("my_app")
.build()
.await?;
let inspector = engine.inspector();
let app = Router::new()
.route("/api/users", get(list_users))
.route("/api/posts", get(list_posts))
.nest("/console", runnerq_ui(inspector))
.with_state(app_state);
API-Only Mode
If you prefer to build a custom UI, you can serve just the API:
use runner_q::observability_api;
let app = Router::new()
.nest("/api/observability", observability_api(inspector));
Example
See the complete example in examples/console_ui.rs:
redis-server
cargo run --example console_ui
For more details, see the UI README.
License
This project is licensed under the MIT License - see the LICENSE file for details.