ferro_queue/
queue.rs

1//! Queue connection and operations.
2
3use crate::{Error, JobPayload, QueueConfig};
4use chrono::{DateTime, Utc};
5use redis::aio::ConnectionManager;
6use redis::AsyncCommands;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tracing::debug;
10
11/// Queue statistics for introspection
12#[derive(Debug, Clone, Serialize, Deserialize, Default)]
13pub struct QueueStats {
14    /// Stats per queue
15    pub queues: Vec<SingleQueueStats>,
16    /// Total failed jobs count
17    pub total_failed: usize,
18}
19
20/// Statistics for a single queue
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SingleQueueStats {
23    /// Queue name
24    pub name: String,
25    /// Number of pending jobs
26    pub pending: usize,
27    /// Number of delayed jobs
28    pub delayed: usize,
29}
30
31/// Job information for introspection (without full payload data)
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct JobInfo {
34    /// Unique job ID
35    pub id: String,
36    /// Job type name
37    pub job_type: String,
38    /// Queue name
39    pub queue: String,
40    /// Number of attempts made
41    pub attempts: u32,
42    /// Maximum retry attempts
43    pub max_retries: u32,
44    /// When the job was created
45    pub created_at: DateTime<Utc>,
46    /// When the job should be available for processing
47    pub available_at: DateTime<Utc>,
48    /// Job state
49    pub state: JobState,
50}
51
52/// Job state for introspection
53#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(rename_all = "snake_case")]
55pub enum JobState {
56    Pending,
57    Delayed,
58    Failed,
59}
60
61/// Failed job information
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct FailedJobInfo {
64    /// Job info
65    pub job: JobInfo,
66    /// Error message
67    pub error: String,
68    /// When the job failed
69    pub failed_at: DateTime<Utc>,
70}
71
72/// Stored format for failed jobs
73#[derive(Debug, Deserialize)]
74struct StoredFailedJob {
75    payload: JobPayload,
76    error: String,
77    failed_at: DateTime<Utc>,
78}
79
80/// A connection to the queue backend.
81#[derive(Clone)]
82pub struct QueueConnection {
83    /// Redis connection manager.
84    conn: ConnectionManager,
85    /// Queue configuration.
86    config: Arc<QueueConfig>,
87}
88
89impl QueueConnection {
90    /// Create a new queue connection.
91    pub async fn new(config: QueueConfig) -> Result<Self, Error> {
92        let client = redis::Client::open(config.redis_url.as_str())
93            .map_err(|e| Error::ConnectionFailed(e.to_string()))?;
94
95        let conn = ConnectionManager::new(client)
96            .await
97            .map_err(|e| Error::ConnectionFailed(e.to_string()))?;
98
99        Ok(Self {
100            conn,
101            config: Arc::new(config),
102        })
103    }
104
105    /// Get the configuration.
106    pub fn config(&self) -> &QueueConfig {
107        &self.config
108    }
109
110    /// Push a job to a queue.
111    pub async fn push(&self, payload: JobPayload) -> Result<(), Error> {
112        let queue = &payload.queue;
113        let json = payload.to_json()?;
114
115        if payload.is_available() {
116            // Push to the immediate queue
117            let key = self.config.queue_key(queue);
118            self.conn
119                .clone()
120                .lpush::<_, _, ()>(&key, &json)
121                .await
122                .map_err(Error::Redis)?;
123
124            debug!(queue = queue, job_id = %payload.id, "Job pushed to queue");
125        } else {
126            // Push to the delayed queue (sorted set by available_at timestamp)
127            let key = self.config.delayed_key(queue);
128            let score = payload.available_at.timestamp() as f64;
129            self.conn
130                .clone()
131                .zadd::<_, _, _, ()>(&key, &json, score)
132                .await
133                .map_err(Error::Redis)?;
134
135            debug!(
136                queue = queue,
137                job_id = %payload.id,
138                available_at = %payload.available_at,
139                "Job pushed to delayed queue"
140            );
141        }
142
143        Ok(())
144    }
145
146    /// Pop a job from a queue (blocking).
147    pub async fn pop(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
148        let key = self.config.queue_key(queue);
149        let timeout = self.config.block_timeout.as_secs() as f64;
150
151        // BRPOP returns [key, value] or nil
152        let result: Option<(String, String)> = self
153            .conn
154            .clone()
155            .brpop(&key, timeout)
156            .await
157            .map_err(Error::Redis)?;
158
159        match result {
160            Some((_, json)) => {
161                let mut payload = JobPayload::from_json(&json)?;
162                payload.reserve();
163                Ok(Some(payload))
164            }
165            None => Ok(None),
166        }
167    }
168
169    /// Pop a job from a queue (non-blocking).
170    pub async fn pop_nowait(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
171        let key = self.config.queue_key(queue);
172
173        let result: Option<String> = self
174            .conn
175            .clone()
176            .rpop(&key, None)
177            .await
178            .map_err(Error::Redis)?;
179
180        match result {
181            Some(json) => {
182                let mut payload = JobPayload::from_json(&json)?;
183                payload.reserve();
184                Ok(Some(payload))
185            }
186            None => Ok(None),
187        }
188    }
189
190    /// Move delayed jobs that are ready to the main queue.
191    pub async fn migrate_delayed(&self, queue: &str) -> Result<usize, Error> {
192        let delayed_key = self.config.delayed_key(queue);
193        let queue_key = self.config.queue_key(queue);
194        let now = chrono::Utc::now().timestamp() as f64;
195
196        // Get jobs that are ready (score <= now)
197        let ready_jobs: Vec<String> = self
198            .conn
199            .clone()
200            .zrangebyscore(&delayed_key, "-inf", now)
201            .await
202            .map_err(Error::Redis)?;
203
204        let count = ready_jobs.len();
205
206        for job in ready_jobs {
207            // Remove from delayed set
208            self.conn
209                .clone()
210                .zrem::<_, _, ()>(&delayed_key, &job)
211                .await
212                .map_err(Error::Redis)?;
213
214            // Push to main queue
215            self.conn
216                .clone()
217                .lpush::<_, _, ()>(&queue_key, &job)
218                .await
219                .map_err(Error::Redis)?;
220        }
221
222        if count > 0 {
223            debug!(queue = queue, count = count, "Migrated delayed jobs");
224        }
225
226        Ok(count)
227    }
228
229    /// Release a job back to the queue (for retry).
230    pub async fn release(
231        &self,
232        mut payload: JobPayload,
233        delay: std::time::Duration,
234    ) -> Result<(), Error> {
235        payload.increment_attempts();
236        payload.reserved_at = None;
237
238        if delay.is_zero() {
239            payload.available_at = chrono::Utc::now();
240        } else {
241            payload.available_at =
242                chrono::Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
243        }
244
245        self.push(payload).await
246    }
247
248    /// Mark a job as failed.
249    pub async fn fail(&self, payload: JobPayload, error: &Error) -> Result<(), Error> {
250        let failed_key = self.config.failed_key();
251
252        #[derive(serde::Serialize)]
253        struct FailedJob {
254            payload: JobPayload,
255            error: String,
256            failed_at: chrono::DateTime<chrono::Utc>,
257        }
258
259        let failed = FailedJob {
260            payload,
261            error: error.to_string(),
262            failed_at: chrono::Utc::now(),
263        };
264
265        let json = serde_json::to_string(&failed)
266            .map_err(|e| Error::SerializationFailed(e.to_string()))?;
267
268        self.conn
269            .clone()
270            .lpush::<_, _, ()>(&failed_key, &json)
271            .await
272            .map_err(Error::Redis)?;
273
274        Ok(())
275    }
276
277    /// Get the number of jobs in a queue.
278    pub async fn size(&self, queue: &str) -> Result<usize, Error> {
279        let key = self.config.queue_key(queue);
280        let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
281        Ok(len)
282    }
283
284    /// Get the number of delayed jobs in a queue.
285    pub async fn delayed_size(&self, queue: &str) -> Result<usize, Error> {
286        let key = self.config.delayed_key(queue);
287        let len: usize = self.conn.clone().zcard(&key).await.map_err(Error::Redis)?;
288        Ok(len)
289    }
290
291    /// Clear all jobs from a queue.
292    pub async fn clear(&self, queue: &str) -> Result<(), Error> {
293        let queue_key = self.config.queue_key(queue);
294        let delayed_key = self.config.delayed_key(queue);
295
296        self.conn
297            .clone()
298            .del::<_, ()>(&queue_key)
299            .await
300            .map_err(Error::Redis)?;
301        self.conn
302            .clone()
303            .del::<_, ()>(&delayed_key)
304            .await
305            .map_err(Error::Redis)?;
306
307        Ok(())
308    }
309
310    /// Get pending jobs from a queue (without removing them).
311    pub async fn get_pending_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
312        let key = self.config.queue_key(queue);
313        let jobs: Vec<String> = self
314            .conn
315            .clone()
316            .lrange(&key, 0, limit as isize - 1)
317            .await
318            .map_err(Error::Redis)?;
319
320        let mut result = Vec::with_capacity(jobs.len());
321        for json in jobs {
322            if let Ok(payload) = JobPayload::from_json(&json) {
323                result.push(JobInfo {
324                    id: payload.id.to_string(),
325                    job_type: payload.job_type,
326                    queue: payload.queue,
327                    attempts: payload.attempts,
328                    max_retries: payload.max_retries,
329                    created_at: payload.created_at,
330                    available_at: payload.available_at,
331                    state: JobState::Pending,
332                });
333            }
334        }
335        Ok(result)
336    }
337
338    /// Get delayed jobs from a queue (without removing them).
339    pub async fn get_delayed_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
340        let key = self.config.delayed_key(queue);
341        let jobs: Vec<String> = self
342            .conn
343            .clone()
344            .zrange(&key, 0, limit as isize - 1)
345            .await
346            .map_err(Error::Redis)?;
347
348        let mut result = Vec::with_capacity(jobs.len());
349        for json in jobs {
350            if let Ok(payload) = JobPayload::from_json(&json) {
351                result.push(JobInfo {
352                    id: payload.id.to_string(),
353                    job_type: payload.job_type,
354                    queue: payload.queue,
355                    attempts: payload.attempts,
356                    max_retries: payload.max_retries,
357                    created_at: payload.created_at,
358                    available_at: payload.available_at,
359                    state: JobState::Delayed,
360                });
361            }
362        }
363        Ok(result)
364    }
365
366    /// Get failed jobs (without removing them).
367    pub async fn get_failed_jobs(&self, limit: usize) -> Result<Vec<FailedJobInfo>, Error> {
368        let key = self.config.failed_key();
369        let jobs: Vec<String> = self
370            .conn
371            .clone()
372            .lrange(&key, 0, limit as isize - 1)
373            .await
374            .map_err(Error::Redis)?;
375
376        let mut result = Vec::with_capacity(jobs.len());
377        for json in jobs {
378            if let Ok(failed) = serde_json::from_str::<StoredFailedJob>(&json) {
379                result.push(FailedJobInfo {
380                    job: JobInfo {
381                        id: failed.payload.id.to_string(),
382                        job_type: failed.payload.job_type,
383                        queue: failed.payload.queue,
384                        attempts: failed.payload.attempts,
385                        max_retries: failed.payload.max_retries,
386                        created_at: failed.payload.created_at,
387                        available_at: failed.payload.available_at,
388                        state: JobState::Failed,
389                    },
390                    error: failed.error,
391                    failed_at: failed.failed_at,
392                });
393            }
394        }
395        Ok(result)
396    }
397
398    /// Get the count of failed jobs.
399    pub async fn failed_count(&self) -> Result<usize, Error> {
400        let key = self.config.failed_key();
401        let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
402        Ok(len)
403    }
404
405    /// Get queue statistics for specified queues.
406    pub async fn get_stats(&self, queues: &[&str]) -> Result<QueueStats, Error> {
407        let mut stats = QueueStats::default();
408
409        for queue in queues {
410            let pending = self.size(queue).await?;
411            let delayed = self.delayed_size(queue).await?;
412            stats.queues.push(SingleQueueStats {
413                name: queue.to_string(),
414                pending,
415                delayed,
416            });
417        }
418
419        stats.total_failed = self.failed_count().await?;
420        Ok(stats)
421    }
422}
423
424/// Queue facade for static access.
425pub struct Queue;
426
427impl Queue {
428    /// Get the global queue connection.
429    pub fn connection() -> &'static QueueConnection {
430        GLOBAL_CONNECTION
431            .get()
432            .expect("Queue not initialized. Call Queue::init() first.")
433    }
434
435    /// Initialize the global queue connection.
436    pub async fn init(config: QueueConfig) -> Result<(), Error> {
437        let conn = QueueConnection::new(config).await?;
438        GLOBAL_CONNECTION
439            .set(conn)
440            .map_err(|_| Error::custom("Queue already initialized"))?;
441        Ok(())
442    }
443
444    /// Check if the queue is initialized.
445    pub fn is_initialized() -> bool {
446        GLOBAL_CONNECTION.get().is_some()
447    }
448}
449
450static GLOBAL_CONNECTION: std::sync::OnceLock<QueueConnection> = std::sync::OnceLock::new();
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455
456    #[test]
457    fn test_queue_stats_default() {
458        let stats = QueueStats::default();
459        assert!(stats.queues.is_empty());
460        assert_eq!(stats.total_failed, 0);
461    }
462
463    #[test]
464    fn test_queue_stats_serialization() {
465        let stats = QueueStats {
466            queues: vec![
467                SingleQueueStats {
468                    name: "default".to_string(),
469                    pending: 5,
470                    delayed: 2,
471                },
472                SingleQueueStats {
473                    name: "emails".to_string(),
474                    pending: 10,
475                    delayed: 0,
476                },
477            ],
478            total_failed: 3,
479        };
480
481        let json = serde_json::to_string(&stats).unwrap();
482        let restored: QueueStats = serde_json::from_str(&json).unwrap();
483
484        assert_eq!(restored.queues.len(), 2);
485        assert_eq!(restored.queues[0].name, "default");
486        assert_eq!(restored.queues[0].pending, 5);
487        assert_eq!(restored.queues[1].name, "emails");
488        assert_eq!(restored.total_failed, 3);
489    }
490
491    #[test]
492    fn test_single_queue_stats_clone() {
493        let stats = SingleQueueStats {
494            name: "test".to_string(),
495            pending: 10,
496            delayed: 5,
497        };
498
499        let cloned = stats.clone();
500        assert_eq!(cloned.name, stats.name);
501        assert_eq!(cloned.pending, stats.pending);
502        assert_eq!(cloned.delayed, stats.delayed);
503    }
504
505    #[test]
506    fn test_job_state_serialization() {
507        assert_eq!(
508            serde_json::to_string(&JobState::Pending).unwrap(),
509            "\"pending\""
510        );
511        assert_eq!(
512            serde_json::to_string(&JobState::Delayed).unwrap(),
513            "\"delayed\""
514        );
515        assert_eq!(
516            serde_json::to_string(&JobState::Failed).unwrap(),
517            "\"failed\""
518        );
519    }
520
521    #[test]
522    fn test_job_state_deserialization() {
523        let pending: JobState = serde_json::from_str("\"pending\"").unwrap();
524        let delayed: JobState = serde_json::from_str("\"delayed\"").unwrap();
525        let failed: JobState = serde_json::from_str("\"failed\"").unwrap();
526
527        assert!(matches!(pending, JobState::Pending));
528        assert!(matches!(delayed, JobState::Delayed));
529        assert!(matches!(failed, JobState::Failed));
530    }
531
532    #[test]
533    fn test_job_info_serialization() {
534        let now = Utc::now();
535        let job_info = JobInfo {
536            id: "job-123".to_string(),
537            job_type: "SendEmailJob".to_string(),
538            queue: "emails".to_string(),
539            attempts: 2,
540            max_retries: 3,
541            created_at: now,
542            available_at: now,
543            state: JobState::Pending,
544        };
545
546        let json = serde_json::to_string(&job_info).unwrap();
547        let restored: JobInfo = serde_json::from_str(&json).unwrap();
548
549        assert_eq!(restored.id, "job-123");
550        assert_eq!(restored.job_type, "SendEmailJob");
551        assert_eq!(restored.queue, "emails");
552        assert_eq!(restored.attempts, 2);
553        assert_eq!(restored.max_retries, 3);
554        assert!(matches!(restored.state, JobState::Pending));
555    }
556
557    #[test]
558    fn test_job_info_clone() {
559        let now = Utc::now();
560        let job_info = JobInfo {
561            id: "job-456".to_string(),
562            job_type: "ProcessOrder".to_string(),
563            queue: "orders".to_string(),
564            attempts: 0,
565            max_retries: 5,
566            created_at: now,
567            available_at: now,
568            state: JobState::Delayed,
569        };
570
571        let cloned = job_info.clone();
572        assert_eq!(cloned.id, job_info.id);
573        assert_eq!(cloned.job_type, job_info.job_type);
574    }
575
576    #[test]
577    fn test_failed_job_info_serialization() {
578        let now = Utc::now();
579        let failed_job = FailedJobInfo {
580            job: JobInfo {
581                id: "job-789".to_string(),
582                job_type: "FailingJob".to_string(),
583                queue: "default".to_string(),
584                attempts: 3,
585                max_retries: 3,
586                created_at: now,
587                available_at: now,
588                state: JobState::Failed,
589            },
590            error: "Connection refused".to_string(),
591            failed_at: now,
592        };
593
594        let json = serde_json::to_string(&failed_job).unwrap();
595        let restored: FailedJobInfo = serde_json::from_str(&json).unwrap();
596
597        assert_eq!(restored.job.id, "job-789");
598        assert_eq!(restored.error, "Connection refused");
599        assert!(matches!(restored.job.state, JobState::Failed));
600    }
601
602    #[test]
603    fn test_failed_job_info_clone() {
604        let now = Utc::now();
605        let failed_job = FailedJobInfo {
606            job: JobInfo {
607                id: "job-999".to_string(),
608                job_type: "TestJob".to_string(),
609                queue: "test".to_string(),
610                attempts: 1,
611                max_retries: 3,
612                created_at: now,
613                available_at: now,
614                state: JobState::Failed,
615            },
616            error: "Test error".to_string(),
617            failed_at: now,
618        };
619
620        let cloned = failed_job.clone();
621        assert_eq!(cloned.job.id, failed_job.job.id);
622        assert_eq!(cloned.error, failed_job.error);
623    }
624
625    #[test]
626    fn test_job_state_debug() {
627        assert!(format!("{:?}", JobState::Pending).contains("Pending"));
628        assert!(format!("{:?}", JobState::Delayed).contains("Delayed"));
629        assert!(format!("{:?}", JobState::Failed).contains("Failed"));
630    }
631
632    // Note: Tests for actual queue operations (get_pending_jobs, get_delayed_jobs, etc.)
633    // require integration tests with a running Redis instance.
634}