1use crate::error::{QueueError, QueueResult};
4use crate::job::{Job, JobData, JobId, JobPriority, JobState};
5use armature_log::{debug, info};
6use chrono::Utc;
7use redis::{AsyncCommands, Client, aio::ConnectionManager};
8use std::time::Duration;
9
/// Settings describing one Redis-backed job queue: where Redis lives, how the
/// queue's keys are named, and the limits applied to it.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// URL passed to the Redis client when the queue connects.
    pub redis_url: String,

    /// Human-readable queue name; also seeds the default key prefix.
    pub queue_name: String,

    /// Prefix placed in front of every Redis key built through this config.
    pub key_prefix: String,

    /// Maximum number of pending jobs; `0` (the default) disables the limit.
    pub max_size: usize,

    /// Expiry applied to stored job records; defaults to 24 hours.
    pub retention_time: Duration,
}

impl QueueConfig {
    /// Creates a configuration with default settings.
    ///
    /// The key prefix defaults to `armature:queue:<queue_name>`, the size
    /// limit to `0`, and the retention time to 86 400 seconds.
    pub fn new(redis_url: impl Into<String>, queue_name: impl Into<String>) -> Self {
        let name: String = queue_name.into();
        let url: String = redis_url.into();
        let default_prefix = format!("armature:queue:{name}");

        Self {
            redis_url: url,
            queue_name: name,
            key_prefix: default_prefix,
            max_size: 0,
            retention_time: Duration::from_secs(86400),
        }
    }

