1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 55;
/// Lowest version accepted by the top-level encode/decode of this message.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level encode/decode of this message.
pub const MAX_VERSION: i16 = 2;
/// First version for which [`is_flexible`] reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct DescribeQuorumResponse<'a> {
28 pub error_code: i16,
29 pub error_message: Option<&'a str>,
30 pub topics: Vec<TopicData<'a>>,
31 pub nodes: Vec<Node<'a>>,
32 pub unknown_tagged_fields: UnknownTaggedFields,
33}
34
35impl<'a> Default for DescribeQuorumResponse<'a> {
36 fn default() -> Self {
37 Self {
38 error_code: 0i16,
39 error_message: None,
40 topics: Vec::new(),
41 nodes: Vec::new(),
42 unknown_tagged_fields: Default::default(),
43 }
44 }
45}
46
47impl<'a> DescribeQuorumResponse<'a> {
48 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::DescribeQuorumResponse {
49 crate::owned::describe_quorum_response::DescribeQuorumResponse {
50 error_code: (self.error_code),
51 error_message: (self.error_message).map(|s| s.to_string()),
52 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
53 nodes: (self.nodes).iter().map(|it| it.to_owned()).collect(),
54 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
55 }
56 }
57}
58
59impl<'a> Encode for DescribeQuorumResponse<'a> {
60 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
61 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
62 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
63 }
64 let flex = is_flexible(version);
65 if version >= 0 { put_i16(buf, self.error_code) }
66 if version >= 2 { if flex { put_compact_nullable_string(buf, self.error_message) } else { put_nullable_string(buf, self.error_message) } }
67 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
68 if version >= 2 { { crate::primitives::array::put_array_len(buf, (self.nodes).len(), flex); for it in &self.nodes { it.encode(buf, version)?; } } }
69 if flex {
70 let tagged = WriteTaggedFields::new();
71 tagged.write(buf, &self.unknown_tagged_fields);
72 }
73 Ok(())
74 }
75 fn encoded_len(&self, version: i16) -> usize {
76 let flex = is_flexible(version);
77 let mut n: usize = 0;
78 if version >= 0 { n += 2; }
79 if version >= 2 { n += if flex { compact_nullable_string_len(self.error_message) } else { nullable_string_len(self.error_message) }; }
80 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
81 if version >= 2 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.nodes).len(), flex); let body: usize = (self.nodes).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
82 if flex {
83 let known_pairs: Vec<(u32, usize)> = Vec::new();
84 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
85 }
86 n
87 }
88}
89
90impl<'de> DecodeBorrow<'de> for DescribeQuorumResponse<'de> {
91 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
92 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
93 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
94 }
95 let flex = is_flexible(version);
96 let mut out = Self::default();
97 if version >= 0 { out.error_code = get_i16(buf)?; }
98 if version >= 2 { out.error_message = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
99 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(TopicData::decode_borrow(buf, version)?); } v }; }
100 if version >= 2 { out.nodes = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(Node::decode_borrow(buf, version)?); } v }; }
101 if flex {
102 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
103 Ok(false)
104 })?;
105 }
106 Ok(out)
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct TopicData<'a> {
112 pub topic_name: &'a str,
113 pub partitions: Vec<PartitionData<'a>>,
114 pub unknown_tagged_fields: UnknownTaggedFields,
115}
116
117impl<'a> Default for TopicData<'a> {
118 fn default() -> Self {
119 Self {
120 topic_name: "",
121 partitions: Vec::new(),
122 unknown_tagged_fields: Default::default(),
123 }
124 }
125}
126
127impl<'a> TopicData<'a> {
128 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::TopicData {
129 crate::owned::describe_quorum_response::TopicData {
130 topic_name: (self.topic_name).to_string(),
131 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
132 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
133 }
134 }
135}
136
137impl<'a> Encode for TopicData<'a> {
138 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
139 let flex = version >= 0;
140 if version >= 0 { if flex { put_compact_string(buf, self.topic_name) } else { put_string(buf, self.topic_name) } }
141 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
142 if flex {
143 let tagged = WriteTaggedFields::new();
144 tagged.write(buf, &self.unknown_tagged_fields);
145 }
146 Ok(())
147 }
148 fn encoded_len(&self, version: i16) -> usize {
149 let flex = version >= 0;
150 let mut n: usize = 0;
151 if version >= 0 { n += if flex { compact_string_len(self.topic_name) } else { string_len(self.topic_name) }; }
152 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
153 if flex {
154 let known_pairs: Vec<(u32, usize)> = Vec::new();
155 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
156 }
157 n
158 }
159}
160
161impl<'de> DecodeBorrow<'de> for TopicData<'de> {
162 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
163 let flex = version >= 0;
164 let mut out = Self::default();
165 if version >= 0 { out.topic_name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
166 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionData::decode_borrow(buf, version)?); } v }; }
167 if flex {
168 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
169 Ok(false)
170 })?;
171 }
172 Ok(out)
173 }
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct PartitionData<'a> {
178 pub partition_index: i32,
179 pub error_code: i16,
180 pub error_message: Option<&'a str>,
181 pub leader_id: i32,
182 pub leader_epoch: i32,
183 pub high_watermark: i64,
184 pub current_voters: Vec<super::common::replica_state::ReplicaState>,
185 pub observers: Vec<super::common::replica_state::ReplicaState>,
186 pub unknown_tagged_fields: UnknownTaggedFields,
187}
188
189impl<'a> Default for PartitionData<'a> {
190 fn default() -> Self {
191 Self {
192 partition_index: 0i32,
193 error_code: 0i16,
194 error_message: None,
195 leader_id: 0i32,
196 leader_epoch: 0i32,
197 high_watermark: 0i64,
198 current_voters: Vec::new(),
199 observers: Vec::new(),
200 unknown_tagged_fields: Default::default(),
201 }
202 }
203}
204
205impl<'a> PartitionData<'a> {
206 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::PartitionData {
207 crate::owned::describe_quorum_response::PartitionData {
208 partition_index: (self.partition_index),
209 error_code: (self.error_code),
210 error_message: (self.error_message).map(|s| s.to_string()),
211 leader_id: (self.leader_id),
212 leader_epoch: (self.leader_epoch),
213 high_watermark: (self.high_watermark),
214 current_voters: (self.current_voters).iter().map(|it| it.to_owned()).collect(),
215 observers: (self.observers).iter().map(|it| it.to_owned()).collect(),
216 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
217 }
218 }
219}
220
221impl<'a> Encode for PartitionData<'a> {
222 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
223 let flex = version >= 0;
224 if version >= 0 { put_i32(buf, self.partition_index) }
225 if version >= 0 { put_i16(buf, self.error_code) }
226 if version >= 2 { if flex { put_compact_nullable_string(buf, self.error_message) } else { put_nullable_string(buf, self.error_message) } }
227 if version >= 0 { put_i32(buf, self.leader_id) }
228 if version >= 0 { put_i32(buf, self.leader_epoch) }
229 if version >= 0 { put_i64(buf, self.high_watermark) }
230 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.current_voters).len(), flex); for it in &self.current_voters { it.encode(buf, version)?; } } }
231 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.observers).len(), flex); for it in &self.observers { it.encode(buf, version)?; } } }
232 if flex {
233 let tagged = WriteTaggedFields::new();
234 tagged.write(buf, &self.unknown_tagged_fields);
235 }
236 Ok(())
237 }
238 fn encoded_len(&self, version: i16) -> usize {
239 let flex = version >= 0;
240 let mut n: usize = 0;
241 if version >= 0 { n += 4; }
242 if version >= 0 { n += 2; }
243 if version >= 2 { n += if flex { compact_nullable_string_len(self.error_message) } else { nullable_string_len(self.error_message) }; }
244 if version >= 0 { n += 4; }
245 if version >= 0 { n += 4; }
246 if version >= 0 { n += 8; }
247 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.current_voters).len(), flex); let body: usize = (self.current_voters).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
248 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.observers).len(), flex); let body: usize = (self.observers).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
249 if flex {
250 let known_pairs: Vec<(u32, usize)> = Vec::new();
251 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
252 }
253 n
254 }
255}
256
257impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
258 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
259 let flex = version >= 0;
260 let mut out = Self::default();
261 if version >= 0 { out.partition_index = get_i32(buf)?; }
262 if version >= 0 { out.error_code = get_i16(buf)?; }
263 if version >= 2 { out.error_message = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
264 if version >= 0 { out.leader_id = get_i32(buf)?; }
265 if version >= 0 { out.leader_epoch = get_i32(buf)?; }
266 if version >= 0 { out.high_watermark = get_i64(buf)?; }
267 if version >= 0 { out.current_voters = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::replica_state::ReplicaState::decode_borrow(buf, version)?); } v }; }
268 if version >= 0 { out.observers = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(super::common::replica_state::ReplicaState::decode_borrow(buf, version)?); } v }; }
269 if flex {
270 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
271 Ok(false)
272 })?;
273 }
274 Ok(out)
275 }
276}
277
278#[derive(Debug, Clone, PartialEq, Eq)]
279pub struct Node<'a> {
280 pub node_id: i32,
281 pub listeners: Vec<Listener<'a>>,
282 pub unknown_tagged_fields: UnknownTaggedFields,
283}
284
285impl<'a> Default for Node<'a> {
286 fn default() -> Self {
287 Self {
288 node_id: 0i32,
289 listeners: Vec::new(),
290 unknown_tagged_fields: Default::default(),
291 }
292 }
293}
294
295impl<'a> Node<'a> {
296 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Node {
297 crate::owned::describe_quorum_response::Node {
298 node_id: (self.node_id),
299 listeners: (self.listeners).iter().map(|it| it.to_owned()).collect(),
300 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
301 }
302 }
303}
304
305impl<'a> Encode for Node<'a> {
306 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
307 let flex = version >= 0;
308 if version >= 2 { put_i32(buf, self.node_id) }
309 if version >= 2 { { crate::primitives::array::put_array_len(buf, (self.listeners).len(), flex); for it in &self.listeners { it.encode(buf, version)?; } } }
310 if flex {
311 let tagged = WriteTaggedFields::new();
312 tagged.write(buf, &self.unknown_tagged_fields);
313 }
314 Ok(())
315 }
316 fn encoded_len(&self, version: i16) -> usize {
317 let flex = version >= 0;
318 let mut n: usize = 0;
319 if version >= 2 { n += 4; }
320 if version >= 2 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.listeners).len(), flex); let body: usize = (self.listeners).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
321 if flex {
322 let known_pairs: Vec<(u32, usize)> = Vec::new();
323 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
324 }
325 n
326 }
327}
328
329impl<'de> DecodeBorrow<'de> for Node<'de> {
330 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
331 let flex = version >= 0;
332 let mut out = Self::default();
333 if version >= 2 { out.node_id = get_i32(buf)?; }
334 if version >= 2 { out.listeners = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(Listener::decode_borrow(buf, version)?); } v }; }
335 if flex {
336 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
337 Ok(false)
338 })?;
339 }
340 Ok(out)
341 }
342}
343
344#[derive(Debug, Clone, PartialEq, Eq)]
345pub struct Listener<'a> {
346 pub name: &'a str,
347 pub host: &'a str,
348 pub port: u16,
349 pub unknown_tagged_fields: UnknownTaggedFields,
350}
351
352impl<'a> Default for Listener<'a> {
353 fn default() -> Self {
354 Self {
355 name: "",
356 host: "",
357 port: 0u16,
358 unknown_tagged_fields: Default::default(),
359 }
360 }
361}
362
363impl<'a> Listener<'a> {
364 pub fn to_owned(&self) -> crate::owned::describe_quorum_response::Listener {
365 crate::owned::describe_quorum_response::Listener {
366 name: (self.name).to_string(),
367 host: (self.host).to_string(),
368 port: (self.port),
369 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
370 }
371 }
372}
373
374impl<'a> Encode for Listener<'a> {
375 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
376 let flex = version >= 0;
377 if version >= 2 { if flex { put_compact_string(buf, self.name) } else { put_string(buf, self.name) } }
378 if version >= 2 { if flex { put_compact_string(buf, self.host) } else { put_string(buf, self.host) } }
379 if version >= 2 { put_u16(buf, self.port) }
380 if flex {
381 let tagged = WriteTaggedFields::new();
382 tagged.write(buf, &self.unknown_tagged_fields);
383 }
384 Ok(())
385 }
386 fn encoded_len(&self, version: i16) -> usize {
387 let flex = version >= 0;
388 let mut n: usize = 0;
389 if version >= 2 { n += if flex { compact_string_len(self.name) } else { string_len(self.name) }; }
390 if version >= 2 { n += if flex { compact_string_len(self.host) } else { string_len(self.host) }; }
391 if version >= 2 { n += 2; }
392 if flex {
393 let known_pairs: Vec<(u32, usize)> = Vec::new();
394 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
395 }
396 n
397 }
398}
399
400impl<'de> DecodeBorrow<'de> for Listener<'de> {
401 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
402 let flex = version >= 0;
403 let mut out = Self::default();
404 if version >= 2 { out.name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
405 if version >= 2 { out.host = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
406 if version >= 2 { out.port = get_u16(buf)?; }
407 if flex {
408 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
409 Ok(false)
410 })?;
411 }
412 Ok(out)
413 }
414}