strev
An event-driven messaging library for Rust. strev gives you a single, uniform way to publish and consume messages across in-memory channels, Redis Streams, NATS JetStream, and Apache Kafka, with a router, composable middleware, and pluggable serialization.
It is built for event-driven applications: event sourcing, async read models, sagas, and message-based integration between services.
Highlights
- Uniform API across every transport. Swap backends without touching handler code.
- Invalid states unrepresentable. A
Messagecarries its ack/nack lifecycle in the type system, so a message can be acknowledged exactly once. - Router that wires subscribers to handlers, fans out to multiple consumers, and shuts down gracefully.
- Composable middleware for retries, timeouts, throttling, deduplication, and more.
- Decorators for cross-cutting wire-format concerns such as CloudEvents enveloping.
Installation
The crates are not yet published to crates.io. Add what you need as a git dependency:
[]
= { = "https://github.com/guiaramos/strev" }
= { = "https://github.com/guiaramos/strev" } # in-memory
= { = "https://github.com/guiaramos/strev" } # Redis Streams
= { = "https://github.com/guiaramos/strev" } # NATS JetStream
= { = "https://github.com/guiaramos/strev" } # Apache Kafka
= { = "https://github.com/guiaramos/strev" } # PostgreSQL
= { = "https://github.com/guiaramos/strev" } # MongoDB
= { = "https://github.com/guiaramos/strev" } # tracing + metrics
Quickstart
Publish and consume through the in-memory channel using the router:
use Duration;
use Bytes;
use ;
use Channel;
use CancellationToken;
async
Swapping in a real backend only changes how you construct the publisher and subscriber. For example, with Redis:
use ;
let client = open?;
let publisher = new.await?;
let subscriber = new;
Core concepts
Message. A payload plus metadata and a UUID. Its acknowledgement state is a type parameter, so the compiler enforces that you ack or nack each message once:
let msg = new;
let outcome = msg.ack; // consumes the message; it cannot be acked again
Handler. A handler receives a message and decides what happens next: acknowledge it,
negatively acknowledge it, or acknowledge and produce new messages. Any
async fn(Message) -> Result<HandlerResult, HandlerError> is a handler:
|msg: Message| async move
Publisher and Subscriber. Every backend implements two traits:
Router. Registers handlers against subscribers, applies middleware and decorators,
and runs every consumer concurrently until a shutdown signal fires. Use add_consumer
for a sink, or add_handler when a handler also publishes to another topic.
Backends
| Transport | Crate | Notes |
|---|---|---|
| In-memory | strev-channel |
single process, ideal for tests and local dev |
| Redis Streams | strev-redis |
consumer groups, pluggable marshaller |
| NATS JetStream | strev-nats |
durable pull consumers, headers as metadata |
| Apache Kafka | strev-kafka |
consumer groups, manual offset commits |
| PostgreSQL | strev-postgres |
durable table, per-group offsets, pure Rust (sqlx) |
| MongoDB | strev-mongodb |
change streams, resume tokens (needs replica set) |
strev-kafka exposes a sasl-ssl feature that enables TLS and SASL for managed brokers,
and a config passthrough for arbitrary client properties:
new
.option
.option
.option
.option;
Middleware
Register middleware on the router with add_middleware; it wraps every handler in
order. Built-in middleware:
Retry, Timeout, Throttle, CircuitBreaker, Deduplicator, CorrelationId,
PoisonQueue, DelayOnError, Duplicator, IgnoreErrors, InstantAck, RandomFail.
The strev-telemetry crate adds a Telemetry middleware that emits a tracing span per
message plus metrics facade measurements (handler-duration histogram, acked/nacked/
errored counters), so you can wire strev into any tracing/metrics exporter you already use.
Decorators and CloudEvents
Decorators transform messages at the transport boundary on both the publish and
subscribe side. The strev-cloudevents crate uses this to envelope and unwrap messages
as CloudEvents, mapping event attributes to ce-* metadata:
let codec = new
.event_type;
router.add_subscriber_decorator;
router.add_publisher_decorator;
Examples
Runnable examples live under each crate's examples/ directory:
strev:basic_pubsub,router,consumer_groups,middleware_chain,deduplication,poison_queue,event_pipelinestrev-redis:redis_pubsubstrev-nats:nats_pubsubstrev-kafka:kafka_pubsubstrev-postgres:postgres_pubsubstrev-mongodb:mongodb_pubsubstrev-cloudevents:router_cloudeventsstrev-telemetry:telemetry
Run one with, for example:
Development
Unit tests need no services:
Integration tests run against pinned Docker services:
Format and lint:
Git hooks
Commits follow Conventional Commits, which the release automation relies on. Install the hook manager and commit linter, then enable the hooks:
The pre-commit hook runs cargo fmt --check and clippy; the commit-msg hook checks
the message format. CI enforces both regardless of local hooks.