1use crate::error::{QueueError, QueueResult};
4use crate::job::{Job, JobData, JobId, JobPriority, JobState};
5use armature_log::{debug, info};
6use chrono::Utc;
7use redis::{AsyncCommands, Client, aio::ConnectionManager};
8use std::time::Duration;
9
/// Settings that describe one Redis-backed job queue.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Connection URL handed to the Redis client.
    pub redis_url: String,

    /// Logical queue name; also used to derive the default key prefix.
    pub queue_name: String,

    /// Prefix prepended (with a `:` separator) to every Redis key of this queue.
    pub key_prefix: String,

    /// Upper bound on pending jobs; `0` disables the limit.
    pub max_size: usize,

    /// Time-to-live applied to stored job records.
    pub retention_time: Duration,
}

impl QueueConfig {
    /// Builds a configuration with defaults: prefix `armature:queue:<queue_name>`,
    /// no size limit, and a retention time of one day.
    pub fn new(redis_url: impl Into<String>, queue_name: impl Into<String>) -> Self {
        let name: String = queue_name.into();
        let url: String = redis_url.into();
        let default_prefix = format!("armature:queue:{}", name);

        QueueConfig {
            redis_url: url,
            queue_name: name,
            key_prefix: default_prefix,
            max_size: 0,
            retention_time: Duration::from_secs(24 * 60 * 60),
        }
    }

