1use crate::jobs::Job;
7use anyhow::Result;
8use async_trait::async_trait;
9use std::time::Duration;
10
11#[async_trait]
17pub trait JobQueue: Send + Sync {
18 async fn enqueue(&self, job: Job) -> Result<()>;
20
21 async fn dequeue(&self) -> Result<Option<Job>>;
23
24 async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>>;
26
27 async fn len(&self) -> Result<usize>;
29
30 async fn is_empty(&self) -> Result<bool> {
32 Ok(self.len().await? == 0)
33 }
34}
35
36pub struct InMemoryJobQueue {
38 queue: tokio::sync::Mutex<std::collections::VecDeque<Job>>,
39 notify: tokio::sync::Notify,
40}
41
42impl InMemoryJobQueue {
43 pub fn new() -> Self {
45 Self::default()
46 }
47}
48
49impl Default for InMemoryJobQueue {
50 fn default() -> Self {
51 Self {
52 queue: tokio::sync::Mutex::new(std::collections::VecDeque::default()),
53 notify: tokio::sync::Notify::default(),
54 }
55 }
56}
57
58#[async_trait]
59impl JobQueue for InMemoryJobQueue {
60 async fn enqueue(&self, job: Job) -> Result<()> {
61 let mut queue = self.queue.lock().await;
62 queue.push_back(job);
63 self.notify.notify_one();
64 Ok(())
65 }
66
67 async fn dequeue(&self) -> Result<Option<Job>> {
68 loop {
69 {
70 let mut queue = self.queue.lock().await;
71 if let Some(job) = queue.pop_front() {
72 return Ok(Some(job));
73 }
74 }
75 self.notify.notified().await;
76 }
77 }
78
79 async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
80 {
82 let mut queue = self.queue.lock().await;
83 if let Some(job) = queue.pop_front() {
84 return Ok(Some(job));
85 }
86 }
87
88 tokio::select! {
90 _ = tokio::time::sleep(timeout) => Ok(None),
91 _ = self.notify.notified() => {
92 let mut queue = self.queue.lock().await;
93 Ok(queue.pop_front())
94 }
95 }
96 }
97
98 async fn len(&self) -> Result<usize> {
99 let queue = self.queue.lock().await;
100 Ok(queue.len())
101 }
102}
103
104#[cfg(feature = "redis")]
106pub struct RedisJobQueue {
107 client: redis::Client,
108 queue_key: String,
109 timeout_seconds: u64,
110}
111
112#[cfg(feature = "redis")]
113impl RedisJobQueue {
114 pub fn new(redis_url: &str, queue_name: &str) -> Result<Self> {
120 let client = redis::Client::open(redis_url)?;
121 Ok(Self {
122 client,
123 queue_key: format!("riglr:queue:{}", queue_name),
124 timeout_seconds: 5,
125 })
126 }
127
128 pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
130 self.timeout_seconds = timeout_seconds;
131 self
132 }
133}
134
135#[cfg(feature = "redis")]
136#[async_trait]
137impl JobQueue for RedisJobQueue {
138 async fn enqueue(&self, job: Job) -> Result<()> {
139 let mut conn = self.client.get_multiplexed_async_connection().await?;
140 let serialized = serde_json::to_string(&job)?;
141 let _: () = redis::cmd("LPUSH")
142 .arg(&self.queue_key)
143 .arg(serialized)
144 .query_async(&mut conn)
145 .await?;
146 Ok(())
147 }
148
149 async fn dequeue(&self) -> Result<Option<Job>> {
150 let mut conn = self.client.get_multiplexed_async_connection().await?;
151
152 let result: Option<(String, String)> = redis::cmd("BRPOP")
154 .arg(&self.queue_key)
155 .arg(self.timeout_seconds)
156 .query_async(&mut conn)
157 .await?;
158
159 match result {
160 Some((_, job_str)) => {
161 let job: Job = serde_json::from_str(&job_str)?;
162 Ok(Some(job))
163 }
164 None => Ok(None),
165 }
166 }
167
168 async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
169 let mut conn = self.client.get_multiplexed_async_connection().await?;
170 let timeout_seconds = timeout.as_secs().max(1);
171
172 let result: Option<(String, String)> = redis::cmd("BRPOP")
173 .arg(&self.queue_key)
174 .arg(timeout_seconds)
175 .query_async(&mut conn)
176 .await?;
177
178 match result {
179 Some((_, job_str)) => {
180 let job: Job = serde_json::from_str(&job_str)?;
181 Ok(Some(job))
182 }
183 None => Ok(None),
184 }
185 }
186
187 async fn len(&self) -> Result<usize> {
188 let mut conn = self.client.get_multiplexed_async_connection().await?;
189 let len: usize = redis::cmd("LLEN")
190 .arg(&self.queue_key)
191 .query_async(&mut conn)
192 .await?;
193 Ok(len)
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use std::sync::Arc;
201 use tokio::time::{timeout, Duration};
202
203 #[tokio::test]
205 async fn test_in_memory_queue_default() {
206 let queue = InMemoryJobQueue::default();
207 assert_eq!(queue.len().await.unwrap(), 0);
208 assert!(queue.is_empty().await.unwrap());
209 }
210
211 #[tokio::test]
213 async fn test_in_memory_queue_enqueue_dequeue() {
214 let queue = InMemoryJobQueue::default();
215
216 let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
218 let job_id = job.job_id;
219
220 queue.enqueue(job).await.unwrap();
221 assert_eq!(queue.len().await.unwrap(), 1);
222 assert!(!queue.is_empty().await.unwrap());
223
224 let dequeued = queue
226 .dequeue_with_timeout(Duration::from_secs(1))
227 .await
228 .unwrap();
229 assert!(dequeued.is_some());
230 assert_eq!(dequeued.unwrap().job_id, job_id);
231
232 assert_eq!(queue.len().await.unwrap(), 0);
233 assert!(queue.is_empty().await.unwrap());
234 }
235
236 #[tokio::test]
238 async fn test_in_memory_queue_dequeue_timeout_empty() {
239 let queue = InMemoryJobQueue::default();
240
241 let result = queue
243 .dequeue_with_timeout(Duration::from_millis(100))
244 .await
245 .unwrap();
246 assert!(result.is_none());
247 }
248
249 #[tokio::test]
251 async fn test_in_memory_queue_dequeue_timeout_immediate() {
252 let queue = InMemoryJobQueue::default();
253 let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
254 let job_id = job.job_id;
255
256 queue.enqueue(job).await.unwrap();
258
259 let result = queue
261 .dequeue_with_timeout(Duration::from_secs(1))
262 .await
263 .unwrap();
264 assert!(result.is_some());
265 assert_eq!(result.unwrap().job_id, job_id);
266 }
267
268 #[tokio::test]
270 async fn test_in_memory_queue_dequeue_timeout_wait_for_job() {
271 let queue = Arc::new(InMemoryJobQueue::default());
272 let queue_clone = Arc::clone(&queue);
273
274 let job = Job::new("delayed_tool", &serde_json::json!({}), 3).unwrap();
276 let job_id = job.job_id;
277
278 tokio::spawn(async move {
279 tokio::time::sleep(Duration::from_millis(50)).await;
280 queue_clone.enqueue(job).await.unwrap();
281 });
282
283 let result = queue
285 .dequeue_with_timeout(Duration::from_secs(1))
286 .await
287 .unwrap();
288 assert!(result.is_some());
289 assert_eq!(result.unwrap().job_id, job_id);
290 }
291
292 #[tokio::test]
294 async fn test_in_memory_queue_dequeue_blocking() {
295 let queue = Arc::new(InMemoryJobQueue::default());
296 let queue_clone = Arc::clone(&queue);
297
298 let job = Job::new("blocking_test", &serde_json::json!({}), 3).unwrap();
299 let job_id = job.job_id;
300
301 tokio::spawn(async move {
303 tokio::time::sleep(Duration::from_millis(50)).await;
304 queue_clone.enqueue(job).await.unwrap();
305 });
306
307 let result = timeout(Duration::from_secs(1), queue.dequeue()).await;
309 assert!(result.is_ok());
310 let dequeued = result.unwrap().unwrap();
311 assert!(dequeued.is_some());
312 assert_eq!(dequeued.unwrap().job_id, job_id);
313 }
314
315 #[tokio::test]
317 async fn test_in_memory_queue_fifo_order() {
318 let queue = InMemoryJobQueue::default();
319
320 let job1 = Job::new("tool1", &serde_json::json!({}), 3).unwrap();
322 let job2 = Job::new("tool2", &serde_json::json!({}), 3).unwrap();
323 let job3 = Job::new("tool3", &serde_json::json!({}), 3).unwrap();
324
325 let job1_id = job1.job_id;
326 let job2_id = job2.job_id;
327 let job3_id = job3.job_id;
328
329 queue.enqueue(job1).await.unwrap();
330 queue.enqueue(job2).await.unwrap();
331 queue.enqueue(job3).await.unwrap();
332
333 assert_eq!(queue.len().await.unwrap(), 3);
334
335 let dequeued1 = queue
337 .dequeue_with_timeout(Duration::from_secs(1))
338 .await
339 .unwrap();
340 assert_eq!(dequeued1.unwrap().job_id, job1_id);
341
342 let dequeued2 = queue
343 .dequeue_with_timeout(Duration::from_secs(1))
344 .await
345 .unwrap();
346 assert_eq!(dequeued2.unwrap().job_id, job2_id);
347
348 let dequeued3 = queue
349 .dequeue_with_timeout(Duration::from_secs(1))
350 .await
351 .unwrap();
352 assert_eq!(dequeued3.unwrap().job_id, job3_id);
353
354 assert_eq!(queue.len().await.unwrap(), 0);
355 assert!(queue.is_empty().await.unwrap());
356 }
357
358 #[tokio::test]
360 async fn test_in_memory_queue_is_empty_with_items() {
361 let queue = InMemoryJobQueue::default();
362 let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
363
364 queue.enqueue(job).await.unwrap();
365 assert!(!queue.is_empty().await.unwrap());
366 assert_eq!(queue.len().await.unwrap(), 1);
367 }
368
369 #[tokio::test]
371 async fn test_in_memory_queue_len_multiple_items() {
372 let queue = InMemoryJobQueue::default();
373
374 assert_eq!(queue.len().await.unwrap(), 0);
375
376 for i in 0..5 {
377 let job = Job::new(&format!("tool_{}", i), &serde_json::json!({}), 3).unwrap();
378 queue.enqueue(job).await.unwrap();
379 assert_eq!(queue.len().await.unwrap(), i + 1);
380 }
381 }
382
383 #[tokio::test]
385 async fn test_in_memory_queue_concurrent_enqueue() {
386 let queue = Arc::new(InMemoryJobQueue::default());
387 let mut handles = vec![];
388
389 for i in 0..10 {
391 let queue_clone = Arc::clone(&queue);
392 let handle = tokio::spawn(async move {
393 let job =
394 Job::new(&format!("concurrent_tool_{}", i), &serde_json::json!({}), 3).unwrap();
395 queue_clone.enqueue(job).await.unwrap();
396 });
397 handles.push(handle);
398 }
399
400 for handle in handles {
402 handle.await.unwrap();
403 }
404
405 assert_eq!(queue.len().await.unwrap(), 10);
406 }
407
408 #[tokio::test]
410 async fn test_in_memory_queue_very_short_timeout() {
411 let queue = InMemoryJobQueue::default();
412
413 let result = queue
414 .dequeue_with_timeout(Duration::from_nanos(1))
415 .await
416 .unwrap();
417 assert!(result.is_none());
418 }
419
420 #[tokio::test]
422 async fn test_in_memory_queue_zero_timeout() {
423 let queue = InMemoryJobQueue::default();
424
425 let result = queue
426 .dequeue_with_timeout(Duration::from_secs(0))
427 .await
428 .unwrap();
429 assert!(result.is_none());
430 }
431
432 #[cfg(feature = "redis")]
433 mod redis_tests {
434 use super::*;
435
436 #[test]
438 fn test_redis_queue_new_valid_url() {
439 let result = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue");
440 assert!(result.is_ok());
441 let queue = result.unwrap();
442 assert_eq!(queue.queue_key, "riglr:queue:test_queue");
443 assert_eq!(queue.timeout_seconds, 5);
444 }
445
446 #[test]
448 fn test_redis_queue_new_invalid_url() {
449 let result = RedisJobQueue::new("invalid_url", "test_queue");
450 assert!(result.is_err());
451 }
452
453 #[test]
455 fn test_redis_queue_with_timeout() {
456 let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue")
457 .unwrap()
458 .with_timeout(10);
459 assert_eq!(queue.timeout_seconds, 10);
460 }
461
462 #[test]
464 fn test_redis_queue_key_formatting() {
465 let queue1 = RedisJobQueue::new("redis://127.0.0.1:6379", "simple").unwrap();
466 assert_eq!(queue1.queue_key, "riglr:queue:simple");
467
468 let queue2 = RedisJobQueue::new("redis://127.0.0.1:6379", "complex_name_123").unwrap();
469 assert_eq!(queue2.queue_key, "riglr:queue:complex_name_123");
470
471 let queue3 = RedisJobQueue::new("redis://127.0.0.1:6379", "").unwrap();
472 assert_eq!(queue3.queue_key, "riglr:queue:");
473 }
474
475 #[test]
477 fn test_redis_queue_timeout_chaining() {
478 let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test")
479 .unwrap()
480 .with_timeout(15)
481 .with_timeout(20);
482 assert_eq!(queue.timeout_seconds, 20);
483 }
484
485 }
524}