armature_queue/
queue.rs

1//! Queue implementation with Redis backend.
2
3use crate::error::{QueueError, QueueResult};
4use crate::job::{Job, JobData, JobId, JobPriority, JobState};
5use armature_log::{debug, info};
6use chrono::Utc;
7use redis::{AsyncCommands, Client, aio::ConnectionManager};
8use std::time::Duration;
9
/// Settings describing one Redis-backed job queue.
///
/// Build a value with [`QueueConfig::new`] and refine it with the chained
/// `with_*` methods; every Redis key used by the queue is derived from
/// `key_prefix`.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// URL handed to the Redis client when connecting.
    pub redis_url: String,

    /// Logical name of the queue.
    pub queue_name: String,

    /// Leading segment shared by every Redis key of this queue.
    pub key_prefix: String,

    /// Upper bound on the number of queued jobs (`0` disables the limit).
    pub max_size: usize,

    /// How long stored job data is kept.
    pub retention_time: Duration,
}

impl QueueConfig {
    /// Builds a configuration with defaults: the key prefix is
    /// `armature:queue:<queue_name>`, the size is unlimited and job data is
    /// retained for 24 hours.
    pub fn new(redis_url: impl Into<String>, queue_name: impl Into<String>) -> Self {
        let name: String = queue_name.into();
        let prefix = ["armature:queue:", name.as_str()].concat();

        Self {
            redis_url: redis_url.into(),
            queue_name: name,
            key_prefix: prefix,
            max_size: 0,
            retention_time: Duration::from_secs(24 * 60 * 60),
        }
    }

