strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation

strev

CI

An event-driven messaging library for Rust. strev gives you a single, uniform way to publish and consume messages across in-memory channels, Redis Streams, NATS JetStream, and Apache Kafka, with a router, composable middleware, and pluggable serialization.

It is built for event-driven applications: event sourcing, async read models, sagas, and message-based integration between services.

Highlights

  • Uniform API across every transport. Swap backends without touching handler code.
  • Invalid states unrepresentable. A Message carries its ack/nack lifecycle in the type system, so a message can be acknowledged exactly once.
  • Router that wires subscribers to handlers, fans out to multiple consumers, and shuts down gracefully.
  • Composable middleware for retries, timeouts, throttling, deduplication, and more.
  • Decorators for cross-cutting wire-format concerns such as CloudEvents enveloping.

Installation

The crates are not yet published to crates.io. Add what you need as a git dependency:

[dependencies]
strev = { git = "https://github.com/guiaramos/strev" }
strev-channel = { git = "https://github.com/guiaramos/strev" }   # in-memory
strev-redis = { git = "https://github.com/guiaramos/strev" }     # Redis Streams
strev-nats = { git = "https://github.com/guiaramos/strev" }      # NATS JetStream
strev-kafka = { git = "https://github.com/guiaramos/strev" }     # Apache Kafka
strev-postgres = { git = "https://github.com/guiaramos/strev" }  # PostgreSQL
strev-mongodb = { git = "https://github.com/guiaramos/strev" }   # MongoDB
strev-telemetry = { git = "https://github.com/guiaramos/strev" } # tracing + metrics

Quickstart

Publish and consume through the in-memory channel using the router:

use std::time::Duration;

use bytes::Bytes;
use strev::{HandlerResult, Message, Publisher, Router, ShutdownSignal, Topic};
use strev_channel::Channel;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let channel = Channel::new(64);
    let topic = Topic::new("orders");

    let mut router = Router::new();
    router.add_consumer(
        "orders",
        topic.clone(),
        channel.clone(),
        |msg: Message| async move {
            println!("received: {}", String::from_utf8_lossy(msg.payload()));
            Ok(HandlerResult::ack(msg))
        },
    );

    let token = CancellationToken::new();
    let tc = token.clone();
    let handle = tokio::spawn(async move { router.run(ShutdownSignal::Token(tc)).await });

    Publisher::publish(&channel, &topic, vec![Message::new(Bytes::from("order-1"))])
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(200)).await;
    token.cancel();
    handle.await.unwrap().unwrap();
}

Swapping in a real backend only changes how you construct the publisher and subscriber. For example, with Redis:

use strev_redis::{RedisPublisher, RedisPublisherConfig, RedisSubscriber, RedisSubscriberConfig};

let client = redis::Client::open("redis://127.0.0.1:6379/")?;
let publisher = RedisPublisher::new(RedisPublisherConfig::new(client.clone())).await?;
let subscriber = RedisSubscriber::new(RedisSubscriberConfig::new(client, "orders-group"));

Core concepts

Message. A payload plus metadata and a UUID. Its acknowledgement state is a type parameter, so the compiler enforces that you ack or nack each message once:

let msg = Message::new(Bytes::from("payload"));
let outcome = msg.ack(); // consumes the message; it cannot be acked again

Handler. A handler receives a message and decides what happens next: acknowledge it, negatively acknowledge it, or acknowledge and produce new messages. Any async fn(Message) -> Result<HandlerResult, HandlerError> is a handler:

|msg: Message| async move {
    let produced = vec![/* ProducedMessage */];
    Ok(HandlerResult::ack_with(msg, produced))
}

Publisher and Subscriber. Every backend implements two traits:

#[async_trait]
pub trait Publisher: Send + Sync {
    async fn publish(&self, topic: &Topic, messages: Vec<Message>) -> Result<Vec<Outcome>, PublishError>;
    async fn close(&mut self) -> Result<(), CloseError>;
}

#[async_trait]
pub trait Subscriber: Send + Sync {
    async fn subscribe(&self, topic: &Topic) -> Result<MessageStream, SubscribeError>;
    async fn close(&mut self) -> Result<(), CloseError>;
}

Router. Registers handlers against subscribers, applies middleware and decorators, and runs every consumer concurrently until a shutdown signal fires. Use add_consumer for a sink, or add_handler when a handler also publishes to another topic.

Backends

Transport Crate Notes
In-memory strev-channel single process, ideal for tests and local dev
Redis Streams strev-redis consumer groups, pluggable marshaller
NATS JetStream strev-nats durable pull consumers, headers as metadata
Apache Kafka strev-kafka consumer groups, manual offset commits
PostgreSQL strev-postgres durable table, per-group offsets, pure Rust (sqlx)
MongoDB strev-mongodb change streams, resume tokens (needs replica set)

strev-kafka exposes a sasl-ssl feature that enables TLS and SASL for managed brokers, and a config passthrough for arbitrary client properties:

KafkaPublisherConfig::new("broker:9092")
    .option("security.protocol", "SASL_SSL")
    .option("sasl.mechanisms", "PLAIN")
    .option("sasl.username", "<key>")
    .option("sasl.password", "<secret>");

Middleware

Register middleware on the router with add_middleware; it wraps every handler in order. Built-in middleware:

Retry, Timeout, Throttle, CircuitBreaker, Deduplicator, CorrelationId, PoisonQueue, DelayOnError, Duplicator, IgnoreErrors, InstantAck, RandomFail.

The strev-telemetry crate adds a Telemetry middleware that emits a tracing span per message plus metrics facade measurements (handler-duration histogram, acked/nacked/ errored counters), so you can wire strev into any tracing/metrics exporter you already use.

Decorators and CloudEvents

Decorators transform messages at the transport boundary on both the publish and subscribe side. The strev-cloudevents crate uses this to envelope and unwrap messages as CloudEvents, mapping event attributes to ce-* metadata:

let codec = CloudEventCodec::new("https://example.com/orders")
    .event_type("com.example.order.created");

router.add_subscriber_decorator(CloudEventsSubscriberDecorator::new(codec.clone()));
router.add_publisher_decorator(CloudEventsPublisherDecorator::new(codec));

Examples

Runnable examples live under each crate's examples/ directory:

  • strev: basic_pubsub, router, consumer_groups, middleware_chain, deduplication, poison_queue, event_pipeline
  • strev-redis: redis_pubsub
  • strev-nats: nats_pubsub
  • strev-kafka: kafka_pubsub
  • strev-postgres: postgres_pubsub
  • strev-mongodb: mongodb_pubsub
  • strev-cloudevents: router_cloudevents
  • strev-telemetry: telemetry

Run one with, for example:

cargo run -p strev --example basic_pubsub

Development

Unit tests need no services:

make test-unit

Integration tests run against pinned Docker services:

make services       # start redis, nats, kafka
make test-all       # unit + integration
make services-down  # tear down

Format and lint:

make check          # fmt check + clippy + unit tests

Git hooks

Commits follow Conventional Commits, which the release automation relies on. Install the hook manager and commit linter, then enable the hooks:

brew install lefthook committed
lefthook install

The pre-commit hook runs cargo fmt --check and clippy; the commit-msg hook checks the message format. CI enforces both regardless of local hooks.

License

MIT