object_transfer 2.0.0

An object transfer library for various message broker and/or protocols
Documentation
use ::std::sync::Arc;
use ::std::time::{SystemTime, UNIX_EPOCH};

use futures::StreamExt;
use serde::{de::Error as DeErr, ser::Error as SeErr};

use crate::options::SubOpt;
use crate::tests::entity::TestEntity;
use crate::{Pub, PubTrait, Sub, SubTrait, UnSubTrait};

use super::{Publisher, PublisherConfig, Subscriber, SubscriberConfig};

use crate::encoders::{
  Decoder as IDecoder, Encoder as IEncoder, JSONDecoder, JSONEncoder,
  MessagePackDecoder, MessagePackEncoder,
};

type EncoderType<E> =
  Arc<dyn IEncoder<Item = TestEntity, Error = E> + Send + Sync>;
type DecoderType<E> =
  Arc<dyn IDecoder<Item = TestEntity, Error = E> + Send + Sync>;

fn unique_stream_name(name: &str) -> String {
  let now = SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .unwrap_or_default()
    .as_millis();
  format!("object_transfer_redis_{}_{}", name, now)
}

async fn setup<DE, SE>(
  name: &str,
  encoder: EncoderType<SE>,
  decoder: DecoderType<DE>,
) -> Option<(Pub<TestEntity, SE>, Sub<TestEntity, DE>)>
where
  DE: DeErr + Send + Sync,
  SE: SeErr + Send + Sync,
{
  let client = redis::Client::open("redis://127.0.0.1:6379/").ok()?;
  let pub_con = client.get_multiplexed_async_connection().await.ok()?;
  let sub_con = client.get_multiplexed_async_connection().await.ok()?;
  let stream_name = unique_stream_name(name);
  let publisher_group = format!("{}_publisher", stream_name);
  let subscriber_group = format!("{}_subscriber", stream_name);
  let subscriber_consumer = format!("{}_consumer", stream_name);

  let publisher = Publisher::new(
    &pub_con,
    PublisherConfig::new().group_name(publisher_group),
  );
  let subscriber = Arc::new(Subscriber::new(
    &sub_con,
    SubscriberConfig::new(stream_name.clone())
      .group_name(subscriber_group)
      .consumer_name(subscriber_consumer)
      .num_fetch(1)
      .block_time(500),
  ));

  let options = SubOpt::new();
  let pub_typed = Pub::new(Arc::new(publisher), stream_name, encoder);
  let sub_typed =
    Sub::new(subscriber.clone(), subscriber.clone(), decoder, options);
  Some((pub_typed, sub_typed))
}

async fn roundtrip<DE, SE>(
  name: &str,
  encoder: EncoderType<SE>,
  decoder: DecoderType<DE>,
) where
  DE: DeErr + Send + Sync + 'static,
  SE: SeErr + Send + Sync,
{
  if let Some((publisher, reader)) = setup(name, encoder, decoder).await {
    let obj = TestEntity {
      id: 1,
      name: "test".into(),
    };
    let sub = ::tokio::spawn(async move {
      let mut subscriber = reader.subscribe().await.unwrap();
      let (obj, ack) = subscriber.next().await.unwrap().unwrap();
      ack.ack().await.unwrap();
      reader.unsubscribe().await.unwrap();
      obj
    });
    publisher.publish(&obj).await.unwrap();
    let recv = sub.await.unwrap();
    assert_eq!(obj, recv);
  } else {
    panic!("Redis server not available!");
  }
}

#[tokio::test]
async fn test_messagepack() {
  let encoder = Arc::new(MessagePackEncoder::<TestEntity>::new());
  let decoder = Arc::new(MessagePackDecoder::<TestEntity>::new());
  roundtrip("messagepack", encoder, decoder).await;
}

#[tokio::test]
async fn test_json() {
  let encoder = Arc::new(JSONEncoder::<TestEntity>::new());
  let decoder = Arc::new(JSONDecoder::<TestEntity>::new());
  roundtrip("json", encoder, decoder).await;
}