1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
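//! `FindCoordinator` (`api_key=10`). Supports:
//! - `key_type=0` (GROUP): returns this broker as coordinator for every
//!   group key (single-broker MVP).
//! - `key_type=1` (TRANSACTION): ensures `__transaction_state` exists,
//!   hashes the transaction-id to a partition, resolves the leader, and
//!   returns that broker's address.
//! - `key_type=2` (SHARE): ensures `__share_group_state` exists, parses the
//!   `group:topicId:partition` key, hashes it to a partition, resolves the
//!   leader, and returns that broker's address.
//!
//! Every GROUP / TRANSACTION key is first checked for `Describe` on its
//! `Group` / `TransactionalId` resource; denied keys are answered with the
//! matching authorization-failed error code.
//!
//! Response fields are populated in both the legacy single-coordinator form
//! (v0-v3) and the per-key `coordinators` array (v4+).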
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::find_coordinator_request::FindCoordinatorRequest;
use crabka_protocol::owned::find_coordinator_response::{Coordinator, FindCoordinatorResponse};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
/// `key_type` for consumer-group coordinator lookups (keys are group ids).
const KEY_TYPE_GROUP: i8 = 0;
/// `key_type` for transaction coordinator lookups (keys are transactional ids).
const KEY_TYPE_TRANSACTION: i8 = 1;
/// `key_type` for share-group state coordinator lookups
/// (keys are `group:topicId:partition`).
const KEY_TYPE_SHARE: i8 = 2;
/// Build the `Coordinator` row for a key that failed authorization.
///
/// The row carries no usable node (`node_id` and `port` are `-1`, `host` is
/// empty), the caller-supplied authorization-failed `error_code`, and the
/// fixed message "authorization failed". Authorized keys are resolved
/// elsewhere and are unaffected by a sibling key being denied.
fn denied_coordinator(key: String, error_code: i16) -> Coordinator {
    let error_message = Some(String::from("authorization failed"));
    Coordinator {
        key,
        error_code,
        error_message,
        node_id: -1,
        host: String::new(),
        port: -1,
        ..Default::default()
    }
}
/// Check one `FindCoordinator` key against the ACL for its key type.
///
/// GROUP keys need `Describe` on `Group(key)`; TRANSACTION keys need
/// `Describe` on `TransactionalId(key)`. Returns `Some(code)` with the
/// matching authorization-failed code when the authorizer denies, and
/// `None` when it allows, or when the key type is not gated here (SHARE and
/// unknown types).
fn key_authz_failure(
    authorizer: &dyn crate::authorizer::Authorizer,
    image: &crabka_metadata::MetadataImage,
    principal: &crabka_security::Principal,
    host: &std::net::SocketAddr,
    key_type: i8,
    key: &str,
) -> Option<i16> {
    // Which resource guards this key type, and what to report on denial.
    let gate = match key_type {
        KEY_TYPE_GROUP => Some((ResourceType::Group, codes::GROUP_AUTHORIZATION_FAILED)),
        KEY_TYPE_TRANSACTION => Some((
            ResourceType::TransactionalId,
            codes::TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
        )),
        _ => None,
    };
    let (resource_type, denied_code) = gate?;

    let request = AuthorizationRequest {
        principal,
        host,
        resource_type,
        resource_name: key,
        operation: AclOperation::Describe,
    };
    let verdict = authorizer.authorize(image, &request);
    if verdict == AuthorizationResult::Deny {
        Some(denied_code)
    } else {
        None
    }
}
/// Handle a `FindCoordinator` request (`api_key=10`).
///
/// Decodes the request and authorizes every key against its key-type ACL
/// (`key_authz_failure`). It resolves the still-allowed keys according to
/// `key_type`, then encodes a response carrying both the per-key
/// `coordinators` array (v4+) and the legacy top-level fields (v0-v3). The
/// legacy fields are copied from the first entry, and denied keys come first.
///
/// # Errors
///
/// Returns `BrokerError` when the request fails to decode or the response
/// fails to encode. A failed internal-topic bootstrap is not an error here.
/// It is answered with a request-wide `COORDINATOR_NOT_AVAILABLE` response
/// via `encode_error_response`, which carries no per-key entries.
// The per-key-type arms are long but linear; splitting them further would
// obscure the request flow.
#[allow(clippy::too_many_lines)]
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let broker_id = broker.config.broker_id;
    let node_id = broker.config.node_id;
    let advertised = broker.config.advertised_listener.clone();
    let controller = Arc::clone(&broker.controller);

    let mut cur: &[u8] = req_bytes;
    let req = FindCoordinatorRequest::decode(&mut cur, version)?;

    // `key` is a v0-v3 field and `coordinator_keys` a v4+ one, so pick the
    // source by version. Keying off `coordinator_keys.is_empty()` would turn
    // an empty v4+ batch into a bogus entry for the (absent) v0-v3 `key`.
    let keys: Vec<String> = if version >= 4 {
        req.coordinator_keys.clone()
    } else {
        vec![req.key.clone()]
    };

    // ── ACL preamble ────────────────────────────────────────────
    // Per-key `Describe`: GROUP → `Group(key)`; TRANSACTION →
    // `TransactionalId(key)`. Denied keys become authorization-failed
    // entries; only the allowed keys go on to resolution below.
    let acl_image = controller.current_image();
    let mut denied_entries: Vec<Coordinator> = Vec::new();
    let mut allowed_keys: Vec<String> = Vec::with_capacity(keys.len());
    for k in keys {
        if let Some(code) = key_authz_failure(
            broker.config.authorizer.as_ref(),
            &acl_image,
            ctx.principal,
            ctx.peer,
            req.key_type,
            &k,
        ) {
            denied_entries.push(denied_coordinator(k, code));
        } else {
            allowed_keys.push(k);
        }
    }
    let keys = allowed_keys;

    let resolved: Vec<Coordinator> = match req.key_type {
        // Nothing left to resolve: an empty v4+ batch, or every key was
        // denied. Skipping the arms below means a caller lacking `Describe`
        // never triggers the internal-topic `ensure_topic` bootstrap, and a
        // bootstrap failure cannot mask its authorization error.
        _ if keys.is_empty() => Vec::new(),
        KEY_TYPE_GROUP => local_coordinators(keys, broker_id, &advertised),
        KEY_TYPE_TRANSACTION => {
            // Ensure __transaction_state exists before we try to look up
            // partitions in it.
            if let Err(e) = crate::txn::bootstrap::ensure_topic(&controller).await {
                tracing::warn!(
                    error = %e,
                    "txn bootstrap failed; replying COORDINATOR_NOT_AVAILABLE"
                );
                return encode_error_response(
                    broker_id,
                    &advertised,
                    version,
                    codes::COORDINATOR_NOT_AVAILABLE,
                    Some("txn topic bootstrap failed"),
                );
            }
            // One metadata snapshot for the whole batch, taken after the
            // bootstrap, so every key resolves against the same view.
            let image = controller.current_image();
            let mut result = Vec::with_capacity(keys.len());
            for k in keys {
                let p = crate::txn::partitioner::partition_for_tid(
                    &k,
                    crate::txn::bootstrap::NUM_PARTITIONS,
                );
                let Some(pr) = image.partition(crate::txn::bootstrap::TOPIC, p) else {
                    result.push(unavailable_coordinator(k, "partition not found"));
                    continue;
                };
                let leader = pr.leader;
                let Some(broker_info) = image.broker(leader) else {
                    result.push(unavailable_coordinator(k, "leader broker not registered"));
                    continue;
                };
                let node_id_i32 = i32::try_from(leader).unwrap_or(-1);
                // Prefer our own `advertised_listener` when the leader is
                // this broker: the metadata record may carry the pre-bind
                // port (0) in test setups where the OS assigns the port.
                let (host, port) = if leader == node_id {
                    let (h, p) = parse_host_port(&advertised);
                    (h, i32::from(p))
                } else {
                    (broker_info.host.clone(), i32::from(broker_info.port))
                };
                result.push(Coordinator {
                    key: k,
                    node_id: node_id_i32,
                    host,
                    port,
                    error_code: codes::NONE,
                    error_message: None,
                    ..Default::default()
                });
            }
            result
        }
        KEY_TYPE_SHARE => {
            // Ensure __share_group_state exists before resolving its
            // partitions' leaders.
            if let Err(e) = crate::share_coordinator::bootstrap::ensure_topic(&controller).await
            {
                tracing::warn!(
                    error = %e,
                    "share-state bootstrap failed; replying COORDINATOR_NOT_AVAILABLE"
                );
                return encode_error_response(
                    broker_id,
                    &advertised,
                    version,
                    codes::COORDINATOR_NOT_AVAILABLE,
                    Some("share-state topic bootstrap failed"),
                );
            }
            // One metadata snapshot for the whole batch (see TRANSACTION).
            let image = controller.current_image();
            let mut result = Vec::with_capacity(keys.len());
            for k in keys {
                // Kafka's share-coordinator key is `group:topicId:partition`.
                // Group ids may contain ':', so `parse_share_key` peels the
                // partition and topic id from the right.
                let Some((group, topic_uuid, partition)) = parse_share_key(&k) else {
                    result.push(unavailable_coordinator(k, "malformed share-state key"));
                    continue;
                };
                let p = crate::share_coordinator::partitioner::partition_for_share_key(
                    group,
                    &topic_uuid,
                    partition,
                    crate::share_coordinator::bootstrap::NUM_PARTITIONS,
                );
                let Some(pr) = image.partition(crate::share_coordinator::bootstrap::TOPIC, p)
                else {
                    result.push(unavailable_coordinator(k, "partition not found"));
                    continue;
                };
                let leader = pr.leader;
                let Some(broker_info) = image.broker(leader) else {
                    result.push(unavailable_coordinator(k, "leader broker not registered"));
                    continue;
                };
                let node_id_i32 = i32::try_from(leader).unwrap_or(-1);
                // Prefer our own `advertised_listener` when the leader is
                // this broker (the metadata record may carry a pre-bind
                // port in test setups).
                let (host, port) = if leader == node_id {
                    let (h, p) = parse_host_port(&advertised);
                    (h, i32::from(p))
                } else {
                    (broker_info.host.clone(), i32::from(broker_info.port))
                };
                result.push(Coordinator {
                    key: k,
                    node_id: node_id_i32,
                    host,
                    port,
                    error_code: codes::NONE,
                    error_message: None,
                    ..Default::default()
                });
            }
            result
        }
        unknown => {
            // NOTE(review): Apache Kafka rejects unknown coordinator types
            // with INVALID_REQUEST; this MVP answers with the local broker.
            tracing::warn!(key_type = unknown, "unknown FindCoordinator key_type");
            local_coordinators(keys, broker_id, &advertised)
        }
    };

    // Denied entries lead the list so a v0-v3 single-key request whose only
    // key was denied surfaces the authorization-failed code in the derived
    // top-level fields below.
    let mut coordinators = denied_entries;
    coordinators.extend(resolved);

    // Derive the legacy top-level fields from the first coordinator in
    // the list (matches Apache Kafka v0-v3 behaviour).
    let (top_node_id, top_host, top_port, top_error, top_error_message) =
        if let Some(first) = coordinators.first() {
            (
                first.node_id,
                first.host.clone(),
                first.port,
                first.error_code,
                first.error_message.clone(),
            )
        } else {
            let (host, port) = parse_host_port(&advertised);
            (broker_id, host, i32::from(port), codes::NONE, None)
        };
    let resp = FindCoordinatorResponse {
        throttle_time_ms: 0,
        error_code: top_error,
        error_message: top_error_message,
        node_id: top_node_id,
        host: top_host,
        port: top_port,
        coordinators,
        ..Default::default()
    };
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}

/// Resolve every key to this broker's advertised listener with `NONE`.
///
/// Used for GROUP keys (single-broker MVP) and the unknown-key-type fallback.
fn local_coordinators(keys: Vec<String>, broker_id: i32, advertised: &str) -> Vec<Coordinator> {
    let (host, port) = parse_host_port(advertised);
    let port = i32::from(port);
    keys.into_iter()
        .map(|key| Coordinator {
            key,
            node_id: broker_id,
            host: host.clone(),
            port,
            error_code: codes::NONE,
            error_message: None,
            ..Default::default()
        })
        .collect()
}

/// A per-key `COORDINATOR_NOT_AVAILABLE` entry with no usable node
/// (`node_id`/`port` of `-1`, empty host) and `reason` as its message.
fn unavailable_coordinator(key: String, reason: &str) -> Coordinator {
    Coordinator {
        key,
        node_id: -1,
        host: String::new(),
        port: -1,
        error_code: codes::COORDINATOR_NOT_AVAILABLE,
        error_message: Some(reason.to_owned()),
        ..Default::default()
    }
}
/// Encode a response that carries only a request-wide error.
///
/// The legacy (v0-v3) node fields point at this broker's advertised
/// listener, and the v4+ `coordinators` array is left empty. Used when a
/// top-level failure (e.g. an internal-topic bootstrap) prevents per-key
/// resolution.
fn encode_error_response(
    broker_id: i32,
    advertised: &str,
    version: i16,
    error_code: i16,
    error_message: Option<&str>,
) -> Result<Bytes, BrokerError> {
    let (self_host, self_port) = parse_host_port(advertised);
    let response = FindCoordinatorResponse {
        error_code,
        error_message: error_message.map(String::from),
        node_id: broker_id,
        host: self_host,
        port: i32::from(self_port),
        throttle_time_ms: 0,
        coordinators: Vec::new(),
        ..Default::default()
    };
    let mut out = BytesMut::with_capacity(response.encoded_len(version));
    response.encode(&mut out, version)?;
    Ok(out.freeze())
}
/// Parse a share-coordinator key `"{group}:{topicId}:{partition}"` into its
/// `(group, topic_id, partition)` parts.
///
/// Group ids may themselves contain `:`, so the partition and topic-id are
/// peeled from the right. The topic id is accepted in two spellings:
/// - a textual UUID (anything `uuid::Uuid::parse_str` takes, e.g. hyphenated
///   or simple hex), and
/// - Apache Kafka's canonical `Uuid.toString()` form: the 16 bytes
///   base64url-encoded without padding (22 characters).
///
/// Returns `None` on missing segments, on a partition that is not a
/// non-negative `i32`, or on a topic id in neither spelling.
fn parse_share_key(key: &str) -> Option<(&str, uuid::Uuid, i32)> {
    let (rest, partition_str) = key.rsplit_once(':')?;
    let (group, topic_str) = rest.rsplit_once(':')?;
    // Partition indices are never negative; reject them here rather than
    // hand a negative index to the partitioner.
    let partition = partition_str.parse::<i32>().ok().filter(|p| *p >= 0)?;
    let topic_id = uuid::Uuid::parse_str(topic_str)
        .ok()
        .or_else(|| decode_kafka_base64_uuid(topic_str).map(uuid::Uuid::from_u128))?;
    Some((group, topic_id, partition))
}

/// Decode Kafka's base64url topic-id text (RFC 4648 §5 alphabet) into its
/// 128-bit big-endian value (most-significant 64 bits first).
///
/// Accepts the unpadded 22-character form, and the same followed by `==`.
/// The 4 surplus low bits of the final character are ignored.
fn decode_kafka_base64_uuid(text: &str) -> Option<u128> {
    let text = text.strip_suffix("==").unwrap_or(text);
    if text.len() != 22 {
        return None;
    }
    let mut value: u128 = 0;
    for (i, c) in text.bytes().enumerate() {
        let sextet = u128::from(match c {
            b'A'..=b'Z' => c - b'A',
            b'a'..=b'z' => c - b'a' + 26,
            b'0'..=b'9' => c - b'0' + 52,
            b'-' => 62,
            b'_' => 63,
            _ => return None,
        });
        // 21 full sextets carry 126 bits; the 22nd contributes only its top 2.
        value = if i < 21 {
            (value << 6) | sextet
        } else {
            (value << 2) | (sextet >> 4)
        };
    }
    Some(value)
}
/// Split an advertised listener `"host:port"` into `(host, port)`.
///
/// The port is taken after the last `:`. A bracketed IPv6 literal such as
/// `"[::1]:9092"` yields the bare address `"::1"`, which is the form Apache
/// Kafka brokers report to clients. Input that does not end in `:<u16>`
/// logs a warning and falls back to `("localhost", 9092)`.
fn parse_host_port(addr: &str) -> (String, u16) {
    if let Some((h, p)) = addr.rsplit_once(':')
        && let Ok(port) = p.parse::<u16>()
    {
        let h = h
            .strip_prefix('[')
            .and_then(|inner| inner.strip_suffix(']'))
            .unwrap_or(h);
        return (h.to_string(), port);
    }
    tracing::warn!(
        addr,
        "advertised_listener not host:port; falling back to localhost:9092"
    );
    ("localhost".into(), 9092)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// An ACL authorizer built with no bindings; the tests below rely on it
    /// denying the lookups they issue.
    fn deny_authorizer() -> crate::authorizer::SimpleAclAuthorizer {
        crate::authorizer::SimpleAclAuthorizer::new(std::collections::HashSet::new())
    }

    /// The anonymous principal used for every lookup in this module.
    fn anon() -> crabka_security::Principal {
        crabka_security::Principal {
            name: "ANONYMOUS".into(),
            auth_method: crabka_security::AuthMethod::Anonymous,
            groups: vec![],
        }
    }

    /// Run `key_authz_failure` for `key` with the empty authorizer, an empty
    /// metadata image, and an anonymous loopback client.
    fn authz_failure_for(key_type: i8, key: &str) -> Option<i16> {
        let authorizer = deny_authorizer();
        let image = crabka_metadata::MetadataImage::new(uuid::Uuid::nil());
        let peer = std::net::SocketAddr::from(([127, 0, 0, 1], 9092));
        key_authz_failure(&authorizer, &image, &anon(), &peer, key_type, key)
    }

    #[test]
    fn group_key_denied_maps_to_group_authorization_failed() {
        let code = authz_failure_for(KEY_TYPE_GROUP, "g");
        assert!(code == Some(codes::GROUP_AUTHORIZATION_FAILED));
    }

    #[test]
    fn txn_key_denied_maps_to_transactional_id_authorization_failed() {
        let code = authz_failure_for(KEY_TYPE_TRANSACTION, "t");
        assert!(code == Some(codes::TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
    }

    #[test]
    fn denied_entry_carries_the_failure_code() {
        let entry = denied_coordinator("g".into(), codes::GROUP_AUTHORIZATION_FAILED);
        assert!(entry.error_code == codes::GROUP_AUTHORIZATION_FAILED);
        assert!(entry.node_id == -1);
    }
}