armature_queue/
queue.rs

1//! Queue implementation with Redis backend.
2
3use crate::error::{QueueError, QueueResult};
4use crate::job::{Job, JobData, JobId, JobPriority, JobState};
5use armature_log::{debug, info};
6use chrono::Utc;
7use redis::{AsyncCommands, Client, aio::ConnectionManager};
8use std::time::Duration;
9
/// Settings used to open a queue and to name the Redis keys it works with.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// URL handed to the Redis client when the queue is created.
    pub redis_url: String,

    /// Name of the queue; it appears in the queue's log messages.
    pub queue_name: String,

    /// Prefix placed, followed by `:`, in front of every Redis key suffix.
    pub key_prefix: String,

    /// Upper bound on the number of pending jobs; `0` switches the limit off.
    pub max_size: usize,

    /// Expiry, in whole seconds, set on a job record each time it is written.
    pub retention_time: Duration,
}
28
29impl QueueConfig {
30    /// Create a new queue configuration.
31    pub fn new(redis_url: impl Into<String>, queue_name: impl Into<String>) -> Self {
32        let queue_name = queue_name.into();
33        Self {
34            redis_url: redis_url.into(),
35            key_prefix: format!("armature:queue:{}", queue_name),
36            queue_name,
37            max_size: 0,
38            retention_time: Duration::from_secs(86400), // 24 hours
39        }
40    }
41
42    /// Set the key prefix.
43    pub fn with_key_prefix(mut self, prefix: impl Into<String>) -> Self {
44        self.key_prefix = prefix.into();
45        self
46    }
47
48    /// Set the maximum queue size.
49    pub fn with_max_size(mut self, max_size: usize) -> Self {
50        self.max_size = max_size;
51        self
52    }
53
54    /// Set the retention time for completed jobs.
55    pub fn with_retention_time(mut self, retention_time: Duration) -> Self {
56        self.retention_time = retention_time;
57        self
58    }
59
60    /// Build Redis key.
61    fn key(&self, suffix: &str) -> String {
62        format!("{}:{}", self.key_prefix, suffix)
63    }
64}
65
66/// Job queue backed by Redis.
67#[derive(Clone)]
68pub struct Queue {
69    connection: ConnectionManager,
70    config: QueueConfig,
71}
72
73impl Queue {
74    /// Create a new queue.
75    pub async fn new(
76        redis_url: impl Into<String>,
77        queue_name: impl Into<String>,
78    ) -> QueueResult<Self> {
79        let config = QueueConfig::new(redis_url, queue_name);
80        Self::with_config(config).await
81    }
82
83    /// Create a queue with custom configuration.
84    pub async fn with_config(config: QueueConfig) -> QueueResult<Self> {
85        info!("Initializing job queue: {}", config.queue_name);
86        debug!(
87            "Queue config - prefix: {}, max_size: {}",
88            config.key_prefix, config.max_size
89        );
90
91        let client = Client::open(config.redis_url.as_str())
92            .map_err(|e| QueueError::Config(e.to_string()))?;
93
94        let connection = ConnectionManager::new(client).await?;
95
96        info!("Job queue '{}' ready", config.queue_name);
97        Ok(Self { connection, config })
98    }
99
100    /// Enqueue a job.
101    pub async fn enqueue(&self, job_type: impl Into<String>, data: JobData) -> QueueResult<JobId> {
102        let job_type = job_type.into();
103        debug!(
104            "Enqueueing job: {} on queue '{}'",
105            job_type, self.config.queue_name
106        );
107        let job = Job::new(&self.config.queue_name, &job_type, data);
108        self.enqueue_job(job).await
109    }
110
111    /// Enqueue a job with options.
112    pub async fn enqueue_job(&self, job: Job) -> QueueResult<JobId> {
113        // Check queue size limit
114        if self.config.max_size > 0 {
115            let size = self.size().await?;
116            if size >= self.config.max_size {
117                return Err(QueueError::QueueFull);
118            }
119        }
120
121        let job_id = job.id;
122        let mut conn = self.connection.clone();
123
124        // Serialize job
125        let job_json =
126            serde_json::to_string(&job).map_err(|e| QueueError::Serialization(e.to_string()))?;
127
128        // Store job data
129        let job_key = self.config.key(&format!("job:{}", job_id));
130        let _: () = conn
131            .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
132            .await?;
133
134        // Add to appropriate queue based on priority and schedule
135        if job.is_ready() {
136            let queue_key = self.priority_queue_key(job.priority);
137            let score = -(job.priority as i64); // Negative for high priority first
138            let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
139        } else {
140            // Scheduled job
141            let delayed_key = self.config.key("delayed");
142            let score = job.scheduled_at.unwrap().timestamp();
143            let _: () = conn.zadd(&delayed_key, job_id.to_string(), score).await?;
144        }
145
146        Ok(job_id)
147    }
148
149    /// Dequeue the next job.
150    pub async fn dequeue(&self) -> QueueResult<Option<Job>> {
151        self.move_delayed_jobs().await?;
152
153        let mut conn = self.connection.clone();
154
155        // Try to get job from priority queues (high to low)
156        for priority in [
157            JobPriority::Critical,
158            JobPriority::High,
159            JobPriority::Normal,
160            JobPriority::Low,
161        ] {
162            let queue_key = self.priority_queue_key(priority);
163
164            // Pop the highest priority job
165            let result: Option<Vec<String>> = conn.zpopmin(&queue_key, 1).await?;
166
167            if let Some(items) = result
168                && let Some(job_id_str) = items.first()
169                && let Ok(job_id) = job_id_str.parse::<JobId>()
170                && let Some(mut job) = self.get_job(job_id).await?
171            {
172                job.start_processing();
173                self.save_job(&job).await?;
174
175                // Add to processing set
176                let processing_key = self.config.key("processing");
177                let _: () = conn
178                    .zadd(&processing_key, job_id.to_string(), Utc::now().timestamp())
179                    .await?;
180
181                return Ok(Some(job));
182            }
183        }
184
185        Ok(None)
186    }
187
188    /// Complete a job.
189    pub async fn complete(&self, job_id: JobId) -> QueueResult<()> {
190        if let Some(mut job) = self.get_job(job_id).await? {
191            job.complete();
192            self.save_job(&job).await?;
193            self.remove_from_processing(job_id).await?;
194        }
195        Ok(())
196    }
197
198    /// Fail a job.
199    pub async fn fail(&self, job_id: JobId, error: String) -> QueueResult<()> {
200        if let Some(mut job) = self.get_job(job_id).await? {
201            job.fail(error);
202
203            if job.status.state == JobState::Failed && job.can_retry() {
204                // Retry with backoff
205                let retry_at = Utc::now() + job.backoff_delay();
206                job.scheduled_at = Some(retry_at);
207                self.save_job(&job).await?;
208
209                // Add to delayed queue
210                let mut conn = self.connection.clone();
211                let delayed_key = self.config.key("delayed");
212                let _: () = conn
213                    .zadd(&delayed_key, job_id.to_string(), retry_at.timestamp())
214                    .await?;
215            } else {
216                // Move to dead letter queue
217                self.save_job(&job).await?;
218                let mut conn = self.connection.clone();
219                let dead_key = self.config.key("dead");
220                let _: () = conn
221                    .zadd(&dead_key, job_id.to_string(), Utc::now().timestamp())
222                    .await?;
223            }
224
225            self.remove_from_processing(job_id).await?;
226        }
227        Ok(())
228    }
229
230    /// Get a job by ID.
231    pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<Job>> {
232        let mut conn = self.connection.clone();
233        let job_key = self.config.key(&format!("job:{}", job_id));
234
235        let job_json: Option<String> = conn.get(&job_key).await?;
236
237        if let Some(json) = job_json {
238            let job: Job = serde_json::from_str(&json)
239                .map_err(|e| QueueError::Deserialization(e.to_string()))?;
240            Ok(Some(job))
241        } else {
242            Ok(None)
243        }
244    }
245
246    /// Save a job.
247    async fn save_job(&self, job: &Job) -> QueueResult<()> {
248        let mut conn = self.connection.clone();
249        let job_key = self.config.key(&format!("job:{}", job.id));
250        let job_json =
251            serde_json::to_string(job).map_err(|e| QueueError::Serialization(e.to_string()))?;
252
253        let _: () = conn
254            .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
255            .await?;
256        Ok(())
257    }
258
259    /// Get queue size.
260    pub async fn size(&self) -> QueueResult<usize> {
261        let mut conn = self.connection.clone();
262        let mut total = 0;
263
264        for priority in [
265            JobPriority::Critical,
266            JobPriority::High,
267            JobPriority::Normal,
268            JobPriority::Low,
269        ] {
270            let queue_key = self.priority_queue_key(priority);
271            let count: usize = conn.zcard(&queue_key).await?;
272            total += count;
273        }
274
275        Ok(total)
276    }
277
278    /// Move delayed jobs to ready queue.
279    async fn move_delayed_jobs(&self) -> QueueResult<()> {
280        let mut conn = self.connection.clone();
281        let delayed_key = self.config.key("delayed");
282        let now = Utc::now().timestamp();
283
284        // Get all jobs that are ready
285        let job_ids: Vec<String> = conn.zrangebyscore(&delayed_key, "-inf", now).await?;
286
287        for job_id_str in job_ids {
288            if let Ok(job_id) = job_id_str.parse::<JobId>()
289                && let Some(job) = self.get_job(job_id).await?
290                && job.is_ready()
291            {
292                // Remove from delayed
293                let _: () = conn.zrem(&delayed_key, job_id.to_string()).await?;
294
295                // Add to priority queue
296                let queue_key = self.priority_queue_key(job.priority);
297                let score = -(job.priority as i64);
298                let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
299            }
300        }
301
302        Ok(())
303    }
304
305    /// Remove job from processing set.
306    async fn remove_from_processing(&self, job_id: JobId) -> QueueResult<()> {
307        let mut conn = self.connection.clone();
308        let processing_key = self.config.key("processing");
309        let _: () = conn.zrem(&processing_key, job_id.to_string()).await?;
310        Ok(())
311    }
312
313    /// Get the priority queue key.
314    fn priority_queue_key(&self, priority: JobPriority) -> String {
315        self.config
316            .key(&format!("pending:{:?}", priority).to_lowercase())
317    }
318
319    /// Clear all jobs from the queue.
320    pub async fn clear(&self) -> QueueResult<()> {
321        let mut conn = self.connection.clone();
322
323        let pattern = format!("{}:*", self.config.key_prefix);
324        let keys: Vec<String> = conn.keys(&pattern).await?;
325
326        if !keys.is_empty() {
327            let _: () = conn.del(keys).await?;
328        }
329
330        Ok(())
331    }
332}
333
#[cfg(test)]
mod tests {
    use super::*;

