# shove
[](https://github.com/zannis/shove/actions/workflows/ci.yml)
[](https://crates.io/crates/shove)
[](https://docs.rs/shove)
[](https://opensource.org/licenses/MIT)
[](https://codecov.io/gh/zannis/shove)
Type-safe async pub/sub for Rust on top of RabbitMQ or AWS SNS/SQS.
`shove` is for workloads where "just use the broker client" stops being enough: retries with backoff, DLQs, ordered delivery, consumer groups, autoscaling, and auditability. You define a topic once as a Rust type and the crate derives the messaging topology and runtime behavior from it.
## Why shove
- Typed topics instead of stringly-typed queue names
- Built-in retry topologies with delayed hold queues and DLQs
- Concurrent consumers with in-order acknowledgements
- Strict per-key ordering for sequenced topics
- Consumer groups with queue-depth autoscaling
- Structured audit trails for every delivery attempt
- RabbitMQ at-least-once by default, with opt-in transactional exactly-once routing
- SNS publisher-only mode or full SNS + SQS consumption stack
If you have one queue, one consumer, and little retry logic, use `lapin` or the AWS SDK directly. `shove` is the layer for multi-service event flows that need operational discipline.
## Features
No features are enabled by default.
```sh
# RabbitMQ publisher + consumer + consumer groups + autoscaling
cargo add shove --features rabbitmq
# Transactional RabbitMQ subscribers with exactly-once delivery guarantees (use only when absolutely necessary)
cargo add shove --features rabbitmq-transactional
# SNS publisher only
cargo add shove --features pub-aws-sns
# Full AWS SNS + SQS backend
cargo add shove --features aws-sns-sqs
# Optional built-in audit publisher
cargo add shove --features rabbitmq,audit
```
| `rabbitmq` | RabbitMQ publisher, consumer, topology declaration, consumer groups, autoscaling |
| `rabbitmq-transactional` | RabbitMQ exactly-once routing via AMQP transactions |
| `pub-aws-sns` | SNS publisher and topology declaration |
| `aws-sns-sqs` | Full SNS + SQS publisher/consumer stack, consumer groups, autoscaling |
| `audit` | Built-in `ShoveAuditHandler` and `AuditLog` topic |
## Quick start
For RabbitMQ, the repo's `docker-compose.yml` starts a local broker with the consistent-hash exchange plugin enabled. For the AWS path, it starts LocalStack with SNS and SQS on `http://localhost:4566`.
```sh
docker compose up -d
```
Define a topic once:
```rust,no_run
use serde::{Deserialize, Serialize};
use shove::*;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SettlementEvent {
order_id: String,
amount_cents: u64,
}
define_topic!(
OrderSettlement,
SettlementEvent,
TopologyBuilder::new("order-settlement")
.hold_queue(Duration::from_secs(5))
.hold_queue(Duration::from_secs(30))
.hold_queue(Duration::from_secs(120))
.dlq()
.build()
);
```
Publish:
```rust,no_run
use shove::rabbitmq::*;
let client = RabbitMqClient::connect(&RabbitMqConfig::new("amqp://localhost")).await?;
let publisher = RabbitMqPublisher::new(client.clone()).await?;
publisher
.publish::<OrderSettlement>(&SettlementEvent {
order_id: "ORD-1".into(),
amount_cents: 5000,
})
.await?;
```
Consume:
```rust,no_run
use shove::rabbitmq::*;
use shove::*;
use tokio_util::sync::CancellationToken;
struct SettlementHandler;
impl MessageHandler<OrderSettlement> for SettlementHandler {
async fn handle(&self, msg: SettlementEvent, _metadata: MessageMetadata) -> Outcome {
match process(&msg).await {
Ok(()) => Outcome::Ack,
Err(e) if e.is_transient() => Outcome::Retry,
Err(_) => Outcome::Reject,
}
}
}
let shutdown = CancellationToken::new();
let consumer = RabbitMqConsumer::new(client);
let options = ConsumerOptions::new(shutdown).with_prefetch_count(20);
consumer.run::<OrderSettlement>(SettlementHandler, options).await?;
```
Publish (SNS):
```rust,no_run
use shove::sns::*;
use std::sync::Arc;
let config = SnsConfig {
region: Some("us-east-1".into()),
endpoint_url: Some("http://localhost:4566".into()), // omit for real AWS
};
let client = SnsClient::new(&config).await?;
let topic_registry = Arc::new(TopicRegistry::new());
let queue_registry = Arc::new(QueueRegistry::new());
let declarer = SnsTopologyDeclarer::new(client.clone(), topic_registry.clone())
.with_queue_registry(queue_registry.clone());
declare_topic::<OrderSettlement>(&declarer).await?;
let publisher = SnsPublisher::new(client.clone(), topic_registry.clone());
publisher
.publish::<OrderSettlement>(&SettlementEvent {
order_id: "ORD-1".into(),
amount_cents: 5000,
})
.await?;
```
Consume (SQS):
```rust,no_run
use shove::sns::*;
let consumer = SqsConsumer::new(client.clone(), queue_registry.clone());
let options = ConsumerOptions::new(shutdown).with_prefetch_count(20);
consumer.run::<OrderSettlement>(SettlementHandler, options).await?;
```
## Delivery model
`shove` is at-least-once by default. That means handlers should be idempotent.
- `Outcome::Ack`: success
- `Outcome::Retry`: delayed retry through hold queues, with escalating backoff
- `Outcome::Reject`: dead-letter immediately
- `Outcome::Defer`: delay without increasing retry count
Additional behavior:
- Handler timeouts automatically convert to retry
- DLQ consumers receive `DeadMessageMetadata`
- RabbitMQ publishes a stable `x-message-id` header for deduplication
- RabbitMQ can opt into transactional exactly-once routing with `rabbitmq-transactional`
Exactly-once mode removes the publish-then-ack race in RabbitMQ by wrapping routing decisions in AMQP transactions. It is materially slower and should be reserved for handlers with irreversible side effects.
## Sequenced topics
Use `define_sequenced_topic!` when messages for the same entity must be processed in order.
```rust,no_run
use shove::*;
use std::time::Duration;
define_sequenced_topic!(
AccountLedger,
LedgerEntry,
|msg| msg.account_id.clone(),
TopologyBuilder::new("account-ledger")
.sequenced(SequenceFailure::FailAll)
.routing_shards(16)
.hold_queue(Duration::from_secs(5))
.dlq()
.build()
);
consumer.run_fifo::<AccountLedger>(handler, options).await?;
```
Failure policies:
- `SequenceFailure::Skip`: dead-letter the failed message and continue processing the rest of the sequence
- `SequenceFailure::FailAll`: poison the key and dead-letter all remaining messages for that key
Given messages `[1, 2, 3, 4, 5]` for the same key where message 3 is permanently rejected:
| `Skip` | 1,2,4,5 | 3 |
| `FailAll` | 1,2 | 3,4,5 |
Use `Skip` when messages are independently valid (e.g. audit entries). Use `FailAll` when later messages are causally dependent on earlier ones (e.g. financial ledger entries, state-machine transitions).
Messages for other sequence keys are unaffected by either policy.
RabbitMQ uses consistent-hash routing for this. SNS/SQS uses FIFO topics and queues.
## Consumer groups and autoscaling
Both backends support named consumer groups with min/max bounds, per-consumer prefetch, retry budget, handler timeouts, and optional concurrent processing inside each consumer.
RabbitMQ:
```rust,no_run
use shove::rabbitmq::*;
use std::sync::Arc;
use tokio::sync::Mutex;
let mut registry = ConsumerGroupRegistry::new(client.clone());
registry
.register::<WorkQueue, TaskHandler>(
ConsumerGroupConfig::new(1..=8)
.with_prefetch_count(10)
.with_max_retries(5)
.with_handler_timeout(Duration::from_secs(30))
.with_concurrent_processing(true),
|| TaskHandler,
)
.await?;
registry.start_all();
let registry = Arc::new(Mutex::new(registry));
let mut autoscaler = Autoscaler::new(
&ManagementConfig::new("http://localhost:15672", "guest", "guest"),
AutoscalerConfig {
poll_interval: Duration::from_secs(2),
scale_up_multiplier: 1.5,
scale_down_multiplier: 0.3,
hysteresis_duration: Duration::from_secs(4),
cooldown_duration: Duration::from_secs(8),
},
);
autoscaler.run(registry, shutdown_token).await;
```
SNS/SQS uses `SqsConsumerGroupRegistry` and `SqsAutoscaler` with the same configuration shape.
## Audit logging
Wrap any handler with `Audited<H, A>` to persist a structured `AuditRecord` for each delivery attempt. Records include the trace id, topic, payload, message metadata, outcome, duration, and timestamp.
Implement `AuditHandler<T>` for your persistence backend:
```rust,no_run
use shove::*;
struct MyAuditSink;
impl AuditHandler<OrderSettlement> for MyAuditSink {
async fn audit(&self, record: &AuditRecord<SettlementEvent>) -> Result<(), ShoveError> {
println!("{}", serde_json::to_string(record).unwrap());
Ok(())
}
}
let handler = Audited::new(SettlementHandler, MyAuditSink);
// Drop-in replacement — consumers, consumer groups, and FIFO consumers all accept it.
consumer.run::<OrderSettlement>(handler, ConsumerOptions::new(shutdown)).await?;
```
If the audit handler returns `Err`, the message is retried — audit failure is never silently dropped.
If you enable `audit`, `ShoveAuditHandler` can publish those records back into a dedicated `shove-audit-log` topic using the same broker:
```rust,no_run
use shove::rabbitmq::*;
use shove::*;
let audit = ShoveAuditHandler::new(publisher.clone());
let handler = Audited::new(SettlementHandler, audit);
```
## Performance
Measured on a MacBook Pro M4 Max, single RabbitMQ node via Docker, Rust 1.91. Reproducible via `cargo run -q --example rabbitmq_stress --features rabbitmq`.
| Fast (1–5 ms) | 179 msg/s | 2,866 msg/s | 19,669 msg/s | 29,207 msg/s |
| Slow (50–300 ms)| 6 msg/s | 75 msg/s | 544 msg/s | 4,076 msg/s |
| Heavy (1–5 s) | 0.4 msg/s | 5 msg/s | 21 msg/s | 199 msg/s |
`prefetch_count` is the primary throughput lever for I/O-bound handlers. Adding workers scales linearly when the handler is the bottleneck. Results will vary by hardware and broker configuration.
## Examples
RabbitMQ examples:
- `rabbitmq_basic_pubsub`
- `rabbitmq_concurrent_pubsub`
- `rabbitmq_sequenced_pubsub`
- `rabbitmq_consumer_groups`
- `rabbitmq_audited_consumer`
- `rabbitmq_exactly_once`
- `rabbitmq_stress`
SNS/SQS examples:
- `sqs_basic_pubsub`
- `sqs_concurrent_pubsub`
- `sqs_sequenced_pubsub`
- `sqs_consumer_groups`
- `sqs_audited_consumer`
- `sqs_autoscaler`
Run them with:
```sh
cargo run --example rabbitmq_basic_pubsub --features rabbitmq
cargo run --example rabbitmq_exactly_once --features rabbitmq-transactional
cargo run --example sqs_basic_pubsub --features aws-sns-sqs
cargo run --example sqs_autoscaler --features aws-sns-sqs
```
## Reference
### Outcome variants
| `Outcome::Ack` | Success — remove from queue |
| `Outcome::Retry` | Transient failure — route to hold queue with escalating backoff, increment retry counter. Also triggered when `handler_timeout` is exceeded |
| `Outcome::Reject` | Permanent failure — route to DLQ (or discard if no DLQ configured) |
| `Outcome::Defer` | Re-deliver via hold queue without incrementing retry counter |
### TopologyBuilder
| `.hold_queue(duration)` | Adds a hold queue for delayed retry (`{name}-hold-{secs}s`) |
| `.sequenced(policy)` | Enables strict per-key ordering via consistent-hash exchange |
| `.routing_shards(n)` | Sets the number of sub-queues for sequenced delivery (default: 8) |
| `.dlq()` | Adds a dead-letter queue (`{name}-dlq`) |
### Backends
| RabbitMQ | `rabbitmq` | Stable |
| RabbitMQ (exactly-once routing) | `rabbitmq-transactional` | Stable |
| AWS SNS (publisher only) | `pub-aws-sns` | Stable |
| AWS SNS + SQS | `aws-sns-sqs` | Stable |
`aws-sns-sqs` implies `pub-aws-sns`. Sequenced topics use SNS FIFO topics with `MessageGroupId` and sharded FIFO SQS queues.
## Requirements
`shove` requires Rust 1.85 or newer (edition 2024).
## Background
`shove` came out of production event-processing systems that needed more than a broker client but less than a platform rewrite. The crate focuses on the hard parts around message handling correctness and operational behavior, while leaving transport and persistence to RabbitMQ or SNS/SQS.
The API is still evolving.
## License
[MIT](LICENSE)