ruststream 0.2.5

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
---
description: How to build a service with RustStream (handlers, app, brokers, codecs)
globs: examples/**/*.rs
alwaysApply: false
---

- A handler is an `async fn` annotated with `#[subscriber("subject")]`. Its first parameter is the
  decoded payload by reference (`&T`, where `T: serde::Deserialize`); an optional second parameter is
  `ctx: &mut ruststream::runtime::Context`.
- The return type drives acknowledgement: `HandlerResult`, `()` (always acks), or `Result<_, E>` (an
  `Err` nacks). For a reply handler, return the reply value and add a `publish("subject")` clause.
- Build the service with `#[ruststream::app]` on a synchronous `fn app() -> RustStream`. Do not write
  your own `main` or `#[tokio::main]`.
- Construct brokers synchronously (`MemoryBroker::new()`, `NatsBroker::new(url)`); the runtime
  connects them at startup. Never connect inside the builder.
- The codec defaults to `codec::DefaultCodec` (`json` > `cbor` > `msgpack`), so `include` needs no
  codec argument.
- Log with `tracing`, never `println!`.
- In tests use `MemoryBroker` or a `TestClient`.

## Subscriber and app

The macro turns `handle` into a value named after the function; mount it with `include`.

```rust
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Order {
    id: u64,
}

#[subscriber("orders.created")]
async fn handle(order: &Order) -> HandlerResult {
    tracing::info!(order.id, "processing order");
    HandlerResult::Ack
}

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("orders", "0.1.0"))
        .with_broker(MemoryBroker::new(), |b| {
            b.include(handle);
        })
}
```

Swap the broker without touching the handlers (NATS connects lazily at startup):

```rust
use ruststream_nats::NatsBroker;

// inside app(), in place of the MemoryBroker call:
.with_broker(NatsBroker::new("nats://localhost:4222"), |b| b.include(handle))
```

## Handler results

```rust
// explicit ack / nack
HandlerResult::Ack
HandlerResult::Nack { requeue: true }

// `()` always acks; `Result<(), E>` acks on Ok, nacks on Err
async fn handle(order: &Order) -> Result<(), MyError> { Ok(()) }
```
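
One way to picture how a return type can drive acknowledgement is a conversion trait implemented for each supported return type. The sketch below is a standalone model, not RustStream's implementation: `Outcome` and `IntoOutcome` are hypothetical names, and requeueing on `Err` is an assumption made only for illustration.

```rust
/// Hypothetical stand-in for the runtime's ack decision (mirrors the shape of `HandlerResult`).
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Nack { requeue: bool },
}

/// Hypothetical conversion from a handler's return value to an ack decision.
trait IntoOutcome {
    fn into_outcome(self) -> Outcome;
}

// An explicit decision passes through unchanged.
impl IntoOutcome for Outcome {
    fn into_outcome(self) -> Outcome {
        self
    }
}

// `()` always acks.
impl IntoOutcome for () {
    fn into_outcome(self) -> Outcome {
        Outcome::Ack
    }
}

// `Result` acks on `Ok` and nacks on `Err`.
// Assumption: a failed delivery is requeued; the real policy may differ.
impl<T, E> IntoOutcome for Result<T, E> {
    fn into_outcome(self) -> Outcome {
        match self {
            Ok(_) => Outcome::Ack,
            Err(_) => Outcome::Nack { requeue: true },
        }
    }
}
```

With this shape, a dispatcher only ever calls `into_outcome()` on whatever the handler returned, which is why the handler signature alone can decide between ack and nack.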

## Replying (request/response)

Return the reply value from the handler and add a `publish("subject")` clause to the attribute; then
pass a `TypedPublisher` to `include_publishing`.

```rust
use ruststream::runtime::TypedPublisher;
use serde::Serialize;

#[derive(Serialize)]
struct Confirmation {
    id: u64,
    accepted: bool,
}

#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
    Confirmation { id: order.id, accepted: true }
}

// inside with_broker(broker, |b| { ... }):
let replies = TypedPublisher::new(b.broker().publisher()); // default codec
b.include_publishing(confirm, replies);
```

## Choosing a codec

`include` uses `DefaultCodec`. To override it, name a codec for a whole scope with
`with_broker_codec`, or for a single call with `with_codec` on a `Router`.

```rust
use ruststream::codec::CborCodec;

// every handler in this scope decodes with CborCodec
// (`info` is an `AppInfo` and `broker` a constructed broker, as in the earlier examples)
RustStream::new(info).with_broker_codec(broker, CborCodec, |b| {
    b.include(handle);
});

// one handler, on a Router
router.with_codec(CborCodec).include(handle);
```
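
The precedence between the default, a scope codec, and a per-call codec can be modelled as a simple fallback chain. This is a standalone sketch under two assumptions: that `json` > `cbor` > `msgpack` is a preference order among the formats that are enabled, and that a per-call codec wins over a scope codec. `CodecKind`, `default_codec`, and `resolve` are hypothetical names, not RustStream items.

```rust
/// Hypothetical stand-in for a wire format; not a ruststream type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CodecKind {
    Json,
    Cbor,
    Msgpack,
}

/// Preference order for the default: json, then cbor, then msgpack.
const PREFERENCE: [CodecKind; 3] = [CodecKind::Json, CodecKind::Cbor, CodecKind::Msgpack];

/// Picks the most preferred format among those that are enabled.
fn default_codec(enabled: &[CodecKind]) -> Option<CodecKind> {
    PREFERENCE.iter().copied().find(|kind| enabled.contains(kind))
}

/// Assumed resolution order: per-call codec, then scope codec, then the default.
fn resolve(
    per_call: Option<CodecKind>,
    scope: Option<CodecKind>,
    enabled: &[CodecKind],
) -> Option<CodecKind> {
    per_call.or(scope).or_else(|| default_codec(enabled))
}
```

For example, with only `cbor` and `msgpack` enabled and no override, this model falls back to `Cbor`.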

## Routers

Collect handlers in their own module, then mount the group.

```rust
use ruststream::runtime::Router;

fn orders() -> Router<MemoryBroker> {
    let mut router = Router::new();
    router.include(handle);
    // `replies` is a `TypedPublisher` built as shown under "Replying"; it must be in scope here.
    router.include_publishing(confirm, replies);
    router
}

// in app()
.with_broker(MemoryBroker::new(), |b| b.include_router(orders()))
```

## Broker-specific subscriptions

For options the by-name form can't express (NATS JetStream, consumer groups), override the source
with `include_on`. Because this form bypasses the by-name defaults, it takes the codec explicitly.

```rust
use ruststream::codec::JsonCodec;
use ruststream_nats::SubscribeOptions;

b.include_on(
    SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("worker"),
    handle,
    JsonCodec,
);
```
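
The `orders.*` source above uses NATS subject wildcards: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. The sketch below illustrates those matching rules on plain strings; `subject_matches` is a hypothetical helper written for this example, not part of RustStream or the NATS client.

```rust
/// Returns true when `subject` matches the NATS-style `pattern`.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` must be the last pattern token and needs at least one subject token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one non-empty token.
            (Some("*"), Some(tok)) => {
                if tok.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be equal.
            (Some(p), Some(s)) => {
                if p != s {
                    return false;
                }
            }
            // Both sides ended together: full match.
            (None, None) => return true,
            // One side ran out of tokens before the other.
            _ => return false,
        }
    }
}
```

Under these rules `orders.*` receives `orders.created` but not `orders.created.eu`; a pattern such as `orders.>` would cover both.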

## Context (optional handler argument)

`ctx: &mut Context` is an **optional** second handler parameter. Omit it when you do not use it; do
not add it just in case. When present, it exposes the delivery's runtime objects:

- `ctx.name()` - the subject/channel the message arrived on.
- `ctx.headers()` / `ctx.headers_mut()` - the working copy of the message headers.
- `ctx.get::<T>()` - shared application state registered with `RustStream::insert_state`.
- `ctx.publisher(name)` - a named publisher (registered with `RustStream::publisher`) for a manual
  publish through the same middleware pipeline as a macro reply.

```rust
use ruststream::runtime::{Context, HandlerResult, Outgoing};

struct Database; // connection pool, etc.

#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context) -> HandlerResult {
    let subject = ctx.name();                  // "orders"
    if let Some(db) = ctx.get::<Database>() {   // application state
        // db.store(order).await;
        let _ = db;
    }
    if let Some(audit) = ctx.publisher("audit") {
        let _ = audit.publish(Outgoing::new("audit", b"stored".to_vec())).await;
    }
    HandlerResult::Ack
}
```

Register state and named publishers on the app builder:

```rust
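// inside app(): construct the broker first so its publisher can be registered by name
let broker = MemoryBroker::new();
RustStream::new(AppInfo::new("orders", "0.1.0"))
    .insert_state(Database)
    .publisher("audit", broker.publisher())
    .with_broker(broker, |b| b.include(handle))
```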
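
A lookup by type, such as `ctx.get::<T>()`, is commonly built on a map keyed by `TypeId`. The sketch below shows that general technique with the standard library only; `StateMap` is a hypothetical type for illustration, and RustStream's own storage may work differently.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed store: holds at most one value per concrete type.
#[derive(Default)]
struct StateMap {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl StateMap {
    /// Stores `value`, replacing any earlier value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Looks a value up by its type; `None` if no `T` was registered.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}
```

Because the key is the type itself, a handler asks for `Database` rather than a string name, and a missing registration surfaces as `None` instead of a panic.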

## AsyncAPI metadata

Decoding only needs `serde::Deserialize`. The extra derives are **documentation only** for the
generated AsyncAPI spec and are optional: `ruststream::Message` names/describes the type, and
`JsonSchema` (feature `asyncapi`) emits its payload schema.

Neither selects a wire codec - that stays the default (`json` > `cbor` > `msgpack`) or whatever
`with_codec` / `with_broker_codec` names. `JsonSchema` describes the payload's logical shape in the
JSON Schema language (as in OpenAPI), independent of how bytes are encoded on the wire; you cannot
attach a codec to a type.

```rust
use ruststream::Message;
use ruststream::schemars::JsonSchema;
use serde::Deserialize;

/// An order placed on the orders channel.
#[derive(Debug, Deserialize, Message, JsonSchema)]
struct Order {
    id: u64,
}
```

## Testing

Use the in-memory broker (or a broker's `TestClient` under the `testing` feature); no real server
is required.

```rust
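use std::time::Duration;
use ruststream::memory::MemoryBroker;
use ruststream::testing::TestClient;

// Assumes a Tokio test runtime and error types that implement `std::error::Error`.
#[tokio::test]
async fn order_is_confirmed() -> Result<(), Box<dyn std::error::Error>> {
    let client = MemoryBroker::start().await?;
    client.publish("orders", br#"{"id":1}"#).await?;
    // wait up to one second for one message on "confirmations"
    let _published = client
        .expect_published("confirmations", 1, Duration::from_secs(1))
        .await?;
    client.shutdown().await?;
    Ok(())
}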
```
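
An expectation of the form "N messages within a timeout" can be read as a loop against one shared deadline. The sketch below models that idea with a standard-library channel; `expect_messages` is a hypothetical helper, and the actual behaviour of `expect_published` (return type, error type, timing) is not specified here.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects `count` messages from `rx`, failing if the overall `timeout` elapses first.
fn expect_messages(
    rx: &Receiver<Vec<u8>>,
    count: usize,
    timeout: Duration,
) -> Result<Vec<Vec<u8>>, String> {
    // One deadline for the whole batch, not one timeout per message.
    let deadline = Instant::now() + timeout;
    let mut seen = Vec::with_capacity(count);
    while seen.len() < count {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(msg) => seen.push(msg),
            Err(RecvTimeoutError::Timeout) => {
                return Err(format!("timed out after {} of {} messages", seen.len(), count));
            }
            Err(RecvTimeoutError::Disconnected) => {
                return Err(format!("channel closed after {} of {} messages", seen.len(), count));
            }
        }
    }
    Ok(seen)
}
```

Using a single deadline keeps the worst-case wait bounded by `timeout`, no matter how many messages are expected.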