    /// Redis URL shared by the tests; none of them opens a connection.
    const URL: &str = "redis://localhost:6379";

    /// Default configuration for a queue called `name`.
    fn config_for(name: &str) -> QueueConfig {
        QueueConfig::new(URL, name)
    }

    #[test]
    fn test_queue_config() {
        let cfg = config_for("test");
        assert_eq!(cfg.queue_name, "test");
        assert!(cfg.key_prefix.contains("test"));
    }

    #[test]
    fn test_priority_queue_key() {
        let key = config_for("test").key("pending:high");
        assert!(key.contains("high"));
    }

    #[test]
    fn test_queue_config_with_custom_prefix() {
        let cfg = config_for("myqueue").with_key_prefix("app");
        assert!(cfg.key_prefix.contains("app"));
    }

    #[test]
    fn test_queue_config_default_retention() {
        let one_day = Duration::from_secs(86400);
        assert_eq!(config_for("test").retention_time, one_day);
    }

    #[test]
    fn test_queue_config_custom_retention() {
        let one_hour = Duration::from_secs(3600);
        let cfg = config_for("test").with_retention_time(one_hour);
        assert_eq!(cfg.retention_time, one_hour);
    }

    #[test]
    fn test_queue_config_default_max_size() {
        // Zero stands for "no limit".
        assert_eq!(config_for("test").max_size, 0);
    }

