use futures::stream::StreamExt;
use futures::SinkExt;
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::core::error2::Error;
use crate::core::error2::Result;
#[derive(Clone)]
pub struct RuidsClient {
pub standalone_client: Option<redis::Client>,
pub cluster_client: Option<redis::cluster::ClusterClient>,
pub sentinel_client: Option<Arc<Mutex<redis::sentinel::SentinelClient>>>,
}
impl RuidsClient {
async fn get_standalone_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
if let Some(client) = &self.standalone_client {
let conn = client
.get_multiplexed_async_connection()
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(conn)
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support STANDALONE_CLIENT"
)))
}
}
async fn get_cluster_connection(&self) -> Result<redis::cluster_async::ClusterConnection> {
if let Some(client) = &self.cluster_client {
let conn = client
.get_async_connection()
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(conn)
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLUSTER_CLIENT"
)))
}
}
pub async fn get_sentinel_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
if let Some(sentinel_client) = &self.sentinel_client {
let conn = sentinel_client
.lock()
.await
.get_async_connection()
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(conn)
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support SENTINEL_CLIENT"
)))
}
}
pub async fn ping(&self) -> Result<String> {
if self.standalone_client.is_some() {
match self.get_standalone_connection().await {
Ok(mut conn) => {
let s: String = redis::cmd("PING")
.query_async(&mut conn as &mut redis::aio::MultiplexedConnection)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(s)
}
Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
}
} else if self.cluster_client.is_some() {
match self.get_cluster_connection().await {
Ok(mut conn) => {
let s: String = redis::cmd("PING")
.query_async(&mut conn as &mut redis::cluster_async::ClusterConnection)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(s)
}
Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
}
} else if self.sentinel_client.is_some() {
match self.get_sentinel_connection().await {
Ok(mut conn) => {
let s: String = redis::cmd("PING")
.query_async(&mut conn as &mut redis::aio::MultiplexedConnection)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(s)
}
Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
}
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn set(&self, key: &str, value: &str, ttl_sec: u64) -> Result<()> {
if self.standalone_client.is_some() {
self.get_standalone_connection()
.await?
.set_ex(key, value, ttl_sec)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.cluster_client.is_some() {
self.get_cluster_connection()
.await?
.set_ex(key, value, ttl_sec)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.sentinel_client.is_some() {
self.get_sentinel_connection()
.await?
.set_ex(key, value, ttl_sec)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn get(&self, key: &str) -> Result<Option<String>> {
if self.standalone_client.is_some() {
match self.get_standalone_connection().await?.get(key).await {
Ok(result) => Ok(Some(result)),
Err(e) => {
log::error!("rudis-get: key={}, error={:?}", key, e);
Ok(None)
}
}
} else if self.cluster_client.is_some() {
match self.get_cluster_connection().await?.get(key).await {
Ok(result) => Ok(Some(result)),
Err(e) => {
log::error!("rudis-get: key={}, error={:?}", key, e);
Ok(None)
}
}
} else if self.sentinel_client.is_some() {
match self.get_sentinel_connection().await?.get(key).await {
Ok(result) => Ok(Some(result)),
Err(e) => {
log::error!("rudis-get: key={}, error={:?}", key, e);
Ok(None)
}
}
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn ttl(&self, key: &str) -> Result<i32> {
if self.standalone_client.is_some() {
self.get_standalone_connection()
.await?
.ttl(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.cluster_client.is_some() {
self.get_cluster_connection()
.await?
.ttl(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.sentinel_client.is_some() {
self.get_sentinel_connection()
.await?
.ttl(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn exists(&self, key: &str) -> Result<bool> {
if self.standalone_client.is_some() {
self.get_standalone_connection()
.await?
.exists(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.cluster_client.is_some() {
self.get_cluster_connection()
.await?
.exists(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else if self.sentinel_client.is_some() {
self.get_sentinel_connection()
.await?
.exists(key)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn publishs(&self, topic: &str, data: &str) -> Result<bool> {
if self.standalone_client.is_some() {
let _: () = self
.get_standalone_connection()
.await?
.publish(topic, data)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(true)
} else if self.cluster_client.is_some() {
let _: () = self
.get_cluster_connection()
.await?
.publish(topic, data)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(true)
} else if self.sentinel_client.is_some() {
let _: () = self
.get_sentinel_connection()
.await?
.publish(topic, data)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(true)
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLIENT"
)))
}
}
pub async fn subscribe(
&self,
topic: &str,
) -> Result<futures::channel::mpsc::UnboundedReceiver<String>> {
if let Some(client) = &self.standalone_client {
let mut pubsub_conn = client
.get_async_pubsub()
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
pubsub_conn
.subscribe(topic)
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();
tokio::task::spawn_local(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
loop {
tokio::select! {
_ = interval.tick() => {
while let Some(msg) = pubsub_conn.on_message().next().await {
let payload: String = msg.get_payload().expect("Can't get payload of message");
tx.send(payload)
.await
.expect("Can't send a message to the stream");
}
}
_ = tokio::signal::ctrl_c() => {
log::info!("tokio::signal::ctrl_c() - 9");
break;
}
}
}
});
Ok(rx)
} else {
Err(Error::UnexpectedError(anyhow::anyhow!(
"Not support CLUSTER_CLIENT"
)))
}
}
}