use crate::jobs::Job;
use anyhow::Result;
use async_trait::async_trait;
use std::time::Duration;
/// Abstraction over a FIFO job queue shared between producers and workers.
///
/// Implementations must be shareable across tasks (`Send + Sync`); all
/// operations are async and fallible.
#[async_trait]
pub trait JobQueue: Send + Sync {
    /// Adds a job to the back of the queue.
    async fn enqueue(&self, job: Job) -> Result<()>;
    /// Removes and returns the next job. Blocking semantics are
    /// implementation-defined (see each implementation's docs).
    async fn dequeue(&self) -> Result<Option<Job>>;
    /// Like `dequeue`, but returns `Ok(None)` if no job becomes
    /// available within `timeout`.
    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>>;
    /// Number of jobs currently queued.
    async fn len(&self) -> Result<usize>;
    /// True when the queue holds no jobs; default delegates to `len`.
    async fn is_empty(&self) -> Result<bool> {
        Ok(self.len().await? == 0)
    }
}
/// Process-local queue backed by a `VecDeque` behind a Tokio mutex.
///
/// A `Notify` wakes blocked consumers when a job is enqueued, so `dequeue`
/// can wait without polling.
pub struct InMemoryJobQueue {
    // FIFO storage; the lock is held only for short push/pop critical sections.
    queue: tokio::sync::Mutex<std::collections::VecDeque<Job>>,
    // Signals waiting `dequeue` callers that a job may be available.
    notify: tokio::sync::Notify,
}
impl InMemoryJobQueue {
    /// Creates an empty in-memory queue; equivalent to
    /// `InMemoryJobQueue::default()`.
    pub fn new() -> Self {
        Default::default()
    }
}
impl Default for InMemoryJobQueue {
    /// An empty `VecDeque` guarded by a Tokio mutex, plus a fresh `Notify`
    /// with no stored permit.
    fn default() -> Self {
        let queue = tokio::sync::Mutex::new(std::collections::VecDeque::new());
        let notify = tokio::sync::Notify::new();
        Self { queue, notify }
    }
}
#[async_trait]
impl JobQueue for InMemoryJobQueue {
    /// Appends the job to the back of the queue and wakes one waiting consumer.
    async fn enqueue(&self, job: Job) -> Result<()> {
        let mut queue = self.queue.lock().await;
        queue.push_back(job);
        // Wake a single blocked `dequeue` caller. If nobody is waiting,
        // `Notify` stores a permit so the next waiter returns immediately.
        self.notify.notify_one();
        Ok(())
    }

    /// Blocks until a job is available and returns it (FIFO order).
    /// Always returns `Ok(Some(_))`; the `Option` exists to satisfy the trait.
    async fn dequeue(&self) -> Result<Option<Job>> {
        loop {
            // Scope the lock so it is released before parking on `notified()`.
            {
                let mut queue = self.queue.lock().await;
                if let Some(job) = queue.pop_front() {
                    return Ok(Some(job));
                }
            }
            // Re-check after every wakeup: a stale permit, or another
            // consumer stealing the job, means the queue may still be empty.
            self.notify.notified().await;
        }
    }

