shove 0.6.3

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.

shove

Type-safe async pub/sub for Rust on top of RabbitMQ or AWS SNS/SQS.

shove is for workloads where "just use the broker client" stops being enough: retries with backoff, DLQs, ordered delivery, consumer groups, autoscaling, and auditability. You define a topic once as a Rust type and the crate derives the messaging topology and runtime behavior from it.

Why shove

  • Typed topics instead of stringly-typed queue names
  • Built-in retry topologies with delayed hold queues and DLQs
  • Concurrent consumers with in-order acknowledgements
  • Strict per-key ordering for sequenced topics
  • Consumer groups with queue-depth autoscaling
  • Structured audit trails for every delivery attempt
  • RabbitMQ at-least-once by default, with opt-in transactional exactly-once routing
  • SNS publisher-only mode or full SNS + SQS consumption stack

If you have one queue, one consumer, and little retry logic, use lapin or the AWS SDK directly. shove is the layer for multi-service event flows that need operational discipline.

Features

No features are enabled by default.

# RabbitMQ publisher + consumer + consumer groups + autoscaling
cargo add shove --features rabbitmq

# Transactional RabbitMQ consumers with exactly-once routing (materially slower; use only when absolutely necessary)
cargo add shove --features rabbitmq-transactional

# SNS publisher only
cargo add shove --features pub-aws-sns

# Full AWS SNS + SQS backend
cargo add shove --features aws-sns-sqs

# Optional built-in audit publisher
cargo add shove --features rabbitmq,audit

Feature                 What it enables
rabbitmq                RabbitMQ publisher, consumer, topology declaration, consumer groups, autoscaling
rabbitmq-transactional  RabbitMQ exactly-once routing via AMQP transactions
pub-aws-sns             SNS publisher and topology declaration
aws-sns-sqs             Full SNS + SQS publisher/consumer stack, consumer groups, autoscaling
audit                   Built-in ShoveAuditHandler and AuditLog topic

Quick start

For RabbitMQ, the repo's docker-compose.yml starts a local broker with the consistent-hash exchange plugin enabled. For the AWS path, it starts LocalStack with SNS and SQS on http://localhost:4566.

docker compose up -d

Define a topic once:

use serde::{Deserialize, Serialize};
use shove::*;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SettlementEvent {
    order_id: String,
    amount_cents: u64,
}

define_topic!(
    OrderSettlement,
    SettlementEvent,
    TopologyBuilder::new("order-settlement")
        .hold_queue(Duration::from_secs(5))
        .hold_queue(Duration::from_secs(30))
        .hold_queue(Duration::from_secs(120))
        .dlq()
        .build()
);

Publish:

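use shove::rabbitmq::*;

// Run this inside an async function that returns a Result (for example under
// a Tokio runtime), so that `.await` and `?` are valid.
let client = RabbitMqClient::connect(&RabbitMqConfig::new("amqp://localhost")).await?;
let publisher = RabbitMqPublisher::new(client.clone()).await?;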

publisher
    .publish::<OrderSettlement>(&SettlementEvent {
        order_id: "ORD-1".into(),
        amount_cents: 5000,
    })
    .await?;

Consume:

use shove::rabbitmq::*;
use shove::*;
use tokio_util::sync::CancellationToken;

struct SettlementHandler;

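// `process` and `is_transient` are placeholders for your own business logic
// and error type; they are not defined in this snippet.
impl MessageHandler<OrderSettlement> for SettlementHandler {
    async fn handle(&self, msg: SettlementEvent, _metadata: MessageMetadata) -> Outcome {
        match process(&msg).await {
            Ok(()) => Outcome::Ack,                       // done, remove from queue
            Err(e) if e.is_transient() => Outcome::Retry, // try again after a hold queue delay
            Err(_) => Outcome::Reject,                    // permanent failure, dead-letter
        }
    }
}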

let shutdown = CancellationToken::new();
let consumer = RabbitMqConsumer::new(client);
let options = ConsumerOptions::new(shutdown).with_prefetch_count(20);

consumer.run::<OrderSettlement>(SettlementHandler, options).await?;
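
The handler above relies on an error type that can tell transient failures from permanent ones. A minimal sketch of that classification follows. `SettlementError` and `classify` are hypothetical names for illustration, and the `Outcome` enum here is a local stand-in so the sketch compiles without shove.

```rust
// Local stand-in for shove's `Outcome`, so this sketch compiles on its own.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Retry,
    Reject,
}

// Hypothetical application error type; not part of shove.
#[derive(Debug)]
enum SettlementError {
    GatewayTimeout, // worth retrying later
    InvalidAmount,  // will never succeed, so dead-letter it
}

impl SettlementError {
    fn is_transient(&self) -> bool {
        matches!(self, SettlementError::GatewayTimeout)
    }
}

// Same mapping as the handler above: success, transient failure, permanent failure.
fn classify(result: Result<(), SettlementError>) -> Outcome {
    match result {
        Ok(()) => Outcome::Ack,
        Err(e) if e.is_transient() => Outcome::Retry,
        Err(_) => Outcome::Reject,
    }
}

fn main() {
    println!("{:?}", classify(Ok(())));
    println!("{:?}", classify(Err(SettlementError::GatewayTimeout)));
    println!("{:?}", classify(Err(SettlementError::InvalidAmount)));
}
```

Keeping the transient/permanent decision on the error type keeps the handler body short and makes the retry policy testable without a broker.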

Publish (SNS):

use shove::sns::*;
use std::sync::Arc;

let config = SnsConfig {
    region: Some("us-east-1".into()),
    endpoint_url: Some("http://localhost:4566".into()), // omit for real AWS
};
let client = SnsClient::new(&config).await?;

let topic_registry = Arc::new(TopicRegistry::new());
let queue_registry = Arc::new(QueueRegistry::new());

let declarer = SnsTopologyDeclarer::new(client.clone(), topic_registry.clone())
    .with_queue_registry(queue_registry.clone());
declare_topic::<OrderSettlement>(&declarer).await?;

let publisher = SnsPublisher::new(client.clone(), topic_registry.clone());
publisher
    .publish::<OrderSettlement>(&SettlementEvent {
        order_id: "ORD-1".into(),
        amount_cents: 5000,
    })
    .await?;

Consume (SQS):

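use shove::sns::*;

// Reuses `client` and `queue_registry` from the SNS publish snippet, plus
// `SettlementHandler` and a `shutdown` CancellationToken created as in the
// RabbitMQ consumer snippet.
let consumer = SqsConsumer::new(client.clone(), queue_registry.clone());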
let options = ConsumerOptions::new(shutdown).with_prefetch_count(20);
consumer.run::<OrderSettlement>(SettlementHandler, options).await?;

Delivery model

shove is at-least-once by default: a message can be delivered more than once, so handlers should be idempotent.

  • Outcome::Ack: success
  • Outcome::Retry: delayed retry through hold queues, with escalating backoff
  • Outcome::Reject: dead-letter immediately
  • Outcome::Defer: delay without increasing retry count

Additional behavior:

  • A handler that exceeds its timeout is treated as Outcome::Retry
  • DLQ consumers receive DeadMessageMetadata
  • RabbitMQ publishes a stable x-message-id header for deduplication
  • RabbitMQ can opt into transactional exactly-once routing with rabbitmq-transactional

Exactly-once mode removes the publish-then-ack race in RabbitMQ by wrapping routing decisions in AMQP transactions. It is materially slower and should be reserved for handlers with irreversible side effects.
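
Because delivery is at-least-once, a handler may see the same message twice. One common way to stay idempotent is to remember which message ids were already applied, for example the stable x-message-id mentioned above. The sketch below is an in-memory illustration only: `SeenIds` and `apply_settlement` are hypothetical names, and how a handler reads the id from shove's metadata is not shown because the source does not specify it.

```rust
use std::collections::HashSet;

/// Remembers message ids that were already processed (in memory only).
struct SeenIds {
    seen: HashSet<String>,
}

impl SeenIds {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true the first time an id is seen, false for a redelivery.
    fn first_delivery(&mut self, message_id: &str) -> bool {
        self.seen.insert(message_id.to_owned())
    }
}

/// Applies the side effect only once per message id.
fn apply_settlement(seen: &mut SeenIds, balance_cents: &mut u64, message_id: &str, amount_cents: u64) {
    if seen.first_delivery(message_id) {
        *balance_cents += amount_cents;
    }
}

fn main() {
    let mut seen = SeenIds::new();
    let mut balance = 0u64;
    apply_settlement(&mut seen, &mut balance, "msg-1", 5000);
    apply_settlement(&mut seen, &mut balance, "msg-1", 5000); // redelivery, ignored
    println!("{balance}"); // prints 5000
}
```

A real deployment would keep the seen ids in a durable store shared by all consumers, ideally updated in the same transaction as the side effect.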

Sequenced topics

Use define_sequenced_topic! when messages for the same entity must be processed in order.

use shove::*;
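use std::time::Duration;

// LedgerEntry is your own payload type (serde-serializable, like SettlementEvent
// in the quick start); the key closure assumes it has an `account_id` field.
define_sequenced_topic!(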
    AccountLedger,
    LedgerEntry,
    |msg| msg.account_id.clone(),
    TopologyBuilder::new("account-ledger")
        .sequenced(SequenceFailure::FailAll)
        .routing_shards(16)
        .hold_queue(Duration::from_secs(5))
        .dlq()
        .build()
);

consumer.run_fifo::<AccountLedger>(handler, options).await?;

Failure policies:

  • SequenceFailure::Skip: dead-letter the failed message and continue processing the rest of the sequence
  • SequenceFailure::FailAll: poison the key and dead-letter all remaining messages for that key

Given messages [1, 2, 3, 4, 5] for the same key where message 3 is permanently rejected:

Policy   Acked    DLQed
Skip     1,2,4,5  3
FailAll  1,2      3,4,5

Use Skip when messages are independently valid (e.g. audit entries). Use FailAll when later messages are causally dependent on earlier ones (e.g. financial ledger entries, state-machine transitions).

Messages for other sequence keys are unaffected by either policy.
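
The two policies can be modelled in a few lines. This is a standalone simulation of the behavior described above for a single key, not shove's implementation; the `SequenceFailure` enum here is a local stand-in and `simulate` is a hypothetical helper.

```rust
// Local stand-in for shove's `SequenceFailure`, so the sketch compiles on its own.
#[derive(Clone, Copy)]
enum SequenceFailure {
    Skip,
    FailAll,
}

/// Processes one key's messages in order and returns (acked, dead_lettered).
/// `rejected` is the message that the handler permanently rejects.
fn simulate(policy: SequenceFailure, messages: &[u32], rejected: u32) -> (Vec<u32>, Vec<u32>) {
    let mut acked = Vec::new();
    let mut dead_lettered = Vec::new();
    let mut poisoned = false; // only ever set under FailAll

    for &m in messages {
        if poisoned || m == rejected {
            dead_lettered.push(m);
            if m == rejected && matches!(policy, SequenceFailure::FailAll) {
                poisoned = true; // every later message for this key goes to the DLQ
            }
        } else {
            acked.push(m);
        }
    }
    (acked, dead_lettered)
}

fn main() {
    let msgs = [1, 2, 3, 4, 5];
    println!("{:?}", simulate(SequenceFailure::Skip, &msgs, 3));    // ([1, 2, 4, 5], [3])
    println!("{:?}", simulate(SequenceFailure::FailAll, &msgs, 3)); // ([1, 2], [3, 4, 5])
}
```

Each key would carry its own `poisoned` flag, which is why a failure on one key leaves other keys untouched.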

RabbitMQ uses consistent-hash routing for this. SNS/SQS uses FIFO topics and queues.
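
The property both backends provide is that every message with the same sequence key lands on the same sub-queue, so one consumer sees that key's messages in order. The sketch below illustrates the idea with plain modulo hashing. It is a simplification: it is not the algorithm used by RabbitMQ's consistent-hash exchange or by SQS FIFO message groups, and `shard_for` is a hypothetical helper.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a sequence key to one of `shards` sub-queues.
/// Simplified modulo hashing, used here only to show key affinity.
fn shard_for(key: &str, shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % shards
}

fn main() {
    // 16 matches the routing_shards(16) value in the topic definition above.
    for key in ["acct-1", "acct-2", "acct-1"] {
        println!("{key} -> shard {}", shard_for(key, 16));
    }
}
```

The first and third lines printed always name the same shard, because the key is identical. Different keys may share a shard, which limits parallelism but never breaks per-key ordering.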

Consumer groups and autoscaling

Both backends support named consumer groups with min/max bounds, per-consumer prefetch, retry budget, handler timeouts, and optional concurrent processing inside each consumer.

RabbitMQ:

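use shove::rabbitmq::*;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// WorkQueue is assumed to be a topic declared with define_topic!, and
// TaskHandler a MessageHandler for it.
let mut registry = ConsumerGroupRegistry::new(client.clone());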

registry
    .register::<WorkQueue, TaskHandler>(
        ConsumerGroupConfig::new(1..=8)
            .with_prefetch_count(10)
            .with_max_retries(5)
            .with_handler_timeout(Duration::from_secs(30))
            .with_concurrent_processing(true),
        || TaskHandler,
    )
    .await?;

registry.start_all();

let registry = Arc::new(Mutex::new(registry));

let mut autoscaler = Autoscaler::new(
    &ManagementConfig::new("http://localhost:15672", "guest", "guest"),
    AutoscalerConfig {
        poll_interval: Duration::from_secs(2),
        scale_up_multiplier: 1.5,
        scale_down_multiplier: 0.3,
        hysteresis_duration: Duration::from_secs(4),
        cooldown_duration: Duration::from_secs(8),
    },
);
autoscaler.run(registry, shutdown_token).await;

SNS/SQS uses SqsConsumerGroupRegistry and SqsAutoscaler with the same configuration shape.
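
The source does not define exactly how the autoscaler applies its multipliers, so the following is only a sketch of one plausible queue-depth policy. It assumes that capacity is consumers × prefetch, that the group scales up when depth exceeds capacity × scale_up_multiplier and down when it falls below capacity × scale_down_multiplier, and that hysteresis and cooldown both suppress changes. `Tuning`, `Sample`, and `decide` are hypothetical names, not shove's API.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Decision {
    ScaleUp,
    ScaleDown,
    Hold,
}

// Hypothetical mirror of the tuning knobs shown above; not shove's types.
struct Tuning {
    scale_up_multiplier: f64,
    scale_down_multiplier: f64,
    hysteresis: Duration,
    cooldown: Duration,
}

// One polling sample for a consumer group.
struct Sample {
    queue_depth: u64,
    consumers: u32,
    prefetch: u32,
    condition_held_for: Duration, // how long the depth has been out of range
    since_last_scale: Duration,   // time since the previous scaling action
}

fn decide(t: &Tuning, s: &Sample, min: u32, max: u32) -> Decision {
    // Assumed capacity model: each consumer holds up to `prefetch` in-flight messages.
    let capacity = f64::from(s.consumers) * f64::from(s.prefetch);
    let depth = s.queue_depth as f64;

    // Ignore short spikes (hysteresis) and avoid flapping (cooldown).
    if s.condition_held_for < t.hysteresis || s.since_last_scale < t.cooldown {
        return Decision::Hold;
    }
    if depth > capacity * t.scale_up_multiplier && s.consumers < max {
        Decision::ScaleUp
    } else if depth < capacity * t.scale_down_multiplier && s.consumers > min {
        Decision::ScaleDown
    } else {
        Decision::Hold
    }
}

fn main() {
    let tuning = Tuning {
        scale_up_multiplier: 1.5,
        scale_down_multiplier: 0.3,
        hysteresis: Duration::from_secs(4),
        cooldown: Duration::from_secs(8),
    };
    let backlog = Sample {
        queue_depth: 100,
        consumers: 2,
        prefetch: 10,
        condition_held_for: Duration::from_secs(5),
        since_last_scale: Duration::from_secs(10),
    };
    println!("{:?}", decide(&tuning, &backlog, 1, 8)); // ScaleUp: 100 > 2 * 10 * 1.5
}
```

Whatever the exact formula, the hysteresis and cooldown durations trade reaction speed for stability: shorter values follow load more closely but risk oscillating between sizes.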

Audit logging

Wrap any handler with Audited<H, A> to persist a structured AuditRecord for each delivery attempt. Records include the trace id, topic, payload, message metadata, outcome, duration, and timestamp.

Implement AuditHandler<T> for your persistence backend:

use shove::*;

struct MyAuditSink;

impl AuditHandler<OrderSettlement> for MyAuditSink {
    async fn audit(&self, record: &AuditRecord<SettlementEvent>) -> Result<(), ShoveError> {
        println!("{}", serde_json::to_string(record).unwrap());
        Ok(())
    }
}

let handler = Audited::new(SettlementHandler, MyAuditSink);

// Drop-in replacement — consumers, consumer groups, and FIFO consumers all accept it.
consumer.run::<OrderSettlement>(handler, ConsumerOptions::new(shutdown)).await?;

If the audit handler returns Err, the message is retried, so an audit failure is never silently dropped.
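
The retry-on-audit-failure rule can be pictured as a thin wrapper around the handler. The sketch below is a synchronous, simplified model of that rule with closures in place of shove's async traits; `audited` is a hypothetical function and the `Outcome` enum is a local stand-in.

```rust
// Local stand-in for shove's `Outcome`, so this sketch compiles on its own.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Outcome {
    Ack,
    Retry,
    Reject,
}

/// Runs the handler, then the audit sink. If the sink fails, the handler's
/// outcome is replaced with Retry so the attempt is not lost from the audit trail.
fn audited<H, A>(handler: H, audit: A, payload: &str) -> Outcome
where
    H: Fn(&str) -> Outcome,
    A: Fn(&str, Outcome) -> Result<(), String>,
{
    let outcome = handler(payload);
    match audit(payload, outcome) {
        Ok(()) => outcome,
        Err(_) => Outcome::Retry,
    }
}

fn main() {
    let ok = audited(|_| Outcome::Ack, |_, _| Ok(()), "ORD-1");
    let sink_down = audited(|_| Outcome::Ack, |_, _| Err("sink unavailable".to_string()), "ORD-1");
    let rejected = audited(|_| Outcome::Reject, |_, _| Ok(()), "ORD-2");
    println!("{ok:?} {sink_down:?} {rejected:?}"); // Ack Retry Reject
}
```

One consequence of this rule is that the wrapped handler can run again for a message it already processed successfully, which is another reason to keep handlers idempotent.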

If you enable the audit feature, ShoveAuditHandler can publish those records back into a dedicated shove-audit-log topic using the same broker:

use shove::rabbitmq::*;
use shove::*;

let audit = ShoveAuditHandler::new(publisher.clone());
let handler = Audited::new(SettlementHandler, audit);

Performance

Measured on a MacBook Pro M4 Max, single RabbitMQ node via Docker, Rust 1.91. Reproducible via cargo run -q --example rabbitmq_stress --features rabbitmq.

Handler           1 worker, prefetch=1  1 worker, prefetch=20  8 workers, prefetch=20  32 workers, prefetch=40
Fast (1–5 ms)     179 msg/s             2,866 msg/s            19,669 msg/s            29,207 msg/s
Slow (50–300 ms)  6 msg/s               75 msg/s               544 msg/s               4,076 msg/s
Heavy (1–5 s)     0.4 msg/s             5 msg/s                21 msg/s                199 msg/s

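prefetch_count is the primary throughput lever for I/O-bound handlers: with one worker, raising prefetch from 1 to 20 takes the Slow handler from 6 to 75 msg/s. Adding workers scales throughput roughly linearly when the handler is the bottleneck (Slow handler at prefetch=20: 75 msg/s with 1 worker, 544 msg/s with 8). Results will vary by hardware and broker configuration.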
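
A rough way to reason about these numbers is an idealized ceiling: if every worker keeps `prefetch` messages in flight at once and the broker adds no overhead, throughput is at most workers × prefetch ÷ mean handler latency. The sketch below computes that ceiling. The 175 ms mean for the Slow handler is an assumption (the midpoint of the 50–300 ms range), and `ideal_throughput` is a hypothetical helper, not part of shove or its stress example.

```rust
/// Idealized upper bound on messages per second, assuming every worker
/// processes `prefetch` messages concurrently with zero broker overhead.
fn ideal_throughput(workers: u32, prefetch: u32, mean_latency_secs: f64) -> f64 {
    f64::from(workers) * f64::from(prefetch) / mean_latency_secs
}

fn main() {
    // Assumed mean latency for the Slow handler: midpoint of 50-300 ms.
    let slow = 0.175;
    for (workers, prefetch) in [(1, 1), (1, 20), (8, 20), (32, 40)] {
        println!(
            "{workers} worker(s), prefetch={prefetch}: at most {:.0} msg/s",
            ideal_throughput(workers, prefetch, slow)
        );
    }
}
```

For the Slow row, the measured figures sit below this ceiling in every column (for example 75 msg/s measured against roughly 114 msg/s for one worker at prefetch=20), which is what you would expect once broker round trips and scheduling are included.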

Examples

RabbitMQ examples:

  • rabbitmq_basic_pubsub
  • rabbitmq_concurrent_pubsub
  • rabbitmq_sequenced_pubsub
  • rabbitmq_consumer_groups
  • rabbitmq_audited_consumer
  • rabbitmq_exactly_once
  • rabbitmq_stress

SNS/SQS examples:

  • sqs_basic_pubsub
  • sqs_concurrent_pubsub
  • sqs_sequenced_pubsub
  • sqs_consumer_groups
  • sqs_audited_consumer
  • sqs_autoscaler

Run them with:

cargo run --example rabbitmq_basic_pubsub --features rabbitmq
cargo run --example rabbitmq_exactly_once --features rabbitmq-transactional
cargo run --example sqs_basic_pubsub --features aws-sns-sqs
cargo run --example sqs_autoscaler --features aws-sns-sqs

Reference

Outcome variants

Variant          Behavior
Outcome::Ack     Success: remove from queue
Outcome::Retry   Transient failure: route to hold queue with escalating backoff, increment retry counter. Also triggered when handler_timeout is exceeded
Outcome::Reject  Permanent failure: route to DLQ (or discard if no DLQ configured)
Outcome::Defer   Re-deliver via hold queue without incrementing retry counter
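
The source does not spell out how a retry picks its hold queue, so the sketch below shows one plausible reading of "escalating backoff". It assumes the first retry uses the first hold queue, each later retry moves to the next one, the last hold queue is reused once the list runs out, and the message is dead-lettered after the retry budget is spent. `Route` and `route_retry` are hypothetical names.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Route {
    Hold(Duration),
    DeadLetter,
}

/// Picks where a message goes on Outcome::Retry.
/// `retry_count` is the number of retries already performed.
fn route_retry(hold_queues: &[Duration], retry_count: u32, max_retries: u32) -> Route {
    // Assumption: with no budget left (or no hold queues) the message is dead-lettered.
    if retry_count >= max_retries || hold_queues.is_empty() {
        return Route::DeadLetter;
    }
    // Escalate through the hold queues, then stay on the longest delay.
    let index = (retry_count as usize).min(hold_queues.len() - 1);
    Route::Hold(hold_queues[index])
}

fn main() {
    // Same delays as the OrderSettlement topic in the quick start.
    let holds = [
        Duration::from_secs(5),
        Duration::from_secs(30),
        Duration::from_secs(120),
    ];
    for attempt in 0..=5 {
        println!("retry {attempt}: {:?}", route_retry(&holds, attempt, 5));
    }
}
```

Under this reading, Outcome::Defer would take the same hold-queue path but leave `retry_count` unchanged, so deferring never consumes the retry budget.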

TopologyBuilder

Method                 Effect
.hold_queue(duration)  Adds a hold queue for delayed retry ({name}-hold-{secs}s)
.sequenced(policy)     Enables strict per-key ordering via consistent-hash exchange
.routing_shards(n)     Sets the number of sub-queues for sequenced delivery (default: 8)
.dlq()                 Adds a dead-letter queue ({name}-dlq)
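
The naming patterns in the table make it easy to predict which queues a topology will create, which helps when setting up dashboards or alerts. The helpers below simply apply the {name}-hold-{secs}s and {name}-dlq patterns. They are hypothetical functions written for illustration, not part of shove's API, and they assume whole-second delays.

```rust
use std::time::Duration;

/// Applies the {name}-hold-{secs}s pattern from the table above.
fn hold_queue_name(topic: &str, delay: Duration) -> String {
    format!("{topic}-hold-{}s", delay.as_secs())
}

/// Applies the {name}-dlq pattern from the table above.
fn dlq_name(topic: &str) -> String {
    format!("{topic}-dlq")
}

fn main() {
    // Delays from the OrderSettlement topic in the quick start.
    for secs in [5, 30, 120] {
        println!("{}", hold_queue_name("order-settlement", Duration::from_secs(secs)));
    }
    println!("{}", dlq_name("order-settlement"));
}
```

For the quick-start topic this prints order-settlement-hold-5s, order-settlement-hold-30s, order-settlement-hold-120s, and order-settlement-dlq.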

Backends

Backend                          Feature flag            Status
RabbitMQ                         rabbitmq                Stable
RabbitMQ (exactly-once routing)  rabbitmq-transactional  Stable
AWS SNS (publisher only)         pub-aws-sns             Stable
AWS SNS + SQS                    aws-sns-sqs             Stable

aws-sns-sqs implies pub-aws-sns. Sequenced topics use SNS FIFO topics with MessageGroupId and sharded FIFO SQS queues.

Requirements

shove requires Rust 1.85 or newer (edition 2024).

Background

shove came out of production event-processing systems that needed more than a broker client but less than a platform rewrite. The crate focuses on the hard parts around message handling correctness and operational behavior, while leaving transport and persistence to RabbitMQ or SNS/SQS.

The API is still evolving.

License

MIT