Runner-Q
A robust, scalable activity queue and worker system for Rust applications with pluggable storage backends.
Features
- Pluggable backend system - Trait-based storage abstraction supporting Redis, PostgreSQL, Valkey, and custom backends
- Priority-based activity processing - Support for Critical, High, Normal, and Low priority levels
- Activity scheduling - Precise timestamp-based scheduling for future execution
- Intelligent retries - Built-in retry handling with exponential backoff
- Dead letter queue - Failed activities are moved to a dead letter queue for inspection
- Concurrent activity processing - Configurable number of concurrent workers
- Graceful shutdown - Proper shutdown handling with signal support
- Activity orchestration - Activities can execute other activities for complex workflows
- Comprehensive error handling - Retryable and non-retryable error types
- Activity metadata - Support for custom metadata on activities
- Built-in observability console - Real-time web UI for monitoring and managing activities
- Queue statistics - Monitoring capabilities and metrics collection
Storage Backends
| Backend | Status | Use Case |
|---|---|---|
| Redis | ✅ Stable | Default. High-performance, ephemeral storage with TTL |
| Valkey | ✅ Stable | Redis-compatible, drop-in replacement |
| PostgreSQL | 🧪 Experimental | Permanent persistence, SQL-based queries |
| Custom | ✅ Supported | Implement the Storage trait for your own backend |
Installation
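Add the crate to your Cargo.toml. The package name runner_q is an assumption (the exact published name isn't spelled out in this README); the 0.5 version comes from the PostgreSQL section below:

# Cargo.toml
[dependencies]
runner_q = "0.5"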
Quick Start
use runner_q::{ActivityContext, ActivityHandler, ActivityHandlerResult, WorkerEngine};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};

// Define activity types: plain strings such as "send_email" (see "Activity Types" below)

// Implement an activity handler. The handle() signature below is indicative;
// check the ActivityHandler trait in your version of the crate.
struct SendEmailHandler;

#[async_trait]
impl ActivityHandler for SendEmailHandler {
    async fn handle(&self, payload: Value, _context: ActivityContext) -> ActivityHandlerResult {
        // ... send the email described by `payload` ...
        Ok(Some(json!({ "status": "sent" })))
    }
}
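A minimal main that ties these pieces together might look like the following sketch. The register_activity argument shapes and the start() call are assumptions based on the sections below, so adjust them to the actual API of your version:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the engine with the defaults described in "Default Values".
    let engine = WorkerEngine::builder()
        .redis_url("redis://127.0.0.1:6379")
        .queue_name("default")
        .max_workers(10)
        .build()
        .await?;

    // Register the handler under its activity type (argument shapes are assumed).
    engine.register_activity("send_email", Arc::new(SendEmailHandler));

    // Enqueue an activity through the fluent executor (see "Activity Builder").
    let future = engine
        .get_activity_executor()
        .activity("send_email")
        .payload(json!({ "to": "user@example.com" }))
        .execute()
        .await?;

    // Start processing; whether start() blocks or should be spawned is version-specific
    // (see "Graceful Shutdown" for the background-task pattern).
    engine.start().await?;
    Ok(())
}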
Builder Pattern API
Runner-Q provides a fluent builder pattern for both WorkerEngine configuration and activity execution, making the API more ergonomic and easier to use.
WorkerEngine Builder
use runner_q::WorkerEngine;
use std::time::Duration;

// Basic configuration (argument values shown are the documented defaults)
let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .max_workers(10)
    .schedule_poll_interval(Duration::from_secs(5))
    .build()
    .await?;
// Advanced configuration with Redis config and metrics
use runner_q::{MetricsSink, RedisConfig, WorkerEngine};
use std::sync::Arc;

let redis_config = RedisConfig {
    // connection/pool tuning fields elided here; see the RedisConfig docs
};

// Custom metrics implementation (any type implementing MetricsSink;
// see "Metrics and Monitoring" below for a full example)
struct MyMetrics;

// Argument values below are illustrative
let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("payments")
    .max_workers(20)
    .redis_config(redis_config)
    .metrics(Arc::new(MyMetrics))
    .build()
    .await?;
// Using a custom backend
use runner_q::RedisBackend;
use std::sync::Arc;

let backend = RedisBackend::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .build()
    .await?;

let engine = WorkerEngine::builder()
    .backend(Arc::new(backend))
    .max_workers(10)
    .build()
    .await?;
Activity Builder
use serde_json::json;
use std::time::Duration;

// Get activity executor for fluent activity execution
// Note: The fluent API is available through the activity executor, not directly on the engine
let activity_executor = engine.get_activity_executor();

// Fluent activity execution (activity types and payloads are illustrative)
let future = activity_executor
    .activity("send_email")
    .payload(json!({ "to": "user@example.com" }))
    .max_retries(3)
    .timeout(Duration::from_secs(300))
    .execute()
    .await?;

// Schedule activity for future execution
let scheduled_future = activity_executor
    .activity("generate_report")
    .payload(json!({ "report_id": 42 }))
    .delay(Duration::from_secs(3600)) // 1 hour delay
    .execute()
    .await?;

// Simple activity with defaults
let simple_future = activity_executor
    .activity("background_sync")
    .payload(json!({}))
    .execute()
    .await?;
Activity Types
Activity types in Runner-Q are simple strings that identify different types of activities. You can use any string as an activity type identifier.
Examples
// Common activity types
"send_email"
"process_payment"
"provision_card"
"update_card_status"
"process_webhook_event"
// You can use any string format you prefer
"user.registration"
"email-notification"
"background_sync"
Custom Activity Handlers
You can create custom activity handlers by implementing the ActivityHandler trait:
use runner_q::{ActivityContext, ActivityHandler, ActivityHandlerResult};
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
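// A sketch of a handler; the handle() signature is indicative, so check the
// ActivityHandler trait in your version before copying it.
struct ProcessPaymentHandler;

#[async_trait]
impl ActivityHandler for ProcessPaymentHandler {
    async fn handle(&self, payload: Value, _context: ActivityContext) -> ActivityHandlerResult {
        // ... charge the payment described by `payload` ...
        Ok(Some(json!({ "status": "charged" })))
    }
}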
// Register the handler (the exact register_activity signature may differ)
worker_engine.register_activity("process_payment", Arc::new(ProcessPaymentHandler));
Activity Priority and Options
Activities can be configured using the ActivityOption struct:
use runner_q::{ActivityOption, ActivityPriority};
use serde_json::json;

// High priority with custom retry and timeout settings
// (field names and execute_activity's argument list are indicative)
let options = ActivityOption { priority: ActivityPriority::High, /* max_retries, timeout, ... */ ..Default::default() };
let future = worker_engine.execute_activity("process_payment", json!({ "amount": 100 }), options).await?;

// Use default options (Normal priority, 3 retries, 300s timeout)
let future = worker_engine.execute_activity("send_email", json!({}), ActivityOption::default()).await?;
Available priorities:
- ActivityPriority::Critical - Highest priority (processed first)
- ActivityPriority::High - High priority
- ActivityPriority::Normal - Default priority
- ActivityPriority::Low - Lowest priority
Getting Activity Results
Activities can return results that can be retrieved asynchronously:
use serde_json::{from_value, json};

let future = worker_engine.execute_activity("send_email", json!({}), ActivityOption::default()).await?;

// Get the result (this will wait until the activity completes)
let result_value = future.get_result().await?;
let email_result: EmailResult = from_value(result_value)?; // EmailResult is your own Deserialize type
println!("email sent: {:?}", email_result);
Activity Orchestration
Activities can execute other activities using the ActivityExecutor available in the ActivityContext. This enables workflow orchestration through the same fluent API:
use runner_q::{ActivityContext, ActivityHandler, ActivityHandlerResult};
use async_trait::async_trait;
use serde_json::{json, Value};
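// A sketch of an orchestrating handler. It assumes the executor is reachable
// from the ActivityContext (shown here as `context.activity_executor`); the exact
// accessor and the handle() signature may differ in your version.
struct OrderWorkflowHandler;

#[async_trait]
impl ActivityHandler for OrderWorkflowHandler {
    async fn handle(&self, payload: Value, context: ActivityContext) -> ActivityHandlerResult {
        // Step 1: charge the payment as its own activity (own retries, own timeout).
        let payment = context
            .activity_executor
            .activity("process_payment")
            .payload(payload.clone())
            .execute()
            .await?;
        let _payment_result = payment.get_result().await?;

        // Step 2: send the confirmation email; no need to wait for its result.
        context
            .activity_executor
            .activity("send_email")
            .payload(json!({ "template": "order_confirmation" }))
            .execute()
            .await?;

        Ok(Some(json!({ "status": "completed" })))
    }
}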
Benefits of Activity Orchestration
- Modularity: Break complex workflows into smaller, reusable activities
- Reliability: Each sub-activity has its own retry logic and error handling
- Monitoring: Track progress of individual workflow steps
- Scalability: Sub-activities can be processed by different workers
- Flexibility: Different priority levels and timeouts for different steps
- Scheduling: Schedule activities for future execution
- Fluent API: Clean, readable activity execution with method chaining
Metrics and Monitoring
Runner-Q provides comprehensive metrics collection through the MetricsSink trait, allowing you to integrate with your preferred monitoring system.
Basic Metrics Implementation
use runner_q::{MetricsSink, WorkerEngine};
use std::time::Duration;
use std::sync::Arc;

// Simple logging metrics implementation.
// The method names come from this README; their exact signatures may differ slightly.
struct LoggingMetrics;

impl MetricsSink for LoggingMetrics {
    fn inc_counter(&self, name: &str) {
        println!("counter {name} += 1");
    }
    fn observe_duration(&self, name: &str, duration: Duration) {
        println!("{name} took {duration:?}");
    }
}

// Use with WorkerEngine
let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .metrics(Arc::new(LoggingMetrics))
    .build()
    .await?;
Prometheus Integration
use runner_q::MetricsSink;
use std::time::Duration;

// Prometheus metrics implementation
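// A sketch using the `prometheus` crate; the MetricsSink method signatures are
// indicative (same assumption as the LoggingMetrics example above). Register the
// collectors and expose them through your usual /metrics endpoint.
use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, Opts, Registry};

struct PrometheusMetrics {
    counters: IntCounterVec,
    durations: HistogramVec,
}

impl PrometheusMetrics {
    fn new(registry: &Registry) -> prometheus::Result<Self> {
        let counters = IntCounterVec::new(
            Opts::new("runnerq_events_total", "Runner-Q event counters"),
            &["name"],
        )?;
        let durations = HistogramVec::new(
            HistogramOpts::new("runnerq_duration_seconds", "Runner-Q observed durations"),
            &["name"],
        )?;
        registry.register(Box::new(counters.clone()))?;
        registry.register(Box::new(durations.clone()))?;
        Ok(Self { counters, durations })
    }
}

impl MetricsSink for PrometheusMetrics {
    fn inc_counter(&self, name: &str) {
        self.counters.with_label_values(&[name]).inc();
    }

    fn observe_duration(&self, name: &str, duration: Duration) {
        self.durations.with_label_values(&[name]).observe(duration.as_secs_f64());
    }
}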
Available Metrics
The library automatically collects the following metrics:
- activity_completed - Number of activities completed successfully
- activity_retry - Number of activities that requested a retry
- activity_failed_non_retry - Number of activities that failed permanently
- activity_timeout - Number of activities that timed out
No-op Metrics
If you don't need metrics collection, you can use the built-in NoopMetrics:
use runner_q::NoopMetrics;
use std::time::Duration;

let metrics = NoopMetrics;

// These calls do nothing
metrics.inc_counter("activity_completed");
metrics.observe_duration("activity_duration", Duration::from_secs(1));
Advanced Features
Activity Scheduling
Runner-Q supports scheduling activities for future execution with precise timestamp-based scheduling:
use serde_json::json;
use std::time::Duration;

// Get activity executor for scheduling
let activity_executor = engine.get_activity_executor();

// Schedule an activity to run in 1 hour
let future = activity_executor
    .activity("send_reminder")
    .payload(json!({ "user_id": 42 }))
    .delay(Duration::from_secs(3600)) // 1 hour from now
    .execute()
    .await?;

// Schedule for a specific time (using chrono to compute the delay)
use chrono::Utc;

let scheduled_time = Utc::now() + chrono::Duration::hours(2);
let delay = (scheduled_time - Utc::now()).to_std()?;
let future = activity_executor
    .activity("send_reminder")
    .payload(json!({ "user_id": 42 }))
    .delay(delay) // 2 hours
    .execute()
    .await?;
Redis Configuration
Fine-tune Redis connection behavior for your specific needs:
use runner_q::{RedisConfig, WorkerEngine};
use std::time::Duration;

let redis_config = RedisConfig {
    // pool size, connection and command timeouts, etc.
    // (field names elided; see the RedisConfig docs for your version)
};

let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .redis_config(redis_config)
    .build()
    .await?;
Pluggable Storage Backends
Runner-Q uses a trait-based storage abstraction that allows you to swap out the persistence layer. Built-in backends include Redis (default) and PostgreSQL (experimental), with full support for custom implementations.
Architecture
graph TD
subgraph PublicAPI [Public API]
WEB[WorkerEngineBuilder]
WE[WorkerEngine]
end
subgraph StorageTraits [Storage Module]
QB[QueueStorage trait]
IB[InspectionStorage trait]
end
subgraph Implementations [Backend Implementations]
RB[RedisBackend]
PB[PostgresBackend]
Future[Future: KafkaBackend, etc.]
end
WEB -->|".backend()"| QB
WEB -->|".redis_url()"| RB
WE --> QB
WE --> IB
RB --> QB
RB --> IB
PB --> QB
PB --> IB
Future -.-> QB
Future -.-> IB
The public API remains unchanged - you can continue using .redis_url() for the default experience, or use .backend() to inject a custom implementation.
Using the Default Redis Backend
use runner_q::{RedisBackend, WorkerEngine};
use std::sync::Arc;

// Option 1: Use the simple redis_url API (recommended for most cases)
let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .build()
    .await?;

// Option 2: Create a RedisBackend explicitly for more control
let backend = RedisBackend::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .lease_ms(30_000) // Custom lease duration (value illustrative)
    .build()
    .await?;

let engine = WorkerEngine::builder()
    .backend(Arc::new(backend))
    .max_workers(10)
    .build()
    .await?;
Valkey Compatibility
Since Valkey is Redis protocol-compatible, you can use it directly by pointing the URL to your Valkey server:
let engine = WorkerEngine::builder()
    .redis_url("redis://your-valkey-host:6379") // Works with Valkey!
    .queue_name("default")
    .build()
    .await?;
PostgreSQL Backend (Experimental)
⚠️ IN DEVELOPMENT - The PostgreSQL backend is feature-gated and experimental. Not recommended for production use yet.
For use cases requiring permanent persistence and SQL-based queries, RunnerQ provides an experimental PostgreSQL backend:
# Cargo.toml
[dependencies]
runner_q = { version = "0.5", features = ["postgres"] }
use runner_q::{PostgresBackend, WorkerEngine};
use std::sync::Arc;

// Create the PostgreSQL backend (constructor arguments elided here;
// see examples/postgres_example.rs for the full setup)
let backend = PostgresBackend::new(/* database URL / pool options */).await?;

// Use with WorkerEngine
let engine = WorkerEngine::builder()
    .backend(Arc::new(backend))
    .max_workers(10)
    .build()
    .await?;
PostgreSQL Backend Features:
- Permanent Persistence - Activities stored indefinitely (no TTL expiration)
- Multi-node Safe - Uses FOR UPDATE SKIP LOCKED for concurrent job claiming
- Cross-process Events - PostgreSQL LISTEN/NOTIFY for real-time event streaming
- Atomic Idempotency - Separate idempotency table with INSERT ... ON CONFLICT for race-safe key claiming
- History Preservation - Never deletes activity records
Schema Tables Created:
- runnerq_activities - Main activity storage
- runnerq_events - Event history timeline
- runnerq_results - Activity execution results
- runnerq_idempotency - Idempotency key mapping
See examples/postgres_example.rs for a complete working example.
Implementing a Custom Backend
You can implement your own backend by implementing the Storage trait (which combines QueueStorage and InspectionStorage):
use runner_q::{InspectionStorage, QueueStorage, WorkerEngine};
use std::sync::Arc;
use async_trait::async_trait;
use std::time::Duration;
use uuid::Uuid;

// Use your custom backend (MyCustomBackend is your type implementing
// QueueStorage + InspectionStorage; see the trait reference below)
let backend = MyCustomBackend::new();
let engine = WorkerEngine::builder()
    .backend(Arc::new(backend))
    .max_workers(10)
    .build()
    .await?;
Storage Trait Reference
The storage abstraction consists of two traits:
QueueStorage - Core queue operations:
- enqueue() - Add activity to the queue
- dequeue() - Claim an activity for processing
- ack_success() - Mark activity as completed
- ack_failure() - Handle activity failure (retry or dead-letter)
- process_scheduled() - Move due scheduled activities to the ready queue
- requeue_expired() - Reclaim activities with expired leases
- extend_lease() - Extend an activity's processing lease
- store_result() / get_result() - Activity result storage
- check_idempotency() - Idempotency key handling
InspectionStorage - Observability operations:
- stats() - Get queue statistics
- list_pending() / list_processing() / list_scheduled() / list_completed() - List activities by status
- list_dead_letter() - List dead-lettered activities
- get_activity() - Get specific activity details
- get_activity_events() - Get activity lifecycle events
- event_stream() - Stream real-time events (for SSE)
Graceful Shutdown
The worker engine supports graceful shutdown with proper cleanup:
use runner_q::WorkerEngine;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

// Build the engine and start it in a background task
// (the start() method name is indicative; adjust to your version)
let engine = Arc::new(
    WorkerEngine::builder()
        .redis_url("redis://127.0.0.1:6379")
        .queue_name("default")
        .build()
        .await?,
);
let engine_clone = engine.clone();
let engine_handle = tokio::spawn(async move { engine_clone.start().await });

// Let it run for a while
sleep(Duration::from_secs(60)).await;

// Gracefully stop the engine
engine.stop().await;

// Wait for the engine to finish
engine_handle.await??;
Activity Context and Metadata
Access rich context information in your activity handlers:
use runner_q::{ActivityContext, ActivityHandler};
use async_trait::async_trait;

// The ActivityContext passed to handle() exposes the activity's details and custom
// metadata, plus the ActivityExecutor used for orchestration (see "Activity Orchestration").
Queue Statistics
Monitor queue performance and health using the inspector:
use runner_q::{QueueStats, WorkerEngine};

let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .build()
    .await?;

// Get the inspector
let inspector = engine.inspector();

// Get queue statistics
let stats: QueueStats = inspector.stats().await?;

// Field names below are indicative of what QueueStats exposes
// (pending, processing, scheduled, completed, and dead-letter counts)
println!("pending:     {}", stats.pending);
println!("processing:  {}", stats.processing);
println!("scheduled:   {}", stats.scheduled);
println!("completed:   {}", stats.completed);
println!("dead letter: {}", stats.dead_letter);
For a visual dashboard with real-time updates, see the Observability Console section.
Error Handling
The library provides comprehensive error handling with clear separation between retryable and non-retryable errors.
Activity Handler Results
In your activity handlers, you can use the convenient ActivityHandlerResult type with the ? operator for clean error handling:
use runner_q::{ActivityContext, ActivityError, ActivityHandler, ActivityHandlerResult};
use async_trait::async_trait;
use serde_json::{json, Value};
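// A sketch of a handler using ActivityHandlerResult and `?`.
// The handle() signature is indicative; the error variants are the documented ones.
struct ChargeHandler;

#[async_trait]
impl ActivityHandler for ChargeHandler {
    async fn handle(&self, payload: Value, _context: ActivityContext) -> ActivityHandlerResult {
        // Validation failures should not be retried
        let amount = payload
            .get("amount")
            .and_then(|v| v.as_u64())
            .ok_or_else(|| ActivityError::NonRetry("missing amount".to_string()))?;

        // Transient failures can request a retry (with exponential backoff)
        if amount == 0 {
            return Err(ActivityError::Retry("upstream temporarily unavailable".to_string()));
        }

        Ok(Some(json!({ "charged": amount })))
    }
}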
Error Types:
- ActivityError::Retry(message) - Will be retried with exponential backoff
- ActivityError::NonRetry(message) - Will not be retried
- Any error implementing Into<ActivityError> can be used with ?
Dead Letter Callback
When an activity exhausts all retries, it moves to the dead letter queue. You can handle this event by implementing the optional on_dead_letter callback:
use runner_q::{ActivityContext, ActivityHandler, ActivityHandlerResult};
use async_trait::async_trait;
use serde_json::Value;
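// A sketch of the callback; the parameter list is indicative, so check the
// ActivityHandler trait definition before relying on it.
struct PaymentHandler;

#[async_trait]
impl ActivityHandler for PaymentHandler {
    async fn handle(&self, payload: Value, _context: ActivityContext) -> ActivityHandlerResult {
        // ... normal processing ...
        Ok(None)
    }

    // Called once the activity has exhausted all retries and is dead-lettered.
    async fn on_dead_letter(&self, payload: Value, _context: ActivityContext) {
        eprintln!("activity dead-lettered; alerting on-call with payload: {payload}");
    }
}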
The callback has a default empty implementation, so existing handlers continue to work without modification.
Worker Engine Errors
use serde_json::json;

// Using the fluent API for error handling
let activity_executor = engine.get_activity_executor();

match activity_executor
    .activity("send_email")
    .payload(json!({ "to": "user@example.com" }))
    .execute()
    .await
{
    Ok(future) => {
        // Enqueued successfully; optionally wait for the outcome
        let _result = future.get_result().await;
    }
    Err(err) => {
        // Enqueueing failed (e.g. the backend is unreachable)
        eprintln!("failed to execute activity: {err}");
    }
}
Error Recovery Patterns
use runner_q::ActivityError;
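// A common recovery pattern: map upstream failures onto Retry / NonRetry so the
// queue only applies backoff where a retry can actually help. (Illustrative; the
// ActivityError variants are the documented ones.)
fn classify_io_error(err: std::io::Error) -> ActivityError {
    use std::io::ErrorKind;

    match err.kind() {
        // Transient, network-ish failures: worth retrying with exponential backoff
        ErrorKind::TimedOut | ErrorKind::ConnectionReset | ErrorKind::Interrupted => {
            ActivityError::Retry(err.to_string())
        }
        // Everything else is treated as a permanent failure
        _ => ActivityError::NonRetry(err.to_string()),
    }
}

// Inside a handler: `do_network_call().map_err(classify_io_error)?;`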
Default Values
When using the builder pattern, sensible defaults are provided:
use runner_q::WorkerEngine;

// Uses these defaults:
// - redis_url: "redis://127.0.0.1:6379"
// - queue_name: "default"
// - max_workers: 10
// - schedule_poll_interval: 5 seconds
let engine = WorkerEngine::builder().build().await?;
Redis Configuration
Fine-tune Redis connection behavior:
use runner_q::RedisConfig;
use std::time::Duration;

let redis_config = RedisConfig {
    // pool size, connection and command timeouts, etc. (field names elided)
};

let engine = WorkerEngine::builder()
    .redis_config(redis_config)
    .build()
    .await?;
Observability Console
Runner-Q includes a built-in web-based observability console for monitoring and managing your activity queues in real-time.
Features
- Real-time Updates - Server-Sent Events (SSE) for instant activity updates
- Live Statistics - Monitor queue health with processing, pending, scheduled, and dead-letter counts
- Priority Distribution - See activity breakdown by priority level (Critical, High, Normal, Low)
- Activity Management - Browse and search activities across all queues (pending, processing, scheduled, completed, dead-letter)
- Activity Results - View execution results and outputs for completed activities
- Event Timeline - Detailed activity lifecycle events with multiple view modes
- 7-Day History - Query completed activities for up to 7 days
- Zero Setup - No build tools, npm, or dependencies required
Dashboard Preview

Quick Start
The console is designed to work just like Swagger UI - simply pass an inspector instance:
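The original snippet was lost in formatting; the sketch below reconstructs the idea with axum. console_router() is a placeholder for whatever constructor the console module actually exposes (see examples/console_ui.rs and the UI README for the real names):

use axum::Router;
use runner_q::WorkerEngine;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let engine = WorkerEngine::builder()
        .redis_url("redis://127.0.0.1:6379")
        .queue_name("default")
        .build()
        .await?;

    // The console only needs an inspector, much like handing a spec to Swagger UI.
    let inspector = engine.inspector();

    // `console_router()` is a placeholder name for the console's Router constructor.
    let app = Router::new()
        .nest("/console", console_router())
        .with_state(inspector);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;
    axum::serve(listener, app).await?;
    Ok(())
}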
Integration with Existing Apps
You can easily integrate the console into your existing Axum application:
use runner_q::WorkerEngine;
use axum::{routing::get, Router};

let engine = WorkerEngine::builder()
    .redis_url("redis://127.0.0.1:6379")
    .queue_name("default")
    .build()
    .await?;

let inspector = engine.inspector();

// Your existing app routes
let app = Router::new()
    .route("/", get(index))
    .route("/health", get(health))
    // Add the console (console_router() stands in for the console's actual constructor)
    .nest("/console", console_router())
    .with_state(inspector);
API-Only Mode
If you prefer to build a custom UI, you can serve just the API:
use runner_q::observability_api;

// Mount just the JSON API (the exact observability_api signature may differ)
let app = Router::new()
    .nest("/api", observability_api(inspector));
Example
See the complete example in examples/console_ui.rs:
# Start Redis (any local instance works)
docker run --rm -p 6379:6379 redis

# Run the console example
cargo run --example console_ui
# Open http://localhost:8081/console
For more details, see the UI README.
License
This project is licensed under the MIT License - see the LICENSE file for details.