object_transfer 1.2.3

An object transfer library for various message brokers and/or protocols.
Documentation
use ::std::sync::Arc;

use futures::StreamExt;
use serde::{Deserialize, Serialize};

use crate::nats::{SubFetcher, SubFetcherOpt};
use crate::options::SubOpt;
use crate::{Format, Pub, PubTrait, Sub, SubTrait, UnSubTrait};
use async_nats::jetstream::{
  consumer::pull::Config as PullConfig, stream::Config as StreamConfig,
};

/// Minimal payload type that the tests publish and then read back.
///
/// `PartialEq` and `Debug` are derived so the sent and received values can be
/// compared with `assert_eq!`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct MyObj {
  /// The single piece of data carried by the payload.
  field: String,
}

/// Builds a publisher/subscriber pair for `MyObj` that uses `format`.
///
/// Connects to the NATS server at `127.0.0.1:4222` and derives the name
/// `object_transfer_<format>` from the format's textual representation. That
/// single name is passed to `Pub::new` and is used as the stream name, the
/// stream's only subject, and the durable consumer name.
///
/// # Returns
///
/// Always `Some((publisher, subscriber))`; every failure path panics instead
/// of producing `None`. The `Option` return type is kept so existing callers
/// keep compiling.
///
/// # Panics
///
/// Panics if connecting to NATS fails or if `SubFetcher::new` returns an
/// error.
async fn setup(format: Format) -> Option<(Pub<MyObj>, Sub<MyObj>)> {
  let client = async_nats::connect_with_options(
    "127.0.0.1:4222",
    async_nats::ConnectOptions::default()
      .retry_on_initial_connect()
      .max_reconnects(5),
  )
  .await
  .expect("failed to connect to the NATS server at 127.0.0.1:4222");
  let js = Arc::new(async_nats::jetstream::new(client));
  // `Arc<str>` converts straight from the owned `String`, so there is no need
  // to borrow it as `&str` first or to call `to_string()` inside `format!`.
  let name: Arc<str> = Arc::from(format!("object_transfer_{format}"));
  let publisher = Pub::new(js.clone(), name.to_string(), format);
  let ack_option = SubFetcherOpt::new(name.clone())
    .stream_config(StreamConfig {
      name: name.to_string(),
      subjects: vec![name.to_string()],
      ..Default::default()
    })
    .pull_config(PullConfig {
      durable_name: Some(name.to_string()),
      ..Default::default()
    });
  let sub_option = SubOpt::new().format(format);
  let subfetcher = Arc::new(
    SubFetcher::new(js, ack_option)
      .await
      .expect("failed to create the SubFetcher"),
  );
  // The same fetcher instance is handed to `Sub::new` twice, once per
  // argument slot.
  let reader = Sub::new(subfetcher.clone(), subfetcher, sub_option);
  Some((publisher, reader))
}

/// Publishes one `MyObj` with the given `format` and asserts that the
/// subscriber yields an equal value.
///
/// # Panics
///
/// Panics with `"NATS server not available!"` when `setup` returns `None`,
/// and on any subscribe, receive, unsubscribe, publish, or task-join failure.
async fn roundtrip(format: Format) {
  let Some((publisher, reader)) = setup(format).await else {
    panic!("NATS server not available!");
  };
  let sent = MyObj {
    field: "value".into(),
  };
  // The receiving side runs in its own task so it can wait for the message
  // while this task publishes it.
  let receiver = ::tokio::spawn(async move {
    let mut stream = reader.subscribe().await.unwrap();
    let (received, _) = stream.next().await.unwrap().unwrap();
    reader.unsubscribe().await.unwrap();
    received
  });
  publisher.publish(&sent).await.unwrap();
  let received = receiver.await.unwrap();
  assert_eq!(sent, received);
}

/// Runs the publish/subscribe round trip with the MessagePack format.
#[tokio::test]
async fn test_messagepack() {
  let format = Format::MessagePack;
  roundtrip(format).await;
}

/// Runs the publish/subscribe round trip with the JSON format.
#[tokio::test]
async fn test_json() {
  let format = Format::JSON;
  roundtrip(format).await;
}