Crate intercom_rs

Crate intercom_rs 

Source
Expand description

§Intercom

A fully typed async wrapper for NATS with JetStream support.

§Features

  • Fully typed publish/subscribe with turbofish syntax support
  • Pluggable codec support (MessagePack default, JSON optional)
  • JetStream support with builder pattern
  • Push and pull consumers
  • Interest-based consumers
  • Work queues
  • Stream trait for subscribers
  • Sink trait for publishers

§Codec Selection

The library supports multiple serialization codecs via cargo features:

  • msgpack (default) - MessagePack binary serialization
  • json - JSON text serialization

When creating a client, specify the codec type:

use intercom::{Client, MsgPackCodec, Result};

// Using MessagePack (default)
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;

Or with JSON:

use intercom::{Client, JsonCodec, Result};

// Using JSON (requires `json` feature)
let client = Client::<JsonCodec>::connect("nats://localhost:4222").await?;

§Example

use intercom::{Client, MsgPackCodec, Result};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
    content: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
     
    // Type-safe publish
    client.publish::<MyMessage>("subject", &MyMessage { content: "hello".into() }).await?;
     
    // Create a typed subscriber with Stream trait
    let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
     
    // Create a typed publisher with Sink trait
    let publisher = client.publisher::<MyMessage>("subject");
     
    Ok(())
}

§JetStream Example

use intercom::{Client, MsgPackCodec, Result};
use intercom::jetstream::stream::RetentionPolicy;
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Debug, Serialize, Deserialize)]
struct Event {
    id: u64,
    data: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
     
    // Create a stream with builder pattern
    let stream = jetstream
        .stream_builder("events")
        .subjects(vec!["events.>".to_string()])
        .max_messages(1_000_000)
        .create()
        .await?;
     
    // Publish typed messages
    jetstream.publish::<Event>("events.user", &Event {
        id: 1,
        data: "user created".to_string(),
    }).await?;
     
    // Create a pull consumer with turbofish syntax
    let consumer = stream
        .pull_consumer_builder::<Event>("my-consumer")
        .durable()
        .create()
        .await?;
     
    // Fetch messages with Stream trait
    let mut messages = consumer.fetch(10).await?;
    while let Some(result) = messages.next().await {
        let msg = result?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }
     
    Ok(())
}

Re-exports§

pub use client::Client;
pub use codec::Codec;
pub use codec::CodecType;
pub use codec::Decode;
pub use codec::Encode;
pub use codec::MsgPackCodec;
pub use codec::DefaultCodec;
pub use error::Error;
pub use error::Result;
pub use jetstream::consumer::AckPolicy;
pub use jetstream::consumer::DeliverPolicy;
pub use jetstream::consumer::JetStreamMessage;
pub use jetstream::consumer::PullBatch;
pub use jetstream::consumer::PullConsumer;
pub use jetstream::consumer::PullConsumerBuilder;
pub use jetstream::consumer::PullMessages;
pub use jetstream::consumer::PushConsumer;
pub use jetstream::consumer::PushConsumerBuilder;
pub use jetstream::consumer::PushMessages;
pub use jetstream::consumer::ReplayPolicy;
pub use jetstream::context::JetStreamContext;
pub use jetstream::context::PublishAck;
pub use jetstream::context::PublishAckFuture;
pub use jetstream::queue::InterestQueue;
pub use jetstream::queue::InterestQueueBuilder;
pub use jetstream::queue::QueueMessage;
pub use jetstream::queue::StreamingWorkQueue;
pub use jetstream::queue::WorkQueue;
pub use jetstream::queue::WorkQueueBuilder;
pub use jetstream::queue::WorkQueueSink;
pub use jetstream::stream::Compression;
pub use jetstream::stream::DiscardPolicy;
pub use jetstream::stream::RetentionPolicy;
pub use jetstream::stream::StorageType;
pub use jetstream::stream::Stream;
pub use jetstream::stream::StreamBuilder;
pub use jetstream::stream::StreamConfig;
pub use jetstream::stream::StreamInfo;
pub use jetstream::stream::StreamSource;
pub use jetstream::stream::StreamState;
pub use publisher::Publisher;
pub use subscriber::Message;
pub use subscriber::Subscriber;

Modules§

client
NATS client wrapper with typed publish/subscribe operations.
codec
Message encoding and decoding with pluggable codec support.
error
Error types for the intercom library.
jetstream
JetStream support with builder pattern.
publisher
Typed publisher with Sink trait implementation.
subscriber
Typed subscriber with Stream trait implementation.