    /// Replaces the key prefix, leaving every other setting untouched.
    pub fn with_key_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            key_prefix: prefix.into(),
            ..self
        }
    }

    /// Sets the maximum number of pending jobs (`0` means unlimited).
    pub fn with_max_size(self, max_size: usize) -> Self {
        Self { max_size, ..self }
    }

    /// Sets how long job records are retained.
    pub fn with_retention_time(self, retention_time: Duration) -> Self {
        Self {
            retention_time,
            ..self
        }
    }

    /// Joins the key prefix and `suffix` with a single `:`.
    fn key(&self, suffix: &str) -> String {
        [self.key_prefix.as_str(), suffix].join(":")
    }
}
65
66#[derive(Clone)]
68pub struct Queue {
69 connection: ConnectionManager,
70 config: QueueConfig,
71}
72
73impl Queue {
74 pub async fn new(
76 redis_url: impl Into<String>,
77 queue_name: impl Into<String>,
78 ) -> QueueResult<Self> {
79 let config = QueueConfig::new(redis_url, queue_name);
80 Self::with_config(config).await
81 }
82
83 pub async fn with_config(config: QueueConfig) -> QueueResult<Self> {
85 info!("Initializing job queue: {}", config.queue_name);
86 debug!(
87 "Queue config - prefix: {}, max_size: {}",
88 config.key_prefix, config.max_size
89 );
90
91 let client = Client::open(config.redis_url.as_str())
92 .map_err(|e| QueueError::Config(e.to_string()))?;
93
94 let connection = ConnectionManager::new(client).await?;
95
96 info!("Job queue '{}' ready", config.queue_name);
97 Ok(Self { connection, config })
98 }
99
100 pub async fn enqueue(&self, job_type: impl Into<String>, data: JobData) -> QueueResult<JobId> {
102 let job_type = job_type.into();
103 debug!(
104 "Enqueueing job: {} on queue '{}'",
105 job_type, self.config.queue_name
106 );
107 let job = Job::new(&self.config.queue_name, &job_type, data);
108 self.enqueue_job(job).await
109 }
110
111 pub async fn enqueue_job(&self, job: Job) -> QueueResult<JobId> {
113 if self.config.max_size > 0 {
115 let size = self.size().await?;
116 if size >= self.config.max_size {
117 return Err(QueueError::QueueFull);
118 }
119 }
120
121 let job_id = job.id;
122 let mut conn = self.connection.clone();
123
124 let job_json =
126 serde_json::to_string(&job).map_err(|e| QueueError::Serialization(e.to_string()))?;
127
128 let job_key = self.config.key(&format!("job:{}", job_id));
130 let _: () = conn
131 .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
132 .await?;
133
134 if job.is_ready() {
136 let queue_key = self.priority_queue_key(job.priority);
137 let score = -(job.priority as i64); let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
139 } else {
140 let delayed_key = self.config.key("delayed");
142 let score = job.scheduled_at.unwrap().timestamp();
143 let _: () = conn.zadd(&delayed_key, job_id.to_string(), score).await?;
144 }
145
146 Ok(job_id)
147 }
148
149 pub async fn dequeue(&self) -> QueueResult<Option<Job>> {
151 self.move_delayed_jobs().await?;
152
153 let mut conn = self.connection.clone();
154
155 for priority in [
157 JobPriority::Critical,
158 JobPriority::High,
159 JobPriority::Normal,
160 JobPriority::Low,
161 ] {
162 let queue_key = self.priority_queue_key(priority);
163
164 let result: Option<Vec<String>> = conn.zpopmin(&queue_key, 1).await?;
166
167 if let Some(items) = result {
168 if let Some(job_id_str) = items.first() {
169 if let Ok(job_id) = job_id_str.parse::<JobId>() {
170 if let Some(mut job) = self.get_job(job_id).await? {
171 job.start_processing();
172 self.save_job(&job).await?;
173
174 let processing_key = self.config.key("processing");
176 let _: () = conn
177 .zadd(&processing_key, job_id.to_string(), Utc::now().timestamp())
178 .await?;
179
180 return Ok(Some(job));
181 }
182 }
183 }
184 }
185 }
186
187 Ok(None)
188 }
189
190 pub async fn complete(&self, job_id: JobId) -> QueueResult<()> {
192 if let Some(mut job) = self.get_job(job_id).await? {
193 job.complete();
194 self.save_job(&job).await?;
195 self.remove_from_processing(job_id).await?;
196 }
197 Ok(())
198 }
199
200 pub async fn fail(&self, job_id: JobId, error: String) -> QueueResult<()> {
202 if let Some(mut job) = self.get_job(job_id).await? {
203 job.fail(error);
204
205 if job.status.state == JobState::Failed && job.can_retry() {
206 let retry_at = Utc::now() + job.backoff_delay();
208 job.scheduled_at = Some(retry_at);
209 self.save_job(&job).await?;
210
211 let mut conn = self.connection.clone();
213 let delayed_key = self.config.key("delayed");
214 let _: () = conn
215 .zadd(&delayed_key, job_id.to_string(), retry_at.timestamp())
216 .await?;
217 } else {
218 self.save_job(&job).await?;
220 let mut conn = self.connection.clone();
221 let dead_key = self.config.key("dead");
222 let _: () = conn
223 .zadd(&dead_key, job_id.to_string(), Utc::now().timestamp())
224 .await?;
225 }
226
227 self.remove_from_processing(job_id).await?;
228 }
229 Ok(())
230 }
231
232 pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<Job>> {
234 let mut conn = self.connection.clone();
235 let job_key = self.config.key(&format!("job:{}", job_id));
236
237 let job_json: Option<String> = conn.get(&job_key).await?;
238
239 if let Some(json) = job_json {
240 let job: Job = serde_json::from_str(&json)
241 .map_err(|e| QueueError::Deserialization(e.to_string()))?;
242 Ok(Some(job))
243 } else {
244 Ok(None)
245 }
246 }
247
248 async fn save_job(&self, job: &Job) -> QueueResult<()> {
250 let mut conn = self.connection.clone();
251 let job_key = self.config.key(&format!("job:{}", job.id));
252 let job_json =
253 serde_json::to_string(job).map_err(|e| QueueError::Serialization(e.to_string()))?;
254
255 let _: () = conn
256 .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
257 .await?;
258 Ok(())
259 }
260
261 pub async fn size(&self) -> QueueResult<usize> {
263 let mut conn = self.connection.clone();
264 let mut total = 0;
265
266 for priority in [
267 JobPriority::Critical,
268 JobPriority::High,
269 JobPriority::Normal,
270 JobPriority::Low,
271 ] {
272 let queue_key = self.priority_queue_key(priority);
273 let count: usize = conn.zcard(&queue_key).await?;
274 total += count;
275 }
276
277 Ok(total)
278 }
279
280 async fn move_delayed_jobs(&self) -> QueueResult<()> {
282 let mut conn = self.connection.clone();
283 let delayed_key = self.config.key("delayed");
284 let now = Utc::now().timestamp();
285
286 let job_ids: Vec<String> = conn.zrangebyscore(&delayed_key, "-inf", now).await?;
288
289 for job_id_str in job_ids {
290 if let Ok(job_id) = job_id_str.parse::<JobId>() {
291 if let Some(job) = self.get_job(job_id).await? {
292 if job.is_ready() {
293 let _: () = conn.zrem(&delayed_key, job_id.to_string()).await?;
295
296 let queue_key = self.priority_queue_key(job.priority);
298 let score = -(job.priority as i64);
299 let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
300 }
301 }
302 }
303 }
304
305 Ok(())
306 }
307
308 async fn remove_from_processing(&self, job_id: JobId) -> QueueResult<()> {
310 let mut conn = self.connection.clone();
311 let processing_key = self.config.key("processing");
312 let _: () = conn.zrem(&processing_key, job_id.to_string()).await?;
313 Ok(())
314 }
315
316 fn priority_queue_key(&self, priority: JobPriority) -> String {
318 self.config
319 .key(&format!("pending:{:?}", priority).to_lowercase())
320 }
321
322 pub async fn clear(&self) -> QueueResult<()> {
324 let mut conn = self.connection.clone();
325
326 let pattern = format!("{}:*", self.config.key_prefix);
327 let keys: Vec<String> = conn.keys(&pattern).await?;
328
329 if !keys.is_empty() {
330 let _: () = conn.del(keys).await?;
331 }
332
333 Ok(())
334 }
335}
336
#[cfg(test)]
mod tests {
    //! Unit tests for `QueueConfig`. They need no Redis server; `Queue` itself
    //! is not exercised here because constructing it requires a live connection.
    use super::*;

