pleme-redis
Redis patterns and utilities for the Pleme platform, providing reusable infrastructure for distributed systems.
Features
- Job Queue Processing - FIFO queue with worker pool, retry logic, and anti-stuck mechanisms
- Presence Management - Online/offline status with TTL
- Typing Indicators - Ephemeral status for real-time UX
- Caching - Product-scoped Redis caching
- Pub/Sub Messaging - Event broadcasting
- Distributed Locking - Prevent duplicate job execution
All patterns are multi-tenant safe with product scoping.
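Product scoping is, at its core, a key-namespace convention: every Redis key is prefixed with the product identifier so tenants sharing one Redis instance never collide. A minimal sketch of the idea (the `scoped_key` helper is illustrative, not the crate's actual API):

```rust
/// Hypothetical helper showing the multi-tenant key convention:
/// prefix every Redis key with the product identifier.
pub fn scoped_key(product: &str, key: &str) -> String {
    format!("{}:{}", product, key)
}
```

With this convention, "novaskyn" and another product can both store `product:123` in the same Redis instance without clashing.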
Job Queue System
Generic job queue infrastructure extracted from the product-catalog service: roughly 1,000 lines of battle-tested, reusable code.
Architecture
┌─────────────────────────────────────────────────────┐
│ YOUR SERVICE (Business Logic) │
├─────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Your Job │ │ JobExecutor │ │ JobStore │ │
│ │ (domain) │ │ (business) │ │ (DB) │ │
│ └──────┬───────┘ └──────┬───────┘ └────┬─────┘ │
│ │ │ │ │
│ │ implements │ implements │ impl │
│ ▼ ▼ ▼ │
├─────────────────────────────────────────────────────┤
│ pleme-redis (Infrastructure) │
├─────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ Job Trait (Generic) │ │
│ │ - id(), job_type(), retry_count() │ │
│ │ - can_retry(), calculate_backoff_delay() │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ JobExecutor<J> Trait │ │
│ │ - async fn execute(&self, job: &J) │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ JobStore<J> Trait │ │
│ │ - claim_next_job() │ │
│ │ - mark_job_completed() │ │
│ │ - schedule_retry() │ │
│ │ - find_stale_jobs() │ │
│ │ ... 11 methods total │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ WorkerPool<J, E, S> │ │
│ │ - Worker pool with configurable workers │ │
│ │ - Exponential backoff retry logic │ │
│ │ - Stale job detection & recovery │ │
│ │ - 4-layer anti-stuck mechanisms │ │
│ └────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────┐ │
│ │ JobQueue (Redis FIFO) │ │
│ │ - LPUSH/BRPOP for job distribution │ │
│ │ - Distributed locking (de-duplication) │ │
│ │ - Queue statistics │ │
│ └────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
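The `JobQueue` layer's LPUSH/BRPOP pairing gives FIFO delivery: producers push at the head, workers pop (blocking) from the tail. A self-contained sketch simulating that ordering with a `VecDeque` (illustrative only; the real queue talks to Redis):

```rust
use std::collections::VecDeque;

/// Simulates Redis LPUSH/BRPOP ordering: LPUSH inserts at the head,
/// BRPOP removes from the tail, so jobs come out oldest-first (FIFO).
pub struct FifoQueue {
    items: VecDeque<String>,
}

impl FifoQueue {
    pub fn new() -> Self {
        Self { items: VecDeque::new() }
    }

    /// LPUSH: insert at the left (head).
    pub fn lpush(&mut self, job_id: &str) {
        self.items.push_front(job_id.to_string());
    }

    /// BRPOP (non-blocking stand-in): remove from the right (tail).
    pub fn rpop(&mut self) -> Option<String> {
        self.items.pop_back()
    }
}
```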
Key Components
1. Job Trait
Define what a job is and how it behaves:
pub trait Job: Send + Sync {
fn id(&self) -> Uuid;
fn job_type(&self) -> String;
fn retry_count(&self) -> i32;
fn max_retries(&self) -> i32;
fn parameters(&self) -> &serde_json::Value;
fn configuration(&self) -> Option<&serde_json::Value>;
fn can_retry(&self) -> bool {
self.retry_count() < self.max_retries()
}
fn calculate_backoff_delay(&self) -> i64 {
let base_delay = 60;
let max_delay = 3600;
let delay = base_delay * 2_i64.pow(self.retry_count() as u32);
delay.min(max_delay)
}
fn calculate_next_retry_at(&self) -> DateTime<Utc> {
Utc::now() + chrono::Duration::seconds(self.calculate_backoff_delay())
}
}
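The default backoff doubles on each attempt and caps at an hour. A standalone sketch of that arithmetic, using the same constants as the trait's default above:

```rust
/// Mirrors the Job trait's default `calculate_backoff_delay`:
/// 60s base, doubled per retry, capped at 3600s.
pub fn backoff_delay(retry_count: u32) -> i64 {
    let base_delay: i64 = 60;
    let max_delay: i64 = 3600;
    let delay = base_delay * 2_i64.pow(retry_count);
    delay.min(max_delay)
}
```

For retry counts 0 through 4 this yields 60s, 120s, 240s, 480s, 960s; from the sixth retry onward the delay saturates at 3600s.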
2. JobExecutor Trait
Implement your business logic:
#[async_trait]
pub trait JobExecutor<J: Job>: Send + Sync {
async fn execute(&self, job: &J) -> Result<serde_json::Value, ExecutorError>;
}
pub enum ExecutorError {
ExecutionFailed(String),
ProviderError(String),
ValidationError(String),
JobCancelled,
}
3. JobStore Trait
Implement database persistence:
#[async_trait]
pub trait JobStore<J: Job>: Send + Sync {
async fn claim_next_job(&self, worker_id: &str) -> Result<Option<J>, StoreError>;
async fn load_job(&self, job_id: Uuid) -> Result<J, StoreError>;
async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError>;
async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError>;
async fn mark_job_completed(&self, job_id: Uuid, results: serde_json::Value) -> Result<(), StoreError>;
async fn mark_job_failed(&self, job_id: Uuid, error: &str, details: serde_json::Value) -> Result<(), StoreError>;
async fn schedule_retry(
&self,
job_id: Uuid,
retry_count: i32,
next_retry_at: DateTime<Utc>,
error: &str,
details: serde_json::Value,
) -> Result<(), StoreError>;
async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError>;
async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<J>, StoreError>;
async fn count_claimable_jobs(&self) -> Result<i64, StoreError>;
}
4. WorkerPool
Orchestrates job processing with multiple workers:
pub struct WorkerPool<J, E, S>
where
J: Job + Clone + Send + Sync + 'static,
E: JobExecutor<J> + Clone + Send + Sync + 'static,
S: JobStore<J> + Clone + Send + Sync + 'static,
{
config: WorkerPoolConfig,
job_queue: JobQueue,
executor: E,
store: S,
}
pub struct WorkerPoolConfig {
pub worker_count: usize,
pub queue_key: String,
pub poll_timeout: u64,
pub max_retries: i32,
pub base_retry_delay: i64,
pub max_retry_delay: i64,
pub stale_job_threshold_seconds: i64,
pub stale_check_interval_seconds: u64,
}
Features
Anti-Stuck Mechanisms (4 Layers)
1. Queue Timeout → Database Scan (every 30s)
- Worker polls Redis and times out
- Scans the database for PENDING jobs
- Re-enqueues any missed jobs
2. Progress Updates (every ~100 items)
- Executor updates job progress regularly
- Provides a heartbeat signal
- Enables stale detection
3. Stale Job Checker (every 5 minutes)
- Finds jobs stuck in RUNNING without updates
- If recoverable: reset to PENDING and retry
- If exhausted: mark FAILED with full error context
4. Retry Logic with Exponential Backoff
- Transient failures trigger automatic retry
- Exponential backoff: 60s, 120s, 240s, 480s, 960s
- Permanent failure after max retries
Guarantees:
- ✅ No job waits indefinitely (max 30s for PENDING)
- ✅ No job stuck in RUNNING (max 2h before recovery)
- ✅ All jobs reach terminal state (COMPLETED or FAILED)
- ✅ Full error context for debugging
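Layer 3's recover-or-fail decision is a pure function of the retry budget. A hedged sketch of that decision (the enum and function names are illustrative, not the crate's API):

```rust
/// Illustrative outcome of the stale-job checker for one stuck job.
#[derive(Debug, PartialEq)]
pub enum StaleAction {
    /// Reset to PENDING and schedule another attempt.
    Retry,
    /// Retry budget exhausted: mark FAILED with full error context.
    Fail,
}

/// A stuck RUNNING job is recoverable iff it still has retries left.
pub fn stale_job_action(retry_count: i32, max_retries: i32) -> StaleAction {
    if retry_count < max_retries {
        StaleAction::Retry
    } else {
        StaleAction::Fail
    }
}
```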
Distributed Locking
Prevents duplicate job execution:
let lock_key = format!("job:lock:{}:{}", job_type, hash(parameters));
if lock_manager.try_lock(&lock_key).await? {
// Lock acquired: safe to execute the job
} else {
// Another worker holds the lock: skip to avoid duplicate execution
}
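Deriving the lock key from a hash of the parameters means two jobs with identical parameters contend for the same lock. A sketch using the standard library hasher (`hash(parameters)` above is shorthand; `DefaultHasher` here stands in for whatever hash the crate actually uses):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Builds a deduplication lock key: identical (job_type, parameters)
/// pairs always map to the same key, so only one worker runs them.
pub fn job_lock_key(job_type: &str, parameters: &str) -> String {
    let mut hasher = DefaultHasher::new();
    parameters.hash(&mut hasher);
    format!("job:lock:{}:{:x}", job_type, hasher.finish())
}
```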
Atomic Job Claiming
Race-free job claiming with PostgreSQL:
UPDATE jobs
SET status = 'RUNNING', worker_id = $1, claimed_at = NOW()
WHERE id = (
SELECT id FROM jobs
WHERE status IN ('PENDING', 'QUEUED')
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *
Usage Example
Step 1: Define Your Job
use pleme_redis::Job as JobTrait;
use uuid::Uuid;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
pub id: Uuid,
pub job_type: EmailJobType,
pub retry_count: i32,
pub max_retries: i32,
pub parameters: serde_json::Value,
pub status: JobStatus,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum EmailJobType {
SendWelcomeEmail,
SendResetPassword,
SendOrderConfirmation,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum JobStatus {
Pending,
Running,
Completed,
Failed,
}
impl pleme_redis::Job for EmailJob {
fn id(&self) -> Uuid {
self.id
}
fn job_type(&self) -> String {
format!("{:?}", self.job_type)
}
fn retry_count(&self) -> i32 {
self.retry_count
}
fn max_retries(&self) -> i32 {
self.max_retries
}
fn parameters(&self) -> &serde_json::Value {
&self.parameters
}
fn configuration(&self) -> Option<&serde_json::Value> {
None
}
fn can_retry(&self) -> bool {
self.status == JobStatus::Failed && self.retry_count < self.max_retries
}
}
Step 2: Implement JobExecutor (Business Logic)
use pleme_redis::{JobExecutor, ExecutorError};
use async_trait::async_trait;
use serde::Deserialize;
use std::sync::Arc;
// Parameters carried in the job's JSON payload (matches the enqueue example below)
#[derive(Deserialize)]
pub struct EmailParams {
pub to: String,
pub subject: String,
pub user_name: String,
}
#[derive(Clone)]
pub struct EmailJobExecutor {
email_client: Arc<EmailClient>,
template_engine: Arc<TemplateEngine>,
}
#[async_trait]
impl JobExecutor<EmailJob> for EmailJobExecutor {
async fn execute(&self, job: &EmailJob) -> Result<serde_json::Value, ExecutorError> {
let params: EmailParams = serde_json::from_value(job.parameters.clone())
.map_err(|e| ExecutorError::ValidationError(e.to_string()))?;
match job.job_type {
EmailJobType::SendWelcomeEmail => {
let template = self.template_engine
.render("welcome", &params)
.map_err(|e| ExecutorError::ExecutionFailed(e.to_string()))?;
self.email_client
.send(&params.to, &params.subject, &template)
.await
.map_err(|e| ExecutorError::ProviderError(e.to_string()))?;
Ok(serde_json::json!({
"email_sent": true,
"recipient": params.to,
"message_id": "msg-123"
}))
}
EmailJobType::SendResetPassword => {
todo!()
}
EmailJobType::SendOrderConfirmation => {
todo!()
}
}
}
}
Step 3: Implement JobStore (Database Persistence)
use pleme_redis::{JobStore as JobStoreTrait, StoreError};
use sqlx::PgPool;
use async_trait::async_trait;
#[derive(Clone)]
pub struct EmailJobStore {
db_pool: PgPool,
}
#[async_trait]
impl JobStoreTrait<EmailJob> for EmailJobStore {
async fn claim_next_job(&self, worker_id: &str) -> Result<Option<EmailJob>, StoreError> {
let mut tx = self.db_pool.begin().await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
let job = sqlx::query_as::<_, EmailJob>(
r#"
UPDATE email_jobs
SET status = 'RUNNING', worker_id = $1, claimed_at = NOW(), updated_at = NOW()
WHERE id = (
SELECT id FROM email_jobs
WHERE status IN ('PENDING', 'QUEUED') AND deleted_at IS NULL
ORDER BY created_at ASC LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *
"#,
)
.bind(worker_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
tx.commit().await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(job)
}
async fn load_job(&self, job_id: Uuid) -> Result<EmailJob, StoreError> {
sqlx::query_as::<_, EmailJob>(
"SELECT * FROM email_jobs WHERE id = $1 AND deleted_at IS NULL"
)
.bind(job_id)
.fetch_one(&self.db_pool)
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => StoreError::JobNotFound(job_id),
_ => StoreError::DatabaseError(e.to_string()),
})
}
async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError> {
let deleted: Option<bool> = sqlx::query_scalar(
"SELECT deleted_at IS NOT NULL FROM email_jobs WHERE id = $1"
)
.bind(job_id)
.fetch_optional(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(deleted.unwrap_or(true))
}
async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError> {
sqlx::query(
"UPDATE email_jobs SET status = 'RUNNING', started_at = NOW(), updated_at = NOW() WHERE id = $1"
)
.bind(job_id)
.execute(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(())
}
async fn mark_job_completed(
&self,
job_id: Uuid,
results: serde_json::Value,
) -> Result<(), StoreError> {
sqlx::query(
"UPDATE email_jobs SET status = 'COMPLETED', completed_at = NOW(), updated_at = NOW(), results = $2 WHERE id = $1"
)
.bind(job_id)
.bind(results)
.execute(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(())
}
async fn schedule_retry(
&self,
job_id: Uuid,
retry_count: i32,
next_retry_at: DateTime<Utc>,
error_message: &str,
error_details: serde_json::Value,
) -> Result<(), StoreError> {
sqlx::query(
r#"
UPDATE email_jobs
SET status = 'PENDING', worker_id = NULL, claimed_at = NULL,
retry_count = $2, next_retry_at = $3,
error_message = $4, error_details = $5,
failed_at = NOW(), updated_at = NOW()
WHERE id = $1
"#
)
.bind(job_id)
.bind(retry_count)
.bind(next_retry_at)
.bind(error_message)
.bind(&error_details)
.execute(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(())
}
async fn mark_job_failed(
&self,
job_id: Uuid,
error_message: &str,
error_details: serde_json::Value,
) -> Result<(), StoreError> {
sqlx::query(
"UPDATE email_jobs SET status = 'FAILED', error_message = $2, error_details = $3, failed_at = NOW(), updated_at = NOW() WHERE id = $1"
)
.bind(job_id)
.bind(error_message)
.bind(&error_details)
.execute(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))?;
Ok(())
}
async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError> {
sqlx::query_as(
r#"
SELECT id, job_type::text, retry_count, max_retries
FROM email_jobs
WHERE deleted_at IS NULL
AND (status = 'PENDING' OR status = 'RUNNING' OR (status = 'FAILED' AND retry_count < max_retries))
ORDER BY created_at ASC
"#
)
.fetch_all(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))
}
async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<EmailJob>, StoreError> {
let threshold = chrono::Duration::seconds(threshold_seconds);
let cutoff_time = Utc::now() - threshold;
sqlx::query_as(
r#"
SELECT * FROM email_jobs
WHERE status = 'RUNNING' AND deleted_at IS NULL AND updated_at < $1
ORDER BY updated_at ASC
"#
)
.bind(cutoff_time)
.fetch_all(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))
}
async fn count_claimable_jobs(&self) -> Result<i64, StoreError> {
sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM email_jobs
WHERE status IN ('PENDING', 'QUEUED') AND deleted_at IS NULL
AND (worker_id IS NULL OR updated_at < NOW() - INTERVAL '5 minutes')
"#
)
.fetch_one(&self.db_pool)
.await
.map_err(|e| StoreError::DatabaseError(e.to_string()))
}
}
Step 4: Start Worker Pool
use pleme_redis::{WorkerPool, WorkerPoolConfig, JobQueue};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connection setup (adapt to your environment)
let db_pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
let redis_conn = redis::Client::open(std::env::var("REDIS_URL")?)?; // or whichever connection type JobQueue expects
let email_client = Arc::new(EmailClient::new());
let template_engine = Arc::new(TemplateEngine::new());
let executor = EmailJobExecutor {
email_client,
template_engine,
};
let store = EmailJobStore { db_pool };
let job_queue = JobQueue::new(redis_conn, "email_jobs:queue".to_string());
let config = WorkerPoolConfig {
worker_count: 5,
queue_key: "email_jobs:queue".to_string(),
poll_timeout: 30,
max_retries: 3,
base_retry_delay: 60,
max_retry_delay: 3600,
stale_job_threshold_seconds: 7200,
stale_check_interval_seconds: 300,
};
let mut worker_pool = WorkerPool::new(config, job_queue, executor, store);
worker_pool.start().await?;
Ok(())
}
Step 5: Enqueue Jobs
use pleme_redis::{QueuedJob, JobQueue};
let job = EmailJob {
id: Uuid::new_v4(),
job_type: EmailJobType::SendWelcomeEmail,
retry_count: 0,
max_retries: 3,
parameters: serde_json::json!({
"to": "user@example.com",
"subject": "Welcome!",
"user_name": "Alice"
}),
status: JobStatus::Pending,
};
// Persist the job row first (column list elided)
sqlx::query("INSERT INTO email_jobs (...) VALUES (...)")
.execute(&db_pool)
.await?;
// Then enqueue a lightweight reference in Redis
let queued_job = QueuedJob {
job_id: job.id,
job_type: "SendWelcomeEmail".to_string(),
parameters: job.parameters.clone(),
retry_count: 0,
max_retries: 3,
enqueued_at: Utc::now(),
};
job_queue.enqueue(queued_job).await?;
Database Schema Example
CREATE TABLE email_jobs (
id UUID PRIMARY KEY,
job_type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
parameters JSONB NOT NULL,
configuration JSONB,
worker_id VARCHAR(50),
claimed_at TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
failed_at TIMESTAMP,
results JSONB,
error_message TEXT,
error_details JSONB,
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
next_retry_at TIMESTAMP,
product VARCHAR(50) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMP
);
-- Partial indexes (PostgreSQL does not allow INDEX clauses inside CREATE TABLE)
CREATE INDEX idx_email_jobs_status ON email_jobs (status) WHERE deleted_at IS NULL;
CREATE INDEX idx_email_jobs_retry ON email_jobs (next_retry_at) WHERE status = 'PENDING' AND deleted_at IS NULL;
CREATE INDEX idx_email_jobs_stale ON email_jobs (updated_at) WHERE status = 'RUNNING' AND deleted_at IS NULL;
Configuration Tuning
High Throughput (Reliable Infrastructure)
WorkerPoolConfig {
worker_count: 10,
poll_timeout: 10,
stale_job_threshold_seconds: 3600,
stale_check_interval_seconds: 60,
..Default::default()
}
Unreliable Network (Careful Recovery)
WorkerPoolConfig {
worker_count: 3,
poll_timeout: 60,
max_retries: 5,
stale_job_threshold_seconds: 14400,
base_retry_delay: 120,
..Default::default()
}
Development/Testing
WorkerPoolConfig {
worker_count: 2,
poll_timeout: 5,
stale_job_threshold_seconds: 300,
stale_check_interval_seconds: 60,
..Default::default()
}
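When tuning `max_retries` and the delay bounds, it helps to know the worst-case time a job can spend in backoff before reaching a terminal state. A sketch summing the capped exponential schedule (assumes the default doubling behaviour described above):

```rust
/// Total seconds spent in backoff across all retries, assuming the
/// delay doubles from `base_delay` and is capped at `max_delay`.
pub fn total_retry_window(base_delay: i64, max_delay: i64, max_retries: u32) -> i64 {
    (0..max_retries)
        .map(|r| (base_delay * 2_i64.pow(r)).min(max_delay))
        .sum()
}
```

For example, the "Unreliable Network" profile above (base 120s, 5 retries, 3600s cap) can spend up to 120 + 240 + 480 + 960 + 1920 = 3720 seconds in backoff alone.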
Monitoring
Job State Metrics
let metrics = sqlx::query!(
"SELECT status, COUNT(*) as count FROM email_jobs GROUP BY status"
)
.fetch_all(&pool)
.await?;
Stale Job Detection
let stuck_jobs = sqlx::query!(
r#"
SELECT * FROM email_jobs
WHERE status = 'RUNNING'
AND updated_at < NOW() - INTERVAL '30 minutes'
AND deleted_at IS NULL
"#
)
.fetch_all(&pool)
.await?;
Retry Rate Analysis
let avg_retries = sqlx::query_scalar!(
"SELECT AVG(retry_count) FROM email_jobs WHERE status = 'COMPLETED'"
)
.fetch_one(&pool)
.await?;
Failure Analysis
let failure_reasons = sqlx::query!(
r#"
SELECT error_details->>'error_code' as error_code, COUNT(*) as count
FROM email_jobs
WHERE status = 'FAILED'
GROUP BY error_code
ORDER BY count DESC
"#
)
.fetch_all(&pool)
.await?;
Migration Guide
From Custom Job Queue Implementation
If your service has a custom job queue similar to product-catalog's original implementation, follow these steps:
1. Add Dependency
[dependencies]
pleme-redis = { path = "path/to/pleme-redis" }
2. Implement Job Trait
pub struct MyJob {
pub id: Uuid,
pub job_type: MyJobType,
pub status: MyJobStatus,
pub retry_count: i32,
pub max_retries: i32,
pub parameters: serde_json::Value,
pub configuration: Option<serde_json::Value>,
}
use pleme_redis::Job as JobTrait;
impl pleme_redis::Job for MyJob {
fn id(&self) -> Uuid { self.id }
fn job_type(&self) -> String { self.job_type.to_string() }
fn retry_count(&self) -> i32 { self.retry_count }
fn max_retries(&self) -> i32 { self.max_retries }
fn parameters(&self) -> &serde_json::Value { &self.parameters }
fn configuration(&self) -> Option<&serde_json::Value> { self.configuration.as_ref() }
fn can_retry(&self) -> bool {
self.status == MyJobStatus::Failed && self.retry_count < self.max_retries
}
}
3. Implement JobExecutor Trait
Before (custom executor):
pub struct MyExecutor {
// ... your dependencies
}
impl MyExecutor {
pub async fn execute(&self, job: &MyJob) -> Result<serde_json::Value, MyError> {
// ... your business logic
}
}
After (a thin adapter over the same logic):
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyExecutor {
// ... your dependencies
}
#[async_trait]
impl pleme_redis::JobExecutor<MyJob> for MyExecutor {
async fn execute(&self, job: &MyJob) -> Result<serde_json::Value, pleme_redis::ExecutorError> {
self.execute_internal(job).await.map_err(|e| {
pleme_redis::ExecutorError::ExecutionFailed(e.to_string())
})
}
}
impl MyExecutor {
async fn execute_internal(&self, job: &MyJob) -> Result<serde_json::Value, MyError> {
// ... your existing business logic, unchanged
}
}
4. Create JobStore Implementation
use pleme_redis::{JobStore as JobStoreTrait, StoreError};
#[derive(Clone)]
pub struct MyJobStore {
db_pool: PgPool,
}
#[async_trait]
impl JobStoreTrait<MyJob> for MyJobStore {
async fn claim_next_job(&self, worker_id: &str) -> Result<Option<MyJob>, StoreError> {
// Same FOR UPDATE SKIP LOCKED pattern as the EmailJobStore example above
todo!()
}
// ... implement the remaining JobStore methods the same way
}
5. Update Worker Pool Initialization
Before:
let worker_pool = CustomWorkerPool::new();
After:
use pleme_redis::{WorkerPool, WorkerPoolConfig, JobQueue};
let job_queue = JobQueue::new(redis_conn, "my_jobs:queue".to_string());
let executor = MyExecutor::new();
let store = MyJobStore::new(db_pool);
let config = WorkerPoolConfig {
worker_count: 5,
queue_key: "my_jobs:queue".to_string(),
poll_timeout: 30,
max_retries: 3,
base_retry_delay: 60,
max_retry_delay: 3600,
stale_job_threshold_seconds: 7200,
stale_check_interval_seconds: 300,
};
let mut worker_pool = WorkerPool::new(config, job_queue, executor, store);
worker_pool.start().await?;
6. Remove Old Files
rm src/queue/redis_queue.rs
rm src/queue/worker.rs
7. Update Imports
Before:
use crate::queue::{RedisQueue, Worker, WorkerPool};
After:
use pleme_redis::{JobQueue, WorkerPool, WorkerPoolConfig};
Real-World Example: product-catalog
See pkgs/products/novaskyn/services/rust/product-catalog/src/queue/ for a complete migration example:
- job.rs - Job trait implementation
- executor.rs - JobExecutor trait implementation
- job_store.rs - JobStore trait implementation
- mod.rs - Re-exports pleme-redis types
Other Features
Presence Management
Track user online/offline status with TTL:
use pleme_redis::PresenceManager;
let presence = PresenceManager::new(redis_conn, "novaskyn".to_string());
presence.set_online(user_id).await?;
let is_online = presence.is_online(user_id).await?;
let online_users = presence.get_online_users().await?;
Typing Indicators
Ephemeral typing status for real-time UX:
use pleme_redis::TypingManager;
let typing = TypingManager::new(redis_conn, "novaskyn".to_string());
typing.set_typing(conversation_id, user_id).await?;
let typing_users = typing.get_typing_users(conversation_id).await?;
Caching
Product-scoped Redis caching:
use pleme_redis::CacheManager;
let cache = CacheManager::new(redis_conn, "novaskyn".to_string());
cache.set("product:123", &product, Some(300)).await?;
let product: Option<Product> = cache.get("product:123").await?;
cache.delete("product:123").await?;
Pub/Sub Messaging
Event broadcasting:
use pleme_redis::PubSubManager;
let pubsub = PubSubManager::new(redis_conn, "novaskyn".to_string());
pubsub.publish("order:created", &order_event).await?;
let mut subscription = pubsub.subscribe("order:*").await?;
while let Some(message) = subscription.recv().await {
// Handle the received event
}
Distributed Locking
Prevent duplicate operations:
use pleme_redis::LockManager;
let lock_manager = LockManager::new(redis_conn);
let lock = lock_manager.try_lock("resource:123", 60).await?;
if let Some(_guard) = lock {
perform_operation().await?;
// Lock is released when the guard drops
}
Testing
# Run all unit tests
cargo test
# Run a specific module's tests
cargo test --lib job
cargo test --lib worker_pool
# Integration tests need a local Redis
docker run -d -p 6379:6379 redis:7.2
cargo test
Contributing
When adding new patterns:
- Trait-based design - Define traits for abstraction
- Generic type parameters - Enable flexibility
- Comprehensive tests - Cover edge cases
- Clear documentation - Explain usage with examples
- Multi-tenant safe - Support product scoping
License
MIT
References
- Product-Catalog Integration:
pkgs/products/novaskyn/services/rust/product-catalog/src/queue/
- Job Queue Extraction Status:
pkgs/libraries/rust/crates/JOB_QUEUE_EXTRACTION_STATUS.md
- Platform Libraries Evaluation:
pkgs/libraries/rust/crates/PLEME_LIBRARIES_EVALUATION.md