use crate::backend::RedisBroker;
use crate::backend::RedisConnectionType;
use crate::base::Broker;
use crate::config::ClientConfig;
use crate::error::Result;
use crate::task::{Task, TaskInfo};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[cfg(feature = "postgres")]
use crate::backend::pgdb::PostgresBroker;
#[cfg(feature = "websocket")]
use crate::backend::wsdb::WebSocketBroker;
/// Backend-specific broker handle held by [`Client`].
///
/// Every variant stores its concrete broker behind an `Arc`, so the derived
/// `Clone` only copies the handle and all clones share the same broker.
#[derive(Clone)]
enum ClientBroker {
/// Redis-backed broker; the only variant that is not feature-gated.
Redis(Arc<RedisBroker>),
/// PostgreSQL-backed broker; compiled only with the `postgres` feature.
#[cfg(feature = "postgres")]
Postgres(Arc<PostgresBroker>),
/// WebSocket-backed broker; compiled only with the `websocket` feature.
#[cfg(feature = "websocket")]
WebSocket(Arc<WebSocketBroker>),
}
impl ClientBroker {
    /// Returns the wrapped backend as a shared [`Broker`] trait object.
    ///
    /// Only the `Arc` handle is cloned; the concrete broker is then unsized
    /// to `dyn Broker` by the return-type coercion.
    fn as_broker(&self) -> Arc<dyn Broker> {
        match self {
            Self::Redis(handle) => handle.clone(),
            #[cfg(feature = "postgres")]
            Self::Postgres(handle) => handle.clone(),
            #[cfg(feature = "websocket")]
            Self::WebSocket(handle) => handle.clone(),
        }
    }

    /// Returns the wrapped backend as a shared `SchedulerBroker` trait object.
    ///
    /// Mirrors [`Self::as_broker`], but exposes the scheduler-facing trait
    /// that every backend variant also implements.
    fn as_scheduler_broker(&self) -> Arc<dyn crate::base::SchedulerBroker> {
        match self {
            Self::Redis(handle) => handle.clone(),
            #[cfg(feature = "postgres")]
            Self::Postgres(handle) => handle.clone(),
            #[cfg(feature = "websocket")]
            Self::WebSocket(handle) => handle.clone(),
        }
    }
}
/// Handle used to submit tasks to a broker backend.
///
/// `Clone` is derived: the broker sits behind an `Arc` inside
/// [`ClientBroker`], so clones share the same backend, while the
/// configuration is cloned by value.
#[derive(Clone)]
pub struct Client {
    /// Backend that every task operation is forwarded to.
    broker: ClientBroker,
    /// Client settings; read when applying the ACL tenant queue prefix, so no
    /// `dead_code` suppression is needed on this field.
    config: ClientConfig,
}
impl Client {
    /// Creates a Redis-backed client that uses `ClientConfig::default()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Client::with_config`].
    pub async fn new(redis_connection: RedisConnectionType) -> Result<Self> {
        Self::with_config(redis_connection, ClientConfig::default()).await
    }

    /// Creates a Redis-backed client with an explicit configuration.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `RedisBroker::new` if the broker cannot
    /// be constructed from `redis_connection`.
    pub async fn with_config(
        redis_connection: RedisConnectionType,
        config: ClientConfig,
    ) -> Result<Self> {
        let broker = Arc::new(RedisBroker::new(redis_connection).await?);
        Ok(Self {
            broker: ClientBroker::Redis(broker),
            config,
        })
    }

    /// Creates a PostgreSQL-backed client that uses `ClientConfig::default()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Client::new_with_postgres_config`].
    #[cfg(feature = "postgres")]
    pub async fn new_with_postgres(database_url: &str) -> Result<Self> {
        Self::new_with_postgres_config(database_url, ClientConfig::default()).await
    }

    /// Creates a PostgreSQL-backed client with an explicit configuration.
    ///
    /// The broker is built through `PostgresBroker::builder()` with only the
    /// database URL set.
    ///
    /// # Errors
    ///
    /// Returns the error produced while building the `PostgresBroker`.
    #[cfg(feature = "postgres")]
    pub async fn new_with_postgres_config(database_url: &str, config: ClientConfig) -> Result<Self> {
        let broker = Arc::new(
            PostgresBroker::builder()
                .database_url(database_url)
                .build()
                .await?,
        );
        Ok(Self {
            broker: ClientBroker::Postgres(broker),
            config,
        })
    }

    /// Creates a WebSocket-backed client that uses `ClientConfig::default()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Client::new_with_websocket_config`].
    #[cfg(feature = "websocket")]
    pub async fn new_with_websocket(url: &str) -> Result<Self> {
        Self::new_with_websocket_config(url, ClientConfig::default()).await
    }

    /// Creates a WebSocket-backed client with an explicit configuration.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `WebSocketBroker::new`.
    #[cfg(feature = "websocket")]
    pub async fn new_with_websocket_config(url: &str, config: ClientConfig) -> Result<Self> {
        let broker = Arc::new(WebSocketBroker::new(url).await?);
        Ok(Self {
            broker: ClientBroker::WebSocket(broker),
            config,
        })
    }

    /// Creates a WebSocket-backed client that authenticates with a username
    /// and password, using `ClientConfig::default()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// [`Client::new_with_websocket_basic_auth_config`].
    #[cfg(feature = "websocket")]
    pub async fn new_with_websocket_basic_auth(
        url: &str,
        username: String,
        password: String,
    ) -> Result<Self> {
        Self::new_with_websocket_basic_auth_config(url, username, password, ClientConfig::default())
            .await
    }

