Crate shove

Type-safe async pub/sub for Rust on top of RabbitMQ, AWS SNS/SQS, NATS JetStream, Apache Kafka, or an in-process broker.

§The Broker<B> pattern

Everything hangs off a single generic hub Broker<B>, parameterised by a backend marker B (one of [RabbitMq], [Sqs], [Nats], [Kafka], [InMemory], each gated on its Cargo feature). The marker binds that backend’s client / publisher / consumer / topology / registry types together; the generic wrappers below delegate through the sealed Backend trait.

Broker<B>
   ├─ .topology()             → TopologyDeclarer<B>
   ├─ .publisher().await      → Publisher<B>
   ├─ .consumer_supervisor()  → ConsumerSupervisor<B>   (all backends)
   └─ .consumer_group()       → ConsumerGroup<B>        (B: HasCoordinatedGroups)
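
The marker-plus-sealed-trait arrangement described above can be sketched in plain Rust without the crate. This is a minimal illustration only: the `sealed` module, the `Publish` trait, `InMemoryPublisher`, `NAME`, and `backend_name` are hypothetical names, and the real `Backend` trait binds more types (client, consumer, topology, registry) than the single one shown here.

```rust
use std::marker::PhantomData;

// Private module: other crates cannot name `Sealed`, so they cannot
// implement `Backend` for their own types. (Hypothetical layout.)
mod sealed {
    pub trait Sealed {}
}

/// Hypothetical behaviour every bound publisher type must provide.
pub trait Publish {
    fn publish(&mut self, payload: &str) -> String;
}

/// Hypothetical stand-in for the sealed backend trait: the marker type
/// selects the concrete implementation types through associated items.
pub trait Backend: sealed::Sealed {
    type Publisher: Default + Publish;
    const NAME: &'static str;
}

/// Zero-sized backend marker.
pub struct InMemory;
impl sealed::Sealed for InMemory {}

/// Toy publisher that only counts what it has sent.
#[derive(Default)]
pub struct InMemoryPublisher {
    sent: usize,
}

impl Publish for InMemoryPublisher {
    fn publish(&mut self, payload: &str) -> String {
        self.sent += 1;
        format!("inmemory#{}: {}", self.sent, payload)
    }
}

impl Backend for InMemory {
    type Publisher = InMemoryPublisher;
    const NAME: &'static str = "inmemory";
}

/// Generic hub: holds no backend state here, only the marker type.
pub struct Broker<B: Backend> {
    _backend: PhantomData<B>,
}

impl<B: Backend> Broker<B> {
    pub fn new() -> Self {
        Broker { _backend: PhantomData }
    }

    /// Resolved at compile time from the marker.
    pub fn backend_name(&self) -> &'static str {
        B::NAME
    }

    /// Returns whichever publisher type the marker binds.
    pub fn publisher(&self) -> B::Publisher {
        B::Publisher::default()
    }
}
```

With these definitions, `Broker::<InMemory>::new().publisher()` has the type `InMemoryPublisher` without the caller ever naming it, which is the property that lets call sites stay generic over the backend.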

§Capability gating

  • Kafka, RabbitMQ, NATS, InMemory implement the HasCoordinatedGroups capability trait — they expose Broker::consumer_group for min/max-bounded coordinated groups with autoscaling.
  • SQS does not. A “group” on SQS is N parallel independent pollers on one queue, which maps to ConsumerSupervisor (the backend-agnostic path available on every Broker<B>). Calling consumer_group() on Broker<Sqs> is a compile error.
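
One way such a compile-time gate can be expressed is a second `impl` block bounded on the capability trait. The sketch below is an assumption about the general technique, not the crate's actual source: the simplified `Backend` trait, the `NAME` constant, and the `String` return values are hypothetical placeholders.

```rust
use std::marker::PhantomData;

/// Hypothetical, simplified backend trait (the real one is sealed).
pub trait Backend {
    const NAME: &'static str;
}

/// Backend markers, reduced to two for the illustration.
pub struct Kafka;
pub struct Sqs;

impl Backend for Kafka {
    const NAME: &'static str = "kafka";
}
impl Backend for Sqs {
    const NAME: &'static str = "sqs";
}

/// Capability trait: implemented only by backends that support
/// coordinated groups. There is deliberately no impl for `Sqs`.
pub trait HasCoordinatedGroups: Backend {}
impl HasCoordinatedGroups for Kafka {}

pub struct Broker<B: Backend>(PhantomData<B>);

// Methods available on every backend.
impl<B: Backend> Broker<B> {
    pub fn new() -> Self {
        Broker(PhantomData)
    }

    pub fn consumer_supervisor(&self) -> String {
        format!("supervisor on {}", B::NAME)
    }
}

// Methods that exist only when the marker has the capability.
impl<B: HasCoordinatedGroups> Broker<B> {
    pub fn consumer_group(&self) -> String {
        format!("coordinated group on {}", B::NAME)
    }
}

// Uncommenting the next function fails to compile, because `Sqs` does not
// implement `HasCoordinatedGroups`:
// fn rejected() { Broker::<Sqs>::new().consumer_group(); }
```

In this sketch `Broker::<Kafka>` exposes both methods, while `Broker::<Sqs>` exposes only `consumer_supervisor()`; the mistake is caught by the type checker rather than at runtime.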

§Feature flags

No features are enabled by default. Enable only what you need.

Feature                  What it enables
inmemory                 In-process broker, publisher, consumer, topology, groups, autoscaler (no external broker)
kafka                    Apache Kafka publisher, consumer, topology, consumer groups, autoscaling
nats                     NATS JetStream publisher, consumer, topology, consumer groups, autoscaling
rabbitmq                 RabbitMQ publisher, consumer, topology, consumer groups, autoscaling
rabbitmq-transactional   RabbitMQ exactly-once routing via AMQP transactions (implies rabbitmq)
pub-aws-sns              SNS publisher and topology declaration only
aws-sns-sqs              Full SNS + SQS stack: publisher, SQS consumer, supervisor, autoscaling (implies pub-aws-sns)
redis-streams            Redis/Valkey Streams publisher, consumer, topology, consumer groups, FIFO sharding
audit                    [ShoveAuditHandler] + [AuditLog] topic for persisting audit records through any backend
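
Feature gating of this kind is usually done with `cfg` attributes. The sketch below shows the general mechanism for two of the feature names from the table; the `enabled_backends` helper is hypothetical and exists only to make the effect observable. It is not taken from the crate's source.

```rust
// Hypothetical sketch: each zero-sized marker exists only when its Cargo
// feature is switched on, so unused backends add no code or dependencies.
#[cfg(feature = "inmemory")]
pub struct InMemory;

#[cfg(feature = "kafka")]
pub struct Kafka;

/// Hypothetical helper: reports which of the two sketched features were
/// compiled in. `cfg!` evaluates to a plain `bool` at compile time.
pub fn enabled_backends() -> Vec<&'static str> {
    let mut names = Vec::new();
    if cfg!(feature = "inmemory") {
        names.push("inmemory");
    }
    if cfg!(feature = "kafka") {
        names.push("kafka");
    }
    names
}
```

Because no features are enabled by default, a build that does not request any of them would see an empty list here, and referring to `InMemory` or `Kafka` would be an unresolved-name error.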

§Quickstart

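The example below uses the in-process backend, so it needs no external services. To target [RabbitMq], [Nats], or [Kafka], swap the InMemory marker and the backend-specific config types; the topic definition, the handler, and the call sites stay the same. [Sqs] has no consumer_group(), so use consumer_supervisor() there instead.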

use serde::{Deserialize, Serialize};
use shove::inmemory::{InMemoryConfig, InMemoryConsumerGroupConfig};
use shove::{
    Broker, ConsumerGroupConfig, InMemory, MessageHandler, MessageMetadata, Outcome,
    TopologyBuilder, define_topic,
};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderPaid { order_id: String }

define_topic!(Orders, OrderPaid,
    TopologyBuilder::new("orders").dlq().build());

struct Handler;
impl MessageHandler<Orders> for Handler {
    type Context = ();
    async fn handle(&self, msg: OrderPaid, _: MessageMetadata, _: &()) -> Outcome {
        println!("paid: {}", msg.order_id);
        Outcome::Ack
    }
}

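// The `.await?` calls need an async context. Tokio and the boxed error type
// are assumptions here; use whichever runtime your chosen backend requires.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the broker and declare the topology for `Orders`.
    let broker = Broker::<InMemory>::new(InMemoryConfig::default()).await?;
    broker.topology().declare::<Orders>().await?;

    // Publish one typed message.
    let publisher = broker.publisher().await?;
    publisher.publish::<Orders>(&OrderPaid { order_id: "ORD-1".into() }).await?;

    // Register a handler factory in a group bounded to exactly one consumer.
    let mut group = broker.consumer_group();
    group
        .register::<Orders, _>(
            ConsumerGroupConfig::new(InMemoryConsumerGroupConfig::new(1..=1)),
            || Handler,
        )
        .await?;

    // Run the group, then turn its outcome into the process exit code.
    let outcome = group
        .run_until_timeout(std::future::ready(()), Duration::from_secs(1))
        .await;
    std::process::exit(outcome.exit_code())
}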
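
The type safety in the calls above comes from tying each topic to exactly one message type. A minimal sketch of that idea follows, assuming a trait with an associated `Message` type; the trait shape, the `NAME` constant, and the toy `Publisher` with its `log` field are hypothetical and say nothing about what define_topic! actually expands to.

```rust
/// Hypothetical stand-in for a typed-topic trait: one topic, one message type.
pub trait Topic {
    type Message;
    const NAME: &'static str;
}

#[derive(Debug, Clone, PartialEq)]
pub struct OrderPaid {
    pub order_id: String,
}

/// Zero-sized topic marker, written by hand here instead of via a macro.
pub struct Orders;

impl Topic for Orders {
    type Message = OrderPaid;
    const NAME: &'static str = "orders";
}

/// Toy publisher that records (topic name, rendered message) pairs.
#[derive(Default)]
pub struct Publisher {
    pub log: Vec<(&'static str, String)>,
}

impl Publisher {
    /// The parameter type is derived from the topic, so passing a message
    /// of any other type for `T` is rejected by the compiler.
    pub fn publish<T: Topic>(&mut self, msg: &T::Message)
    where
        T::Message: std::fmt::Debug,
    {
        self.log.push((T::NAME, format!("{:?}", msg)));
    }
}
```

Under these assumptions, `publisher.publish::<Orders>(&OrderPaid { .. })` compiles, while `publisher.publish::<Orders>(&"oops")` does not, because `&str` is not `Orders::Message`.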

§Ergonomics

§Observability

Every interesting state change is emitted as a structured tracing event, so wiring any tracing-subscriber gives a full operational trail without handler-side instrumentation.

Enable the metrics cargo feature to also emit operational counters, histograms, and gauges through the metrics facade — messages_consumed_total, message_processing_duration_seconds, messages_inflight, backend_errors_total, and friends. shove is a library, so it does not open a port: install your own recorder (metrics-exporter-prometheus, metrics-exporter-statsd, OpenTelemetry, etc.) and expose the endpoint from your service. Override the shove_ metric prefix with metrics::set_prefix once at startup, before any broker activity. See the module docs and the Observability guide for the full schema.

§See also

  • TopologyBuilder for hold queues, DLQs, and sequenced routing.
  • define_topic! and define_sequenced_topic! for the typed-topic macros.
  • Per-backend modules: [rabbitmq], [sns], [nats], [kafka], [inmemory] — expose the config and client types bound to each marker.

Re-exports§

pub use audit::AuditHandler;
pub use audit::AuditRecord;
pub use audit::Audited;
pub use autoscale_metrics::AutoscaleMetrics;
pub use backend::Backend;
pub use backend::capability::HasCoordinatedGroups;
pub use consumer::ConsumerOptions;
pub use consumer::DEFAULT_HANDLER_TIMEOUT;
pub use consumer::DEFAULT_MAX_MESSAGE_SIZE;
pub use consumer::DEFAULT_MAX_PENDING_PER_KEY;
pub use consumer_supervisor::ConsumerSupervisor;
pub use consumer_supervisor::SupervisorOutcome;
pub use error::ShoveError;
pub use handler::MessageHandler;
pub use handler::MessageHandlerExt;
pub use metadata::DeadMessageMetadata;
pub use metadata::MessageMetadata;
pub use outcome::Outcome;
pub use topic::SequencedTopic;
pub use topic::Topic;
pub use topology::HoldQueue;
pub use topology::QueueTopology;
pub use topology::SequenceConfig;
pub use topology::SequenceFailure;
pub use topology::TopologyBuilder;
pub use autoscaler::Autoscaler;
pub use autoscaler::AutoscalerBackend;
pub use autoscaler::AutoscalerConfig;
pub use autoscaler::ScalingDecision;
pub use autoscaler::ScalingMetrics;
pub use autoscaler::ScalingStrategy;
pub use autoscaler::Stabilized;
pub use autoscaler::ThresholdStrategy;
pub use broker::Broker;
pub use consumer_group::ConsumerGroup;
pub use consumer_group::ConsumerGroupConfig;
pub use publisher::Publisher;
pub use topology_declarer::Topics;
pub use topology_declarer::TopologyDeclarer;

Modules§

audit
autoscale_metrics
    Honest, partial-data metrics struct for autoscaler inputs. See DESIGN_V2.md §9.1.
autoscaler
backend
    Sealed Backend trait layer. Binds a backend marker type to its concrete internal implementation types; the public generic wrappers (Broker<B>, Publisher<B>, etc.) delegate through this.
broker
    Public Broker<B> hub. See DESIGN_V2.md §6.1.
consumer
consumer_group
    Public ConsumerGroup<B, Ctx>: specialist harness for coordinated consumer groups. Gated on B: HasCoordinatedGroups. See DESIGN_V2.md §6.3.
consumer_supervisor
    Consumer supervisor harness and its outcome type. See DESIGN_V2.md §6.5.
error
handler
markers
    Backend marker types. One zero-sized struct per backend, each under the existing Cargo feature. See DESIGN_V2.md §7.
metadata
metrics
    Operational metrics emitted via the metrics facade.
outcome
publisher
    Public Publisher<B> wrapper. See DESIGN_V2.md §6.2.
topic
topology
topology_declarer
    Public TopologyDeclarer<B> + Topics tuple trait. See DESIGN_V2.md §6.4, §11.4.

Macros§

define_sequenced_topic
    Define a sequenced topic with static topology.
define_topic
    Define a topic with static topology.