    #[test]
    fn test_queue_config_custom_max_size() {
        let cfg = config_for("test").with_max_size(1000);
        assert_eq!(cfg.max_size, 1000);
    }

    #[test]
    fn test_queue_key_generation() {
        let cfg = config_for("jobs");

        for suffix in ["pending:normal", "processing", "completed"] {
            assert!(cfg.key(suffix).contains("jobs"));
        }
    }

    #[test]
    fn test_queue_config_clone() {
        let original = config_for("test");
        let copy = original.clone();

        assert_eq!(original.queue_name, copy.queue_name);
        assert_eq!(original.redis_url, copy.redis_url);
    }

    #[test]
    fn test_queue_config_different_queues() {
        let first = config_for("queue1");
        let second = config_for("queue2");

        assert_ne!(first.key_prefix, second.key_prefix);
    }

    #[test]
    fn test_queue_config_key_consistency() {
        let cfg = config_for("test");
        assert_eq!(cfg.key("pending"), cfg.key("pending"));
    }

    #[test]
    fn test_queue_config_builder_pattern() {
        let two_hours = Duration::from_secs(7200);
        let cfg = config_for("test")
            .with_key_prefix("app")
            .with_retention_time(two_hours)
            .with_max_size(500);

        assert!(cfg.key_prefix.contains("app"));
        assert_eq!(cfg.retention_time, two_hours);
        assert_eq!(cfg.max_size, 500);
    }

    #[test]
    fn test_queue_config_redis_url() {
        let url = "redis://user:pass@host:6380/2";
        assert_eq!(QueueConfig::new(url, "test").redis_url, url);
    }

    #[test]
    fn test_queue_config_key_with_empty_suffix() {
        let key = config_for("test").key("");
        assert!(key.contains("test"));
    }

    #[test]
    fn test_queue_config_key_with_special_characters() {
        let suffix = "pending:high:priority";
        let key = config_for("test").key(suffix);
        assert!(key.contains(suffix));
    }

    #[test]
    fn test_queue_config_multiple_prefixes() {
        let production = config_for("app1").with_key_prefix("production");
        let development = config_for("app2").with_key_prefix("development");

        assert_ne!(production.key("jobs"), development.key("jobs"));
    }

    #[test]
    fn test_queue_config_unlimited_max_size() {
        let cfg = config_for("test").with_max_size(0);
        assert_eq!(cfg.max_size, 0);
    }

    #[test]
    fn test_queue_config_large_retention() {
        let one_week = Duration::from_secs(7 * 24 * 3600);
        let cfg = config_for("test").with_retention_time(one_week);
        assert_eq!(cfg.retention_time, one_week);
    }
}