use crate::backend::rdb::redis::{RedisConnection, RedisConnectionType};
use crate::backend::rdb::redis_scripts::{RedisArg, ScriptManager};
#[cfg(feature = "cluster")]
use crate::backend::rdb::universal_client::ClusterPubSubConnection;
use crate::backend::rdb::universal_client::{RedisClient, RedisPubSub};
use crate::base::constants::DEFAULT_QUEUE_NAME;
use crate::base::keys;
use crate::error::{Error, Result};
use crate::proto::{SchedulerEnqueueEvent, SchedulerEntry, TaskMessage};
use crate::task::Task;
use prost::Message;
#[cfg(feature = "sentinel")]
use redis::sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType};
use redis::AsyncCommands;
use redis::Client;
use uuid::Uuid;
pub struct RedisBroker {
client: RedisClient,
pub(crate) script_manager: ScriptManager,
}
impl RedisBroker {
pub async fn new(conn: RedisConnectionType) -> Result<Self> {
match conn {
RedisConnectionType::Single {
connection_info,
#[cfg(feature = "tls")]
tls_certs,
} => {
let client = {
#[cfg(feature = "tls")]
{
let ci = connection_info.clone();
if let Some(tls) = tls_certs {
Client::build_with_tls(ci, tls)?
} else {
Client::open(ci)?
}
}
#[cfg(not(feature = "tls"))]
{
Client::open(connection_info)?
}
};
let mut broker = Self {
client: RedisClient::Single(client),
script_manager: ScriptManager::default(),
};
broker.init_scripts().await?;
Ok(broker)
}
#[cfg(feature = "cluster")]
crate::backend::RedisConnectionType::Cluster(config) => {
let (push_receiver, cluster_client) = {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let client = redis::cluster::ClusterClientBuilder::new(config)
.use_protocol(redis::ProtocolVersion::RESP3)
.push_sender(tx)
.build()?;
(
std::sync::Arc::new(tokio::sync::Mutex::new(Some(rx))),
client,
)
};
let mut broker = Self {
client: RedisClient::Cluster {
client: cluster_client,
push_receiver,
},
script_manager: ScriptManager::default(),
};
broker.init_scripts().await?;
Ok(broker)
}
#[cfg(feature = "sentinel")]
crate::backend::RedisConnectionType::Sentinel {
master_name,
sentinels,
redis_connection_info,
#[cfg(feature = "tls")]
tls_certs,
} => {
let mut node_conn_info: Option<SentinelNodeConnectionInfo> = None;
if let Some(conn_info) = redis_connection_info {
let sentinel_node_connection_info =
SentinelNodeConnectionInfo::default().set_redis_connection_info(conn_info);
node_conn_info = Some(sentinel_node_connection_info);
}
#[cfg(feature = "tls")]
let sentinel_client = {
if let Some(certs) = tls_certs {
let addrs: Vec<redis::ConnectionAddr> =
sentinels.iter().map(|ci| ci.addr().clone()).collect();
let mut builder = redis::sentinel::SentinelClientBuilder::new(
addrs,
master_name.clone(),
SentinelServerType::Master,
)?;
builder = builder.set_client_to_redis_certificates(certs);
builder.build()?
} else {
SentinelClient::build(
sentinels,
master_name.clone(),
node_conn_info,
SentinelServerType::Master,
)?
}
};
#[cfg(not(feature = "tls"))]
let sentinel_client = {
SentinelClient::build(
sentinels,
master_name.clone(),
node_conn_info,
SentinelServerType::Master,
)?
};
let client = std::sync::Arc::new(tokio::sync::Mutex::new(sentinel_client));
let mut broker = Self {
client: RedisClient::Sentinel { client },
script_manager: ScriptManager::default(),
};
broker.init_scripts().await?;
Ok(broker)
}
}
}
pub async fn get_async_connection(&self) -> Result<RedisConnection> {
let async_conn = self.client.get_connection().await?;
Ok(async_conn)
}
pub async fn get_pubsub(&self) -> Result<RedisPubSub> {
match &self.client {
RedisClient::Single(client) => {
let pubsub = client.get_async_pubsub().await?;
Ok(RedisPubSub::Single(pubsub))
}
#[cfg(feature = "cluster")]
RedisClient::Cluster {
client,
push_receiver,
} => {
let mut receiver_option = push_receiver.lock().await;
if let Some(receiver) = receiver_option.take() {
let connection = client.get_async_connection().await?;
let cluster_pubsub = ClusterPubSubConnection::from_receiver(receiver, connection);
Ok(RedisPubSub::Cluster(cluster_pubsub))
} else {
Err(Error::other(
"Cluster PubSub receiver has already been taken. You can only create one PubSub connection for cluster mode."
))
}
}
#[cfg(feature = "sentinel")]
RedisClient::Sentinel { client, .. } => {
let mut guard = client.lock().await;
let master_client = guard.async_get_client().await?;
let pubsub = master_client.get_async_pubsub().await?;
Ok(RedisPubSub::Sentinel(pubsub))
}
}
}
pub(crate) async fn init_scripts(&mut self) -> Result<()> {
let mut conn = self.get_async_connection().await?;
self.script_manager.load_scripts(&mut conn).await?;
Ok(())
}
pub(crate) fn encode_task_message(&self, msg: &TaskMessage) -> Result<Vec<u8>> {
let mut buf = Vec::new();
msg.encode(&mut buf)?;
Ok(buf)
}
pub fn decode_task_message(&self, data: &[u8]) -> Result<TaskMessage> {
match TaskMessage::decode(data) {
Ok(msg) => Ok(msg),
Err(decode_err) => Err(Error::ProtoDecode(decode_err)),
}
}
pub(crate) fn task_to_message(&self, task: &Task) -> TaskMessage {
TaskMessage {
r#type: task.task_type.clone(),
payload: task.payload.clone(),
headers: task.resolved_headers(),
id: task
.options
.task_id
.clone()
.unwrap_or(Uuid::new_v4().to_string()),
queue: if task.options.queue.clone().is_empty() {
DEFAULT_QUEUE_NAME.to_string()
} else {
task.options.queue.clone()
},
retry: task.options.max_retry,
retried: 0,
error_msg: String::new(),
last_failed_at: 0,
timeout: task
.options
.timeout
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
deadline: task.options.deadline.map(|d| d.timestamp()).unwrap_or(0),
unique_key: String::new(),
group_key: task.options.group.clone().unwrap_or_default(),
retention: task
.options
.retention
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
completed_at: 0,
}
}
pub async fn write_scheduler_entries(
&self,
entries: &[SchedulerEntry],
scheduler_id: &str,
ttl_secs: u64,
tenant: Option<&str>,
) -> Result<()> {
let mut args: Vec<RedisArg> = Vec::new();
args.push(RedisArg::Int(ttl_secs as i64));
for entry in entries {
let mut buf = Vec::new();
entry
.encode(&mut buf)
.map_err(|e| Error::other(format!("prost encode error: {e}")))?;
args.push(RedisArg::Bytes(buf));
}
let key = match tenant {
Some(t) => keys::scheduler_entries_key_with_tenant(t, scheduler_id),
None => keys::scheduler_entries_key(scheduler_id),
};
let mut conn = self.get_async_connection().await?;
let _: () = self
.script_manager
.eval_script(
&mut conn,
"write_scheduler_entries",
std::slice::from_ref(&key),
&args,
)
.await?;
let zset_key = keys::ALL_SCHEDULERS;
let expire_at = chrono::Utc::now().timestamp() + ttl_secs as i64;
let _: () = conn.zadd(zset_key, &key, expire_at).await?;
Ok(())
}
pub async fn record_scheduler_enqueue_event(
&self,
event: &SchedulerEnqueueEvent,
entry_id: &str,
) -> Result<()> {
let mut buf = Vec::new();
event
.encode(&mut buf)
.map_err(|e| Error::other(format!("prost encode error: {e}")))?;
let key = keys::scheduler_history_key(entry_id);
let mut conn = self.get_async_connection().await?;
let args = vec![
RedisArg::Int(event.enqueue_time.map(|x| x.seconds).unwrap_or(0)),
RedisArg::Bytes(buf),
RedisArg::Int(1000),
];
self
.script_manager
.eval_script::<()>(&mut conn, "record_scheduler_enqueue_event", &[key], &args)
.await?;
Ok(())
}
pub async fn scheduler_entries_script(
&self,
scheduler_id: &str,
) -> Result<std::collections::HashMap<String, Vec<u8>>> {
let key = keys::scheduler_entries_key(scheduler_id);
let mut conn = self.get_async_connection().await?;
let result: Vec<Vec<u8>> = self
.script_manager
.eval_script(&mut conn, "get_scheduler_entries", &[key], &[])
.await?;
let mut map = std::collections::HashMap::new();
let mut iter = result.chunks_exact(2);
while let Some([k, v]) = iter.next() {
let key_str = String::from_utf8_lossy(k).to_string();
map.insert(key_str, v.clone());
}
Ok(map)
}
pub async fn scheduler_events_script(&self, count: usize) -> Result<Vec<Vec<u8>>> {
let key = keys::SCHEDULER_EVENTS.to_string();
let mut conn = self.get_async_connection().await?;
let args = vec![RedisArg::Int(count as i64)];
let result: Vec<Vec<u8>> = self
.script_manager
.eval_script(&mut conn, "get_scheduler_events", &[key], &args)
.await?;
Ok(result)
}
pub async fn clear_scheduler_entries(
&self,
scheduler_id: &str,
tenant: Option<&str>,
) -> Result<()> {
let key = match tenant {
Some(t) => keys::scheduler_entries_key_with_tenant(t, scheduler_id),
None => keys::scheduler_entries_key(scheduler_id),
};
let zset_key = keys::ALL_SCHEDULERS;
let mut conn = self.get_async_connection().await?;
let _: () = conn
.zrem(zset_key, &key)
.await
.map_err(|e| Error::other(format!("redis ZREM error: {e}")))?;
let _: () = conn
.del(&key)
.await
.map_err(|e| Error::other(format!("redis DEL error: {e}")))?;
Ok(())
}
}