# shove

Type-safe async pub/sub for Rust on top of RabbitMQ or AWS SNS/SQS.
shove is for workloads where "just use the broker client" stops being enough: retries with backoff, DLQs, ordered delivery, consumer groups, autoscaling, and auditability. You define a topic once as a Rust type and the crate derives the messaging topology and runtime behavior from it.
## Why shove
- Typed topics instead of stringly-typed queue names
- Built-in retry topologies with delayed hold queues and DLQs
- Concurrent consumers with in-order acknowledgements
- Strict per-key ordering for sequenced topics
- Consumer groups with queue-depth autoscaling
- Structured audit trails for every delivery attempt
- RabbitMQ at-least-once by default, with opt-in transactional exactly-once routing
- SNS publisher-only mode or full SNS + SQS consumption stack
If you have one queue, one consumer, and little retry logic, use lapin or the AWS SDK directly. shove is the layer for multi-service event flows that need operational discipline.
## Features
No features are enabled by default.
| Feature | What it enables |
|---|---|
| `rabbitmq` | RabbitMQ publisher, consumer, topology declaration, consumer groups, autoscaling |
| `rabbitmq-transactional` | Transactional RabbitMQ subscribers with exactly-once routing via AMQP transactions (use only when absolutely necessary) |
| `pub-aws-sns` | SNS publisher and topology declaration |
| `aws-sns-sqs` | Full SNS + SQS publisher/consumer stack, consumer groups, autoscaling |
| `audit` | Built-in `ShoveAuditHandler` and `AuditLog` topic |
## Quick start

For RabbitMQ, the repo's `docker-compose.yml` starts a local broker with the consistent-hash exchange plugin enabled. For the AWS path, it starts LocalStack with SNS and SQS on `http://localhost:4566`.
Define a topic once:
```rust
use serde::{Deserialize, Serialize};
use shove::prelude::*;
use std::time::Duration;

// Illustrative sketch; exact import paths and macro arguments are in the crate docs.
define_topic! { /* topic name, payload type, topology (hold queues, DLQ) */ }
```
Publish:
```rust
use shove::prelude::*;

// Illustrative names; see the crate docs for the exact types and signatures.
let client = RabbitClient::connect("amqp://localhost:5672").await?;
let publisher = RabbitPublisher::new(&client).await?;
publisher
    .publish(/* a value of the topic's payload type */)
    .await?;
```
Consume:
```rust
use shove::prelude::*;
use tokio_util::sync::CancellationToken;

// Illustrative names; `MyHandler` is a handler for the topic defined above.
let shutdown = CancellationToken::new();
let consumer = RabbitConsumer::new(&client, MyHandler);
let options = ConsumerOptions::new().with_prefetch_count(20);
consumer.run(options, shutdown).await?;
```
Publish (SNS):
```rust
use shove::prelude::*;
use std::sync::Arc;

// Illustrative reconstruction; constructor names and config fields may differ.
let config = SnsConfig { /* region, endpoint, credentials, ... */ };
let client = Arc::new(SnsClient::new(config).await?);
let topic_registry = TopicRegistry::new();
let queue_registry = QueueRegistry::new();
let declarer = SnsDeclarer::new(client.clone(), topic_registry)
    .with_queue_registry(queue_registry);
declarer.declare_all().await?;
let publisher = SnsPublisher::new(client.clone());
publisher
    .publish(/* payload */)
    .await?;
```
Consume (SQS):
```rust
use shove::prelude::*;

// Illustrative names; `shutdown` is a CancellationToken as in the RabbitMQ example.
let consumer = SqsConsumer::new(client.clone(), MyHandler);
let options = ConsumerOptions::new().with_prefetch_count(10);
consumer.run(options, shutdown).await?;
```
## Delivery model
shove is at-least-once by default. That means handlers should be idempotent.
- `Outcome::Ack`: success
- `Outcome::Retry`: delayed retry through hold queues, with escalating backoff
- `Outcome::Reject`: dead-letter immediately
- `Outcome::Defer`: delay without increasing retry count
Additional behavior:
- Handler timeouts automatically convert to retry
- DLQ consumers receive `DeadMessageMetadata`
- RabbitMQ publishes a stable `x-message-id` header for deduplication
- RabbitMQ can opt into transactional exactly-once routing with `rabbitmq-transactional`
Exactly-once mode removes the publish-then-ack race in RabbitMQ by wrapping routing decisions in AMQP transactions. It is materially slower and should be reserved for handlers with irreversible side effects.
## Sequenced topics

