# shove

Topic-typed pub/sub for Rust.

shove is an async message publishing and consuming library that binds message types to their broker destinations at compile time. Define a topic once — the type system ensures publishers and consumers agree on message shape, queue names, retry policy, and ordering guarantees.
## Features

- **Compile-time topic binding** — each topic is a unit struct that associates a message type (`Serialize + DeserializeOwned`) with a queue topology. No stringly-typed queue names at call sites.
- **Escalating retry backoff** — configure multiple hold queues with increasing delays. The consumer picks the right one automatically based on retry count.
- **Dead-letter queues** — opt-in per topic. Messages that exceed max retries or are explicitly rejected route to the DLQ with full death metadata.
- **Sequenced delivery** — strict per-key ordering via `SequencedTopic`. Messages sharing a sequence key are delivered in publish order. Two failure policies: `Skip` (continue the sequence) or `FailAll` (terminate it).
- **Consumer groups & autoscaling** — dynamically scale consumers up and down based on queue depth, with hysteresis and cooldown to prevent flapping.
- **Backend-agnostic core** — traits for `Publisher`, `Consumer`, `TopologyDeclarer`, and `MessageHandler` live in the core crate. Backends are feature-gated.
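The compile-time binding mechanism can be sketched in plain Rust — the trait, struct, and function names below are illustrative stand-ins, not shove's actual API:

```rust
// A topic is a unit struct; its associated type pins the message shape and
// its const pins the queue name. Anything generic over `T: Topic` can then
// only accept `T::Message`, so queue name and payload type can't drift apart.
trait Topic {
    type Message;
    const NAME: &'static str;
}

#[derive(Debug, PartialEq)]
struct SettlementEvent {
    order_id: u64,
}

struct OrderSettlement; // the unit struct is the topic

impl Topic for OrderSettlement {
    type Message = SettlementEvent;
    const NAME: &'static str = "order-settlement";
}

// Destination and payload type are resolved at compile time, not at the call site.
fn publish<T: Topic>(msg: T::Message) -> (&'static str, T::Message) {
    (T::NAME, msg)
}

fn main() {
    let (queue, msg) = publish::<OrderSettlement>(SettlementEvent { order_id: 7 });
    assert_eq!(queue, "order-settlement");
    assert_eq!(msg.order_id, 7);
}
```

Passing any other type to `publish::<OrderSettlement>` is a compile error, which is the "no stringly-typed queue names" guarantee in miniature.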
## Backends

| Backend | Feature flag | Status |
|---|---|---|
| RabbitMQ | `rabbitmq` | Stable |
| AWS SNS/SQS | `sns` | Planned |
## Quick start

Add shove to your `Cargo.toml`:

```toml
[dependencies]
shove = { version = "0.1", features = ["rabbitmq"] }
```
### Define a topic

```rust
use shove::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
pub struct SettlementEvent {
    pub order_id: u64,
    pub amount_cents: i64,
}

// Binds the message type, queue name, and topology in one place.
define_topic! {
    OrderSettlement {
        message: SettlementEvent,
        queue: "order-settlement",
        topology: |t| t
            .dlq()
            .hold_queue(Duration::from_secs(30))
            .hold_queue(Duration::from_secs(300)),
    }
}
```
### Publish

```rust
use shove::prelude::*;

let client = RabbitClient::connect("amqp://localhost:5672").await?;
let publisher = RabbitPublisher::new(&client).await?;

let event = SettlementEvent { order_id: 42, amount_cents: 1_999 };

// Type-safe: compiler ensures SettlementEvent matches OrderSettlement::Message
publisher.publish::<OrderSettlement>(&event).await?;
```
### Consume

```rust
use shove::prelude::*;

// A handler maps each message to an Outcome: Ack, Retry, Reject, or Defer.
async fn handle(event: SettlementEvent) -> Outcome {
    println!("settling order {}", event.order_id);
    Outcome::Ack
}

let consumer = RabbitConsumer::new(&client).await?;
consumer
    .consume::<OrderSettlement, _>(handle)
    .await?;
```
## Sequenced topics

For messages that must be processed in strict order per key:

```rust
define_sequenced_topic! {
    AccountLedger {
        message: LedgerEntry,
        queue: "account-ledger",
        sequence_key: |m: &LedgerEntry| m.account_id.to_string(),
        on_failure: SequencePolicy::Skip,
    }
}

// Compiler enforces AccountLedger: SequencedTopic
consumer
    .consume_sequenced::<AccountLedger, _>(handle)
    .await?;
```
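The two failure policies can be illustrated with a self-contained sketch — the enum and function below model the described behavior and are not shove's own types:

```rust
#[derive(Clone, Copy)]
enum FailurePolicy {
    Skip,    // drop the failed message, continue the sequence
    FailAll, // terminate the whole sequence at the failure
}

// Each (message, succeeded) pair simulates handling one message in a
// per-key sequence; returns the messages that were actually delivered.
fn process_sequence<'a>(msgs: &[(&'a str, bool)], policy: FailurePolicy) -> Vec<&'a str> {
    let mut delivered = Vec::new();
    for &(msg, ok) in msgs {
        if ok {
            delivered.push(msg);
        } else {
            match policy {
                FailurePolicy::Skip => continue, // lose one message, keep order for the rest
                FailurePolicy::FailAll => break, // nothing after the failure is delivered
            }
        }
    }
    delivered
}

fn main() {
    let msgs = [("credit", true), ("debit", false), ("close", true)];
    assert_eq!(process_sequence(&msgs, FailurePolicy::Skip), vec!["credit", "close"]);
    assert_eq!(process_sequence(&msgs, FailurePolicy::FailAll), vec!["credit"]);
}
```

`Skip` trades completeness for liveness; `FailAll` is for sequences where a gap would corrupt downstream state (e.g. a ledger).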
## Topology configurations

| Configuration | What it does |
|---|---|
| `.dlq()` | Adds a dead-letter queue (`{name}-dlq`) |
| `.hold_queue(duration)` | Adds a hold queue for delayed retry (`{name}-hold-{secs}s`) |
| `.sequenced(policy)` | Enables strict per-key ordering via a consistent-hash exchange |
| `.routing_shards(n)` | Sets the number of sub-queues for sequenced delivery (default: 8) |
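How sharding preserves per-key order while still allowing parallelism can be sketched as follows — the hashing here is illustrative (the real routing is done by a consistent-hash exchange on the broker):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Messages sharing a sequence key always hash to the same sub-queue, so
// per-key order holds while `shards` queues are consumed in parallel.
fn shard_for_key(key: &str, shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % shards
}

fn main() {
    // Determinism: the same key routes to the same shard every time.
    assert_eq!(shard_for_key("acct-42", 8), shard_for_key("acct-42", 8));
    assert!(shard_for_key("acct-42", 8) < 8);
}
```

More shards means more parallelism across keys, but any single key is still limited to one consumer at a time.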
## Outcome variants

| Variant | Behavior |
|---|---|
| `Ack` | Message processed successfully; remove from queue |
| `Retry` | Transient failure — route to hold queue with escalating backoff, increment retry counter |
| `Reject` | Permanent failure — route to DLQ (or discard if no DLQ) |
| `Defer` | Re-deliver via hold queue without incrementing the retry counter (e.g. scheduled messages) |
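The `Retry` escalation can be sketched as indexing the topic's declared hold queues by retry count — the function name and clamping behavior below are assumptions for illustration, not shove's documented algorithm:

```rust
use std::time::Duration;

// Given a topic's hold queues sorted by increasing delay, route a retried
// message to the queue indexed by its retry count, clamping to the longest
// delay once the count runs past the end of the list.
fn hold_delay_for_retry(hold_delays: &[Duration], retry_count: u32) -> Option<Duration> {
    let last = hold_delays.len().checked_sub(1)?; // None if no hold queues declared
    Some(hold_delays[(retry_count as usize).min(last)])
}

fn main() {
    let delays = [
        Duration::from_secs(10),
        Duration::from_secs(60),
        Duration::from_secs(600),
    ];
    assert_eq!(hold_delay_for_retry(&delays, 0), Some(Duration::from_secs(10)));
    assert_eq!(hold_delay_for_retry(&delays, 9), Some(Duration::from_secs(600)));
    assert_eq!(hold_delay_for_retry(&[], 0), None);
}
```

Declaring hold queues at 10s, 60s, and 600s thus gives a message three escalating backoff tiers before `Reject` or max-retry routing to the DLQ.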
## Consumer groups & autoscaling

```rust
use shove::prelude::*;

let mut registry = ConsumerGroupRegistry::new();
registry.register::<OrderSettlement, _>(handle);
registry.start_all().await?;

// Autoscaler adjusts consumer count based on queue depth
let mgmt = ManagementClient::new(&client);
let mut autoscaler = Autoscaler::new(registry, mgmt);
autoscaler.run().await;
```
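The hysteresis behavior the autoscaler relies on can be sketched as two separated thresholds — the thresholds, step size, and function name here are illustrative:

```rust
// Scale up only above a high-water mark and down only below a low-water
// mark; the gap between the two prevents flapping when queue depth
// hovers near a single threshold.
fn desired_consumers(current: u32, depth: u64, high: u64, low: u64, max: u32) -> u32 {
    if depth > high {
        (current + 1).min(max)
    } else if depth < low {
        current.saturating_sub(1).max(1) // never scale below one consumer
    } else {
        current // inside the hysteresis band: hold steady
    }
}

fn main() {
    assert_eq!(desired_consumers(2, 5_000, 1_000, 100, 8), 3); // deep queue: scale up
    assert_eq!(desired_consumers(3, 500, 1_000, 100, 8), 3);   // in the band: no change
    assert_eq!(desired_consumers(3, 10, 1_000, 100, 8), 2);    // drained: scale down
}
```

A cooldown (not shown) additionally rate-limits how often this decision is applied, so a burst cannot trigger several scaling steps back to back.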
## Examples

See the `examples/` directory:

- `basic_pubsub` — all non-sequenced configurations, publish/consume lifecycle, DLQ handling, all outcome variants
- `sequenced_pubsub` — ordered delivery with `Skip` and `FailAll` policies
- `consumer_groups` — dynamic scaling with the autoscaler
## Roadmap
- SNS/SQS backend — publish via SNS, consume via SQS FIFO queues with message group ID for sequenced delivery
- Batch consume — process messages in batches for throughput-sensitive workloads
- Observability — built-in metrics (publish latency, consume rate, retry/DLQ counts)
- Schema evolution — pluggable serialization with versioned message envelopes
- Multi-backend topology declaration — declare topics across backends in a single call