Expand description
Core trait abstractions for publish-subscribe messaging with strongly-typed items.
This module provides a set of traits that define the interfaces for publishing and subscribing to typed messages across different messaging backends (NATS, Redis, etc.). These traits abstract away the complexity of serialization, transport, and acknowledgment handling, allowing you to write backend-agnostic code.
§Core Traits
PubTrait: Publish strongly-typed items that implementserde::Serialize. Handles encoding and delivery to the backing message broker.SubTrait: Subscribe to a stream of strongly-typed items that implementserde::de::DeserializeOwned. Returns a stream of decoded messages paired with acknowledgment handles.AckTrait: Acknowledge receipt of a message after it has been successfully processed.UnSubTrait: Cancel an active subscription gracefully.
§Dispatch Patterns
The library supports two primary usage patterns for different performance and flexibility trade-offs:
§Static Dispatch (Recommended)
Use generic trait bounds to maintain full type information at compile time. This enables monomorphization, inlining, and zero runtime overhead. Ideal for performance-critical paths.
§Publishing with Static Dispatch
use object_transfer::traits::PubTrait;
use serde::Serialize;
#[derive(Serialize)]
struct Event {
id: u32,
message: String,
}
async fn send_event<P: PubTrait<Item = Event> + Send + Sync + 'static>(publisher: &P, event: &Event) -> Result<(), Box<dyn std::error::Error>> {
publisher.publish(event).await?;
Ok(())
}§Subscribing with Static Dispatch
use object_transfer::traits::{SubTrait, AckTrait};
use serde::Deserialize;
use futures::stream::StreamExt;
#[derive(Deserialize, Debug)]
struct Event {
id: u32,
message: String,
}
async fn receive_events<S: SubTrait<Item = Event> + Send + Sync + 'static>(
subscriber: &S,
) -> Result<(), Box<dyn std::error::Error>> {
let mut stream = subscriber.subscribe().await?;
while let Some(result) = stream.next().await {
match result {
Ok((event, ack)) => {
ack.ack().await.ok();
}
Err(e) => eprintln!("Error: {:?}", e),
}
}
Ok(())
}§Relay Pattern with Generic Implementations
A common pattern is to relay messages between publishers and subscribers:
use object_transfer::traits::{PubTrait, SubTrait};
use serde::{Serialize, Deserialize};
use futures::stream::StreamExt;
#[derive(Serialize, Deserialize, Clone)]
struct Message {
id: u64,
content: String,
}
async fn relay<P, S>(publisher: &P, subscriber: &S) -> Result<(), Box<dyn std::error::Error>>
where
P: PubTrait<Item = Message> + Send + Sync + 'static,
S: SubTrait<Item = Message> + Send + Sync + 'static,
{
let mut stream = subscriber.subscribe().await?;
while let Some(result) = stream.next().await {
if let Ok((msg, ack)) = result {
publisher.publish(&msg).await.ok();
ack.ack().await.ok();
}
}
Ok(())
}§Dynamic Dispatch (Trait Objects)
Use trait objects for runtime polymorphism when the concrete type is unknown or must be determined at runtime. This introduces a small runtime cost but provides maximum flexibility. This pattern is common in plugin systems or when accepting multiple publisher/subscriber implementations.
§Publishing with Dynamic Dispatch
use std::sync::Arc;
use object_transfer::traits::PubTrait;
use serde::Serialize;
#[derive(Serialize)]
struct Event {
id: u32,
data: String,
}
async fn send_via_any_publisher(
publisher: Arc<dyn PubTrait<Item = Event, EncodeErr = serde_json::Error>>,
event: &Event,
) -> Result<(), Box<dyn std::error::Error>> {
publisher.publish(event).await?;
Ok(())
}§Subscribing with Dynamic Dispatch
use std::sync::Arc;
use object_transfer::traits::{SubTrait, AckTrait};
use serde::Deserialize;
use futures::stream::StreamExt;
#[derive(Deserialize, Debug)]
struct Event {
id: u32,
data: String,
}
async fn receive_via_any_subscriber(
subscriber: Arc<dyn SubTrait<Item = Event, DecodeErr = serde_json::Error>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut stream = subscriber.subscribe().await?;
while let Some(result) = stream.next().await {
match result {
Ok((event, ack)) => {
ack.ack().await.ok();
}
Err(e) => eprintln!("Error: {:?}", e),
}
}
Ok(())
}§Integration with Concrete Backends
Concrete implementations of these traits are provided for different backends:
- NATS: See the [
crate::nats] module (requiresnatsfeature) - Redis: See the [
crate::redis] module (requiresredisfeature)
§Error Handling
Operations may fail for various reasons (encoding errors, network issues, etc.).
Each trait method returns a Result with a specific error type:
Traits§
- AckTrait
- Acknowledge receipt of a message.
- PubTrait
- Abstraction for publishing typed items.
- SubTrait
- Subscription interface returning a stream of decoded items and ack handles.
- UnSub
Trait - Allows canceling a subscription.