    /// Replaces the key prefix (the default derived from the queue name is discarded).
    pub fn with_key_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            key_prefix: prefix.into(),
            ..self
        }
    }

    /// Sets the maximum number of queued jobs; `0` means unlimited.
    pub fn with_max_size(self, max_size: usize) -> Self {
        Self { max_size, ..self }
    }

    /// Sets how long stored job data is kept.
    pub fn with_retention_time(self, retention_time: Duration) -> Self {
        Self {
            retention_time,
            ..self
        }
    }

    /// Returns `<key_prefix>:<suffix>`, the full Redis key for `suffix`.
    fn key(&self, suffix: &str) -> String {
        [self.key_prefix.as_str(), suffix].join(":")
    }
}
65
66/// Job queue backed by Redis.
67#[derive(Clone)]
68pub struct Queue {
69    connection: ConnectionManager,
70    config: QueueConfig,
71}
72
73impl Queue {
74    /// Create a new queue.
75    pub async fn new(
76        redis_url: impl Into<String>,
77        queue_name: impl Into<String>,
78    ) -> QueueResult<Self> {
79        let config = QueueConfig::new(redis_url, queue_name);
80        Self::with_config(config).await
81    }
82
83    /// Create a queue with custom configuration.
84    pub async fn with_config(config: QueueConfig) -> QueueResult<Self> {
85        info!("Initializing job queue: {}", config.queue_name);
86        debug!(
87            "Queue config - prefix: {}, max_size: {}",
88            config.key_prefix, config.max_size
89        );
90
91        let client = Client::open(config.redis_url.as_str())
92            .map_err(|e| QueueError::Config(e.to_string()))?;
93
94        let connection = ConnectionManager::new(client).await?;
95
96        info!("Job queue '{}' ready", config.queue_name);
97        Ok(Self { connection, config })
98    }
99
100    /// Enqueue a job.
101    pub async fn enqueue(&self, job_type: impl Into<String>, data: JobData) -> QueueResult<JobId> {
102        let job_type = job_type.into();
103        debug!(
104            "Enqueueing job: {} on queue '{}'",
105            job_type, self.config.queue_name
106        );
107        let job = Job::new(&self.config.queue_name, &job_type, data);
108        self.enqueue_job(job).await
109    }
110
111    /// Enqueue a job with options.
112    pub async fn enqueue_job(&self, job: Job) -> QueueResult<JobId> {
113        // Check queue size limit
114        if self.config.max_size > 0 {
115            let size = self.size().await?;
116            if size >= self.config.max_size {
117                return Err(QueueError::QueueFull);
118            }
119        }
120
121        let job_id = job.id;
122        let mut conn = self.connection.clone();
123
124        // Serialize job
125        let job_json =
126            serde_json::to_string(&job).map_err(|e| QueueError::Serialization(e.to_string()))?;
127
128        // Store job data
129        let job_key = self.config.key(&format!("job:{}", job_id));
130        let _: () = conn
131            .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
132            .await?;
133
134        // Add to appropriate queue based on priority and schedule
135        if job.is_ready() {
136            let queue_key = self.priority_queue_key(job.priority);
137            let score = -(job.priority as i64); // Negative for high priority first
138            let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
139        } else {
140            // Scheduled job
141            let delayed_key = self.config.key("delayed");
142            let score = job.scheduled_at.unwrap().timestamp();
143            let _: () = conn.zadd(&delayed_key, job_id.to_string(), score).await?;
144        }
145
146        Ok(job_id)
147    }
148
149    /// Dequeue the next job.
150    pub async fn dequeue(&self) -> QueueResult<Option<Job>> {
151        self.move_delayed_jobs().await?;
152
153        let mut conn = self.connection.clone();
154
155        // Try to get job from priority queues (high to low)
156        for priority in [
157            JobPriority::Critical,
158            JobPriority::High,
159            JobPriority::Normal,
160            JobPriority::Low,
161        ] {
162            let queue_key = self.priority_queue_key(priority);
163
164            // Pop the highest priority job
165            let result: Option<Vec<String>> = conn.zpopmin(&queue_key, 1).await?;
166
167            if let Some(items) = result {
168                if let Some(job_id_str) = items.first() {
169                    if let Ok(job_id) = job_id_str.parse::<JobId>() {
170                        if let Some(mut job) = self.get_job(job_id).await? {
171                            job.start_processing();
172                            self.save_job(&job).await?;
173
174                            // Add to processing set
175                            let processing_key = self.config.key("processing");
176                            let _: () = conn
177                                .zadd(&processing_key, job_id.to_string(), Utc::now().timestamp())
178                                .await?;
179
180                            return Ok(Some(job));
181                        }
182                    }
183                }
184            }
185        }
186
187        Ok(None)
188    }
189
190    /// Complete a job.
191    pub async fn complete(&self, job_id: JobId) -> QueueResult<()> {
192        if let Some(mut job) = self.get_job(job_id).await? {
193            job.complete();
194            self.save_job(&job).await?;
195            self.remove_from_processing(job_id).await?;
196        }
197        Ok(())
198    }
199
200    /// Fail a job.
201    pub async fn fail(&self, job_id: JobId, error: String) -> QueueResult<()> {
202        if let Some(mut job) = self.get_job(job_id).await? {
203            job.fail(error);
204
205            if job.status.state == JobState::Failed && job.can_retry() {
206                // Retry with backoff
207                let retry_at = Utc::now() + job.backoff_delay();
208                job.scheduled_at = Some(retry_at);
209                self.save_job(&job).await?;
210
211                // Add to delayed queue
212                let mut conn = self.connection.clone();
213                let delayed_key = self.config.key("delayed");
214                let _: () = conn
215                    .zadd(&delayed_key, job_id.to_string(), retry_at.timestamp())
216                    .await?;
217            } else {
218                // Move to dead letter queue
219                self.save_job(&job).await?;
220                let mut conn = self.connection.clone();
221                let dead_key = self.config.key("dead");
222                let _: () = conn
223                    .zadd(&dead_key, job_id.to_string(), Utc::now().timestamp())
224                    .await?;
225            }
226
227            self.remove_from_processing(job_id).await?;
228        }
229        Ok(())
230    }
231
232    /// Get a job by ID.
233    pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<Job>> {
234        let mut conn = self.connection.clone();
235        let job_key = self.config.key(&format!("job:{}", job_id));
236
237        let job_json: Option<String> = conn.get(&job_key).await?;
238
239        if let Some(json) = job_json {
240            let job: Job = serde_json::from_str(&json)
241                .map_err(|e| QueueError::Deserialization(e.to_string()))?;
242            Ok(Some(job))
243        } else {
244            Ok(None)
245        }
246    }
247
248    /// Save a job.
249    async fn save_job(&self, job: &Job) -> QueueResult<()> {
250        let mut conn = self.connection.clone();
251        let job_key = self.config.key(&format!("job:{}", job.id));
252        let job_json =
253            serde_json::to_string(job).map_err(|e| QueueError::Serialization(e.to_string()))?;
254
255        let _: () = conn
256            .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
257            .await?;
258        Ok(())
259    }
260
261    /// Get queue size.
262    pub async fn size(&self) -> QueueResult<usize> {
263        let mut conn = self.connection.clone();
264        let mut total = 0;
265
266        for priority in [
267            JobPriority::Critical,
268            JobPriority::High,
269            JobPriority::Normal,
270            JobPriority::Low,
271        ] {
272            let queue_key = self.priority_queue_key(priority);
273            let count: usize = conn.zcard(&queue_key).await?;
274            total += count;
275        }
276
277        Ok(total)
278    }
279
280    /// Move delayed jobs to ready queue.
281    async fn move_delayed_jobs(&self) -> QueueResult<()> {
282        let mut conn = self.connection.clone();
283        let delayed_key = self.config.key("delayed");
284        let now = Utc::now().timestamp();
285
286        // Get all jobs that are ready
287        let job_ids: Vec<String> = conn.zrangebyscore(&delayed_key, "-inf", now).await?;
288
289        for job_id_str in job_ids {
290            if let Ok(job_id) = job_id_str.parse::<JobId>() {
291                if let Some(job) = self.get_job(job_id).await? {
292                    if job.is_ready() {
293                        // Remove from delayed
294                        let _: () = conn.zrem(&delayed_key, job_id.to_string()).await?;
295
296                        // Add to priority queue
297                        let queue_key = self.priority_queue_key(job.priority);
298                        let score = -(job.priority as i64);
299                        let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
300                    }
301                }
302            }
303        }
304
305        Ok(())
306    }
307
308    /// Remove job from processing set.
309    async fn remove_from_processing(&self, job_id: JobId) -> QueueResult<()> {
310        let mut conn = self.connection.clone();
311        let processing_key = self.config.key("processing");
312        let _: () = conn.zrem(&processing_key, job_id.to_string()).await?;
313        Ok(())
314    }
315
316    /// Get the priority queue key.
317    fn priority_queue_key(&self, priority: JobPriority) -> String {
318        self.config
319            .key(&format!("pending:{:?}", priority).to_lowercase())
320    }
321
322    /// Clear all jobs from the queue.
323    pub async fn clear(&self) -> QueueResult<()> {
324        let mut conn = self.connection.clone();
325
326        let pattern = format!("{}:*", self.config.key_prefix);
327        let keys: Vec<String> = conn.keys(&pattern).await?;
328
329        if !keys.is_empty() {
330            let _: () = conn.del(keys).await?;
331        }
332
333        Ok(())
334    }
335}
336
#[cfg(test)]
mod tests {
    //! Unit tests for `QueueConfig`. They need no Redis server: only the
    //! builder methods and key derivation are exercised, and keys are pinned
    //! to their exact values so a wrong prefix or separator is caught.

