§qml-rs
A production-ready Rust implementation of background job processing.
qml provides a complete, enterprise-grade background job processing solution with multiple storage backends, multi-threaded processing, race condition prevention, and real-time monitoring capabilities.
§Production Ready Features
- 3 Storage Backends: Memory, Redis, PostgreSQL with atomic operations
- Multi-threaded Processing: Configurable worker pools with job scheduling
- Web Dashboard: Real-time monitoring with WebSocket updates
- Race Condition Prevention: Comprehensive locking across all backends
- Job Lifecycle Management: Complete state tracking and retry logic
- Production Deployment: Docker, Kubernetes, and clustering support
§Storage Backends
§Memory Storage (Development/Testing)
use qml_rs::{MemoryStorage, Job, Storage};
use std::sync::Arc;
let storage = Arc::new(MemoryStorage::new());
let job = Job::new("process_data", vec!["input.csv".to_string()]);
storage.enqueue(&job).await.unwrap();
§Redis Storage (Distributed/High-Traffic)
#[cfg(feature = "redis")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{RedisConfig, StorageInstance};
    use std::time::Duration;
    let config = RedisConfig::new()
        .with_url("redis://localhost:6379")
        .with_pool_size(20)
        .with_command_timeout(Duration::from_secs(5));
    let storage = StorageInstance::redis(config).await?;
    Ok(())
}
§PostgreSQL Storage (Enterprise/ACID)
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    let config = PostgresConfig::new()
        .with_database_url("postgresql://user:pass@localhost:5432/qml")
        .with_auto_migrate(true)
        .with_max_connections(50);
    let storage = StorageInstance::postgres(config).await?;
    Ok(())
}
§Job Processing Engine
§Basic Worker Implementation
use qml_rs::{Worker, Job, WorkerContext, WorkerResult, QmlError};
use async_trait::async_trait;
struct EmailWorker;
#[async_trait]
impl Worker for EmailWorker {
    async fn execute(&self, job: &Job, _context: &WorkerContext) -> Result<WorkerResult, QmlError> {
        let email = &job.arguments[0];
        println!("Sending email to: {}", email);
        // Email sending logic here
        Ok(WorkerResult::success(None, 0))
    }
    fn method_name(&self) -> &str {
        "send_email"
    }
}
§Complete Job Server Setup
use qml_rs::{
    BackgroundJobServer, MemoryStorage, ServerConfig,
    WorkerRegistry, Job
};
use std::sync::Arc;
// Setup storage and registry
let storage = Arc::new(MemoryStorage::new());
let mut registry = WorkerRegistry::new();
// registry.register(Box::new(EmailWorker)); // Add your workers
// Configure server
let config = ServerConfig::new("server-1")
    .worker_count(4)
    .queues(vec!["critical".to_string(), "normal".to_string()]);
// Start job server, reusing the storage created above
let server = BackgroundJobServer::new(config, storage, Arc::new(registry));
// server.start().await?; // Start processing
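As a rough mental model of what a setting like worker_count(4) implies, the sketch below runs a fixed number of threads that pull jobs from one shared channel. It uses only the standard library; run_pool and its signature are hypothetical names for illustration and say nothing about how BackgroundJobServer is implemented internally.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Run `worker_count` threads over `jobs`; return (worker index, job) pairs.
/// Hypothetical helper, unrelated to qml's BackgroundJobServer internals.
fn run_pool(worker_count: usize, jobs: Vec<String>) -> Vec<(usize, String)> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let job_rx = Arc::new(Mutex::new(job_rx)); // receiver shared by all workers
    let (done_tx, done_rx) = mpsc::channel::<(usize, String)>();

    let handles: Vec<_> = (0..worker_count)
        .map(|idx| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The guard is a temporary, so the lock is released
                // as soon as `recv` returns.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => done_tx.send((idx, job)).unwrap(),
                    Err(_) => break, // channel closed: no more jobs
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the channel lets workers exit their loops
    drop(done_tx); // keep only the workers' clones alive

    for handle in handles {
        handle.join().unwrap();
    }
    done_rx.iter().collect()
}
```

Calling run_pool(4, jobs) returns one entry per job, tagged with the index of the worker that handled it. Which worker gets which job varies from run to run.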
§Dashboard & Monitoring
§Real-time Web Dashboard
use qml_rs::{DashboardServer, MemoryStorage};
use std::sync::Arc;
let storage = Arc::new(MemoryStorage::new());
let dashboard = DashboardServer::new(storage, Default::default());
// Start dashboard on http://localhost:8080
// dashboard.start("0.0.0.0:8080").await?;
The dashboard provides:
- Real-time Statistics: Job counts by state, throughput metrics
- Job Management: View, retry, delete jobs through web UI
- WebSocket Updates: Live updates without page refresh
- Queue Monitoring: Per-queue statistics and performance
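To make "job counts by state" and "per-queue statistics" concrete, here is a minimal standard-library sketch of the aggregation involved. JobRecord and both functions are hypothetical stand-ins, not the JobStatistics or QueueStatistics types that qml exports.

```rust
use std::collections::BTreeMap;

/// Hypothetical job record: just a queue name and a state label.
struct JobRecord {
    queue: &'static str,
    state: &'static str,
}

/// Count jobs per state, the kind of figure a statistics panel would show.
fn counts_by_state(jobs: &[JobRecord]) -> BTreeMap<&'static str, usize> {
    let mut counts = BTreeMap::new();
    for job in jobs {
        *counts.entry(job.state).or_insert(0) += 1;
    }
    counts
}

/// Count jobs per (queue, state) pair for per-queue monitoring.
fn counts_by_queue(jobs: &[JobRecord]) -> BTreeMap<(&'static str, &'static str), usize> {
    let mut counts = BTreeMap::new();
    for job in jobs {
        *counts.entry((job.queue, job.state)).or_insert(0) += 1;
    }
    counts
}
```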
§Race Condition Prevention
All storage backends implement atomic job fetching to prevent race conditions:
- PostgreSQL: SELECT FOR UPDATE SKIP LOCKED with dedicated lock table
- Redis: Lua scripts for atomic operations with distributed locking
- Memory: Mutex-based locking with automatic cleanup
use qml_rs::{Storage, MemoryStorage};
let storage = MemoryStorage::new();
// Atomic job fetching - prevents multiple workers from processing same job
let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
match job {
    Some(job) => println!("Got exclusive lock on job: {}", job.id),
    None => println!("No jobs available"),
}
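The mutex-based approach can be illustrated with a small standard-library sketch: taking a job off the queue and recording its owner happen under a single lock guard, so two workers can never receive the same job. SketchQueue, Inner, and drain_concurrently are hypothetical names. This is not the code behind MemoryStorage, and it leaves out the automatic lock cleanup mentioned above.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory queue state; not qml's actual MemoryStorage.
#[derive(Default)]
struct Inner {
    pending: VecDeque<u64>,          // job ids waiting to be processed
    locked_by: HashMap<u64, String>, // job id -> worker holding the lock
}

#[derive(Clone, Default)]
struct SketchQueue {
    inner: Arc<Mutex<Inner>>,
}

impl SketchQueue {
    fn enqueue(&self, job_id: u64) {
        self.inner.lock().unwrap().pending.push_back(job_id);
    }

    /// Pop the next job and record its owner under one mutex guard,
    /// so no two callers can receive the same job id.
    fn fetch_and_lock(&self, worker_id: &str) -> Option<u64> {
        let mut inner = self.inner.lock().unwrap();
        let job_id = inner.pending.pop_front()?;
        inner.locked_by.insert(job_id, worker_id.to_string());
        Some(job_id)
    }
}

/// Spawn `workers` threads that drain the queue; return every job id fetched.
fn drain_concurrently(queue: &SketchQueue, workers: usize) -> Vec<u64> {
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let queue = queue.clone();
            thread::spawn(move || {
                let worker_id = format!("worker-{}", i);
                let mut seen = Vec::new();
                while let Some(job_id) = queue.fetch_and_lock(&worker_id) {
                    seen.push(job_id);
                }
                seen
            })
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

If 100 job ids are enqueued and drained by several threads, each id comes back exactly once, because the pop and the lock record cannot be interleaved with another worker's fetch.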
§Job States & Lifecycle
Jobs progress through well-defined states:
Enqueued → Processing → Succeeded
    ↓           ↓
Scheduled    Failed → AwaitingRetry → Enqueued
    ↓           ↓
 Deleted     Deleted
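One way to reason about the lifecycle is as a table of allowed transitions. The sketch below models only the success path and the retry loop (Enqueued, Processing, Succeeded, Failed, AwaitingRetry) and deliberately leaves out Scheduled and Deleted. The Processing to Failed edge is an assumption read off the diagram. State, can_transition, and walk are hypothetical names; qml's own JobState carries data and enforces its own rules through set_state.

```rust
/// Simplified stand-in for the job states named above (no payload data).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Enqueued,
    Processing,
    Succeeded,
    Failed,
    AwaitingRetry,
}

/// Returns true when `from -> to` is one of the edges this sketch models.
fn can_transition(from: State, to: State) -> bool {
    use State::*;
    matches!(
        (from, to),
        (Enqueued, Processing)
            | (Processing, Succeeded)
            | (Processing, Failed) // assumed from the diagram
            | (Failed, AwaitingRetry)
            | (AwaitingRetry, Enqueued)
    )
}

/// Apply a sequence of transitions, stopping at the first illegal one.
/// On failure, returns the (from, to) pair that was rejected.
fn walk(start: State, path: &[State]) -> Result<State, (State, State)> {
    let mut current = start;
    for &next in path {
        if !can_transition(current, next) {
            return Err((current, next));
        }
        current = next;
    }
    Ok(current)
}
```

A job that fails once and then succeeds follows Enqueued, Processing, Failed, AwaitingRetry, Enqueued, Processing, Succeeded, which walk accepts. Jumping straight from Enqueued to Succeeded is rejected.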
§State Management Example
use qml_rs::{Job, JobState};
let mut job = Job::new("process_payment", vec!["order_123".to_string()]);
// Job starts as Enqueued
assert!(matches!(job.state, JobState::Enqueued { .. }));
// Transition to Processing
job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
// Complete successfully
job.set_state(JobState::succeeded(1500, Some("Payment processed".to_string()))).unwrap();
§Production Architecture
§Multi-Server Setup
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    use qml_rs::{BackgroundJobServer, DashboardServer, ServerConfig, WorkerRegistry};
    let storage_config = PostgresConfig::new()
        .with_database_url(std::env::var("DATABASE_URL")?)
        .with_auto_migrate(true)
        .with_max_connections(50);
    let storage = Arc::new(StorageInstance::postgres(storage_config).await?);
    let registry = Arc::new(WorkerRegistry::new());
    let server_config = ServerConfig::new("production-server")
        .worker_count(20)
        .queues(vec!["critical".to_string(), "normal".to_string()]);
    let job_server = BackgroundJobServer::new(server_config, storage.clone(), registry);
    let dashboard = DashboardServer::new(storage.clone(), Default::default());
    // Note: The error types returned by job_server.start() and dashboard.start() may not match,
    // so this try_join! block is for illustration only and may require custom error handling in real code.
    tokio::try_join!(
        job_server.start(),
        dashboard.start()
    );
    Ok(())
}
§Configuration Examples
§Server Configuration
use qml_rs::ServerConfig;
use chrono::Duration;
let config = ServerConfig::new("production-server")
    .worker_count(20) // 20 worker threads
    .polling_interval(Duration::seconds(1)) // Check for jobs every second
    .job_timeout(Duration::minutes(5)) // 5-minute job timeout
    .queues(vec!["critical".to_string(), "normal".to_string()]) // Process specific queues
    .fetch_batch_size(10) // Fetch 10 jobs at once
    .enable_scheduler(true); // Enable scheduled jobs
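To see how a fetch batch size relates to the number of polls, the following standard-library sketch drains a queue in batches and counts the polls. fetch_batch and polls_to_drain are hypothetical helpers. Whether qml waits a full polling interval between non-empty polls is not stated here, so treat the arithmetic as a simplified model only.

```rust
use std::collections::VecDeque;

/// Take up to `batch_size` jobs from the front of the queue in one poll.
fn fetch_batch(queue: &mut VecDeque<u32>, batch_size: usize) -> Vec<u32> {
    let n = batch_size.min(queue.len());
    queue.drain(..n).collect()
}

/// Number of polls needed to drain the queue, assuming no new jobs arrive.
fn polls_to_drain(mut queue: VecDeque<u32>, batch_size: usize) -> usize {
    assert!(batch_size > 0, "batch size must be positive");
    let mut polls = 0;
    while !queue.is_empty() {
        fetch_batch(&mut queue, batch_size);
        polls += 1;
    }
    polls
}
```

Under this model, 25 queued jobs with a batch size of 10 take three polls: two full batches and one batch of five.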
§PostgreSQL Configuration
#[cfg(feature = "postgres")]
use qml_rs::storage::PostgresConfig;
use std::time::Duration;
let config = PostgresConfig::new()
    .with_database_url("postgresql://user:pass@host:5432/db")
    .with_max_connections(50) // Connection pool size
    .with_min_connections(5) // Minimum connections
    .with_connect_timeout(Duration::from_secs(10))
    .with_auto_migrate(true); // Run migrations automatically
§Testing Support
§Unit Testing with Memory Storage
use qml_rs::{MemoryStorage, Job, Storage};
#[tokio::test]
async fn test_job_processing() {
    let storage = MemoryStorage::new();
    let job = Job::new("test_job", vec!["arg1".to_string()]);
    storage.enqueue(&job).await.unwrap();
    let retrieved = storage.get(&job.id).await.unwrap().unwrap();
    assert_eq!(job.id, retrieved.id);
}
§Stress Testing
use qml_rs::{MemoryStorage, Job, Storage};
use futures::future::join_all;
#[tokio::test]
async fn test_high_concurrency() {
    let storage = std::sync::Arc::new(MemoryStorage::new());
    // Build 100 jobs, then enqueue them concurrently
    let jobs: Vec<_> = (0..100).map(|i| {
        Job::new("concurrent_job", vec![i.to_string()])
    }).collect();
    let futures: Vec<_> = jobs.iter().map(|job| {
        let storage = storage.clone();
        let job = job.clone();
        async move { storage.enqueue(&job).await }
    }).collect();
    let results = join_all(futures).await;
    assert!(results.iter().all(|r| r.is_ok()));
}
§Error Handling
Comprehensive error types for robust error handling:
use qml_rs::{QmlError, Result};
fn handle_job_error(result: Result<()>) {
    match result {
        Ok(()) => println!("Job completed successfully"),
        Err(QmlError::JobNotFound { job_id }) => {
            println!("Job {} not found", job_id);
        },
        Err(QmlError::StorageError { message }) => {
            println!("Storage error: {}", message);
        },
        Err(QmlError::WorkerError { message }) => {
            println!("Worker error: {}", message);
        },
        Err(e) => println!("Other error: {}", e),
    }
}
§Examples
Run the included examples to see qml in action:
# Basic job creation and serialization
cargo run --example basic_job
# Multi-backend storage operations
cargo run --example storage_demo
# Real-time dashboard with WebSocket
cargo run --example dashboard_demo
# Complete job processing with workers
cargo run --example processing_demo
# PostgreSQL setup and operations
cargo run --example postgres_simple
§Getting Started
- Add qml to your project:
  [dependencies]
  qml = "0.1.0"
  # For PostgreSQL support:
  qml = { version = "0.1.0", features = ["postgres"] }
- Define your workers (implement the Worker trait)
- Choose a storage backend (MemoryStorage, RedisStorage, PostgresStorage)
- Configure and start the BackgroundJobServer
- Monitor with the DashboardServer (optional)
See the examples for complete working implementations.
Re-exports§
pub use core::Job;
pub use core::JobState;
pub use dashboard::DashboardConfig;
pub use dashboard::DashboardServer;
pub use dashboard::DashboardService;
pub use dashboard::JobStatistics;
pub use dashboard::QueueStatistics;
pub use error::QmlError;
pub use error::Result;
pub use processing::BackgroundJobServer;
pub use processing::JobActivator;
pub use processing::JobProcessor;
pub use processing::JobScheduler;
pub use processing::RetryPolicy;
pub use processing::RetryStrategy;
pub use processing::ServerConfig;
pub use processing::Worker;
pub use processing::WorkerConfig;
pub use processing::WorkerContext;
pub use processing::WorkerRegistry;
pub use processing::WorkerResult;
pub use storage::MemoryStorage;
pub use storage::Storage;
pub use storage::StorageConfig;
pub use storage::StorageError;
pub use storage::StorageInstance;
Modules§
- core: Core types for QML.
- dashboard
- error: Error types for QML Rust.
- processing: Job Processing Engine
- storage