# shove
Type-safe async pub/sub for Rust. Supports RabbitMQ and AWS SNS/SQS.
Define a topic once — shove handles the plumbing: queue topology, retries, DLQ, ordered delivery, auditing and autoscaling consumer groups. A single consumer handles 10k+ msg/s out of the box.
## Quick start

No features are enabled by default. Add the backend you need:

```toml
# RabbitMQ backend
shove = { version = "0.6", features = ["rabbitmq"] }

# SNS publisher only (publish to SNS, consume elsewhere)
shove = { version = "0.6", features = ["pub-aws-sns"] }

# Full AWS SNS+SQS backend (publisher + consumer)
shove = { version = "0.6", features = ["aws-sns-sqs"] }
```

For RabbitMQ, you need a running instance with the consistent-hash exchange plugin enabled. Start one with the included docker-compose:

```sh
docker compose up -d
```
## Define a topic

```rust
use shove::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;

// Message payload: any type that is Serialize + DeserializeOwned.
// (Field names are illustrative.)
#[derive(Debug, Serialize, Deserialize)]
pub struct SettlementEvent {
    pub order_id: String,
    pub amount_cents: i64,
}

// Binds the message type to a queue topology at compile time.
// (Exact macro syntax is illustrative.)
define_topic! {
    OrderSettlement,
    message = SettlementEvent,
    topology = |t| t.dlq().hold_queue(Duration::from_secs(30)),
}
```
## Publish

```rust
use shove::prelude::*;

// Connect to local RabbitMQ (docker compose up -d)
// (Connection string and constructor names are illustrative.)
let client = RabbitClient::connect("amqp://guest:guest@localhost:5672").await?;
let publisher = RabbitPublisher::new(&client, OrderSettlement).await?;

// (Field values are illustrative.)
let event = SettlementEvent {
    order_id: "ord-1".into(),
    amount_cents: 1_000,
};

// Type-safe: compiler ensures SettlementEvent matches OrderSettlement::Message
publisher.publish(&event).await?;
```
## Publish (SNS)

```rust
use shove::prelude::*;
use std::sync::Arc;

// (Constructor names and `aws_config` are illustrative.)
let client = SnsClient::new(&aws_config).await?;
let registry = Arc::new(TopicRegistry::new());

let declarer = SnsDeclarer::new(client.clone(), registry.clone());
declarer.declare(&OrderSettlement).await?;

let publisher = SnsPublisher::new(client, registry);
publisher.publish(&event).await?;
```
## Consume (SQS)

```rust
use shove::prelude::*;
use std::sync::Arc;

// (Constructor names and `aws_config` are illustrative.)
let client = SqsClient::new(&aws_config).await?;
let topic_registry = Arc::new(TopicRegistry::new());
let queue_registry = Arc::new(QueueRegistry::new());

let declarer = SnsDeclarer::new(client.clone(), topic_registry.clone())
    .with_queue_registry(queue_registry.clone());
declarer.declare(&OrderSettlement).await?;

let consumer = SqsConsumer::new(client, queue_registry);
let options = ConsumerOptions::new().with_prefetch_count(20);

// `handler` is any MessageHandler for SettlementEvent.
consumer.run(OrderSettlement, handler, options).await?;
```
## Consume (RabbitMQ)

```rust
use shove::prelude::*;
use tokio_util::sync::CancellationToken;

// `handler` is any MessageHandler for SettlementEvent.
// (Consumer constructor names are illustrative.)
let shutdown_token = CancellationToken::new();
let consumer = RabbitConsumer::new(&client);
let options = ConsumerOptions::new()
    .with_prefetch_count(20);

// Processes up to 20 messages concurrently, acks always in delivery order
consumer
    .run(OrderSettlement, handler, options, shutdown_token)
    .await?;
```
## Why not just use lapin?
lapin gives you raw AMQP primitives — channels, queues, exchanges, bindings. If you have one service with one queue, that's all you need.
shove is for when you have dozens of topics across multiple services and you don't want to hand-roll retry topologies, DLQ routing, consumer groups, autoscaling, and audit trails for each one. It's a higher-level abstraction: topics are Rust types, the topology is derived from them, and the framework manages the lifecycle.
## Performance

Measured on a MacBook Pro M4 Max, single RabbitMQ node via Docker, Rust 1.91. Results will vary by hardware and broker configuration. Reproduce with `cargo run -q --example stress --features rabbitmq`.
Throughput is controlled by two knobs: prefetch count (how many messages each worker processes concurrently) and worker count (how many consumers in the group).
| Handler | 1 worker, prefetch=1 | 1 worker, prefetch=20 | 8 workers, prefetch=20 | 32 workers, prefetch=40 |
|---|---|---|---|---|
| Fast (1-5ms) | 179 msg/s | 2,866 msg/s | 19,669 msg/s | 29,207 msg/s |
| Slow (50-300ms) | 6 msg/s | 75 msg/s | 544 msg/s | 4,076 msg/s |
| Heavy (1-5s) | 0.4 msg/s | 5 msg/s | 21 msg/s | 199 msg/s |
Prefetch count is the single biggest throughput lever for I/O-bound handlers. Adding workers scales linearly when the handler is the bottleneck.
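These numbers track a simple capacity model: sustained throughput for a latency-bound handler is roughly (workers × prefetch) / mean handler latency. A sketch of that arithmetic (the 175 ms figure is just the midpoint of the slow tier's 50-300 ms range; the model ignores broker overhead):

```rust
/// Idealized throughput for latency-bound handlers: each of the
/// `workers * prefetch` in-flight slots completes one message every
/// `mean_latency_ms` milliseconds.
fn ideal_throughput(workers: u32, prefetch: u32, mean_latency_ms: f64) -> f64 {
    (workers * prefetch) as f64 * 1000.0 / mean_latency_ms
}

fn main() {
    // Slow tier, 1 worker, prefetch=1: ~5.7 msg/s, close to the measured 6.
    println!("{:.1} msg/s", ideal_throughput(1, 1, 175.0));
    // prefetch=20 gives ~114 msg/s in the model; the measured 75 msg/s
    // reflects ack-ordering and broker round-trips taking a cut.
    println!("{:.0} msg/s", ideal_throughput(1, 20, 175.0));
}
```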
The framework itself adds minimal overhead: sub-millisecond dispatch latency per message, ~4 KB RSS per consumer, and zero idle CPU — tested up to 4096 consumers on a single connection.
## Features

- Compile-time topic binding — each topic is a unit struct that associates a message type (`Serialize + DeserializeOwned`) with a queue topology. No stringly-typed queue names at call sites.
- Concurrent consumption — process up to `prefetch_count` messages concurrently within a single consumer, while always acknowledging in delivery order.
- Escalating retry backoff — configure multiple hold queues with increasing delays. The consumer picks the right one automatically based on retry count.
- Dead-letter queues — opt-in per topic. Messages that exceed max retries or are explicitly rejected route to the DLQ with full death metadata.
- Sequenced delivery — strict per-key ordering via `SequencedTopic`. Messages sharing a sequence key are delivered in publish order. Different keys are processed concurrently within each shard. Two failure policies: `Skip` (continue the sequence) or `FailAll` (terminate it).
- Consumer groups & autoscaling — dynamically scale consumers up and down based on queue depth, with hysteresis and cooldown to prevent flapping.
- Audit logging — wrap any handler with `Audited<H, A>` to capture every delivery attempt as a structured `AuditRecord`. Implement the `AuditHandler` trait for your persistence backend, or enable the `audit` feature for a built-in handler that publishes records to a dedicated `shove-audit-log` topic.
- Handler timeout — set a per-consumer `handler_timeout` in `ConsumerOptions` or `ConsumerGroupConfig`. Messages that exceed the deadline are automatically retried.
- Backend-agnostic core — traits for `Publisher`, `Consumer`, `TopologyDeclarer`, and `MessageHandler` live in the core crate. Backends are feature-gated.
## Concurrent consumption

By default, `run` processes up to `prefetch_count` messages concurrently within a single consumer. Messages are dispatched to handler tasks as they arrive, but acknowledgements are always sent in delivery order — if messages 1, 2, 3 are in-flight and message 3 finishes first, it waits for 1 and 2 to complete before any are acked.

Consumer groups support this via `ConsumerGroupConfig::with_concurrent_processing(true)`.

Set `prefetch_count = 1` for sequential processing if your handler has process-level side effects that cannot tolerate concurrency (e.g. writing to a shared file or holding a global lock). If your handler is async and talks to external services, use the default prefetch count.
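The ordering guarantee can be modeled as a small completion buffer: completions arriving out of order are held until the contiguous prefix is done, then acked together. A std-only sketch of the idea (this models the semantics, not shove's internals):

```rust
use std::collections::BTreeSet;

/// Tracks handler completions and releases acks strictly in delivery order.
struct AckWindow {
    next: u64,           // next delivery tag we are allowed to ack
    done: BTreeSet<u64>, // completed tags waiting on earlier ones
}

impl AckWindow {
    fn new() -> Self {
        Self { next: 1, done: BTreeSet::new() }
    }

    /// Record a completion; return the tags that may now be acked.
    fn complete(&mut self, tag: u64) -> Vec<u64> {
        self.done.insert(tag);
        let mut acks = Vec::new();
        while self.done.remove(&self.next) {
            acks.push(self.next);
            self.next += 1;
        }
        acks
    }
}

fn main() {
    let mut w = AckWindow::new();
    assert!(w.complete(3).is_empty());        // 3 finishes first: held back
    assert!(w.complete(2).is_empty());        // still waiting on 1
    assert_eq!(w.complete(1), vec![1, 2, 3]); // 1 completes: flush in order
}
```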
## Sequenced topics
Use sequenced topics when message ordering is absolutely required for correctness. The canonical example is financial transactions: if account ACC-123 receives a deposit, then a withdrawal, processing them out of order produces an incorrect balance. Other cases include state-machine transitions, inventory adjustments, and any domain where events are causally dependent.
Within each shard, different sequence keys are processed concurrently — only messages sharing the same key are serialized. This means you get the throughput benefits of concurrent processing across independent entities (different accounts, different users) while maintaining strict ordering where it matters:
```rust
// (Macro syntax, message type, and key extraction are illustrative;
// the sequence key here is the account id.)
define_sequenced_topic! {
    AccountLedger,
    message = LedgerEntry,
    key = |m: &LedgerEntry| m.account_id.clone(),
    policy = SequenceFailure::FailAll,
}

// Compiler enforces AccountLedger: SequencedTopic
consumer
    .run(AccountLedger, handler, options, shutdown_token)
    .await?;
```
### Sequence failure policies
When a sequenced message fails permanently (exceeds max retries or returns `Outcome::Reject`), the failure policy controls what happens to the rest of the sequence:

- `SequenceFailure::Skip` — dead-letter the failed message and continue processing subsequent messages for the same key. Use this when messages are independently valid but need ordered delivery (e.g. audit log entries, analytics events).
- `SequenceFailure::FailAll` — dead-letter the failed message and automatically dead-letter all remaining messages for the same key. The key is "poisoned" for the lifetime of the consumer process. Use this when messages are causally dependent — processing later messages after an earlier failure would produce an inconsistent state (e.g. financial ledger entries, state-machine transitions).
Messages for other sequence keys are unaffected by either policy.
Example: given messages `[1, 2, 3, 4, 5]` for key `ACC-A` where message 3 is rejected:

| Policy | Ack'd | DLQ'd |
|---|---|---|
| `Skip` | 1, 2, 4, 5 | 3 |
| `FailAll` | 1, 2 | 3, 4, 5 |
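The two policies can be expressed as a pure function over one key's message stream. The following sketch (illustrative, not the crate's API) reproduces the example above:

```rust
#[derive(PartialEq)]
enum Policy { Skip, FailAll }

/// Partition a per-key stream into (acked, dead-lettered) given the
/// 1-based positions of messages whose handler permanently fails.
fn route(msgs: &[u32], failing: &[u32], policy: Policy) -> (Vec<u32>, Vec<u32>) {
    let (mut acked, mut dlq) = (Vec::new(), Vec::new());
    let mut poisoned = false;
    for &m in msgs {
        if poisoned || failing.contains(&m) {
            dlq.push(m);
            // FailAll poisons the key: everything after the failure is DLQ'd.
            if policy == Policy::FailAll {
                poisoned = true;
            }
        } else {
            acked.push(m);
        }
    }
    (acked, dlq)
}

fn main() {
    // Messages [1..5] for key ACC-A, message 3 rejected:
    assert_eq!(route(&[1, 2, 3, 4, 5], &[3], Policy::Skip), (vec![1, 2, 4, 5], vec![3]));
    assert_eq!(route(&[1, 2, 3, 4, 5], &[3], Policy::FailAll), (vec![1, 2], vec![3, 4, 5]));
}
```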
## Consumer groups & autoscaling

```rust
use shove::prelude::*;

// (Registry, management, and autoscaler constructor names are illustrative.)
let mut registry = ConsumerGroupRegistry::new();
registry.register(OrderSettlement, handler, ConsumerGroupConfig::default()).await?;
registry.start_all();

// Autoscaler adjusts consumer count based on queue depth
let mgmt = ManagementClient::new(&client);
let mut autoscaler = Autoscaler::new(registry, mgmt);
autoscaler.run().await;
```
## Audit logging

Wrap any handler with `Audited` to record every delivery attempt as a structured `AuditRecord`:

```rust
use shove::prelude::*;

// Any AuditHandler implementation works here; `LogAudit` is an
// illustrative stand-in that writes records to stdout.
let handler = Audited::new(SettlementHandler, LogAudit);

// Use it anywhere a normal handler is accepted — consumers, consumer groups, etc.
consumer
    .run(OrderSettlement, handler, options, shutdown_token)
    .await?;
```

Each `AuditRecord` contains: `trace_id`, `topic`, `payload`, `metadata`, `outcome`, `duration_ms`, and `timestamp`.

If the audit handler returns `Err`, the message is retried — audit failure is never silently dropped.
### Built-in ShoveAuditHandler

Enable the `audit` feature for a handler that publishes records back to a dedicated `shove-audit-log` topic:

```rust
use shove::prelude::*;

// (Constructor shapes are illustrative.)
let audit = ShoveAuditHandler::new(publisher);
let handler = Audited::new(SettlementHandler, audit);
```

This creates a self-contained audit trail inside your broker. The audit topic itself is not audited.
## Reference

### Outcome variants

| Variant | Behavior |
|---|---|
| `Ack` | Message processed successfully; remove from queue |
| `Retry` | Transient failure — route to hold queue with escalating backoff, increment retry counter. Also triggered automatically when `handler_timeout` is exceeded |
| `Reject` | Permanent failure — route to DLQ (or discard if no DLQ) |
| `Defer` | Re-deliver via hold queue without incrementing retry counter (e.g. scheduled messages) |
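The table reads as a routing function from handler outcome to broker action. Below is a self-contained model of those semantics; the `Outcome` enum is mirrored locally, and `Action` is a hypothetical name for the broker-side effect:

```rust
#[derive(Debug, PartialEq)]
enum Outcome { Ack, Retry, Reject, Defer }

#[derive(Debug, PartialEq)]
enum Action { Remove, Hold { bump_retries: bool }, DeadLetter }

/// Map a handler outcome to a broker action, dead-lettering once
/// `retries` reaches `max_retries`.
fn dispatch(outcome: Outcome, retries: u32, max_retries: u32) -> Action {
    match outcome {
        Outcome::Ack => Action::Remove,
        Outcome::Retry if retries >= max_retries => Action::DeadLetter,
        Outcome::Retry => Action::Hold { bump_retries: true },
        // Defer re-delivers without counting against the retry budget.
        Outcome::Defer => Action::Hold { bump_retries: false },
        Outcome::Reject => Action::DeadLetter,
    }
}

fn main() {
    assert_eq!(dispatch(Outcome::Ack, 0, 5), Action::Remove);
    assert_eq!(dispatch(Outcome::Retry, 2, 5), Action::Hold { bump_retries: true });
    assert_eq!(dispatch(Outcome::Retry, 5, 5), Action::DeadLetter);
    assert_eq!(dispatch(Outcome::Defer, 9, 5), Action::Hold { bump_retries: false });
    assert_eq!(dispatch(Outcome::Reject, 0, 5), Action::DeadLetter);
}
```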
### Topology configurations

| Configuration | What it does |
|---|---|
| `.dlq()` | Adds a dead-letter queue (`{name}-dlq`) |
| `.hold_queue(duration)` | Adds a hold queue for delayed retry (`{name}-hold-{secs}s`) |
| `.sequenced(policy)` | Enables strict per-key ordering via the consistent-hash exchange |
| `.routing_shards(n)` | Sets the number of sub-queues for sequenced delivery (default: 8) |
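The naming scheme in the table can be sketched as a small function. The `-dlq` and `-hold-{secs}s` suffixes come from the table; the `{name}-{shard}` suffix for sequenced sub-queues is an assumption for illustration:

```rust
use std::time::Duration;

/// Compute the queues a topic's topology implies, following the
/// naming scheme described above.
fn derived_queues(name: &str, dlq: bool, holds: &[Duration], shards: Option<u32>) -> Vec<String> {
    let mut queues = Vec::new();
    match shards {
        // Sequenced topics fan out into sub-queues behind the
        // consistent-hash exchange (default: 8 shards).
        Some(n) => (0..n).for_each(|i| queues.push(format!("{name}-{i}"))),
        None => queues.push(name.to_string()),
    }
    for d in holds {
        queues.push(format!("{name}-hold-{}s", d.as_secs()));
    }
    if dlq {
        queues.push(format!("{name}-dlq"));
    }
    queues
}

fn main() {
    let q = derived_queues("order-settlement", true, &[Duration::from_secs(30)], None);
    assert_eq!(q, ["order-settlement", "order-settlement-hold-30s", "order-settlement-dlq"]);
}
```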
### Backends

| Backend | Feature flag | Status |
|---|---|---|
| RabbitMQ | `rabbitmq` | Stable |
| AWS SNS (publish) | `pub-aws-sns` | In progress |
| AWS SNS+SQS | `aws-sns-sqs` | In progress |

`pub-aws-sns` enables the SNS publisher only — useful when you publish to SNS but consume on a different stack.

`aws-sns-sqs` enables the full SNS+SQS backend (publisher + consumer + consumer groups + autoscaling). It implies `pub-aws-sns`.
Sequenced topics use SNS FIFO topics with `MessageGroupId` for per-key ordering and sharded FIFO SQS queues for parallel consumption.
No features are enabled by default. Enable `audit` alongside your backend for the built-in `ShoveAuditHandler`:

```toml
shove = { version = "0.6", features = ["rabbitmq", "audit"] }
```
## Minimum Rust version

shove uses `edition = "2024"`, which requires Rust 1.85 or later.
## Examples

See the `examples/` directory:

- `basic_pubsub` — publish/consume lifecycle, DLQ handling, all outcome variants
- `concurrent_pubsub` — concurrent consumption with in-order acking, sequential vs concurrent comparison
- `sequenced_pubsub` — ordered delivery with `Skip` and `FailAll` policies
- `consumer_groups` — dynamic scaling with the autoscaler
- `audited_consumer` — custom `AuditHandler` that logs every delivery attempt to stdout
- `stress` — tiered stress benchmarks measuring throughput, latency percentiles, scaling efficiency, and resource usage
## Stress tests

The stress suite runs tiered benchmarks against a RabbitMQ container (started automatically via testcontainers):

```sh
cargo run -q --example stress --features rabbitmq
```

The bench suite also includes consumer-overhead analysis.
## Roadmap
- SNS publisher — publish via SNS standard and FIFO topics with batch support and auto-chunking
- SQS consumer — consume via SQS queues with consumer groups, sequenced delivery via FIFO queues, and autoscaling
- Observability — built-in OpenTelemetry metrics (publish latency, consume rate, retry/DLQ counts)
- Other pubsub providers — looking at Cloudflare Queues, Firebase Pub/Sub and others
## Background
The first version of this crate was built for Lens to handle millions of async events — media ingestion, cross-chain migrations, backfills. It was a self-contained pubsub system including a bespoke message broker, and it worked well, but lacked auditing and autoscaling.
shove is the do-over. RabbitMQ handles storage and routing. shove handles the rest: type-safe topics, retry topologies, ordered delivery, consumer groups that scale themselves.
We needed a name for "throw a job at something and stop thinking about it." Push was taken. Yeet was considered. shove stuck — it's what you do with messages: shove them in, let the broker deal with it.
Note: shove is under active development. The API is subject to breaking changes.
## Disclaimer
The architecture and design of this crate are human-made. The implementation is mostly written by Claude.