Crate crabka_broker

Apache Kafka-compatible broker for Crabka.

crabka-broker ships a library and a binary that unmodified JVM Kafka clients can produce to and consume from. It is the runtime that ties together the wire protocol, KRaft metadata controller, log storage, replication, security, quotas, compaction, tiered storage, transactions, and observability.

§Capability areas

  • Accepts Kafka wire-protocol TCP connections and negotiates API versions.
  • Handles topic metadata and administration, Produce, Fetch, ListOffsets, configs, group coordination, offset commits, share groups, transactions, producer-state inspection, quotas, and client telemetry.
  • Runs an embedded crabka_raft KRaft metadata quorum, registers brokers, tracks broker liveness, and drives partition leadership / reassignment.
  • Persists partition data via crabka_log, including leader-epoch checkpoints, transaction indexes, retention, and log compaction.
  • Supports idempotent and transactional producers, read-committed fetches, high-watermark enforcement for acks=all, follower replication, ISR maintenance, and leader election.
  • Supports plaintext, TLS, SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, SASL/GSSAPI, mTLS principal extraction, ACL authorization, and SCRAM / ACL mutation through the admin APIs.
  • Supports KIP-405 tiered storage through local and S3-compatible remote storage managers plus the topic-backed remote-log metadata manager.

§Quick start

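use crabka_broker::{Broker, BrokerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handle = Broker::start(BrokerConfig::default()).await?;
    tokio::signal::ctrl_c().await?; // run until Ctrl-C
    handle.shutdown().await; // then stop the broker
    Ok(())
}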

§Public surface

§Replication

CreateTopics with replication_factor > 1 assigns replication_factor replicas to each partition, round-robin over MetadataImage::brokers(). The replicator_supervisor subscribes to controller metadata changes and spawns one replicator task for each partition on which this broker is a non-leader replica. Each replicator opens a crabka_client_core::Client to its partition’s leader and loops on Fetch with replica_id set, appending every returned RecordBatch to the local log.
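
The round-robin assignment described above can be sketched as follows. This is a minimal illustration, not the crate's code: `assign_replicas` is a hypothetical helper, and the choice to start partition `p` at broker index `p` is an assumption about how the rotation is seeded.

```rust
/// Hypothetical helper (not part of crabka_broker): assign `replication_factor`
/// replicas to each partition by walking the broker list round-robin.
/// Returns None when the request cannot be satisfied.
fn assign_replicas(
    brokers: &[i32],
    partitions: usize,
    replication_factor: usize,
) -> Option<Vec<Vec<i32>>> {
    if brokers.is_empty() || replication_factor == 0 || replication_factor > brokers.len() {
        return None;
    }
    let assignment = (0..partitions)
        .map(|p| {
            // Assumption: partition p starts at broker index p and takes the
            // next `replication_factor` brokers, wrapping around the list.
            (0..replication_factor)
                .map(|r| brokers[(p + r) % brokers.len()])
                .collect()
        })
        .collect();
    Some(assignment)
}
```

With brokers `[1, 2, 3]`, four partitions, and a replication factor of 2, this yields `[1, 2]`, `[2, 3]`, `[3, 1]`, `[1, 2]`. Because the replication factor never exceeds the broker count, no partition is assigned the same broker twice.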

The replication path includes follower fetch loops, high-watermark tracking, acks=all blocking, leader-epoch fencing, ISR shrink/expand proposals, and controller-driven leader election for broker failures. Produce routing still follows the normal Kafka client contract: clients refresh metadata and send partition writes to the advertised leader.

§Transactions

Kafka transactions (KIP-98 + full KIP-1319 v2) are implemented by a per-broker txn::coordinator::TxnCoordinator backed by the __transaction_state internal topic (50 partitions, lazily bootstrapped on the first FindCoordinator(TRANSACTION) request). Producers call init_transactions / begin_transaction / commit_transaction / abort_transaction / send_offsets_to_transaction; consumers set isolation_level=read_committed to filter aborted records using the per-segment .txnindex and the partition-level LSO (last stable offset).
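
To illustrate how a transactional id can be mapped onto one of the 50 __transaction_state partitions, here is a sketch of the convention Apache Kafka uses: the Java `String.hashCode` of the id, made non-negative, modulo the partition count. Whether Crabka uses exactly this mapping is an assumption; both function names are hypothetical.

```rust
/// Java's `String.hashCode`: a base-31 polynomial over UTF-16 code units,
/// computed with wrapping 32-bit arithmetic.
fn java_string_hash(s: &str) -> i32 {
    s.encode_utf16()
        .fold(0i32, |h, unit| h.wrapping_mul(31).wrapping_add(i32::from(unit)))
}

/// Hypothetical helper: pick the `__transaction_state` partition that owns
/// `transactional_id`. Assumes Crabka follows Apache Kafka's convention.
fn txn_state_partition(transactional_id: &str, num_partitions: i32) -> i32 {
    // Clearing the sign bit keeps the result non-negative even when the hash
    // is i32::MIN, where a plain absolute value would overflow.
    (java_string_hash(transactional_id) & 0x7fff_ffff) % num_partitions
}
```

For example, `"abc"` hashes to 96354, which lands on partition 4 of 50. The broker leading that partition is the one a FindCoordinator(TRANSACTION) response would point the producer at.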

The transaction coordinator works with the replication/high-watermark path: acks=all transactional writes wait for the partition high watermark, consumers using read_committed fetch only records visible below the LSO, and leader-epoch fencing plus controller-driven leader election protect the log after broker failover.

§Bulletproof EOS — HW + acks=all

Per-partition high-watermark (HW) tracking is implemented by ReplicaState, which lives on Partition. The leader records each follower’s log end offset (LEO) from its Fetch requests and caches HW = min(LEO over ISR). Produce requests with acks=-1 (acks=all) block on Partition::await_hw_at_least before responding; on timeout the producer receives a per-partition NOT_ENOUGH_REPLICAS_AFTER_APPEND error (code 20). Consumer Fetches (replica_id == -1) clamp visible batches and last_stable_offset at HW, so the read_committed LSO becomes min(HW, log.lso()).
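
The two formulas above, HW = min(LEO over ISR) and LSO = min(HW, log.lso()), can be written out as a small sketch. The function names are hypothetical and the offsets are plain `i64` values; the real ReplicaState presumably carries more bookkeeping.

```rust
/// High watermark: the smallest log end offset (LEO) among in-sync replicas.
/// `isr_leos` is assumed to include the leader's own LEO.
/// Returns None for an empty ISR.
fn high_watermark(isr_leos: &[i64]) -> Option<i64> {
    isr_leos.iter().copied().min()
}

/// Exclusive upper bound on the offsets a consumer fetch may see.
/// read_uncommitted consumers are clamped at HW; read_committed consumers
/// are additionally clamped at the log's last stable offset.
fn visible_upper_bound(hw: i64, log_lso: i64, read_committed: bool) -> i64 {
    if read_committed {
        hw.min(log_lso)
    } else {
        hw
    }
}
```

With ISR LEOs of 120, 100, and 110, the high watermark is 100. A read_committed consumer whose log LSO is 90 sees offsets below 90, while a read_uncommitted consumer sees offsets below 100.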

On its own, this leaves one gap in bulletproof EOS: if the leader crashes mid-transaction, records can still be lost. KIP-101 leader-epoch fencing, leader election on failure, and ISR shrink/expand (described below) close that gap.

§Bulletproof EOS — leader-epoch + election + ISR

KIP-101 leader-epoch fencing tags every appended batch with Partition::current_leader_epoch. A per-partition .leader-epoch-checkpoint file (byte-compatible with Apache Kafka) backs the OffsetForLeaderEpoch RPC, which followers use to truncate their logs on leader change. Leader election runs on the controller: heartbeat::controller_state::ControllerLivenessState tracks per-broker last_heartbeat; a 1s ticker times out brokers at heartbeat_timeout_ms and calls leader_election::on_broker_dead, which scans the dead broker’s partitions, picks the first alive ISR replica, and bumps leader_epoch. ISR shrink/expand is leader-driven: isr_maintenance proposes AlterPartition whenever the time since a follower’s last fetch exceeds replica_lag_time_max_ms.
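
The election step ("pick the first alive ISR replica and bump leader_epoch") might look roughly like the sketch below. `PartitionState` and `elect_on_broker_dead` are hypothetical names, not the signature of leader_election::on_broker_dead. Leaving the partition leaderless while still bumping the epoch when no ISR replica is alive is also an assumption, modelled on Apache Kafka's behaviour.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of one partition's controller-side metadata.
#[derive(Debug, Clone, PartialEq)]
struct PartitionState {
    leader: Option<i32>,
    leader_epoch: i32,
    isr: Vec<i32>,
}

/// If `dead` currently leads the partition, elect the first ISR member that
/// is still alive and bump the leader epoch. Returns true when the
/// partition's metadata changed.
fn elect_on_broker_dead(p: &mut PartitionState, dead: i32, alive: &HashSet<i32>) -> bool {
    if p.leader != Some(dead) {
        return false; // the dead broker was only a follower here
    }
    // Walk the ISR in order and take the first replica that is alive.
    let new_leader = p.isr.iter().copied().find(|r| *r != dead && alive.contains(r));
    // Assumption: with no eligible replica the partition goes leaderless (None).
    p.leader = new_leader;
    // The new epoch fences any write still carrying the old leader's epoch.
    p.leader_epoch += 1;
    true
}
```

Bumping the epoch on every leadership change is what lets the checkpoint file and OffsetForLeaderEpoch detect divergence: a follower that was ahead of the new leader truncates to the end offset of the last epoch both replicas agree on.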

Together with the HW + acks=all work above, the bulletproof-EOS promise is complete: acks=all produces survive arbitrary single-broker failures with no data loss and no zombie writes.

Re-exports§

pub use config::BrokerConfig;
pub use config::KafkaRlmmConfig;
pub use config::RemoteStorageBackend;
pub use config::RlmmKind;

Modules§

api_catalog
Public catalog of the Kafka protocol APIs this broker advertises.
authorizer
Cluster authorizer. The trait + ACL evaluator now live in crabka-authz (shared with the gateway); this module re-exports them and keeps the broker-only OPA plugin.
bootstrap
Read bootstrap.records.bin (produced by crabka format --add-scram) on broker first start.
codes
Kafka wire-level error codes used in this MVP.
config
Broker configuration. Built directly (library use) or from CLI flags (binary entry point in bin/broker.rs).
coordinator
Group-coordinator subsystem. The unified GroupCoordinator (unified::GroupCoordinator) owns one tokio actor per group_id, each speaking either the classic JoinGroup/SyncGroup/Heartbeat/LeaveGroup protocol or the KIP-848 next-gen ConsumerGroupHeartbeat protocol behind a single registry, one persistence path, and one actor model.
disk_scanner
Periodic per-partition disk-usage scanner. Spawned by Broker::start when --partition-disk-scan-interval-secs > 0. Each tick walks the log directory for every known (topic, partition), sums regular file sizes, and updates the partition_disk_bytes gauge.
fetch_session
KIP-227 incremental-fetch-session cache.
file_config
TOML file-config surface for the crabka-broker binary.
leader_rebalance
KIP-460 auto preferred-replica rebalance. A background task on the controller leader periodically scans every partition and, for each one where select_new_leader_for_partition(Preferred) succeeds, queues a V1Partition update. The queued updates are submitted as one batch per tick when the cluster-wide imbalance ratio crosses the configured threshold.
log_dir_id
Per-log.dir stable UUIDs (KIP-858 directory ids).
metadata_observer
Broker-only metadata observer (Component B).
metadata_source
MetadataSource — the metadata authority a broker reads from and writes through. Combined/controller nodes back it with a live ControllerHandle (openraft voter); broker-only nodes back it with a MetadataObserver (true KRaft observer) plus a write-forwarding path to the controller quorum. Handlers depend only on this trait.
metrics
Broker-side Prometheus metrics.
network
TCP listener, per-connection task, and Kafka framing helpers.
quota
KIP-13 + KIP-124 + KIP-257 client quotas.
raft_handshake
Inbound TLS + SASL handshake for the controller listener.
replica_selector
KIP-392 replica selection. The partition leader runs select on every consumer Fetch that carries a client.rack (rack_id) and reports the chosen node id in FetchResponse.preferred_read_replica. Returning -1 means “no preference — read from the leader”.
share_coordinator
KIP-932 share coordinator (persister): durable per-(group, topicId, partition) delivery state stored in the __share_group_state internal topic. Mirrors the transaction coordinator (crate::txn).
share_partition
KIP-932 share-partition leader: the in-memory acquisition state machine.
telemetry
Broker tracing + OTLP distributed-tracing pipeline.
throttle
KIP-73 throttled replication — value types and parser.
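
As an illustration of the replica_selector entry above, a rack-aware selection in the spirit of KIP-392 could be sketched like this. The `Replica` type, the `select` signature, and the tie-breaking rule (first same-rack in-sync replica) are assumptions for the example; only the -1 "no preference" convention comes from the module description.

```rust
/// Hypothetical view of an in-sync replica that may serve consumer fetches.
struct Replica<'a> {
    node_id: i32,
    rack: Option<&'a str>,
}

/// Value reported in FetchResponse.preferred_read_replica when the consumer
/// should keep reading from the leader.
const NO_PREFERENCE: i32 = -1;

/// Pick a read replica in the client's rack, or report no preference.
fn select(client_rack: Option<&str>, leader_id: i32, in_sync: &[Replica<'_>]) -> i32 {
    // A fetch without client.rack never gets redirected.
    let Some(rack) = client_rack else {
        return NO_PREFERENCE;
    };
    // If the leader already sits in the client's rack, reading from it is optimal.
    if in_sync.iter().any(|r| r.node_id == leader_id && r.rack == Some(rack)) {
        return NO_PREFERENCE;
    }
    // Otherwise take the first in-sync replica in the same rack, if any.
    in_sync
        .iter()
        .find(|r| r.rack == Some(rack))
        .map_or(NO_PREFERENCE, |r| r.node_id)
}
```

A consumer in rack "b" fetching from a leader in rack "a" would be pointed at the first in-sync replica in rack "b"; a consumer with no rack, or with a rack that matches no replica, keeps reading from the leader.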

Structs§

Broker
The running broker. Library callers get a BrokerHandle from Broker::start; this struct is the shared internal state.
BrokerHandle
Lifecycle handle returned by Broker::start. Drop or call shutdown to stop the broker.
TopicConfigDoc
One whitelisted topic-config key, for the generated reference page.

Enums§

BootstrapMode
Bootstrap orchestration for a freshly-formatted controller node.
BrokerError
Errors produced by the broker’s lifecycle and handlers.

Functions§

topic_config_docs
The full whitelist documented on the topic-configs reference page.

Type Aliases§

NodeId