zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::pin::Pin;

use futures::stream::StreamExt;
use futures::SinkExt;

use redis::aio::AsyncStream;

#[allow(deprecated)]
use redis::aio::Connection;

use redis::ControlFlow;
use redis::FromRedisValue;
use redis::PubSubCommands;

use crate::core::errors::Errors;

type StandaloneCon = redis::Client;

#[derive(Clone)]
pub struct ReidsPubsubStandaloneAsync {
    pub client: StandaloneCon,
}

impl ReidsPubsubStandaloneAsync {
    #[allow(deprecated)]
    pub async fn get_connection(
        &self,
    ) -> Result<Connection<Pin<Box<dyn AsyncStream + Send + Sync>>>, Errors> {
        let con = self
            .client
            .get_async_connection()
            .await
            .map_err(|e| Errors::UnexpectedError(anyhow::anyhow!(e)))?;

        Ok(con)
    }

    pub async fn subscribe(
        &self,
        topic: &str,
    ) -> Result<futures::channel::mpsc::UnboundedReceiver<String>, Errors> {
        let mut pubsub_con = self.get_connection().await?.into_pubsub();

        pubsub_con
            .subscribe(vec![topic.to_string()])
            .await
            .map_err(|e| Errors::UnexpectedError(anyhow::anyhow!(e)))?;

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();

        tokio::spawn(async move {
            while let Some(msg) = pubsub_con.on_message().next().await {
                let payload = msg.get_payload().expect("Can't get payload of message");

                let payload: String = FromRedisValue::from_redis_value(&payload)
                    .expect("Can't convert from Redis value");

                if payload.starts_with('"') && payload.ends_with('"') {
                    tx.send(payload[1..payload.len() - 1].to_string())
                        .await
                        .expect("Can't send a message to the stream");
                } else {
                    tx.send(payload)
                        .await
                        .expect("Can't send a message to the stream");
                }
            }
        });

        Ok(rx)
    }

    pub async fn subscribe2(&self, channel: &str) -> Result<(), Errors> {
        let mut con = self.client.get_connection().unwrap();

        let _: () = con
            .subscribe(&[channel], |msg| {
                let msg: String = msg.get_payload().unwrap();
                log::info!("redis-subscribe2: msg={}", msg);
                ControlFlow::Continue
            })
            .unwrap();

        Ok(())
    }

    pub fn subscribe3(&self, channel: &str, f: &dyn Fn(String, String)) {
        let mut con = self.client.get_connection().unwrap();

        let _: () = con
            .subscribe(&[channel], |msg| {
                let msg: String = msg.get_payload().unwrap();
                (f)(channel.to_string(), msg);
                ControlFlow::Continue
            })
            .unwrap();
    }
}