    /// Returns the configuration with its key prefix replaced by `prefix`.
    pub fn with_key_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            key_prefix: prefix.into(),
            ..self
        }
    }

    /// Returns the configuration with its size limit replaced by `max_size`.
    pub fn with_max_size(self, max_size: usize) -> Self {
        Self { max_size, ..self }
    }

    /// Returns the configuration with its retention time replaced by
    /// `retention_time`.
    pub fn with_retention_time(self, retention_time: Duration) -> Self {
        Self {
            retention_time,
            ..self
        }
    }

    /// Builds a full Redis key as `<key_prefix>:<suffix>`.
    fn key(&self, suffix: &str) -> String {
        [self.key_prefix.as_str(), suffix].join(":")
    }
}
65
66#[derive(Clone)]
68pub struct Queue {
69 connection: ConnectionManager,
70 config: QueueConfig,
71}
72
73impl Queue {
74 pub async fn new(
76 redis_url: impl Into<String>,
77 queue_name: impl Into<String>,
78 ) -> QueueResult<Self> {
79 let config = QueueConfig::new(redis_url, queue_name);
80 Self::with_config(config).await
81 }
82
83 pub async fn with_config(config: QueueConfig) -> QueueResult<Self> {
85 info!("Initializing job queue: {}", config.queue_name);
86 debug!(
87 "Queue config - prefix: {}, max_size: {}",
88 config.key_prefix, config.max_size
89 );
90
91 let client = Client::open(config.redis_url.as_str())
92 .map_err(|e| QueueError::Config(e.to_string()))?;
93
94 let connection = ConnectionManager::new(client).await?;
95
96 info!("Job queue '{}' ready", config.queue_name);
97 Ok(Self { connection, config })
98 }
99
100 pub async fn enqueue(&self, job_type: impl Into<String>, data: JobData) -> QueueResult<JobId> {
102 let job_type = job_type.into();
103 debug!(
104 "Enqueueing job: {} on queue '{}'",
105 job_type, self.config.queue_name
106 );
107 let job = Job::new(&self.config.queue_name, &job_type, data);
108 self.enqueue_job(job).await
109 }
110
111 pub async fn enqueue_job(&self, job: Job) -> QueueResult<JobId> {
113 if self.config.max_size > 0 {
115 let size = self.size().await?;
116 if size >= self.config.max_size {
117 return Err(QueueError::QueueFull);
118 }
119 }
120
121 let job_id = job.id;
122 let mut conn = self.connection.clone();
123
124 let job_json =
126 serde_json::to_string(&job).map_err(|e| QueueError::Serialization(e.to_string()))?;
127
128 let job_key = self.config.key(&format!("job:{}", job_id));
130 let _: () = conn
131 .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
132 .await?;
133
134 if job.is_ready() {
136 let queue_key = self.priority_queue_key(job.priority);
137 let score = -(job.priority as i64); let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
139 } else {
140 let delayed_key = self.config.key("delayed");
142 let score = job.scheduled_at.unwrap().timestamp();
143 let _: () = conn.zadd(&delayed_key, job_id.to_string(), score).await?;
144 }
145
146 Ok(job_id)
147 }
148
149 pub async fn dequeue(&self) -> QueueResult<Option<Job>> {
151 self.move_delayed_jobs().await?;
152
153 let mut conn = self.connection.clone();
154
155 for priority in [
157 JobPriority::Critical,
158 JobPriority::High,
159 JobPriority::Normal,
160 JobPriority::Low,
161 ] {
162 let queue_key = self.priority_queue_key(priority);
163
164 let result: Option<Vec<String>> = conn.zpopmin(&queue_key, 1).await?;
166
167 if let Some(items) = result
168 && let Some(job_id_str) = items.first()
169 && let Ok(job_id) = job_id_str.parse::<JobId>()
170 && let Some(mut job) = self.get_job(job_id).await?
171 {
172 job.start_processing();
173 self.save_job(&job).await?;
174
175 let processing_key = self.config.key("processing");
177 let _: () = conn
178 .zadd(&processing_key, job_id.to_string(), Utc::now().timestamp())
179 .await?;
180
181 return Ok(Some(job));
182 }
183 }
184
185 Ok(None)
186 }
187
188 pub async fn complete(&self, job_id: JobId) -> QueueResult<()> {
190 if let Some(mut job) = self.get_job(job_id).await? {
191 job.complete();
192 self.save_job(&job).await?;
193 self.remove_from_processing(job_id).await?;
194 }
195 Ok(())
196 }
197
198 pub async fn fail(&self, job_id: JobId, error: String) -> QueueResult<()> {
200 if let Some(mut job) = self.get_job(job_id).await? {
201 job.fail(error);
202
203 if job.status.state == JobState::Failed && job.can_retry() {
204 let retry_at = Utc::now() + job.backoff_delay();
206 job.scheduled_at = Some(retry_at);
207 self.save_job(&job).await?;
208
209 let mut conn = self.connection.clone();
211 let delayed_key = self.config.key("delayed");
212 let _: () = conn
213 .zadd(&delayed_key, job_id.to_string(), retry_at.timestamp())
214 .await?;
215 } else {
216 self.save_job(&job).await?;
218 let mut conn = self.connection.clone();
219 let dead_key = self.config.key("dead");
220 let _: () = conn
221 .zadd(&dead_key, job_id.to_string(), Utc::now().timestamp())
222 .await?;
223 }
224
225 self.remove_from_processing(job_id).await?;
226 }
227 Ok(())
228 }
229
230 pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<Job>> {
232 let mut conn = self.connection.clone();
233 let job_key = self.config.key(&format!("job:{}", job_id));
234
235 let job_json: Option<String> = conn.get(&job_key).await?;
236
237 if let Some(json) = job_json {
238 let job: Job = serde_json::from_str(&json)
239 .map_err(|e| QueueError::Deserialization(e.to_string()))?;
240 Ok(Some(job))
241 } else {
242 Ok(None)
243 }
244 }
245
246 async fn save_job(&self, job: &Job) -> QueueResult<()> {
248 let mut conn = self.connection.clone();
249 let job_key = self.config.key(&format!("job:{}", job.id));
250 let job_json =
251 serde_json::to_string(job).map_err(|e| QueueError::Serialization(e.to_string()))?;
252
253 let _: () = conn
254 .set_ex(&job_key, job_json, self.config.retention_time.as_secs())
255 .await?;
256 Ok(())
257 }
258
259 pub async fn size(&self) -> QueueResult<usize> {
261 let mut conn = self.connection.clone();
262 let mut total = 0;
263
264 for priority in [
265 JobPriority::Critical,
266 JobPriority::High,
267 JobPriority::Normal,
268 JobPriority::Low,
269 ] {
270 let queue_key = self.priority_queue_key(priority);
271 let count: usize = conn.zcard(&queue_key).await?;
272 total += count;
273 }
274
275 Ok(total)
276 }
277
278 async fn move_delayed_jobs(&self) -> QueueResult<()> {
280 let mut conn = self.connection.clone();
281 let delayed_key = self.config.key("delayed");
282 let now = Utc::now().timestamp();
283
284 let job_ids: Vec<String> = conn.zrangebyscore(&delayed_key, "-inf", now).await?;
286
287 for job_id_str in job_ids {
288 if let Ok(job_id) = job_id_str.parse::<JobId>()
289 && let Some(job) = self.get_job(job_id).await?
290 && job.is_ready()
291 {
292 let _: () = conn.zrem(&delayed_key, job_id.to_string()).await?;
294
295 let queue_key = self.priority_queue_key(job.priority);
297 let score = -(job.priority as i64);
298 let _: () = conn.zadd(&queue_key, job_id.to_string(), score).await?;
299 }
300 }
301
302 Ok(())
303 }
304
305 async fn remove_from_processing(&self, job_id: JobId) -> QueueResult<()> {
307 let mut conn = self.connection.clone();
308 let processing_key = self.config.key("processing");
309 let _: () = conn.zrem(&processing_key, job_id.to_string()).await?;
310 Ok(())
311 }
312
313 fn priority_queue_key(&self, priority: JobPriority) -> String {
315 self.config
316 .key(&format!("pending:{:?}", priority).to_lowercase())
317 }
318
319 pub async fn clear(&self) -> QueueResult<()> {
321 let mut conn = self.connection.clone();
322
323 let pattern = format!("{}:*", self.config.key_prefix);
324 let keys: Vec<String> = conn.keys(&pattern).await?;
325
326 if !keys.is_empty() {
327 let _: () = conn.del(keys).await?;
328 }
329
330 Ok(())
331 }
332}
333
#[cfg(test)]
mod tests {
    //! Unit tests for `QueueConfig`. None of them opens a Redis connection.
    //!
    //! Assertions pin exact values rather than substring matches, so a change
    //! to the key layout or the defaults is caught.

