pleme-redis 0.1.0

Redis patterns for the Pleme platform - presence management, caching, and real-time features

pleme-redis

Redis patterns and utilities for the Pleme platform, providing reusable infrastructure for distributed systems.

Features

  • Job Queue Processing - FIFO queue with worker pool, retry logic, and anti-stuck mechanisms
  • Presence Management - Online/offline status with TTL
  • Typing Indicators - Ephemeral status for real-time UX
  • Caching - Product-scoped Redis caching
  • Pub/Sub Messaging - Event broadcasting
  • Distributed Locking - Prevent duplicate job execution

All patterns are multi-tenant safe with product scoping.


Job Queue System

Generic job queue infrastructure extracted from the product-catalog service, where it was battle-tested: about 1000 lines of reusable code.

Architecture

┌─────────────────────────────────────────────────────┐
│           YOUR SERVICE (Business Logic)             │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────┐  │
│  │  Your Job    │  │ JobExecutor  │  │ JobStore │  │
│  │   (domain)   │  │  (business)  │  │   (DB)   │  │
│  └──────┬───────┘  └──────┬───────┘  └────┬─────┘  │
│         │                 │                │        │
│         │ implements      │ implements     │ impl   │
│         ▼                 ▼                ▼        │
├─────────────────────────────────────────────────────┤
│              pleme-redis (Infrastructure)           │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌────────────────────────────────────────────┐    │
│  │          Job Trait (Generic)               │    │
│  │  - id(), job_type(), retry_count()         │    │
│  │  - can_retry(), calculate_backoff_delay()  │    │
│  └────────────────────────────────────────────┘    │
│                                                     │
│  ┌────────────────────────────────────────────┐    │
│  │       JobExecutor<J> Trait                 │    │
│  │  - async fn execute(&self, job: &J)        │    │
│  └────────────────────────────────────────────┘    │
│                                                     │
│  ┌────────────────────────────────────────────┐    │
│  │       JobStore<J> Trait                    │    │
│  │  - claim_next_job()                        │    │
│  │  - mark_job_completed()                    │    │
│  │  - schedule_retry()                        │    │
│  │  - find_stale_jobs()                       │    │
│  │  ... 11 methods total                      │    │
│  └────────────────────────────────────────────┘    │
│                                                     │
│  ┌────────────────────────────────────────────┐    │
│  │         WorkerPool<J, E, S>                │    │
│  │  - Worker pool with configurable workers   │    │
│  │  - Exponential backoff retry logic         │    │
│  │  - Stale job detection & recovery          │    │
│  │  - 4-layer anti-stuck mechanisms           │    │
│  └────────────────────────────────────────────┘    │
│                                                     │
│  ┌────────────────────────────────────────────┐    │
│  │          JobQueue (Redis FIFO)             │    │
│  │  - LPUSH/BRPOP for job distribution        │    │
│  │  - Distributed locking (de-duplication)    │    │
│  │  - Queue statistics                        │    │
│  └────────────────────────────────────────────┘    │
│                                                     │
└─────────────────────────────────────────────────────┘

Key Components

1. Job Trait

Define what a job is and how it behaves:

pub trait Job: Send + Sync {
    fn id(&self) -> Uuid;
    fn job_type(&self) -> String;
    fn retry_count(&self) -> i32;
    fn max_retries(&self) -> i32;
    fn parameters(&self) -> &serde_json::Value;
    fn configuration(&self) -> Option<&serde_json::Value>;

    // Optional: Override to customize retry logic
    fn can_retry(&self) -> bool {
        self.retry_count() < self.max_retries()
    }

    // Optional: Override to customize backoff
    fn calculate_backoff_delay(&self) -> i64 {
        // Exponential backoff: 60s, 120s, 240s, 480s, 960s, capped at 3600s
        let base_delay = 60;
        let max_delay = 3600;
        let delay = base_delay * 2_i64.pow(self.retry_count() as u32);
        delay.min(max_delay)
    }

    fn calculate_next_retry_at(&self) -> DateTime<Utc> {
        Utc::now() + chrono::Duration::seconds(self.calculate_backoff_delay())
    }
}
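
To make the default schedule concrete, here is a minimal standalone sketch of the same doubling-with-cap arithmetic. The free functions `backoff_delay_secs` and `backoff_schedule` are hypothetical helpers, not part of pleme-redis. Unlike the default method, the sketch uses checked arithmetic, so a negative or very large retry count saturates at the cap instead of overflowing.

```rust
/// Hypothetical helper mirroring the default `calculate_backoff_delay`:
/// base * 2^retry_count, capped at `max_delay` (all values in seconds).
pub fn backoff_delay_secs(retry_count: i32, base_delay: i64, max_delay: i64) -> i64 {
    // Treat negative counts as zero so the cast to u32 cannot wrap around.
    let exponent = retry_count.max(0) as u32;
    // 2^exponent overflows i64 for exponent >= 63; fall back to i64::MAX.
    let factor = 2_i64.checked_pow(exponent).unwrap_or(i64::MAX);
    base_delay.saturating_mul(factor).min(max_delay)
}

/// Delays (in seconds) for retry counts 0..attempts.
pub fn backoff_schedule(attempts: i32, base_delay: i64, max_delay: i64) -> Vec<i64> {
    (0..attempts)
        .map(|n| backoff_delay_secs(n, base_delay, max_delay))
        .collect()
}
```

With a base of 60 and a cap of 3600, `backoff_schedule(7, 60, 3600)` yields 60, 120, 240, 480, 960, 1920, 3600: the cap first applies at the seventh retry.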

2. JobExecutor Trait

Implement your business logic:

#[async_trait]
pub trait JobExecutor<J: Job>: Send + Sync {
    async fn execute(&self, job: &J) -> Result<serde_json::Value, ExecutorError>;
}

pub enum ExecutorError {
    ExecutionFailed(String),
    ProviderError(String),
    ValidationError(String),
    JobCancelled,
}

3. JobStore Trait

Implement database persistence:

#[async_trait]
pub trait JobStore<J: Job>: Send + Sync {
    // Atomic job claiming (SELECT FOR UPDATE SKIP LOCKED)
    async fn claim_next_job(&self, worker_id: &str) -> Result<Option<J>, StoreError>;

    // Load specific job
    async fn load_job(&self, job_id: Uuid) -> Result<J, StoreError>;

    // Check if job was soft-deleted (cancellation support)
    async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError>;

    // State transitions
    async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError>;
    async fn mark_job_completed(&self, job_id: Uuid, results: serde_json::Value) -> Result<(), StoreError>;
    async fn mark_job_failed(&self, job_id: Uuid, error: &str, details: serde_json::Value) -> Result<(), StoreError>;

    // Retry scheduling
    async fn schedule_retry(
        &self,
        job_id: Uuid,
        retry_count: i32,
        next_retry_at: DateTime<Utc>,
        error: &str,
        details: serde_json::Value,
    ) -> Result<(), StoreError>;

    // Recovery mechanisms
    async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError>;
    async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<J>, StoreError>;

    // Monitoring
    async fn count_claimable_jobs(&self) -> Result<i64, StoreError>;
}

4. WorkerPool

Orchestrates job processing with multiple workers:

pub struct WorkerPool<J, E, S>
where
    J: Job + Clone + Send + Sync + 'static,
    E: JobExecutor<J> + Clone + Send + Sync + 'static,
    S: JobStore<J> + Clone + Send + Sync + 'static,
{
    config: WorkerPoolConfig,
    job_queue: JobQueue,
    executor: E,
    store: S,
    // ...
}

pub struct WorkerPoolConfig {
    pub worker_count: usize,              // Number of concurrent workers
    pub queue_key: String,                // Redis queue name
    pub poll_timeout: u64,                // Queue poll timeout (seconds)
    pub max_retries: i32,                 // Maximum retry attempts
    pub base_retry_delay: i64,            // Initial retry delay (seconds)
    pub max_retry_delay: i64,             // Maximum retry delay (seconds)
    pub stale_job_threshold_seconds: i64, // When to consider job stale (default: 7200 = 2h)
    pub stale_check_interval_seconds: u64,// How often to check (default: 300 = 5m)
}

Features

Anti-Stuck Mechanisms (4 Layers)

  1. Queue Timeout → Database Scan (Every 30s)

    • Worker polls Redis, times out
    • Scans database for PENDING jobs
    • Re-enqueues any missed jobs
  2. Progress Updates (Every ~100 items)

    • Executor updates job progress regularly
    • Provides heartbeat signal
    • Enables stale detection
  3. Stale Job Checker (Every 5 minutes)

    • Finds jobs stuck in RUNNING without updates
    • If recoverable: Reset to PENDING + retry
    • If exhausted: Mark FAILED with context
  4. Retry Logic with Exponential Backoff

    • Transient failures → automatic retry
    • Exponential backoff: 60s, 120s, 240s, 480s, 960s
    • Permanent failure after max retries

Guarantees:

  • ✅ No job waits indefinitely (max 30s for PENDING)
  • ✅ No job stuck in RUNNING (max 2h before recovery)
  • ✅ All jobs reach terminal state (COMPLETED or FAILED)
  • ✅ Full error context for debugging
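
The decision made by the stale job checker (layer 3) can be sketched as a pure function. This is an illustration of the rules listed above, not the crate's implementation; `StaleAction` and `decide_stale_action` are hypothetical names.

```rust
use std::time::Duration;

/// What the stale checker does with a RUNNING job (hypothetical enum).
#[derive(Debug, PartialEq, Eq)]
pub enum StaleAction {
    /// The last update is recent enough; leave the job alone.
    Leave,
    /// Stale but retries remain: reset to PENDING and retry.
    ResetToPending,
    /// Stale and retries exhausted: mark FAILED with context.
    MarkFailed,
}

/// A job counts as stale only when its last update is older than the threshold.
pub fn decide_stale_action(
    since_last_update: Duration,
    stale_threshold: Duration,
    retry_count: i32,
    max_retries: i32,
) -> StaleAction {
    if since_last_update <= stale_threshold {
        StaleAction::Leave
    } else if retry_count < max_retries {
        StaleAction::ResetToPending
    } else {
        StaleAction::MarkFailed
    }
}
```

Keeping the decision free of I/O makes it easy to unit test separately from the database queries that find the candidate jobs.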

Distributed Locking

Prevents duplicate job execution:

// Jobs with the same type and parameters map to the same lock key.
// `hash` stands for any deterministic digest of the parameters.
let lock_key = format!("job:lock:{}:{}", job_type, hash(parameters));

// Only one worker can hold the lock at a time
if lock_manager.try_lock(&lock_key).await? {
    // Execute job
} else {
    // Skip: another worker is already processing this job
}
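
One way to derive such a key is sketched below, under the assumption that parameters can be flattened to string pairs. `job_lock_key` and `fnv1a_64` are hypothetical helpers, and FNV-1a is swapped in for the unspecified `hash` because its output is the same in every process and on every Rust version, which matters when several workers must agree on the key.

```rust
use std::collections::BTreeMap;

/// 64-bit FNV-1a, a small non-cryptographic hash with a fixed definition.
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes
        .iter()
        .fold(OFFSET_BASIS, |hash, &b| (hash ^ u64::from(b)).wrapping_mul(PRIME))
}

/// Hypothetical helper: builds `job:lock:{job_type}:{hash}`.
/// A BTreeMap iterates in key order, so insertion order cannot change the key.
pub fn job_lock_key(job_type: &str, params: &BTreeMap<String, String>) -> String {
    let mut canonical = String::new();
    for (key, value) in params {
        canonical.push_str(key);
        canonical.push('=');
        canonical.push_str(value);
        canonical.push('\n');
    }
    format!("job:lock:{}:{:016x}", job_type, fnv1a_64(canonical.as_bytes()))
}
```

Canonicalizing the parameters before hashing is the important part: two jobs with the same logical parameters must serialize to the same bytes, otherwise de-duplication silently fails.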

Atomic Job Claiming

Race-free job claiming with PostgreSQL:

UPDATE jobs
SET status = 'RUNNING', worker_id = $1, claimed_at = NOW()
WHERE id = (
    SELECT id FROM jobs
    WHERE status IN ('PENDING', 'QUEUED')
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED  -- ← Key: Skip locked rows
)
RETURNING *
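
For unit tests that should not need PostgreSQL, the same claim semantics can be approximated in memory. The sketch below is a simplified, synchronous stand-in with hypothetical types (`InMemoryJobs`, `JobRow`, `Status`); it is not the `JobStore` trait. A `Mutex` serializes claims, whereas `SKIP LOCKED` lets concurrent claimers skip each other's rows, but the observable result is the same: each job is handed to exactly one worker, oldest first.

```rust
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Status {
    Pending,
    Queued,
    Running,
}

#[derive(Debug, Clone)]
pub struct JobRow {
    pub id: u32,
    pub created_at: u64, // logical timestamp; smaller means older
    pub status: Status,
    pub worker_id: Option<String>,
}

pub struct InMemoryJobs {
    rows: Mutex<Vec<JobRow>>,
}

impl InMemoryJobs {
    pub fn new(rows: Vec<JobRow>) -> Self {
        Self { rows: Mutex::new(rows) }
    }

    /// Claims the oldest PENDING or QUEUED row and marks it RUNNING.
    pub fn claim_next_job(&self, worker_id: &str) -> Option<JobRow> {
        let mut rows = self.rows.lock().unwrap();
        let next = rows
            .iter_mut()
            .filter(|r| matches!(r.status, Status::Pending | Status::Queued))
            .min_by_key(|r| r.created_at)?;
        next.status = Status::Running;
        next.worker_id = Some(worker_id.to_string());
        Some(next.clone())
    }
}
```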

Usage Example

Step 1: Define Your Job

use pleme_redis::Job as JobTrait;
use uuid::Uuid;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
    pub id: Uuid,
    pub job_type: EmailJobType,
    pub retry_count: i32,
    pub max_retries: i32,
    pub parameters: serde_json::Value,
    pub status: JobStatus,
    // ... other domain-specific fields
}

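// Serialize/Deserialize are required on both enums because EmailJob derives them.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum EmailJobType {
    SendWelcomeEmail,
    SendResetPassword,
    SendOrderConfirmation,
}

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum JobStatus {
    Pending,
    Running,
    Completed,
    Failed,
}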

// Implement the pleme_redis::Job trait (imported above as JobTrait)
impl JobTrait for EmailJob {
    fn id(&self) -> Uuid {
        self.id
    }

    fn job_type(&self) -> String {
        format!("{:?}", self.job_type)
    }

    fn retry_count(&self) -> i32 {
        self.retry_count
    }

    fn max_retries(&self) -> i32 {
        self.max_retries
    }

    fn parameters(&self) -> &serde_json::Value {
        &self.parameters
    }

    fn configuration(&self) -> Option<&serde_json::Value> {
        None
    }

    // Optional: Override retry logic
    fn can_retry(&self) -> bool {
        // Custom: only retry jobs whose status is Failed
        self.status == JobStatus::Failed && self.retry_count < self.max_retries
    }
}

Step 2: Implement JobExecutor (Business Logic)

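use pleme_redis::{JobExecutor, ExecutorError};
use async_trait::async_trait;
use std::sync::Arc;

// EmailClient, TemplateEngine, and EmailParams are application-defined types,
// not part of pleme-redis.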

#[derive(Clone)]
pub struct EmailJobExecutor {
    email_client: Arc<EmailClient>,
    template_engine: Arc<TemplateEngine>,
}

#[async_trait]
impl JobExecutor<EmailJob> for EmailJobExecutor {
    async fn execute(&self, job: &EmailJob) -> Result<serde_json::Value, ExecutorError> {
        // Parse job parameters
        let params: EmailParams = serde_json::from_value(job.parameters.clone())
            .map_err(|e| ExecutorError::ValidationError(e.to_string()))?;

        // Business logic
        match job.job_type {
            EmailJobType::SendWelcomeEmail => {
                let template = self.template_engine
                    .render("welcome", &params)
                    .map_err(|e| ExecutorError::ExecutionFailed(e.to_string()))?;

                self.email_client
                    .send(&params.to, &params.subject, &template)
                    .await
                    .map_err(|e| ExecutorError::ProviderError(e.to_string()))?;

                Ok(serde_json::json!({
                    "email_sent": true,
                    "recipient": params.to,
                    "message_id": "msg-123"
                }))
            }

            EmailJobType::SendResetPassword => {
                // Similar implementation...
                todo!()
            }

            EmailJobType::SendOrderConfirmation => {
                // Similar implementation...
                todo!()
            }
        }
    }
}

Step 3: Implement JobStore (Database Persistence)

use pleme_redis::{JobStore as JobStoreTrait, StoreError};
use sqlx::PgPool;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use uuid::Uuid;

#[derive(Clone)]
pub struct EmailJobStore {
    db_pool: PgPool,
}

#[async_trait]
impl JobStoreTrait<EmailJob> for EmailJobStore {
    async fn claim_next_job(&self, worker_id: &str) -> Result<Option<EmailJob>, StoreError> {
        let mut tx = self.db_pool.begin().await
            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;

        let job = sqlx::query_as::<_, EmailJob>(
            r#"
            UPDATE email_jobs
            SET status = 'RUNNING', worker_id = $1, claimed_at = NOW(), updated_at = NOW()
            WHERE id = (
                SELECT id FROM email_jobs
                WHERE status IN ('PENDING', 'QUEUED') AND deleted_at IS NULL
                ORDER BY created_at ASC LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            "#,
        )
        .bind(worker_id)
        .fetch_optional(&mut *tx)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;

        tx.commit().await
            .map_err(|e| StoreError::DatabaseError(e.to_string()))?;

        Ok(job)
    }

    async fn load_job(&self, job_id: Uuid) -> Result<EmailJob, StoreError> {
        sqlx::query_as::<_, EmailJob>(
            "SELECT * FROM email_jobs WHERE id = $1 AND deleted_at IS NULL"
        )
        .bind(job_id)
        .fetch_one(&self.db_pool)
        .await
        .map_err(|e| match e {
            sqlx::Error::RowNotFound => StoreError::JobNotFound(job_id),
            _ => StoreError::DatabaseError(e.to_string()),
        })
    }

    async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError> {
        let deleted: Option<bool> = sqlx::query_scalar(
            "SELECT deleted_at IS NOT NULL FROM email_jobs WHERE id = $1"
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;

        // A missing row is treated as deleted so workers stop processing it
        Ok(deleted.unwrap_or(true))
    }

    async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError> {
        sqlx::query(
            "UPDATE email_jobs SET status = 'RUNNING', started_at = NOW(), updated_at = NOW() WHERE id = $1"
        )
        .bind(job_id)
        .execute(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
        Ok(())
    }

    async fn mark_job_completed(
        &self,
        job_id: Uuid,
        results: serde_json::Value,
    ) -> Result<(), StoreError> {
        sqlx::query(
            "UPDATE email_jobs SET status = 'COMPLETED', completed_at = NOW(), updated_at = NOW(), results = $2 WHERE id = $1"
        )
        .bind(job_id)
        .bind(results)
        .execute(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
        Ok(())
    }

    async fn schedule_retry(
        &self,
        job_id: Uuid,
        retry_count: i32,
        next_retry_at: DateTime<Utc>,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError> {
        sqlx::query(
            r#"
            UPDATE email_jobs
            SET status = 'PENDING', worker_id = NULL, claimed_at = NULL,
                retry_count = $2, next_retry_at = $3,
                error_message = $4, error_details = $5,
                failed_at = NOW(), updated_at = NOW()
            WHERE id = $1
            "#
        )
        .bind(job_id)
        .bind(retry_count)
        .bind(next_retry_at)
        .bind(error_message)
        .bind(&error_details)
        .execute(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
        Ok(())
    }

    async fn mark_job_failed(
        &self,
        job_id: Uuid,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError> {
        sqlx::query(
            "UPDATE email_jobs SET status = 'FAILED', error_message = $2, error_details = $3, failed_at = NOW(), updated_at = NOW() WHERE id = $1"
        )
        .bind(job_id)
        .bind(error_message)
        .bind(&error_details)
        .execute(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))?;
        Ok(())
    }

    async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError> {
        sqlx::query_as(
            r#"
            SELECT id, job_type::text, retry_count, max_retries
            FROM email_jobs
            WHERE deleted_at IS NULL
              AND (status = 'PENDING' OR status = 'RUNNING' OR (status = 'FAILED' AND retry_count < max_retries))
            ORDER BY created_at ASC
            "#
        )
        .fetch_all(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))
    }

    async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<EmailJob>, StoreError> {
        let threshold = chrono::Duration::seconds(threshold_seconds);
        let cutoff_time = Utc::now() - threshold;

        sqlx::query_as(
            r#"
            SELECT * FROM email_jobs
            WHERE status = 'RUNNING' AND deleted_at IS NULL AND updated_at < $1
            ORDER BY updated_at ASC
            "#
        )
        .bind(cutoff_time)
        .fetch_all(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))
    }

    async fn count_claimable_jobs(&self) -> Result<i64, StoreError> {
        sqlx::query_scalar(
            r#"
            SELECT COUNT(*)
            FROM email_jobs
            WHERE status IN ('PENDING', 'QUEUED') AND deleted_at IS NULL
              AND (worker_id IS NULL OR updated_at < NOW() - INTERVAL '5 minutes')
            "#
        )
        .fetch_one(&self.db_pool)
        .await
        .map_err(|e| StoreError::DatabaseError(e.to_string()))
    }
}

Step 4: Start Worker Pool

use pleme_redis::{WorkerPool, WorkerPoolConfig, JobQueue};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize dependencies
    let db_pool = /* PostgreSQL connection pool */;
    let redis_conn = /* Redis connection */;
    let email_client = Arc::new(EmailClient::new(/* ... */));
    let template_engine = Arc::new(TemplateEngine::new(/* ... */));

    // Create executor and store
    let executor = EmailJobExecutor {
        email_client,
        template_engine,
    };
    let store = EmailJobStore { db_pool };

    // Create job queue
    let job_queue = JobQueue::new(redis_conn, "email_jobs:queue".to_string());

    // Configure worker pool
    let config = WorkerPoolConfig {
        worker_count: 5,                     // 5 concurrent workers
        queue_key: "email_jobs:queue".to_string(),
        poll_timeout: 30,                    // 30 second poll timeout
        max_retries: 3,                      // Retry up to 3 times
        base_retry_delay: 60,                // Start with 60s delay
        max_retry_delay: 3600,               // Max 1 hour delay
        stale_job_threshold_seconds: 7200,   // 2 hour stale threshold
        stale_check_interval_seconds: 300,   // Check every 5 minutes
    };

    // Start worker pool
    let mut worker_pool = WorkerPool::new(config, job_queue, executor, store);

    // This blocks and runs workers
    worker_pool.start().await?;

    Ok(())
}

Step 5: Enqueue Jobs

use pleme_redis::{QueuedJob, JobQueue};
use chrono::Utc;
use uuid::Uuid;

// Create job in database
let job = EmailJob {
    id: Uuid::new_v4(),
    job_type: EmailJobType::SendWelcomeEmail,
    retry_count: 0,
    max_retries: 3,
    parameters: serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome!",
        "user_name": "Alice"
    }),
    status: JobStatus::Pending,
    // ... other fields
};

sqlx::query("INSERT INTO email_jobs (...) VALUES (...)")
    .execute(&db_pool)
    .await?;

// Enqueue to Redis for worker pickup
let queued_job = QueuedJob {
    job_id: job.id,
    job_type: "SendWelcomeEmail".to_string(),
    parameters: job.parameters.clone(),
    retry_count: 0,
    max_retries: 3,
    enqueued_at: Utc::now(),
};

job_queue.enqueue(queued_job).await?;
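
The architecture diagram lists LPUSH/BRPOP as the queue primitives. Why that pairing gives FIFO order can be shown with an in-memory stand-in for a Redis list; `FifoList` is a hypothetical type for illustration and says nothing about how `JobQueue` is implemented internally.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for a Redis list used as a FIFO queue.
#[derive(Default)]
pub struct FifoList {
    items: VecDeque<String>,
}

impl FifoList {
    /// Like LPUSH: insert at the head (left end) of the list.
    pub fn lpush(&mut self, job_id: &str) {
        self.items.push_front(job_id.to_string());
    }

    /// Like RPOP (BRPOP without the blocking wait): remove from the tail.
    pub fn rpop(&mut self) -> Option<String> {
        self.items.pop_back()
    }
}
```

Pushing on one end and popping from the other means the oldest entry is always the next one returned. In Redis, BRPOP additionally blocks until an entry arrives or its timeout elapses.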

Database Schema Example

CREATE TABLE email_jobs (
    id UUID PRIMARY KEY,
    job_type VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'PENDING',

    -- Job parameters
    parameters JSONB NOT NULL,
    configuration JSONB,

    -- Worker tracking
    worker_id VARCHAR(50),
    claimed_at TIMESTAMP,

    -- Execution metadata
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    failed_at TIMESTAMP,

    -- Results and errors
    results JSONB,
    error_message TEXT,
    error_details JSONB,

    -- Retry logic
    retry_count INT NOT NULL DEFAULT 0,
    max_retries INT NOT NULL DEFAULT 3,
    next_retry_at TIMESTAMP,

    -- Multi-tenancy
    product VARCHAR(50) NOT NULL,

    -- Audit fields
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    deleted_at TIMESTAMP  -- Soft delete for cancellation
);

-- Indexes (PostgreSQL has no inline INDEX clause, so partial indexes are created separately)
CREATE INDEX idx_email_jobs_status ON email_jobs (status) WHERE deleted_at IS NULL;
CREATE INDEX idx_email_jobs_retry ON email_jobs (next_retry_at) WHERE status = 'PENDING' AND deleted_at IS NULL;
CREATE INDEX idx_email_jobs_stale ON email_jobs (updated_at) WHERE status = 'RUNNING' AND deleted_at IS NULL;

Configuration Tuning

High Throughput (Reliable Infrastructure)

WorkerPoolConfig {
    worker_count: 10,                    // More concurrent workers
    poll_timeout: 10,                    // Faster polling
    stale_job_threshold_seconds: 3600,   // 1 hour (aggressive recovery)
    stale_check_interval_seconds: 60,    // Check every minute
    ..Default::default()
}

Unreliable Network (Careful Recovery)

WorkerPoolConfig {
    worker_count: 3,                     // Fewer workers
    poll_timeout: 60,                    // Longer timeout
    max_retries: 5,                      // More retries
    stale_job_threshold_seconds: 14400,  // 4 hours (patient recovery)
    base_retry_delay: 120,               // Longer initial delay
    ..Default::default()
}

Development/Testing

WorkerPoolConfig {
    worker_count: 2,                     // Lightweight
    poll_timeout: 5,                     // Fast feedback
    stale_job_threshold_seconds: 300,    // 5 minutes
    stale_check_interval_seconds: 60,    // Check frequently
    ..Default::default()
}
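
The presets above rely on struct update syntax, so every field that is not listed comes from `Default`. The sketch below shows the mechanics with a local mirror of the struct so it compiles on its own. Only the two stale-job defaults (7200 and 300) are documented in this README; the other default values are assumptions borrowed from the Step 4 example, and `high_throughput` is a hypothetical helper.

```rust
/// Local mirror of the `WorkerPoolConfig` fields listed in this README.
/// Not the crate's definition.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkerPoolConfig {
    pub worker_count: usize,
    pub queue_key: String,
    pub poll_timeout: u64,
    pub max_retries: i32,
    pub base_retry_delay: i64,
    pub max_retry_delay: i64,
    pub stale_job_threshold_seconds: i64,
    pub stale_check_interval_seconds: u64,
}

impl Default for WorkerPoolConfig {
    fn default() -> Self {
        Self {
            worker_count: 5,                     // assumed (Step 4 value)
            queue_key: "jobs:queue".to_string(), // assumed placeholder
            poll_timeout: 30,                    // assumed (Step 4 value)
            max_retries: 3,                      // assumed (Step 4 value)
            base_retry_delay: 60,                // assumed (Step 4 value)
            max_retry_delay: 3600,               // assumed (Step 4 value)
            stale_job_threshold_seconds: 7200,   // documented default (2h)
            stale_check_interval_seconds: 300,   // documented default (5m)
        }
    }
}

/// The "High Throughput" preset with an explicit queue key.
pub fn high_throughput(queue_key: &str) -> WorkerPoolConfig {
    WorkerPoolConfig {
        worker_count: 10,
        queue_key: queue_key.to_string(),
        poll_timeout: 10,
        stale_job_threshold_seconds: 3600,
        stale_check_interval_seconds: 60,
        ..Default::default()
    }
}
```

Because `queue_key` is also inherited when omitted, it is worth setting it explicitly in each preset so the pool reads from the same Redis key that producers write to.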

Monitoring

Job State Metrics

// Count jobs by state
let metrics = sqlx::query!(
    "SELECT status, COUNT(*) as count FROM email_jobs GROUP BY status"
)
.fetch_all(&pool)
.await?;

Stale Job Detection

// Find jobs that might be stuck
let stuck_jobs = sqlx::query!(
    r#"
    SELECT * FROM email_jobs
    WHERE status = 'RUNNING'
      AND updated_at < NOW() - INTERVAL '30 minutes'
      AND deleted_at IS NULL
    "#
)
.fetch_all(&pool)
.await?;

Retry Rate Analysis

// Average retries for completed jobs
let avg_retries = sqlx::query_scalar!(
    "SELECT AVG(retry_count) FROM email_jobs WHERE status = 'COMPLETED'"
)
.fetch_one(&pool)
.await?;

Failure Analysis

// Most common failure reasons
let failure_reasons = sqlx::query!(
    r#"
    SELECT error_details->>'error_code' as error_code, COUNT(*) as count
    FROM email_jobs
    WHERE status = 'FAILED'
    GROUP BY error_code
    ORDER BY count DESC
    "#
)
.fetch_all(&pool)
.await?;

Migration Guide

From Custom Job Queue Implementation

If your service has a custom job queue similar to product-catalog's original implementation, follow these steps:

1. Add Dependency

# Cargo.toml
[dependencies]
pleme-redis = { path = "path/to/pleme-redis" }

2. Implement Job Trait

// Before: Custom Job struct
pub struct MyJob {
    pub id: Uuid,
    pub job_type: MyJobType,
    pub retry_count: i32,
    // ... other fields
}

// After: Implement the pleme_redis::Job trait
use pleme_redis::Job as JobTrait;

impl JobTrait for MyJob {
    fn id(&self) -> Uuid { self.id }
    fn job_type(&self) -> String { self.job_type.to_string() }
    fn retry_count(&self) -> i32 { self.retry_count }
    fn max_retries(&self) -> i32 { self.max_retries }
    fn parameters(&self) -> &serde_json::Value { &self.parameters }
    fn configuration(&self) -> Option<&serde_json::Value> { self.configuration.as_ref() }

    // Override if you have custom retry logic
    fn can_retry(&self) -> bool {
        self.status == MyJobStatus::Failed && self.retry_count < self.max_retries
    }
}

3. Implement JobExecutor Trait

// Before: Custom executor
pub struct MyExecutor {
    // dependencies
}

impl MyExecutor {
    pub async fn execute(&self, job: &MyJob) -> Result<serde_json::Value, MyError> {
        // business logic
    }
}

// After: Implement pleme_redis::JobExecutor trait
use async_trait::async_trait;

#[derive(Clone)]  // ← Add Clone derive
pub struct MyExecutor {
    // dependencies
}

#[async_trait]
impl pleme_redis::JobExecutor<MyJob> for MyExecutor {
    async fn execute(&self, job: &MyJob) -> Result<serde_json::Value, pleme_redis::ExecutorError> {
        // Wrap existing logic with error conversion
        self.execute_internal(job).await.map_err(|e| {
            pleme_redis::ExecutorError::ExecutionFailed(e.to_string())
        })
    }
}

impl MyExecutor {
    async fn execute_internal(&self, job: &MyJob) -> Result<serde_json::Value, MyError> {
        // Existing business logic unchanged
    }
}
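
Mapping every failure to `ExecutionFailed` loses the distinction between validation and provider errors. One alternative, sketched here with a local mirror of the `ExecutorError` variants and a hypothetical `MyError`, is a `From` impl so that `?` chooses the variant. The variant mapping shown is an assumption about what a service might want, not something the crate prescribes.

```rust
use std::fmt;

/// Local mirror of the `ExecutorError` variants listed earlier; the derives
/// are added only so results can be compared in tests.
#[derive(Debug, PartialEq)]
pub enum ExecutorError {
    ExecutionFailed(String),
    ProviderError(String),
    ValidationError(String),
    JobCancelled,
}

/// Hypothetical domain error standing in for `MyError`.
#[derive(Debug)]
pub enum MyError {
    InvalidInput(String),
    UpstreamTimeout,
}

impl fmt::Display for MyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MyError::InvalidInput(reason) => write!(f, "invalid input: {}", reason),
            MyError::UpstreamTimeout => write!(f, "upstream timed out"),
        }
    }
}

impl From<MyError> for ExecutorError {
    fn from(err: MyError) -> Self {
        match &err {
            MyError::InvalidInput(_) => ExecutorError::ValidationError(err.to_string()),
            MyError::UpstreamTimeout => ExecutorError::ProviderError(err.to_string()),
        }
    }
}

/// Stand-in for the existing business logic.
fn execute_internal(recipient: &str) -> Result<usize, MyError> {
    if recipient.is_empty() {
        return Err(MyError::InvalidInput("missing recipient".to_string()));
    }
    Ok(recipient.len())
}

/// The `?` operator applies the `From` impl, so no `map_err` is needed.
pub fn execute(recipient: &str) -> Result<usize, ExecutorError> {
    Ok(execute_internal(recipient)?)
}
```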

4. Create JobStore Implementation

// NEW: Create JobStore implementation
use pleme_redis::{JobStore as JobStoreTrait, StoreError};

#[derive(Clone)]
pub struct MyJobStore {
    db_pool: PgPool,
}

#[async_trait]
impl JobStoreTrait<MyJob> for MyJobStore {
    async fn claim_next_job(&self, worker_id: &str) -> Result<Option<MyJob>, StoreError> {
        // Migrate existing atomic job claiming logic
        // Use SELECT FOR UPDATE SKIP LOCKED pattern
    }

    // ... implement the remaining JobStore methods
    // See the full example in Step 3 above
}

5. Update Worker Pool Initialization

// Before: Custom worker pool
let worker_pool = CustomWorkerPool::new(/* ... */);

// After: pleme_redis worker pool
use pleme_redis::{WorkerPool, WorkerPoolConfig, JobQueue};

let job_queue = JobQueue::new(redis_conn, "my_jobs:queue".to_string());
// Pass the executor and store by value: WorkerPool requires both to implement Clone
let executor = MyExecutor::new(/* deps */);
let store = MyJobStore::new(db_pool);

let config = WorkerPoolConfig {
    worker_count: 5,
    queue_key: "my_jobs:queue".to_string(),
    poll_timeout: 30,
    max_retries: 3,
    base_retry_delay: 60,
    max_retry_delay: 3600,
    stale_job_threshold_seconds: 7200,
    stale_check_interval_seconds: 300,
};

let mut worker_pool = WorkerPool::new(config, job_queue, executor, store);
worker_pool.start().await?;

6. Remove Old Files

# Delete custom implementations now in pleme-redis
rm src/queue/redis_queue.rs
rm src/queue/worker.rs
# Keep domain-specific files:
# - job.rs (with trait implementation)
# - executor.rs (with trait implementation)
# - job_store.rs (NEW)

7. Update Imports

// Before:
use crate::queue::{RedisQueue, Worker, WorkerPool};

// After:
use pleme_redis::{JobQueue, WorkerPool, WorkerPoolConfig};

Real-World Example: product-catalog

See pkgs/products/novaskyn/services/rust/product-catalog/src/queue/ for a complete migration example:

  • job.rs - Job trait implementation
  • executor.rs - JobExecutor trait implementation
  • job_store.rs - JobStore trait implementation
  • mod.rs - Re-exports pleme-redis types

Other Features

Presence Management

Track user online/offline status with TTL:

use pleme_redis::PresenceManager;

let presence = PresenceManager::new(redis_conn, "novaskyn".to_string());

// Mark user online
presence.set_online(user_id).await?;

// Check if user online
let is_online = presence.is_online(user_id).await?;

// Get all online users
let online_users = presence.get_online_users().await?;

// Automatically expires after TTL (default: 5 minutes)
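
The TTL behaviour can be illustrated without Redis. The sketch below is an in-memory model with hypothetical names (`InMemoryPresence` and its methods); the current time is passed in explicitly so expiry can be tested deterministically. In Redis the expiry is handled by the key's TTL rather than by comparing timestamps.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory model of TTL-based presence (illustration only).
pub struct InMemoryPresence {
    ttl: Duration,
    last_seen: HashMap<u64, Instant>,
}

impl InMemoryPresence {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, last_seen: HashMap::new() }
    }

    /// Records a heartbeat; calling this again refreshes the TTL.
    pub fn set_online(&mut self, user_id: u64, now: Instant) {
        self.last_seen.insert(user_id, now);
    }

    /// A user is online while less than `ttl` has passed since the last heartbeat.
    pub fn is_online(&self, user_id: u64, now: Instant) -> bool {
        self.last_seen
            .get(&user_id)
            .map_or(false, |seen| now.saturating_duration_since(*seen) < self.ttl)
    }

    /// All users whose heartbeat has not expired, sorted by id.
    pub fn online_users(&self, now: Instant) -> Vec<u64> {
        let mut users: Vec<u64> = self
            .last_seen
            .keys()
            .copied()
            .filter(|id| self.is_online(*id, now))
            .collect();
        users.sort_unstable();
        users
    }
}
```

The model makes one property explicit: clients must call `set_online` more often than the TTL, otherwise a connected user will appear offline between heartbeats.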

Typing Indicators

Ephemeral typing status for real-time UX:

use pleme_redis::TypingManager;

let typing = TypingManager::new(redis_conn, "novaskyn".to_string());

// Set user typing in conversation
typing.set_typing(conversation_id, user_id).await?;

// Get users typing
let typing_users = typing.get_typing_users(conversation_id).await?;

// Automatically expires after TTL (default: 5 seconds)

Caching

Product-scoped Redis caching:

use pleme_redis::CacheManager;

let cache = CacheManager::new(redis_conn, "novaskyn".to_string());

// Set cache value
cache.set("product:123", &product, Some(300)).await?;  // 5 min TTL

// Get cache value
let product: Option<Product> = cache.get("product:123").await?;

// Delete cache
cache.delete("product:123").await?;
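
Product scoping usually comes down to prefixing every key with the product name. The key layout below (`{product}:cache:{key}`) is an assumption made for illustration; the layout pleme-redis actually uses is not stated in this README, and `scoped_cache_key` is a hypothetical helper.

```rust
/// Hypothetical key layout: `{product}:cache:{key}`.
/// Two products using the same logical key never share a Redis entry.
pub fn scoped_cache_key(product: &str, key: &str) -> String {
    format!("{}:cache:{}", product, key)
}
```

With such a scheme, `product:123` written by one product and `product:123` written by another resolve to different Redis keys, so tenants can neither read nor overwrite each other's entries.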

Pub/Sub Messaging

Event broadcasting:

use pleme_redis::PubSubManager;

let pubsub = PubSubManager::new(redis_conn, "novaskyn".to_string());

// Publish event
pubsub.publish("order:created", &order_event).await?;

// Subscribe to channel
let mut subscription = pubsub.subscribe("order:*").await?;
while let Some(message) = subscription.recv().await {
    // Handle message
}

Distributed Locking

Prevent duplicate operations:

use pleme_redis::LockManager;

let lock_manager = LockManager::new(redis_conn);

// Acquire lock
let lock = lock_manager.try_lock("resource:123", 60).await?;

// Do work while holding lock
if let Some(guard) = lock {
    // Guaranteed exclusive access
    perform_operation().await?;

    // Lock automatically released when guard drops
}
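
The release-on-drop behaviour follows the RAII pattern. A minimal in-memory sketch of that pattern is shown below; `InMemoryLocks` and `LockGuard` are hypothetical types, and the TTL argument from the example above is left out for brevity. With Redis, the TTL additionally ensures that a crashed holder cannot block the resource forever.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// In-memory lock table shared between clones (illustration only).
#[derive(Clone, Default)]
pub struct InMemoryLocks {
    held: Arc<Mutex<HashSet<String>>>,
}

/// Holding a guard means holding the lock; dropping it releases the key.
pub struct LockGuard {
    key: String,
    held: Arc<Mutex<HashSet<String>>>,
}

impl InMemoryLocks {
    /// Returns a guard if the key was free, or None if it is already held.
    pub fn try_lock(&self, key: &str) -> Option<LockGuard> {
        let mut held = self.held.lock().unwrap();
        if held.insert(key.to_string()) {
            Some(LockGuard { key: key.to_string(), held: Arc::clone(&self.held) })
        } else {
            None
        }
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Ignore a poisoned mutex here; panicking inside drop would abort.
        if let Ok(mut held) = self.held.lock() {
            held.remove(&self.key);
        }
    }
}
```

Because release happens in `Drop`, the lock is freed on every exit path, including early returns through `?`.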

Testing

# Run all tests
cargo test

# Run job queue tests
cargo test --lib job
cargo test --lib worker_pool

# Run with Redis (integration tests)
docker run -d -p 6379:6379 redis:7.2
cargo test

Contributing

When adding new patterns:

  1. Trait-based design - Define traits for abstraction
  2. Generic type parameters - Enable flexibility
  3. Comprehensive tests - Cover edge cases
  4. Clear documentation - Explain usage with examples
  5. Multi-tenant safe - Support product scoping

License

MIT


References

  • Product-Catalog Integration: pkgs/products/novaskyn/services/rust/product-catalog/src/queue/
  • Job Queue Extraction Status: pkgs/libraries/rust/crates/JOB_QUEUE_EXTRACTION_STATUS.md
  • Platform Libraries Evaluation: pkgs/libraries/rust/crates/PLEME_LIBRARIES_EVALUATION.md