1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
//! `DescribeTopicPartitions` (`api_key=75`, KIP-966). Paginated topic-and-partition
//! listing the JVM admin client uses for `kafka-topics --describe`
//! against Kafka 3.7+ brokers. Replaces the Metadata-fan-out the older
//! admin client used for the same job.
//!
//! ## Request shape
//!
//! - `topics`: empty → return all topics (alphabetical). Non-empty →
//! return exactly those, in request order.
//! - `response_partition_limit`: cap on partition rows in the response.
//! Default 2000.
//! - `cursor`: optional resume point `(topic_name, partition_index)`.
//! When set, the response starts at that topic's partition, skipping
//! earlier topics entirely.
//!
//! ## ACL semantics
//!
//! Per-topic `Describe` on `Topic(name)`. For *named* requests, Deny →
//! per-topic row with `error_code = TOPIC_AUTHORIZATION_FAILED (29)`.
//! For *fetch-all* requests, Deny → silently omit (matches
//! `Metadata`-fetch-all so the broker doesn't leak topic names to
//! unauthorized clients).
//!
//! ## KIP-430 integration
//!
//! Every Allow row carries `topic_authorized_operations` — the v0 schema
//! always encodes this field (no opt-in flag, unlike Metadata).
use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::describe_topic_partitions_request::DescribeTopicPartitionsRequest;
use crabka_protocol::owned::describe_topic_partitions_response::{
Cursor as ResponseCursor, DescribeTopicPartitionsResponse,
DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic,
};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::handlers::authorized_operations::authorized_operations_bits;
/// Reports whether `name` is exactly one of Crabka's three internal topics.
///
/// The result feeds the `is_internal` flag on response rows; JVM clients
/// use that flag so `kafka-topics --list` and friends don't surface these
/// topics by default. The comparison is a whole-name, case-sensitive match —
/// names that merely start with an internal topic's name do not qualify.
fn is_internal_topic(name: &str) -> bool {
    const INTERNAL_TOPICS: [&str; 3] = [
        "__consumer_offsets",
        "__transaction_state",
        "__remote_log_metadata",
    ];
    INTERNAL_TOPICS.contains(&name)
}
/// Handles one `DescribeTopicPartitions` request and returns the encoded
/// response body.
///
/// Processing steps:
///
/// 1. Decode the request and pick the topic iteration order: the requested
///    names in request order, or — when `topics` is empty — every topic in
///    the current metadata image, sorted by name.
/// 2. If a request cursor is present, drop every name that sorts before the
///    cursor's topic.
/// 3. Batch-authorize `Describe` on the remaining names.
/// 4. Walk the names, emitting partition rows until the partition budget is
///    used up; when a topic is cut short, record a `next_cursor` pointing at
///    the first partition that was not emitted and stop.
///
/// Parameters:
/// - `broker`: source of the metadata image and the authorizer.
/// - `version`: API version used for both decoding and encoding.
/// - `_correlation_id`: unused by this handler.
/// - `req_bytes`: the encoded request body.
/// - `ctx`: supplies the principal and peer used for authorization.
///
/// # Errors
///
/// Propagates any error raised while decoding the request or encoding the
/// response.
// Read-only handler — never suspends. The `async fn` shape matches the
// other inline-intercept handlers (DescribeCluster, DescribeGroups) so
// dispatch.rs can call it through one `await`.
#[allow(clippy::unused_async)]
#[allow(clippy::too_many_lines)] // ACL preamble + pagination + cursor logic
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let mut cur: &[u8] = req_bytes;
    let req = DescribeTopicPartitionsRequest::decode(&mut cur, version)?;
    let image = broker.controller.current_image();

    // ── 1. Resolve the topic-name iteration order ──────────────────────
    // Named request: return every requested name, in request order, even
    // if some don't exist (those rows carry UNKNOWN_TOPIC_OR_PARTITION).
    // Fetch-all (empty `topics`): walk every topic from the image,
    // alphabetical for deterministic pagination.
    let named = !req.topics.is_empty();
    let mut ordered_names: Vec<String> = if named {
        req.topics.iter().map(|t| t.name.clone()).collect()
    } else {
        let mut all: Vec<String> = image.topics().map(|t| t.name.clone()).collect();
        all.sort();
        all
    };

    // ── 2. Apply request Cursor: skip topics before `cursor.topic_name`. ─
    // Drop every name strictly before the cursor's topic. If the cursor's
    // topic isn't in the list, we keep walking from the next name in sort
    // order — a "lower bound" on out-of-set cursors.
    //
    // NOTE(review): for *named* requests the list is in request order, not
    // sorted, so this lexicographic filter can drop a requested topic that
    // sorts before the cursor's topic but appears after it in the request.
    // Clients that send sorted names are unaffected — confirm whether
    // unsorted named requests with a cursor need to be supported.
    let cursor = req.cursor.as_ref();
    if let Some(c) = cursor {
        let resume_topic = c.topic_name.as_str();
        ordered_names.retain(|n| n.as_str() >= resume_topic);
    }

    // ── 3. Batch-authorize Describe on all candidate topics. ───────────
    let acl_by_name = authorize_topics(
        broker.config.authorizer.as_ref(),
        &*image,
        ctx.principal,
        ctx.peer,
        AclOperation::Describe,
        ordered_names.iter().map(String::as_str),
    );

    // ── 4. Walk topics, building rows under the partition-limit budget. ─
    // Clamp the budget to at least one partition. With a budget of zero (or
    // a negative one) the first describable topic would be truncated before
    // emitting anything and the returned cursor would equal the request
    // cursor, so a client following cursors would never make progress.
    let partition_limit = req.response_partition_limit.max(1);
    let mut emitted_partitions: i32 = 0;
    let mut topics_out: Vec<DescribeTopicPartitionsResponseTopic> =
        Vec::with_capacity(ordered_names.len());
    let mut next_cursor: Option<ResponseCursor> = None;

    // Row shape shared by the two per-topic failure cases: no topic id, no
    // partitions, and `i32::MIN` in the authorized-operations field.
    let error_row = |topic_name: &str, error_code| DescribeTopicPartitionsResponseTopic {
        error_code,
        name: Some(topic_name.to_owned()),
        topic_id: WireUuid::ZERO,
        is_internal: false,
        partitions: Vec::new(),
        topic_authorized_operations: i32::MIN,
        ..Default::default()
    };

    for name in &ordered_names {
        // A name missing from the authorization result is treated as Deny.
        let allowed = acl_by_name
            .get(name.as_str())
            .copied()
            .unwrap_or(AuthorizationResult::Deny)
            == AuthorizationResult::Allow;
        if !allowed {
            // Named Deny: explicit per-topic error row.
            // Fetch-all Deny: silently omit so the broker doesn't leak
            // topic existence to unauthorized clients.
            if named {
                topics_out.push(error_row(
                    name.as_str(),
                    codes::TOPIC_AUTHORIZATION_FAILED,
                ));
            }
            continue;
        }

        let Some(t) = image.topic(name) else {
            topics_out.push(error_row(
                name.as_str(),
                codes::UNKNOWN_TOPIC_OR_PARTITION,
            ));
            continue;
        };

        // The cursor's `partition_index` applies only to the cursor's own
        // topic. If that topic is not the one being processed (e.g. it no
        // longer exists and the walk resumed at a later name), start from
        // partition 0 so no partitions of an unrelated topic are skipped.
        let start_partition = match cursor {
            Some(c) if c.topic_name.as_str() == name.as_str() => c.partition_index,
            _ => 0,
        };

        let mut sorted_parts: Vec<_> = image.partitions_of(name).collect();
        sorted_parts.sort_by_key(|p| p.partition);
        if start_partition > 0 {
            sorted_parts.retain(|p| p.partition >= start_partition);
        }

        let mut row_partitions: Vec<DescribeTopicPartitionsResponsePartition> =
            Vec::with_capacity(sorted_parts.len());
        // `Some(index)` once the budget runs out: the first partition of this
        // topic that was NOT emitted, i.e. where the next page must resume.
        let mut resume_at: Option<i32> = None;
        for p in &sorted_parts {
            if emitted_partitions >= partition_limit {
                resume_at = Some(p.partition);
                break;
            }
            row_partitions.push(DescribeTopicPartitionsResponsePartition {
                error_code: codes::NONE,
                partition_index: p.partition,
                // Ids that do not fit in the wire's i32 saturate to i32::MAX.
                leader_id: i32::try_from(p.leader).unwrap_or(i32::MAX),
                leader_epoch: p.leader_epoch,
                replica_nodes: p
                    .replicas
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                isr_nodes: p
                    .isr
                    .iter()
                    .map(|&r| i32::try_from(r).unwrap_or(i32::MAX))
                    .collect(),
                // ELR / last-known-ELR: the schema marks both
                // `nullableVersions: 0+` and `default: null`, but the
                // Kafka 3.8 admin client's
                // `DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo`
                // calls `.stream()` on the decoded list without a null
                // guard — i.e. a null value NPEs the client. Real
                // Kafka brokers emit empty lists rather than null;
                // mirror that to stay compatible. A null value produces
                // "Cannot invoke java.util.List.stream() because
                // the return value of …eligibleLeaderReplicas() is null".
                eligible_leader_replicas: Some(Vec::new()),
                last_known_elr: Some(Vec::new()),
                offline_replicas: Vec::new(),
                ..Default::default()
            });
            emitted_partitions += 1;
        }

        // KIP-430: the v0 schema always encodes the bitfield, no opt-in
        // flag exists for this API. Always populate via the shared helper.
        let topic_authorized_operations = authorized_operations_bits(
            broker.config.authorizer.as_ref(),
            &image,
            ctx.principal,
            ctx.peer,
            ResourceType::Topic,
            name.as_str(),
        );

        // The topic row is emitted even when it was cut short — possibly
        // with zero partitions if the budget ran out exactly at the previous
        // topic's end; the cursor below then points at this topic.
        topics_out.push(DescribeTopicPartitionsResponseTopic {
            error_code: codes::NONE,
            name: Some(name.clone()),
            topic_id: WireUuid(t.topic_id.into_bytes()),
            is_internal: is_internal_topic(name),
            partitions: row_partitions,
            topic_authorized_operations,
            ..Default::default()
        });

        if let Some(partition_index) = resume_at {
            next_cursor = Some(ResponseCursor {
                topic_name: name.clone(),
                partition_index,
                ..Default::default()
            });
            break;
        }
    }

    let resp = DescribeTopicPartitionsResponse {
        throttle_time_ms: 0,
        topics: topics_out,
        next_cursor,
        ..Default::default()
    };
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Only the three exact internal names are recognised; ordinary names,
    /// other underscore-prefixed names, and names that merely extend an
    /// internal name are rejected.
    #[test]
    fn is_internal_topic_matches_known_internal_names() {
        let internal = [
            "__consumer_offsets",
            "__transaction_state",
            "__remote_log_metadata",
        ];
        for topic in internal {
            assert!(is_internal_topic(topic), "expected internal: {}", topic);
        }

        let not_internal = [
            "foo",
            "_foo",
            "__user_topic",
            // No accidental prefix matching.
            "__consumer_offsets-2",
        ];
        for topic in not_internal {
            assert!(!is_internal_topic(topic), "expected not internal: {}", topic);
        }
    }
}