use ::async_trait::async_trait;
use ::bytes::Bytes;
use ::futures::TryFutureExt;
use ::redis::{
AsyncTypedCommands, aio::MultiplexedConnection, streams::StreamMaxlen,
};
use super::super::traits::PubBrokerTrait;
use crate::errors::BrokerError;
use super::PublisherConfig;
use super::errors::PublishError;
use super::group_make::make_stream_group;
#[derive(Clone)]
pub struct Publisher {
con: MultiplexedConnection,
cfg: PublisherConfig,
}
impl Publisher {
pub fn new(con: &MultiplexedConnection, cfg: PublisherConfig) -> Self {
Self {
con: con.clone(),
cfg,
}
}
}
#[async_trait]
impl PubBrokerTrait for Publisher {
async fn publish(
&self,
topic: &str,
payload: Bytes,
) -> Result<(), BrokerError> {
let group_name = self.cfg.group_name.clone().unwrap_or(topic.to_string());
let mut con = self.con.clone();
make_stream_group(self.con.clone(), topic, group_name)
.map_err(|err| PublishError::GroupCreation(err))
.await?;
con
.xadd_maxlen(
topic,
StreamMaxlen::Approx(self.cfg.stream_length),
"*",
&[("data", payload.to_vec())],
)
.map_err(|err| PublishError::Push(err))
.await?;
Ok(())
}
}