    /// Like `dequeue`, but gives up and returns `Ok(None)` once `timeout`
    /// has elapsed without a job becoming available.
    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
        // Bound the (correctly looping) blocking `dequeue` with a timer
        // instead of hand-rolling a select. The previous select-based version
        // had two bugs: a stale `Notify` permit — left over from an enqueue
        // whose job had already been consumed — made `notified()` complete
        // immediately and the method return `Ok(None)` without waiting at
        // all; and under multiple consumers, a woken task whose job was
        // stolen also returned `Ok(None)` before the timeout had elapsed.
        match tokio::time::timeout(timeout, self.dequeue()).await {
            Ok(result) => result,
            Err(_elapsed) => Ok(None),
        }
    }

    /// Returns the number of jobs currently queued.
    async fn len(&self) -> Result<usize> {
        let queue = self.queue.lock().await;
        Ok(queue.len())
    }
}
/// Redis-backed queue (requires the `redis` feature).
///
/// Jobs are serialized to JSON and stored in a Redis list whose key is
/// `riglr:queue:<name>`.
#[cfg(feature = "redis")]
pub struct RedisJobQueue {
    // Redis client; a multiplexed connection is obtained per operation.
    client: redis::Client,
    // Full Redis list key, e.g. "riglr:queue:my_queue".
    queue_key: String,
    // BRPOP timeout in seconds used by `dequeue`; defaults to 5.
    timeout_seconds: u64,
}
#[cfg(feature = "redis")]
impl RedisJobQueue {
    /// Opens a Redis client for `redis_url` and namespaces the queue under
    /// `riglr:queue:<queue_name>`. Fails if the URL cannot be parsed.
    /// The blocking-pop timeout defaults to 5 seconds.
    pub fn new(redis_url: &str, queue_name: &str) -> Result<Self> {
        let queue_key = format!("riglr:queue:{}", queue_name);
        Ok(Self {
            client: redis::Client::open(redis_url)?,
            queue_key,
            timeout_seconds: 5,
        })
    }

