//! Apache Kafka-compatible broker for Crabka.
//!
//! `crabka-broker` ships a library + binary that unmodified JVM Kafka clients
//! can produce to and consume from. It is the runtime that ties together the
//! wire protocol, `KRaft` metadata controller, log storage, replication, security,
//! quotas, compaction, tiered storage, transactions, and observability.
//!
//! # Capability areas
//!
//! - Accepts Kafka wire-protocol TCP connections and negotiates API versions.
//! - Handles topic metadata and administration, `Produce`, `Fetch`,
//! `ListOffsets`, configs, group coordination, offset commits, share groups,
//! transactions, producer-state inspection, quotas, and client telemetry.
//! - Runs an embedded [`crabka_raft`] `KRaft` metadata quorum, registers brokers,
//! tracks broker liveness, and drives partition leadership / reassignment.
//! - Persists partition data via [`crabka_log`], including leader-epoch
//! checkpoints, transaction indexes, retention, and log compaction.
//! - Supports idempotent and transactional producers, read-committed fetches,
//! high-watermark enforcement for `acks=all`, follower replication, ISR
//! maintenance, and leader election.
//! - Supports plaintext, TLS, SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER,
//! SASL/GSSAPI, mTLS principal extraction, ACL authorization, and SCRAM / ACL
//! mutation through the admin APIs.
//! - Supports KIP-405 tiered storage through local and S3-compatible remote
//! storage managers plus the topic-backed remote-log metadata manager.
//!
//! # Quick start
//!
//! ```no_run
//! use crabka_broker::{Broker, BrokerConfig};
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let handle = Broker::start(BrokerConfig::default()).await?;
//! tokio::signal::ctrl_c().await?;
//! handle.shutdown().await;
//! # Ok(())
//! # }
//! ```
//!
//! # Public surface
//!
//! - [`Broker`] — owns the partition registry, metadata image, and
//! handler table; constructed by [`Broker::start`].
//! - [`BrokerHandle`] — lifecycle handle returned by
//! [`Broker::start`]; call [`BrokerHandle::shutdown`] to drain.
//! - [`BrokerConfig`] — listen address, advertised listener, log dir,
//! broker id, per-log [`LogConfig`](crabka_log::LogConfig).
//! - [`BrokerError`] — error returned by [`Broker::start`].
//!
//! ## Replication
//!
//! `CreateTopics` with `replication_factor > 1` assigns `replication_factor`
//! replicas per partition, round-robin over `MetadataImage::brokers()`. The
//! `replicator_supervisor` subscribes to controller metadata changes
//! and spawns a `replicator` task per partition where this broker is
//! a non-leader replica. Each replicator opens a
//! `crabka_client_core::Client` to its partition's leader and loops
//! on `Fetch` with `replica_id` set, appending every returned
//! `RecordBatch` to the local log.
//!
//! The replication path includes follower fetch loops, high-watermark tracking,
//! `acks=all` blocking, leader-epoch fencing, ISR shrink/expand proposals, and
//! controller-driven leader election for broker failures. Produce routing still
//! follows the normal Kafka client contract: clients refresh metadata and send
//! partition writes to the advertised leader.
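//!
//! The round-robin assignment described above can be sketched as follows. This
//! is a minimal illustration, not the broker's actual code: `assign_replicas`
//! and its signature are hypothetical, and the offset-by-partition-index
//! rotation is an assumption about how the round-robin is laid out.
//!
//! ```rust
//! /// Hypothetical helper: give each partition `rf` replicas, rotating the
//! /// starting broker by one per partition (assumed layout).
//! fn assign_replicas(brokers: &[i32], partitions: usize, rf: usize) -> Vec<Vec<i32>> {
//!     assert!(
//!         rf >= 1 && rf <= brokers.len(),
//!         "replication factor must fit the broker set"
//!     );
//!     (0..partitions)
//!         .map(|p| {
//!             (0..rf)
//!                 .map(|r| brokers[(p + r) % brokers.len()])
//!                 .collect()
//!         })
//!         .collect()
//! }
//! ```
//!
//! With brokers `[1, 2, 3]`, four partitions, and `rf = 2`, this sketch yields
//! the replica sets `[1, 2]`, `[2, 3]`, `[3, 1]`, and `[1, 2]`.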
//!
//! ## Transactions
//!
//! Kafka transactions (KIP-98 + full KIP-1319 v2) are implemented by a per-broker
//! `txn::coordinator::TxnCoordinator` backed by the `__transaction_state`
//! internal topic (50 partitions, lazily bootstrapped on first
//! `FindCoordinator(TRANSACTION)`). Producers call `init_transactions`,
//! `begin_transaction`, `commit_transaction`, `abort_transaction`, and
//! `send_offsets_to_transaction`; consumers set
//! `isolation_level=read_committed` to filter aborted records via the
//! per-segment `.txnindex` and partition-level LSO.
//!
//! The transaction coordinator works with the replication/high-watermark path:
//! `acks=all` transactional writes wait for the partition high watermark,
//! consumers using `read_committed` fetch only records visible below the LSO,
//! and leader-epoch fencing plus controller-driven leader election protect the
//! log after broker failover.
//!
//! ## Bulletproof EOS — HW + acks=all
//!
//! The per-partition high watermark (HW) is tracked by `ReplicaState`, which
//! lives on `Partition`. The leader records each follower's LEO from
//! its Fetch requests and caches HW = `min(LEO over ISR)`. `acks=-1`
//! Produces gate on `Partition::await_hw_at_least` before responding;
//! on timeout the producer gets per-partition
//! `NOT_ENOUGH_REPLICAS_AFTER_APPEND` (code 20). Consumer Fetches
//! (`replica_id == -1`) clamp visible batches and `last_stable_offset`
//! at HW; `read_committed` LSO becomes `min(HW, log.lso())`.
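//!
//! The two formulas above can be sketched as below. This is an illustration
//! under stated assumptions, not the actual `ReplicaState` code: the function
//! names, the `HashMap` of follower LEOs, treating a follower with no recorded
//! fetch as LEO 0, and falling back to the leader's LEO for an empty ISR are
//! all hypothetical choices.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// HW = min(LEO over ISR). The leader contributes its own LEO; followers
//! /// contribute the LEO last seen in their Fetch requests (0 if none, assumed).
//! fn high_watermark(
//!     leader_id: i32,
//!     leader_leo: i64,
//!     isr: &[i32],
//!     follower_leo: &HashMap<i32, i64>,
//! ) -> i64 {
//!     isr.iter()
//!         .map(|id| {
//!             if *id == leader_id {
//!                 leader_leo
//!             } else {
//!                 follower_leo.get(id).copied().unwrap_or(0)
//!             }
//!         })
//!         .min()
//!         .unwrap_or(leader_leo) // empty ISR: assumed fallback
//! }
//!
//! /// LSO exposed to `read_committed` consumers: min(HW, log LSO).
//! fn read_committed_lso(hw: i64, log_lso: i64) -> i64 {
//!     hw.min(log_lso)
//! }
//! ```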
//!
//! On its own, this still leaves a gap in the bulletproof-EOS story: a leader
//! crash mid-transaction can still lose records. KIP-101 leader-epoch
//! fencing, leader-election-on-failure, and ISR shrink/expand (below)
//! close that gap.
//!
//! ## Bulletproof EOS — leader-epoch + election + ISR
//!
//! Every appended batch is tagged with the current leader epoch (KIP-101
//! fencing) via `Partition::current_leader_epoch`. A per-partition
//! `.leader-epoch-checkpoint` file (byte-compatible with Apache Kafka) backs the
//! `OffsetForLeaderEpoch` RPC for follower-side truncation on leader
//! change. Leader election runs on the controller:
//! `heartbeat::controller_state::ControllerLivenessState` tracks
//! per-broker `last_heartbeat`; a 1s ticker times out brokers at
//! `heartbeat_timeout_ms` and calls `leader_election::on_broker_dead`
//! which scans partitions of the dead broker, picks the first alive
//! ISR replica, and bumps `leader_epoch`. ISR shrink/expand is
//! leader-driven by `isr_maintenance`, which proposes `AlterPartition`
//! whenever a follower's last-fetch time exceeds
//! `replica_lag_time_max_ms`.
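//!
//! The election and lag rules above can be sketched as below. This is a
//! simplified model, not the controller's implementation: `PartitionState`,
//! `elect_new_leader`, and `follower_lagging` are hypothetical names, and the
//! sketch assumes that only partitions led by the dead broker are touched and
//! that the epoch is bumped even when no live ISR replica remains.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical, trimmed-down view of one partition's leadership state.
//! #[derive(Debug, Clone, PartialEq)]
//! struct PartitionState {
//!     leader: Option<i32>,
//!     leader_epoch: i32,
//!     isr: Vec<i32>,
//! }
//!
//! /// If `dead` led this partition, pick the first alive ISR replica and bump
//! /// the leader epoch. Returns true when the partition was changed.
//! fn elect_new_leader(p: &mut PartitionState, dead: i32, alive: &HashSet<i32>) -> bool {
//!     if p.leader != Some(dead) {
//!         return false;
//!     }
//!     p.leader = p
//!         .isr
//!         .iter()
//!         .copied()
//!         .find(|id| *id != dead && alive.contains(id));
//!     p.leader_epoch += 1;
//!     true
//! }
//!
//! /// A follower counts as lagging once the time since its last fetch
//! /// exceeds `replica_lag_time_max_ms`.
//! fn follower_lagging(now_ms: u64, last_fetch_ms: u64, replica_lag_time_max_ms: u64) -> bool {
//!     now_ms.saturating_sub(last_fetch_ms) > replica_lag_time_max_ms
//! }
//! ```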
//!
//! Together with the HW + acks=all work above, the bulletproof-EOS promise is complete:
//! `acks=all` produces survive arbitrary single-broker failures with
//! no data loss and no zombie writes.
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub
pub use ;
pub use ;
pub use ;
pub use NodeId;
pub use BrokerError;