1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
//! `Metadata` (`api_key=3`). Returns all registered brokers plus partition
//! metadata for the requested topics — or, when no explicit topic list is
//! sent, for every topic the caller is authorized to `Describe`.
//! Metadata is sourced from `controller.current_image()` — the
//! quorum-replicated snapshot — rather than a local in-memory struct.
use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::metadata_request::MetadataRequest;
use crabka_protocol::owned::metadata_response::{
MetadataResponse, MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic,
};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::handlers::authorized_operations::authorized_operations_bits;
/// Serve one `Metadata` request.
///
/// Decodes `req_bytes` as a [`MetadataRequest`] at `version` and answers it
/// from the controller's current metadata image. The response carries the
/// registered brokers, the cluster id, the current controller id, and
/// per-topic partition state filtered by `Describe` ACLs. It is encoded at
/// the same `version`.
///
/// # Errors
///
/// Propagates a [`BrokerError`] if the request fails to decode or the
/// response fails to encode.
#[allow(clippy::too_many_lines)] // ACL preamble + asymmetric loop
#[allow(clippy::unused_async)] // Handler is wholly sync but we keep the
// `async fn` shape so it mirrors the other inline-intercept handlers
// (produce/fetch/etc) and lets future Metadata work (e.g. waiting on
// topic creation) add `.await`s without changing the signature.
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let controller = &broker.controller;
    let inter_broker_name: &str = &broker.config.inter_broker_listener_name;
    let mut cur: &[u8] = req_bytes;
    let req = MetadataRequest::decode(&mut cur, version)?;
    let image = controller.current_image();

    // Named request vs fetch-all. v1+ encodes "all topics" as a null array
    // (`None`). v0 has no nullable arrays and uses an *empty* list for the
    // same meaning, so `Some([])` on v0 is fetch-all too. On v1+ an empty
    // list genuinely means "no topics".
    let requested = match req.topics.as_deref() {
        Some([]) if version == 0 => None,
        other => other,
    };
    let named = requested.is_some();

    // ── ACL preamble ────────────────────────────────────────
    // Metadata has asymmetric authorization semantics for `Describe`:
    // • Named-topic request: every requested topic appears in the
    //   response. Allow rows carry `error_code = 0`; Deny rows carry
    //   `error_code = TOPIC_AUTHORIZATION_FAILED (29)`.
    // • Fetch-all: only `Allow` topics appear in the response. Deny topics
    //   are silently omitted so the broker doesn't leak their existence to
    //   unauthorized clients.
    //
    // For a named request we resolve each requested `(name, topic_id)`
    // pair up front via the KIP-516 strict resolver. The outcome (`Ok(record)`
    // or an error wire code) is carried per request entry, so the response
    // loop below can echo errors without collapsing an unknown id to an
    // empty name. The names we authorize come from the *resolved* records
    // (plus the requested name for the name-only-miss case), so a topic
    // requested by id is still ACL-checked under its real name.
    let resolved: Vec<(
        &crabka_protocol::owned::metadata_request::MetadataRequestTopic,
        Result<&crabka_metadata::TopicRecord, i16>,
    )> = requested
        .unwrap_or_default()
        .iter()
        .map(|t| {
            let name_str = t.name.as_deref().unwrap_or("");
            (
                t,
                crate::topic_resolve::resolve(&image, name_str, t.topic_id),
            )
        })
        .collect();
    // Names to batch-authorize: resolved records' real names, plus the
    // requested name for a name-only miss (so the UNKNOWN_TOPIC_OR_PARTITION
    // row still respects Deny → omit / auth-failed semantics). Topic-id
    // errors carry no trustworthy name and are surfaced unconditionally.
    let candidate_topics: Vec<String> = if named {
        resolved
            .iter()
            .filter_map(|(t, r)| match r {
                Ok(rec) => Some(rec.name.clone()),
                Err(code) if *code == codes::UNKNOWN_TOPIC_OR_PARTITION => {
                    t.name.clone().filter(|n| !n.is_empty())
                }
                Err(_) => None,
            })
            .collect()
    } else {
        image.topics().map(|t| t.name.clone()).collect()
    };
    let acl_by_name = authorize_topics(
        broker.config.authorizer.as_ref(),
        &*image,
        ctx.principal,
        ctx.peer,
        AclOperation::Describe,
        candidate_topics.iter().map(String::as_str),
    );
    // Brokers: enumerate all registered nodes from the metadata image.
    let brokers: Vec<MetadataResponseBroker> = image
        .brokers()
        .map(|b| project_broker(b, inter_broker_name))
        .collect();
    // Topics absent from the ACL result are treated as denied.
    let allowed = |name: &str| {
        acl_by_name
            .get(name)
            .copied()
            .unwrap_or(AuthorizationResult::Deny)
            == AuthorizationResult::Allow
    };
    // Build a fully-populated success row for a known topic by name.
    let success_row = |name: &str, rec: &crabka_metadata::TopicRecord| {
        // Partitions are stored in a `HashMap`; sort by index so clients
        // (and tests) see a deterministic ordering.
        let mut sorted: Vec<_> = image.partitions_of(name).collect();
        sorted.sort_by_key(|p| p.partition);
        let partitions: Vec<MetadataResponsePartition> = sorted
            .into_iter()
            .map(|p| MetadataResponsePartition {
                error_code: codes::NONE,
                partition_index: p.partition,
                leader_id: i32::try_from(p.leader).unwrap_or(i32::MAX),
                leader_epoch: p.leader_epoch,
                replica_nodes: p
                    .replicas
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                isr_nodes: p
                    .isr
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                ..Default::default()
            })
            .collect();
        // KIP-430: per-topic bitfield, only when the client opted in.
        // Schema gates the field on version (v8+) so the value is
        // harmlessly dropped on the wire below v8.
        let topic_authorized_operations = if req.include_topic_authorized_operations {
            authorized_operations_bits(
                broker.config.authorizer.as_ref(),
                &image,
                ctx.principal,
                ctx.peer,
                ResourceType::Topic,
                name,
            )
        } else {
            i32::MIN
        };
        MetadataResponseTopic {
            error_code: codes::NONE,
            name: Some(rec.name.clone()),
            topic_id: WireUuid(rec.topic_id.into_bytes()),
            partitions,
            is_internal: false,
            topic_authorized_operations,
            ..Default::default()
        }
    };
    let capacity = if named {
        resolved.len()
    } else {
        candidate_topics.len()
    };
    let mut topics_out: Vec<MetadataResponseTopic> = Vec::with_capacity(capacity);
    if named {
        // Named request: drive off the per-entry resolution outcome so
        // KIP-516 topic-id errors echo the requested id rather than
        // collapsing to an empty name.
        for (t, outcome) in &resolved {
            match outcome {
                Ok(rec) => {
                    if !allowed(&rec.name) {
                        // Named-topic Deny: surface an explicit auth-failed
                        // row that echoes ONLY what the client sent. A topic
                        // requested by id must not have its real name filled
                        // in, or the name would leak to a principal without
                        // `Describe`. Kafka likewise answers with a null name
                        // plus the requested id in this case.
                        topics_out.push(MetadataResponseTopic {
                            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
                            name: t.name.clone(),
                            topic_id: t.topic_id,
                            ..Default::default()
                        });
                        continue;
                    }
                    topics_out.push(success_row(&rec.name, rec));
                }
                Err(code) if *code == codes::UNKNOWN_TOPIC_OR_PARTITION => {
                    // Name-only miss. A Deny on the requested name yields
                    // an auth-failed row (don't reveal whether the topic
                    // exists); otherwise surface UNKNOWN_TOPIC_OR_PARTITION.
                    let name_str = t.name.as_deref().unwrap_or("");
                    if !name_str.is_empty() && !allowed(name_str) {
                        topics_out.push(MetadataResponseTopic {
                            error_code: codes::TOPIC_AUTHORIZATION_FAILED,
                            name: t.name.clone(),
                            topic_id: WireUuid::ZERO,
                            ..Default::default()
                        });
                        continue;
                    }
                    topics_out.push(MetadataResponseTopic {
                        error_code: codes::UNKNOWN_TOPIC_OR_PARTITION,
                        name: t.name.clone(),
                        topic_id: t.topic_id,
                        ..Default::default()
                    });
                }
                Err(code) => {
                    // KIP-516: UNKNOWN_TOPIC_ID / INCONSISTENT_TOPIC_ID.
                    // Echo the requested name (may be `None`) and id.
                    topics_out.push(MetadataResponseTopic {
                        error_code: *code,
                        name: t.name.clone(),
                        topic_id: t.topic_id,
                        ..Default::default()
                    });
                }
            }
        }
    } else {
        // Fetch-all: only `Allow` topics appear; Deny topics are silently
        // omitted so the broker doesn't leak their existence.
        for name in &candidate_topics {
            if !allowed(name) {
                continue;
            }
            if let Some(rec) = image.topic(name) {
                topics_out.push(success_row(name, rec));
            }
        }
    }
    // controller_id: the current Raft leader, or -1 when unknown.
    let controller_id: i32 = controller
        .watch_leader()
        .borrow()
        .and_then(|id| i32::try_from(id).ok())
        .unwrap_or(-1);
    // KIP-430: the cluster-level field only exists on the wire for v8-10;
    // the codegen drops it on other versions. Compute it when the opt-in
    // flag is set, so in-range versions carry the value; otherwise leave
    // the default `i32::MIN`.
    let cluster_authorized_operations = if req.include_cluster_authorized_operations {
        authorized_operations_bits(
            broker.config.authorizer.as_ref(),
            &image,
            ctx.principal,
            ctx.peer,
            ResourceType::Cluster,
            "kafka-cluster",
        )
    } else {
        i32::MIN
    };
    let resp = MetadataResponse {
        throttle_time_ms: 0,
        brokers,
        cluster_id: Some(image.cluster_id().to_string()),
        controller_id,
        topics: topics_out,
        cluster_authorized_operations,
        ..Default::default()
    };
    // Clients poll Metadata constantly; keep this per-request dump at debug
    // level so it doesn't flood info-level logs.
    tracing::debug!(
        version,
        req_topics = ?req.topics.as_ref().map(|ts| ts.iter().filter_map(|t| t.name.clone()).collect::<Vec<_>>()),
        resp_brokers = ?resp.brokers.iter().map(|b| format!("{}@{}:{}", b.node_id, b.host, b.port)).collect::<Vec<_>>(),
        resp_controller_id = resp.controller_id,
        resp_cluster_id = ?resp.cluster_id,
        resp_topics = ?resp.topics.iter().map(|t| format!("{}={:?}/p{}", t.name.as_deref().unwrap_or("?"), t.error_code, t.partitions.len())).collect::<Vec<_>>(),
        "metadata response"
    );
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}
/// Collapse a stored [`crabka_metadata::BrokerRegistrationRecord`] into the
/// single wire-format [`MetadataResponseBroker`].
///
/// The Kafka `MetadataResponse` wire format (v0..v12 at time of writing)
/// carries exactly one `host:port`/`rack` per broker. `MetadataResponseBroker`
/// has no `endpoints[]` array, so one endpoint is chosen in this order:
/// 1. the endpoint whose name equals `inter_broker_name`;
/// 2. otherwise the first recorded endpoint;
/// 3. otherwise (no endpoints at all) the legacy top-level `host`/`port`.
///
/// `node_id` saturates at `i32::MAX` if it does not fit in an `i32`. Broker
/// ids are tiny in practice, so this is purely defensive.
fn project_broker(
    b: &crabka_metadata::BrokerRegistrationRecord,
    inter_broker_name: &str,
) -> MetadataResponseBroker {
    let chosen = b
        .endpoints
        .iter()
        .find(|ep| ep.name == inter_broker_name)
        .or(b.endpoints.first());
    let (host, port) = chosen.map_or_else(
        || (b.host.clone(), i32::from(b.port)),
        |ep| (ep.host.clone(), i32::from(ep.port)),
    );
    let node_id = i32::try_from(b.node_id).unwrap_or(i32::MAX);
    MetadataResponseBroker {
        node_id,
        host,
        port,
        rack: b.rack.clone(),
        ..Default::default()
    }
}
fn parse_host_port(addr: &str) -> (String, i32) {
if let Some((h, p)) = addr.rsplit_once(':')
&& let Ok(port) = p.parse::<u16>()
{
return (h.to_string(), i32::from(port));
}
tracing::warn!(
addr,
"advertised_listener not host:port; falling back to localhost:9092"
);
("localhost".into(), 9092)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A well-formed `host:port` splits cleanly on the colon.
    #[test]
    fn parse_host_port_ok() {
        let expected: (String, i32) = ("foo".to_string(), 1234);
        assert!(parse_host_port("foo:1234") == expected);
    }

    /// Input with no parseable port falls back to `localhost:9092`.
    #[test]
    fn parse_host_port_falls_back() {
        let expected: (String, i32) = ("localhost".to_string(), 9092);
        assert!(parse_host_port("not-an-addr") == expected);
    }
}