Crate qml_rs

Source
Expand description

ยงqml-rs

A production-ready Rust implementation of background job processing.

qml provides a complete, enterprise-grade background job processing solution with multiple storage backends, multi-threaded processing, race condition prevention, and real-time monitoring capabilities.

ยง๐Ÿš€ Production Ready Features

  • 3 Storage Backends: Memory, Redis, PostgreSQL with atomic operations
  • Multi-threaded Processing: Configurable worker pools with job scheduling
  • Web Dashboard: Real-time monitoring with WebSocket updates
  • Race Condition Prevention: Comprehensive locking across all backends
  • Job Lifecycle Management: Complete state tracking and retry logic
  • Production Deployment: Docker, Kubernetes, and clustering support

ยง๐ŸŽฏ Storage Backends

ยงMemory Storage (Development/Testing)

use qml_rs::{MemoryStorage, Job, Storage};
use std::sync::Arc;

let storage = Arc::new(MemoryStorage::new());
let job = Job::new("process_data", vec!["input.csv".to_string()]);
storage.enqueue(&job).await.unwrap();

ยงRedis Storage (Distributed/High-Traffic)

โ“˜
#[cfg(feature = "redis")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{RedisConfig, StorageInstance};
    use std::time::Duration;
    let config = RedisConfig::new()
        .with_url("redis://localhost:6379")
        .with_pool_size(20)
        .with_command_timeout(Duration::from_secs(5));

    let storage = StorageInstance::redis(config).await?;
    Ok(())
}

ยงPostgreSQL Storage (Enterprise/ACID)

โ“˜
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    let config = PostgresConfig::new()
        .with_database_url("postgresql://user:pass@localhost:5432/qml")
        .with_auto_migrate(true)
        .with_max_connections(50);

    let storage = StorageInstance::postgres(config).await?;
    Ok(())
}

ยงโšก Job Processing Engine

ยงBasic Worker Implementation

use qml_rs::{Worker, Job, WorkerContext, WorkerResult, QmlError};
use async_trait::async_trait;

struct EmailWorker;

#[async_trait]
impl Worker for EmailWorker {
    async fn execute(&self, job: &Job, _context: &WorkerContext) -> Result<WorkerResult, QmlError> {
        let email = &job.arguments[0];
        println!("Sending email to: {}", email);
        // Email sending logic here
        Ok(WorkerResult::success(None, 0))
    }

    fn method_name(&self) -> &str {
        "send_email"
    }
}

ยงComplete Job Server Setup

use qml_rs::{
    BackgroundJobServer, MemoryStorage, ServerConfig,
    WorkerRegistry, Job
};
use std::sync::Arc;

// Setup storage and registry
let storage = Arc::new(MemoryStorage::new());
let mut registry = WorkerRegistry::new();
// registry.register(Box::new(EmailWorker)); // Add your workers

// Configure server
let config = ServerConfig::new("server-1")
    .worker_count(4)
    .queues(vec!["critical".to_string(), "normal".to_string()]);

// Start job server
let server = BackgroundJobServer::new(config, Arc::new(MemoryStorage::new()), Arc::new(registry));
// server.start().await?; // Start processing

ยง๐Ÿ“Š Dashboard & Monitoring

ยงReal-time Web Dashboard

use qml_rs::{DashboardServer, MemoryStorage};
use std::sync::Arc;

let storage = Arc::new(MemoryStorage::new());
let dashboard = DashboardServer::new(storage, Default::default());

// Start dashboard on http://localhost:8080
// dashboard.start("0.0.0.0:8080").await?;

The dashboard provides:

  • Real-time Statistics: Job counts by state, throughput metrics
  • Job Management: View, retry, delete jobs through web UI
  • WebSocket Updates: Live updates without page refresh
  • Queue Monitoring: Per-queue statistics and performance

ยง๐Ÿ”’ Race Condition Prevention

All storage backends implement atomic job fetching to prevent race conditions:

  • PostgreSQL: SELECT FOR UPDATE SKIP LOCKED with dedicated lock table
  • Redis: Lua scripts for atomic operations with distributed locking
  • Memory: Mutex-based locking with automatic cleanup
use qml_rs::{Storage, MemoryStorage};

let storage = MemoryStorage::new();

// Atomic job fetching - prevents multiple workers from processing same job
let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
match job {
    Some(job) => println!("Got exclusive lock on job: {}", job.id),
    None => println!("No jobs available"),
}

ยง๐Ÿ”„ Job States & Lifecycle

Jobs progress through well-defined states:

Enqueued โ†’ Processing โ†’ Succeeded
    โ†“           โ†“
Scheduled   Failed โ†’ AwaitingRetry โ†’ Enqueued
    โ†“           โ†“
 Deleted    Deleted

ยงState Management Example

use qml_rs::{Job, JobState};

let mut job = Job::new("process_payment", vec!["order_123".to_string()]);

// Job starts as Enqueued
assert!(matches!(job.state, JobState::Enqueued { .. }));

// Transition to Processing
job.set_state(JobState::processing("worker-1", "server-1")).unwrap();

// Complete successfully
job.set_state(JobState::succeeded(1500, Some("Payment processed".to_string()))).unwrap();

ยง๐Ÿ— Production Architecture

ยงMulti-Server Setup

