use crate::error::Result;
use crate::queue::QueueInterface;
use crate::queue::memory_queue_manager::MemoryQueueManager;
#[cfg(feature = "redis-cluster")]
use crate::queue::redis_cluster_queue_manager::RedisClusterQueueManager;
#[cfg(feature = "redis")]
use crate::queue::redis_queue_manager::RedisQueueManager;
use crate::webhook::sender::JobProcessorFnAsync;
use crate::webhook::types::JobData;
use tracing::*;
pub struct QueueManagerFactory;
impl QueueManagerFactory {
#[allow(unused_variables)]
pub async fn create(
driver: &str,
redis_url: Option<&str>,
prefix: Option<&str>,
concurrency: Option<usize>,
) -> Result<Box<dyn QueueInterface>> {
match driver {
#[cfg(feature = "redis")]
"redis" => {
let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
let prefix_str = prefix.unwrap_or("sockudo"); let concurrency_val = concurrency.unwrap_or(5); info!(
"Creating Redis queue manager (Prefix: {}, Concurrency: {})",
prefix_str, concurrency_val
);
debug!("Redis queue manager URL: {}", url);
let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
Ok(Box::new(manager))
}
#[cfg(feature = "redis-cluster")]
"redis-cluster" => {
let nodes_str = redis_url.unwrap_or(
"redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
);
let cluster_nodes: Vec<String> =
nodes_str.split(',').map(|s| s.trim().to_string()).collect();
let prefix_str = prefix.unwrap_or("sockudo");
let concurrency_val = concurrency.unwrap_or(5);
info!(
"Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
prefix_str, concurrency_val
);
debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
let manager =
RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
.await?;
Ok(Box::new(manager))
}
"memory" => {
info!("{}", "Creating Memory queue manager".to_string());
let manager = MemoryQueueManager::new();
manager.start_processing(); Ok(Box::new(manager))
}
#[cfg(not(feature = "redis"))]
"redis" => {
warn!(
"Redis queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(not(feature = "redis-cluster"))]
"redis-cluster" => {
warn!(
"Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
other => Err(crate::error::Error::Queue(format!(
"Unsupported queue driver: {other}"
))),
}
}
}
pub struct QueueManager {
driver: Box<dyn QueueInterface>,
}
impl QueueManager {
pub fn new(driver: Box<dyn QueueInterface>) -> Self {
Self { driver }
}
pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
self.driver.add_to_queue(queue_name, data).await
}
pub async fn process_queue(
&self,
queue_name: &str,
callback: JobProcessorFnAsync,
) -> Result<()> {
self.driver.process_queue(queue_name, callback).await
}
pub async fn disconnect(&self) -> Result<()> {
self.driver.disconnect().await
}
pub async fn check_health(&self) -> Result<()> {
self.driver.check_health().await
}
}