object_transfer 2.0.0

An object transfer library for various message brokers and protocols

Object Transfer Library for Rust

Description

This library provides a simple and efficient way to transfer objects between different parts of an application, or between different applications, through message brokers such as NATS. It handles serialization and deserialization in several data formats, which makes it easy to send and receive complex data structures.

Installation

With Cargo 1.62 or later (or with cargo-edit installed on older toolchains), you can add this library to your project by running:

cargo add object_transfer

Alternatively, you can manually add the following line to your Cargo.toml file:

[dependencies]
object_transfer = "x.x.x"

where x.x.x is the desired version of the library. For the latest version, please check Crates.io.

Example of Use with JSON Format

use std::sync::Arc;
use futures::StreamExt;
use serde::{Serialize, Deserialize};
use object_transfer::{
  encoders::JSONEncoder,
  encoders::JSONDecoder,
  Pub, Sub,
  SubOpt,
  traits::{PubTrait, SubTrait},
};
use object_transfer::brokers::nats::{SubFetcherOpt, SubFetcher};

// Define a data structure that will be transmitted via message broker
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MyData {
  pub id: u32,
  pub name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  // Connect to NATS server
  let client = async_nats::connect("demo.nats.io").await?;
  let js = Arc::new(async_nats::jetstream::new(client));

  // Initialize publisher with JSON encoder
  let publisher: Pub<MyData, _> = Pub::new(
    js.clone(),
    "mydata.created",
    Arc::new(JSONEncoder::new()),
  );

  // Create and publish a sample event
  let event = MyData {
    id: 42,
    name: "Jane Doe".to_string(),
  };
  publisher.publish(&event).await?;

  // Configure the fetcher with its subjects and durable consumer name
  let fetcher_opt = SubFetcherOpt::new(Arc::from("events"))
    .subjects(vec!["mydata.created"])
    .durable_name("user-created");

  // Initialize subscriber with JSON decoder
  let fetcher = Arc::new(SubFetcher::new(js.clone(), fetcher_opt).await?);
  let unsub = fetcher.clone();
  let options = SubOpt::new().auto_ack(false); // Require manual acknowledgment
  let subscriber: Sub<MyData, _> = Sub::new(fetcher, unsub, Arc::new(JSONDecoder::new()), options);

  // Subscribe to messages and wait for incoming data
  let mut stream = subscriber.subscribe().await?;

  // Process the received message with manual acknowledgment
  if let Some(Ok((event, ack))) = stream.next().await {
    println!("received {:?}", event);
    // Manually acknowledge the message since auto_ack is disabled
    ack.ack().await?;
  } else {
    println!("no event received");
  }

  Ok(())
}

Custom Serialization Formats

You can add support for any serialization format by implementing the Encoder and Decoder traits. This format-agnostic design lets you use JSON, MessagePack, Protocol Buffers, CBOR, or a custom format without modifying the library.

Implementing a Custom Encoder and Decoder

Here's an example of implementing a simple plain-text encoder:

use bytes::Bytes;
use serde::Serialize;
use object_transfer::encoders::Encoder;

#[derive(Serialize)]
struct MyData {
    id: u32,
    name: String,
}

// Custom plain-text encoder
struct PlainTextEncoder;

impl Encoder for PlainTextEncoder {
    type Item = MyData;
    type Error = std::fmt::Error;

    fn encode(&self, item: &Self::Item) -> Result<Bytes, Self::Error> {
        let text = format!("id={},name={}", item.id, item.name);
        Ok(Bytes::from(text))
    }
}

Implement the corresponding decoder:

use bytes::Bytes;
use object_transfer::encoders::Decoder;
use std::num::ParseIntError;

#[derive(Debug)]
enum ParseError {
    InvalidFormat,
    ParseInt(ParseIntError),
}

impl std::fmt::Display for ParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ParseError::InvalidFormat => write!(f, "Invalid format"),
            ParseError::ParseInt(e) => write!(f, "Parse error: {}", e),
        }
    }
}

impl std::error::Error for ParseError {}

struct PlainTextDecoder;

impl Decoder for PlainTextDecoder {
    type Item = MyData;
    type Error = ParseError;

    fn decode(&self, data: Bytes) -> Result<Self::Item, Self::Error> {
        // Borrow the payload as UTF-8 text without copying it
        let text = std::str::from_utf8(&data)
            .map_err(|_| ParseError::InvalidFormat)?;
        // Split on the first comma only, so names containing commas still decode
        let (id_part, name_part) = text
            .split_once(',')
            .ok_or(ParseError::InvalidFormat)?;
        let id = id_part
            .strip_prefix("id=")
            .ok_or(ParseError::InvalidFormat)?
            .parse::<u32>()
            .map_err(ParseError::ParseInt)?;
        let name = name_part
            .strip_prefix("name=")
            .ok_or(ParseError::InvalidFormat)?
            .to_string();
        Ok(MyData { id, name })
    }
}

Then pass your custom encoder to Pub (a custom decoder is passed to Sub in the same way):

use std::sync::Arc;
use object_transfer::{Pub, Sub, SubOpt};
use object_transfer::traits::PubTrait;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("demo.nats.io").await?;
    let js = Arc::new(async_nats::jetstream::new(client));

    // Use your custom plain-text encoder
    let publisher: Pub<MyData, _> = Pub::new(
        js.clone(),
        "events",
        Arc::new(PlainTextEncoder),
    );

    let event = MyData { id: 1, name: "test".to_string() };
    publisher.publish(&event).await?;

    Ok(())
}
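
Before wiring a custom format into Pub and Sub, it can help to check that encoding and decoding are inverses of each other. The following is a minimal standalone sketch of such a round-trip check for the plain-text format, using only the standard library. The free functions encode_plain and decode_plain, the InvalidId variant, and the use of Vec<u8> in place of Bytes are assumptions made for illustration; they are not part of the object_transfer API.

```rust
// Standalone sketch: mirrors the "id=<id>,name=<name>" format with std only.
// `encode_plain` and `decode_plain` are hypothetical helpers, not library API.

#[derive(Debug, Clone, PartialEq)]
struct MyData {
    id: u32,
    name: String,
}

#[derive(Debug, PartialEq)]
enum ParseError {
    InvalidFormat,
    InvalidId,
}

// Encode the value as UTF-8 bytes in the form "id=<id>,name=<name>".
fn encode_plain(item: &MyData) -> Vec<u8> {
    format!("id={},name={}", item.id, item.name).into_bytes()
}

// Decode by splitting on the first comma only, so a name may contain commas.
fn decode_plain(data: &[u8]) -> Result<MyData, ParseError> {
    let text = std::str::from_utf8(data).map_err(|_| ParseError::InvalidFormat)?;
    let (id_part, name_part) = text.split_once(',').ok_or(ParseError::InvalidFormat)?;
    let id = id_part
        .strip_prefix("id=")
        .ok_or(ParseError::InvalidFormat)?
        .parse::<u32>()
        .map_err(|_| ParseError::InvalidId)?;
    let name = name_part
        .strip_prefix("name=")
        .ok_or(ParseError::InvalidFormat)?
        .to_string();
    Ok(MyData { id, name })
}
```

Calling decode_plain on the output of encode_plain should return the original value, including for names that contain a comma. The same kind of check can be written against your real Encoder and Decoder implementations in a unit test.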

Selecting Format at Runtime

Since the encoder and decoder are passed as parameters, you can select the format at runtime:

use std::sync::Arc;
use object_transfer::{
  encoders::JSONEncoder,
  encoders::JSONDecoder,
  encoders::MessagePackEncoder,
  encoders::MessagePackDecoder,
  Pub, Sub, SubOpt,
  traits::PubTrait,
};

fn get_encoder(format: &str) -> Arc<dyn object_transfer::encoders::Encoder<Item = MyData> + Send + Sync> {
    match format {
        "json" => Arc::new(JSONEncoder::new()),
        "msgpack" => Arc::new(MessagePackEncoder::new()),
        // Could also include custom formats here
        _ => Arc::new(JSONEncoder::new()),
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("demo.nats.io").await?;
    let js = Arc::new(async_nats::jetstream::new(client));

    let format = std::env::var("MESSAGE_FORMAT").unwrap_or_else(|_| "json".to_string());
    let publisher: Pub<MyData, _> = Pub::new(
        js.clone(),
        "events",
        get_encoder(&format),
    );

    Ok(())
}
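
The example above falls back to JSON when the format name is not recognized. If you would rather reject unknown names, one option is to parse the name into an enum first and build the encoder from that. The sketch below shows the pattern with the standard library only. The Encode trait, the KeyValue and Csv encoders, the Format enum, and encoder_for are all hypothetical stand-ins chosen for illustration; they are not the library's Encoder trait or its built-in formats.

```rust
use std::str::FromStr;
use std::sync::Arc;

// Hypothetical stand-in for an encoder trait; not the library's `Encoder`.
trait Encode: Send + Sync {
    fn encode(&self, id: u32, name: &str) -> Vec<u8>;
}

// Two toy formats standing in for real ones such as JSON or MessagePack.
struct KeyValue;
struct Csv;

impl Encode for KeyValue {
    fn encode(&self, id: u32, name: &str) -> Vec<u8> {
        format!("id={},name={}", id, name).into_bytes()
    }
}

impl Encode for Csv {
    fn encode(&self, id: u32, name: &str) -> Vec<u8> {
        format!("{},{}", id, name).into_bytes()
    }
}

// The set of formats this program accepts.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Format {
    KeyValue,
    Csv,
}

impl FromStr for Format {
    type Err = String;

    // Parse case-insensitively and report unknown names as an error
    // instead of silently picking a default.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "kv" => Ok(Format::KeyValue),
            "csv" => Ok(Format::Csv),
            other => Err(format!("unsupported message format: {}", other)),
        }
    }
}

// Build a shared trait object for the chosen format.
fn encoder_for(format: Format) -> Arc<dyn Encode> {
    match format {
        Format::KeyValue => Arc::new(KeyValue),
        Format::Csv => Arc::new(Csv),
    }
}
```

With this shape, a value read from an environment variable can be parsed with "csv".parse::<Format>() and a misconfigured name surfaces as an error at startup. Whether to fail or fall back to a default is a design choice that depends on your application.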

Built-in Formats

The library provides built-in encoders/decoders for common formats (when features are enabled):