Crate shove

Type-safe async pub/sub for Rust on top of RabbitMQ, AWS SNS/SQS, NATS JetStream, Apache Kafka, or an in-process broker.

§The Broker<B> pattern

Everything hangs off a single generic hub Broker<B>, parameterised by a backend marker B (one of RabbitMq, Sqs, Nats, Kafka, Redis, InMemory, each gated on its Cargo feature). The marker binds that backend’s client, publisher, consumer, topology, and registry types together; the generic wrappers below delegate through the sealed Backend trait.

Broker<B>
   ├─ .topology()             → TopologyDeclarer<B>
   ├─ .publisher().await      → Publisher<B>
   ├─ .consumer_supervisor()  → ConsumerSupervisor<B>   (all backends)
   ├─ .autoscaler()           → B::AutoscalerImpl       (all backends)
   └─ .consumer_group()       → ConsumerGroup<B>        (B: HasCoordinatedGroups)
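
The marker-plus-sealed-trait arrangement described above can be sketched in plain Rust. This is an illustration of the general pattern, not shove's actual source: the Publish trait, the InMemoryPublisher and KafkaPublisher types, and the NAME constant are hypothetical names invented for the sketch, and the real Backend trait binds more types than a publisher.

```rust
use std::marker::PhantomData;

// Private module: downstream crates cannot name `Sealed`, so they cannot
// implement `Backend` for their own types.
mod sealed {
    pub trait Sealed {}
}

/// Hypothetical stand-in for a backend's publisher behaviour.
pub trait Publish {
    fn publish(&self, payload: &str) -> String;
}

/// Binds a marker type to its concrete implementation types.
pub trait Backend: sealed::Sealed {
    type Publisher: Publish + Default;
    const NAME: &'static str;
}

// Zero-sized marker types, one per backend.
pub struct InMemory;
pub struct Kafka;

#[derive(Default)]
pub struct InMemoryPublisher;
#[derive(Default)]
pub struct KafkaPublisher;

impl Publish for InMemoryPublisher {
    fn publish(&self, payload: &str) -> String {
        format!("inmemory <- {payload}")
    }
}

impl Publish for KafkaPublisher {
    fn publish(&self, payload: &str) -> String {
        format!("kafka <- {payload}")
    }
}

impl sealed::Sealed for InMemory {}
impl sealed::Sealed for Kafka {}

impl Backend for InMemory {
    type Publisher = InMemoryPublisher;
    const NAME: &'static str = "inmemory";
}

impl Backend for Kafka {
    type Publisher = KafkaPublisher;
    const NAME: &'static str = "kafka";
}

/// Generic hub: holds no backend data here, only the marker type.
pub struct Broker<B: Backend> {
    _backend: PhantomData<B>,
}

/// Generic wrapper that delegates to the backend's concrete publisher.
pub struct Publisher<B: Backend> {
    inner: B::Publisher,
}

impl<B: Backend> Broker<B> {
    pub fn new() -> Self {
        Broker { _backend: PhantomData }
    }

    pub fn publisher(&self) -> Publisher<B> {
        Publisher { inner: B::Publisher::default() }
    }
}

impl<B: Backend> Publisher<B> {
    pub fn publish(&self, payload: &str) -> String {
        self.inner.publish(payload)
    }
}

fn main() {
    // The call sites are identical; only the marker type changes.
    let in_memory = Broker::<InMemory>::new();
    println!("{}: {}", InMemory::NAME, in_memory.publisher().publish("ORD-1"));

    let kafka = Broker::<Kafka>::new();
    println!("{}: {}", Kafka::NAME, kafka.publisher().publish("ORD-1"));
}
```

Because every wrapper is generic over B, swapping backends is a one-token change at the Broker::<B>::new call, and the compiler picks the concrete types through the associated items on Backend.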

§Capability gating

  • Kafka, RabbitMQ, NATS, InMemory, Redis (redis-streams) implement the HasCoordinatedGroups capability trait — they expose Broker::consumer_group for min/max-bounded coordinated groups with autoscaling.
  • SQS does not. A “group” on SQS is N parallel independent pollers on one queue, which maps to ConsumerSupervisor (the backend-agnostic path available on every Broker<B>). Calling consumer_group() on Broker<Sqs> is a compile error.
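
The compile-time gating described in the two bullets above can be sketched with a capability trait and a bounded impl block. This is a simplified illustration under stated assumptions, not shove's implementation: the methods return plain strings in place of the real ConsumerGroup<B> and ConsumerSupervisor<B> types, and Backend is left unsealed to keep the sketch short.

```rust
use std::marker::PhantomData;

pub trait Backend {}

/// Capability trait: implemented only by backends with coordinated groups.
pub trait HasCoordinatedGroups: Backend {}

// Zero-sized backend markers.
pub struct Kafka;
pub struct Sqs;

impl Backend for Kafka {}
impl Backend for Sqs {}

// Kafka opts in to the capability; Sqs deliberately does not.
impl HasCoordinatedGroups for Kafka {}

pub struct Broker<B: Backend>(PhantomData<B>);

// Available on every backend.
impl<B: Backend> Broker<B> {
    pub fn new() -> Self {
        Broker(PhantomData)
    }

    pub fn consumer_supervisor(&self) -> &'static str {
        "supervisor"
    }
}

// Exists only when the backend marker implements the capability trait.
impl<B: HasCoordinatedGroups> Broker<B> {
    pub fn consumer_group(&self) -> &'static str {
        "coordinated group"
    }
}

fn main() {
    let kafka = Broker::<Kafka>::new();
    let sqs = Broker::<Sqs>::new();

    println!("{}", kafka.consumer_group());
    println!("{}", sqs.consumer_supervisor());

    // Uncommenting the next line fails to compile, because the trait
    // bound `Sqs: HasCoordinatedGroups` is not satisfied:
    // sqs.consumer_group();
}
```

The check costs nothing at runtime: a backend without the capability has no consumer_group method at all, so the mistake surfaces as a missing-method error during compilation.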

§Feature flags

No features are enabled by default. Enable only what you need.

Feature                 What it enables
inmemory                In-process broker, publisher, consumer, topology, groups, autoscaler (no external broker)
kafka                   Apache Kafka publisher, consumer, topology, consumer groups, autoscaling (plaintext only)
kafka-ssl               TLS + SASL mechanisms for Kafka — required for any authenticated cluster (implies kafka)
kafka-msk-iam           AWS MSK IAM OAUTHBEARER auth (implies kafka-ssl)
nats                    NATS JetStream publisher, consumer, topology, consumer groups, autoscaling
rabbitmq                RabbitMQ publisher, consumer, topology, consumer groups, autoscaling
rabbitmq-transactional  RabbitMQ exactly-once routing via AMQP transactions (implies rabbitmq)
pub-aws-sns             SNS publisher and topology declaration only
aws-sns-sqs             Full SNS + SQS stack — publisher, SQS consumer, supervisor, autoscaling (implies pub-aws-sns)
redis-streams           Redis/Valkey Streams publisher, consumer, topology, consumer groups, FIFO sharding
audit                   ShoveAuditHandler + AuditLog topic for persisting audit records through any backend

§Quickstart

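The example below uses the in-process backend, so it needs no external services. To target RabbitMq, Nats, Kafka, or Redis, swap the InMemory marker and its config types for the chosen backend's equivalents; the topic definition and handler stay identical. Sqs does not implement HasCoordinatedGroups, so on Broker<Sqs> use consumer_supervisor() in place of consumer_group().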

use serde::{Deserialize, Serialize};
use shove::inmemory::{InMemoryConfig, InMemoryConsumerGroupConfig};
use shove::{
    Broker, ConsumerGroupConfig, InMemory, MessageHandler, MessageMetadata, Outcome,
    TopologyBuilder, define_topic,
};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderPaid { order_id: String }

define_topic!(Orders, OrderPaid,
    TopologyBuilder::new("orders").dlq().build());

struct Handler;
impl MessageHandler<Orders> for Handler {
    type Context = ();
    async fn handle(&self, msg: OrderPaid, _: MessageMetadata, _: &()) -> Outcome {
        println!("paid: {}", msg.order_id);
        Outcome::Ack
    }
}

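// Assumption: a Tokio runtime drives the example; the snippet does not name its async entry point.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect and declare the topic's topology (queue plus DLQ).
    let broker = Broker::<InMemory>::new(InMemoryConfig::default()).await?;
    broker.topology().declare::<Orders>().await?;

    let publisher = broker.publisher().await?;
    publisher.publish::<Orders>(&OrderPaid { order_id: "ORD-1".into() }).await?;

    // Register one handler instance (min = max = 1) for the Orders topic.
    let mut group = broker.consumer_group();
    group
        .register::<Orders, _>(
            ConsumerGroupConfig::new(InMemoryConsumerGroupConfig::new(1..=1)),
            || Handler,
        )
        .await?;

    let outcome = group
        .run_until_timeout(std::future::ready(()), Duration::from_secs(1))
        .await;
    std::process::exit(outcome.exit_code());
}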

§Ergonomics

§Observability

Every interesting state change is emitted as a structured tracing event, so wiring any tracing-subscriber gives a full operational trail without handler-side instrumentation.

Enable the metrics cargo feature to also emit operational counters, histograms, and gauges through the metrics facade — messages_consumed_total, message_processing_duration_seconds, messages_inflight, backend_errors_total, and friends. shove is a library, so it does not open a port: install your own recorder (metrics-exporter-prometheus, metrics-exporter-statsd, OpenTelemetry, etc.) and expose the endpoint from your service. Override the shove_ metric prefix with metrics::set_prefix once at startup, before any broker activity. See the module docs and the Observability guide for the full schema.

§See also

Re-exports§

pub use audit::AuditHandler;
pub use audit::AuditRecord;
pub use audit::Audited;
pub use autoscale_metrics::AutoscaleMetrics;
pub use backend::Backend;
pub use backend::capability::HasCoordinatedGroups;
pub use codec::Codec;
pub use codec::JsonCodec;
pub use codec::RawBytesCodec;
pub use codecs::protobuf::ProtobufCodec; // feature: protobuf
pub use consumer::ConsumerOptions;
pub use consumer::DEFAULT_HANDLER_TIMEOUT;
pub use consumer::DEFAULT_MAX_MESSAGE_SIZE;
pub use consumer::DEFAULT_MAX_PENDING_PER_KEY;
pub use consumer_supervisor::ConsumerSupervisor;
pub use consumer_supervisor::SupervisorOutcome;
pub use error::ShoveError;
pub use handler::MessageHandler;
pub use handler::MessageHandlerExt;
pub use metadata::DeadMessageMetadata;
pub use metadata::MessageMetadata;
pub use outcome::Outcome;
pub use topic::SequencedTopic;
pub use topic::Topic;
pub use topology::HoldQueue;
pub use topology::QueueTopology;
pub use topology::SequenceConfig;
pub use topology::SequenceFailure;
pub use topology::TopologyBuilder;
pub use autoscaler::Autoscaler;
pub use autoscaler::AutoscalerBackend;
pub use autoscaler::AutoscalerConfig;
pub use autoscaler::ScalingDecision;
pub use autoscaler::ScalingMetrics;
pub use autoscaler::ScalingStrategy;
pub use autoscaler::Stabilized;
pub use autoscaler::ThresholdStrategy;
pub use broker::Broker;
pub use consumer_group::ConsumerGroup;
pub use consumer_group::ConsumerGroupConfig;
pub use publisher::Publisher;
pub use topology_declarer::Topics;
pub use topology_declarer::TopologyDeclarer;
pub use markers::InMemory; // feature: inmemory
pub use markers::Kafka; // feature: kafka
pub use markers::Nats; // feature: nats
pub use markers::RabbitMq; // feature: rabbitmq
pub use markers::Redis; // feature: redis-streams
pub use markers::Sqs; // feature: aws-sns-sqs
pub use audit::AuditLog; // feature: audit
pub use audit::ShoveAuditHandler; // feature: audit

Modules§

audit
autoscale_metrics
Honest, partial-data metrics struct for autoscaler inputs.
autoscaler
backend
Sealed Backend trait layer. Binds a backend marker type to its concrete internal implementation types; the public generic wrappers (Broker<B>, Publisher<B>, etc.) delegate through this.
broker
Public Broker<B> hub.
codec
Pluggable encoding of Topic::Message payloads.
codecs
Optional codec implementations gated behind cargo features.
consumer
consumer_group
Public ConsumerGroup<B, Ctx> – specialist harness for coordinated consumer groups. Gated on B: HasCoordinatedGroups.
consumer_supervisor
Consumer supervisor harness and its outcome type.
error
handler
inmemory (feature: inmemory)
In-process, non-durable broker backend.
kafka (feature: kafka)
markers
Backend marker types. One zero-sized struct per backend, each under the existing Cargo feature.
metadata
metrics
Operational metrics emitted via the metrics facade.
nats (feature: nats)
outcome
publisher
Public Publisher<B> wrapper.
rabbitmq (feature: rabbitmq)
redis (feature: redis-streams)
Redis Streams backend.
schema_registry (feature: kafka-schema-registry)
In-house Confluent Schema Registry decode for Kafka consumers.
sns (feature: pub-aws-sns)
topic
topology
topology_declarer
Public TopologyDeclarer<B> + Topics tuple trait.

Macros§

define_sequenced_topic
Define a sequenced topic with static topology.
define_topic
Define a topic with static topology.