    const URL: &str = "redis://localhost:6379";

    #[test]
    fn test_queue_config() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.queue_name, "test");
        assert_eq!(config.key_prefix, "armature:queue:test");
    }

    #[test]
    fn test_priority_queue_key() {
        // Checks the key shape that the pending sets are built on.
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high"),
            "armature:queue:test:pending:high"
        );
    }

    #[test]
    fn test_queue_config_with_custom_prefix() {
        let config = QueueConfig::new(URL, "myqueue").with_key_prefix("app");
        assert_eq!(config.key_prefix, "app");
        // Overriding the prefix must not touch the queue name.
        assert_eq!(config.queue_name, "myqueue");
    }

    #[test]
    fn test_queue_config_default_retention() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.retention_time, Duration::from_secs(86400));
    }

    #[test]
    fn test_queue_config_custom_retention() {
        let retention = Duration::from_secs(3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(retention);
        assert_eq!(config.retention_time, retention);
    }

    #[test]
    fn test_queue_config_default_max_size() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.max_size, 0);
    }

    #[test]
    fn test_queue_config_custom_max_size() {
        let config = QueueConfig::new(URL, "test").with_max_size(1000);
        assert_eq!(config.max_size, 1000);
    }

    #[test]
    fn test_queue_key_generation() {
        let config = QueueConfig::new(URL, "jobs");

        assert_eq!(
            config.key("pending:normal"),
            "armature:queue:jobs:pending:normal"
        );
        assert_eq!(config.key("processing"), "armature:queue:jobs:processing");
        assert_eq!(config.key("completed"), "armature:queue:jobs:completed");
    }

    #[test]
    fn test_queue_config_clone() {
        let config1 = QueueConfig::new(URL, "test")
            .with_max_size(7)
            .with_retention_time(Duration::from_secs(60));
        let config2 = config1.clone();

        assert_eq!(config1.queue_name, config2.queue_name);
        assert_eq!(config1.redis_url, config2.redis_url);
        assert_eq!(config1.key_prefix, config2.key_prefix);
        assert_eq!(config1.max_size, config2.max_size);
        assert_eq!(config1.retention_time, config2.retention_time);
    }

    #[test]
    fn test_queue_config_different_queues() {
        let config1 = QueueConfig::new(URL, "queue1");
        let config2 = QueueConfig::new(URL, "queue2");

        assert_eq!(config1.key_prefix, "armature:queue:queue1");
        assert_eq!(config2.key_prefix, "armature:queue:queue2");
        assert_ne!(config1.key_prefix, config2.key_prefix);
    }

    #[test]
    fn test_queue_config_key_consistency() {
        let config = QueueConfig::new(URL, "test");

        let key1 = config.key("pending");
        let key2 = config.key("pending");

        assert_eq!(key1, key2);
        assert_eq!(key1, "armature:queue:test:pending");
    }

    #[test]
    fn test_queue_config_builder_pattern() {
        let config = QueueConfig::new(URL, "test")
            .with_key_prefix("app")
            .with_retention_time(Duration::from_secs(7200))
            .with_max_size(500);

        assert_eq!(config.key_prefix, "app");
        assert_eq!(config.retention_time, Duration::from_secs(7200));
        assert_eq!(config.max_size, 500);
        assert_eq!(config.redis_url, URL);
        assert_eq!(config.queue_name, "test");
    }

    #[test]
    fn test_queue_config_redis_url() {
        let url = "redis://user:pass@host:6380/2";
        let config = QueueConfig::new(url, "test");
        assert_eq!(config.redis_url, url);
    }

    #[test]
    fn test_queue_config_key_with_empty_suffix() {
        // An empty suffix still gets the separator appended.
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.key(""), "armature:queue:test:");
    }

    #[test]
    fn test_queue_config_key_with_special_characters() {
        // Separators inside the suffix are passed through untouched.
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high:priority"),
            "armature:queue:test:pending:high:priority"
        );
    }

    #[test]
    fn test_queue_config_multiple_prefixes() {
        let config1 = QueueConfig::new(URL, "app1").with_key_prefix("production");
        let config2 = QueueConfig::new(URL, "app2").with_key_prefix("development");

        assert_eq!(config1.key("jobs"), "production:jobs");
        assert_eq!(config2.key("jobs"), "development:jobs");
    }

    #[test]
    fn test_queue_config_unlimited_max_size() {
        // Setting the limit back to zero after a non-zero value restores "unlimited".
        let config = QueueConfig::new(URL, "test")
            .with_max_size(10)
            .with_max_size(0);
        assert_eq!(config.max_size, 0);
    }

    #[test]
    fn test_queue_config_large_retention() {
        let week = Duration::from_secs(7 * 24 * 3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(week);
        assert_eq!(config.retention_time, week);
    }
}