Crate crabka_grpc_gateway


crabka-grpc-gateway — gRPC / Connect-RPC + HTTP gateway into Crabka topics.

Built on the native client crates. The broker’s Kafka wire stays byte-exact; the gateway translates Connect-RPC calls into producer, consumer, schema, deduplication, and authorization operations.

§Serving the gateway

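use axum::Router;
use crabka_grpc_gateway::{router, state::AppState};
use std::sync::Arc;

// `state` is the gateway's shared state (see the `state` module). It is built
// elsewhere at startup; its construction is omitted from this snippet.
let app: Router = router(state);
// Bind on all interfaces, port 8080.
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
// `.await` and `?` require an async context that returns a `Result`.
axum::serve(listener, app).await?;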

Modules§

authz
Gateway trusted-proxy authorization: holds the crabka_authz::Authorizer and an ArcSwap’d AclCache refreshed by polling the broker’s DescribeAcls.
codec
Pluggable record codec. RawCodec is identity/opaque bytes; schema-aware codecs implement the same trait so front-ends and produce/consume cores do not need format-specific branches.
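As a rough illustration of the pluggable-codec idea, the sketch below shows how an identity codec and a format-aware codec can sit behind one trait so the calling code never branches on format. The trait name `RecordCodec`, its methods, `CodecError`, and `TaggedCodec` are hypothetical names chosen for this example; the `RawCodec` here is a stand-in, and the crate's actual trait may look different.

```rust
// Hypothetical sketch: the trait, method names, and error type are
// assumptions for illustration, not the crate's actual `codec` API.

/// Error returned when a payload cannot be encoded or decoded.
#[derive(Debug, PartialEq)]
pub struct CodecError(pub String);

/// A pluggable record codec: callers see only bytes in and bytes out.
pub trait RecordCodec {
    fn encode(&self, payload: &[u8]) -> Result<Vec<u8>, CodecError>;
    fn decode(&self, wire: &[u8]) -> Result<Vec<u8>, CodecError>;
}

/// Identity codec: treats the payload as opaque bytes.
pub struct RawCodec;

impl RecordCodec for RawCodec {
    fn encode(&self, payload: &[u8]) -> Result<Vec<u8>, CodecError> {
        Ok(payload.to_vec())
    }
    fn decode(&self, wire: &[u8]) -> Result<Vec<u8>, CodecError> {
        Ok(wire.to_vec())
    }
}

/// Stand-in for a schema-aware codec: prefixes a one-byte format tag.
pub struct TaggedCodec {
    pub tag: u8,
}

impl RecordCodec for TaggedCodec {
    fn encode(&self, payload: &[u8]) -> Result<Vec<u8>, CodecError> {
        let mut out = Vec::with_capacity(payload.len() + 1);
        out.push(self.tag);
        out.extend_from_slice(payload);
        Ok(out)
    }
    fn decode(&self, wire: &[u8]) -> Result<Vec<u8>, CodecError> {
        match wire.split_first() {
            Some((t, rest)) if *t == self.tag => Ok(rest.to_vec()),
            _ => Err(CodecError("unexpected format tag".to_string())),
        }
    }
}

/// Code written against the trait needs no per-format branch.
pub fn round_trip(codec: &dyn RecordCodec, payload: &[u8]) -> Result<Vec<u8>, CodecError> {
    let wire = codec.encode(payload)?;
    codec.decode(&wire)
}
```

Calling `round_trip(&RawCodec, b"abc")` and `round_trip(&TaggedCodec { tag: 7 }, b"abc")` goes through the same code path; only the codec value changes.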
config
Gateway configuration, parsed from CLI flags / env in bin/gateway.rs.
consume
Consume core: a group-subscribed session that yields records and commits offsets. The streaming/poll wire, covered by a later plan, drives this session. Records are decoded through the codec on the way out.
dedup
Single-owner exactly-once dedup engine.
error
Gateway error type. Wraps native-client errors so handlers can map to Connect status without leaking client internals.
forward
Internal gateway→gateway forwarding: the owner-routing client plus the /internal/v1/forward endpoint that receives a forwarded record and produces it LOCALLY (the receiver is the partition’s owner).
handlers
Connect-RPC handlers — thin adapters: proto in, GatewayRecord to the core, RecordOutcome back to proto.
health
Liveness/readiness endpoints. /healthz is always 200 once serving; /readyz returns 503 until the dedup store has warmed up, so load balancers don’t route dedup’d traffic to a cold replica.
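The liveness/readiness split described for `health` can be sketched with a single atomic flag. This is a minimal model of the status-code logic only, using the standard library; the `Readiness` type and its method names are hypothetical, and the real handlers are axum endpoints whose warm-up signal may be wired differently.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical readiness flag; the gateway's real warm-up signal may differ.
pub struct Readiness {
    warmed: AtomicBool,
}

impl Readiness {
    /// Starts cold: the dedup store has not warmed up yet.
    pub const fn new() -> Self {
        Self { warmed: AtomicBool::new(false) }
    }

    /// Called once the dedup store has finished warming up.
    pub fn mark_warmed(&self) {
        self.warmed.store(true, Ordering::Release);
    }

    /// Status for /healthz: 200 whenever the process is serving.
    pub fn healthz_status(&self) -> u16 {
        200
    }

    /// Status for /readyz: 503 until warm-up completes, then 200.
    pub fn readyz_status(&self) -> u16 {
        if self.warmed.load(Ordering::Acquire) { 200 } else { 503 }
    }
}
```

Keeping liveness unconditional means an orchestrator will not restart a replica merely because it is still warming, while the 503 from readiness keeps it out of the load balancer's rotation.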
metrics
Gateway Prometheus metrics. A process-global GatewayMetrics (lazy) so any code path can record without threading a handle; /metrics renders it.
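A process-global, lazily initialised metrics handle of the kind described for `metrics` can be sketched with `std::sync::OnceLock`. The `Metrics` struct, the `records_produced` counter, and the metric name below are hypothetical; this page does not show the fields of the real `GatewayMetrics` or how it is initialised.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;

/// Hypothetical metrics set; the real `GatewayMetrics` fields are not shown here.
pub struct Metrics {
    pub records_produced: AtomicU64,
}

/// Process-global handle, initialised on first use.
/// Any code path can call this without threading a handle through.
pub fn metrics() -> &'static Metrics {
    static METRICS: OnceLock<Metrics> = OnceLock::new();
    METRICS.get_or_init(|| Metrics {
        records_produced: AtomicU64::new(0),
    })
}

/// Render the counter in the Prometheus text exposition format,
/// as a /metrics endpoint would.
pub fn render() -> String {
    format!(
        "# TYPE records_produced_total counter\nrecords_produced_total {}\n",
        metrics().records_produced.load(Ordering::Relaxed)
    )
}
```

A produce path would record with `metrics().records_produced.fetch_add(1, Ordering::Relaxed)`, and the endpoint handler would return `render()` as the response body.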
outbound
Outbound webhook delivery: one task per subscription. Batch-at-a-time so the commit boundary == the delivered boundary (the consumer commits the whole polled position, so we deliver the whole batch before committing). Per partition: deliver in offset order, retry with exponential backoff + jitter, dead-letter on exhaustion. At-least-once; receivers dedup on X-Crabka-Event-Id.
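The retry schedule for outbound delivery (exponential backoff with jitter, then dead-letter on exhaustion) might look roughly like the following. `RetryPolicy`, its fields, `NextStep`, and the "full jitter" variant are assumptions made for this sketch; the module's actual parameters and jitter strategy are not documented on this page. The random fraction is passed in by the caller so the example needs only the standard library.

```rust
use std::time::Duration;

/// Hypothetical retry policy; field names and values are illustrative.
pub struct RetryPolicy {
    pub base: Duration,
    pub cap: Duration,
    pub max_attempts: u32,
}

/// What the delivery task should do after a failed attempt.
#[derive(Debug, PartialEq)]
pub enum NextStep {
    RetryAfter(Duration),
    DeadLetter,
}

impl RetryPolicy {
    /// `failed_attempts` counts deliveries that have already failed (>= 1).
    /// `jitter` is a caller-supplied random fraction in [0, 1].
    pub fn after_failure(&self, failed_attempts: u32, jitter: f64) -> NextStep {
        if failed_attempts >= self.max_attempts {
            return NextStep::DeadLetter;
        }
        // Ceiling doubles per failure: base, 2*base, 4*base, ... up to `cap`.
        let exp = failed_attempts.saturating_sub(1).min(31);
        let ceiling = self.base.saturating_mul(1u32 << exp).min(self.cap);
        // "Full jitter": pick a delay in [0, ceiling], at millisecond granularity.
        let j = if jitter.is_finite() { jitter.clamp(0.0, 1.0) } else { 1.0 };
        let millis = (ceiling.as_millis() as f64 * j) as u64;
        NextStep::RetryAfter(Duration::from_millis(millis))
    }
}
```

Because delivery is at-least-once, a retry can reach a receiver that already processed the event, which is why receivers dedup on `X-Crabka-Event-Id`.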
outbound_config
Operator-supplied outbound webhook subscriptions (TOML), compiled at load: the target URL’s scheme/host is checked against an allow-list (SSRF guard) and any filter JSONPath is parsed once.
pb
Generated protobuf + Connect server stubs. The actual content lives in OUT_DIR/crabka.gateway.v1.rs and is produced by build.rs.
produce
Core produce engine. Keyed records (with an idempotency_key) go through the dedup engine for EOS; unkeyed records take the plain idempotent path (acks=all). Transport-agnostic — front-ends convert to GatewayRecord and receive RecordOutcome.
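The keyed/unkeyed split in the produce path can be modelled in memory as below. This is only a toy stand-in for the idea that a repeated `idempotency_key` does not produce a second record: `Record`, `Outcome`, and `Engine` are hypothetical types, the `Vec` plays the role of a partition, and nothing here reflects how the real dedup engine or `GatewayRecord`/`RecordOutcome` are implemented.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down record; the crate's `GatewayRecord` has its own fields.
pub struct Record {
    pub idempotency_key: Option<String>,
    pub value: Vec<u8>,
}

/// Hypothetical outcome; stands in for the crate's `RecordOutcome`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Outcome {
    Produced { offset: u64 },
    Duplicate { offset: u64 },
}

/// In-memory stand-in for the produce core.
#[derive(Default)]
pub struct Engine {
    log: Vec<Vec<u8>>,          // plays the role of a topic partition
    seen: HashMap<String, u64>, // idempotency key -> offset of first write
}

impl Engine {
    pub fn produce(&mut self, record: Record) -> Outcome {
        // Keyed path: a key seen before returns the original offset.
        if let Some(key) = &record.idempotency_key {
            if let Some(&offset) = self.seen.get(key) {
                return Outcome::Duplicate { offset };
            }
        }
        // First sighting of a key, or an unkeyed record: append.
        let offset = self.log.len() as u64;
        self.log.push(record.value);
        if let Some(key) = record.idempotency_key {
            self.seen.insert(key, offset);
        }
        Outcome::Produced { offset }
    }
}
```

In this model, unkeyed records are appended every time they are submitted, while resubmitting a keyed record returns `Outcome::Duplicate` with the offset of the first write.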
schema
Schema Registry integration for the gRPC gateway.
serve
Listener serving: plaintext via axum::serve, or rustls via a manual accept loop that hands each TlsStream to hyper and injects the mTLS peer principal (cert subject DN) into request extensions. TLS material is hot-reloadable (DynamicServerConfig); the plaintext path is unchanged from pre-P4 so existing tests are unaffected.
state
Shared, cheaply-cloneable handles for Connect handlers.
streaming
Streaming Connect handlers — bidirectional SendStream (produce) and Subscribe (consume). The per-handler logic lives in a *_inner function returning a plain Stream (unit-testable); the public handler is a thin wrapper into ConnectResponse::new(StreamBody::new(inner)).
types
Protocol-agnostic record types. Every front-end (gRPC now; webhooks later) converts into GatewayRecord and consumes RecordOutcome, so the core engines never depend on a wire format.
webhook
Webhook inbound handlers.
webhook_config
Operator-supplied webhook-endpoint config (TOML), compiled at load time: JSONPath expressions are parsed once, signature settings validated. Mirrors the broker’s file_config pattern.

Functions§

router
Build the Connect-RPC axum::Router for the Gateway service.