Expand description
Type-safe async pub/sub for Rust on top of RabbitMQ, AWS SNS/SQS, NATS JetStream, Apache Kafka, or an in-process broker.
§The Broker<B> pattern
Everything hangs off a single generic hub Broker<B>, parameterised by a
backend marker B (one of [RabbitMq], [Sqs], [Nats], [Kafka],
[InMemory], each gated on its Cargo feature). The marker binds that
backend’s client / publisher / consumer / topology / registry types
together; the generic wrappers below delegate through the sealed
Backend trait.
Broker<B>
├─ .topology() → TopologyDeclarer<B>
├─ .publisher().await → Publisher<B>
├─ .consumer_supervisor() → ConsumerSupervisor<B> (all backends)
└─ .consumer_group() → ConsumerGroup<B> (B: HasCoordinatedGroups)§Capability gating
- Kafka, RabbitMQ, NATS, InMemory implement the
HasCoordinatedGroupscapability trait — they exposeBroker::consumer_groupfor min/max-bounded coordinated groups with autoscaling. - SQS does not. A “group” on SQS is N parallel independent
pollers on one queue, which maps to
ConsumerSupervisor(the backend-agnostic path available on everyBroker<B>). Callingconsumer_group()onBroker<Sqs>is a compile error.
§Feature flags
No features are enabled by default. Enable only what you need.
| Feature | What it enables |
|---|---|
inmemory | In-process broker, publisher, consumer, topology, groups, autoscaler (no external broker) |
kafka | Apache Kafka publisher, consumer, topology, consumer groups, autoscaling |
nats | NATS JetStream publisher, consumer, topology, consumer groups, autoscaling |
rabbitmq | RabbitMQ publisher, consumer, topology, consumer groups, autoscaling |
rabbitmq-transactional | RabbitMQ exactly-once routing via AMQP transactions (implies rabbitmq) |
pub-aws-sns | SNS publisher and topology declaration only |
aws-sns-sqs | Full SNS + SQS stack — publisher, SQS consumer, supervisor, autoscaling (implies pub-aws-sns) |
redis-streams | Redis/Valkey Streams publisher, consumer, topology, consumer groups, FIFO sharding |
audit | [ShoveAuditHandler] + [AuditLog] topic for persisting audit records through any backend |
§Quickstart
The example below uses the in-process backend so it needs no external
services. Swap InMemory for [RabbitMq], [Sqs], [Nats], or
[Kafka] — the topic definition, handler, and every call site stay
identical.
use serde::{Deserialize, Serialize};
use shove::inmemory::{InMemoryConfig, InMemoryConsumerGroupConfig};
use shove::{
Broker, ConsumerGroupConfig, InMemory, MessageHandler, MessageMetadata, Outcome,
TopologyBuilder, define_topic,
};
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderPaid { order_id: String }
define_topic!(Orders, OrderPaid,
TopologyBuilder::new("orders").dlq().build());
struct Handler;
impl MessageHandler<Orders> for Handler {
type Context = ();
async fn handle(&self, msg: OrderPaid, _: MessageMetadata, _: &()) -> Outcome {
println!("paid: {}", msg.order_id);
Outcome::Ack
}
}
let broker = Broker::<InMemory>::new(InMemoryConfig::default()).await?;
broker.topology().declare::<Orders>().await?;
let publisher = broker.publisher().await?;
publisher.publish::<Orders>(&OrderPaid { order_id: "ORD-1".into() }).await?;
let mut group = broker.consumer_group();
group
.register::<Orders, _>(
ConsumerGroupConfig::new(InMemoryConsumerGroupConfig::new(1..=1)),
|| Handler,
)
.await?;
let outcome = group
.run_until_timeout(std::future::ready(()), Duration::from_secs(1))
.await;
std::process::exit(outcome.exit_code());§Ergonomics
MessageHandlerExt::audited— fluent audit wrapping:handler.audited(sink)instead ofAudited::new(handler, sink).TopologyDeclarer::declare_all— declare multiple topics in one call via tuple arities 1 through 16.ConsumerOptions::preset— shorthand fornew().with_prefetch_count(n).SupervisorOutcome::exit_code— canonical process exit code from a consumer group or supervisor:0clean,1any handler error,2any task panic,3drain timeout.
§Observability
Every interesting state change is emitted as a structured tracing event,
so wiring any tracing-subscriber gives a full operational trail without
handler-side instrumentation.
Enable the metrics cargo feature to also emit operational counters,
histograms, and gauges through the metrics
facade — messages_consumed_total, message_processing_duration_seconds,
messages_inflight, backend_errors_total, and friends. shove is a
library, so it does not open a port: install your own recorder
(metrics-exporter-prometheus,
metrics-exporter-statsd, OpenTelemetry, etc.) and expose the endpoint
from your service. Override the shove_ metric prefix with
metrics::set_prefix once at startup, before any broker activity. See
the module docs and the Observability guide for the full schema.
§See also
TopologyBuilderfor hold queues, DLQs, and sequenced routing.define_topic!anddefine_sequenced_topic!for the typed-topic macros.- Per-backend modules: [
rabbitmq], [sns], [nats], [kafka], [inmemory] — expose the config and client types bound to each marker.
Re-exports§
pub use audit::AuditHandler;pub use audit::AuditRecord;pub use audit::Audited;pub use autoscale_metrics::AutoscaleMetrics;pub use backend::Backend;pub use backend::capability::HasCoordinatedGroups;pub use consumer::ConsumerOptions;pub use consumer::DEFAULT_HANDLER_TIMEOUT;pub use consumer::DEFAULT_MAX_MESSAGE_SIZE;pub use consumer::DEFAULT_MAX_PENDING_PER_KEY;pub use consumer_supervisor::ConsumerSupervisor;pub use consumer_supervisor::SupervisorOutcome;pub use error::ShoveError;pub use handler::MessageHandler;pub use handler::MessageHandlerExt;pub use metadata::DeadMessageMetadata;pub use metadata::MessageMetadata;pub use outcome::Outcome;pub use topic::SequencedTopic;pub use topic::Topic;pub use topology::HoldQueue;pub use topology::QueueTopology;pub use topology::SequenceConfig;pub use topology::SequenceFailure;pub use topology::TopologyBuilder;pub use autoscaler::Autoscaler;pub use autoscaler::AutoscalerBackend;pub use autoscaler::AutoscalerConfig;pub use autoscaler::ScalingDecision;pub use autoscaler::ScalingMetrics;pub use autoscaler::ScalingStrategy;pub use autoscaler::Stabilized;pub use autoscaler::ThresholdStrategy;pub use broker::Broker;pub use consumer_group::ConsumerGroup;pub use consumer_group::ConsumerGroupConfig;pub use publisher::Publisher;pub use topology_declarer::Topics;pub use topology_declarer::TopologyDeclarer;
Modules§
- audit
- autoscale_
metrics - Honest, partial-data metrics struct for autoscaler inputs. See DESIGN_V2.md §9.1.
- autoscaler
- backend
- Sealed
Backendtrait layer. Binds a backend marker type to its concrete internal implementation types; the public generic wrappers (Broker<B>,Publisher<B>, etc.) delegate through this. - broker
- Public
Broker<B>hub. See DESIGN_V2.md §6.1. - consumer
- consumer_
group - Public
ConsumerGroup<B, Ctx>– specialist harness for coordinated consumer groups. Gated onB: HasCoordinatedGroups. See DESIGN_V2.md §6.3. - consumer_
supervisor - Consumer supervisor harness and its outcome type. See DESIGN_V2.md §6.5.
- error
- handler
- markers
- Backend marker types. One zero-sized struct per backend, each under the existing Cargo feature. See DESIGN_V2.md §7.
- metadata
- metrics
- Operational metrics emitted via the
metricsfacade. - outcome
- publisher
- Public
Publisher<B>wrapper. See DESIGN_V2.md §6.2. - topic
- topology
- topology_
declarer - Public
TopologyDeclarer<B>+Topicstuple trait. See DESIGN_V2.md §6.4, §11.4.
Macros§
- define_
sequenced_ topic - Define a sequenced topic with static topology.
- define_
topic - Define a topic with static topology.