Expand description
crabka-grpc-gateway — gRPC / Connect-RPC + HTTP gateway into Crabka topics.
Built on the native client crates. The broker’s Kafka wire stays byte-exact; the gateway translates Connect-RPC calls into producer, consumer, schema, deduplication, and authorization operations.
§Serving the gateway
use axum::Router;
use crabka_grpc_gateway::{router, state::AppState};
use std::sync::Arc;
let app: Router = router(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app).await?;Modules§
- authz
- Gateway trusted-proxy authorization: holds the
crabka_authz::Authorizerand anArcSwap’dAclCacherefreshed by polling the broker’sDescribeAcls. - codec
- Pluggable record codec.
RawCodecis identity/opaque bytes; schema-aware codecs implement the same trait so front-ends and produce/consume cores do not need format-specific branches. - config
- Gateway configuration, parsed from CLI flags / env in
bin/gateway.rs. - consume
- Consume core: a group-subscribed session that yields records and commits offsets. The streaming/poll wire (later plan) drives this. Records are decoded through the codec on the way out.
- dedup
- Single-owner exactly-once dedup engine.
- error
- Gateway error type. Wraps native-client errors so handlers can map to Connect status without leaking client internals.
- forward
- Internal gateway→gateway forwarding: the owner-routing client plus the
/internal/v1/forwardendpoint that receives a forwarded record and produces it LOCALLY (the receiver is the partition’s owner). - handlers
- Connect-RPC handlers — thin adapters: proto in,
GatewayRecordto the core,RecordOutcomeback to proto. - health
- Liveness/readiness endpoints.
/healthzis always 200 once serving;/readyzreturns 503 until the dedup store has warmed up, so load balancers don’t route dedup’d traffic to a cold replica. - metrics
- Gateway Prometheus metrics. A process-global
GatewayMetrics(lazy) so any code path can record without threading a handle;/metricsrenders it. - outbound
- Outbound webhook delivery: one task per subscription. Batch-at-a-time so the commit boundary == the delivered boundary (the consumer commits the whole polled position, so we deliver the whole batch before committing). Per partition: deliver in offset order, retry with exponential backoff + jitter, dead-letter on exhaustion. At-least-once; receivers dedup on X-Crabka-Event-Id.
- outbound_
config - Operator-supplied outbound webhook subscriptions (TOML), compiled at load:
the target URL’s scheme/host is checked against an allow-list (SSRF guard)
and any filter
JSONPathis parsed once. - pb
- Generated protobuf + Connect server stubs. The actual content lives
in
OUT_DIR/crabka.gateway.v1.rsand is produced bybuild.rs. - produce
- Core produce engine. Keyed records (with an
idempotency_key) go through the dedup engine for EOS; unkeyed records take the plain idempotent path (acks=all). Transport-agnostic — front-ends convert toGatewayRecordand receiveRecordOutcome. - schema
- Schema Registry integration for the gRPC gateway.
- serve
- Listener serving: plaintext via
axum::serve, or rustls via a manual accept loop that hands eachTlsStreamto hyper and injects the mTLS peer principal (cert subject DN) into request extensions. TLS material is hot- reloadable (DynamicServerConfig); the plaintext path is unchanged from pre-P4 so existing tests are unaffected. - state
- Shared, cheaply-cloneable handles for Connect handlers.
- streaming
- Streaming Connect handlers — bidirectional
SendStream(produce) andSubscribe(consume). The per-handler logic lives in a*_innerfunction returning a plainStream(unit-testable); the public handler is a thin wrapper intoConnectResponse::new(StreamBody::new(inner)). - types
- Protocol-agnostic record types. Every front-end (gRPC now; webhooks
later) converts into
GatewayRecordand consumesRecordOutcome, so the core engines never depend on a wire format. - webhook
- Webhook inbound handlers.
- webhook_
config - Operator-supplied webhook-endpoint config (TOML), compiled at load time:
JSONPathexpressions are parsed once, signature settings validated. Mirrors the broker’sfile_configpattern.
Functions§
- router
- Build the Connect-RPC
axum::Routerfor the Gateway service.