use crate::backend::rdb::redis::RedisConnection;
use crate::error::Result;
use futures::Stream;
#[cfg(feature = "cluster")]
use redis::cluster::ClusterClient;
#[cfg(feature = "sentinel")]
use redis::sentinel::SentinelClient;
use redis::Client;
#[cfg(feature = "cluster")]
pub struct ClusterPubSubConnection {
receiver: tokio::sync::mpsc::UnboundedReceiver<redis::PushInfo>,
connection: redis::cluster_async::ClusterConnection,
}
#[cfg(feature = "cluster")]
impl ClusterPubSubConnection {
pub fn from_receiver(
receiver: tokio::sync::mpsc::UnboundedReceiver<redis::PushInfo>,
connection: redis::cluster_async::ClusterConnection,
) -> Self {
Self {
receiver,
connection,
}
}
pub fn into_on_message(self) -> std::pin::Pin<Box<dyn Stream<Item = redis::Msg> + Send>> {
use futures::StreamExt;
let mut receiver = self.receiver;
let stream = futures_util::stream::poll_fn(move |cx| match receiver.poll_recv(cx) {
std::task::Poll::Ready(Some(push_info)) => std::task::Poll::Ready(Some(push_info)),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
})
.filter_map(|push_info| {
async move {
if push_info.kind == redis::PushKind::Message || push_info.kind == redis::PushKind::PMessage
{
if push_info.data.len() >= 2 {
let msg_value = if push_info.kind == redis::PushKind::PMessage {
redis::Value::Array(vec![
redis::Value::BulkString(b"pmessage".to_vec()),
push_info.data[0].clone(),
push_info.data[1].clone(),
push_info.data[2].clone(),
])
} else {
redis::Value::Array(vec![
redis::Value::BulkString(b"message".to_vec()),
push_info.data[0].clone(),
push_info.data[1].clone(),
])
};
redis::Msg::from_value(&msg_value)
} else {
None
}
} else {
None
}
}
});
Box::pin(stream)
}
}
pub enum RedisPubSub {
Single(redis::aio::PubSub),
#[cfg(feature = "cluster")]
Cluster(ClusterPubSubConnection),
#[cfg(feature = "sentinel")]
Sentinel(redis::aio::PubSub),
}
impl RedisPubSub {
pub async fn subscribe<T: redis::ToRedisArgs>(&mut self, channel: T) -> Result<()> {
match self {
RedisPubSub::Single(pubsub) => {
pubsub.subscribe(channel).await?;
Ok(())
}
#[cfg(feature = "cluster")]
RedisPubSub::Cluster(cluster_pubsub) => {
cluster_pubsub.connection.subscribe(channel).await?;
Ok(())
}
#[cfg(feature = "sentinel")]
RedisPubSub::Sentinel(pubsub) => {
pubsub.subscribe(channel).await?;
Ok(())
}
}
}
pub fn into_on_message(self) -> Box<dyn Stream<Item = redis::Msg> + Unpin + Send> {
match self {
RedisPubSub::Single(pubsub) => Box::new(pubsub.into_on_message()),
#[cfg(feature = "cluster")]
RedisPubSub::Cluster(pubsub) => Box::new(pubsub.into_on_message()),
#[cfg(feature = "sentinel")]
RedisPubSub::Sentinel(pubsub) => Box::new(pubsub.into_on_message()),
}
}
}
#[derive(Clone)]
pub enum RedisClient {
Single(Client),
#[cfg(feature = "cluster")]
Cluster {
client: ClusterClient,
push_receiver: std::sync::Arc<
tokio::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<redis::PushInfo>>>,
>,
},
#[cfg(feature = "sentinel")]
Sentinel {
client: std::sync::Arc<tokio::sync::Mutex<SentinelClient>>,
},
}
impl RedisClient {
pub async fn get_connection(&self) -> Result<RedisConnection> {
match self {
RedisClient::Single(s) => {
let conn = s.get_multiplexed_async_connection().await?;
Ok(RedisConnection::Single(conn))
}
#[cfg(feature = "cluster")]
RedisClient::Cluster { client, .. } => {
let conn = client.get_async_connection().await?;
Ok(RedisConnection::Cluster(conn))
}
#[cfg(feature = "sentinel")]
RedisClient::Sentinel { client, .. } => {
let mut guard = client.lock().await;
let conn = guard.get_async_connection().await?;
Ok(RedisConnection::Single(conn))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "cluster")]
#[test]
fn test_cluster_pubsub_connection_creation() {
fn _type_check() {
fn _assert_has_subscribe<
T: Fn(
&mut ClusterPubSubConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + '_>>,
>(
_: T,
) {
}
fn _assert_has_into_on_message<
T: Fn(ClusterPubSubConnection) -> std::pin::Pin<Box<dyn Stream<Item = redis::Msg> + Send>>,
>(
_: T,
) {
}
}
}
#[test]
fn test_redis_pubsub_enum() {
fn _check_variants() {
fn _match_check(pubsub: RedisPubSub) {
match pubsub {
RedisPubSub::Single(_) => {}
#[cfg(feature = "cluster")]
RedisPubSub::Cluster(_) => {}
#[cfg(feature = "sentinel")]
RedisPubSub::Sentinel(_) => {}
}
}
}
}
}