Trait Storage

pub trait Storage: Send + Sync {
    // Required methods
    fn enqueue<'life0, 'life1, 'async_trait>(
        &'life0 self,
        job: &'life1 Job,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn get<'life0, 'life1, 'async_trait>(
        &'life0 self,
        job_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Job>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn update<'life0, 'life1, 'async_trait>(
        &'life0 self,
        job: &'life1 Job,
    ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn delete<'life0, 'life1, 'async_trait>(
        &'life0 self,
        job_id: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn list<'life0, 'life1, 'async_trait>(
        &'life0 self,
        state_filter: Option<&'life1 JobState>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn get_job_counts<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<HashMap<JobState, usize>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn get_available_jobs<'life0, 'async_trait>(
        &'life0 self,
        limit: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn fetch_and_lock_job<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        worker_id: &'life1 str,
        queues: Option<&'life2 [String]>,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Job>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
    fn try_acquire_job_lock<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        job_id: &'life1 str,
        worker_id: &'life2 str,
        timeout_seconds: u64,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
    fn release_job_lock<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        job_id: &'life1 str,
        worker_id: &'life2 str,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
    fn fetch_available_jobs_atomic<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        worker_id: &'life1 str,
        limit: Option<usize>,
        queues: Option<&'life2 [String]>,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;
}

Core storage trait that defines the interface for job persistence across all backends.

The Storage trait provides a unified API for job persistence operations, supporting multiple storage backends including in-memory, Redis, and PostgreSQL. All implementations provide atomic operations and race condition prevention for production use.

§Storage Backends

  • MemoryStorage: Fast in-memory storage for development and testing
  • RedisStorage: Distributed Redis storage with Lua script atomicity
  • PostgresStorage: ACID-compliant PostgreSQL with row-level locking

§Core Operations

The trait provides standard CRUD operations (enqueue, get, update, delete) plus advanced operations for job processing:

  • Job Management: Store, retrieve, update, and delete jobs
  • Querying: List jobs with filtering and pagination
  • Processing: Atomic job fetching with race condition prevention
  • Locking: Explicit job locking for distributed coordination

§Race Condition Prevention

All storage backends implement atomic job fetching to prevent multiple workers from processing the same job simultaneously:

Worker A ──┐
           ├── fetch_and_lock_job() ──→ Gets Job #123
Worker B ──┘                         ──→ Gets Job #124 (not #123)
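
For example, two workers that race on the same storage never receive the same job. A minimal sketch (assuming a Tokio async context, as in the other examples on this page):

use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
for i in 0..2 {
    let job = Job::new("task", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// Both workers ask for a job at the same moment.
let (a, b) = tokio::join!(
    storage.fetch_and_lock_job("worker-a", None),
    storage.fetch_and_lock_job("worker-b", None),
);

// Each gets a distinct job (or none), never a duplicate.
let (a, b) = (a.unwrap(), b.unwrap());
if let (Some(job_a), Some(job_b)) = (&a, &b) {
    assert_ne!(job_a.id, job_b.id);
}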

§Examples

§Basic Storage Operations

use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

// Create and store a job
let job = Job::new("send_email", vec!["user@example.com".to_string()]);
storage.enqueue(&job).await.unwrap();

// Retrieve the job
let retrieved = storage.get(&job.id).await.unwrap().unwrap();
assert_eq!(job.id, retrieved.id);

// Update job state
let mut updated_job = retrieved;
updated_job.set_state(qml_rs::JobState::processing("worker-1", "server-1")).unwrap();
storage.update(&updated_job).await.unwrap();

// Delete the job
let deleted = storage.delete(&job.id).await.unwrap();
assert!(deleted);

§Atomic Job Processing

use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

// Enqueue some jobs
for i in 0..5 {
    let job = Job::new("process_item", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// Worker fetches and locks a job atomically
let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
match job {
    Some(job) => {
        println!("Worker-1 processing job: {}", job.id);
        // Job is automatically locked and marked as processing
    },
    None => println!("No jobs available"),
}
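
A simple worker loop falls out of this pattern: keep fetching until no job is returned. This sketch only drains what is currently available; a real worker would also persist each job's final state with update:

use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
for i in 0..5 {
    let job = Job::new("process_item", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// Drain everything that is currently available.
while let Some(job) = storage.fetch_and_lock_job("worker-1", None).await.unwrap() {
    println!("Worker-1 processing job: {}", job.id);
    // ... run the job here, then store its final state via storage.update(&job) ...
}
println!("No more jobs available");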

§Storage Backend Selection

use qml_rs::storage::{StorageInstance, StorageConfig, MemoryConfig};

// Memory storage for development
let memory_storage = StorageInstance::memory();

// Redis storage for production
use qml_rs::storage::RedisConfig;
let redis_config = RedisConfig::new().with_url("redis://localhost:6379");
match StorageInstance::redis(redis_config).await {
    Ok(redis_storage) => println!("Redis storage ready"),
    Err(e) => println!("Redis connection failed: {}", e),
}

// PostgreSQL storage for enterprise
use qml_rs::storage::PostgresConfig;
let pg_config = PostgresConfig::new()
    .with_database_url("postgresql://localhost:5432/qml")
    .with_auto_migrate(true);
match StorageInstance::postgres(pg_config).await {
    Ok(pg_storage) => println!("PostgreSQL storage ready"),
    Err(e) => println!("PostgreSQL connection failed: {}", e),
}
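
Because every backend exposes the same trait, application code can stay backend-agnostic by being generic over Storage. A minimal sketch; the qml_rs::storage::StorageError import path is an assumption and may differ:

use qml_rs::{Job, MemoryStorage, Storage};
use qml_rs::storage::StorageError; // path assumed

// Works unchanged against the memory, Redis, or PostgreSQL backends.
async fn enqueue_batch<S: Storage>(
    storage: &S,
    method: &str,
    count: usize,
) -> Result<(), StorageError> {
    for i in 0..count {
        storage.enqueue(&Job::new(method, vec![i.to_string()])).await?;
    }
    Ok(())
}

let storage = MemoryStorage::new();
enqueue_batch(&storage, "send_email", 10).await.unwrap();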

§Job Filtering and Statistics

use qml_rs::{MemoryStorage, Job, JobState, Storage};

let storage = MemoryStorage::new();

// Create jobs in different states
let job1 = Job::new("task1", vec![]);
let mut job2 = Job::new("task2", vec![]);
job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();

storage.enqueue(&job1).await.unwrap();
storage.enqueue(&job2).await.unwrap();

// List all jobs
let all_jobs = storage.list(None, None, None).await.unwrap();
println!("Total jobs: {}", all_jobs.len());

// Get job counts by state
let counts = storage.get_job_counts().await;
match counts {
    Ok(counts) => {
        for (state, count) in counts {
            println!("{:?}: {}", state, count);
        }
    },
    Err(e) => println!("Error: {}", e),
}

// Get available jobs for processing
let available = storage.get_available_jobs(Some(10)).await.unwrap();
println!("Available for processing: {}", available.len());

Required Methods§

fn enqueue<'life0, 'life1, 'async_trait>( &'life0 self, job: &'life1 Job, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Store a new job in the storage backend.

Persists a job to the storage system, making it available for processing. The job is typically stored in the “enqueued” state unless specified otherwise.

§Arguments
  • job - The job to store with all its metadata and configuration
§Returns
  • Ok(()) - Job was stored successfully
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

let job = Job::with_config(
    "send_notification",
    vec!["user123".to_string()],
    "notifications", // queue
    5,              // priority
    3               // max_retries
);

storage.enqueue(&job).await.unwrap();
println!("Job {} enqueued successfully", job.id);

fn get<'life0, 'life1, 'async_trait>( &'life0 self, job_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<Job>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieve a job by its unique identifier.

Fetches a complete job record including all metadata, current state, and configuration. Returns None if the job doesn’t exist.

§Arguments
  • job_id - The unique identifier of the job to retrieve
§Returns
  • Ok(Some(job)) - Job exists and was retrieved successfully
  • Ok(None) - Job doesn’t exist in storage
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
let job = Job::new("process_data", vec!["file.csv".to_string()]);

storage.enqueue(&job).await.unwrap();

// Retrieve the job
match storage.get(&job.id).await.unwrap() {
    Some(retrieved_job) => {
        println!("Found job: {} ({})", retrieved_job.id, retrieved_job.method);
    },
    None => println!("Job not found"),
}

fn update<'life0, 'life1, 'async_trait>( &'life0 self, job: &'life1 Job, ) -> Pin<Box<dyn Future<Output = Result<(), StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Update an existing job’s state and metadata.

Modifies a job record in storage, typically used for state transitions (e.g., Enqueued → Processing → Succeeded). The entire job record is updated.

§Arguments
  • job - The job with updated information to persist
§Returns
  • Ok(()) - Job was updated successfully
  • Err(StorageError) - Storage operation failed (job may not exist)
§Examples
use qml_rs::{MemoryStorage, Job, JobState, Storage};

let storage = MemoryStorage::new();
let mut job = Job::new("process_order", vec!["order123".to_string()]);

storage.enqueue(&job).await.unwrap();

// Update job state to processing
job.set_state(JobState::processing("worker-1", "server-1")).unwrap();
storage.update(&job).await.unwrap();

// Add metadata and update again
job.add_metadata("processed_by", "worker-1");
storage.update(&job).await.unwrap();

fn delete<'life0, 'life1, 'async_trait>( &'life0 self, job_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Remove a job from storage (soft or hard delete).

Deletes a job record from the storage system. Some implementations may perform soft deletion (marking as deleted) while others perform hard deletion.

§Arguments
  • job_id - The unique identifier of the job to delete
§Returns
  • Ok(true) - Job existed and was deleted successfully
  • Ok(false) - Job didn’t exist (nothing to delete)
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
let job = Job::new("cleanup_task", vec![]);

storage.enqueue(&job).await.unwrap();

// Delete the job
let was_deleted = storage.delete(&job.id).await.unwrap();
assert!(was_deleted);

// Verify it's gone
let retrieved = storage.get(&job.id).await.unwrap();
assert!(retrieved.is_none());

fn list<'life0, 'life1, 'async_trait>( &'life0 self, state_filter: Option<&'life1 JobState>, limit: Option<usize>, offset: Option<usize>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List jobs with optional filtering and pagination.

Retrieves multiple jobs from storage with optional filtering by state and pagination support. Useful for building dashboards and monitoring tools.

§Arguments
  • state_filter - Optional job state to filter by (e.g., only failed jobs)
  • limit - Maximum number of jobs to return (None = no limit)
  • offset - Number of jobs to skip for pagination (None = start from beginning)
§Returns
  • Ok(jobs) - Vector of jobs matching the criteria
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, JobState, Storage};

let storage = MemoryStorage::new();

// Create several jobs
for i in 0..10 {
    let job = Job::new("task", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// List all jobs
let all_jobs = storage.list(None, None, None).await.unwrap();
println!("Total jobs: {}", all_jobs.len());

// List first 5 jobs
let first_five = storage.list(None, Some(5), None).await.unwrap();
println!("First 5 jobs: {}", first_five.len());

// List next 5 jobs (pagination)
let next_five = storage.list(None, Some(5), Some(5)).await.unwrap();
println!("Next 5 jobs: {}", next_five.len());

fn get_job_counts<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<HashMap<JobState, usize>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the count of jobs grouped by their current state.

Returns statistics about job distribution across different states. Useful for monitoring and dashboard displays.

§Returns
  • Ok(counts) - HashMap mapping each job state to its count
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, JobState, Storage};

let storage = MemoryStorage::new();

// Create jobs in different states
let job1 = Job::new("task1", vec![]);
storage.enqueue(&job1).await.unwrap();

let mut job2 = Job::new("task2", vec![]);
storage.enqueue(&job2).await.unwrap();
job2.set_state(JobState::processing("worker-1", "server-1")).unwrap();
storage.update(&job2).await.unwrap();

// Get statistics
let counts = storage.get_job_counts().await;
match counts {
    Ok(counts) => for (state, count) in counts {
        println!("State {:?}: {} jobs", state, count);
    },
    Err(e) => println!("Error: {}", e),
}

fn get_available_jobs<'life0, 'async_trait>( &'life0 self, limit: Option<usize>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get jobs that are ready to be processed immediately.

Returns jobs that are available for processing: enqueued jobs, scheduled jobs whose time has arrived, and jobs awaiting retry whose retry time has passed.

§Arguments
  • limit - Maximum number of jobs to return (None = no limit)
§Returns
  • Ok(jobs) - Vector of jobs ready for processing
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

// Enqueue several jobs
for i in 0..5 {
    let job = Job::new("process_item", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// Get available jobs for processing
let available = storage.get_available_jobs(Some(3)).await.unwrap();
println!("Available for processing: {}", available.len());

for job in available {
    println!("Job {} is ready: {}", job.id, job.method);
}

fn fetch_and_lock_job<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, worker_id: &'life1 str, queues: Option<&'life2 [String]>, ) -> Pin<Box<dyn Future<Output = Result<Option<Job>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Atomically fetch and lock a job for processing to prevent race conditions.

This is the primary method for job processing in production environments. It atomically finds an available job, locks it, and marks it as processing in a single operation, preventing multiple workers from processing the same job.

§Race Condition Prevention

Different storage backends use different mechanisms:

  • PostgreSQL: SELECT FOR UPDATE SKIP LOCKED with dedicated lock table
  • Redis: Lua scripts for atomic operations with distributed locking
  • Memory: Mutex-based locking with automatic cleanup
§Arguments
  • worker_id - Unique identifier of the worker claiming the job
  • queues - Optional list of specific queues to fetch from (None = all queues)
§Returns
  • Ok(Some(job)) - Job was successfully fetched and locked
  • Ok(None) - No jobs are available for processing
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

// Enqueue some jobs
for i in 0..3 {
    let job = Job::with_config(
        "process_item",
        vec![i.to_string()],
        if i == 0 { "critical" } else { "normal" }, // different queues
        i as i32,
        3
    );
    storage.enqueue(&job).await.unwrap();
}

// Worker fetches from any queue
let job = storage.fetch_and_lock_job("worker-1", None).await.unwrap();
match job {
    Some(job) => {
        println!("Worker-1 got job: {} from queue: {}", job.id, job.queue);
        // Job is now locked and marked as processing
    },
    None => println!("No jobs available"),
}

// Worker fetches only from critical queue
let critical_job = storage.fetch_and_lock_job(
    "worker-2",
    Some(&["critical".to_string()])
).await.unwrap();

fn try_acquire_job_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, job_id: &'life1 str, worker_id: &'life2 str, timeout_seconds: u64, ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Try to acquire an explicit lock on a specific job.

Attempts to acquire an exclusive lock on a job for coordination between workers. This is useful for implementing custom job processing logic or manual job management.

§Arguments
  • job_id - The unique identifier of the job to lock
  • worker_id - Unique identifier of the worker trying to acquire the lock
  • timeout_seconds - Lock timeout in seconds (auto-release after this time)
§Returns
  • Ok(true) - Lock was successfully acquired
  • Ok(false) - Lock could not be acquired (already locked by another worker)
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
let job = Job::new("exclusive_task", vec![]);
storage.enqueue(&job).await.unwrap();

// Worker 1 tries to acquire lock
let acquired = storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();
assert!(acquired);

// Worker 2 tries to acquire the same lock (should fail)
let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
assert!(!acquired);

// Worker 1 releases the lock
storage.release_job_lock(&job.id, "worker-1").await.unwrap();

// Now worker 2 can acquire it
let acquired = storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap();
assert!(acquired);
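
The timeout_seconds argument bounds how long a crashed or stalled worker can hold a lock. A sketch of that behavior (assuming a Tokio context and that an expired lock becomes reclaimable on the next acquire attempt):

use std::time::Duration;
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
let job = Job::new("short_lock", vec![]);
storage.enqueue(&job).await.unwrap();

// Worker 1 takes a 1-second lock and then goes silent (simulating a crash).
assert!(storage.try_acquire_job_lock(&job.id, "worker-1", 1).await.unwrap());
assert!(!storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap());

// Once the timeout has elapsed, another worker can take over.
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(storage.try_acquire_job_lock(&job.id, "worker-2", 300).await.unwrap());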

fn release_job_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, job_id: &'life1 str, worker_id: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<bool, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Release an explicit lock on a job.

Releases a lock that was previously acquired with try_acquire_job_lock. Only the worker that acquired the lock can release it.

§Arguments
  • job_id - The unique identifier of the job to unlock
  • worker_id - Unique identifier of the worker releasing the lock
§Returns
  • Ok(true) - Lock was successfully released
  • Ok(false) - Lock was not held by this worker (or already expired)
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();
let job = Job::new("task_with_lock", vec![]);
storage.enqueue(&job).await.unwrap();

// Acquire lock
storage.try_acquire_job_lock(&job.id, "worker-1", 300).await.unwrap();

// Do some work...

// Release lock
let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
assert!(released);

// Trying to release again should return false
let released = storage.release_job_lock(&job.id, "worker-1").await.unwrap();
assert!(!released);

fn fetch_available_jobs_atomic<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, worker_id: &'life1 str, limit: Option<usize>, queues: Option<&'life2 [String]>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Job>, StorageError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Atomically fetch multiple available jobs with locking.

Similar to fetch_and_lock_job but fetches multiple jobs in a single atomic operation. Useful for batch processing scenarios where a worker can handle multiple jobs simultaneously.

§Arguments
  • worker_id - Unique identifier of the worker claiming the jobs
  • limit - Maximum number of jobs to fetch (None = implementation default)
  • queues - Optional list of specific queues to fetch from (None = all queues)
§Returns
  • Ok(jobs) - Vector of jobs that were successfully fetched and locked
  • Err(StorageError) - Storage operation failed
§Examples
use qml_rs::{MemoryStorage, Job, Storage};

let storage = MemoryStorage::new();

// Enqueue batch of jobs
for i in 0..10 {
    let job = Job::new("batch_process", vec![i.to_string()]);
    storage.enqueue(&job).await.unwrap();
}

// Worker fetches multiple jobs at once
let jobs = storage.fetch_available_jobs_atomic("worker-1", Some(5), None).await.unwrap();
println!("Worker-1 got {} jobs for batch processing", jobs.len());

for job in jobs {
    println!("Processing job {} with argument: {}", job.id, job.arguments[0]);
    // All jobs are now locked and marked as processing
}

Implementors§
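
The built-in backends (MemoryStorage, RedisStorage, PostgresStorage) implement this trait. A custom backend implements the same interface. The expanded signatures above are what an #[async_trait] declaration generates, so a hand-written implementation can be sketched with the async_trait crate as below; this is a compile-oriented skeleton with placeholder bodies, not a working store, and the qml_rs::storage::StorageError path is an assumption:

use std::collections::HashMap;

use async_trait::async_trait;
use qml_rs::{Job, JobState, Storage};
use qml_rs::storage::StorageError; // path assumed

// A do-nothing backend: every method compiles, nothing is persisted.
struct NoopStorage;

#[async_trait]
impl Storage for NoopStorage {
    async fn enqueue(&self, _job: &Job) -> Result<(), StorageError> { Ok(()) }
    async fn get(&self, _job_id: &str) -> Result<Option<Job>, StorageError> { Ok(None) }
    async fn update(&self, _job: &Job) -> Result<(), StorageError> { Ok(()) }
    async fn delete(&self, _job_id: &str) -> Result<bool, StorageError> { Ok(false) }
    async fn list(
        &self,
        _state_filter: Option<&JobState>,
        _limit: Option<usize>,
        _offset: Option<usize>,
    ) -> Result<Vec<Job>, StorageError> { Ok(Vec::new()) }
    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
        Ok(HashMap::new())
    }
    async fn get_available_jobs(&self, _limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
        Ok(Vec::new())
    }
    async fn fetch_and_lock_job(
        &self,
        _worker_id: &str,
        _queues: Option<&[String]>,
    ) -> Result<Option<Job>, StorageError> { Ok(None) }
    async fn try_acquire_job_lock(
        &self,
        _job_id: &str,
        _worker_id: &str,
        _timeout_seconds: u64,
    ) -> Result<bool, StorageError> { Ok(false) }
    async fn release_job_lock(
        &self,
        _job_id: &str,
        _worker_id: &str,
    ) -> Result<bool, StorageError> { Ok(false) }
    async fn fetch_available_jobs_atomic(
        &self,
        _worker_id: &str,
        _limit: Option<usize>,
        _queues: Option<&[String]>,
    ) -> Result<Vec<Job>, StorageError> { Ok(Vec::new()) }
}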