riglr_core/
queue.rs

1//! Job queue abstractions and implementations.
2//!
3//! This module provides the queue infrastructure for distributed job processing,
4//! supporting both in-memory and Redis-backed implementations for scalability.
5
6use crate::jobs::Job;
7use anyhow::Result;
8use async_trait::async_trait;
9use std::time::Duration;
10
11/// Trait for job queue implementations.
12///
13/// Provides a common interface for different queue backends, enabling
14/// both local development with in-memory queues and production deployment
15/// with distributed Redis queues.
16#[async_trait]
17pub trait JobQueue: Send + Sync {
18    /// Add a job to the queue
19    async fn enqueue(&self, job: Job) -> Result<()>;
20
21    /// Get the next job from the queue, blocks until a job is available or timeout
22    async fn dequeue(&self) -> Result<Option<Job>>;
23
24    /// Get the next job from the queue with timeout
25    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>>;
26
27    /// Get queue length
28    async fn len(&self) -> Result<usize>;
29
30    /// Check if queue is empty
31    async fn is_empty(&self) -> Result<bool> {
32        Ok(self.len().await? == 0)
33    }
34}
35
36/// In-memory job queue implementation for testing and development
37pub struct InMemoryJobQueue {
38    queue: tokio::sync::Mutex<std::collections::VecDeque<Job>>,
39    notify: tokio::sync::Notify,
40}
41
42impl InMemoryJobQueue {
43    /// Create a new in-memory job queue
44    pub fn new() -> Self {
45        Self::default()
46    }
47}
48
49impl Default for InMemoryJobQueue {
50    fn default() -> Self {
51        Self {
52            queue: tokio::sync::Mutex::new(std::collections::VecDeque::default()),
53            notify: tokio::sync::Notify::default(),
54        }
55    }
56}
57
58#[async_trait]
59impl JobQueue for InMemoryJobQueue {
60    async fn enqueue(&self, job: Job) -> Result<()> {
61        let mut queue = self.queue.lock().await;
62        queue.push_back(job);
63        self.notify.notify_one();
64        Ok(())
65    }
66
67    async fn dequeue(&self) -> Result<Option<Job>> {
68        loop {
69            {
70                let mut queue = self.queue.lock().await;
71                if let Some(job) = queue.pop_front() {
72                    return Ok(Some(job));
73                }
74            }
75            self.notify.notified().await;
76        }
77    }
78
79    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
80        // First check if there are any items immediately available
81        {
82            let mut queue = self.queue.lock().await;
83            if let Some(job) = queue.pop_front() {
84                return Ok(Some(job));
85            }
86        }
87
88        // If no items available, wait for notification or timeout
89        tokio::select! {
90            _ = tokio::time::sleep(timeout) => Ok(None),
91            _ = self.notify.notified() => {
92                let mut queue = self.queue.lock().await;
93                Ok(queue.pop_front())
94            }
95        }
96    }
97
98    async fn len(&self) -> Result<usize> {
99        let queue = self.queue.lock().await;
100        Ok(queue.len())
101    }
102}
103
104/// Redis-based job queue implementation for production use
105#[cfg(feature = "redis")]
106pub struct RedisJobQueue {
107    client: redis::Client,
108    queue_key: String,
109    timeout_seconds: u64,
110}
111
112#[cfg(feature = "redis")]
113impl RedisJobQueue {
114    /// Create a new Redis job queue
115    ///
116    /// # Arguments
117    /// * `redis_url` - Redis connection URL (e.g., "redis://127.0.0.1:6379")
118    /// * `queue_name` - Name of the queue (will be prefixed with "riglr:queue:")
119    pub fn new(redis_url: &str, queue_name: &str) -> Result<Self> {
120        let client = redis::Client::open(redis_url)?;
121        Ok(Self {
122            client,
123            queue_key: format!("riglr:queue:{}", queue_name),
124            timeout_seconds: 5,
125        })
126    }
127
128    /// Set the blocking timeout for dequeue operations
129    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
130        self.timeout_seconds = timeout_seconds;
131        self
132    }
133}
134
135#[cfg(feature = "redis")]
136#[async_trait]
137impl JobQueue for RedisJobQueue {
138    async fn enqueue(&self, job: Job) -> Result<()> {
139        let mut conn = self.client.get_multiplexed_async_connection().await?;
140        let serialized = serde_json::to_string(&job)?;
141        let _: () = redis::cmd("LPUSH")
142            .arg(&self.queue_key)
143            .arg(serialized)
144            .query_async(&mut conn)
145            .await?;
146        Ok(())
147    }
148
149    async fn dequeue(&self) -> Result<Option<Job>> {
150        let mut conn = self.client.get_multiplexed_async_connection().await?;
151
152        // BRPOP blocks until an item is available or timeout
153        let result: Option<(String, String)> = redis::cmd("BRPOP")
154            .arg(&self.queue_key)
155            .arg(self.timeout_seconds)
156            .query_async(&mut conn)
157            .await?;
158
159        match result {
160            Some((_, job_str)) => {
161                let job: Job = serde_json::from_str(&job_str)?;
162                Ok(Some(job))
163            }
164            None => Ok(None),
165        }
166    }
167
168    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
169        let mut conn = self.client.get_multiplexed_async_connection().await?;
170        let timeout_seconds = timeout.as_secs().max(1);
171
172        let result: Option<(String, String)> = redis::cmd("BRPOP")
173            .arg(&self.queue_key)
174            .arg(timeout_seconds)
175            .query_async(&mut conn)
176            .await?;
177
178        match result {
179            Some((_, job_str)) => {
180                let job: Job = serde_json::from_str(&job_str)?;
181                Ok(Some(job))
182            }
183            None => Ok(None),
184        }
185    }
186
187    async fn len(&self) -> Result<usize> {
188        let mut conn = self.client.get_multiplexed_async_connection().await?;
189        let len: usize = redis::cmd("LLEN")
190            .arg(&self.queue_key)
191            .query_async(&mut conn)
192            .await?;
193        Ok(len)
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200    use std::sync::Arc;
201    use tokio::time::{timeout, Duration};
202
203    // Test InMemoryJobQueue::default()
204    #[tokio::test]
205    async fn test_in_memory_queue_default() {
206        let queue = InMemoryJobQueue::default();
207        assert_eq!(queue.len().await.unwrap(), 0);
208        assert!(queue.is_empty().await.unwrap());
209    }
210
211    // Test basic enqueue and dequeue operations
212    #[tokio::test]
213    async fn test_in_memory_queue_enqueue_dequeue() {
214        let queue = InMemoryJobQueue::default();
215
216        // Test enqueue and dequeue
217        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
218        let job_id = job.job_id;
219
220        queue.enqueue(job).await.unwrap();
221        assert_eq!(queue.len().await.unwrap(), 1);
222        assert!(!queue.is_empty().await.unwrap());
223
224        // Use timeout to avoid blocking forever in tests
225        let dequeued = queue
226            .dequeue_with_timeout(Duration::from_secs(1))
227            .await
228            .unwrap();
229        assert!(dequeued.is_some());
230        assert_eq!(dequeued.unwrap().job_id, job_id);
231
232        assert_eq!(queue.len().await.unwrap(), 0);
233        assert!(queue.is_empty().await.unwrap());
234    }
235
236    // Test dequeue with timeout when queue is empty
237    #[tokio::test]
238    async fn test_in_memory_queue_dequeue_timeout_empty() {
239        let queue = InMemoryJobQueue::default();
240
241        // Dequeue with timeout should return None when queue is empty
242        let result = queue
243            .dequeue_with_timeout(Duration::from_millis(100))
244            .await
245            .unwrap();
246        assert!(result.is_none());
247    }
248
249    // Test dequeue with timeout when job is immediately available
250    #[tokio::test]
251    async fn test_in_memory_queue_dequeue_timeout_immediate() {
252        let queue = InMemoryJobQueue::default();
253        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
254        let job_id = job.job_id;
255
256        // Enqueue first
257        queue.enqueue(job).await.unwrap();
258
259        // Dequeue with timeout should return immediately
260        let result = queue
261            .dequeue_with_timeout(Duration::from_secs(1))
262            .await
263            .unwrap();
264        assert!(result.is_some());
265        assert_eq!(result.unwrap().job_id, job_id);
266    }
267
268    // Test dequeue with timeout when job becomes available after waiting
269    #[tokio::test]
270    async fn test_in_memory_queue_dequeue_timeout_wait_for_job() {
271        let queue = Arc::new(InMemoryJobQueue::default());
272        let queue_clone = Arc::clone(&queue);
273
274        // Spawn a task to enqueue a job after a short delay
275        let job = Job::new("delayed_tool", &serde_json::json!({}), 3).unwrap();
276        let job_id = job.job_id;
277
278        tokio::spawn(async move {
279            tokio::time::sleep(Duration::from_millis(50)).await;
280            queue_clone.enqueue(job).await.unwrap();
281        });
282
283        // Dequeue should wait and then receive the job
284        let result = queue
285            .dequeue_with_timeout(Duration::from_secs(1))
286            .await
287            .unwrap();
288        assert!(result.is_some());
289        assert_eq!(result.unwrap().job_id, job_id);
290    }
291
292    // Test blocking dequeue with concurrent enqueue
293    #[tokio::test]
294    async fn test_in_memory_queue_dequeue_blocking() {
295        let queue = Arc::new(InMemoryJobQueue::default());
296        let queue_clone = Arc::clone(&queue);
297
298        let job = Job::new("blocking_test", &serde_json::json!({}), 3).unwrap();
299        let job_id = job.job_id;
300
301        // Spawn a task to enqueue after a delay
302        tokio::spawn(async move {
303            tokio::time::sleep(Duration::from_millis(50)).await;
304            queue_clone.enqueue(job).await.unwrap();
305        });
306
307        // Use timeout wrapper to prevent infinite blocking in tests
308        let result = timeout(Duration::from_secs(1), queue.dequeue()).await;
309        assert!(result.is_ok());
310        let dequeued = result.unwrap().unwrap();
311        assert!(dequeued.is_some());
312        assert_eq!(dequeued.unwrap().job_id, job_id);
313    }
314
315    // Test multiple enqueue and dequeue operations (FIFO order)
316    #[tokio::test]
317    async fn test_in_memory_queue_fifo_order() {
318        let queue = InMemoryJobQueue::default();
319
320        // Enqueue multiple jobs
321        let job1 = Job::new("tool1", &serde_json::json!({}), 3).unwrap();
322        let job2 = Job::new("tool2", &serde_json::json!({}), 3).unwrap();
323        let job3 = Job::new("tool3", &serde_json::json!({}), 3).unwrap();
324
325        let job1_id = job1.job_id;
326        let job2_id = job2.job_id;
327        let job3_id = job3.job_id;
328
329        queue.enqueue(job1).await.unwrap();
330        queue.enqueue(job2).await.unwrap();
331        queue.enqueue(job3).await.unwrap();
332
333        assert_eq!(queue.len().await.unwrap(), 3);
334
335        // Dequeue should return jobs in FIFO order
336        let dequeued1 = queue
337            .dequeue_with_timeout(Duration::from_secs(1))
338            .await
339            .unwrap();
340        assert_eq!(dequeued1.unwrap().job_id, job1_id);
341
342        let dequeued2 = queue
343            .dequeue_with_timeout(Duration::from_secs(1))
344            .await
345            .unwrap();
346        assert_eq!(dequeued2.unwrap().job_id, job2_id);
347
348        let dequeued3 = queue
349            .dequeue_with_timeout(Duration::from_secs(1))
350            .await
351            .unwrap();
352        assert_eq!(dequeued3.unwrap().job_id, job3_id);
353
354        assert_eq!(queue.len().await.unwrap(), 0);
355        assert!(queue.is_empty().await.unwrap());
356    }
357
358    // Test is_empty default implementation when queue has items
359    #[tokio::test]
360    async fn test_in_memory_queue_is_empty_with_items() {
361        let queue = InMemoryJobQueue::default();
362        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
363
364        queue.enqueue(job).await.unwrap();
365        assert!(!queue.is_empty().await.unwrap());
366        assert_eq!(queue.len().await.unwrap(), 1);
367    }
368
369    // Test len() with multiple items
370    #[tokio::test]
371    async fn test_in_memory_queue_len_multiple_items() {
372        let queue = InMemoryJobQueue::default();
373
374        assert_eq!(queue.len().await.unwrap(), 0);
375
376        for i in 0..5 {
377            let job = Job::new(&format!("tool_{}", i), &serde_json::json!({}), 3).unwrap();
378            queue.enqueue(job).await.unwrap();
379            assert_eq!(queue.len().await.unwrap(), i + 1);
380        }
381    }
382
383    // Test concurrent enqueue operations
384    #[tokio::test]
385    async fn test_in_memory_queue_concurrent_enqueue() {
386        let queue = Arc::new(InMemoryJobQueue::default());
387        let mut handles = vec![];
388
389        // Spawn multiple tasks to enqueue concurrently
390        for i in 0..10 {
391            let queue_clone = Arc::clone(&queue);
392            let handle = tokio::spawn(async move {
393                let job =
394                    Job::new(&format!("concurrent_tool_{}", i), &serde_json::json!({}), 3).unwrap();
395                queue_clone.enqueue(job).await.unwrap();
396            });
397            handles.push(handle);
398        }
399
400        // Wait for all enqueue operations to complete
401        for handle in handles {
402            handle.await.unwrap();
403        }
404
405        assert_eq!(queue.len().await.unwrap(), 10);
406    }
407
408    // Test edge case: very short timeout
409    #[tokio::test]
410    async fn test_in_memory_queue_very_short_timeout() {
411        let queue = InMemoryJobQueue::default();
412
413        let result = queue
414            .dequeue_with_timeout(Duration::from_nanos(1))
415            .await
416            .unwrap();
417        assert!(result.is_none());
418    }
419
420    // Test zero timeout
421    #[tokio::test]
422    async fn test_in_memory_queue_zero_timeout() {
423        let queue = InMemoryJobQueue::default();
424
425        let result = queue
426            .dequeue_with_timeout(Duration::from_secs(0))
427            .await
428            .unwrap();
429        assert!(result.is_none());
430    }
431
432    #[cfg(feature = "redis")]
433    mod redis_tests {
434        use super::*;
435
436        // Test RedisJobQueue::new with valid URL
437        #[test]
438        fn test_redis_queue_new_valid_url() {
439            let result = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue");
440            assert!(result.is_ok());
441            let queue = result.unwrap();
442            assert_eq!(queue.queue_key, "riglr:queue:test_queue");
443            assert_eq!(queue.timeout_seconds, 5);
444        }
445
446        // Test RedisJobQueue::new with invalid URL
447        #[test]
448        fn test_redis_queue_new_invalid_url() {
449            let result = RedisJobQueue::new("invalid_url", "test_queue");
450            assert!(result.is_err());
451        }
452
453        // Test RedisJobQueue::with_timeout
454        #[test]
455        fn test_redis_queue_with_timeout() {
456            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue")
457                .unwrap()
458                .with_timeout(10);
459            assert_eq!(queue.timeout_seconds, 10);
460        }
461
462        // Test queue key formatting with different queue names
463        #[test]
464        fn test_redis_queue_key_formatting() {
465            let queue1 = RedisJobQueue::new("redis://127.0.0.1:6379", "simple").unwrap();
466            assert_eq!(queue1.queue_key, "riglr:queue:simple");
467
468            let queue2 = RedisJobQueue::new("redis://127.0.0.1:6379", "complex_name_123").unwrap();
469            assert_eq!(queue2.queue_key, "riglr:queue:complex_name_123");
470
471            let queue3 = RedisJobQueue::new("redis://127.0.0.1:6379", "").unwrap();
472            assert_eq!(queue3.queue_key, "riglr:queue:");
473        }
474
475        // Test with_timeout chaining
476        #[test]
477        fn test_redis_queue_timeout_chaining() {
478            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test")
479                .unwrap()
480                .with_timeout(15)
481                .with_timeout(20);
482            assert_eq!(queue.timeout_seconds, 20);
483        }
484
485        // Integration tests would require a running Redis instance
486        // These are commented out but show the structure for full integration testing
487        /*
488        #[tokio::test]
489        async fn test_redis_queue_integration() {
490            // This test would require a running Redis instance
491            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test_integration").unwrap();
492
493            let job = Job::new("redis_test", &serde_json::json!({}), 3).unwrap();
494            let job_id = job.job_id;
495
496            // Test enqueue
497            queue.enqueue(job).await.unwrap();
498            assert_eq!(queue.len().await.unwrap(), 1);
499            assert!(!queue.is_empty().await.unwrap());
500
501            // Test dequeue
502            let dequeued = queue.dequeue_with_timeout(Duration::from_secs(1)).await.unwrap();
503            assert!(dequeued.is_some());
504            assert_eq!(dequeued.unwrap().job_id, job_id);
505
506            assert_eq!(queue.len().await.unwrap(), 0);
507            assert!(queue.is_empty().await.unwrap());
508        }
509
510        #[tokio::test]
511        async fn test_redis_queue_timeout_scenarios() {
512            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test_timeout").unwrap();
513
514            // Test timeout when queue is empty
515            let result = queue.dequeue_with_timeout(Duration::from_millis(100)).await.unwrap();
516            assert!(result.is_none());
517
518            // Test very long timeout handling
519            let result = queue.dequeue_with_timeout(Duration::from_secs(3600)).await.unwrap();
520            assert!(result.is_none());
521        }
522        */
523    }
524}