§qml-rs
A production-ready Rust implementation of background job processing.
qml provides a complete, enterprise-grade background job processing solution with multiple storage backends, multi-threaded processing, race condition prevention, and real-time monitoring capabilities.
§Production Ready Features
- 3 Storage Backends: Memory, Redis, PostgreSQL with atomic operations
- Multi-threaded Processing: Configurable worker pools with job scheduling
- Web Dashboard: Real-time monitoring with WebSocket updates
- Race Condition Prevention: Comprehensive locking across all backends
- Job Lifecycle Management: Complete state tracking and retry logic
- Production Deployment: Docker, Kubernetes, and clustering support
§Storage Backends
§Memory Storage (Development/Testing)
use qml_rs::{Job, MemoryStorage};
use qml_rs::storage::prelude::*;
use std::sync::Arc;
let storage = Arc::new(MemoryStorage::new());
let job = Job::new("process_data", serde_json::json!({ "file": "input.csv" }));
storage.enqueue(&job).await.unwrap();
§Redis Storage (Distributed/High-Traffic)
#[cfg(feature = "redis")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{RedisConfig, StorageInstance};
    use std::time::Duration;
    let config = RedisConfig::new()
        .with_url("redis://localhost:6379")
        .with_pool_size(20)
        .with_command_timeout(Duration::from_secs(5));
    let storage = StorageInstance::redis(config).await?;
    Ok(())
}
§PostgreSQL Storage (Enterprise/ACID)
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    let config = PostgresConfig::new()
        .with_database_url("postgresql://user:pass@localhost:5432/qml")
        .with_auto_migrate(true)
        .with_max_connections(50);
    let storage = StorageInstance::postgres(config).await?;
    Ok(())
}
§Job Processing Engine
§Basic Worker Implementation
use qml_rs::{Worker, Job, WorkerContext, WorkerResult, QmlError};
use async_trait::async_trait;
struct EmailWorker;
#[async_trait]
impl Worker for EmailWorker {
    async fn execute(&self, job: &Job, _context: &WorkerContext) -> Result<WorkerResult, QmlError> {
        let email = job.payload.get("to").and_then(|v| v.as_str()).unwrap_or("");
        println!("Sending email to: {}", email);
        Ok(WorkerResult::success(None, 0))
    }
    fn method_name(&self) -> &str {
        "send_email"
    }
}
§Complete Job Server Setup
use qml_rs::{
    BackgroundJobServer, MemoryStorage, ServerConfig,
    WorkerRegistry, Job
};
use std::sync::Arc;
// Setup storage and registry
let storage = Arc::new(MemoryStorage::new());
let mut registry = WorkerRegistry::new();
// registry.register(Box::new(EmailWorker)); // Add your workers
// Configure server
let config = ServerConfig::new("server-1")
    .worker_count(4)
    .queues(vec!["critical".to_string(), "normal".to_string()]);
// Start job server, reusing the storage created above
let server = BackgroundJobServer::new(config, storage, Arc::new(registry));
// server.start().await?; // Start processing
§Dashboard & Monitoring
§Real-time Web Dashboard
Requires the dashboard cargo feature (off by default):
qml-rs = { version = "…", features = ["dashboard"] }.
use qml_rs::{DashboardServer, MemoryStorage};
use std::sync::Arc;
let storage = Arc::new(MemoryStorage::new());
let dashboard = DashboardServer::new(storage, Default::default());
// Start dashboard on http://localhost:8080
// dashboard.start().await?;
The dashboard provides:
- Real-time Statistics: Job counts by state, throughput metrics
- Job Management: View, retry, delete jobs through web UI
- WebSocket Updates: Live updates without page refresh
- Queue Monitoring: Per-queue statistics and performance
§Race Condition Prevention
All storage backends implement atomic job fetching to prevent race conditions:
- PostgreSQL: SELECT FOR UPDATE SKIP LOCKED with dedicated lock table
- Redis: Lua scripts for atomic operations with distributed locking
- Memory: Mutex-based locking with automatic cleanup
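The Memory bullet can be illustrated with a minimal, std-only sketch. `SketchQueue`, `Inner`, and `drain_concurrently` are hypothetical names, not part of qml's API, and the data layout is an assumption. The only point is that popping a job and recording its owner under a single mutex guard makes the fetch atomic, so no two workers can claim the same job.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory state: pending job ids plus a lock table
/// mapping job id -> worker id. Not qml's actual layout.
#[derive(Default)]
struct Inner {
    pending: VecDeque<u32>,
    locks: HashMap<u32, String>,
}

#[derive(Default)]
struct SketchQueue {
    inner: Mutex<Inner>,
}

impl SketchQueue {
    fn enqueue(&self, job_id: u32) {
        self.inner.lock().unwrap().pending.push_back(job_id);
    }

    /// Pop the next job and record its owner while holding one guard,
    /// so no other worker can observe the job between the two steps.
    fn fetch_and_lock(&self, worker_id: &str) -> Option<u32> {
        let mut inner = self.inner.lock().unwrap();
        let job_id = inner.pending.pop_front()?;
        inner.locks.insert(job_id, worker_id.to_string());
        Some(job_id)
    }

    /// Which worker currently holds the lock on `job_id`, if any.
    fn owner(&self, job_id: u32) -> Option<String> {
        self.inner.lock().unwrap().locks.get(&job_id).cloned()
    }
}

/// Spawn `workers` threads that drain the queue; return every claimed id.
fn drain_concurrently(queue: Arc<SketchQueue>, workers: usize) -> Vec<u32> {
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let worker_id = format!("worker-{w}");
                let mut claimed = Vec::new();
                while let Some(id) = queue.fetch_and_lock(&worker_id) {
                    claimed.push(id);
                }
                claimed
            })
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}

fn main() {
    let queue = Arc::new(SketchQueue::default());
    for id in 0..100 {
        queue.enqueue(id);
    }
    let mut claimed = drain_concurrently(Arc::clone(&queue), 4);
    claimed.sort_unstable();
    // Every job was claimed exactly once, and each has a recorded owner.
    assert_eq!(claimed, (0..100).collect::<Vec<u32>>());
    assert!(queue.owner(0).is_some());
}
```

In qml itself the corresponding call is `fetch_and_lock_job`, as in the following example.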
use qml_rs::MemoryStorage;
use qml_rs::storage::prelude::*;
let storage = MemoryStorage::new();
// Atomic job fetching - prevents multiple workers from processing same job
let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
match job {
    Some(job) => println!("Got exclusive lock on job: {}", job.id),
    None => println!("No jobs available"),
}
§Recurring Jobs
Register cron-scheduled templates with
BackgroundJobServer::schedule_recurring. The built-in
RecurringJobPoller wakes periodically, materializes each due template
into a regular Job, and advances its next_run_at. Templates
persist in storage, so restarts and multi-server deployments don't lose
schedule state. A claim-and-park discipline across backends ensures no
two servers fire the same tick.
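One way to read "claim-and-park" is sketched below with std only. This is an assumed interpretation, not qml's implementation: `TemplateStore`, `Template`, `PARKED`, `claim_due`, `advance`, and `poll_once` are all hypothetical names, and timestamps are plain Unix seconds. A poller atomically claims each due template by parking its `next_run_at` at a sentinel, enqueues the job, and only then writes the real next run time, so a second poller looking at the same instant finds nothing due.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical recurring template; `next_run_at` is in Unix seconds.
struct Template {
    interval_secs: u64,
    next_run_at: u64,
}

/// Sentinel meaning "claimed by a poller, not yet rescheduled".
const PARKED: u64 = u64::MAX;

#[derive(Default)]
struct TemplateStore {
    templates: Mutex<HashMap<String, Template>>,
}

impl TemplateStore {
    fn insert(&self, id: &str, interval_secs: u64, first_run_at: u64) {
        self.templates.lock().unwrap().insert(
            id.to_string(),
            Template { interval_secs, next_run_at: first_run_at },
        );
    }

    /// Claim step: collect every due template and park it under one lock,
    /// returning each id with the tick time it was due at.
    fn claim_due(&self, now: u64) -> Vec<(String, u64)> {
        let mut templates = self.templates.lock().unwrap();
        let mut claimed = Vec::new();
        for (id, t) in templates.iter_mut() {
            if t.next_run_at <= now {
                claimed.push((id.clone(), t.next_run_at));
                t.next_run_at = PARKED;
            }
        }
        claimed
    }

    /// Unpark step: schedule the next tick strictly after `now`,
    /// skipping ticks that were missed while no poller was running.
    fn advance(&self, id: &str, fired_at: u64, now: u64) {
        let mut templates = self.templates.lock().unwrap();
        if let Some(t) = templates.get_mut(id) {
            let step = t.interval_secs.max(1); // guard against a zero interval
            let mut next = fired_at + step;
            while next <= now {
                next += step;
            }
            t.next_run_at = next;
        }
    }
}

/// One poller pass; returns the ids of the templates this caller fired.
fn poll_once(store: &TemplateStore, now: u64) -> Vec<String> {
    let mut fired = Vec::new();
    for (id, due_at) in store.claim_due(now) {
        // A real poller would enqueue a regular job here.
        store.advance(&id, due_at, now);
        fired.push(id);
    }
    fired
}

fn main() {
    let store = TemplateStore::default();
    store.insert("daily-report", 60, 100);
    assert!(poll_once(&store, 99).is_empty()); // not due yet
    assert_eq!(poll_once(&store, 100), vec!["daily-report".to_string()]);
    assert!(poll_once(&store, 100).is_empty()); // same tick never fires twice
    assert_eq!(poll_once(&store, 160).len(), 1); // next tick is due at 160
}
```

Registering a template through qml's own API looks like this: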
use qml_rs::{BackgroundJobServer, MemoryStorage, ServerConfig, WorkerRegistry};
use std::sync::Arc;
let storage = Arc::new(MemoryStorage::new());
let registry = Arc::new(WorkerRegistry::new());
let server = BackgroundJobServer::new(
    ServerConfig::new("srv-1"),
    storage,
    registry,
);
// Cron expression uses the `cron` crate's 6-field format:
// `sec min hour day-of-month month day-of-week`.
server
    .schedule_recurring(
        "daily-report",
        "0 0 9 * * *",
        "generate_report",
        serde_json::json!({ "kind": "daily" }),
        "default",
    )
    .await?;
// ... later ...
server.remove_recurring("daily-report").await?;
§Automatic Expiration
Final-state jobs (Succeeded and permanently-Failed) are stamped with
expires_at by JobProcessor on transition. A background
CleanupWorker sweeps expired rows on a fixed interval so the hot
enqueue path stays O(1). Defaults: succeeded_ttl = 24h,
failed_ttl = 7 days, sweep every minute; all are configurable on
ServerConfig.
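The stamp-then-sweep split can be sketched in a few lines of std-only Rust. `Store`, `Ttls`, `FinalState`, `finish`, and `sweep` are hypothetical names and the layout is an assumption; the sketch only shows why enqueueing stays cheap: the expiry is written once at the final transition, and all deletion work happens in a separate periodic pass.

```rust
use std::collections::HashMap;

/// Hypothetical final states a job can end in.
#[derive(Clone, Copy)]
enum FinalState {
    Succeeded,
    Failed,
}

/// Retention periods in seconds; the caller picks the values.
struct Ttls {
    succeeded_secs: u64,
    failed_secs: u64,
}

#[derive(Default)]
struct Store {
    /// job id -> expiry in Unix seconds; `None` means the job is still active.
    expires_at: HashMap<u32, Option<u64>>,
}

impl Store {
    /// Enqueue does no expiry bookkeeping beyond a single map insert.
    fn enqueue(&mut self, id: u32) {
        self.expires_at.insert(id, None);
    }

    /// Stamp the expiry once, at the moment the job reaches a final state.
    fn finish(&mut self, id: u32, state: FinalState, now: u64, ttls: &Ttls) {
        let ttl = match state {
            FinalState::Succeeded => ttls.succeeded_secs,
            FinalState::Failed => ttls.failed_secs,
        };
        self.expires_at.insert(id, Some(now + ttl));
    }

    /// Periodic sweep: drop every job whose expiry has passed and
    /// return how many were removed. Active jobs are never touched.
    fn sweep(&mut self, now: u64) -> usize {
        let before = self.expires_at.len();
        self.expires_at.retain(|_, exp| match exp {
            Some(t) => *t > now,
            None => true,
        });
        before - self.expires_at.len()
    }
}

fn main() {
    let day = 24 * 3600;
    let ttls = Ttls { succeeded_secs: day, failed_secs: 7 * day };
    let mut store = Store::default();
    for id in 1..=3 {
        store.enqueue(id);
    }
    store.finish(1, FinalState::Succeeded, 0, &ttls);
    store.finish(2, FinalState::Failed, 0, &ttls);

    assert_eq!(store.sweep(3600), 0); // nothing has expired after one hour
    assert_eq!(store.sweep(day), 1); // the succeeded job is gone after 24h
    assert_eq!(store.sweep(7 * day), 1); // the failed job is gone after 7 days
    assert!(store.expires_at.contains_key(&3)); // the active job remains
}
```

In qml the TTLs and sweep interval are set on `ServerConfig`: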
use chrono::Duration;
use qml_rs::ServerConfig;
let config = ServerConfig::new("srv-1")
    .succeeded_ttl(Duration::hours(12))
    .failed_ttl(Duration::days(14))
    .cleanup_interval(Duration::minutes(5));
Both enable_recurring and enable_cleanup default to true; flip
them off if you want to run the poller or sweep out-of-process.
§Job States & Lifecycle
Jobs progress through well-defined states:
Enqueued → Processing → Succeeded
    ↓           ↓
Scheduled    Failed → AwaitingRetry → Enqueued
    ↓           ↓
Deleted      Deleted
§State Management Example
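The lifecycle diagram can be turned into an explicit transition table. The sketch below is std-only and uses a hypothetical `State` enum and `set_state` function; the edge list is one reading of the diagram's arrows (first column of down-arrows from Enqueued, second from Processing), not qml's actual validation rules.

```rust
/// Hypothetical mirror of the states named in the lifecycle diagram.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    Enqueued,
    Scheduled,
    Processing,
    Succeeded,
    Failed,
    AwaitingRetry,
    Deleted,
}

/// Edges as read off the diagram; an assumption, not qml's rule set.
fn can_transition(from: State, to: State) -> bool {
    use State::*;
    matches!(
        (from, to),
        (Enqueued, Processing)
            | (Enqueued, Scheduled)
            | (Processing, Succeeded)
            | (Processing, Failed)
            | (Failed, AwaitingRetry)
            | (AwaitingRetry, Enqueued)
            | (Scheduled, Deleted)
            | (Failed, Deleted)
    )
}

/// Apply a transition, rejecting any edge the table does not list.
/// On error the current state is left unchanged.
fn set_state(current: &mut State, next: State) -> Result<(), String> {
    if can_transition(*current, next) {
        *current = next;
        Ok(())
    } else {
        Err(format!("invalid transition: {:?} -> {:?}", current, next))
    }
}

fn main() {
    // Walk the retry loop: Enqueued -> Processing -> Failed
    // -> AwaitingRetry -> Enqueued.
    let mut state = State::Enqueued;
    set_state(&mut state, State::Processing).unwrap();
    set_state(&mut state, State::Failed).unwrap();
    set_state(&mut state, State::AwaitingRetry).unwrap();
    set_state(&mut state, State::Enqueued).unwrap();
    assert_eq!(state, State::Enqueued);

    // Skipping Processing is not an edge in the table.
    assert!(set_state(&mut state, State::Succeeded).is_err());
}
```

qml's own types expose the same idea through `Job::set_state`: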
use qml_rs::{Job, JobState};
let mut job = Job::new("process_payment", serde_json::json!({ "order": "order_123" }));
// Job starts as Enqueued
assert!(matches!(job.state, JobState::Enqueued { .. }));
// Transition to Processing
job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
// Complete successfully
job.set_state(JobState::succeeded(1500, Some("Payment processed".to_string()))).unwrap();
§Production Architecture
§Multi-Server Setup
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    use qml_rs::{BackgroundJobServer, DashboardServer, ServerConfig, WorkerRegistry};
    let storage_config = PostgresConfig::new()
        .with_database_url(std::env::var("DATABASE_URL")?)
        .with_auto_migrate(true)
        .with_max_connections(50);
    // `StorageInstance::postgres` returns `Arc<dyn Storage>`, so no
    // outer `Arc::new` wrap is needed.
    let storage = StorageInstance::postgres(storage_config).await?;
    let registry = Arc::new(WorkerRegistry::new());
    let server_config = ServerConfig::new("production-server")
        .worker_count(20)
        .queues(vec!["critical".to_string(), "normal".to_string()]);
    let job_server = BackgroundJobServer::new(server_config, storage.clone(), registry);
    let dashboard = DashboardServer::new(storage.clone(), Default::default());
    // Note: The error types returned by job_server.start() and dashboard.start() may not match,
    // so this try_join! block is for illustration only and may require custom error handling in real code.
    tokio::try_join!(
        job_server.start(),
        dashboard.start()
    );
    Ok(())
}
§Configuration Examples
§Server Configuration
use qml_rs::ServerConfig;
use chrono::Duration;
let config = ServerConfig::new("production-server")
    .worker_count(20) // 20 worker threads
    .polling_interval(Duration::seconds(1)) // Check for jobs every second
    .job_timeout(Duration::minutes(5)) // 5-minute job timeout
    .queues(vec!["critical".to_string(), "normal".to_string()]) // Process specific queues
    .fetch_batch_size(10) // Fetch 10 jobs at once
    .enable_scheduler(true); // Enable scheduled jobs
§PostgreSQL Configuration
#[cfg(feature = "postgres")]
use qml_rs::storage::PostgresConfig;
use std::time::Duration;
let config = PostgresConfig::new()
    .with_database_url("postgresql://user:pass@host:5432/db")
    .with_max_connections(50) // Connection pool size
    .with_min_connections(5) // Minimum connections
    .with_connect_timeout(Duration::from_secs(10))
    .with_auto_migrate(true); // Run migrations automatically
§Testing Support
§Unit Testing with Memory Storage
use qml_rs::{Job, MemoryStorage};
use qml_rs::storage::prelude::*;
#[tokio::test]
async fn test_job_processing() {
    let storage = MemoryStorage::new();
    let job = Job::new("test_job", serde_json::json!({ "arg": "arg1" }));
    storage.enqueue(&job).await.unwrap();
    let retrieved = storage.get(&job.id).await.unwrap().unwrap();
    assert_eq!(job.id, retrieved.id);
}
§Stress Testing
use qml_rs::{Job, MemoryStorage};
use qml_rs::storage::prelude::*;
use futures::future::join_all;
#[tokio::test]
async fn test_high_concurrency() {
    let storage = std::sync::Arc::new(MemoryStorage::new());
    // Build 100 jobs, then enqueue them concurrently
    let jobs: Vec<_> = (0..100).map(|i| {
        Job::new("concurrent_job", serde_json::json!({ "i": i }))
    }).collect();
    let futures: Vec<_> = jobs.iter().map(|job| {
        let storage = storage.clone();
        let job = job.clone();
        async move { storage.enqueue(&job).await }
    }).collect();
    let results = join_all(futures).await;
    assert!(results.iter().all(|r| r.is_ok()));
}
§Error Handling
Comprehensive error types for robust error handling:
use qml_rs::{QmlError, Result};
fn handle_job_error(result: Result<()>) {
    match result {
        Ok(()) => println!("Job completed successfully"),
        Err(QmlError::JobNotFound { job_id }) => {
            println!("Job {} not found", job_id);
        },
        Err(QmlError::StorageError { message }) => {
            println!("Storage error: {}", message);
        },
        Err(QmlError::WorkerError { message }) => {
            println!("Worker error: {}", message);
        },
        Err(e) => println!("Other error: {}", e),
    }
}
§Examples
Run the included examples to see qml in action:
# Basic job creation and serialization
cargo run --example basic_job
# Multi-backend storage operations
cargo run --example storage_demo
# Real-time dashboard with WebSocket
cargo run --example dashboard_demo
# Complete job processing with workers
cargo run --example processing_demo
# PostgreSQL setup and operations
cargo run --example postgres_simple
§Getting Started
- Add qml to your project:
  [dependencies]
  qml-rs = "0.1.0"
  # For PostgreSQL support, use instead:
  qml-rs = { version = "0.1.0", features = ["postgres"] }
- Define your workers (implement the Worker trait)
- Choose a storage backend (MemoryStorage, RedisStorage, PostgresStorage)
- Configure and start the BackgroundJobServer
- Monitor with the DashboardServer (optional)
See the examples for complete working implementations.
Re-exports§
pub use core::Job;
pub use core::JobState;
pub use core::JobStateKind;
pub use core::RecurringJob;
pub use core::ServerInfo;
pub use error::QmlError;
pub use error::Result;
pub use processing::BackgroundJobServer;
pub use processing::CleanupWorker;
pub use processing::JobMiddleware;
pub use processing::JobProcessor;
pub use processing::JobScheduler;
pub use processing::Next;
pub use processing::RecurringJobPoller;
pub use processing::RetryPolicy;
pub use processing::RetryStrategy;
pub use processing::ServerConfig;
pub use processing::StateChangeHook;
pub use processing::TracingMiddleware;
pub use processing::TypedWorker;
pub use processing::TypedWorkerAdapter;
pub use processing::Worker;
pub use processing::WorkerConfig;
pub use processing::WorkerContext;
pub use processing::WorkerRegistry;
pub use processing::WorkerResult;
pub use storage::JobLocker;
pub use storage::JobStore;
pub use storage::MemoryStorage;
pub use storage::MonitoringApi;
pub use storage::NamedLocks;
pub use storage::RecurringStore;
pub use storage::ServerRegistry;
pub use storage::Storage;
pub use storage::StorageConfig;
pub use storage::StorageError;
pub use storage::StorageInstance;
Modules§
- core: Core types for QML.
- error: Error types for QML Rust.
- processing: Job Processing Engine
- storage