use ::std::sync::Arc;
use ::async_nats::jetstream::Context;
use ::async_nats::jetstream::consumer::{
PullConsumer as PullCons, PushConsumer as PushCons,
};
use ::async_trait::async_trait;
use ::bytes::Bytes;
use ::futures::stream::BoxStream;
use ::futures::{StreamExt, TryFutureExt, TryStreamExt};
use ::std::boxed::Box;
use super::super::traits::{PubBrokerTrait, SubBrokerTrait};
use crate::errors::BrokerError;
use crate::traits::AckTrait;
#[async_trait]
impl PubBrokerTrait for Context {
async fn publish(
&self,
topic: &str,
payload: Bytes,
) -> Result<(), BrokerError> {
self
.publish(topic.to_string(), payload)
.map_err(|e| BrokerError::from(e))
.await?
.await
.map_err(|e| BrokerError::from(e))?;
Ok(())
}
}
macro_rules! impl_sub_ctx_trait {
($cls_name: ty) => {
#[async_trait]
impl SubBrokerTrait for $cls_name {
async fn subscribe(
&self,
) -> Result<
BoxStream<
Result<(Bytes, Arc<dyn AckTrait + Send + Sync>), BrokerError>,
>,
BrokerError,
> {
let messages = self
.messages()
.map_err(|e| BrokerError::from(e))
.await?
.map_err(|e| BrokerError::from(e))
.and_then(async |msg| {
let (msg, acker) = msg.split();
Ok((
msg.payload.clone(),
Arc::new(acker) as Arc<dyn AckTrait + Send + Sync>,
))
});
Ok(messages.boxed())
}
}
};
}
impl_sub_ctx_trait!(PullCons);
impl_sub_ctx_trait!(PushCons);