object_transfer 2.0.0

An object transfer library for various message brokers and/or protocols
Documentation
use ::std::sync::Arc;

use ::async_trait::async_trait;
use ::bytes::Bytes;
use ::futures::stream::{BoxStream, StreamExt, iter};
use ::serde::de::DeserializeOwned;

use crate::brokers::SubBrokerTrait;
use crate::errors::{BrokerError, SubError};
use crate::traits::{AckTrait, SubTrait};

use super::error::MockDeErr;

/// Subscriber mock backed by a fixed, in-memory list of entries.
///
/// Each entry pairs an `Entity` value with a shared [`AckTrait`] object.
pub struct SubscribeMock<Entity> {
  /// The `(entity, ack)` pairs held by this mock, in the order they were
  /// supplied.
  data: Vec<(Entity, Arc<dyn AckTrait + Send + Sync>)>,
}

impl<Entity> SubscribeMock<Entity> {
  /// Builds a mock from the given `(entity, ack)` pairs.
  ///
  /// The vector is moved into the mock unchanged; no validation or
  /// reordering is performed.
  pub fn new(data: Vec<(Entity, Arc<dyn AckTrait + Send + Sync>)>) -> Self {
    // Field-init shorthand: the parameter already has the field's name.
    Self { data }
  }
}

/// [`SubTrait`] implementation that serves the mock's stored entries.
#[async_trait]
impl<Entity> SubTrait for SubscribeMock<Entity>
where
  Entity: DeserializeOwned + Clone + Send + Sync,
{
  type Item = Entity;
  type DecodeErr = MockDeErr;

  /// Returns a boxed stream over a clone of the stored entries.
  ///
  /// Every stored `(entity, ack)` pair is yielded once, in stored order,
  /// each wrapped in `Ok`. This implementation never produces an `Err`,
  /// neither as the outer result nor as a stream element.
  async fn subscribe(
    &self,
  ) -> Result<
    BoxStream<
      Result<
        (Self::Item, Arc<dyn AckTrait + Send + Sync>),
        SubError<Self::DecodeErr>,
      >,
    >,
    SubError<Self::DecodeErr>,
  > {
    // Clone so the mock keeps its entries and every call can serve them
    // again from the start.
    let entries = self.data.clone();
    // `Ok` is passed directly as the mapping function; a wrapping closure
    // would be redundant.
    Ok(iter(entries).map(Ok).boxed())
  }
}

/// [`SubBrokerTrait`] implementation for mocks whose entries are raw
/// [`Bytes`] values.
#[async_trait]
impl SubBrokerTrait for SubscribeMock<Bytes> {
  /// Returns a boxed stream over a clone of the stored entries.
  ///
  /// Every stored `(bytes, ack)` pair is yielded once, in stored order,
  /// each wrapped in `Ok`. This implementation never produces an `Err`,
  /// neither as the outer result nor as a stream element.
  async fn subscribe(
    &self,
  ) -> Result<
    BoxStream<Result<(Bytes, Arc<dyn AckTrait + Send + Sync>), BrokerError>>,
    BrokerError,
  > {
    // Clone so the mock keeps its entries and every call can serve them
    // again from the start.
    let entries = self.data.clone();
    // `Ok` is passed directly as the mapping function; a wrapping closure
    // would be redundant.
    Ok(iter(entries).map(Ok).boxed())
  }
}