โ“˜
#[cfg(feature = "postgres")]
async fn example() -> Result<(), Box<dyn std::error::Error>> {
    use qml_rs::storage::{PostgresConfig, StorageInstance};
    use std::sync::Arc;
    use qml_rs::{BackgroundJobServer, DashboardServer, ServerConfig, WorkerRegistry};
    let storage_config = PostgresConfig::new()
        .with_database_url(std::env::var("DATABASE_URL")?)
        .with_auto_migrate(true)
        .with_max_connections(50);

    let storage = Arc::new(StorageInstance::postgres(storage_config).await?);

    let registry = Arc::new(WorkerRegistry::new());
    let server_config = ServerConfig::new("production-server")
        .worker_count(20)
        .queues(vec!["critical".to_string(), "normal".to_string()]);

    let job_server = BackgroundJobServer::new(server_config, storage.clone(), registry);
    let dashboard = DashboardServer::new(storage.clone(), Default::default());

    // Note: The error types returned by job_server.start() and dashboard.start() may not match,
    // so this try_join! block is for illustration only and may require custom error handling in real code.
    tokio::try_join!(
        job_server.start(),
        dashboard.start()
    );
    Ok(())
}

ยง๐Ÿ“‹ Configuration Examples

ยงServer Configuration

use qml_rs::ServerConfig;
use chrono::Duration;

let config = ServerConfig::new("production-server")
    .worker_count(20)                           // 20 worker threads
    .polling_interval(Duration::seconds(1))     // Check for jobs every second
    .job_timeout(Duration::minutes(5))          // 5-minute job timeout
    .queues(vec!["critical".to_string(), "normal".to_string()]) // Process specific queues
    .fetch_batch_size(10)                       // Fetch 10 jobs at once
    .enable_scheduler(true);                    // Enable scheduled jobs

ยงPostgreSQL Configuration

โ“˜
#[cfg(feature = "postgres")]
use qml_rs::storage::PostgresConfig;
use std::time::Duration;

let config = PostgresConfig::new()
    .with_database_url("postgresql://user:pass@host:5432/db")
    .with_max_connections(50)                   // Connection pool size
    .with_min_connections(5)                    // Minimum connections
    .with_connect_timeout(Duration::from_secs(10))
    .with_auto_migrate(true);                   // Run migrations automatically

ยง๐Ÿงช Testing Support

ยงUnit Testing with Memory Storage

use qml_rs::{MemoryStorage, Job, Storage};

#[tokio::test]
async fn test_job_processing() {
    let storage = MemoryStorage::new();
    let job = Job::new("test_job", vec!["arg1".to_string()]);

    storage.enqueue(&job).await.unwrap();
    let retrieved = storage.get(&job.id).await.unwrap().unwrap();

    assert_eq!(job.id, retrieved.id);
}

ยงStress Testing

use qml_rs::{MemoryStorage, Job, Storage};
use futures::future::join_all;

#[tokio::test]
async fn test_high_concurrency() {
    let storage = std::sync::Arc::new(MemoryStorage::new());

    // Create 100 jobs concurrently
    let jobs: Vec<_> = (0..100).map(|i| {
        Job::new("concurrent_job", vec![i.to_string()])
    }).collect();

    let futures: Vec<_> = jobs.iter().map(|job| {
        let storage = storage.clone();
        let job = job.clone();
        async move { storage.enqueue(&job).await }
    }).collect();

    let results = join_all(futures).await;
    assert!(results.iter().all(|r| r.is_ok()));
}

ยง๐Ÿ”ง Error Handling

Comprehensive error types for robust error handling:

use qml_rs::{QmlError, Result};

fn handle_job_error(result: Result<()>) {
    match result {
        Ok(()) => println!("Job completed successfully"),
        Err(QmlError::JobNotFound { job_id }) => {
            println!("Job {} not found", job_id);
        },
        Err(QmlError::StorageError { message }) => {
            println!("Storage error: {}", message);
        },
        Err(QmlError::WorkerError { message }) => {
            println!("Worker error: {}", message);
        },
        Err(e) => println!("Other error: {}", e),
    }
}

ยง๐Ÿ“š Examples

Run the included examples to see qml in action:

# Basic job creation and serialization
cargo run --example basic_job

# Multi-backend storage operations
cargo run --example storage_demo

# Real-time dashboard with WebSocket
cargo run --example dashboard_demo

# Complete job processing with workers
cargo run --example processing_demo

# PostgreSQL setup and operations
cargo run --example postgres_simple

ยง๐Ÿš€ Getting Started

  1. Add qml to your project:

    [dependencies]
    qml = "0.1.0"
    # For PostgreSQL support:
    qml = { version = "0.1.0", features = ["postgres"] }
  2. Define your workers (implement the Worker trait)

  3. Choose a storage backend (MemoryStorage, [RedisStorage], [PostgresStorage])

  4. Configure and start the BackgroundJobServer

  5. Monitor with the DashboardServer (optional)

See the [examples] for complete working implementations.

[examples]:

Re-exportsยง

pub use core::Job;
pub use core::JobState;
pub use dashboard::DashboardConfig;
pub use dashboard::DashboardServer;
pub use dashboard::DashboardService;
pub use dashboard::JobStatistics;
pub use dashboard::QueueStatistics;
pub use error::QmlError;
pub use error::Result;
pub use processing::BackgroundJobServer;
pub use processing::JobActivator;
pub use processing::JobProcessor;
pub use processing::JobScheduler;
pub use processing::RetryPolicy;
pub use processing::RetryStrategy;
pub use processing::ServerConfig;
pub use processing::Worker;
pub use processing::WorkerConfig;
pub use processing::WorkerContext;
pub use processing::WorkerRegistry;
pub use processing::WorkerResult;
pub use storage::MemoryStorage;
pub use storage::Storage;
pub use storage::StorageConfig;
pub use storage::StorageError;
pub use storage::StorageInstance;

Modulesยง

core
Core types for QML.
dashboard
error
Error types for QML Rust.
processing
Job Processing Engine
storage