1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i8, put_i16, put_i32, put_i8};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
18pub const API_KEY: i16 = 69;
19pub const MIN_VERSION: i16 = 0;
20pub const MAX_VERSION: i16 = 1;
21pub const FLEXIBLE_MIN: i16 = 0;
22
23#[inline]
24fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ConsumerGroupDescribeResponse<'a> {
28 pub throttle_time_ms: i32,
29 pub groups: Vec<DescribedGroup<'a>>,
30 pub unknown_tagged_fields: UnknownTaggedFields,
31}
32
33impl<'a> Default for ConsumerGroupDescribeResponse<'a> {
34 fn default() -> Self {
35 Self {
36 throttle_time_ms: 0i32,
37 groups: Vec::new(),
38 unknown_tagged_fields: Default::default(),
39 }
40 }
41}
42
43impl<'a> ConsumerGroupDescribeResponse<'a> {
44 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
45 crate::owned::consumer_group_describe_response::ConsumerGroupDescribeResponse {
46 throttle_time_ms: (self.throttle_time_ms),
47 groups: (self.groups).iter().map(|it| it.to_owned()).collect(),
48 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
49 }
50 }
51}
52
53impl<'a> Encode for ConsumerGroupDescribeResponse<'a> {
54 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
55 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
56 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
57 }
58 let flex = is_flexible(version);
59 if version >= 0 { put_i32(buf, self.throttle_time_ms) }
60 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.groups).len(), flex); for it in &self.groups { it.encode(buf, version)?; } } }
61 if flex {
62 let tagged = WriteTaggedFields::new();
63 tagged.write(buf, &self.unknown_tagged_fields);
64 }
65 Ok(())
66 }
67 fn encoded_len(&self, version: i16) -> usize {
68 let flex = is_flexible(version);
69 let mut n: usize = 0;
70 if version >= 0 { n += 4; }
71 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.groups).len(), flex); let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
72 if flex {
73 let known_pairs: Vec<(u32, usize)> = Vec::new();
74 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
75 }
76 n
77 }
78}
79
80impl<'de> DecodeBorrow<'de> for ConsumerGroupDescribeResponse<'de> {
81 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
82 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
83 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
84 }
85 let flex = is_flexible(version);
86 let mut out = Self::default();
87 if version >= 0 { out.throttle_time_ms = get_i32(buf)?; }
88 if version >= 0 { out.groups = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(DescribedGroup::decode_borrow(buf, version)?); } v }; }
89 if flex {
90 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
91 Ok(false)
92 })?;
93 }
94 Ok(out)
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct DescribedGroup<'a> {
100 pub error_code: i16,
101 pub error_message: Option<&'a str>,
102 pub group_id: &'a str,
103 pub group_state: &'a str,
104 pub group_epoch: i32,
105 pub assignment_epoch: i32,
106 pub assignor_name: &'a str,
107 pub members: Vec<Member<'a>>,
108 pub authorized_operations: i32,
109 pub unknown_tagged_fields: UnknownTaggedFields,
110}
111
112impl<'a> Default for DescribedGroup<'a> {
113 fn default() -> Self {
114 Self {
115 error_code: 0i16,
116 error_message: None,
117 group_id: "",
118 group_state: "",
119 group_epoch: 0i32,
120 assignment_epoch: 0i32,
121 assignor_name: "",
122 members: Vec::new(),
123 authorized_operations: -2_147_483_648i32,
124 unknown_tagged_fields: Default::default(),
125 }
126 }
127}
128
129impl<'a> DescribedGroup<'a> {
130 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::DescribedGroup {
131 crate::owned::consumer_group_describe_response::DescribedGroup {
132 error_code: (self.error_code),
133 error_message: (self.error_message).map(|s| s.to_string()),
134 group_id: (self.group_id).to_string(),
135 group_state: (self.group_state).to_string(),
136 group_epoch: (self.group_epoch),
137 assignment_epoch: (self.assignment_epoch),
138 assignor_name: (self.assignor_name).to_string(),
139 members: (self.members).iter().map(|it| it.to_owned()).collect(),
140 authorized_operations: (self.authorized_operations),
141 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
142 }
143 }
144}
145
146impl<'a> Encode for DescribedGroup<'a> {
147 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
148 let flex = version >= 0;
149 if version >= 0 { put_i16(buf, self.error_code) }
150 if version >= 0 { if flex { put_compact_nullable_string(buf, self.error_message) } else { put_nullable_string(buf, self.error_message) } }
151 if version >= 0 { if flex { put_compact_string(buf, self.group_id) } else { put_string(buf, self.group_id) } }
152 if version >= 0 { if flex { put_compact_string(buf, self.group_state) } else { put_string(buf, self.group_state) } }
153 if version >= 0 { put_i32(buf, self.group_epoch) }
154 if version >= 0 { put_i32(buf, self.assignment_epoch) }
155 if version >= 0 { if flex { put_compact_string(buf, self.assignor_name) } else { put_string(buf, self.assignor_name) } }
156 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.members).len(), flex); for it in &self.members { it.encode(buf, version)?; } } }
157 if version >= 0 { put_i32(buf, self.authorized_operations) }
158 if flex {
159 let tagged = WriteTaggedFields::new();
160 tagged.write(buf, &self.unknown_tagged_fields);
161 }
162 Ok(())
163 }
164 fn encoded_len(&self, version: i16) -> usize {
165 let flex = version >= 0;
166 let mut n: usize = 0;
167 if version >= 0 { n += 2; }
168 if version >= 0 { n += if flex { compact_nullable_string_len(self.error_message) } else { nullable_string_len(self.error_message) }; }
169 if version >= 0 { n += if flex { compact_string_len(self.group_id) } else { string_len(self.group_id) }; }
170 if version >= 0 { n += if flex { compact_string_len(self.group_state) } else { string_len(self.group_state) }; }
171 if version >= 0 { n += 4; }
172 if version >= 0 { n += 4; }
173 if version >= 0 { n += if flex { compact_string_len(self.assignor_name) } else { string_len(self.assignor_name) }; }
174 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.members).len(), flex); let body: usize = (self.members).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
175 if version >= 0 { n += 4; }
176 if flex {
177 let known_pairs: Vec<(u32, usize)> = Vec::new();
178 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
179 }
180 n
181 }
182}
183
184impl<'de> DecodeBorrow<'de> for DescribedGroup<'de> {
185 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
186 let flex = version >= 0;
187 let mut out = Self::default();
188 if version >= 0 { out.error_code = get_i16(buf)?; }
189 if version >= 0 { out.error_message = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
190 if version >= 0 { out.group_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
191 if version >= 0 { out.group_state = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
192 if version >= 0 { out.group_epoch = get_i32(buf)?; }
193 if version >= 0 { out.assignment_epoch = get_i32(buf)?; }
194 if version >= 0 { out.assignor_name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
195 if version >= 0 { out.members = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(Member::decode_borrow(buf, version)?); } v }; }
196 if version >= 0 { out.authorized_operations = get_i32(buf)?; }
197 if flex {
198 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
199 Ok(false)
200 })?;
201 }
202 Ok(out)
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
207pub struct Member<'a> {
208 pub member_id: &'a str,
209 pub instance_id: Option<&'a str>,
210 pub rack_id: Option<&'a str>,
211 pub member_epoch: i32,
212 pub client_id: &'a str,
213 pub client_host: &'a str,
214 pub subscribed_topic_names: Vec<&'a str>,
215 pub subscribed_topic_regex: Option<&'a str>,
216 pub assignment: super::common::assignment::Assignment<'a>,
217 pub target_assignment: super::common::assignment::Assignment<'a>,
218 pub member_type: i8,
219 pub unknown_tagged_fields: UnknownTaggedFields,
220}
221
222impl<'a> Default for Member<'a> {
223 fn default() -> Self {
224 Self {
225 member_id: "",
226 instance_id: None,
227 rack_id: None,
228 member_epoch: 0i32,
229 client_id: "",
230 client_host: "",
231 subscribed_topic_names: Vec::new(),
232 subscribed_topic_regex: None,
233 assignment: Default::default(),
234 target_assignment: Default::default(),
235 member_type: -1i8,
236 unknown_tagged_fields: Default::default(),
237 }
238 }
239}
240
241impl<'a> Member<'a> {
242 pub fn to_owned(&self) -> crate::owned::consumer_group_describe_response::Member {
243 crate::owned::consumer_group_describe_response::Member {
244 member_id: (self.member_id).to_string(),
245 instance_id: (self.instance_id).map(|s| s.to_string()),
246 rack_id: (self.rack_id).map(|s| s.to_string()),
247 member_epoch: (self.member_epoch),
248 client_id: (self.client_id).to_string(),
249 client_host: (self.client_host).to_string(),
250 subscribed_topic_names: (self.subscribed_topic_names).iter().map(|s| s.to_string()).collect(),
251 subscribed_topic_regex: (self.subscribed_topic_regex).map(|s| s.to_string()),
252 assignment: (self.assignment).to_owned(),
253 target_assignment: (self.target_assignment).to_owned(),
254 member_type: (self.member_type),
255 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
256 }
257 }
258}
259
260impl<'a> Encode for Member<'a> {
261 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
262 let flex = version >= 0;
263 if version >= 0 { if flex { put_compact_string(buf, self.member_id) } else { put_string(buf, self.member_id) } }
264 if version >= 0 { if flex { put_compact_nullable_string(buf, self.instance_id) } else { put_nullable_string(buf, self.instance_id) } }
265 if version >= 0 { if flex { put_compact_nullable_string(buf, self.rack_id) } else { put_nullable_string(buf, self.rack_id) } }
266 if version >= 0 { put_i32(buf, self.member_epoch) }
267 if version >= 0 { if flex { put_compact_string(buf, self.client_id) } else { put_string(buf, self.client_id) } }
268 if version >= 0 { if flex { put_compact_string(buf, self.client_host) } else { put_string(buf, self.client_host) } }
269 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.subscribed_topic_names).len(), flex); for it in &self.subscribed_topic_names { if flex { put_compact_string(buf, *it) } else { put_string(buf, *it) }; } } }
270 if version >= 0 { if flex { put_compact_nullable_string(buf, self.subscribed_topic_regex) } else { put_nullable_string(buf, self.subscribed_topic_regex) } }
271 if version >= 0 { self.assignment.encode(buf, version)? }
272 if version >= 0 { self.target_assignment.encode(buf, version)? }
273 if version >= 1 { put_i8(buf, self.member_type) }
274 if flex {
275 let tagged = WriteTaggedFields::new();
276 tagged.write(buf, &self.unknown_tagged_fields);
277 }
278 Ok(())
279 }
280 fn encoded_len(&self, version: i16) -> usize {
281 let flex = version >= 0;
282 let mut n: usize = 0;
283 if version >= 0 { n += if flex { compact_string_len(self.member_id) } else { string_len(self.member_id) }; }
284 if version >= 0 { n += if flex { compact_nullable_string_len(self.instance_id) } else { nullable_string_len(self.instance_id) }; }
285 if version >= 0 { n += if flex { compact_nullable_string_len(self.rack_id) } else { nullable_string_len(self.rack_id) }; }
286 if version >= 0 { n += 4; }
287 if version >= 0 { n += if flex { compact_string_len(self.client_id) } else { string_len(self.client_id) }; }
288 if version >= 0 { n += if flex { compact_string_len(self.client_host) } else { string_len(self.client_host) }; }
289 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.subscribed_topic_names).len(), flex); let body: usize = (self.subscribed_topic_names).iter().map(|it| if flex { compact_string_len(*it) } else { string_len(*it) }).sum(); prefix + body }; }
290 if version >= 0 { n += if flex { compact_nullable_string_len(self.subscribed_topic_regex) } else { nullable_string_len(self.subscribed_topic_regex) }; }
291 if version >= 0 { n += self.assignment.encoded_len(version); }
292 if version >= 0 { n += self.target_assignment.encoded_len(version); }
293 if version >= 1 { n += 1; }
294 if flex {
295 let known_pairs: Vec<(u32, usize)> = Vec::new();
296 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
297 }
298 n
299 }
300}
301
302impl<'de> DecodeBorrow<'de> for Member<'de> {
303 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
304 let flex = version >= 0;
305 let mut out = Self::default();
306 if version >= 0 { out.member_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
307 if version >= 0 { out.instance_id = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
308 if version >= 0 { out.rack_id = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
309 if version >= 0 { out.member_epoch = get_i32(buf)?; }
310 if version >= 0 { out.client_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
311 if version >= 0 { out.client_host = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
312 if version >= 0 { out.subscribed_topic_names = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }); } v }; }
313 if version >= 0 { out.subscribed_topic_regex = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
314 if version >= 0 { out.assignment = super::common::assignment::Assignment::decode_borrow(buf, version)?; }
315 if version >= 0 { out.target_assignment = super::common::assignment::Assignment::decode_borrow(buf, version)?; }
316 if version >= 1 { out.member_type = get_i8(buf)?; }
317 if flex {
318 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
319 Ok(false)
320 })?;
321 }
322 Ok(out)
323 }
324}