crabbyq 1.0.0


🦀 CrabbyQ

CrabbyQ is a declarative, asynchronous Rust framework for building message-driven microservices.

Inspired by the ergonomics of Axum and the messaging mindset of FastStream, CrabbyQ aims to make broker-driven services feel modular, typed, and easy to compose.

Overview

CrabbyQ abstracts away broker subscription loops and route dispatching so you can focus on handlers and domain logic. Internally it is built around Tower, so routes are modeled as tower::Services and the framework can grow naturally toward layers and middleware.

By default, CrabbyQ keeps the core lightweight and enables only the json format feature. Broker backends such as NATS, Redis, and MQTT are enabled explicitly through Cargo features.

Current Features

  • A broker-agnostic core Router with typed shared state through set_state(...).
  • Route registration through route(...), routes(...), include(...), and route_service(...).
  • Tower middleware support through layer(...).
  • Axum-like extractors such as State<T>, Subject, Headers, Body, and optional payload extractors like Json<T> and Cbor<T>.
  • Framework-managed publishing through the Publish extractor and the core Publisher.
  • Broker-specific publishers for NATS, Redis, and MQTT.
  • RPC-style request-reply through handler return values and Publisher::request(...).
  • Graceful shutdown hooks and always-on error logging with optional router-scoped and service-level error topics.
  • Multi-argument handler support with the Axum-style rule that body-consuming extractors go last.
  • Working broker backends for NATS, Redis pub/sub, and MQTT.
  • NatsRouter and NatsPublisher for NATS-specific features, including JetStream routes and JetStream publishing.

Cargo Features

  • json: Enabled by default. Adds Json<T> extractor/response support.
  • cbor: Enables Cbor<T> extractor/response support.
  • nats: Enables NatsBroker, NatsRouter, NatsPublisher, and JetStream integration.
  • redis: Enables RedisBroker and RedisPublisher.
  • mqtt: Enables MqttBroker and MqttPublisher.
  • full: Convenience feature for local development and IDE indexing. Enables json, cbor, nats, redis, and mqtt.

Quick Start

Stateless Handler

use crabbyq::prelude::*;
use crabbyq::brokers::NatsBroker;
use tracing::info;

async fn handle_async_event(
    event: Event,
) -> CrabbyResult<()> {
    info!("Stateless handler got event: {}", event.subject());
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    info!("Stateless work done for: {}", event.subject());
    Ok(())
}

#[tokio::main]
async fn main() -> CrabbyResult<()> {
    tracing_subscriber::fmt::init();

    let nats_client = async_nats::connect("nats://localhost:4222").await?;
    let nats_broker = NatsBroker::new(nats_client);

    let app = Router::new()
        .route("test.simple", handle_async_event)
        .into_service(nats_broker);

    app.serve().await?;
    Ok(())
}

For the NATS example above, enable the nats feature:

crabbyq = { version = "1.0.0", features = ["nats"] }

State and Extractors

use crabbyq::prelude::*;
use serde::Deserialize;

#[derive(Clone)]
struct AppState {
    app_name: &'static str,
}

#[derive(Clone)]
struct AppName(&'static str);

impl FromRef<AppState> for AppName {
    fn from_ref(input: &AppState) -> Self {
        Self(input.app_name)
    }
}

#[derive(Deserialize)]
struct Payload {
    id: u32,
}

async fn handle(
    State(app_name): State<AppName>,
    Subject(subject): Subject,
    Json(payload): Json<Payload>,
) -> CrabbyResult<()> {
    println!("{} -> {} -> {}", app_name.0, subject, payload.id);
    Ok(())
}

Body-consuming extractors such as Body, Json<T>, and Cbor<T> should be the last handler argument.
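
One way to see why the ordering rule exists is a toy model in which metadata extractors borrow the event and the final extractor takes it by value. The sketch below does not use CrabbyQ's real traits: Event, FromParts, FromWhole, call, describe, and demo are hypothetical names chosen for illustration, and the real FromEventParts<S> and FromEvent<S> traits may look quite different.

```rust
// Toy model with hypothetical names; this is not CrabbyQ's implementation.
struct Event {
    subject: String,
    body: Vec<u8>,
}

/// Metadata-like extractors only borrow the event, so any number can run.
trait FromParts: Sized {
    fn from_parts(event: &Event) -> Self;
}

/// Body-consuming extractors take the event by value, so only one can run,
/// and it has to run after every borrowing extractor has finished.
trait FromWhole: Sized {
    fn from_whole(event: Event) -> Self;
}

struct Subject(String);
struct Body(Vec<u8>);

impl FromParts for Subject {
    fn from_parts(event: &Event) -> Self {
        Subject(event.subject.clone())
    }
}

impl FromWhole for Body {
    fn from_whole(event: Event) -> Self {
        // Moves the payload out; the event is no longer usable afterwards.
        Body(event.body)
    }
}

/// Calls a two-argument handler: borrow first, consume last.
fn call<A: FromParts, B: FromWhole, R>(event: Event, handler: impl Fn(A, B) -> R) -> R {
    let first = A::from_parts(&event);
    let last = B::from_whole(event);
    handler(first, last)
}

fn describe(Subject(subject): Subject, Body(body): Body) -> String {
    format!("{} ({} bytes)", subject, body.len())
}

fn demo() -> String {
    let event = Event {
        subject: "orders.created".to_string(),
        body: b"{\"id\":7}".to_vec(),
    };
    call(event, describe)
}
```

In this model, swapping the two handler arguments fails to compile because Body does not implement FromParts, which is roughly the kind of guarantee the last-argument rule is meant to give.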

RPC Through Return Values

use crabbyq::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct SumRequest {
    left: i32,
    right: i32,
}

#[derive(Serialize)]
struct SumResponse {
    result: i32,
}

async fn handle_sum(
    Json(payload): Json<SumRequest>,
) -> CrabbyResult<Json<SumResponse>> {
    Ok(Json(SumResponse {
        result: payload.left + payload.right,
    }))
}
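
The reply behaviour can be pictured as a dispatcher that publishes the handler's return value only when the incoming message names a reply subject. The following is a minimal standalone sketch under that assumption. Incoming, RecordingPublisher, dispatch, and sum_handler are hypothetical names, the payload is plain comma-separated text rather than JSON, and none of this is CrabbyQ's actual dispatch code.

```rust
// Toy model with hypothetical names; it does not use CrabbyQ's real types.
struct Incoming {
    reply_to: Option<String>,
    payload: Vec<u8>,
}

/// Records outgoing messages instead of talking to a broker.
#[derive(Default)]
struct RecordingPublisher {
    sent: Vec<(String, Vec<u8>)>,
}

impl RecordingPublisher {
    fn publish(&mut self, subject: &str, payload: Vec<u8>) {
        self.sent.push((subject.to_string(), payload));
    }
}

/// Runs the handler, then publishes its return value only when the
/// incoming message asked for a reply and the handler produced one.
fn dispatch<H>(msg: Incoming, handler: H, publisher: &mut RecordingPublisher)
where
    H: Fn(&[u8]) -> Option<Vec<u8>>,
{
    let response = handler(&msg.payload);
    if let (Some(reply_subject), Some(bytes)) = (msg.reply_to, response) {
        publisher.publish(&reply_subject, bytes);
    }
}

/// Sums two comma-separated integers, e.g. b"2,3" becomes b"5".
fn sum_handler(payload: &[u8]) -> Option<Vec<u8>> {
    let text = std::str::from_utf8(payload).ok()?;
    let (left, right) = text.split_once(',')?;
    let total = left.trim().parse::<i32>().ok()? + right.trim().parse::<i32>().ok()?;
    Some(total.to_string().into_bytes())
}
```

Dispatching a message with reply_to set to some inbox subject records exactly one outgoing message on that subject, while the same handler invoked for a fire-and-forget message records nothing.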

Examples

The repository includes runnable examples for stateless handlers, state, extractors, publishers, RPC, multi-router composition, and graceful shutdown. See examples/README.md for the full list and the commands to run them.

Architecture Notes

  • Route keys are broker-facing identifiers, not HTTP paths.
  • include(...) composes routers without rewriting route keys.
  • Duplicate route keys panic during router construction instead of failing later at runtime.
  • Extractors are split into:
    • FromEventParts<S> for metadata-like values.
    • FromEvent<S> for full-event or payload-consuming values.
  • Publishers are injected by the framework at into_service(...) time, so handlers can publish without manually wiring broker clients through state.
  • Publisher stays broker-agnostic and is the injected handler capability. Broker-specific publishing features live in broker-specific publishers such as NatsPublisher, RedisPublisher, and MqttPublisher.
  • RPC replies are sent automatically when the incoming message carries reply_to metadata and the handler returns a response value.
  • Middleware is exposed through Router::layer(...) and currently composes routes through boxed tower::Services for implementation simplicity. A lower-overhead internal pipeline is planned as the framework matures.
  • Broker-specific features live in broker-specific routers and publishers. For example, NatsRouter and NatsPublisher extend the core API with JetStream-specific routing and publishing.
  • Broker backends and message formats are feature-gated so embedded and other constrained targets can keep the dependency footprint smaller.
  • CrabbyQ focuses on routing and service ergonomics. For NATS JetStream storage APIs such as Key-Value and Object Store, async-nats already provides a solid API, so CrabbyQ does not add extra wrapper layers there.
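
The notes on route keys, include(...), and duplicate detection can be illustrated with a small standalone model. This is only a sketch: ToyRouter, Handler, keys, dispatch, and echo are hypothetical names, the handlers are plain synchronous functions, and CrabbyQ's real Router is built on Tower services rather than a map of function pointers.

```rust
use std::collections::HashMap;

// Toy model with hypothetical names; CrabbyQ's real Router differs.
type Handler = fn(&str) -> String;

#[derive(Default)]
struct ToyRouter {
    routes: HashMap<String, Handler>,
}

impl ToyRouter {
    fn new() -> Self {
        Self::default()
    }

    /// Registers a handler; a repeated key is treated as a programming
    /// error and panics while the router is still being built.
    fn route(mut self, key: &str, handler: Handler) -> Self {
        if self.routes.insert(key.to_string(), handler).is_some() {
            panic!("duplicate route key: {}", key);
        }
        self
    }

    /// Merges another router; keys are copied verbatim, with no prefixing.
    fn include(mut self, other: ToyRouter) -> Self {
        for (key, handler) in other.routes {
            self = self.route(&key, handler);
        }
        self
    }

    /// Returns the registered keys in sorted order.
    fn keys(&self) -> Vec<String> {
        let mut keys: Vec<String> = self.routes.keys().cloned().collect();
        keys.sort();
        keys
    }

    /// Looks up the handler by the exact broker-facing key and runs it.
    fn dispatch(&self, key: &str, payload: &str) -> Option<String> {
        self.routes.get(key).map(|handler| handler(payload))
    }
}

fn echo(payload: &str) -> String {
    payload.to_string()
}
```

Because include reuses route, a key that appears in both routers triggers the same construction-time panic as registering it twice on one router, so in this model conflicts surface before any message is served.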

Roadmap

Future development focuses on:

  • A lower-overhead internal middleware pipeline on top of the current tower::Layer integration.
  • Broker-specific extractors for transport metadata such as NATS and JetStream delivery context.
  • Better typed rejection and error response handling.
  • More broker backends such as RabbitMQ and Kafka.
  • Additional payload formats such as Protobuf and MsgPack.

Contributing

If you are interested in building "Axum for message brokers" in Rust, contributions are welcome through:

  • Feature requests and API design discussions.
  • Pull requests for new broker implementations.
  • Documentation improvements.