use ::std::marker::PhantomData;
use ::std::sync::Arc;
use async_trait::async_trait;
use serde::Serialize;
use serde::ser::Error as EncErr;
use crate::brokers::PubBrokerTrait;
use crate::encoders::Encoder;
use crate::errors::{EncodeError, PubError};
use crate::traits::PubTrait;
/// Publisher that pairs a broker handle with an encoder for items of type `T`
/// and a single subject string.
///
/// `SerErr` is the error type produced by the encoder.
pub struct Pub<T, SerErr>
where
    SerErr: EncErr + Send + Sync,
{
    /// Shared broker handle that payloads are published through.
    ctx: Arc<dyn PubBrokerTrait + Send + Sync>,
    /// Subject passed to the broker on every publish.
    subject: String,
    /// Shared encoder whose `Item` is `T` and whose `Error` is `SerErr`.
    encoder: Arc<dyn Encoder<Item = T, Error = SerErr> + Send + Sync>,
    /// Zero-sized marker for the item type `T`.
    _phantom: PhantomData<T>,
}
impl<T, SerErr> Pub<T, SerErr>
where
    T: Serialize + Send + Sync,
    SerErr: EncErr + Send + Sync,
{
    /// Creates a publisher from its three parts.
    ///
    /// * `ctx` - shared broker handle stored for later publishing.
    /// * `subject` - anything convertible into a `String`; converted once here
    ///   and stored as the publisher's subject.
    /// * `encoder` - shared encoder for items of type `T`.
    pub fn new(
        ctx: Arc<dyn PubBrokerTrait + Send + Sync>,
        subject: impl Into<String>,
        encoder: Arc<dyn Encoder<Item = T, Error = SerErr> + Send + Sync>,
    ) -> Self {
        // Take ownership of the subject up front so the struct literal below
        // is plain field initialisation.
        let subject: String = subject.into();
        Self {
            ctx,
            subject,
            encoder,
            _phantom: PhantomData,
        }
    }
}
#[async_trait]
impl<T, SerErr> PubTrait for Pub<T, SerErr>
where
    T: Serialize + Send + Sync,
    SerErr: EncErr + Send + Sync,
{
    type Item = T;
    type EncodeErr = SerErr;

    /// Encodes `obj` with the stored encoder and publishes the result to the
    /// stored subject through the broker handle.
    ///
    /// # Errors
    ///
    /// Returns a `PubError` when encoding fails (the encoder's error is wrapped
    /// in an `EncodeError` first) or when the broker's `publish` call fails.
    async fn publish(&self, obj: &T) -> Result<(), PubError<Self::EncodeErr>> {
        // Encoding happens first; on failure `?` returns before the broker is
        // ever called.
        let encoded = self.encoder.encode(obj).map_err(EncodeError::new)?;

        let subject = self.subject.as_str();
        self.ctx.publish(subject, encoded.into()).await?;

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::brokers::{errors::BrokerError, traits::MockPubBrokerTrait};
    use crate::encoders::MockEncoder;
    use crate::tests::entity::TestEntity;
    use crate::tests::error::{MockBrokerErr, MockEncErr};
    use ::bytes::Bytes;
    use ::mockall::predicate::*;

    /// When both the encoder and the broker succeed, `publish` returns `Ok`.
    #[tokio::test]
    async fn test_publish() {
        let subject = "test.subject";
        let entity = TestEntity::new(1, "Test Name");
        let encoded = Bytes::from("serialized bytes");

        // The encoder is expected to be called exactly once with this entity.
        let encoder_output = encoded.clone();
        let mut encoder = MockEncoder::new();
        encoder
            .expect_encode()
            .with(eq(entity.clone()))
            .times(1)
            .returning(move |_| Ok(encoder_output.clone()));

        // The broker is expected to receive the encoder's bytes on `subject`,
        // exactly once.
        let mut broker = MockPubBrokerTrait::new();
        broker
            .expect_publish()
            .with(eq(subject), eq(encoded))
            .times(1)
            .returning(|_, _| Ok(()));

        let publisher: Pub<TestEntity, _> =
            Pub::new(Arc::new(broker), subject, Arc::new(encoder));

        let outcome = publisher.publish(&entity).await;
        assert!(outcome.is_ok());
    }

    /// When the broker fails, `publish` returns an error whose message matches
    /// `PubError::BrokerError` wrapping the same broker error.
    #[tokio::test]
    async fn test_publish_error() {
        let subject = "test.subject.error";
        let entity = TestEntity::new(1, "Test Name");
        let encoded = Bytes::from("serialized bytes");

        // Encoding succeeds so that the failure under test comes from the broker.
        let encoder_output = encoded.clone();
        let mut encoder = MockEncoder::new();
        encoder
            .expect_encode()
            .with(eq(entity.clone()))
            .times(1)
            .returning(move |_| Ok(encoder_output.clone()));

        // The broker is called once with the encoded bytes and rejects them.
        let mut broker = MockPubBrokerTrait::new();
        broker
            .expect_publish()
            .with(eq(subject), eq(encoded))
            .times(1)
            .returning(|_, _| Err(BrokerError::new(MockBrokerErr)));

        let publisher: Pub<TestEntity, _> =
            Pub::new(Arc::new(broker), subject, Arc::new(encoder));

        let actual = publisher.publish(&entity).await.unwrap_err().to_string();
        let expected =
            PubError::<MockEncErr>::BrokerError(BrokerError::new(MockBrokerErr)).to_string();
        assert_eq!(actual, expected);
    }
}