1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Public catalog of the Kafka protocol APIs this broker advertises.
//!
//! Single source of truth for both the live `ApiVersions` (`api_key` 18)
//! response and the generated protocol-API reference page. The handler in
//! `handlers::api_versions` calls [`supported_apis`]; `crabka-docgen` reads
//! the same list without spawning the broker binary.
use crabka_protocol::owned::api_versions_response::ApiVersion;
/// Expands to the [`ApiVersion`] entry for one generated request module.
///
/// `$api` names a module under `crabka_protocol::owned`. The entry copies that
/// module's `API_KEY`, `MIN_VERSION` and `MAX_VERSION` constants and leaves
/// every remaining field at its `Default` value.
macro_rules! v {
    ($api:ident) => {{
        // Bind the generated module once so the three constants read uniformly.
        use crabka_protocol::owned::$api as generated;
        ApiVersion {
            api_key: generated::API_KEY,
            min_version: generated::MIN_VERSION,
            max_version: generated::MAX_VERSION,
            ..Default::default()
        }
    }};
}
/// Returns every API entry the broker advertises.
///
/// The client-facing entries come first, followed by the admin entries, each
/// in the order its helper list produces them. Versions are mirrored from the
/// generated `MIN_VERSION`/`MAX_VERSION` constants of each API, so the helper
/// lists are what needs updating when a handler is added.
#[must_use]
pub fn supported_apis() -> Vec<ApiVersion> {
    client_facing_apis()
        .into_iter()
        .chain(admin_apis())
        .collect()
}
/// Builds the first part of the advertised table: `ApiVersions` itself, the
/// produce/fetch data path, and the metadata, group-membership, SASL and
/// offset APIs listed below, in exactly this order.
fn client_facing_apis() -> Vec<ApiVersion> {
    use crabka_protocol::kafka_3_6_2::owned as legacy;
    use crabka_protocol::owned as current;

    // Produce and Fetch are the only entries not built by `v!`: their lower
    // bound is read from the `kafka_3_6_2` generated module, while the key and
    // upper bound come from the default module.
    // NOTE(review): presumably this keeps older request versions advertised
    // for legacy clients — confirm.
    let produce = ApiVersion {
        api_key: current::produce_request::API_KEY,
        min_version: legacy::produce_request::MIN_VERSION,
        max_version: current::produce_request::MAX_VERSION,
        ..Default::default()
    };
    let fetch = ApiVersion {
        api_key: current::fetch_request::API_KEY,
        min_version: legacy::fetch_request::MIN_VERSION,
        max_version: current::fetch_request::MAX_VERSION,
        ..Default::default()
    };

    vec![
        v!(api_versions_request),
        produce,
        fetch,
        v!(list_offsets_request),
        v!(metadata_request),
        v!(find_coordinator_request),
        v!(join_group_request),
        v!(sync_group_request),
        v!(heartbeat_request),
        v!(leave_group_request),
        v!(sasl_handshake_request),
        v!(sasl_authenticate_request),
        v!(offset_commit_request),
        v!(offset_fetch_request),
    ]
}
/// Builds the second part of the advertised table, which [`supported_apis`]
/// appends after the client-facing entries.
///
/// Despite the name, the list also carries the transactional, group-protocol,
/// share-group and Raft entries noted inline. Every entry is produced by `v!`,
/// and the order below is the order in which the entries are returned.
fn admin_apis() -> Vec<ApiVersion> {
    vec![
        v!(create_topics_request),
        v!(delete_topics_request),
        v!(delete_records_request),
        v!(init_producer_id_request),
        v!(offset_for_leader_epoch_request),
        v!(add_partitions_to_txn_request),
        v!(add_offsets_to_txn_request),
        v!(end_txn_request),
        v!(write_txn_markers_request),
        v!(txn_offset_commit_request),
        v!(describe_configs_request),
        v!(alter_replica_log_dirs_request),
        v!(describe_log_dirs_request),
        v!(describe_groups_request),
        v!(list_groups_request),
        v!(alter_configs_request),
        v!(create_partitions_request),
        v!(delete_groups_request),
        v!(incremental_alter_configs_request),
        v!(alter_partition_request),
        v!(assign_replicas_to_dirs_request),
        v!(describe_cluster_request),
        v!(broker_heartbeat_request),
        // UnregisterBroker (KIP-185): admin RPC that permanently drops a
        // broker registration from the cluster's metadata image.
        // NOTE(review): KIP-185 looks like the wrong citation for this RPC
        // (UnregisterBroker appears to come from KIP-631) — confirm.
        v!(unregister_broker_request),
        v!(alter_user_scram_credentials_request),
        // UpdateFeatures (api_key 57, KIP-584): the `kafka-features` admin
        // tool finalizes broker-supported features through a Raft-persisted
        // path.
        v!(update_features_request),
        v!(describe_acls_request),
        v!(create_acls_request),
        v!(delete_acls_request),
        v!(elect_leaders_request),
        v!(alter_partition_reassignments_request),
        v!(list_partition_reassignments_request),
        // OffsetDelete (api_key 47, KIP-496): completes parity with
        // `kafka-consumer-groups --delete-offsets`.
        v!(offset_delete_request),
        v!(describe_client_quotas_request),
        v!(alter_client_quotas_request),
        v!(describe_user_scram_credentials_request),
        // KIP-48 delegation-token RPCs. Gating these on the broker having a
        // master key configured is tempting, but Kafka always advertises
        // them: clients discover support here and then receive
        // DELEGATION_TOKEN_AUTH_DISABLED (61) on the actual call when the
        // broker isn't configured for tokens.
        v!(create_delegation_token_request),
        v!(renew_delegation_token_request),
        v!(expire_delegation_token_request),
        v!(describe_delegation_token_request),
        // DescribeProducers (KIP-664): admin introspection of
        // per-(topic, partition) idempotent / transactional producer state.
        v!(describe_producers_request),
        // DescribeTransactions + ListTransactions (KIP-664): admin
        // introspection of the TxnCoordinator's local state map.
        v!(describe_transactions_request),
        v!(list_transactions_request),
        // DescribeTopicPartitions (KIP-966): paginated topic listing that JVM
        // admin clients 3.7+ use for `kafka-topics --describe` in place of
        // fanned-out Metadata calls.
        v!(describe_topic_partitions_request),
        // KIP-714 client-metrics push handshake. Crabka exposes its own
        // broker-side observability, so these handlers answer "no metrics
        // subscribed" and clients skip the push entirely. Advertising still
        // matters: clients query `ApiVersions` to learn whether the broker
        // supports the API at all, and its absence flips them into
        // legacy-fallback paths we don't need.
        v!(get_telemetry_subscriptions_request),
        v!(push_telemetry_request),
        // ListConfigResources (KIP-1142): typed enumeration of every
        // configurable resource (topics + brokers + client_metrics). v0 is
        // the legacy ListClientMetricsResources surface (KIP-714); v1 adds
        // the `resource_types` filter.
        v!(list_config_resources_request),
        // DescribeQuorum (KIP-595): `kafka-metadata-quorum --describe` admin
        // introspection of the controller-raft quorum.
        v!(describe_quorum_request),
        // FetchSnapshot (KIP-630): controller-snapshot byte-range fetch used
        // by replicas catching up via __cluster_metadata snapshots.
        v!(fetch_snapshot_request),
        // KIP-848 next-generation consumer group protocol.
        v!(consumer_group_heartbeat_request),
        v!(consumer_group_describe_request),
        // KIP-932 share-group membership protocol.
        v!(share_group_heartbeat_request),
        v!(share_group_describe_request),
        // KIP-1071 streams-group rebalance protocol.
        v!(streams_group_heartbeat_request),
        v!(streams_group_describe_request),
        // KIP-932 ShareFetch / ShareAcknowledge data-plane RPCs.
        v!(share_fetch_request),
        v!(share_acknowledge_request),
        // KIP-932 share-group admin offset RPCs.
        v!(describe_share_group_offsets_request),
        v!(alter_share_group_offsets_request),
        v!(delete_share_group_offsets_request),
        // KIP-932 share-coordinator (persister) RPCs.
        v!(initialize_share_group_state_request),
        v!(read_share_group_state_request),
        v!(write_share_group_state_request),
        v!(delete_share_group_state_request),
        v!(read_share_group_state_summary_request),
        // GetReplicaLogInfo (KIP-966): inter-broker RPC the controller's
        // unclean recovery manager uses to read each replica's LEO + leader
        // epoch. Advertised so InterBrokerClient version negotiation
        // succeeds.
        v!(get_replica_log_info_request),
        // KIP-853 dynamic-quorum reconfiguration: `kafka-metadata-quorum
        // --add-controller / --remove-controller` and the controller
        // auto-join path.
        v!(add_raft_voter_request),
        v!(remove_raft_voter_request),
        v!(update_raft_voter_request),
    ]
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Looks up the advertised entry carrying `key`, if there is one.
    fn entry(apis: &[ApiVersion], key: i16) -> Option<&ApiVersion> {
        apis.iter().find(|api| api.api_key == key)
    }

    /// api_keys 76 and 77 must be advertised, and 76 must be pinned to
    /// exactly version 1.
    #[test]
    fn share_group_apis_are_advertised() {
        let apis = supported_apis();
        assert!(entry(&apis, 76).is_some());
        assert!(entry(&apis, 77).is_some());
        let hb = entry(&apis, 76).unwrap();
        assert!(hb.min_version == 1 && hb.max_version == 1);
    }

    /// StreamsGroupHeartbeat (88) and StreamsGroupDescribe (89) must be
    /// advertised, and the heartbeat must be pinned to exactly version 0.
    #[test]
    fn streams_group_apis_are_advertised() {
        let apis = supported_apis();
        assert!(entry(&apis, 88).is_some());
        assert!(entry(&apis, 89).is_some());
        let hb = entry(&apis, 88).unwrap();
        assert!(hb.min_version == 0 && hb.max_version == 0);
    }

    /// Every share-coordinator persister key must be advertised:
    /// InitializeShareGroupState (83), ReadShareGroupState (84),
    /// WriteShareGroupState (85), DeleteShareGroupState (86) and
    /// ReadShareGroupStateSummary (87).
    #[test]
    fn share_coordinator_persister_apis_are_advertised() {
        let apis = supported_apis();
        for k in [83, 84, 85, 86, 87] {
            assert!(
                entry(&apis, k).is_some(),
                "persister api_key {k} must be advertised"
            );
        }
    }

    /// The table is non-empty, contains ApiVersions itself (api_key 18), and
    /// no entry has an inverted version range.
    #[test]
    fn supported_apis_is_nonempty_and_sane() {
        let apis = supported_apis();
        assert!(!apis.is_empty(), "advertised API table must not be empty");
        assert!(entry(&apis, 18).is_some());
        for api in &apis {
            assert!(
                api.min_version <= api.max_version,
                "api {} min>max",
                api.api_key
            );
        }
    }
}