Use `define_sequenced_topic!` when messages for the same entity must be processed in order.
```rust
use shove::prelude::*;
use std::time::Duration;

// Illustrative: the macro also takes a sequence key and a SequenceFailure policy.
define_sequenced_topic! { /* payload type, sequence key, failure policy, topology */ }

consumer.run(options, shutdown).await?;
```
Failure policies:
- `SequenceFailure::Skip`: dead-letter the failed message and continue processing the rest of the sequence
- `SequenceFailure::FailAll`: poison the key and dead-letter all remaining messages for that key
Given messages [1, 2, 3, 4, 5] for the same key where message 3 is permanently rejected:
| Policy | Acked | DLQed |
|---|---|---|
| `Skip` | 1, 2, 4, 5 | 3 |
| `FailAll` | 1, 2 | 3, 4, 5 |
Use Skip when messages are independently valid (e.g. audit entries). Use FailAll when later messages are causally dependent on earlier ones (e.g. financial ledger entries, state-machine transitions).
Messages for other sequence keys are unaffected by either policy.
RabbitMQ uses consistent-hash routing for this. SNS/SQS uses FIFO topics and queues.
## Consumer groups and autoscaling
Both backends support named consumer groups with min/max bounds, per-consumer prefetch, retry budget, handler timeouts, and optional concurrent processing inside each consumer.
RabbitMQ:
```rust
use shove::prelude::*;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

// Illustrative names and signatures; check the crate docs for the real API.
let shutdown = CancellationToken::new();
let mut registry = ConsumerGroupRegistry::new();
registry
    .register(/* group config: name, min/max consumers, prefetch, retry budget */)
    .await?;
registry.start_all();

let registry = Arc::new(Mutex::new(registry));
let mut autoscaler = Autoscaler::new(Arc::clone(&registry));
autoscaler.run(shutdown).await;
```
SNS/SQS uses `SqsConsumerGroupRegistry` and `SqsAutoscaler` with the same configuration shape.
## Audit logging
Wrap any handler with `Audited<H, A>` to persist a structured `AuditRecord` for each delivery attempt. Records include the trace id, topic, payload, message metadata, outcome, duration, and timestamp.

Implement `AuditHandler<T>` for your persistence backend:
```rust
use shove::prelude::*;

// Illustrative: `MyAuditStore` is a persistence backend implementing
// AuditHandler<T>; `MyHandler` is the message handler being wrapped.
let handler = Audited::new(MyHandler, MyAuditStore);
// Drop-in replacement — consumers, consumer groups, and FIFO consumers all accept it.
consumer.run(options, shutdown).await?;
```
If the audit handler returns `Err`, the message is retried — audit failure is never silently dropped.

If you enable the `audit` feature, `ShoveAuditHandler` can publish those records back into a dedicated `shove-audit-log` topic using the same broker:
```rust
use shove::prelude::*;

// Illustrative: routes audit records to the shove-audit-log topic
// through a publisher on the same broker.
let audit = ShoveAuditHandler::new(publisher);
let handler = Audited::new(MyHandler, audit);
```
## Performance

Measured on a MacBook Pro M4 Max, single RabbitMQ node via Docker, Rust 1.91. Reproducible via `cargo run -q --example rabbitmq_stress --features rabbitmq`.
| Handler | 1 worker, prefetch=1 | 1 worker, prefetch=20 | 8 workers, prefetch=20 | 32 workers, prefetch=40 |
|---|---|---|---|---|
| Fast (1–5 ms) | 179 msg/s | 2,866 msg/s | 19,669 msg/s | 29,207 msg/s |
| Slow (50–300 ms) | 6 msg/s | 75 msg/s | 544 msg/s | 4,076 msg/s |
| Heavy (1–5 s) | 0.4 msg/s | 5 msg/s | 21 msg/s | 199 msg/s |
`prefetch_count` is the primary throughput lever for I/O-bound handlers. Adding workers scales linearly when the handler is the bottleneck. Results will vary by hardware and broker configuration.
## Examples
RabbitMQ examples:
- `rabbitmq_basic_pubsub`
- `rabbitmq_concurrent_pubsub`
- `rabbitmq_sequenced_pubsub`
- `rabbitmq_consumer_groups`
- `rabbitmq_audited_consumer`
- `rabbitmq_exactly_once`
- `rabbitmq_stress`
SNS/SQS examples:
- `sqs_basic_pubsub`
- `sqs_concurrent_pubsub`
- `sqs_sequenced_pubsub`
- `sqs_consumer_groups`
- `sqs_audited_consumer`
- `sqs_autoscaler`
Run them with `cargo run -q --example <example-name> --features rabbitmq` (use the `aws-sns-sqs` feature for the SQS examples).
## Reference

### Outcome variants
| Variant | Behavior |
|---|---|
| `Outcome::Ack` | Success — remove from queue |
| `Outcome::Retry` | Transient failure — route to hold queue with escalating backoff, increment retry counter. Also triggered when `handler_timeout` is exceeded |
| `Outcome::Reject` | Permanent failure — route to DLQ (or discard if no DLQ configured) |
| `Outcome::Defer` | Re-deliver via hold queue without incrementing retry counter |
### `TopologyBuilder`
| Method | Effect |
|---|---|
| `.hold_queue(duration)` | Adds a hold queue for delayed retry (`{name}-hold-{secs}s`) |
| `.sequenced(policy)` | Enables strict per-key ordering via consistent-hash exchange |
| `.routing_shards(n)` | Sets the number of sub-queues for sequenced delivery (default: 8) |
| `.dlq()` | Adds a dead-letter queue (`{name}-dlq`) |
### Backends
| Backend | Feature flag | Status |
|---|---|---|
| RabbitMQ | `rabbitmq` | Stable |
| RabbitMQ (exactly-once routing) | `rabbitmq-transactional` | Stable |
| AWS SNS (publisher only) | `pub-aws-sns` | Stable |
| AWS SNS + SQS | `aws-sns-sqs` | Stable |
`aws-sns-sqs` implies `pub-aws-sns`. Sequenced topics use SNS FIFO topics with `MessageGroupId` and sharded FIFO SQS queues.
## Requirements
shove requires Rust 1.85 or newer (edition 2024).
## Background
shove came out of production event-processing systems that needed more than a broker client but less than a platform rewrite. The crate focuses on the hard parts around message handling correctness and operational behavior, while leaving transport and persistence to RabbitMQ or SNS/SQS.
The API is still evolving.