1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//! `DescribeProducers` (`api_key=61`, KIP-664). Admin RPC that surfaces
//! the broker's in-memory producer-state snapshot for a set of
//! `(topic, partition)` pairs. Used by JVM `Admin.describeProducers`
//! and `kafka-transactions.sh describe-producers` to debug stuck
//! idempotent / transactional producers.
//!
//! ## ACL
//!
//! Per-topic `Read` on `Topic(name)` (mirrors `Fetch` per KIP-664).
//! Deny → every partition of that topic carries
//! `TOPIC_AUTHORIZATION_FAILED (29)`. Unknown topic / out-of-range
//! partition → per-partition `UNKNOWN_TOPIC_OR_PARTITION (3)`.
//!
//! ## Field semantics
//!
//! `producer_id`, `producer_epoch`, `last_sequence`, `last_timestamp`
//! come straight from `crate::producer_state`. The transactional
//! fields `coordinator_epoch` and `current_txn_start_offset` aren't
//! wired up — the broker doesn't track them per `(topic, partition)`
//! today, so they default to `-1` (the schema's "unknown / no current
//! txn" sentinel). When transactional in-flight tracking lands, only
//! the row builder needs to look those up.
use bytes::{Bytes, BytesMut};
use crabka_metadata::AclOperation;
use crabka_protocol::owned::describe_producers_request::DescribeProducersRequest;
use crabka_protocol::owned::describe_producers_response::{
DescribeProducersResponse, PartitionResponse, ProducerState, TopicResponse,
};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
/// Handles one `DescribeProducers` request and returns the encoded response.
///
/// The request is decoded from `req_bytes` at the given protocol `version`.
/// `Read` is batch-authorized on every requested topic, and one
/// `TopicResponse` is emitted per requested topic, in request order:
///
/// * Topic denied (or absent from the authorizer's decision map): every
///   requested partition carries `TOPIC_AUTHORIZATION_FAILED` and no
///   producers.
/// * Partition not present in the current metadata image: that partition
///   carries `UNKNOWN_TOPIC_OR_PARTITION` and no producers.
/// * Otherwise: the partition carries `NONE` plus one `ProducerState` row per
///   entry of `broker.producer_state.snapshot(topic, partition)`.
///
/// `_correlation_id` is accepted but not used by this handler.
///
/// # Errors
///
/// Propagates the error if the request fails to decode or the response fails
/// to encode.
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let mut cur: &[u8] = req_bytes;
    let req = DescribeProducersRequest::decode(&mut cur, version)?;
    let image = broker.controller.current_image();

    // Batch-authorize Read on every requested topic in one pass.
    let topic_decisions = authorize_topics(
        broker.config.authorizer.as_ref(),
        &*image,
        ctx.principal,
        ctx.peer,
        AclOperation::Read,
        req.topics.iter().map(|t| t.name.as_str()),
    );

    // Single builder for every "failed" partition row: an error code and no
    // producers. Captures nothing, so it is safe to keep across the `.await`
    // below.
    let failed_partition = |partition_index, error_code| PartitionResponse {
        partition_index,
        error_code,
        error_message: None,
        active_producers: Vec::new(),
        ..Default::default()
    };

    let mut topics_out: Vec<TopicResponse> = Vec::with_capacity(req.topics.len());
    for topic_req in &req.topics {
        let topic_name = topic_req.name.as_str();
        // A topic with no recorded decision is treated as denied (fail closed).
        let decision = topic_decisions
            .get(topic_name)
            .copied()
            .unwrap_or(AuthorizationResult::Deny);

        let mut parts_out: Vec<PartitionResponse> =
            Vec::with_capacity(topic_req.partition_indexes.len());

        if decision == AuthorizationResult::Deny {
            // KIP-664: per-partition TOPIC_AUTHORIZATION_FAILED on every
            // requested partition of a denied topic.
            parts_out.extend(
                topic_req
                    .partition_indexes
                    .iter()
                    .map(|&idx| failed_partition(idx, codes::TOPIC_AUTHORIZATION_FAILED)),
            );
        } else {
            for &idx in &topic_req.partition_indexes {
                // Topic-existence + per-partition-bounds check. The image
                // exposes `partition(name, idx) -> Option<&PartitionRecord>`
                // which combines both checks in one lookup.
                if image.partition(topic_name, idx).is_none() {
                    parts_out.push(failed_partition(idx, codes::UNKNOWN_TOPIC_OR_PARTITION));
                    continue;
                }

                let snapshot = broker.producer_state.snapshot(topic_name, idx).await;
                let active_producers: Vec<ProducerState> = snapshot
                    .into_iter()
                    .map(|(producer_id, entry)| ProducerState {
                        producer_id,
                        producer_epoch: i32::from(entry.epoch),
                        last_sequence: entry.last_sequence,
                        last_timestamp: entry.last_timestamp,
                        // Crabka doesn't track per-(topic, partition) txn
                        // bookkeeping on the producer-state map; these stay
                        // at -1 (the schema "unknown / no current txn"
                        // sentinel) until that work lands.
                        coordinator_epoch: -1,
                        current_txn_start_offset: -1,
                        ..Default::default()
                    })
                    .collect();

                parts_out.push(PartitionResponse {
                    partition_index: idx,
                    error_code: codes::NONE,
                    error_message: None,
                    active_producers,
                    ..Default::default()
                });
            }
        }

        // One response entry per requested topic, whether denied or allowed.
        topics_out.push(TopicResponse {
            name: topic_req.name.clone(),
            partitions: parts_out,
            ..Default::default()
        });
    }

    // This handler always reports a throttle time of 0.
    let resp = DescribeProducersResponse {
        throttle_time_ms: 0,
        topics: topics_out,
        ..Default::default()
    };
    let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
    resp.encode(&mut buf, version)?;
    Ok(buf.freeze())
}