use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{TryFutureExt, TryStreamExt};
use serde::de::{DeserializeOwned, Error as DeErr};
use crate::brokers::SubBrokerTrait;
use crate::encoders::Decoder;
use crate::errors::{DecodeError, SubError, UnSubError};
use crate::options::SubOpt;
use crate::traits::{AckTrait, SubTrait, UnSubTrait};
pub struct Sub<T, DecodeErrorType: DeErr + Send + Sync> {
ctx: Arc<dyn SubBrokerTrait + Send + Sync>,
unsub: Arc<dyn UnSubTrait + Send + Sync>,
decoder: Arc<dyn Decoder<Item = T, Error = DecodeErrorType> + Send + Sync>,
options: SubOpt,
_marker: PhantomData<T>,
}
impl<T, DecodeErrorType> Sub<T, DecodeErrorType>
where
T: DeserializeOwned + Send + Sync,
DecodeErrorType: DeErr + Send + Sync,
{
pub fn new(
ctx: Arc<dyn SubBrokerTrait + Send + Sync>,
unsub: Arc<dyn UnSubTrait + Send + Sync>,
decoder: Arc<dyn Decoder<Item = T, Error = DecodeErrorType> + Send + Sync>,
options: SubOpt,
) -> Self {
Self {
ctx,
unsub,
decoder,
options,
_marker: PhantomData,
}
}
}
#[async_trait]
impl<T, DecodeErrorType> SubTrait for Sub<T, DecodeErrorType>
where
T: DeserializeOwned + Send + Sync,
DecodeErrorType: DeErr + Send + Sync,
{
type Item = T;
type DecodeErr = DecodeErrorType;
async fn subscribe(
&self,
) -> Result<
BoxStream<
Result<
(Self::Item, Arc<dyn AckTrait + Send + Sync>),
SubError<Self::DecodeErr>,
>,
>,
SubError<Self::DecodeErr>,
> {
let messages = self.ctx.subscribe().await?.map_err(SubError::from);
let stream = messages.and_then(async move |(msg, acker)| {
let data = self
.decoder
.decode(msg)
.map_err(|e| SubError::from(DecodeError::new(e)))?;
if self.options.auto_ack {
acker.ack().map_err(|e| SubError::AckError(e)).await?;
}
Ok((data, acker))
});
Ok(Box::pin(stream))
}
}
#[async_trait]
impl<T, DecodeErrorType> UnSubTrait for Sub<T, DecodeErrorType>
where
T: DeserializeOwned + Send + Sync,
DecodeErrorType: DeErr + Send + Sync,
{
async fn unsubscribe(&self) -> Result<(), UnSubError> {
self.unsub.unsubscribe().await
}
}
#[cfg(test)]
mod test {
use ::bytes::Bytes;
use ::futures::stream::StreamExt;
use ::serde_json::{from_slice as parse, to_vec as jsonify};
use crate::UnSubNoop;
use crate::encoders::MockDecoder;
use crate::errors::AckError;
use crate::tests::{
entity::TestEntity, error::MockDeErr, subscribe::SubscribeMock,
};
use crate::traits::MockAckTrait;
use super::*;
async fn test_subscribe(auto_ack: bool) {
let entities = vec![
TestEntity::new(1, "Test1"),
TestEntity::new(2, "Test2"),
TestEntity::new(3, "Test3"),
];
let data: Vec<(Bytes, Arc<dyn AckTrait + Send + Sync>)> = entities
.iter()
.map(|e| {
let mut ack_mock = MockAckTrait::new();
if auto_ack {
ack_mock.expect_ack().returning(|| Ok(())).once();
} else {
ack_mock.expect_ack().never();
}
return (
Bytes::from(jsonify(e).unwrap()),
Arc::new(ack_mock) as Arc<dyn AckTrait + Send + Sync>,
);
})
.collect();
let mut decoder = MockDecoder::new();
decoder
.expect_decode()
.times(entities.len())
.returning(|bytes| {
let entity: TestEntity = parse(&bytes).unwrap();
Ok(entity)
});
let ctx: Arc<dyn SubBrokerTrait + Send + Sync> =
Arc::new(SubscribeMock::new(data));
let options = SubOpt::new().auto_ack(auto_ack);
let subscribe: Sub<TestEntity, _> = Sub::new(
ctx,
Arc::new(UnSubNoop::new(false)),
Arc::new(decoder),
options,
);
let stream = subscribe.subscribe().await.unwrap();
let obtained: Vec<TestEntity> = stream
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|(entity, _ack)| entity)
.collect();
assert_eq!(obtained, entities);
}
#[tokio::test]
async fn test_subscribe_ack() {
test_subscribe(true).await;
}
#[tokio::test]
async fn test_subscribe_no_auto_ack() {
test_subscribe(false).await;
}
#[tokio::test]
async fn test_ack_err() {
let mut data: Vec<(Bytes, Arc<dyn AckTrait + Send + Sync>)> = Vec::new();
data.push((Bytes::new(), {
let mut ack_mock = MockAckTrait::new();
ack_mock
.expect_ack()
.returning(|| Err(AckError::ErrorTest))
.once();
Arc::new(ack_mock)
}));
let ctx: Arc<dyn SubBrokerTrait + Send + Sync> =
Arc::new(SubscribeMock::new(data));
let mut decoder = MockDecoder::new();
decoder.expect_decode().once().returning(|_| {
let entity = TestEntity::new(0, "Test");
Ok(entity)
});
let options = SubOpt::new().auto_ack(true);
let subscribe: Sub<TestEntity, _> = Sub::new(
ctx,
Arc::new(UnSubNoop::new(false)),
Arc::new(decoder),
options,
);
let stream = subscribe.subscribe().await.unwrap();
let obtained: Vec<String> = stream
.collect::<Vec<_>>()
.await
.iter()
.filter_map(|res| res.as_ref().map_err(|err| err.to_string()).err())
.collect();
assert_eq!(
obtained,
vec![SubError::<MockDeErr>::AckError(AckError::ErrorTest).to_string()]
);
}
}