shove
Type-safe async pub/sub for Rust on top of RabbitMQ, AWS SNS/SQS, NATS JetStream, Apache Kafka, or an in-process broker.
shove is for workloads where "just use the broker client" stops being enough: retries with backoff, DLQs, ordered delivery, consumer groups, autoscaling, and auditability. You define a topic once as a Rust type and the crate derives the messaging topology and runtime behavior from it. Pick the transport that fits your stack — RabbitMQ, SNS/SQS, NATS JetStream, Kafka, or the zero-dependency in-process backend for tests and single-process apps — and get the same high-level API everywhere.
Why shove
- Typed topics — define a topic once as a Rust type; queue names, DLQs, and hold queues all derive from it.
- Retry topologies without glue code — escalating backoff through hold queues, DLQ routing, retry budgets, handler timeouts.
- Strict per-key ordering —
SequencedTopicwith pluggable failure policies (SkiporFailAll), enforced by the broker. - Consumer groups + autoscaling — min/max bounds driven by queue depth (or consumer lag on Kafka), with optional structured audit trails.
- One API across five backends — swap the transport without changing topic definitions or handlers.
If you have one queue, one consumer, and little retry logic, use lapin, the AWS SDK, async-nats, or rdkafka directly. shove is the layer for multi-service event flows that need operational discipline.
30-second tour
No Docker, no credentials, no config — this runs against the in-process backend:
use ;
use *;
use *;
use Duration;
use CancellationToken;
define_topic!;
;
# async
Swap InMemory* for RabbitMq*, Sns*, Nats*, or Kafka* — the topic definition and handler stay identical.
Pick your backend
| Backend | Feature flag | Durability | Ordering primitive | Autoscale signal | Ops burden |
|---|---|---|---|---|---|
| RabbitMQ | rabbitmq |
Disk | Consistent-hash exchange + SAC shards | Queue depth (mgmt API) | Broker + mgmt plugin |
| AWS SNS/SQS | aws-sns-sqs |
Managed (AWS) | FIFO topic + MessageGroupId |
Queue depth | Managed — no infra |
| NATS JetStream | nats |
Disk (JetStream) | Subject shard + max_ack_pending=1 |
Pending messages | NATS server |
| Apache Kafka | kafka |
Disk (log) | Partition key | Consumer lag | Kafka cluster |
| In-process | inmemory |
None (process RAM) | Per-key FIFO shards | Queue depth (in-proc) | None |
Rules of thumb:
- Prototyping, tests, single-process apps →
inmemory - Already on AWS, don't want to run infra →
aws-sns-sqs - Low-latency streaming, high throughput, replay →
kafka - Complex routing + retry topologies, existing RabbitMQ →
rabbitmq - Lightweight edge deployments, JetStream already in-stack →
nats
All backends support typed topics, retry/DLQ, sequenced delivery, consumer groups, autoscaling, and audit — the trait surface is identical.
Features
No features are enabled by default.
# RabbitMQ publisher + consumer + consumer groups + autoscaling
# Transactional RabbitMQ subscribers with exactly-once delivery guarantees (use only when absolutely necessary)
# SNS publisher only
# Full AWS SNS + SQS backend
# NATS JetStream
# Apache Kafka
# In-memory broker
# Optional built-in audit publisher
| Feature | What it enables |
|---|---|
rabbitmq |
RabbitMQ publisher, consumer, topology declaration, consumer groups, autoscaling |
rabbitmq-transactional |
RabbitMQ exactly-once routing via AMQP transactions |
pub-aws-sns |
SNS publisher and topology declaration |
aws-sns-sqs |
Full SNS + SQS publisher/consumer stack, consumer groups, autoscaling |
nats |
NATS JetStream publisher, consumer, topology declaration, consumer groups, autoscaling |
kafka |
Kafka publisher, consumer, topology declaration, consumer groups, autoscaling |
inmemory |
In-memory broker — publisher, consumer, topology, consumer groups, autoscaling |
audit |
Built-in ShoveAuditHandler and AuditLog topic |
Quick start
The repo's docker-compose.yml starts every broker needed below: RabbitMQ with the consistent-hash plugin, LocalStack (SNS + SQS on http://localhost:4566), NATS JetStream on localhost:4222, and Kafka on localhost:9092. The inmemory backend needs nothing.
Define the topic and handler once — this is identical across every backend:
use ;
use *;
use Duration;
define_topic!;
;
Then pick the transport:
use *;
use CancellationToken;
let client = connect.await?;
// Publish
let publisher = new.await?;
publisher..await?;
// Consume
let shutdown = new;
let consumer = new;
let options = new.with_prefetch_count;
consumer..await?;
use *;
use Arc;
let client = new.await?;
let topic_registry = new;
let queue_registry = new;
let declarer = new
.with_queue_registry;
.await?;
// Publish
let publisher = new;
publisher..await?;
// Consume
let consumer = new;
consumer..await?;
use *;
let client = connect.await?;
new
.declare.await?;
// Publish
new.await?
..await?;
// Consume
new
..await?;
use *;
let client = connect.await?;
new
.declare.await?;
// Publish
new.await?
..await?;
// Consume
new
..await?;
use *;
let broker = new;
new
.declare.await?;
// Publish
new
..await?;
// Consume
new
..await?;
Messages live only in the broker process and are dropped on shutdown — use a durable backend for anything production.
Delivery model
shove is at-least-once by default. That means handlers should be idempotent.
Outcome::Ack: successOutcome::Retry: delayed retry through hold queues, with escalating backoffOutcome::Reject: dead-letter immediatelyOutcome::Defer: delay without increasing retry count
Additional behavior:
- Handler timeouts automatically convert to retry
- DLQ consumers receive
DeadMessageMetadata - RabbitMQ publishes a stable
x-message-idheader for deduplication - RabbitMQ can opt into transactional exactly-once routing with
rabbitmq-transactional - NATS uses
Nats-Msg-Idfor deduplication within a 120-second window - Kafka uses partition-based ordering; retry and DLQ routing is handled via hold/DLQ topics
Exactly-once mode removes the publish-then-ack race in RabbitMQ by wrapping routing decisions in AMQP transactions. It is materially slower and should be reserved for handlers with irreversible side effects.
Sequenced topics
Use define_sequenced_topic! when messages for the same entity must be processed in order.
use *;
use Duration;
define_sequenced_topic!;
consumer..await?;
Failure policies:
SequenceFailure::Skip: dead-letter the failed message and continue processing the rest of the sequenceSequenceFailure::FailAll: poison the key and dead-letter all remaining messages for that key
Given messages [1, 2, 3, 4, 5] for the same key where message 3 is permanently rejected:
| Policy | Acked | DLQed |
|---|---|---|
Skip |
1,2,4,5 | 3 |
FailAll |
1,2 | 3,4,5 |
Use Skip when messages are independently valid (e.g. audit entries). Use FailAll when later messages are causally dependent on earlier ones (e.g. financial ledger entries, state-machine transitions).
Messages for other sequence keys are unaffected by either policy.
RabbitMQ uses consistent-hash routing for this. SNS/SQS uses FIFO topics and queues. NATS uses subject-based shard routing with max_ack_pending: 1 per shard to enforce strict ordering. Kafka uses partition-key routing — messages with the same sequence key land in the same partition, and the consumer processes one message at a time to guarantee order.
Consumer groups and autoscaling
All backends support named consumer groups with min/max bounds, per-consumer prefetch, retry budget, handler timeouts, and optional concurrent processing inside each consumer.
RabbitMQ:
use *;
use AutoscalerConfig;
use Arc;
use Duration;
use Mutex;
let mut registry = new;
registry
.
.await?;
registry.start_all;
let registry = new;
let mgmt = new;
let mut autoscaler = autoscaler;
autoscaler.run.await;
Every other backend has an identical shape — swap ConsumerGroupRegistry / RabbitMqAutoscalerBackend for SqsConsumerGroupRegistry / SqsAutoscalerBackend, NatsConsumerGroupRegistry / NatsAutoscalerBackend, KafkaConsumerGroupRegistry / KafkaAutoscalerBackend, or InMemoryConsumerGroupRegistry / InMemoryAutoscalerBackend. Config constructors (NatsConsumerGroupConfig::new, KafkaConsumerGroupConfig::new, …) carry the same builder methods.
Audit logging
Wrap any handler with Audited<H, A> to persist a structured AuditRecord for each delivery attempt. Records include the trace id, topic, payload, message metadata, outcome, duration, and timestamp.
Implement AuditHandler<T> for your persistence backend:
use *;
;
let handler = new;
// Drop-in replacement — consumers, consumer groups, and FIFO consumers all accept it.
consumer..await?;
If the audit handler returns Err, the message is retried — audit failure is never silently dropped.
If you enable audit, ShoveAuditHandler can publish those records back into a dedicated shove-audit-log topic using the same broker:
use *;
use *;
let audit = new;
let handler = new;
Performance
Measured on a MacBook Pro M4 Max, single RabbitMQ node via Docker, Rust 1.91. Reproducible via cargo run -q --example rabbitmq_stress --features rabbitmq.
| Handler | 1 worker, prefetch=1 | 1 worker, prefetch=20 | 8 workers, prefetch=20 | 32 workers, prefetch=40 |
|---|---|---|---|---|
| Fast (1–5 ms) | 179 msg/s | 2,866 msg/s | 19,669 msg/s | 29,207 msg/s |
| Slow (50–300 ms) | 6 msg/s | 75 msg/s | 544 msg/s | 4,076 msg/s |
| Heavy (1–5 s) | 0.4 msg/s | 5 msg/s | 21 msg/s | 199 msg/s |
prefetch_count is the primary throughput lever for I/O-bound handlers. Adding workers scales linearly when the handler is the bottleneck. Results will vary by hardware and broker configuration.
Examples
Run any with cargo run --example <name> --features <flag>:
| Backend | Feature flag | Examples |
|---|---|---|
| RabbitMQ | rabbitmq |
rabbitmq_basic_pubsub, rabbitmq_concurrent_pubsub, rabbitmq_sequenced_pubsub, rabbitmq_consumer_groups, rabbitmq_audited_consumer, rabbitmq_stress |
| RabbitMQ (tx) | rabbitmq-transactional |
rabbitmq_exactly_once |
| AWS SNS/SQS | aws-sns-sqs |
sqs_basic_pubsub, sqs_concurrent_pubsub, sqs_sequenced_pubsub, sqs_consumer_groups, sqs_audited_consumer, sqs_autoscaler, sqs_stress |
| NATS JetStream | nats |
nats_basic, nats_sequenced, nats_audited_consumer, nats_stress |
| Apache Kafka | kafka |
kafka_basic, kafka_sequenced, kafka_audited_consumer, kafka_stress |
| In-process | inmemory |
inmemory_basic, inmemory_sequenced, inmemory_consumer_groups, inmemory_audited_consumer, inmemory_stress |
The audit feature is required on top of the backend flag for any *_audited_consumer example.
API reference
Full rustdoc is on docs.rs/shove:
Outcome— what a handler returns (Ack,Retry,Reject,Defer)TopologyBuilder—.hold_queue,.sequenced,.routing_shards,.dlqConsumerOptions,MessageHandler,Publisher- Per-backend modules:
rabbitmq,sns,nats,kafka,inmemory
Backend-specific sequenced-delivery mapping: RabbitMQ uses a consistent-hash exchange with SAC shards. SNS/SQS uses FIFO topics + MessageGroupId. NATS uses subject-based shard routing with max_ack_pending: 1. Kafka uses partition-key routing. In-process uses per-key FIFO shards (ordering enforced in-memory, no persistence, no cross-process delivery).
Requirements
shove requires Rust 1.85 or newer (edition 2024).
Background
shove came out of production event-processing systems that needed more than a broker client but less than a platform rewrite. The crate focuses on the hard parts around message handling correctness and operational behavior, while leaving transport and persistence to RabbitMQ, SNS/SQS, NATS JetStream, or Kafka.
The API is still evolving.