# shove
Topic-typed pub/sub for Rust.
shove is an async message publishing and consuming library that binds message types to their broker destinations at compile time. Define a topic once — the type system ensures publishers and consumers agree on message shape, queue names, retry policy, and ordering guarantees.
⚠️ Warning: shove is under active development and is not considered production-ready. The API is subject to breaking changes and comes with no guarantees of stability, correctness, or backwards compatibility. 🚧
## Background

The first version of this crate was built for Lens to handle millions of async events — ingestion, cross-chain migrations, backfills. It was a self-contained pub/sub system, including a bespoke message broker, and it worked, but it lacked auditing and autoscaling.
shove is the do-over. RabbitMQ handles storage and routing. shove handles the rest: type-safe topics, retry topologies, ordered delivery, consumer groups that scale themselves.
## Who is it for
You have a distributed, event-driven application. You need to move messages between services reliably, at high throughput, without burning through resources. You want the compiler to catch mismatches between publishers and consumers before they hit production. And when load spikes, you want consumers to scale themselves instead of paging someone at 3 AM.
If all of the above sounds relatable, then shove is for you.
## Why "shove"
We needed a name for "throw a job at something and stop thinking about it." Push was taken. Yeet was considered. shove stuck — it's what you do with messages: shove them in, let the broker deal with it.
## Features

- Compile-time topic binding — each topic is a unit struct that associates a message type (`Serialize + DeserializeOwned`) with a queue topology. No stringly-typed queue names at call sites.
- Escalating retry backoff — configure multiple hold queues with increasing delays. The consumer picks the right one automatically based on retry count.
- Dead-letter queues — opt-in per topic. Messages that exceed max retries or are explicitly rejected route to the DLQ with full death metadata.
- Sequenced delivery — strict per-key ordering via `SequencedTopic`. Messages sharing a sequence key are delivered in publish order. Two failure policies: `Skip` (continue the sequence) or `FailAll` (terminate it).
- Consumer groups & autoscaling — dynamically scale consumers up and down based on queue depth, with hysteresis and cooldown to prevent flapping.
- Audit logging — wrap any handler with `Audited<H, A>` to capture every delivery attempt as a structured `AuditRecord`. Implement the `AuditHandler` trait for your persistence backend, or enable the `audit` feature for a built-in handler that publishes records to a dedicated `shove-audit-log` topic.
- Handler timeout — set a per-consumer `handler_timeout` in `ConsumerOptions` or `ConsumerGroupConfig`. Messages that exceed the deadline are automatically retried.
- Backend-agnostic core — traits for `Publisher`, `Consumer`, `TopologyDeclarer`, and `MessageHandler` live in the core crate. Backends are feature-gated.
## Backends

| Backend | Feature flag | Status |
|---|---|---|
| RabbitMQ | `rabbitmq` | Stable |
| AWS SNS/SQS | `sns` | Planned |
Both `rabbitmq` and `audit` are default features. To opt out of the built-in audit backend:

```toml
shove = { version = "0.4", default-features = false, features = ["rabbitmq"] }
```
## Quick start

Add shove to your `Cargo.toml`:

```toml
[dependencies]
shove = { version = "0.4", features = ["rabbitmq"] }
```
### Define a topic

```rust
use shove::prelude::*;               // assumed module path
use serde::{Deserialize, Serialize};
use std::time::Duration;

// The macro's arguments were elided from this README — placeholder below is
// illustrative only; see the crate docs for the exact syntax.
define_topic!(/* OrderSettlement, SettlementEvent, topology… */);
```
### Publish

```rust
use shove::prelude::*; // assumed module path

// Concrete types and receivers were elided from this README; the bare
// `connect` / `new` calls below are illustrative wiring, not the real API.
let client = connect(/* broker URL */).await?;
let publisher = new(&client).await?;

let event = SettlementEvent { /* fields elided */ };

// Type-safe: the compiler ensures SettlementEvent matches OrderSettlement::Message
publisher.publish(&event).await?; // method name illustrative — elided in the original
```
### Consume

```rust
use shove::prelude::*; // assumed module path

// The handler definition was elided from this README; wiring is illustrative.
let consumer = new(/* client, ConsumerOptions… */);
consumer
    .consume(/* handler — method name elided in the original */)
    .await?;
```
## Sequenced topics
For messages that must be processed in strict order per key:
```rust
// Macro arguments were elided from this README — placeholder is illustrative.
define_sequenced_topic!(/* AccountLedger, … */);

// The compiler enforces AccountLedger: SequencedTopic
consumer
    .consume(/* ordered handler — method name elided in the original */)
    .await?;
```
### Sequence failure policies

When a sequenced message fails permanently (exceeds max retries or returns `Outcome::Reject`), the failure policy controls what happens to the rest of the sequence:

- `SequenceFailure::Skip` — dead-letter the failed message and continue processing subsequent messages for the same key. Use this when messages are independently valid but need ordered delivery (e.g. audit log entries, analytics events).
- `SequenceFailure::FailAll` — dead-letter the failed message and automatically dead-letter all remaining messages for the same key. The key is "poisoned" for the lifetime of the consumer process. Use this when messages are causally dependent — processing later messages after an earlier failure would produce an inconsistent state (e.g. financial ledger entries, state-machine transitions).

Messages for other sequence keys are unaffected by either policy.
Example: given messages `[1, 2, 3, 4, 5]` for key `ACC-A` where message 3 is rejected:

| Policy | Ack'd | DLQ'd |
|---|---|---|
| `Skip` | 1, 2, 4, 5 | 3 |
| `FailAll` | 1, 2 | 3, 4, 5 |
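The two policies amount to simple per-key bookkeeping. The sketch below reproduces the table with a broker-free simulation — the names (`run_sequence`, the `fails` set) are illustrative, not shove's API:

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq)]
enum SequenceFailure { Skip, FailAll }

/// Simulate one sequence key: `fails` holds the messages that are permanently
/// rejected. Returns (acked, dead_lettered) in delivery order.
fn run_sequence(msgs: &[u32], fails: &HashSet<u32>, policy: SequenceFailure)
    -> (Vec<u32>, Vec<u32>)
{
    let mut acked = Vec::new();
    let mut dlq = Vec::new();
    let mut poisoned = false; // set once a FailAll key sees a permanent failure
    for &m in msgs {
        if poisoned || fails.contains(&m) {
            dlq.push(m);
            if policy == SequenceFailure::FailAll {
                poisoned = true; // everything after this point dead-letters too
            }
        } else {
            acked.push(m);
        }
    }
    (acked, dlq)
}

fn main() {
    let fails: HashSet<u32> = [3].into_iter().collect();
    let msgs = [1, 2, 3, 4, 5];
    // Matches the table above for key ACC-A.
    assert_eq!(run_sequence(&msgs, &fails, SequenceFailure::Skip),
               (vec![1, 2, 4, 5], vec![3]));
    assert_eq!(run_sequence(&msgs, &fails, SequenceFailure::FailAll),
               (vec![1, 2], vec![3, 4, 5]));
}
```

Note that `poisoned` lives only in process memory, mirroring the "lifetime of the consumer process" caveat above.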
## Topology configurations

| Configuration | What it does |
|---|---|
| `.dlq()` | Adds a dead-letter queue (`{name}-dlq`) |
| `.hold_queue(duration)` | Adds a hold queue for delayed retry (`{name}-hold-{secs}s`) |
| `.sequenced(policy)` | Enables strict per-key ordering via a consistent-hash exchange |
| `.routing_shards(n)` | Sets the number of sub-queues for sequenced delivery (default: 8) |
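As a rough illustration of the naming patterns in the table, a topic with a DLQ and two hold queues implies the following broker queues (the derivation function here is an assumption for the example, not shove's code):

```rust
use std::time::Duration;

/// Derive the queue names implied by a topic's topology settings, following
/// the `{name}-dlq` and `{name}-hold-{secs}s` patterns from the table above.
fn queue_names(name: &str, dlq: bool, holds: &[Duration]) -> Vec<String> {
    let mut names = vec![name.to_string()]; // the primary queue
    for h in holds {
        names.push(format!("{name}-hold-{}s", h.as_secs()));
    }
    if dlq {
        names.push(format!("{name}-dlq"));
    }
    names
}

fn main() {
    let names = queue_names("order-settlement", true,
                            &[Duration::from_secs(30), Duration::from_secs(300)]);
    assert_eq!(names, vec![
        "order-settlement",
        "order-settlement-hold-30s",
        "order-settlement-hold-300s",
        "order-settlement-dlq",
    ]);
}
```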
## Outcome variants

| Variant | Behavior |
|---|---|
| `Ack` | Message processed successfully; remove from queue |
| `Retry` | Transient failure — route to a hold queue with escalating backoff, increment the retry counter. Also triggered automatically when `handler_timeout` is exceeded |
| `Reject` | Permanent failure — route to the DLQ (or discard if no DLQ) |
| `Defer` | Re-deliver via a hold queue without incrementing the retry counter (e.g. scheduled messages) |
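Escalating backoff for `Retry` amounts to picking among the configured hold queues by retry count. A plausible selection rule, sketched without reference to shove's internals:

```rust
use std::time::Duration;

/// Pick the hold delay for a retry: the nth retry (0-based) uses the nth hold
/// queue, and retries beyond the last queue keep using the longest delay.
fn hold_delay(holds: &[Duration], retry_count: u32) -> Option<Duration> {
    let last = holds.len().checked_sub(1)?; // None when no hold queues exist
    Some(holds[(retry_count as usize).min(last)])
}

fn main() {
    let holds = [Duration::from_secs(10), Duration::from_secs(60), Duration::from_secs(600)];
    assert_eq!(hold_delay(&holds, 0), Some(Duration::from_secs(10)));
    assert_eq!(hold_delay(&holds, 1), Some(Duration::from_secs(60)));
    assert_eq!(hold_delay(&holds, 7), Some(Duration::from_secs(600))); // clamped to the last queue
    assert_eq!(hold_delay(&[], 0), None); // no hold queues configured
}
```

`Defer` would route through the same queues while leaving the retry counter untouched.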
## Consumer groups & autoscaling

```rust
use shove::prelude::*; // assumed module path

// Constructor arguments and some method names were elided from this README;
// the wiring below is illustrative only.
let mut registry = new(/* … */);
registry.register(/* group config, handler — method name elided */);
registry.start_all();

// The autoscaler adjusts consumer count based on queue depth.
let mgmt = new(/* management API endpoint */);
let mut autoscaler = new(/* registry, mgmt */);
autoscaler.run().await;
```
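The hysteresis-and-cooldown behavior mentioned in the features can be pictured as a pure decision function. Thresholds and names below are made up for illustration; shove's actual autoscaler configuration may differ:

```rust
#[derive(Debug, PartialEq)]
enum Decision { ScaleUp, ScaleDown, Hold }

/// Decide a scaling action from queue depth, with a dead band between the two
/// thresholds (hysteresis) and a cooldown measured in ticks since the last action.
fn decide(depth: u64, consumers: u32, max_consumers: u32,
          scale_up_at: u64, scale_down_at: u64,
          ticks_since_action: u32, cooldown: u32) -> Decision {
    if ticks_since_action < cooldown {
        return Decision::Hold; // still cooling down from the last scale event
    }
    if depth > scale_up_at && consumers < max_consumers {
        Decision::ScaleUp
    } else if depth < scale_down_at && consumers > 1 {
        Decision::ScaleDown
    } else {
        Decision::Hold // inside the dead band: no flapping
    }
}

fn main() {
    assert_eq!(decide(5_000, 2, 8, 1_000, 100, 10, 5), Decision::ScaleUp);
    assert_eq!(decide(50, 4, 8, 1_000, 100, 10, 5), Decision::ScaleDown);
    assert_eq!(decide(500, 4, 8, 1_000, 100, 10, 5), Decision::Hold);  // dead band
    assert_eq!(decide(5_000, 2, 8, 1_000, 100, 2, 5), Decision::Hold); // cooldown active
}
```

The gap between `scale_up_at` and `scale_down_at` is what prevents flapping: a depth hovering near a single threshold would otherwise trigger alternating scale events.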
## Audit logging

Wrap any handler with `Audited` to record every delivery attempt as a structured `AuditRecord`:

```rust
use shove::prelude::*; // assumed module path

// The inner handler and audit-sink definitions were elided from this README;
// the wiring below is illustrative.
let handler = Audited::new(/* inner handler, audit handler */);

// Use it anywhere a normal handler is accepted — consumers, consumer groups, etc.
consumer
    .consume(/* handler — method name elided in the original */)
    .await?;
```
Each `AuditRecord` contains: `trace_id`, `topic`, `payload`, `metadata`, `outcome`, `duration_ms`, and `timestamp`.

If the audit handler returns `Err`, the message is retried — an audit failure is never silently dropped.
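The wrapper pattern itself is simple: run the handler, time it, and hand the result to a sink before returning it. A broker-free sketch (field names mirror the record above, but this is not shove's actual code):

```rust
use std::time::Instant;

#[derive(Debug, Clone, PartialEq)]
enum Outcome { Ack, Retry, Reject }

/// A trimmed-down audit record for the sketch.
struct AuditRecord { topic: String, outcome: Outcome, duration_ms: u128 }

/// Run a handler, measure its duration, and pass a record to the audit sink
/// before returning the handler's outcome unchanged.
fn handle_audited<F, A>(topic: &str, handler: F, mut audit: A) -> Outcome
where
    F: FnOnce() -> Outcome,
    A: FnMut(AuditRecord),
{
    let start = Instant::now();
    let outcome = handler();
    audit(AuditRecord {
        topic: topic.to_string(),
        outcome: outcome.clone(),
        duration_ms: start.elapsed().as_millis(),
    });
    outcome
}

fn main() {
    let mut records = Vec::new();
    let out = handle_audited("order-settlement", || Outcome::Ack, |r| records.push(r));
    assert_eq!(out, Outcome::Ack);
    assert_eq!(records.len(), 1);
    assert_eq!(records[0].topic, "order-settlement");
    assert!(records[0].duration_ms < 10_000);
}
```

The real `Audited<H, A>` additionally carries `trace_id`, `payload`, `metadata`, and `timestamp`, and retries the message when the sink errors.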
### Built-in ShoveAuditHandler

Enable the `audit` feature (on by default) for a handler that publishes records back to a dedicated `shove-audit-log` topic:

```rust
use shove::prelude::*; // assumed module paths
use shove::audit::*;

// Constructor arguments were elided from this README; wiring is illustrative.
let audit = ShoveAuditHandler::new(/* publisher */);
let handler = Audited::new(/* inner handler, audit */);
```
This creates a self-contained audit trail inside your broker. The audit topic itself is not audited.
## Examples

See the `examples/` directory:

- `basic_pubsub` — all non-sequenced configurations, publish/consume lifecycle, DLQ handling, all outcome variants
- `sequenced_pubsub` — ordered delivery with `Skip` and `FailAll` policies
- `consumer_groups` — dynamic scaling with the autoscaler
- `audited_consumer` — custom `AuditHandler` that logs every delivery attempt to stdout

Start RabbitMQ with the included docker-compose file (it enables the management and consistent-hash exchange plugins), e.g. `docker compose up -d`, then run any example with `cargo run --example <name>`.
## Roadmap
- SNS/SQS backend — publish via SNS, consume via SQS FIFO queues with message group ID for sequenced delivery
- Batch consume — process messages in batches for throughput-sensitive workloads
- Observability — built-in metrics (publish latency, consume rate, retry/DLQ counts)
- Schema evolution — pluggable serialization with versioned message envelopes
- Multi-backend topology declaration — declare topics across backends in a single call
## Disclaimer
The architecture and design of this crate are human-made. The implementation is mostly written by Claude.