    /// Builder-style setter for the BRPOP timeout used by `dequeue`.
    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
        self.timeout_seconds = timeout_seconds;
        self
    }
}
#[cfg(feature = "redis")]
#[async_trait]
impl JobQueue for RedisJobQueue {
    /// Serializes the job as JSON and LPUSHes it onto the Redis list.
    async fn enqueue(&self, job: Job) -> Result<()> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let serialized = serde_json::to_string(&job)?;
        let _: () = redis::cmd("LPUSH")
            .arg(&self.queue_key)
            .arg(serialized)
            .query_async(&mut conn)
            .await?;
        Ok(())
    }

    /// Pops one job via blocking BRPOP, waiting up to `timeout_seconds`.
    ///
    /// NOTE(review): unlike `InMemoryJobQueue::dequeue`, this does NOT block
    /// indefinitely — BRPOP gives up after `timeout_seconds` and this returns
    /// `Ok(None)`. Confirm callers expect that contract difference.
    async fn dequeue(&self) -> Result<Option<Job>> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        // BRPOP returns (key, value) on success, nil on timeout.
        let result: Option<(String, String)> = redis::cmd("BRPOP")
            .arg(&self.queue_key)
            .arg(self.timeout_seconds)
            .query_async(&mut conn)
            .await?;
        match result {
            Some((_, job_str)) => {
                let job: Job = serde_json::from_str(&job_str)?;
                Ok(Some(job))
            }
            None => Ok(None),
        }
    }

    /// Like `dequeue`, but with a caller-supplied timeout.
    ///
    /// NOTE(review): the timeout is truncated to whole seconds and clamped to
    /// at least 1, because a BRPOP timeout of 0 means "block forever" — so a
    /// 100 ms request actually waits up to 1 s. Verify callers tolerate this
    /// rounding.
    async fn dequeue_with_timeout(&self, timeout: Duration) -> Result<Option<Job>> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let timeout_seconds = timeout.as_secs().max(1);
        let result: Option<(String, String)> = redis::cmd("BRPOP")
            .arg(&self.queue_key)
            .arg(timeout_seconds)
            .query_async(&mut conn)
            .await?;
        match result {
            Some((_, job_str)) => {
                let job: Job = serde_json::from_str(&job_str)?;
                Ok(Some(job))
            }
            None => Ok(None),
        }
    }

    /// Reports the current length of the Redis list (LLEN).
    async fn len(&self) -> Result<usize> {
        let mut conn = self.client.get_multiplexed_async_connection().await?;
        let len: usize = redis::cmd("LLEN")
            .arg(&self.queue_key)
            .query_async(&mut conn)
            .await?;
        Ok(len)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::time::{timeout, Duration};

    // A freshly-constructed queue reports zero length and is empty.
    #[tokio::test]
    async fn test_in_memory_queue_default() {
        let queue = InMemoryJobQueue::default();
        assert_eq!(queue.len().await.unwrap(), 0);
        assert!(queue.is_empty().await.unwrap());
    }

    // Round-trip: an enqueued job comes back with the same id, and the
    // queue returns to empty afterwards.
    #[tokio::test]
    async fn test_in_memory_queue_enqueue_dequeue() {
        let queue = InMemoryJobQueue::default();
        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
        let job_id = job.job_id;
        queue.enqueue(job).await.unwrap();
        assert_eq!(queue.len().await.unwrap(), 1);
        assert!(!queue.is_empty().await.unwrap());
        let dequeued = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().job_id, job_id);
        assert_eq!(queue.len().await.unwrap(), 0);
        assert!(queue.is_empty().await.unwrap());
    }

    // An empty queue yields None once the timeout elapses.
    #[tokio::test]
    async fn test_in_memory_queue_dequeue_timeout_empty() {
        let queue = InMemoryJobQueue::default();
        let result = queue
            .dequeue_with_timeout(Duration::from_millis(100))
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // A job already present is returned without waiting out the timeout.
    #[tokio::test]
    async fn test_in_memory_queue_dequeue_timeout_immediate() {
        let queue = InMemoryJobQueue::default();
        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
        let job_id = job.job_id;
        queue.enqueue(job).await.unwrap();
        let result = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().job_id, job_id);
    }

    // A consumer blocked in dequeue_with_timeout is woken by an enqueue
    // that happens after it started waiting (50 ms delay < 1 s timeout).
    #[tokio::test]
    async fn test_in_memory_queue_dequeue_timeout_wait_for_job() {
        let queue = Arc::new(InMemoryJobQueue::default());
        let queue_clone = Arc::clone(&queue);
        let job = Job::new("delayed_tool", &serde_json::json!({}), 3).unwrap();
        let job_id = job.job_id;
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            queue_clone.enqueue(job).await.unwrap();
        });
        let result = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().job_id, job_id);
    }

    // The blocking dequeue() is woken by a delayed enqueue; the outer
    // tokio timeout only guards the test against hanging.
    #[tokio::test]
    async fn test_in_memory_queue_dequeue_blocking() {
        let queue = Arc::new(InMemoryJobQueue::default());
        let queue_clone = Arc::clone(&queue);
        let job = Job::new("blocking_test", &serde_json::json!({}), 3).unwrap();
        let job_id = job.job_id;
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            queue_clone.enqueue(job).await.unwrap();
        });
        let result = timeout(Duration::from_secs(1), queue.dequeue()).await;
        assert!(result.is_ok());
        let dequeued = result.unwrap().unwrap();
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().job_id, job_id);
    }

    // Jobs come out in the same order they went in.
    #[tokio::test]
    async fn test_in_memory_queue_fifo_order() {
        let queue = InMemoryJobQueue::default();
        let job1 = Job::new("tool1", &serde_json::json!({}), 3).unwrap();
        let job2 = Job::new("tool2", &serde_json::json!({}), 3).unwrap();
        let job3 = Job::new("tool3", &serde_json::json!({}), 3).unwrap();
        let job1_id = job1.job_id;
        let job2_id = job2.job_id;
        let job3_id = job3.job_id;
        queue.enqueue(job1).await.unwrap();
        queue.enqueue(job2).await.unwrap();
        queue.enqueue(job3).await.unwrap();
        assert_eq!(queue.len().await.unwrap(), 3);
        let dequeued1 = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(dequeued1.unwrap().job_id, job1_id);
        let dequeued2 = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(dequeued2.unwrap().job_id, job2_id);
        let dequeued3 = queue
            .dequeue_with_timeout(Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(dequeued3.unwrap().job_id, job3_id);
        assert_eq!(queue.len().await.unwrap(), 0);
        assert!(queue.is_empty().await.unwrap());
    }

    // is_empty() flips to false as soon as a job is queued.
    #[tokio::test]
    async fn test_in_memory_queue_is_empty_with_items() {
        let queue = InMemoryJobQueue::default();
        let job = Job::new("test_tool", &serde_json::json!({}), 3).unwrap();
        queue.enqueue(job).await.unwrap();
        assert!(!queue.is_empty().await.unwrap());
        assert_eq!(queue.len().await.unwrap(), 1);
    }

    // len() tracks each enqueue incrementally.
    #[tokio::test]
    async fn test_in_memory_queue_len_multiple_items() {
        let queue = InMemoryJobQueue::default();
        assert_eq!(queue.len().await.unwrap(), 0);
        for i in 0..5 {
            let job = Job::new(&format!("tool_{}", i), &serde_json::json!({}), 3).unwrap();
            queue.enqueue(job).await.unwrap();
            assert_eq!(queue.len().await.unwrap(), i + 1);
        }
    }

    // Ten tasks enqueue concurrently; no job is lost.
    #[tokio::test]
    async fn test_in_memory_queue_concurrent_enqueue() {
        let queue = Arc::new(InMemoryJobQueue::default());
        let mut handles = vec![];
        for i in 0..10 {
            let queue_clone = Arc::clone(&queue);
            let handle = tokio::spawn(async move {
                let job =
                    Job::new(&format!("concurrent_tool_{}", i), &serde_json::json!({}), 3).unwrap();
                queue_clone.enqueue(job).await.unwrap();
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.await.unwrap();
        }
        assert_eq!(queue.len().await.unwrap(), 10);
    }

    // Edge case: a near-zero timeout on an empty queue still returns None
    // cleanly rather than hanging or panicking.
    #[tokio::test]
    async fn test_in_memory_queue_very_short_timeout() {
        let queue = InMemoryJobQueue::default();
        let result = queue
            .dequeue_with_timeout(Duration::from_nanos(1))
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // Edge case: a zero-duration timeout behaves like "poll once".
    #[tokio::test]
    async fn test_in_memory_queue_zero_timeout() {
        let queue = InMemoryJobQueue::default();
        let result = queue
            .dequeue_with_timeout(Duration::from_secs(0))
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // Constructor-only tests for the Redis queue; none of these open a
    // network connection (redis::Client::open only parses the URL).
    #[cfg(feature = "redis")]
    mod redis_tests {
        use super::*;

        // Valid URL parses and the default timeout is 5 seconds.
        #[test]
        fn test_redis_queue_new_valid_url() {
            let result = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue");
            assert!(result.is_ok());
            let queue = result.unwrap();
            assert_eq!(queue.queue_key, "riglr:queue:test_queue");
            assert_eq!(queue.timeout_seconds, 5);
        }

        // A malformed URL is rejected at construction time.
        #[test]
        fn test_redis_queue_new_invalid_url() {
            let result = RedisJobQueue::new("invalid_url", "test_queue");
            assert!(result.is_err());
        }

        // with_timeout overrides the default.
        #[test]
        fn test_redis_queue_with_timeout() {
            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test_queue")
                .unwrap()
                .with_timeout(10);
            assert_eq!(queue.timeout_seconds, 10);
        }

        // Queue names (including empty) are prefixed with "riglr:queue:".
        #[test]
        fn test_redis_queue_key_formatting() {
            let queue1 = RedisJobQueue::new("redis://127.0.0.1:6379", "simple").unwrap();
            assert_eq!(queue1.queue_key, "riglr:queue:simple");
            let queue2 = RedisJobQueue::new("redis://127.0.0.1:6379", "complex_name_123").unwrap();
            assert_eq!(queue2.queue_key, "riglr:queue:complex_name_123");
            let queue3 = RedisJobQueue::new("redis://127.0.0.1:6379", "").unwrap();
            assert_eq!(queue3.queue_key, "riglr:queue:");
        }

        // Chained with_timeout calls: the last value wins.
        #[test]
        fn test_redis_queue_timeout_chaining() {
            let queue = RedisJobQueue::new("redis://127.0.0.1:6379", "test")
                .unwrap()
                .with_timeout(15)
                .with_timeout(20);
            assert_eq!(queue.timeout_seconds, 20);
        }
    }
}