# Subscribers and publishers
A subscriber binds a handler to one subscription. The `#[subscriber]` macro is the ergonomic way to
declare one; this guide covers the handler contract, the macro forms, how handlers are mounted, and
how to group them with a router.
## The handler contract
A handler is an `async fn` whose first parameter is a reference to the decoded payload:
```rust
use ruststream::runtime::HandlerResult;
use ruststream::subscriber;
#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
HandlerResult::Ack
}
```
The macro turns the function into a value named after it (here `handle`) that implements the
mounting contract. You pass that value to `include`.
### Accepting the context
Declare an optional second parameter, `&mut Context`, to read headers, the subscription name, and
shared state, or to publish from inside the handler:
```rust
use ruststream::runtime::{Context, HandlerResult};
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
if let Some(id) = ctx.headers().correlation_id() {
// ...
}
HandlerResult::Ack
}
```
### Acking
The return type is anything that converts into a [`HandlerResult`]:
| `HandlerResult::Ack` | acknowledge; the broker removes the message |
| `HandlerResult::retry()` | nack with requeue (redeliver later) |
| `HandlerResult::drop()` | nack without requeue (discard or dead-letter) |
| `()` | always `Ack` |
| `Result<_, E>` | `Ack` on `Ok`, `drop` on `Err` |
On the message itself, ack consumes `self`, so the type system prevents acking twice.
## Choosing the subscription source
### By name
`#[subscriber("orders")]` subscribes by name. It works with any broker that implements the
`Subscribe` capability, which covers the common case.
### Broker-specific descriptors
When a subscription needs broker-specific options (a consumer group, a durable name, a delivery
policy), the broker crate exposes a descriptor type. Use its constructor directly in the decorator:
```rust
#[subscriber(OrdersStream::new("orders", "workers"))]
async fn handle(order: &Order) -> HandlerResult {
HandlerResult::Ack
}
```
The macro reads the descriptor type out of the constructor call, so the compiler checks the
descriptor against the broker it is mounted on. A descriptor is any type that implements
`SubscriptionSource<B>`; see [Broker authors](../broker-authors/index.md#subscription-sources).
## Mounting handlers
Inside `with_broker`, mount a definition with `include`. It decodes the payload with the default
codec - `json` if enabled, otherwise `cbor`, otherwise `msgpack`:
```rust
RustStream::new(info).with_broker(broker, |b| {
b.include(handle);
});
```
### Where the codec comes from
The decode codec is fixed at compile time. `include` takes no codec argument; it resolves one from
the most specific level you set, from narrowest to widest:
**1. Per handler.** Override a single mounting:
=== "Router"
```rust
router.with_codec(CborCodec).include(handle);
```
=== "with_broker"
```rust
// name the source and the codec for this one handler
b.include_on(handle.source(), handle, CborCodec);
```
**2. Per scope.** Set one codec for every handler in a `with_broker` scope:
```rust
use ruststream::codec::CborCodec;
b.include(other_handler); // also CborCodec
});
```
**3. Default.** When nothing above names a codec, `include` uses `DefaultCodec` - `json` if the
feature is enabled, otherwise `cbor`, otherwise `msgpack`.
The reply side mirrors this: `TypedPublisher::new(publisher)` uses the default codec,
`TypedPublisher::with_codec(publisher, codec)` names one, and `include_publishing(def, publisher)`
reuses the publisher's codec to decode the request.
### Codecs
| `JsonCodec` | `json` | JSON |
| `MsgpackCodec` | `msgpack` | MessagePack |
| `CborCodec` | `cbor` | CBOR |
A codec is any type implementing the `Codec` trait, so you can supply your own.
When decoding fails, the message is dropped by default. The decode-failure behaviour is configurable
on the typed adapter when you build handlers by hand (see the API reference for `Typed` and
`DecodeFailure`).
## Macro or manual
`#[subscriber]` is sugar over a generic API. The macro generates a typed handler and its metadata;
you can write the same registration by hand with `typed` (which decodes the payload), a closure or
struct handler, and `HandlerMetadata`. Both forms below register the same handler.
=== "Macro"
```rust
use ruststream::codec::JsonCodec;
use ruststream::subscriber;
#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
HandlerResult::Ack
}
// inside with_broker(...):
b.include(handle);
```
=== "Manual"
```rust
use ruststream::Name;
use ruststream::codec::JsonCodec;
use ruststream::runtime::{Context, HandlerMetadata, HandlerResult, typed};
// inside with_broker(...):
b.subscribe(
Name::new("orders"),
typed(JsonCodec, |order: &Order, _ctx: &mut Context| async { HandlerResult::Ack }),
HandlerMetadata::typed::<Order>("orders"),
);
```
Reach for the manual form when a handler needs state the macro cannot express (a struct handler with
fields), or to set a non-default decode-failure policy. Otherwise the macro is less to maintain.
## Routers
Group handlers in their own module by collecting them into a `Router`, then mount the whole group
with `include_router`. The router-level `include` and `include_publishing` mirror the scope methods:
```rust title="routes.rs"
use ruststream::codec::JsonCodec;
use ruststream::runtime::Router;
pub fn orders() -> Router<MyBroker> {
let mut router = Router::new();
router.include(handle);
router.include(other_handler);
router
}
```
```rust title="main.rs"
RustStream::new(info).with_broker(broker, |b| {
b.include_router(routes::orders());
});
```
The application's global middleware (added with `layer`) does not wrap router handlers, since a
router is finalized independently. Wrap handlers inside the router if you need that.
## Publishers
A handler that produces a reply is a publisher. See [Publishing and replies](publishing.md).