use std::pin::Pin;
use futures::stream::StreamExt;
use futures::SinkExt;
use redis::aio::AsyncStream;
#[allow(deprecated)]
use redis::aio::Connection;
use redis::ControlFlow;
use redis::FromRedisValue;
use redis::PubSubCommands;
use crate::core::errors::Errors;
// Alias for the client handle stored by the pub/sub wrapper below; currently a plain `redis::Client`.
type StandaloneCon = redis::Client;
/// Pub/sub helper built around a single `redis::Client`.
///
/// The value is `Clone`; each subscribe method opens its own connection from
/// the stored client.
///
/// NOTE(review): "Reids" is presumably a misspelling of "Redis"; the name is
/// public, so renaming it would break callers.
#[derive(Clone)]
pub struct ReidsPubsubStandaloneAsync {
/// Client used to open the connections for every subscription.
pub client: StandaloneCon,
}
impl ReidsPubsubStandaloneAsync {
#[allow(deprecated)]
pub async fn get_connection(
&self,
) -> Result<Connection<Pin<Box<dyn AsyncStream + Send + Sync>>>, Errors> {
let con = self
.client
.get_async_connection()
.await
.map_err(|e| Errors::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(con)
}
pub async fn subscribe(
&self,
topic: &str,
) -> Result<futures::channel::mpsc::UnboundedReceiver<String>, Errors> {
let mut pubsub_con = self.get_connection().await?.into_pubsub();
pubsub_con
.subscribe(vec![topic.to_string()])
.await
.map_err(|e| Errors::UnexpectedError(anyhow::anyhow!(e)))?;
let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();
tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
if payload.starts_with('"') && payload.ends_with('"') {
tx.send(payload[1..payload.len() - 1].to_string())
.await
.expect("Can't send a message to the stream");
} else {
tx.send(payload)
.await
.expect("Can't send a message to the stream");
}
}
});
Ok(rx)
}
pub async fn subscribe2(&self, channel: &str) -> Result<(), Errors> {
let mut con = self.client.get_connection().unwrap();
let _: () = con
.subscribe(&[channel], |msg| {
let msg: String = msg.get_payload().unwrap();
log::info!("redis-subscribe2: msg={}", msg);
ControlFlow::Continue
})
.unwrap();
Ok(())
}
pub fn subscribe3(&self, channel: &str, f: &dyn Fn(String, String)) {
let mut con = self.client.get_connection().unwrap();
let _: () = con
.subscribe(&[channel], |msg| {
let msg: String = msg.get_payload().unwrap();
(f)(channel.to_string(), msg);
ControlFlow::Continue
})
.unwrap();
}
}