---
description: How to build a service with RustStream (handlers, app, brokers, codecs)
globs: examples/**/*.rs
alwaysApply: false
---
- A handler is an `async fn` annotated with `#[subscriber("subject")]`. Its first parameter is the
decoded payload by reference (`&T`, where `T: serde::Deserialize`); an optional second parameter is
`ctx: &mut ruststream::runtime::Context`.
- The return type drives acknowledgement: `HandlerResult`, `()` (always acks), or `Result<_, E>` (an
`Err` nacks). For a reply handler, add a `publish("subject")` clause and return the reply value,
or `Result<Reply, HandlerResult>` for explicit ack control.
- Build the service with `#[ruststream::app]` on a synchronous `fn app() -> RustStream`. Do not write
your own `main` or `#[tokio::main]`.
- Construct brokers synchronously (`MemoryBroker::new()`, `NatsBroker::new(url)`); the runtime
connects them at startup. Never connect inside the builder.
- The codec defaults to `codec::DefaultCodec` (`json` > `cbor` > `msgpack`), so `include` needs no
codec argument. Log with `tracing`, never `println!`. In tests use `MemoryBroker` or a `TestClient`.
## Subscriber and app
The macro turns `handle` into a value named after the function; mount it with `include`.
```rust
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct Order {
id: u64,
}
#[subscriber("orders.created")]
async fn handle(order: &Order) -> HandlerResult {
tracing::info!(order.id, "processing order");
HandlerResult::Ack
}
#[ruststream::app]
fn app() -> RustStream {
RustStream::new(AppInfo::new("orders", "0.1.0"))
.with_broker(MemoryBroker::new(), |b| {
b.include(handle);
})
}
```
Swap the broker without touching the handlers (NATS connects lazily at startup):
```rust
use ruststream_nats::NatsBroker;
.with_broker(NatsBroker::new("nats://localhost:4222"), |b| b.include(handle))
```
## Handler results
```rust
// explicit ack / nack
HandlerResult::Ack
HandlerResult::Nack { requeue: true }
// `()` always acks; `Result<(), E>` acks on Ok, nacks on Err
async fn handle(order: &Order) -> Result<(), MyError> { Ok(()) }
```
## Replying (request/response)
Return the reply value and add `publish("subject")`; hand `include_publishing` a `TypedPublisher`.
```rust
use ruststream::runtime::TypedPublisher;
use serde::Serialize;
#[derive(Serialize)]
struct Confirmation {
id: u64,
accepted: bool,
}
#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
Confirmation { id: order.id, accepted: true }
}
// inside with_broker(broker, |b| { ... }):
let replies = TypedPublisher::new(b.broker().publisher()); // default codec
b.include_publishing(confirm, replies);
```
Return `Result<Reply, HandlerResult>` to control acknowledgement: `Ok(reply)` publishes and acks,
`Err(result)` publishes nothing and the dispatcher acts on the returned `HandlerResult`. Spell the
`Result` out in the signature (a type alias is treated as a plain reply type). A failed reply
publish nacks the incoming message with `requeue = true`.
```rust
#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Result<Confirmation, HandlerResult> {
if order.id == 0 {
return Err(HandlerResult::drop());
}
Ok(Confirmation { id: order.id, accepted: true })
}
```
## Choosing a codec
`include` uses `DefaultCodec`. No mounting call takes a codec argument; change the default for a
scope with `with_broker_codec`, or for a `Router` chain with `with_codec`.
```rust
use ruststream::codec::CborCodec;
// every handler in this scope decodes with CborCodec
RustStream::new(info).with_broker_codec(broker, CborCodec, |b| {
b.include(handle);
});
// on a Router: with_codec applies to the includes after it (it can change mid-chain)
Router::<MemoryBroker>::new().with_codec(CborCodec).include(handle);
```
## Routers
Collect handlers in their own module, then mount the group. A `Router` is a consuming builder
(each call returns a new type), so the calls chain and the function returns `impl RouterDef<B>`.
The app's global `.layer(..)` reaches router handlers, so cross-cutting middleware (logging,
metrics) goes on the app and applies everywhere; that global layer must be a `BlanketLayer`
(every bundled layer is).
```rust
use ruststream::runtime::{Router, RouterDef};
// `use<>` opts out of borrowing the broker (the router owns its Arc-backed publisher).
fn orders(broker: &MemoryBroker) -> impl RouterDef<MemoryBroker> + use<> {
let replies = TypedPublisher::new(broker.publisher());
Router::new()
.include(handle)
.include_publishing(confirm, replies)
}
// in main
.with_broker(MemoryBroker::new(), |b| b.include_router(orders(b.broker())))
```
## Broker-specific subscriptions
For options the by-name form can't express (NATS JetStream, consumer groups), override the source
with `include_on`. The codec resolves the same way as for `include` (the default, or the scope /
chain codec).
```rust
use ruststream_nats::SubscribeOptions;
b.include_on(
SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("worker"),
handle,
);
```
## Context (optional handler argument)
`ctx: &mut Context` is an **optional** second handler parameter. Omit it when you do not use it - do
not add it just in case. When present it exposes the delivery's runtime objects:
- `ctx.name()` - the subject/channel the message arrived on.
- `ctx.headers()` / `ctx.headers_mut()` - the working copy of the message headers.
- `ctx.get::<T>()` - shared application state registered with `RustStream::insert_state`.
- `ctx.publisher(name)` - a named publisher (registered with `RustStream::publisher`) for a manual
publish through the same middleware pipeline as a macro reply.
```rust
use ruststream::runtime::{Context, HandlerResult, Outgoing};
struct Database; // connection pool, etc.
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context) -> HandlerResult {
let subject = ctx.name(); // "orders"
if let Some(db) = ctx.get::<Database>() { // application state
// db.store(order).await;
let _ = db;
}
if let Some(audit) = ctx.publisher("audit") {
let _ = audit.publish(Outgoing::new("audit", b"stored".to_vec())).await;
}
HandlerResult::Ack
}
```
Register state and named publishers on the app builder:
```rust
RustStream::new(AppInfo::new("orders", "0.1.0"))
.insert_state(Database)
.publisher("audit", broker.publisher())
.with_broker(broker, |b| b.include(handle))
```
## AsyncAPI metadata
Decoding only needs `serde::Deserialize`. The extra derives are **documentation only** for the
generated AsyncAPI spec and are optional: `ruststream::Message` names/describes the type, and
`JsonSchema` (feature `asyncapi`) emits its payload schema.
Neither selects a wire codec - that stays the default (`json` > `cbor` > `msgpack`) or whatever
`with_codec` / `with_broker_codec` names. `JsonSchema` describes the payload's logical shape in the
JSON Schema language (as in OpenAPI), independent of how bytes are encoded on the wire; you cannot
attach a codec to a type.
```rust
use ruststream::Message;
use ruststream::schemars::JsonSchema;
use serde::Deserialize;
/// An order placed on the orders channel.
#[derive(Debug, Deserialize, Message, JsonSchema)]
struct Order {
id: u64,
}
```
## Testing
Use the in-memory broker (or a broker's `TestClient` under the `testing` feature); no real server.
```rust
use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::testing::TestClient;
let client = MemoryBroker::start().await?;
client.publish("orders", br#"{"id":1}"#).await?;
let published = client
.expect_published("confirmations", 1, Duration::from_secs(1))
.await?;
client.shutdown().await?;
```