1use redis::aio::ConnectionManager;
11use redis::{AsyncCommands, RedisError};
12use serde::{Deserialize, Serialize};
13use std::collections::hash_map::DefaultHasher;
14use std::hash::{Hash, Hasher};
15use std::time::Duration;
16use thiserror::Error;
17use tokio::time::timeout;
18use uuid::Uuid;
19
20use crate::error::RedisError as PlemeRedisError;
21
22fn calculate_hash(value: &serde_json::Value) -> String {
27 let mut hasher = DefaultHasher::new();
28 value.to_string().hash(&mut hasher);
30 format!("{:x}", hasher.finish())
31}
32
33pub struct JobQueue {
41 conn: ConnectionManager,
42 queue_key: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct QueuedJob {
48 pub job_id: Uuid,
49 pub job_type: String,
50 pub enqueued_at: chrono::DateTime<chrono::Utc>,
51}
52
53impl JobQueue {
54 pub fn new(conn: ConnectionManager, queue_key: String) -> Self {
60 Self { conn, queue_key }
61 }
62
63 pub async fn enqueue(&mut self, job_id: Uuid, job_type: String) -> Result<(), QueueError> {
76 tracing::info!(
77 "📮 [REDIS_QUEUE] Enqueueing job - id: {}, type: {}, queue: {}",
78 job_id,
79 job_type,
80 self.queue_key
81 );
82
83 let message = QueuedJob {
84 job_id,
85 job_type: job_type.clone(),
86 enqueued_at: chrono::Utc::now(),
87 };
88
89 let serialized = serde_json::to_string(&message)
90 .map_err(|e| {
91 tracing::error!("❌ [REDIS_QUEUE] Serialization error for job {}: {}", job_id, e);
92 QueueError::SerializationError(e.to_string())
93 })?;
94
95 tracing::debug!("💾 [REDIS_QUEUE] Pushing job {} to Redis list '{}'", job_id, self.queue_key);
96
97 self.conn
98 .lpush::<_, _, ()>(&self.queue_key, serialized)
99 .await
100 .map_err(|e| {
101 tracing::error!("❌ [REDIS_QUEUE] Redis LPUSH error for job {}: {}", job_id, e);
102 QueueError::RedisError(e.to_string())
103 })?;
104
105 tracing::info!("✅ [REDIS_QUEUE] Job {} enqueued successfully (type: {})", job_id, job_type);
106
107 Ok(())
108 }
109
110 pub async fn dequeue(&mut self, timeout_seconds: u64) -> Result<Option<QueuedJob>, QueueError> {
122 tracing::debug!(
123 "🔍 [REDIS_QUEUE] Waiting for job - queue: {}, timeout: {}s",
124 self.queue_key,
125 timeout_seconds
126 );
127
128 let result: Option<(String, String)> = self
130 .conn
131 .brpop(&self.queue_key, timeout_seconds as f64)
132 .await
133 .map_err(|e| {
134 tracing::error!("❌ [REDIS_QUEUE] Redis BRPOP error: {}", e);
135 QueueError::RedisError(e.to_string())
136 })?;
137
138 match result {
139 Some((_key, value)) => {
140 let job: QueuedJob = serde_json::from_str(&value)
141 .map_err(|e| {
142 tracing::error!("❌ [REDIS_QUEUE] Deserialization error: {}", e);
143 QueueError::DeserializationError(e.to_string())
144 })?;
145
146 tracing::info!(
147 "📥 [REDIS_QUEUE] Job dequeued - id: {}, type: {}, queue_age: {}s",
148 job.job_id,
149 job.job_type,
150 (chrono::Utc::now() - job.enqueued_at).num_seconds()
151 );
152
153 Ok(Some(job))
154 }
155 None => {
156 tracing::debug!("⏱️ [REDIS_QUEUE] Dequeue timeout - no jobs available");
157 Ok(None) }
159 }
160 }
161
162 pub async fn dequeue_with_timeout(
174 &mut self,
175 timeout_duration: Duration,
176 ) -> Result<Option<QueuedJob>, QueueError> {
177 let timeout_seconds = timeout_duration.as_secs();
178
179 match timeout(timeout_duration, self.dequeue(timeout_seconds)).await {
180 Ok(Ok(job)) => Ok(job),
181 Ok(Err(e)) => Err(e),
182 Err(_) => Ok(None), }
184 }
185
186 pub async fn length(&mut self) -> Result<u64, QueueError> {
190 self.conn
191 .llen(&self.queue_key)
192 .await
193 .map_err(|e| QueueError::RedisError(e.to_string()))
194 }
195
196 pub async fn peek(&mut self) -> Result<Option<QueuedJob>, QueueError> {
200 let result: Option<String> = self
201 .conn
202 .lindex(&self.queue_key, -1) .await
204 .map_err(|e| QueueError::RedisError(e.to_string()))?;
205
206 match result {
207 Some(value) => {
208 let job: QueuedJob = serde_json::from_str(&value)
209 .map_err(|e| QueueError::DeserializationError(e.to_string()))?;
210 Ok(Some(job))
211 }
212 None => Ok(None),
213 }
214 }
215
216 pub async fn clear(&mut self) -> Result<(), QueueError> {
220 self.conn
221 .ltrim::<_, ()>(&self.queue_key, 1, 0) .await
223 .map_err(|e| QueueError::RedisError(e.to_string()))?;
224
225 Ok(())
226 }
227
228 pub async fn stats(&mut self) -> Result<QueueStats, QueueError> {
230 let length = self.length().await?;
231 let oldest_job = self.peek().await?;
232
233 let oldest_age = oldest_job.map(|job| {
234 let now = chrono::Utc::now();
235 let age = now - job.enqueued_at;
236 age.num_seconds()
237 });
238
239 Ok(QueueStats {
240 queue_length: length,
241 oldest_job_age_seconds: oldest_age,
242 })
243 }
244
245 pub async fn contains(&mut self, job_id: Uuid) -> Result<bool, QueueError> {
258 let queue_items: Vec<String> = self
260 .conn
261 .lrange(&self.queue_key, 0, -1)
262 .await
263 .map_err(|e| QueueError::RedisError(e.to_string()))?;
264
265 for item in queue_items {
267 if let Ok(job) = serde_json::from_str::<QueuedJob>(&item) {
268 if job.job_id == job_id {
269 return Ok(true);
270 }
271 }
272 }
273
274 Ok(false)
275 }
276
277 pub async fn acquire_job_type_lock(
293 &mut self,
294 job_type: &str,
295 parameters: &serde_json::Value,
296 ttl_seconds: u64,
297 ) -> Result<bool, QueueError> {
298 let params_hash = calculate_hash(parameters);
300 let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
301
302 tracing::debug!(
303 "🔒 [REDIS_LOCK] Attempting to acquire lock - job_type: '{}', params_hash: '{}', TTL: {}s",
304 job_type,
305 params_hash,
306 ttl_seconds
307 );
308
309 let acquired: bool = redis::cmd("SET")
312 .arg(&lock_key)
313 .arg("locked")
314 .arg("NX") .arg("EX") .arg(ttl_seconds)
317 .query_async(&mut self.conn)
318 .await
319 .map_err(|e| QueueError::RedisError(e.to_string()))?;
320
321 if acquired {
322 tracing::info!(
323 "✅ [REDIS_LOCK] Lock acquired - job_type: '{}', params_hash: '{}', TTL: {}s ({}h)",
324 job_type,
325 params_hash,
326 ttl_seconds,
327 ttl_seconds / 3600
328 );
329 } else {
330 tracing::warn!(
331 "⏸️ [REDIS_LOCK] Lock already held - job_type: '{}', params_hash: '{}' - job will be skipped",
332 job_type,
333 params_hash
334 );
335 }
336
337 Ok(acquired)
338 }
339
340 pub async fn is_job_type_locked(
351 &mut self,
352 job_type: &str,
353 parameters: &serde_json::Value,
354 ) -> Result<bool, QueueError> {
355 let params_hash = calculate_hash(parameters);
356 let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
357
358 let exists: bool = self
359 .conn
360 .exists(&lock_key)
361 .await
362 .map_err(|e| QueueError::RedisError(e.to_string()))?;
363
364 Ok(exists)
365 }
366
367 pub async fn get_lock_ttl(
378 &mut self,
379 job_type: &str,
380 parameters: &serde_json::Value,
381 ) -> Result<Option<i64>, QueueError> {
382 let params_hash = calculate_hash(parameters);
383 let lock_key = format!("job_lock:{}:{}", job_type, params_hash);
384
385 let ttl: i64 = self
386 .conn
387 .ttl(&lock_key)
388 .await
389 .map_err(|e| QueueError::RedisError(e.to_string()))?;
390
391 match ttl {
393 -2 => Ok(None), -1 => Ok(None), seconds => Ok(Some(seconds)),
396 }
397 }
398}
399
/// Point-in-time summary of a queue.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Number of entries in the queue.
    pub queue_length: u64,
    /// Age in seconds of the oldest queued job, or `None` when there is none.
    pub oldest_job_age_seconds: Option<i64>,
}
406
407#[derive(Error, Debug)]
409pub enum QueueError {
410 #[error("Redis error: {0}")]
411 RedisError(String),
412
413 #[error("Serialization error: {0}")]
414 SerializationError(String),
415
416 #[error("Deserialization error: {0}")]
417 DeserializationError(String),
418
419 #[error("Timeout: no job available")]
420 Timeout,
421}
422
423impl From<RedisError> for QueueError {
424 fn from(err: RedisError) -> Self {
425 QueueError::RedisError(err.to_string())
426 }
427}
428
429impl From<PlemeRedisError> for QueueError {
430 fn from(err: PlemeRedisError) -> Self {
431 QueueError::RedisError(err.to_string())
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// A `QueuedJob` survives a JSON round trip with its id and type intact.
    #[test]
    fn test_queued_job_serialization() {
        let job = QueuedJob {
            job_id: Uuid::new_v4(),
            job_type: "FETCH_PROVIDER_PRODUCTS".to_string(),
            enqueued_at: chrono::Utc::now(),
        };

        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: QueuedJob = serde_json::from_str(&serialized).unwrap();

        assert_eq!(job.job_id, deserialized.job_id);
        assert_eq!(job.job_type, deserialized.job_type);
    }

    /// `QueueStats` exposes the values it was built with.
    #[test]
    fn test_queue_stats() {
        let stats = QueueStats {
            queue_length: 10,
            oldest_job_age_seconds: Some(120),
        };

        assert_eq!(stats.queue_length, 10);
        assert_eq!(stats.oldest_job_age_seconds, Some(120));
    }

    /// Equal parameter objects hash identically; different ones do not.
    #[test]
    fn test_calculate_hash() {
        let params1 = serde_json::json!({"provider_id": "dropi", "category": "electronics"});
        let params2 = serde_json::json!({"provider_id": "dropi", "category": "electronics"});
        let params3 = serde_json::json!({"provider_id": "dropi", "category": "fashion"});

        let hash1 = calculate_hash(&params1);
        let hash2 = calculate_hash(&params2);
        let hash3 = calculate_hash(&params3);

        // Same content must map to the same lock key.
        assert_eq!(hash1, hash2);

        // Different content must map to a different lock key.
        assert_ne!(hash1, hash3);
    }
}