    use super::*;

    /// Redis URL shared by the tests; it is only stored, never dialled.
    const URL: &str = "redis://localhost:6379";

    #[test]
    fn test_queue_config() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.queue_name, "test");
        assert_eq!(config.key_prefix, "armature:queue:test");
    }

    #[test]
    fn test_priority_queue_key() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high"),
            "armature:queue:test:pending:high"
        );
    }

    #[test]
    fn test_queue_config_with_custom_prefix() {
        let config = QueueConfig::new(URL, "myqueue").with_key_prefix("app");
        assert_eq!(config.key_prefix, "app");
        // Overriding the prefix must not touch the queue name.
        assert_eq!(config.queue_name, "myqueue");
    }

    #[test]
    fn test_queue_config_default_retention() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.retention_time, Duration::from_secs(86400));
    }

    #[test]
    fn test_queue_config_custom_retention() {
        let retention = Duration::from_secs(3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(retention);
        assert_eq!(config.retention_time, retention);
    }

    #[test]
    fn test_queue_config_default_max_size() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(config.max_size, 0);
    }

    #[test]
    fn test_queue_config_custom_max_size() {
        let config = QueueConfig::new(URL, "test").with_max_size(1000);
        assert_eq!(config.max_size, 1000);
    }

    #[test]
    fn test_queue_key_generation() {
        let config = QueueConfig::new(URL, "jobs");

        assert_eq!(
            config.key("pending:normal"),
            "armature:queue:jobs:pending:normal"
        );
        assert_eq!(config.key("processing"), "armature:queue:jobs:processing");
        assert_eq!(config.key("completed"), "armature:queue:jobs:completed");
    }

    #[test]
    fn test_queue_config_clone() {
        let config1 = QueueConfig::new(URL, "test");
        let config2 = config1.clone();

        assert_eq!(config1.queue_name, config2.queue_name);
        assert_eq!(config1.redis_url, config2.redis_url);
        assert_eq!(config1.key_prefix, config2.key_prefix);
        assert_eq!(config1.max_size, config2.max_size);
        assert_eq!(config1.retention_time, config2.retention_time);
    }

    #[test]
    fn test_queue_config_different_queues() {
        let config1 = QueueConfig::new(URL, "queue1");
        let config2 = QueueConfig::new(URL, "queue2");

        assert_ne!(config1.key_prefix, config2.key_prefix);
        assert_eq!(config1.key_prefix, "armature:queue:queue1");
        assert_eq!(config2.key_prefix, "armature:queue:queue2");
    }

    #[test]
    fn test_queue_config_key_consistency() {
        let config = QueueConfig::new(URL, "test");

        let key1 = config.key("pending");
        let key2 = config.key("pending");

        assert_eq!(key1, key2);
        assert_eq!(key1, "armature:queue:test:pending");
    }

    #[test]
    fn test_queue_config_builder_pattern() {
        let config = QueueConfig::new(URL, "test")
            .with_key_prefix("app")
            .with_retention_time(Duration::from_secs(7200))
            .with_max_size(500);

        assert_eq!(config.key_prefix, "app");
        assert_eq!(config.retention_time, Duration::from_secs(7200));
        assert_eq!(config.max_size, 500);
        // Fields not touched by the builders keep their constructor values.
        assert_eq!(config.queue_name, "test");
        assert_eq!(config.redis_url, URL);
    }

    #[test]
    fn test_queue_config_redis_url() {
        let url = "redis://user:pass@host:6380/2";
        let config = QueueConfig::new(url, "test");
        assert_eq!(config.redis_url, url);
    }

    #[test]
    fn test_queue_config_key_with_empty_suffix() {
        let config = QueueConfig::new(URL, "test");
        // The separator is still appended for an empty suffix.
        assert_eq!(config.key(""), "armature:queue:test:");
    }

    #[test]
    fn test_queue_config_key_with_special_characters() {
        let config = QueueConfig::new(URL, "test");
        assert_eq!(
            config.key("pending:high:priority"),
            "armature:queue:test:pending:high:priority"
        );
    }

    #[test]
    fn test_queue_config_multiple_prefixes() {
        let config1 = QueueConfig::new(URL, "app1").with_key_prefix("production");
        let config2 = QueueConfig::new(URL, "app2").with_key_prefix("development");

        let key1 = config1.key("jobs");
        let key2 = config2.key("jobs");

        assert_ne!(key1, key2);
        assert_eq!(key1, "production:jobs");
        assert_eq!(key2, "development:jobs");
    }

    #[test]
    fn test_queue_config_unlimited_max_size() {
        let config = QueueConfig::new(URL, "test").with_max_size(0);
        assert_eq!(config.max_size, 0);
    }

    #[test]
    fn test_queue_config_large_retention() {
        let week = Duration::from_secs(7 * 24 * 3600);
        let config = QueueConfig::new(URL, "test").with_retention_time(week);
        assert_eq!(config.retention_time, week);
    }
}