    /// Creates a WebSocket-backed client that authenticates with a username
    /// and password, with an explicit configuration.
    ///
    /// Both credentials are always passed to `WebSocketBroker::with_basic_auth`
    /// as `Some(..)`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `WebSocketBroker::with_basic_auth`.
    #[cfg(feature = "websocket")]
    pub async fn new_with_websocket_basic_auth_config(
        url: &str,
        username: String,
        password: String,
        config: ClientConfig,
    ) -> Result<Self> {
        let broker =
            Arc::new(WebSocketBroker::with_basic_auth(url, Some(username), Some(password)).await?);
        Ok(Self {
            broker: ClientBroker::WebSocket(broker),
            config,
        })
    }

    /// Returns a shared handle to the underlying broker as a [`Broker`] trait
    /// object.
    pub fn get_broker(&self) -> Arc<dyn Broker> {
        self.broker.as_broker()
    }

    /// Returns a shared handle to the underlying broker as a
    /// `SchedulerBroker` trait object (crate-internal).
    pub(crate) fn get_scheduler_broker(&self) -> Arc<dyn crate::base::SchedulerBroker> {
        self.broker.as_scheduler_broker()
    }

    /// Rewrites the task's queue name with the tenant prefix when an ACL
    /// tenant is configured; otherwise returns the task unchanged.
    ///
    /// This must run exactly once per submitted task: whether
    /// `get_queue_name_with_prefix` tolerates an already-prefixed name is not
    /// visible from this file.
    fn apply_acl_prefix_to_task(&self, task: Task) -> Task {
        if self.config.acl_tenant.is_none() {
            return task;
        }
        let prefixed_queue = self.config.get_queue_name_with_prefix(task.get_queue());
        task.with_queue(prefixed_queue)
    }

    /// Applies the ACL queue prefix and forwards the task to the broker's
    /// `enqueue`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn enqueue(&self, task: Task) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        self.broker.as_broker().enqueue(&task).await
    }

    /// Applies the ACL queue prefix and forwards the task, together with
    /// `ttl`, to the broker's `enqueue_unique`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn enqueue_unique(&self, task: Task, ttl: Duration) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        self.broker.as_broker().enqueue_unique(&task, ttl).await
    }

    /// Applies the ACL queue prefix and forwards the task to the broker's
    /// `schedule` with `process_at` converted to a UTC `chrono::DateTime`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn schedule(&self, task: Task, process_at: SystemTime) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        // NOTE(review): this conversion is believed to panic for instants
        // outside chrono's representable range — confirm against the chrono
        // version in use.
        let date_time = chrono::DateTime::<chrono::Utc>::from(process_at);
        self.broker.as_broker().schedule(&task, date_time).await
    }

    /// Applies the ACL queue prefix and forwards the task to the broker's
    /// `schedule_unique` with `process_at` converted to a UTC
    /// `chrono::DateTime` and the given `ttl`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn schedule_unique(
        &self,
        task: Task,
        process_at: SystemTime,
        ttl: Duration,
    ) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        let date_time = chrono::DateTime::<chrono::Utc>::from(process_at);
        self.broker
            .as_broker()
            .schedule_unique(&task, date_time, ttl)
            .await
    }

    /// Schedules the task to be processed `delay` after the current system
    /// time by delegating to [`Client::schedule`].
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` I/O error (converted into the crate error
    /// type) when `SystemTime::now() + delay` is not representable, and
    /// otherwise whatever [`Client::schedule`] returns.
    pub async fn enqueue_in(&self, task: Task, delay: Duration) -> Result<TaskInfo> {
        let process_at = SystemTime::now()
            .checked_add(delay)
            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Delay overflow"))?;
        // The ACL prefix is deliberately NOT applied here: `schedule` applies
        // it, and prefixing in both places would prefix the queue name twice.
        self.schedule(task, process_at).await
    }

    /// Applies the ACL queue prefix and forwards the task and `group` to the
    /// broker's `add_to_group`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn add_to_group(&self, task: Task, group: &str) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        self.broker.as_broker().add_to_group(&task, group).await
    }

    /// Applies the ACL queue prefix and forwards the task, `group` and `ttl`
    /// to the broker's `add_to_group_unique`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn add_to_group_unique(
        &self,
        task: Task,
        group: &str,
        ttl: Duration,
    ) -> Result<TaskInfo> {
        let task = self.apply_acl_prefix_to_task(task);
        self.broker
            .as_broker()
            .add_to_group_unique(&task, group, ttl)
            .await
    }

    /// Forwards to the broker's `ping`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn ping(&self) -> Result<()> {
        self.broker.as_broker().ping().await
    }

    /// Forwards to the broker's `close`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the broker reports.
    pub async fn close(&self) -> Result<()> {
        self.broker.as_broker().close().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::Task;
    use redis::ConnectionInfo;

    // A freshly built task reports the type name and payload it was given.
    #[tokio::test]
    async fn test_task_creation() {
        let created = Task::new("test_task", b"test payload").unwrap();
        assert_eq!(created.get_type(), "test_task");
        assert_eq!(created.get_payload(), b"test payload");
    }

    // Checks the inputs a client would be built from (Redis URL parsing and
    // the default config); no broker connection is opened here.
    #[tokio::test]
    async fn test_client_creation() {
        let connection_info: ConnectionInfo = "redis://127.0.0.1:6379".parse().unwrap();
        assert_eq!(connection_info.addr().to_string(), "127.0.0.1:6379");

        let defaults = ClientConfig::default();
        assert_eq!(defaults.max_retries, 3);
    }
}