Intercom
A fully typed async wrapper for NATS with JetStream support.
Features
- Fully typed publish/subscribe with turbofish syntax support
- Pluggable codec support (MessagePack default, JSON optional)
- JetStream support with builder pattern for all options
- Push and pull consumers with typed messages
- Interest-based consumers for complex routing scenarios
- Work queues with automatic acknowledgment tracking
- Stream trait for subscribers and consumers
- Sink trait for publishers
Installation
Add to your Cargo.toml:
[dependencies]
intercom = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
futures = "0.3"
Codec Features
By default, MessagePack is used for serialization. You can enable JSON support:
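# Pick ONE of the following `intercom` lines:
# MessagePack only (the default)
intercom = "0.1"
# JSON only (default features, i.e. MessagePack, disabled)
intercom = { version = "0.1", default-features = false, features = ["json"] }
# Both MessagePack and JSON
intercom = { version = "0.1", features = ["json"] }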
Quick Start
Basic Publish/Subscribe
use intercom::{Client, MsgPackCodec, Result};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
/// Example message payload used by the quick-start snippet.
#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
    /// Text carried by the message.
    content: String,
}
/// Connects to a local NATS server, subscribes to `"subject"`, publishes one
/// `MyMessage` on it, and prints every item the subscription yields.
///
/// The subscription is created *before* publishing. Core NATS only delivers a
/// message to subscriptions that exist at the time it is published, so
/// publishing first would leave the receive loop waiting for a message that
/// had already been dropped.
///
/// # Errors
///
/// Returns an error if connecting, subscribing, or publishing fails.
#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;

    // Subscribe first so the message published below is actually received.
    let mut subscriber = client.subscribe::<MyMessage>("subject").await?;

    client
        .publish::<MyMessage>("subject", &MyMessage { content: "hello".into() })
        .await?;

    // Each item is a `Result`; report failures and keep listening.
    while let Some(result) = subscriber.next().await {
        match result {
            Ok(msg) => println!("Received: {:?}", msg.payload),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}
Using JSON Codec
use intercom::{Client, JsonCodec, Result}; use serde::{Deserialize, Serialize};
/// Example message payload published through the JSON codec.
#[derive(Serialize, Deserialize)]
struct MyMessage {
    /// Text carried by the message.
    content: String,
}
/// Publishes a single `MyMessage` on `"subject"` using a client that is
/// parameterized with `JsonCodec` rather than `MsgPackCodec`.
async fn example() -> Result<()> {
    let json_client = Client::<JsonCodec>::connect("nats://localhost:4222").await?;

    let greeting = MyMessage { content: "hello".into() };
    json_client.publish::<MyMessage>("subject", &greeting).await?;

    Ok(())
}
Request/Reply
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
/// Example request payload sent to the `"service"` subject.
#[derive(Serialize, Deserialize)]
struct Request {
    /// Query text sent with the request.
    query: String,
}
/// Example reply payload returned for a `Request`.
#[derive(Serialize, Deserialize)]
struct Response {
    /// Result text carried by the reply.
    result: String,
}
/// Sends one typed `Request` to `"service"` and prints the `result` field of
/// the typed `Response` that comes back.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;

    // The two type parameters name the request and the reply type, in that order.
    let outgoing = Request { query: "hello".into() };
    let reply = client
        .request::<Request, Response>("service", &outgoing)
        .await?;

    println!("Response: {}", reply.result);
    Ok(())
}
Publisher with Sink Trait
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::SinkExt;
/// Example message payload pushed through the `Sink`-based publisher.
#[derive(Serialize, Deserialize)]
struct MyMessage {
    /// Text carried by the message.
    content: String,
}
/// Publishes three `MyMessage` values on `"subject"` through a typed
/// publisher driven with the `futures::SinkExt` methods.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let mut sink = client.publisher::<MyMessage>("subject");

    // `SinkExt::send` feeds one item and flushes the sink.
    sink.send(MyMessage { content: "hello".into() }).await?;

    // `SinkExt::feed` queues items without flushing; one explicit `flush`
    // afterwards pushes the whole batch out.
    for text in ["msg1", "msg2"] {
        sink.feed(MyMessage { content: text.into() }).await?;
    }
    sink.flush().await?;

    Ok(())
}
Queue Groups
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
/// Example task payload delivered to queue-group subscribers.
#[derive(Serialize, Deserialize, Debug)]
struct Task {
    /// Numeric identifier of the task.
    id: u64,
}
/// Creates a typed queue subscription for `Task` messages.
///
/// NOTE(review): the arguments are presumably the subject (`"tasks"`) followed
/// by the queue-group name (`"workers"`) — confirm against the `intercom` API.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;

    // The subscription is only created here; the snippet does not consume it.
    let subscriber = client
        .queue_subscribe::<Task>("tasks", "workers")
        .await?;

    Ok(())
}
JetStream
Creating Streams with Builder Pattern
use intercom::{Client, MsgPackCodec, RetentionPolicy};
use serde::{Deserialize, Serialize};
/// Example event payload for the `"events"` stream.
#[derive(Serialize, Deserialize, Debug)]
struct Event {
    /// Numeric identifier of the event.
    id: u64,
    /// Text data attached to the event.
    data: String,
}
/// Creates a JetStream stream named `"events"` through the stream builder,
/// setting its subjects, size/age limits, and replica count.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();

    // One builder call per line; `create()` finishes the builder.
    let stream = jetstream
        .stream_builder("events")
        .subjects(vec![String::from("events.>")])
        .max_messages(1_000_000)
        .max_bytes(1024 * 1024 * 100) // 100 MiB
        .max_age(std::time::Duration::from_secs(86400)) // 24 hours
        .replicas(3)
        .create()
        .await?;

    Ok(())
}
Publishing to JetStream
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
/// Example event payload published to JetStream.
#[derive(Serialize, Deserialize)]
struct Event {
    /// Numeric identifier of the event.
    id: u64,
    /// Text data attached to the event.
    data: String,
}
/// Publishes two `Event`s to `"events.user"`: one with `publish`, whose
/// awaited result is the ack, and one with `publish_async`, whose awaited
/// result is a future that yields the ack when awaited in turn.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();

    // `publish`: awaiting the call yields the ack directly.
    let created = Event {
        id: 1,
        data: "user created".to_string(),
    };
    let ack = jetstream.publish::<Event>("events.user", &created).await?;
    println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);

    // `publish_async`: the first await yields an ack future, the second the ack.
    let updated = Event {
        id: 2,
        data: "user updated".to_string(),
    };
    let ack_future = jetstream
        .publish_async::<Event>("events.user", &updated)
        .await?;
    let ack = ack_future.await?;

    Ok(())
}
Pull Consumers
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
/// Example event payload read by the pull consumer.
#[derive(Serialize, Deserialize, Debug)]
struct Event {
    /// Numeric identifier of the event.
    id: u64,
}
/// Builds a durable pull consumer on the `"events"` stream, then reads from it
/// twice: once via `fetch(10)` and once via `messages()`, acknowledging every
/// message it receives.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;

    let consumer = stream
        .pull_consumer_builder::<Event>("my-consumer")
        .durable()
        .filter_subject("events.user.>")
        .ack_wait(std::time::Duration::from_secs(30))
        .max_deliver(3)
        .create()
        .await?;

    // Batch read. NOTE(review): `fetch(10)` presumably requests up to 10
    // messages — confirm against the `intercom` API.
    let mut batch = consumer.fetch(10).await?;
    while let Some(item) = batch.next().await {
        let msg = item?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }

    // Stream read via `messages()`; each message is acknowledged as it arrives.
    let mut incoming = consumer.messages().await?;
    while let Some(item) = incoming.next().await {
        let msg = item?;
        msg.ack().await?;
    }

    Ok(())
}
Push Consumers
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
/// Example event payload read by the push consumer.
#[derive(Serialize, Deserialize, Debug)]
struct Event {
    /// Numeric identifier of the event.
    id: u64,
}
/// Builds a durable push consumer on the `"events"` stream with a deliver
/// subject and deliver group, then prints and acknowledges each message.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;

    // One builder call per line; `create()` finishes the builder.
    let consumer = stream
        .push_consumer_builder::<Event>("my-push-consumer")
        .deliver_subject("deliver.events")
        .deliver_group("workers")
        .durable()
        .create()
        .await?;

    let mut incoming = consumer.messages().await?;
    while let Some(item) = incoming.next().await {
        let msg = item?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }

    Ok(())
}
Work Queues
use intercom::{Client, MsgPackCodec, WorkQueue};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
/// Example job payload placed on the work queue.
#[derive(Serialize, Deserialize, Debug)]
struct Job {
    /// Numeric identifier of the job.
    id: u64,
    /// Text describing the job.
    payload: String,
}
/// Creates a `WorkQueue` named `"jobs"`, pushes one `Job` onto it, then turns
/// the queue into a stream and prints and acknowledges each job it yields.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();

    let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
        .max_messages(10_000)
        .create()
        .await?;

    let first_job = Job { id: 1, payload: "do work".into() };
    queue.push(&first_job).await?;

    // `into_stream` consumes the queue handle and yields the queued jobs.
    let mut jobs = queue.into_stream().await?;
    while let Some(item) = jobs.next().await {
        let job = item?;
        println!("Processing: {:?}", job.payload);
        job.ack().await?;
    }

    Ok(())
}
Interest-Based Consumers
use intercom::{Client, MsgPackCodec, InterestQueue};
use serde::{Deserialize, Serialize};
/// Example event payload published to the interest queue.
#[derive(Serialize, Deserialize, Debug)]
struct Event {
    /// Numeric identifier of the event.
    id: u64,
    /// Text data attached to the event.
    data: String,
}
/// Creates an `InterestQueue` named `"events"` on the subject `"events.>"`,
/// registers two consumers on it, and publishes one `Event`.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();

    let queue = InterestQueue::<Event, MsgPackCodec>::builder(&jetstream, "events")
        .subject("events.>")
        .create()
        .await?;

    // Both consumers are registered before anything is published.
    let consumer1 = queue.add_consumer("service-a").await?;
    let consumer2 = queue.add_consumer("service-b").await?;

    let event = Event { id: 1, data: "test".into() };
    queue.publish(&event).await?;

    Ok(())
}
Message Acknowledgment
JetStream messages support various acknowledgment modes:
// Each call below illustrates one acknowledgment mode; a handler would
// normally choose one of them per message rather than call them in sequence.
// NOTE(review): the descriptions follow the standard JetStream acknowledgment
// semantics suggested by the method names — confirm against the intercom API.

// Acknowledge: the message was processed successfully.
msg.ack().await?;
// Acknowledge and, presumably, wait for the server to confirm the ack.
msg.double_ack().await?;
// Negative-acknowledge: ask for the message to be redelivered.
msg.nak().await?;
// Negative-acknowledge, asking for redelivery after a 10-second delay.
msg.nak_with_delay(std::time::Duration::from_secs(10)).await?;
// Report that processing is still in progress (not finished yet).
msg.in_progress().await?;
// Terminate: presumably tells the server not to redeliver this message.
msg.term().await?;
Serialization
Intercom supports pluggable codecs:
- MsgPackCodec (default): Efficient binary serialization via rmp-serde
- JsonCodec (optional): Human-readable JSON via serde_json
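Message types must implement the serde traits for the direction they travel: Serialize for types you publish, Deserialize for types you receive. Deriving both is the simplest option: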
use serde::{Deserialize, Serialize};
/// Example message type showing the serde derives a payload needs.
#[derive(Serialize, Deserialize)]
struct MyMessage {
    /// Numeric identifier.
    id: u64,
    /// Raw byte payload.
    data: Vec<u8>,
    /// List of string tags.
    tags: Vec<String>,
}
License
MIT