use crate::config::QueueConfig;
use crate::error::SmithyResult;
use crate::task::{QueuedTask, TaskId, TaskStatus};
use async_trait::async_trait;
pub mod memory;
pub use memory::InMemoryQueue;
#[cfg(feature = "redis-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "redis-queue")))]
pub mod redis;
#[cfg(feature = "redis-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "redis-queue")))]
pub use redis::RedisQueue;
#[cfg(feature = "postgres-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "postgres-queue")))]
pub mod postgres;
#[cfg(feature = "postgres-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "postgres-queue")))]
pub use postgres::PostgresQueue;
#[derive(Debug, Clone)]
pub struct QueueStats {
pub pending: u64,
pub running: u64,
pub completed: u64,
pub failed: u64,
pub dead: u64,
pub total_processed: u64,
}
#[async_trait]
pub trait QueueBackend: Send + Sync {
async fn enqueue(&self, task: QueuedTask) -> SmithyResult<TaskId>;
async fn dequeue(&self) -> SmithyResult<Option<QueuedTask>>;
async fn dequeue_batch(&self, count: usize) -> SmithyResult<Vec<QueuedTask>>;
async fn complete_task(&self, task_id: &TaskId) -> SmithyResult<()>;
async fn fail_task(&self, task_id: &TaskId, error: &str, retry: bool) -> SmithyResult<()>;
async fn requeue_task(
&self,
task_id: &TaskId,
delay: Option<std::time::Duration>,
) -> SmithyResult<()>;
async fn get_task(&self, task_id: &TaskId) -> SmithyResult<Option<QueuedTask>>;
async fn update_task_status(&self, task_id: &TaskId, status: TaskStatus) -> SmithyResult<()>;
async fn stats(&self) -> SmithyResult<QueueStats>;
async fn cleanup(&self) -> SmithyResult<u64>;
async fn get_tasks_by_status(
&self,
status: TaskStatus,
limit: Option<usize>,
) -> SmithyResult<Vec<QueuedTask>>;
async fn purge(&self) -> SmithyResult<u64>;
async fn health_check(&self) -> SmithyResult<()>;
}
pub type TaskQueue = Box<dyn QueueBackend>;
pub struct QueueFactory;
impl QueueFactory {
pub fn in_memory(config: QueueConfig) -> TaskQueue {
Box::new(InMemoryQueue::with_config(config))
}
#[cfg(feature = "redis-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "redis-queue")))]
pub async fn redis(connection_string: &str, config: QueueConfig) -> SmithyResult<TaskQueue> {
let queue = RedisQueue::new(connection_string, config).await?;
Ok(Box::new(queue))
}
#[cfg(feature = "postgres-queue")]
#[cfg_attr(docsrs, doc(cfg(feature = "postgres-queue")))]
pub async fn postgres(connection_string: &str, config: QueueConfig) -> SmithyResult<TaskQueue> {
let queue = PostgresQueue::new(connection_string, config).await?;
Ok(Box::new(queue))
}
}
#[cfg(test)]
mod tests {
use std::time::SystemTime;
use super::*;
use crate::task::QueuedTask;
use serde_json::json;
fn create_test_task(id: &str) -> QueuedTask {
QueuedTask {
id: id.to_string(),
task_type: "test_task".to_string(),
payload: json!({"test": "data"}),
status: TaskStatus::Pending,
retry_count: 0,
max_retries: 3,
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
execute_at: None,
}
}
#[tokio::test]
async fn test_in_memory_queue_basic_operations() {
let queue = InMemoryQueue::new();
let task = create_test_task("test-1");
let task_id = queue.enqueue(task.clone()).await.unwrap();
assert_eq!(task_id, "test-1");
let dequeued = queue.dequeue().await.unwrap();
assert!(dequeued.is_some());
let dequeued_task = dequeued.unwrap();
assert_eq!(dequeued_task.id, "test-1");
assert_eq!(dequeued_task.status, TaskStatus::Running);
queue.complete_task(&task_id).await.unwrap();
let stats = queue.stats().await.unwrap();
assert_eq!(stats.completed, 1);
}
#[tokio::test]
async fn test_queue_factory() {
let queue = QueueFactory::in_memory(QueueConfig::default());
assert!(queue.health_check().await.is_ok());
}
}