ruststream 0.5.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
# OpenTelemetry

The `opentelemetry` feature gives a service distributed tracing: a trace flows from an incoming
message onto the replies it produces, so one trace spans the whole consume-transform-produce chain.
It is built on the typed publish-path context - the same seam that lets a publish transform read the
delivery that produced a reply.

```toml
ruststream = { version = "0.5", features = ["macros", "memory", "json", "opentelemetry"] }
```

It is dependency-light: it carries the [W3C Trace Context](https://www.w3.org/TR/trace-context/) and
emits `tracing` spans, leaving the export to a collector to your subscriber (for example
[`tracing-opentelemetry`](https://docs.rs/tracing-opentelemetry)), exactly as the
[logging](logging.md) guide leaves the subscriber to you. Propagation itself is broker-agnostic and
works with no subscriber at all.

## Wiring it up

Create an `OpenTelemetry`, add its consume layer app-wide, and bake its propagation onto the reply
publisher:

```rust
--8<-- "tests/opentelemetry.rs:wiring"
```

- `consume_layer()` is a consume-side [layer]middleware.md: per delivery it reads the incoming
  `traceparent`, opens a `tracing` span for the handler, and records the *consumer's* span on the
  working headers. It applies to handlers mounted directly and through a [router]routing.md.
- `propagation()` is a static [publish layer]publishing.md: it copies the working `traceparent`
  (and `tracestate`) onto every reply, so a downstream service sees the consumer span as the reply's
  parent. Reuse it on a batch publisher with `for_batch(otel.propagation())`.

## What gets propagated

A delivery carrying `00-<trace-id>-<span-id>-01` continues that trace: the reply keeps the same
`trace-id` and carries a fresh `span-id` (the consumer's span), so the trace is linked end to end. A
delivery with no `traceparent` starts a fresh, sampled root trace. The spans are emitted under the
`ruststream.consume` target with `trace_id` / `span_id` / `subscription` fields.

## Reading the trace in a handler

The consumer's trace context is on the working headers, so a handler reads it the same way any
header is read - through the [context](context.md):

<!-- inline-rust: one-line read of the working traceparent inside a handler; the full traced app, including this access, is compiled in tests/opentelemetry.rs and embedded above -->
```rust
let traceparent = ctx.headers().get_str("traceparent");
```

Parse it into a `TraceContext` (`TraceContext::parse`) to read the `trace_id` / `span_id` or to
check whether the trace is `sampled()`.

## Exporting to a collector

This crate stops at the W3C context and `tracing` spans; turning those into OTLP and shipping them to
a collector is the binary's job, the same split as [logging](logging.md). Add
`tracing-opentelemetry` and an exporter in your binary, install it as a `tracing` subscriber, and the
`ruststream.consume` spans flow through it. The propagation layer keeps the trace linked across every
broker hop regardless of which exporter you choose.