// zero4rs 2.0.0

// zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust.
// Documentation
use futures::stream::StreamExt;
use futures::SinkExt;
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::core::error2::Error;
use crate::core::error2::Result;

/// Redis access wrapper holding up to three optional backend clients.
///
/// The public methods of this type check the fields in declaration order
/// (standalone, then cluster, then sentinel) and use the first one that is
/// `Some`. When all three are `None` they return an `Error::UnexpectedError`.
#[derive(Clone)]
pub struct RuidsClient {
    /// Client for a single Redis server, if this mode is in use.
    pub standalone_client: Option<redis::Client>,
    /// Client for a Redis cluster, if this mode is in use.
    pub cluster_client: Option<redis::cluster::ClusterClient>,
    /// Sentinel-backed client, if this mode is in use. It is shared behind an
    /// async mutex because a connection is obtained through the locked guard.
    pub sentinel_client: Option<Arc<Mutex<redis::sentinel::SentinelClient>>>,
}

impl RuidsClient {
    async fn get_standalone_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
        if let Some(client) = &self.standalone_client {
            let conn = client
                .get_multiplexed_async_connection()
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(conn)
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support STANDALONE_CLIENT"
            )))
        }
    }

    async fn get_cluster_connection(&self) -> Result<redis::cluster_async::ClusterConnection> {
        if let Some(client) = &self.cluster_client {
            let conn = client
                .get_async_connection()
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(conn)
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLUSTER_CLIENT"
            )))
        }
    }

    pub async fn get_sentinel_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
        if let Some(sentinel_client) = &self.sentinel_client {
            let conn = sentinel_client
                .lock()
                .await
                .get_async_connection()
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(conn)
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support SENTINEL_CLIENT"
            )))
        }
    }

    pub async fn ping(&self) -> Result<String> {
        if self.standalone_client.is_some() {
            match self.get_standalone_connection().await {
                Ok(mut conn) => {
                    let s: String = redis::cmd("PING")
                        .query_async(&mut conn as &mut redis::aio::MultiplexedConnection)
                        .await
                        .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

                    Ok(s)
                }
                Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
            }
        } else if self.cluster_client.is_some() {
            match self.get_cluster_connection().await {
                Ok(mut conn) => {
                    let s: String = redis::cmd("PING")
                        .query_async(&mut conn as &mut redis::cluster_async::ClusterConnection)
                        .await
                        .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

                    Ok(s)
                }
                Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
            }
        } else if self.sentinel_client.is_some() {
            match self.get_sentinel_connection().await {
                Ok(mut conn) => {
                    let s: String = redis::cmd("PING")
                        .query_async(&mut conn as &mut redis::aio::MultiplexedConnection)
                        .await
                        .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

                    Ok(s)
                }
                Err(e) => Err(Error::UnexpectedError(anyhow::anyhow!(e.to_string()))),
            }
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    pub async fn set(&self, key: &str, value: &str, ttl_sec: u64) -> Result<()> {
        if self.standalone_client.is_some() {
            self.get_standalone_connection()
                .await?
                .set_ex(key, value, ttl_sec)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.cluster_client.is_some() {
            self.get_cluster_connection()
                .await?
                .set_ex(key, value, ttl_sec)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.sentinel_client.is_some() {
            self.get_sentinel_connection()
                .await?
                .set_ex(key, value, ttl_sec)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    pub async fn get(&self, key: &str) -> Result<Option<String>> {
        if self.standalone_client.is_some() {
            match self.get_standalone_connection().await?.get(key).await {
                Ok(result) => Ok(Some(result)),
                Err(e) => {
                    log::error!("rudis-get: key={}, error={:?}", key, e);
                    Ok(None)
                }
            }
        } else if self.cluster_client.is_some() {
            match self.get_cluster_connection().await?.get(key).await {
                Ok(result) => Ok(Some(result)),
                Err(e) => {
                    log::error!("rudis-get: key={}, error={:?}", key, e);
                    Ok(None)
                }
            }
        } else if self.sentinel_client.is_some() {
            match self.get_sentinel_connection().await?.get(key).await {
                Ok(result) => Ok(Some(result)),
                Err(e) => {
                    log::error!("rudis-get: key={}, error={:?}", key, e);
                    Ok(None)
                }
            }
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    pub async fn ttl(&self, key: &str) -> Result<i32> {
        if self.standalone_client.is_some() {
            self.get_standalone_connection()
                .await?
                .ttl(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.cluster_client.is_some() {
            self.get_cluster_connection()
                .await?
                .ttl(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.sentinel_client.is_some() {
            self.get_sentinel_connection()
                .await?
                .ttl(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    pub async fn exists(&self, key: &str) -> Result<bool> {
        if self.standalone_client.is_some() {
            self.get_standalone_connection()
                .await?
                .exists(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.cluster_client.is_some() {
            self.get_cluster_connection()
                .await?
                .exists(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else if self.sentinel_client.is_some() {
            self.get_sentinel_connection()
                .await?
                .exists(key)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    pub async fn publishs(&self, topic: &str, data: &str) -> Result<bool> {
        if self.standalone_client.is_some() {
            let _: () = self
                .get_standalone_connection()
                .await?
                .publish(topic, data)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(true)
        } else if self.cluster_client.is_some() {
            let _: () = self
                .get_cluster_connection()
                .await?
                .publish(topic, data)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(true)
        } else if self.sentinel_client.is_some() {
            let _: () = self
                .get_sentinel_connection()
                .await?
                .publish(topic, data)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            Ok(true)
        } else {
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLIENT"
            )))
        }
    }

    /// Subscribes to `topic` on the standalone client and forwards every
    /// message payload, decoded as a `String`, into the returned channel.
    ///
    /// Forwarding runs in a background task started with
    /// `tokio::task::spawn_local`, so the caller must be running in a context
    /// where local tasks can be spawned (e.g. a `LocalSet`). The task ends
    /// when the pub/sub stream closes, when the returned receiver is dropped,
    /// or when Ctrl-C is received. Payloads that cannot be decoded as a string
    /// are logged and skipped.
    ///
    /// # Errors
    /// Returns `Error::UnexpectedError` when no standalone client is
    /// configured (cluster and sentinel clients are not supported here), or
    /// when opening the pub/sub connection or subscribing fails.
    pub async fn subscribe(
        &self,
        topic: &str,
    ) -> Result<futures::channel::mpsc::UnboundedReceiver<String>> {
        if let Some(client) = &self.standalone_client {
            let mut pubsub_conn = client
                .get_async_pubsub()
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            pubsub_conn
                .subscribe(topic)
                .await
                .map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;

            let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();

            tokio::task::spawn_local(async move {
                // Create the shutdown future once and poll it alongside the
                // message stream. Previously the Ctrl-C branch was never polled
                // again after the first message wait began, so it could not fire.
                let shutdown = tokio::signal::ctrl_c();
                tokio::pin!(shutdown);

                let mut messages = pubsub_conn.on_message();

                loop {
                    tokio::select! {
                        next = messages.next() => {
                            let msg = match next {
                                Some(msg) => msg,
                                None => {
                                    // The stream is finished; stop instead of re-polling forever.
                                    log::warn!("rudis-subscribe: message stream closed");
                                    break;
                                }
                            };

                            match msg.get_payload::<String>() {
                                Ok(payload) => {
                                    // A send error means the receiver was dropped:
                                    // nobody is listening, so end the task quietly.
                                    if tx.send(payload).await.is_err() {
                                        log::info!("rudis-subscribe: receiver dropped, stopping");
                                        break;
                                    }
                                }
                                Err(e) => {
                                    // Skip the bad message rather than panicking the task.
                                    log::error!("rudis-subscribe: can't get payload of message, error={:?}", e);
                                }
                            }
                        }
                        // Listen for the shutdown signal.
                        _ = &mut shutdown => {
                            log::info!("tokio::signal::ctrl_c() - 9");
                            break;
                        }
                    }
                }
            });

            Ok(rx)
        } else {
            // should use nats-server
            // https://docs.nats.io/running-a-nats-service/nats_docker
            Err(Error::UnexpectedError(anyhow::anyhow!(
                "Not support CLUSTER_CLIENT"
            )))
        }
    }
}