1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
//! `ListGroups` (`api_key=16`). Returns every known group across all four
//! registries: classic groups from `GroupCoordinator::list_groups` (type
//! `"classic"`), next-gen KIP-848 consumer groups from the unified
//! coordinator's consumer registry (type `"consumer"`), KIP-932 share
//! groups from its share registry (type `"share"`), and KIP-1071 streams
//! groups from its streams registry (type `"streams"`). The optional
//! `states_filter` (v4+) and `types_filter` (v5+, e.g. `["share"]` from
//! `kafka-share-groups.sh --list`, `["streams"]` from `kafka-streams-groups.sh
//! --list`, or `["consumer"]` from `kafka-consumer-groups.sh --list`) are both
//! honored. A `group_id` is emitted at most once: the registries are disjoint
//! by `GroupType`, but we defensively dedup.
use std::collections::HashSet;
use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::list_groups_request::ListGroupsRequest;
use crabka_protocol::owned::list_groups_response::{ListGroupsResponse, ListedGroup};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::coordinator::unified::classic_state::GroupState;
use crate::error::BrokerError;
pub(crate) async fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
let mut cur: &[u8] = req_bytes;
let req = ListGroupsRequest::decode(&mut cur, version)?;
let snapshots = broker.group_coordinator.list_groups().await;
let image = broker.controller.current_image();
// Both filters are plain `Vec<String>` (not Option); they are empty when
// no filter was requested (older wire versions always decode them empty).
let states_active = !req.states_filter.is_empty();
let types_active = !req.types_filter.is_empty();
// Per-group `Describe` ACL. On Deny the group is silently omitted from the
// response (no per-group error_code). With the default `AllowAllAuthorizer`
// every group passes; with `SimpleAclAuthorizer` the super-user bypass plus
// matching Describe ACLs let groups through; with `OpaAuthorizer` the policy
// decides per group.
let authorized = |group_id: &str| {
let acl_req = AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: ResourceType::Group,
resource_name: group_id,
operation: AclOperation::Describe,
};
broker.config.authorizer.authorize(&*image, &acl_req) != AuthorizationResult::Deny
};
let mut groups: Vec<ListedGroup> = Vec::with_capacity(snapshots.len());
// Ids already emitted, so the same group_id never appears twice across the
// three (disjoint-by-design) registries.
let mut emitted: HashSet<String> = HashSet::new();
// ── Classic groups (group_type "classic") ───────────────────────────
for s in snapshots {
if !authorized(s.group_id.as_str()) {
continue;
}
let state_str = state_to_str(s.state);
if states_active && !req.states_filter.iter().any(|v| v == state_str) {
continue;
}
if types_active
&& !req
.types_filter
.iter()
.any(|t| t.eq_ignore_ascii_case("classic"))
{
continue;
}
emitted.insert(s.group_id.clone());
groups.push(ListedGroup {
group_id: s.group_id,
protocol_type: s.protocol_type.unwrap_or_else(|| "consumer".into()),
group_state: state_str.into(),
group_type: "classic".into(),
..Default::default()
});
}
// ── KIP-848 next-gen consumer groups (group_type "consumer") ────────
// These live in the unified coordinator's consumer registry, separate
// from the classic group registry.
let ng = &broker.group_coordinator;
append_next_gen(
&mut groups,
&mut emitted,
"consumer",
ng.consumer_group_ids(),
&req,
states_active,
types_active,
&authorized,
);
// ── KIP-932 share groups (group_type "share") ───────────────────────
// Gated on `share_group.enable`; share groups live in the same
// coordinator's separate share registry.
if broker.config.share_group.enable {
append_next_gen(
&mut groups,
&mut emitted,
"share",
ng.share_group_ids(),
&req,
states_active,
types_active,
&authorized,
);
}
// ── KIP-1071 streams groups (group_type "streams") ──────────────────
// Surfaced so the JVM `kafka-streams-groups.sh --list` / `--describe`
// (AdminClient `listGroups(typesFilter=[Streams])`) can find them; the
// describe hop is gated behind a non-empty list on the JVM side.
if broker.config.streams_group.enable {
append_next_gen(
&mut groups,
&mut emitted,
"streams",
ng.streams_group_ids(),
&req,
states_active,
types_active,
&authorized,
);
}
let resp = ListGroupsResponse {
error_code: codes::NONE,
groups,
throttle_time_ms: 0,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
}
/// Append one `ListedGroup` per next-gen group id (`group_type` is either
/// `"consumer"` or `"share"`), honoring `states_filter` / `types_filter` and the
/// per-group Describe ACL, and deduping against already-emitted ids. Stays sync:
/// the group's runtime state isn't cheaply available without an actor
/// round-trip, so we report the constant "Stable" — `--list` filters on
/// `types_filter`, not on the state here.
#[allow(clippy::too_many_arguments)]
fn append_next_gen(
groups: &mut Vec<ListedGroup>,
emitted: &mut HashSet<String>,
group_type: &str,
ids: Vec<String>,
req: &ListGroupsRequest,
states_active: bool,
types_active: bool,
authorized: &impl Fn(&str) -> bool,
) {
const STATE: &str = "Stable";
if states_active && !req.states_filter.iter().any(|v| v == STATE) {
return;
}
if types_active
&& !req
.types_filter
.iter()
.any(|t| t.eq_ignore_ascii_case(group_type))
{
return;
}
// Share and streams groups carry an empty protocol_type (Kafka emits no
// consumer protocol for them); next-gen consumer groups use "consumer".
let protocol_type = if group_type == "share" || group_type == "streams" {
String::new()
} else {
"consumer".into()
};
for gid in ids {
if emitted.contains(&gid) || !authorized(&gid) {
continue;
}
emitted.insert(gid.clone());
groups.push(ListedGroup {
group_id: gid,
protocol_type: protocol_type.clone(),
group_state: STATE.into(),
group_type: group_type.into(),
..Default::default()
});
}
}
fn state_to_str(s: GroupState) -> &'static str {
match s {
GroupState::Empty => "Empty",
GroupState::PreparingRebalance => "PreparingRebalance",
GroupState::CompletingRebalance => "CompletingRebalance",
GroupState::Stable => "Stable",
GroupState::Dead => "Dead",
}
}