crabka_broker/
lib.rs

//! Apache Kafka-compatible broker for Crabka.
//!
//! `crabka-broker` ships a library + binary that unmodified JVM Kafka clients
//! can produce to and consume from. It is the runtime that ties together the
//! wire protocol, `KRaft` metadata controller, log storage, replication, security,
//! quotas, compaction, tiered storage, transactions, and observability.
//!
//! # Capability areas
//!
//! - Accepts Kafka wire-protocol TCP connections and negotiates API versions.
//! - Handles topic metadata and administration, `Produce`, `Fetch`,
//!   `ListOffsets`, configs, group coordination, offset commits, share groups,
//!   transactions, producer-state inspection, quotas, and client telemetry.
//! - Runs an embedded [`crabka_raft`] `KRaft` metadata quorum, registers brokers,
//!   tracks broker liveness, and drives partition leadership / reassignment.
//! - Persists partition data via [`crabka_log`], including leader-epoch
//!   checkpoints, transaction indexes, retention, and log compaction.
//! - Supports idempotent and transactional producers, read-committed fetches,
//!   high-watermark enforcement for `acks=all`, follower replication, ISR
//!   maintenance, and leader election.
//! - Supports plaintext, TLS, SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER,
//!   SASL/GSSAPI, mTLS principal extraction, ACL authorization, and SCRAM / ACL
//!   mutation through the admin APIs.
//! - Supports KIP-405 tiered storage through local and S3-compatible remote
//!   storage managers plus the topic-backed remote-log metadata manager.
//!
//! # Quick start
//!
//! ```no_run
//! use crabka_broker::{Broker, BrokerConfig};
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let handle = Broker::start(BrokerConfig::default()).await?;
//! tokio::signal::ctrl_c().await?;
//! handle.shutdown().await;
//! # Ok(())
//! # }
//! ```
//!
//! # Public surface
//!
//! - [`Broker`]: owns the partition registry, metadata image, and
//!   handler table; constructed by [`Broker::start`].
//! - [`BrokerHandle`]: lifecycle handle returned by
//!   [`Broker::start`]; call [`BrokerHandle::shutdown`] to drain.
//! - [`BrokerConfig`]: listen address, advertised listener, log dir,
//!   broker id, per-log [`LogConfig`](crabka_log::LogConfig).
//! - [`BrokerError`]: error returned by [`Broker::start`].
//!
//! ## Replication
//!
//! `CreateTopics` with `replication_factor > 1` assigns `replication_factor`
//! replicas per partition via round-robin over `MetadataImage::brokers()`. The
//! `replicator_supervisor` subscribes to controller metadata changes
//! and spawns a `replicator` task per partition where this broker is
//! a non-leader replica. Each replicator opens a
//! `crabka_client_core::Client` to its partition's leader and loops
//! on `Fetch` with `replica_id` set, appending every returned
//! `RecordBatch` to the local log.
//!
//! The replication path includes follower fetch loops, high-watermark tracking,
//! `acks=all` blocking, leader-epoch fencing, ISR shrink/expand proposals, and
//! controller-driven leader election for broker failures. Produce routing still
//! follows the normal Kafka client contract: clients refresh metadata and send
//! partition writes to the advertised leader.
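//!
//! The round-robin replica assignment described above can be sketched roughly
//! as follows. This is an illustrative sketch, not the broker's code:
//! `assign_replicas` and its parameters are hypothetical names, and the real
//! placement logic may choose starting offsets differently.
//!
//! ```rust
//! /// Hypothetical helper (not part of the crate): give each partition
//! /// `replication_factor` replicas by walking the broker list round-robin.
//! /// Returns `None` when the request cannot be satisfied.
//! fn assign_replicas(
//!     brokers: &[i32],
//!     partitions: usize,
//!     replication_factor: usize,
//! ) -> Option<Vec<Vec<i32>>> {
//!     if brokers.is_empty() || replication_factor == 0 || replication_factor > brokers.len() {
//!         return None;
//!     }
//!     let assignment: Vec<Vec<i32>> = (0..partitions)
//!         .map(|p| {
//!             // Partition `p` starts at broker index `p` and wraps around.
//!             (0..replication_factor)
//!                 .map(|r| brokers[(p + r) % brokers.len()])
//!                 .collect()
//!         })
//!         .collect();
//!     Some(assignment)
//! }
//! ```
//!
//! Under these assumptions, brokers `[1, 2, 3]` with four partitions and a
//! replication factor of 2 produce the replica sets `[1, 2]`, `[2, 3]`,
//! `[3, 1]`, and `[1, 2]`.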
//!
//! ## Transactions
//!
//! Kafka transactions (KIP-98 + full KIP-1319 v2) are implemented by a per-broker
//! `txn::coordinator::TxnCoordinator` backed by the `__transaction_state`
//! internal topic (50 partitions, lazily bootstrapped on first
//! `FindCoordinator(TRANSACTION)`). Producers call `init_transactions`
//! / `begin_transaction` / `commit_transaction` / `abort_transaction` /
//! `send_offsets_to_transaction`; consumers set
//! `isolation_level=read_committed` to filter aborted records via the
//! per-segment `.txnindex` and partition-level LSO.
//!
//! The transaction coordinator works with the replication/high-watermark path:
//! `acks=all` transactional writes wait for the partition high watermark,
//! consumers using `read_committed` fetch only records visible below the LSO,
//! and leader-epoch fencing plus controller-driven leader election protect the
//! log after broker failover.
//!
//! ## Bulletproof EOS: HW + acks=all
//!
//! The high watermark (HW) is tracked per partition via `ReplicaState`,
//! which lives on `Partition`. The leader maintains each follower's LEO from
//! its Fetch requests and caches HW = `min(LEO over ISR)`. `acks=-1`
//! Produces gate on `Partition::await_hw_at_least` before responding;
//! on timeout the producer gets per-partition
//! `NOT_ENOUGH_REPLICAS_AFTER_APPEND` (code 20). Consumer Fetches
//! (`replica_id == -1`) clamp visible batches and `last_stable_offset`
//! at HW; `read_committed` LSO becomes `min(HW, log.lso())`.
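//!
//! The two formulas above can be written out as a small sketch. The function
//! names and signatures here are hypothetical and only illustrate the
//! arithmetic; they are not the `ReplicaState` or `Partition` API.
//!
//! ```rust
//! /// Hypothetical helper: HW is the minimum log-end offset over the ISR.
//! /// `isr_leos` holds the LEO of every in-sync replica, leader included.
//! /// Returns `None` for an empty ISR.
//! fn high_watermark(isr_leos: &[i64]) -> Option<i64> {
//!     isr_leos.iter().copied().min()
//! }
//!
//! /// Hypothetical helper: upper bound on the offsets a consumer fetch may
//! /// see. `read_uncommitted` is clamped at HW, while `read_committed` is
//! /// clamped at `min(HW, log LSO)`.
//! fn fetch_upper_bound(hw: i64, log_lso: i64, read_committed: bool) -> i64 {
//!     if read_committed {
//!         hw.min(log_lso)
//!     } else {
//!         hw
//!     }
//! }
//! ```
//!
//! For example, with ISR log-end offsets of 10, 7, and 9 the high watermark
//! is 7; a `read_committed` fetch against a log whose LSO is 5 is then
//! bounded at 5.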
//!
//! On its own, this leaves a remaining bulletproof-EOS gap: a leader
//! crash mid-transaction can still lose records. KIP-101 leader-epoch
//! fencing, leader-election-on-failure, and ISR shrink/expand (below)
//! close that gap.
//!
//! ## Bulletproof EOS: leader-epoch + election + ISR
//!
//! KIP-101 leader-epoch fencing is tagged onto every appended batch via
//! `Partition::current_leader_epoch`. A per-partition
//! `.leader-epoch-checkpoint` file (byte-compatible with Apache Kafka) backs the
//! `OffsetForLeaderEpoch` RPC for follower-side truncation on leader
//! change. Leader election runs on the controller:
//! `heartbeat::controller_state::ControllerLivenessState` tracks
//! per-broker `last_heartbeat`; a 1s ticker times out brokers at
//! `heartbeat_timeout_ms` and calls `leader_election::on_broker_dead`,
//! which scans partitions of the dead broker, picks the first alive
//! ISR replica, and bumps `leader_epoch`. ISR shrink/expand is
//! leader-driven: `isr_maintenance` proposes `AlterPartition`
//! whenever the time since a follower's last fetch exceeds
//! `replica_lag_time_max_ms`.
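//!
//! The election and ISR-lag rules above can be sketched as two small
//! functions. Both names and signatures are hypothetical and exist only to
//! illustrate the rules; they are not the `leader_election` or
//! `isr_maintenance` API.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical election step: pick the first ISR replica that is still
//! /// alive and bump the leader epoch. Returns `(new_leader, new_epoch)`,
//! /// or `None` when no ISR replica is alive.
//! fn elect_leader(isr: &[i32], alive: &HashSet<i32>, leader_epoch: i32) -> Option<(i32, i32)> {
//!     isr.iter()
//!         .copied()
//!         .find(|id| alive.contains(id))
//!         .map(|id| (id, leader_epoch + 1))
//! }
//!
//! /// Hypothetical lag check: followers whose last fetch is more than
//! /// `replica_lag_time_max_ms` older than `now_ms`. Each entry of
//! /// `last_fetch_ms` is `(follower id, last fetch timestamp in ms)`.
//! fn lagging_followers(
//!     last_fetch_ms: &[(i32, u64)],
//!     now_ms: u64,
//!     replica_lag_time_max_ms: u64,
//! ) -> Vec<i32> {
//!     last_fetch_ms
//!         .iter()
//!         .filter(|(_, t)| now_ms.saturating_sub(*t) > replica_lag_time_max_ms)
//!         .map(|(id, _)| *id)
//!         .collect()
//! }
//! ```
//!
//! In this sketch, an ISR of `[1, 2, 3]` with brokers 2 and 3 alive and a
//! current epoch of 4 elects broker 2 at epoch 5.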
//!
//! Together with the HW + acks=all work above, the bulletproof-EOS promise is complete:
//! `acks=all` produces survive arbitrary single-broker failures with
//! no data loss and no zombie writes.

#![doc(html_root_url = "https://docs.rs/crabka-broker/0.3.5")]

pub mod api_catalog;
pub(crate) mod assign_dirs;
pub mod authorizer;
pub(crate) mod auto_join;
pub mod bootstrap;
mod broker;
pub(crate) mod cleaner;
mod client_metrics;
pub mod codes;
pub mod config;
pub(crate) mod config_keys;
pub mod coordinator;
pub(crate) mod delegation_token_cleanup;
pub mod disk_scanner;
mod error;
mod features;
pub mod fetch_session;
pub mod file_config;
pub(crate) mod future_log;
mod handlers;
pub(crate) mod heartbeat;
pub(crate) mod incarnation;
pub(crate) mod isr_maintenance;
pub(crate) mod leader_election;
pub mod leader_rebalance;
mod log_dir;
pub mod log_dir_id;
mod log_dir_status;
pub mod metadata_observer;
pub mod metadata_source;
pub mod metrics;
pub(crate) mod metrics_server;
pub mod network;
pub(crate) mod oauth_introspection;
pub(crate) mod oauth_jwks;
mod partition;
pub(crate) mod partition_registry;
mod partition_writer;
mod producer_id_manager;
mod producer_state;
pub mod quota;
pub mod raft_handshake;
pub(crate) mod reassignment;
pub(crate) mod remote_log_manager;
pub(crate) mod remote_reader;
pub mod replica_selector;
pub(crate) mod replica_state;
mod replicator;
mod replicator_supervisor;
pub mod share_coordinator;
pub mod share_partition;
pub mod telemetry;
pub mod throttle;
pub(crate) mod time_util;
pub(crate) mod tls_reload;
pub(crate) mod topic_resolve;
mod txn;
pub(crate) mod unclean_recovery;

pub use broker::{Broker, BrokerHandle};
pub use config::{BootstrapMode, BrokerConfig, KafkaRlmmConfig, RemoteStorageBackend, RlmmKind};
pub use config_keys::{TopicConfigDoc, topic_config_docs};
pub use crabka_raft::NodeId;
pub use error::BrokerError;