    use super::*;

    const URL: &str = "redis://localhost:6379";

    #[test]
    fn test_queue_config() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.queue_name, "test");
        assert_eq!(config.key_prefix, "armature:queue:test");
    }

    #[test]
    fn test_priority_queue_key() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high"),
            "armature:queue:test:pending:high"
        );
    }

    #[test]
    fn test_queue_config_with_custom_prefix() {
        let config = QueueConfig::new(URL, "myqueue").with_key_prefix("app");
        // The custom prefix replaces the default one entirely.
        assert_eq!(config.key_prefix, "app");
        assert_eq!(config.queue_name, "myqueue");
        assert_eq!(config.key("delayed"), "app:delayed");
    }

    #[test]
    fn test_queue_config_default_retention() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.retention_time, Duration::from_secs(86400)); // 1 day
    }

    #[test]
    fn test_queue_config_custom_retention() {
        let retention = Duration::from_secs(3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(retention);
        assert_eq!(config.retention_time, retention);
    }

    #[test]
    fn test_queue_config_default_max_size() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.max_size, 0); // 0 means unlimited
    }

    #[test]
    fn test_queue_config_custom_max_size() {
        let config = QueueConfig::new(URL, "test").with_max_size(1000);
        assert_eq!(config.max_size, 1000);
    }

    #[test]
    fn test_queue_key_generation() {
        let config = QueueConfig::new(URL, "jobs");

        assert_eq!(
            config.key("pending:normal"),
            "armature:queue:jobs:pending:normal"
        );
        assert_eq!(config.key("processing"), "armature:queue:jobs:processing");
        assert_eq!(config.key("completed"), "armature:queue:jobs:completed");
    }

    #[test]
    fn test_queue_config_clone() {
        let config1 = QueueConfig::new(URL, "test");
        let config2 = config1.clone();

        assert_eq!(config1.queue_name, config2.queue_name);
        assert_eq!(config1.redis_url, config2.redis_url);
        assert_eq!(config1.key_prefix, config2.key_prefix);
        assert_eq!(config1.max_size, config2.max_size);
        assert_eq!(config1.retention_time, config2.retention_time);
    }

    #[test]
    fn test_queue_config_different_queues() {
        let config1 = QueueConfig::new(URL, "queue1");
        let config2 = QueueConfig::new(URL, "queue2");

        assert_eq!(config1.key_prefix, "armature:queue:queue1");
        assert_eq!(config2.key_prefix, "armature:queue:queue2");
        assert_ne!(config1.key_prefix, config2.key_prefix);
    }

    #[test]
    fn test_queue_config_key_consistency() {
        let config = QueueConfig::new(URL, "test");

        let key1 = config.key("pending");
        let key2 = config.key("pending");

        assert_eq!(key1, key2);
        assert_eq!(key1, "armature:queue:test:pending");
    }

    #[test]
    fn test_queue_config_builder_pattern() {
        let config = QueueConfig::new(URL, "test")
            .with_key_prefix("app")
            .with_retention_time(Duration::from_secs(7200))
            .with_max_size(500);

        assert_eq!(config.key_prefix, "app");
        assert_eq!(config.retention_time, Duration::from_secs(7200));
        assert_eq!(config.max_size, 500);
        // Builders must not touch unrelated fields.
        assert_eq!(config.redis_url, URL);
        assert_eq!(config.queue_name, "test");
    }

    #[test]
    fn test_queue_config_redis_url() {
        let url = "redis://user:pass@host:6380/2";
        let config = QueueConfig::new(url, "test");
        assert_eq!(config.redis_url, url);
    }

    #[test]
    fn test_queue_config_key_with_empty_suffix() {
        let config = QueueConfig::new(URL, "test");
        // The separator is always appended, even for an empty suffix.
        assert_eq!(config.key(""), "armature:queue:test:");
    }

    #[test]
    fn test_queue_config_key_with_special_characters() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high:priority"),
            "armature:queue:test:pending:high:priority"
        );
    }

    #[test]
    fn test_queue_config_multiple_prefixes() {
        let config1 = QueueConfig::new(URL, "app1").with_key_prefix("production");
        let config2 = QueueConfig::new(URL, "app2").with_key_prefix("development");

        let key1 = config1.key("jobs");
        let key2 = config2.key("jobs");

        assert_eq!(key1, "production:jobs");
        assert_eq!(key2, "development:jobs");
        assert_ne!(key1, key2);
    }

    #[test]
    fn test_queue_config_unlimited_max_size() {
        let config = QueueConfig::new(URL, "test").with_max_size(0);
        assert_eq!(config.max_size, 0);
    }

    #[test]
    fn test_queue_config_large_retention() {
        let week = Duration::from_secs(7 * 24 * 3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(week);
        assert_eq!(config.retention_time, week);
    }
}