Skip to main content

pleme_redis/
job_queue.rs

//! Redis-based FIFO job queue using the LPUSH/BRPOP pattern
//!
//! This module provides a Redis-backed job queue with:
//! - FIFO ordering (jobs are processed in submission order)
//! - Blocking pop with timeout (BRPOP)
//! - Atomic push/pop (each is a single Redis command)
//! - Support for multiple concurrent workers
//! - Distributed locking for job de-duplication
9
10use redis::aio::ConnectionManager;
11use redis::{AsyncCommands, RedisError};
12use serde::{Deserialize, Serialize};
13use std::collections::hash_map::DefaultHasher;
14use std::hash::{Hash, Hasher};
15use std::time::Duration;
16use thiserror::Error;
17use tokio::time::timeout;
18use uuid::Uuid;
19
20use crate::error::RedisError as PlemeRedisError;
21
22/// Calculate hash of job parameters for unique lock keys
23///
24/// This creates a unique identifier for each parameter combination,
25/// allowing concurrent execution of jobs with different parameters.
26fn calculate_hash(value: &serde_json::Value) -> String {
27    let mut hasher = DefaultHasher::new();
28    // Serialize to string for consistent hashing
29    value.to_string().hash(&mut hasher);
30    format!("{:x}", hasher.finish())
31}
32
33/// Redis-based FIFO job queue using LPUSH/BRPOP pattern
34///
35/// This implementation provides:
36/// - FIFO ordering (jobs processed in submission order)
37/// - Blocking pop with timeout (BRPOP)
38/// - Atomic operations
39/// - Multiple worker support
40pub struct JobQueue {
41    conn: ConnectionManager,
42    queue_key: String,
43}
44
45/// Job message in Redis queue
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct QueuedJob {
48    pub job_id: Uuid,
49    pub job_type: String,
50    pub enqueued_at: chrono::DateTime<chrono::Utc>,
51}
52
53impl JobQueue {
54    /// Create new Redis queue
55    ///
56    /// # Arguments
57    /// * `conn` - Redis multiplexed connection
58    /// * `queue_key` - Redis key for the queue (e.g., "sync_jobs:queue")
59    pub fn new(conn: ConnectionManager, queue_key: String) -> Self {
60        Self { conn, queue_key }
61    }
62
63    /// Enqueue a job (LPUSH)
64    ///
65    /// Adds job to the left side of the list (head). Workers pop from the right (tail).
66    /// This creates FIFO behavior.
67    ///
68    /// # Arguments
69    /// * `job_id` - Job ID to enqueue
70    /// * `job_type` - Job type string
71    ///
72    /// # Returns
73    /// * `Ok(())` - Job enqueued successfully
74    /// * `Err(QueueError)` - Failed to enqueue
75    pub async fn enqueue(&mut self, job_id: Uuid, job_type: String) -> Result<(), QueueError> {
76        tracing::info!(
77            "📮 [REDIS_QUEUE] Enqueueing job - id: {}, type: {}, queue: {}",
78            job_id,
79            job_type,
80            self.queue_key
81        );
82
83        let message = QueuedJob {
84            job_id,
85            job_type: job_type.clone(),
86            enqueued_at: chrono::Utc::now(),
87        };
88
89        let serialized = serde_json::to_string(&message)
90            .map_err(|e| {
91                tracing::error!("❌ [REDIS_QUEUE] Serialization error for job {}: {}", job_id, e);
92                QueueError::SerializationError(e.to_string())
93            })?;
94
95        tracing::debug!("💾 [REDIS_QUEUE] Pushing job {} to Redis list '{}'", job_id, self.queue_key);
96
97        self.conn
98            .lpush::<_, _, ()>(&self.queue_key, serialized)
99            .await
100            .map_err(|e| {
101                tracing::error!("❌ [REDIS_QUEUE] Redis LPUSH error for job {}: {}", job_id, e);
102                QueueError::RedisError(e.to_string())
103            })?;
104
105        tracing::info!("✅ [REDIS_QUEUE] Job {} enqueued successfully (type: {})", job_id, job_type);
106
107        Ok(())
108    }
109
110    /// Dequeue a job with blocking wait (BRPOP)
111    ///
112    /// Blocks until a job is available or timeout is reached.
113    ///
114    /// # Arguments
115    /// * `timeout_seconds` - Maximum time to wait for a job (0 = wait forever)
116    ///
117    /// # Returns
118    /// * `Ok(Some(QueuedJob))` - Job dequeued successfully
119    /// * `Ok(None)` - Timeout reached, no job available
120    /// * `Err(QueueError)` - Failed to dequeue
121    pub async fn dequeue(&mut self, timeout_seconds: u64) -> Result<Option<QueuedJob>, QueueError> {
122        tracing::debug!(
123            "🔍 [REDIS_QUEUE] Waiting for job - queue: {}, timeout: {}s",
124            self.queue_key,
125            timeout_seconds
126        );
127
128        // BRPOP returns (key, value) tuple or nil
129        let result: Option<(String, String)> = self
130            .conn
131            .brpop(&self.queue_key, timeout_seconds as f64)
132            .await
133            .map_err(|e| {
134                tracing::error!("❌ [REDIS_QUEUE] Redis BRPOP error: {}", e);
135                QueueError::RedisError(e.to_string())
136            })?;
137
138        match result {
139            Some((_key, value)) => {
140                let job: QueuedJob = serde_json::from_str(&value)
141                    .map_err(|e| {
142                        tracing::error!("❌ [REDIS_QUEUE] Deserialization error: {}", e);
143                        QueueError::DeserializationError(e.to_string())
144                    })?;
145
146                tracing::info!(
147                    "📥 [REDIS_QUEUE] Job dequeued - id: {}, type: {}, queue_age: {}s",
148                    job.job_id,
149                    job.job_type,
150                    (chrono::Utc::now() - job.enqueued_at).num_seconds()
151                );
152
153                Ok(Some(job))
154            }
155            None => {
156                tracing::debug!("⏱️  [REDIS_QUEUE] Dequeue timeout - no jobs available");
157                Ok(None) // Timeout, no job available
158            }
159        }
160    }
161
162    /// Dequeue with Tokio timeout wrapper
163    ///
164    /// Prevents indefinite blocking by adding application-level timeout.
165    ///
166    /// # Arguments
167    /// * `timeout_duration` - Maximum wait time
168    ///
169    /// # Returns
170    /// * `Ok(Some(QueuedJob))` - Job dequeued
171    /// * `Ok(None)` - Timeout reached
172    /// * `Err(QueueError)` - Error occurred
173    pub async fn dequeue_with_timeout(
174        &mut self,
175        timeout_duration: Duration,
176    ) -> Result<Option<QueuedJob>, QueueError> {
177        let timeout_seconds = timeout_duration.as_secs();
178
179        match timeout(timeout_duration, self.dequeue(timeout_seconds)).await {
180            Ok(Ok(job)) => Ok(job),
181            Ok(Err(e)) => Err(e),
182            Err(_) => Ok(None), // Tokio timeout
183        }
184    }
185
186    /// Get queue length (LLEN)
187    ///
188    /// Returns the number of jobs currently in the queue.
189    pub async fn length(&mut self) -> Result<u64, QueueError> {
190        self.conn
191            .llen(&self.queue_key)
192            .await
193            .map_err(|e| QueueError::RedisError(e.to_string()))
194    }
195
196    /// Peek at next job without removing it (LINDEX)
197    ///
198    /// Returns the job that would be dequeued next, without removing it.
199    pub async fn peek(&mut self) -> Result<Option<QueuedJob>, QueueError> {
200        let result: Option<String> = self
201            .conn
202            .lindex(&self.queue_key, -1) // Last element (right side)
203            .await
204            .map_err(|e| QueueError::RedisError(e.to_string()))?;
205
206        match result {
207            Some(value) => {
208                let job: QueuedJob = serde_json::from_str(&value)
209                    .map_err(|e| QueueError::DeserializationError(e.to_string()))?;
210                Ok(Some(job))
211            }
212            None => Ok(None),
213        }
214    }
215
216    /// Clear all jobs from queue (LTRIM 1 0)
217    ///
218    /// WARNING: This removes all jobs. Use with caution.
219    pub async fn clear(&mut self) -> Result<(), QueueError> {
220        self.conn
221            .ltrim::<_, ()>(&self.queue_key, 1, 0) // Invalid range = clear all
222            .await
223            .map_err(|e| QueueError::RedisError(e.to_string()))?;
224
225        Ok(())
226    }
227
228    /// Get queue statistics
229    pub async fn stats(&mut self) -> Result<QueueStats, QueueError> {
230        let length = self.length().await?;
231        let oldest_job = self.peek().await?;
232
233        let oldest_age = oldest_job.map(|job| {
234            let now = chrono::Utc::now();
235            let age = now - job.enqueued_at;
236            age.num_seconds()
237        });
238
239        Ok(QueueStats {
240            queue_length: length,
241            oldest_job_age_seconds: oldest_age,
242        })
243    }
244
245    /// Check if a job exists in the queue (LRANGE scan)
246    ///
247    /// Scans the queue to see if a job with the given ID exists.
248    /// This is an O(N) operation, use sparingly.
249    ///
250    /// # Arguments
251    /// * `job_id` - Job ID to search for
252    ///
253    /// # Returns
254    /// * `Ok(true)` - Job exists in queue
255    /// * `Ok(false)` - Job not found in queue
256    /// * `Err(QueueError)` - Failed to check
257    pub async fn contains(&mut self, job_id: Uuid) -> Result<bool, QueueError> {
258        // Get all jobs in the queue (LRANGE 0 -1)
259        let queue_items: Vec<String> = self
260            .conn
261            .lrange(&self.queue_key, 0, -1)
262            .await
263            .map_err(|e| QueueError::RedisError(e.to_string()))?;
264
265        // Check if any item contains the job_id
266        for item in queue_items {
267            if let Ok(job) = serde_json::from_str::<QueuedJob>(&item) {
268                if job.job_id == job_id {
269                    return Ok(true);
270                }
271            }
272        }
273
274        Ok(false)
275    }
276
277    /// Attempt to acquire a distributed lock for a job type + parameters combination
278    ///
279    /// Uses Redis SET NX EX to atomically set a key with TTL only if it doesn't exist.
280    /// This prevents multiple jobs of the same type with the same parameters from running
281    /// within the lock window. Jobs with different parameters can run concurrently.
282    ///
283    /// # Arguments
284    /// * `job_type` - Job type to lock (e.g., "FETCH_PROVIDER_PRODUCTS")
285    /// * `parameters` - Job parameters to include in lock key (enables concurrent jobs with different params)
286    /// * `ttl_seconds` - Lock duration in seconds (e.g., 1800 for 30 minutes, 172800 for 48 hours)
287    ///
288    /// # Returns
289    /// * `Ok(true)` - Lock acquired successfully
290    /// * `Ok(false)` - Lock already held by another job
291    /// * `Err(QueueError)` - Redis error occurred
292    pub async fn acquire_job_type_lock(
293        &mut self,
294        job_type: &str,
295        parameters: &serde_json::Value,
296        ttl_seconds: u64,
297    ) -> Result<bool, QueueError> {
298        // Hash parameters to create unique lock key per job variant
299        let params_hash = calculate_hash(parameters);
300        let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
301
302        tracing::debug!(
303            "🔒 [REDIS_LOCK] Attempting to acquire lock - job_type: '{}', params_hash: '{}', TTL: {}s",
304            job_type,
305            params_hash,
306            ttl_seconds
307        );
308
309        // SET NX EX: Set if Not eXists with EXpiration
310        // Returns true if key was set, false if key already existed
311        let acquired: bool = redis::cmd("SET")
312            .arg(&lock_key)
313            .arg("locked")
314            .arg("NX")  // Only set if key doesn't exist
315            .arg("EX")  // Set expiration in seconds
316            .arg(ttl_seconds)
317            .query_async(&mut self.conn)
318            .await
319            .map_err(|e| QueueError::RedisError(e.to_string()))?;
320
321        if acquired {
322            tracing::info!(
323                "✅ [REDIS_LOCK] Lock acquired - job_type: '{}', params_hash: '{}', TTL: {}s ({}h)",
324                job_type,
325                params_hash,
326                ttl_seconds,
327                ttl_seconds / 3600
328            );
329        } else {
330            tracing::warn!(
331                "⏸️  [REDIS_LOCK] Lock already held - job_type: '{}', params_hash: '{}' - job will be skipped",
332                job_type,
333                params_hash
334            );
335        }
336
337        Ok(acquired)
338    }
339
340    /// Check if a job type lock is currently held
341    ///
342    /// # Arguments
343    /// * `job_type` - Job type to check
344    /// * `parameters` - Job parameters
345    ///
346    /// # Returns
347    /// * `Ok(true)` - Lock is held
348    /// * `Ok(false)` - Lock is not held
349    /// * `Err(QueueError)` - Redis error occurred
350    pub async fn is_job_type_locked(
351        &mut self,
352        job_type: &str,
353        parameters: &serde_json::Value,
354    ) -> Result<bool, QueueError> {
355        let params_hash = calculate_hash(parameters);
356        let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
357
358        let exists: bool = self
359            .conn
360            .exists(&lock_key)
361            .await
362            .map_err(|e| QueueError::RedisError(e.to_string()))?;
363
364        Ok(exists)
365    }
366
367    /// Get remaining TTL for a job type lock
368    ///
369    /// # Arguments
370    /// * `job_type` - Job type to check
371    /// * `parameters` - Job parameters
372    ///
373    /// # Returns
374    /// * `Ok(Some(seconds))` - Lock exists with TTL
375    /// * `Ok(None)` - Lock doesn't exist or has no TTL
376    /// * `Err(QueueError)` - Redis error occurred
377    pub async fn get_lock_ttl(
378        &mut self,
379        job_type: &str,
380        parameters: &serde_json::Value,
381    ) -> Result<Option<i64>, QueueError> {
382        let params_hash = calculate_hash(parameters);
383        let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
384
385        let ttl: i64 = self
386            .conn
387            .ttl(&lock_key)
388            .await
389            .map_err(|e| QueueError::RedisError(e.to_string()))?;
390
391        // TTL returns -2 if key doesn't exist, -1 if key exists but has no expire
392        match ttl {
393            -2 => Ok(None),  // Key doesn't exist
394            -1 => Ok(None),  // Key exists but no TTL (shouldn't happen with our locks)
395            seconds => Ok(Some(seconds)),
396        }
397    }
398}
399
/// Snapshot of queue health as reported by `JobQueue::stats`.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Number of entries currently in the queue list.
    pub queue_length: u64,
    /// Age, in whole seconds, of the entry that would be dequeued next;
    /// `None` when there is no such entry.
    pub oldest_job_age_seconds: Option<i64>,
}
406
407/// Queue errors
408#[derive(Error, Debug)]
409pub enum QueueError {
410    #[error("Redis error: {0}")]
411    RedisError(String),
412
413    #[error("Serialization error: {0}")]
414    SerializationError(String),
415
416    #[error("Deserialization error: {0}")]
417    DeserializationError(String),
418
419    #[error("Timeout: no job available")]
420    Timeout,
421}
422
423impl From<RedisError> for QueueError {
424    fn from(err: RedisError) -> Self {
425        QueueError::RedisError(err.to_string())
426    }
427}
428
429impl From<PlemeRedisError> for QueueError {
430    fn from(err: PlemeRedisError) -> Self {
431        QueueError::RedisError(err.to_string())
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// A `QueuedJob` must survive a JSON round trip with its identity intact.
    #[test]
    fn test_queued_job_serialization() {
        let original = QueuedJob {
            job_id: Uuid::new_v4(),
            job_type: String::from("FETCH_PROVIDER_PRODUCTS"),
            enqueued_at: chrono::Utc::now(),
        };

        let json = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<QueuedJob>(&json).unwrap();

        assert_eq!(decoded.job_id, original.job_id);
        assert_eq!(decoded.job_type, original.job_type);
    }

    /// `QueueStats` fields hold exactly what they were constructed with.
    #[test]
    fn test_queue_stats() {
        let snapshot = QueueStats {
            queue_length: 10,
            oldest_job_age_seconds: Some(120),
        };

        assert_eq!(snapshot.queue_length, 10);
        assert_eq!(snapshot.oldest_job_age_seconds, Some(120));
    }

    /// Lock-key hashes must be deterministic per parameter set and distinguish different sets.
    #[test]
    fn test_calculate_hash() {
        let hash_of = |category: &str| {
            calculate_hash(&serde_json::json!({"provider_id": "dropi", "category": category}))
        };

        // Identical parameters must map to the same lock key...
        assert_eq!(hash_of("electronics"), hash_of("electronics"));

        // ...while a different parameter value must not.
        assert_ne!(hash_of("electronics"), hash_of("fashion"));
    }
}