Apache Kafka-compatible broker for Crabka.
crabka-broker ships a library + binary that unmodified JVM Kafka clients
can produce to and consume from. It is the runtime that ties together the
wire protocol, KRaft metadata controller, log storage, replication, security,
quotas, compaction, tiered storage, transactions, and observability.
§Capability areas
- Accepts Kafka wire-protocol TCP connections and negotiates API versions.
- Handles topic metadata and administration, Produce, Fetch, ListOffsets, configs, group coordination, offset commits, share groups, transactions, producer-state inspection, quotas, and client telemetry.
- Runs an embedded crabka_raft KRaft metadata quorum, registers brokers, tracks broker liveness, and drives partition leadership / reassignment.
- Persists partition data via crabka_log, including leader-epoch checkpoints, transaction indexes, retention, and log compaction.
- Supports idempotent and transactional producers, read-committed fetches, high-watermark enforcement for acks=all, follower replication, ISR maintenance, and leader election.
- Supports plaintext, TLS, SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, SASL/GSSAPI, mTLS principal extraction, ACL authorization, and SCRAM / ACL mutation through the admin APIs.
- Supports KIP-405 tiered storage through local and S3-compatible remote storage managers plus the topic-backed remote-log metadata manager.
§Quick start
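```rust
use crabka_broker::{Broker, BrokerConfig};
#[tokio::main] // needs tokio's "macros", "rt-multi-thread", and "signal" features
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handle = Broker::start(BrokerConfig::default()).await?;
    tokio::signal::ctrl_c().await?; // run until Ctrl-C
    handle.shutdown().await; // drain, then stop the broker
    Ok(())
}
```
§Public surface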
- Broker - owns the partition registry, metadata image, and handler table; constructed by Broker::start.
- BrokerHandle - lifecycle handle returned by Broker::start; call BrokerHandle::shutdown to drain.
- BrokerConfig - listen address, advertised listener, log dir, broker id, per-log LogConfig.
- BrokerError - error returned by Broker::start.
§Replication
CreateTopics with replication_factor > 1 assigns replication_factor replicas
to each partition, round-robin over MetadataImage::brokers(). The
replicator_supervisor subscribes to controller metadata changes
and spawns a replicator task for each partition where this broker is
a non-leader replica. Each replicator opens a
crabka_client_core::Client to its partition's leader and loops
on Fetch with replica_id set to this broker's id, appending every returned
RecordBatch to the local log.
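A minimal sketch of the round-robin placement described above, assuming MetadataImage::brokers() yields a stable, ordered list of broker ids and that partition p starts at broker index p mod N. The name assign_replicas and the starting-index rule are hypothetical; the point it illustrates is that each partition receives replication_factor distinct brokers, which is why replication_factor cannot exceed the broker count.

```rust
/// Hypothetical sketch: assign `replication_factor` distinct replicas to each
/// partition by walking the broker list round-robin. By convention (as in
/// Apache Kafka) the first replica in each list is the preferred leader.
pub fn assign_replicas(
    brokers: &[i32],
    partitions: u32,
    replication_factor: usize,
) -> Result<Vec<Vec<i32>>, String> {
    if replication_factor == 0 || replication_factor > brokers.len() {
        return Err(format!(
            "replication_factor {} is invalid for {} brokers",
            replication_factor,
            brokers.len()
        ));
    }
    let n = brokers.len();
    let assignment: Vec<Vec<i32>> = (0..partitions as usize)
        .map(|p| {
            // Partition p starts at broker p (mod n); followers are the next
            // brokers in order, so no broker appears twice in one list.
            (0..replication_factor)
                .map(|r| brokers[(p + r) % n])
                .collect::<Vec<i32>>()
        })
        .collect();
    Ok(assignment)
}
```

For three brokers, four partitions, and replication_factor = 2, this yields [1, 2], [2, 3], [3, 1], [1, 2], so leadership rotates evenly across brokers.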
The replication path includes follower fetch loops, high-watermark tracking,
acks=all blocking, leader-epoch fencing, ISR shrink/expand proposals, and
controller-driven leader election for broker failures. Produce routing still
follows the normal Kafka client contract: clients refresh metadata and send
partition writes to the advertised leader.
§Transactions
Kafka transactions (KIP-98 plus full KIP-1319 v2) are handled by a per-broker
txn::coordinator::TxnCoordinator backed by the __transaction_state
internal topic (50 partitions, lazily bootstrapped on the first
FindCoordinator(TRANSACTION) request). Producers call init_transactions
/ begin_transaction / commit_transaction / abort_transaction /
send_offsets_to_transaction; consumers set
isolation_level=read_committed to filter out aborted records using the
per-segment .txnindex and the partition-level LSO.
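For illustration: Apache Kafka maps a transactional.id to abs(hashCode(transactional.id)) % partition_count, where hashCode is Java's String.hashCode and abs maps Integer.MIN_VALUE to 0; the coordinator is the leader of that __transaction_state partition. The Rust port below reproduces that mapping. Whether TxnCoordinator uses exactly this hash is an assumption, and java_string_hash / txn_partition_for are hypothetical names.

```rust
/// Partition count of __transaction_state, per the text above.
pub const TXN_STATE_PARTITIONS: i32 = 50;

/// Java's `String.hashCode`: s[0]*31^(n-1) + ... + s[n-1] over UTF-16 code
/// units, using 32-bit wrapping arithmetic.
pub fn java_string_hash(s: &str) -> i32 {
    s.encode_utf16()
        .fold(0i32, |h, unit| h.wrapping_mul(31).wrapping_add(unit as i32))
}

/// Kafka-style abs: maps i32::MIN to 0 instead of overflowing.
fn kafka_abs(n: i32) -> i32 {
    if n == i32::MIN {
        0
    } else {
        n.abs()
    }
}

/// Hypothetical helper: the __transaction_state partition (and therefore
/// the coordinator broker) that owns `transactional_id`.
pub fn txn_partition_for(transactional_id: &str, partitions: i32) -> i32 {
    kafka_abs(java_string_hash(transactional_id)) % partitions
}
```

Using the JVM hash rather than a Rust-native one keeps the placement identical to what an Apache Kafka cluster would choose for the same transactional.id.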
The transaction coordinator works with the replication/high-watermark path:
acks=all transactional writes wait for the partition high watermark,
consumers using read_committed fetch only records visible below the LSO,
and leader-epoch fencing plus controller-driven leader election protect the
log after broker failover.
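The read_committed visibility rule can be sketched as a filter over batches: keep only batches that end below the LSO, drop transaction control markers, and drop transactional batches covered by an aborted transaction of the same producer. Batch and AbortedTxn are hypothetical simplifications of the batch header and the .txnindex entry, not the crate's types.

```rust
/// Hypothetical, simplified record-batch header.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub base_offset: i64,
    pub last_offset: i64,
    pub producer_id: i64,
    pub is_transactional: bool,
    pub is_control: bool,
}

/// Hypothetical aborted-transaction entry: the producer and the offset range
/// its aborted transaction spanned.
pub struct AbortedTxn {
    pub producer_id: i64,
    pub first_offset: i64,
    pub last_offset: i64,
}

/// Batches a read_committed consumer may see: fully below the LSO, not a
/// control marker, and not part of an aborted transaction.
pub fn read_committed_view<'a>(
    batches: &'a [Batch],
    aborted: &[AbortedTxn],
    lso: i64,
) -> Vec<&'a Batch> {
    batches
        .iter()
        .filter(|b| b.last_offset < lso)
        .filter(|b| !b.is_control)
        .filter(|b| {
            !(b.is_transactional
                && aborted.iter().any(|a| {
                    a.producer_id == b.producer_id
                        && b.base_offset >= a.first_offset
                        && b.last_offset <= a.last_offset
                }))
        })
        .collect()
}
```

In the broker, the lso argument would be the clamped value min(HW, log.lso()), so an open transaction (or unreplicated data) is never exposed.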
§Bulletproof EOS — HW + acks=all
High watermark (HW) tracking is per partition, via ReplicaState
(which lives on Partition). The leader records each follower's LEO from
its Fetch requests and caches HW = min(LEO over ISR). Produce requests with
acks=-1 wait on Partition::await_hw_at_least before responding;
on timeout the producer gets a per-partition
NOT_ENOUGH_REPLICAS_AFTER_APPEND error (code 20). Consumer Fetches
(replica_id == -1) clamp the visible batches and last_stable_offset
at HW, so the read_committed LSO becomes min(HW, log.lso()).
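To make the HW rule concrete, here is a std-only sketch: the leader stores the LEO of every ISR member (its own included), HW is the minimum, and an acks=-1 produce blocks until HW reaches its target offset or the timeout elapses, mapping the timeout to error code 20. The names mirror the text, but the implementation is hypothetical; the real Partition::await_hw_at_least is async, whereas this sketch blocks on std::sync::Condvar to avoid depending on a runtime.

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Kafka error code returned when acks=-1 times out waiting for HW.
pub const NOT_ENOUGH_REPLICAS_AFTER_APPEND: i16 = 20;

/// Hypothetical stand-in for ReplicaState: LEO per ISR member.
pub struct ReplicaState {
    leos: Mutex<HashMap<i32, i64>>, // broker id -> log end offset
    changed: Condvar,
}

fn min_leo(leos: &HashMap<i32, i64>) -> i64 {
    leos.values().copied().min().unwrap_or(0)
}

impl ReplicaState {
    pub fn new(isr: &[i32]) -> Self {
        let leos: HashMap<i32, i64> = isr.iter().map(|&id| (id, 0)).collect();
        Self { leos: Mutex::new(leos), changed: Condvar::new() }
    }

    /// Record a LEO from a follower Fetch (or the leader's own append).
    /// Brokers outside the ISR are ignored; LEOs never move backwards.
    pub fn update_leo(&self, broker: i32, leo: i64) {
        let mut leos = self.leos.lock().unwrap();
        if let Some(cur) = leos.get_mut(&broker) {
            *cur = (*cur).max(leo);
        }
        self.changed.notify_all();
    }

    /// HW = min(LEO over ISR).
    pub fn high_watermark(&self) -> i64 {
        min_leo(&self.leos.lock().unwrap())
    }

    /// Block until HW >= target, or fail with code 20 after `timeout`.
    pub fn await_hw_at_least(&self, target: i64, timeout: Duration) -> Result<i64, i16> {
        let guard = self.leos.lock().unwrap();
        let (guard, res) = self
            .changed
            .wait_timeout_while(guard, timeout, |leos| min_leo(leos) < target)
            .unwrap();
        if res.timed_out() {
            Err(NOT_ENOUGH_REPLICAS_AFTER_APPEND)
        } else {
            Ok(min_leo(&guard))
        }
    }
}

/// read_committed LSO as described above: min(HW, log LSO).
pub fn visible_lso(hw: i64, log_lso: i64) -> i64 {
    hw.min(log_lso)
}
```

A produce that appended up to offset N would call await_hw_at_least(N, timeout) and reply only once every ISR member has fetched past N.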
On its own, this still leaves one bulletproof-EOS gap: a leader crash mid-transaction loses records. KIP-101 leader-epoch fencing, leader election on failure, and ISR shrink/expand (below) close that gap.
§Bulletproof EOS — leader-epoch + election + ISR
KIP-101 leader-epoch fencing tags every appended batch with
Partition::current_leader_epoch. A per-partition
.leader-epoch-checkpoint file (byte-compatible with Apache Kafka) backs the
OffsetForLeaderEpoch RPC, which followers use to truncate their logs after a
leader change. Leader election runs on the controller:
heartbeat::controller_state::ControllerLivenessState tracks each
broker's last_heartbeat; a 1s ticker marks brokers that have not
heartbeated within heartbeat_timeout_ms as dead and calls
leader_election::on_broker_dead, which scans the dead broker's partitions,
picks the first alive ISR replica as the new leader, and bumps leader_epoch.
ISR shrink/expand is leader-driven by isr_maintenance, which proposes
AlterPartition whenever the time since a follower's last fetch exceeds
replica_lag_time_max_ms.
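The three mechanisms in the paragraph above can be sketched as plain functions. This is a std-only illustration, not the crate's code: EpochEntry, PartitionState, end_offset_for, dead_brokers, and lagging_followers are hypothetical names, on_broker_dead only mirrors leader_election::on_broker_dead in spirit, and the end-offset rule follows Apache Kafka's OffsetForLeaderEpoch semantics, which Crabka is assumed (not confirmed) to match.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

pub const UNDEFINED_EPOCH: i32 = -1;
pub const UNDEFINED_OFFSET: i64 = -1;

/// One .leader-epoch-checkpoint entry: the first offset written in `epoch`.
#[derive(Clone, Copy)]
pub struct EpochEntry {
    pub epoch: i32,
    pub start_offset: i64,
}

/// OffsetForLeaderEpoch lookup over entries sorted by epoch. Returns
/// (epoch, end_offset); a diverged follower truncates to at most end_offset.
pub fn end_offset_for(entries: &[EpochEntry], requested: i32, leo: i64) -> (i32, i64) {
    let latest = match entries.last() {
        Some(e) if requested != UNDEFINED_EPOCH => e,
        _ => return (UNDEFINED_EPOCH, UNDEFINED_OFFSET),
    };
    if requested == latest.epoch {
        return (requested, leo); // the current epoch ends at the log end offset
    }
    match entries.iter().position(|e| e.epoch > requested) {
        None => (UNDEFINED_EPOCH, UNDEFINED_OFFSET), // newer than any known epoch
        Some(0) => (requested, entries[0].start_offset), // older than every entry
        Some(i) => (entries[i - 1].epoch, entries[i].start_offset),
    }
}

/// Hypothetical controller-side partition record.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionState {
    pub leader: i32, // -1 when no eligible leader remains
    pub leader_epoch: i32,
    pub isr: Vec<i32>,
}

/// Liveness tick: brokers whose last heartbeat is older than the timeout.
pub fn dead_brokers(last_hb: &HashMap<i32, Instant>, now: Instant, timeout: Duration) -> HashSet<i32> {
    last_hb
        .iter()
        .filter(|&(_, &t)| now.saturating_duration_since(t) > timeout)
        .map(|(&id, _)| id)
        .collect()
}

/// If `dead` led this partition, elect the first alive ISR replica and bump
/// the leader epoch so writes tagged with the old epoch are fenced.
pub fn on_broker_dead(p: &mut PartitionState, dead: i32, alive: &HashSet<i32>) {
    if p.leader != dead {
        return;
    }
    p.leader = p.isr.iter().copied().find(|id| *id != dead && alive.contains(id)).unwrap_or(-1);
    p.leader_epoch += 1;
}

/// Leader-side ISR shrink check: followers that have not fetched within
/// `replica_lag_time_max` (or never fetched). The leader is never removed.
pub fn lagging_followers(
    leader: i32,
    isr: &[i32],
    last_fetch: &HashMap<i32, Instant>,
    now: Instant,
    replica_lag_time_max: Duration,
) -> Vec<i32> {
    isr.iter()
        .copied()
        .filter(|&id| id != leader)
        .filter(|id| match last_fetch.get(id) {
            Some(&t) => now.saturating_duration_since(t) > replica_lag_time_max,
            None => true,
        })
        .collect()
}
```

The epoch bump is what turns election into fencing: a deposed leader that comes back still stamps batches with the old epoch, so followers and the new leader can reject or truncate them.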
Together with the HW + acks=all work above, the bulletproof-EOS promise is complete:
acks=all produces survive arbitrary single-broker failures with
no data loss and no zombie writes.
§Re-exports
pub use config::BrokerConfig;
pub use config::KafkaRlmmConfig;
pub use config::RemoteStorageBackend;
pub use config::RlmmKind;
§Modules
- api_catalog - Public catalog of the Kafka protocol APIs this broker advertises.
- authorizer - Cluster authorizer. The trait + ACL evaluator now live in crabka-authz (shared with the gateway); this module re-exports them and keeps the broker-only OPA plugin.
- bootstrap - Reads bootstrap.records.bin (produced by crabka format --add-scram) on the broker's first start.
- codes - Kafka wire-level error codes used in this MVP.
- config - Broker configuration. Built directly (library use) or from CLI flags (binary entry point in bin/broker.rs).
- coordinator - Group-coordinator subsystem. The unified GroupCoordinator (unified::GroupCoordinator) owns one tokio actor per group_id, each speaking either the classic JoinGroup/SyncGroup/Heartbeat/LeaveGroup protocol or the KIP-848 next-gen ConsumerGroupHeartbeat protocol behind a single registry, one persistence path, and one actor model.
- disk_scanner - Periodic per-partition disk-usage scanner. Spawned by Broker::start when --partition-disk-scan-interval-secs > 0. Each tick walks the log directory for every known (topic, partition), sums regular file sizes, and updates the partition_disk_bytes gauge.
- fetch_session - KIP-227 incremental-fetch-session cache.
- file_config - TOML file-config surface for the crabka-broker binary.
- leader_rebalance - KIP-460 auto preferred-replica rebalance. A background task on the controller leader periodically scans every partition; for each partition where select_new_leader_for_partition(Preferred) succeeds, it queues a V1Partition update. Updates are submitted in one batch per tick when the cluster-wide imbalance ratio crosses the configured threshold.
- log_dir_id - Per-log.dirs stable UUIDs (KIP-858 directory ids).
- metadata_observer - Broker-only metadata observer (Component B).
- metadata_source - MetadataSource: the metadata authority a broker reads from and writes through. Combined/controller nodes back it with a live ControllerHandle (openraft voter); broker-only nodes back it with a MetadataObserver (true KRaft observer) plus a write-forwarding path to the controller quorum. Handlers depend only on this trait.
- metrics - Broker-side Prometheus metrics.
- network - TCP listener, per-connection task, and Kafka framing helpers.
- quota - KIP-13 + KIP-124 + KIP-257 client quotas.
- raft_handshake - Inbound TLS + SASL handshake for the controller listener.
- replica_selector - KIP-392 replica selection. The partition leader runs select on every consumer Fetch that carries a client.rack (rack_id) and reports the chosen node id in FetchResponse.preferred_read_replica. Returning -1 means "no preference; read from the leader".
- share_coordinator - KIP-932 share coordinator (persister): durable per-(group, topicId, partition) delivery state stored in the __share_group_state internal topic. Mirrors the transaction coordinator (crate::txn).
- share_partition - KIP-932 share-partition leader: the in-memory acquisition state machine.
- telemetry - Broker tracing + OTLP distributed-tracing pipeline.
- throttle - KIP-73 throttled replication: value types and parser.
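The replica_selector contract (return a node id, or -1 for "no preference") can be illustrated with a simple rack-matching policy. This is a hypothetical sketch, not the module's actual select: ReplicaView is an invented type, and the policy (keep the leader when it already shares the client's rack, otherwise pick the lowest-id in-sync follower in that rack) is an assumption chosen for clarity.

```rust
/// Hypothetical view of one replica of the partition being fetched.
pub struct ReplicaView {
    pub node_id: i32,
    pub rack: Option<String>,
    pub in_sync: bool,
}

/// Preferred read replica for a consumer in `client_rack`, or -1
/// ("no preference; read from the leader").
pub fn select(leader: i32, replicas: &[ReplicaView], client_rack: &str) -> i32 {
    if client_rack.is_empty() {
        return -1; // the Fetch carried no client.rack
    }
    // Only in-sync replicas in the client's rack are candidates.
    let same_rack = |r: &&ReplicaView| r.in_sync && r.rack.as_deref() == Some(client_rack);
    // If the leader already shares the client's rack, keep reading from it.
    if replicas.iter().filter(same_rack).any(|r| r.node_id == leader) {
        return -1;
    }
    replicas
        .iter()
        .filter(same_rack)
        .map(|r| r.node_id)
        .min()
        .unwrap_or(-1)
}
```

Restricting candidates to in-sync replicas keeps consumers from being redirected to a follower that is far behind the high watermark.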
§Structs
- Broker - The running broker. Library callers get a BrokerHandle from Broker::start; this struct is the shared internal state.
- BrokerHandle - Lifecycle handle returned by Broker::start. Drop it or call shutdown to stop the broker.
- TopicConfigDoc - One whitelisted topic-config key, for the generated reference page.
§Enums
- BootstrapMode - Bootstrap orchestration for a freshly-formatted controller node.
- BrokerError - Errors produced by the broker's lifecycle and handlers.
§Functions
- topic_config_docs - The full whitelist documented on the topic-configs reference page.