Expand description
§Intercom
A fully typed async wrapper for NATS with JetStream support.
§Features
- Fully typed publish/subscribe with turbofish syntax support
- Pluggable codec support (MessagePack default, JSON optional)
- JetStream support with builder pattern
- Push and pull consumers
- Interest-based consumers
- Work queues
- Stream trait for subscribers
- Sink trait for publishers
§Codec Selection
The library supports multiple serialization codecs via cargo features:
msgpack(default) - MessagePack binary serializationjson- JSON text serialization
When creating a client, specify the codec type:
use intercom::{Client, MsgPackCodec, Result};
// Using MessagePack (default)
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;Or with JSON:
ⓘ
use intercom::{Client, JsonCodec, Result};
// Using JSON (requires `json` feature)
let client = Client::<JsonCodec>::connect("nats://localhost:4222").await?;§Example
use intercom::{Client, MsgPackCodec, Result};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
content: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
// Type-safe publish
client.publish::<MyMessage>("subject", &MyMessage { content: "hello".into() }).await?;
// Create a typed subscriber with Stream trait
let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
// Create a typed publisher with Sink trait
let publisher = client.publisher::<MyMessage>("subject");
Ok(())
}§JetStream Example
use intercom::{Client, MsgPackCodec, Result};
use intercom::jetstream::stream::RetentionPolicy;
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Debug, Serialize, Deserialize)]
struct Event {
id: u64,
data: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
// Create a stream with builder pattern
let stream = jetstream
.stream_builder("events")
.subjects(vec!["events.>".to_string()])
.max_messages(1_000_000)
.create()
.await?;
// Publish typed messages
jetstream.publish::<Event>("events.user", &Event {
id: 1,
data: "user created".to_string(),
}).await?;
// Create a pull consumer with turbofish syntax
let consumer = stream
.pull_consumer_builder::<Event>("my-consumer")
.durable()
.create()
.await?;
// Fetch messages with Stream trait
let mut messages = consumer.fetch(10).await?;
while let Some(result) = messages.next().await {
let msg = result?;
println!("Got: {:?}", msg.payload);
msg.ack().await?;
}
Ok(())
}Re-exports§
pub use client::Client;pub use codec::Codec;pub use codec::CodecType;pub use codec::Decode;pub use codec::Encode;pub use codec::MsgPackCodec;pub use codec::DefaultCodec;pub use error::Error;pub use error::Result;pub use jetstream::consumer::AckPolicy;pub use jetstream::consumer::DeliverPolicy;pub use jetstream::consumer::JetStreamMessage;pub use jetstream::consumer::PullBatch;pub use jetstream::consumer::PullConsumer;pub use jetstream::consumer::PullConsumerBuilder;pub use jetstream::consumer::PullMessages;pub use jetstream::consumer::PushConsumer;pub use jetstream::consumer::PushConsumerBuilder;pub use jetstream::consumer::PushMessages;pub use jetstream::consumer::ReplayPolicy;pub use jetstream::context::JetStreamContext;pub use jetstream::context::PublishAck;pub use jetstream::context::PublishAckFuture;pub use jetstream::queue::InterestQueue;pub use jetstream::queue::InterestQueueBuilder;pub use jetstream::queue::QueueMessage;pub use jetstream::queue::StreamingWorkQueue;pub use jetstream::queue::WorkQueue;pub use jetstream::queue::WorkQueueBuilder;pub use jetstream::queue::WorkQueueSink;pub use jetstream::stream::Compression;pub use jetstream::stream::DiscardPolicy;pub use jetstream::stream::RetentionPolicy;pub use jetstream::stream::StorageType;pub use jetstream::stream::Stream;pub use jetstream::stream::StreamBuilder;pub use jetstream::stream::StreamConfig;pub use jetstream::stream::StreamInfo;pub use jetstream::stream::StreamSource;pub use jetstream::stream::StreamState;pub use publisher::Publisher;pub use subscriber::Message;pub use subscriber::Subscriber;
Modules§
- client
- NATS client wrapper with typed publish/subscribe operations.
- codec
- Message encoding and decoding with pluggable codec support.
- error
- Error types for the intercom library.
- jetstream
- JetStream support with builder pattern.
- publisher
- Typed publisher with Sink trait implementation.
- subscriber
- Typed subscriber with Stream trait implementation.