1use crate::primitives::fixed::{get_i32, get_u16, put_i32, put_u16};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key carried in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 54;
/// Lowest request version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 0;
/// Highest request version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 1;
/// First version that uses the compact string/array forms and tagged fields.
pub const FLEXIBLE_MIN: i16 = 1;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
/// Borrowed form of the EndQuorumEpoch request (API key 54).
///
/// String data is held as `&'a str` rather than owned; `to_owned` produces the
/// counterpart from `crate::owned::end_quorum_epoch_request`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EndQuorumEpochRequest<'a> {
    /// Nullable cluster id; serialized whenever `version >= 0`.
    pub cluster_id: Option<&'a str>,
    /// Per-topic entries; serialized whenever `version >= 0`.
    pub topics: Vec<TopicData<'a>>,
    /// Leader endpoints; serialized only when `version >= 1`.
    pub leader_endpoints: Vec<LeaderEndpoint<'a>>,
    /// Tagged-field data filled in by `read_tagged_fields` on decode and written
    /// back on encode; only touched for flexible versions (`version >= 1`).
    pub unknown_tagged_fields: UnknownTaggedFields,
}
29impl EndQuorumEpochRequest<'_> {
30 pub fn to_owned(&self) -> crate::owned::end_quorum_epoch_request::EndQuorumEpochRequest {
31 crate::owned::end_quorum_epoch_request::EndQuorumEpochRequest {
32 cluster_id: (self.cluster_id).map(std::string::ToString::to_string),
33 topics: (self.topics).iter().map(TopicData::to_owned).collect(),
34 leader_endpoints: (self.leader_endpoints)
35 .iter()
36 .map(LeaderEndpoint::to_owned)
37 .collect(),
38 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
39 }
40 }
41}
42impl Encode for EndQuorumEpochRequest<'_> {
43 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
44 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
45 return Err(ProtocolError::UnsupportedVersion {
46 api_key: API_KEY,
47 version,
48 });
49 }
50 let flex = is_flexible(version);
51 if version >= 0 {
52 if flex {
53 put_compact_nullable_string(buf, self.cluster_id);
54 } else {
55 put_nullable_string(buf, self.cluster_id);
56 }
57 }
58 if version >= 0 {
59 {
60 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
61 for it in &self.topics {
62 it.encode(buf, version)?;
63 }
64 }
65 }
66 if version >= 1 {
67 {
68 crate::primitives::array::put_array_len(buf, (self.leader_endpoints).len(), flex);
69 for it in &self.leader_endpoints {
70 it.encode(buf, version)?;
71 }
72 }
73 }
74 if flex {
75 let tagged = WriteTaggedFields::new();
76 tagged.write(buf, &self.unknown_tagged_fields);
77 }
78 Ok(())
79 }
80 fn encoded_len(&self, version: i16) -> usize {
81 let flex = is_flexible(version);
82 let mut n: usize = 0;
83 if version >= 0 {
84 n += if flex {
85 compact_nullable_string_len(self.cluster_id)
86 } else {
87 nullable_string_len(self.cluster_id)
88 };
89 }
90 if version >= 0 {
91 n += {
92 let prefix =
93 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
94 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
95 prefix + body
96 };
97 }
98 if version >= 1 {
99 n += {
100 let prefix = crate::primitives::array::array_len_prefix_len(
101 (self.leader_endpoints).len(),
102 flex,
103 );
104 let body: usize = (self.leader_endpoints)
105 .iter()
106 .map(|it| it.encoded_len(version))
107 .sum();
108 prefix + body
109 };
110 }
111 if flex {
112 let known_pairs: Vec<(u32, usize)> = Vec::new();
113 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
114 }
115 n
116 }
117}
118impl<'de> DecodeBorrow<'de> for EndQuorumEpochRequest<'de> {
119 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
120 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
121 return Err(ProtocolError::UnsupportedVersion {
122 api_key: API_KEY,
123 version,
124 });
125 }
126 let flex = is_flexible(version);
127 let mut out = Self::default();
128 if version >= 0 {
129 out.cluster_id = if flex {
130 get_compact_nullable_string_borrowed(buf)?
131 } else {
132 get_nullable_string_borrowed(buf)?
133 };
134 }
135 if version >= 0 {
136 out.topics = {
137 let n = crate::primitives::array::get_array_len(buf, flex)?;
138 let mut v = Vec::with_capacity(n);
139 for _ in 0..n {
140 v.push(TopicData::decode_borrow(buf, version)?);
141 }
142 v
143 };
144 }
145 if version >= 1 {
146 out.leader_endpoints = {
147 let n = crate::primitives::array::get_array_len(buf, flex)?;
148 let mut v = Vec::with_capacity(n);
149 for _ in 0..n {
150 v.push(LeaderEndpoint::decode_borrow(buf, version)?);
151 }
152 v
153 };
154 }
155 if flex {
156 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
157 }
158 Ok(out)
159 }
160}
#[cfg(test)]
impl EndQuorumEpochRequest<'_> {
    /// Test fixture: a request with every field that exists at `version` set to
    /// a non-default sample value; all other fields keep their defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let (cluster_id, topics) = if version >= 0 {
            (Some("x"), vec![TopicData::populated(version)])
        } else {
            (None, Vec::new())
        };
        let leader_endpoints = if version >= 1 {
            vec![LeaderEndpoint::populated(version)]
        } else {
            Vec::new()
        };
        Self {
            cluster_id,
            topics,
            leader_endpoints,
            ..Self::default()
        }
    }
}
/// One topic entry of an [`EndQuorumEpochRequest`], borrowing its name for `'a`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TopicData<'a> {
    /// Topic name; serialized whenever `version >= 0`.
    pub topic_name: &'a str,
    /// Partition entries for this topic; serialized whenever `version >= 0`.
    pub partitions: Vec<PartitionData>,
    /// Tagged-field data filled in by `read_tagged_fields` on decode and written
    /// back on encode; only touched when `version >= 1`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
184impl TopicData<'_> {
185 pub fn to_owned(&self) -> crate::owned::end_quorum_epoch_request::TopicData {
186 crate::owned::end_quorum_epoch_request::TopicData {
187 topic_name: (self.topic_name).to_string(),
188 partitions: (self.partitions)
189 .iter()
190 .map(PartitionData::to_owned)
191 .collect(),
192 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
193 }
194 }
195}
196impl Encode for TopicData<'_> {
197 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
198 let flex = version >= 1;
199 if version >= 0 {
200 if flex {
201 put_compact_string(buf, self.topic_name);
202 } else {
203 put_string(buf, self.topic_name);
204 }
205 }
206 if version >= 0 {
207 {
208 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
209 for it in &self.partitions {
210 it.encode(buf, version)?;
211 }
212 }
213 }
214 if flex {
215 let tagged = WriteTaggedFields::new();
216 tagged.write(buf, &self.unknown_tagged_fields);
217 }
218 Ok(())
219 }
220 fn encoded_len(&self, version: i16) -> usize {
221 let flex = version >= 1;
222 let mut n: usize = 0;
223 if version >= 0 {
224 n += if flex {
225 compact_string_len(self.topic_name)
226 } else {
227 string_len(self.topic_name)
228 };
229 }
230 if version >= 0 {
231 n += {
232 let prefix =
233 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
234 let body: usize = (self.partitions)
235 .iter()
236 .map(|it| it.encoded_len(version))
237 .sum();
238 prefix + body
239 };
240 }
241 if flex {
242 let known_pairs: Vec<(u32, usize)> = Vec::new();
243 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
244 }
245 n
246 }
247}
248impl<'de> DecodeBorrow<'de> for TopicData<'de> {
249 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
250 let flex = version >= 1;
251 let mut out = Self::default();
252 if version >= 0 {
253 out.topic_name = if flex {
254 get_compact_string_borrowed(buf)?
255 } else {
256 get_string_borrowed(buf)?
257 };
258 }
259 if version >= 0 {
260 out.partitions = {
261 let n = crate::primitives::array::get_array_len(buf, flex)?;
262 let mut v = Vec::with_capacity(n);
263 for _ in 0..n {
264 v.push(PartitionData::decode_borrow(buf, version)?);
265 }
266 v
267 };
268 }
269 if flex {
270 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
271 }
272 Ok(out)
273 }
274}
#[cfg(test)]
impl TopicData<'_> {
    /// Test fixture: a topic entry with every field that exists at `version` set
    /// to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // Both sample fields share the same `version >= 0` gate.
        if version < 0 {
            return Self::default();
        }
        Self {
            topic_name: "x",
            partitions: vec![PartitionData::populated(version)],
            ..Self::default()
        }
    }
}
/// One partition entry inside a [`TopicData`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PartitionData {
    /// Partition index; serialized as a 4-byte integer whenever `version >= 0`.
    pub partition_index: i32,
    /// Leader id; serialized as a 4-byte integer whenever `version >= 0`.
    pub leader_id: i32,
    /// Leader epoch; serialized as a 4-byte integer whenever `version >= 0`.
    pub leader_epoch: i32,
    /// Preferred successor ids; serialized only when `version == 0`.
    pub preferred_successors: Vec<i32>,
    /// Preferred candidates; serialized only when `version >= 1`.
    pub preferred_candidates: Vec<ReplicaInfo>,
    /// Tagged-field data filled in by `read_tagged_fields` on decode and written
    /// back on encode; only touched when `version >= 1`.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
298impl PartitionData {
299 pub fn to_owned(&self) -> crate::owned::end_quorum_epoch_request::PartitionData {
300 crate::owned::end_quorum_epoch_request::PartitionData {
301 partition_index: (self.partition_index),
302 leader_id: (self.leader_id),
303 leader_epoch: (self.leader_epoch),
304 preferred_successors: (self.preferred_successors).clone(),
305 preferred_candidates: (self.preferred_candidates)
306 .iter()
307 .map(ReplicaInfo::to_owned)
308 .collect(),
309 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
310 }
311 }
312}
313impl Encode for PartitionData {
314 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
315 let flex = version >= 1;
316 if version >= 0 {
317 put_i32(buf, self.partition_index);
318 }
319 if version >= 0 {
320 put_i32(buf, self.leader_id);
321 }
322 if version >= 0 {
323 put_i32(buf, self.leader_epoch);
324 }
325 if version == 0 {
326 {
327 crate::primitives::array::put_array_len(
328 buf,
329 (self.preferred_successors).len(),
330 flex,
331 );
332 for it in &self.preferred_successors {
333 put_i32(buf, *it);
334 }
335 }
336 }
337 if version >= 1 {
338 {
339 crate::primitives::array::put_array_len(
340 buf,
341 (self.preferred_candidates).len(),
342 flex,
343 );
344 for it in &self.preferred_candidates {
345 it.encode(buf, version)?;
346 }
347 }
348 }
349 if flex {
350 let tagged = WriteTaggedFields::new();
351 tagged.write(buf, &self.unknown_tagged_fields);
352 }
353 Ok(())
354 }
355 fn encoded_len(&self, version: i16) -> usize {
356 let flex = version >= 1;
357 let mut n: usize = 0;
358 if version >= 0 {
359 n += 4;
360 }
361 if version >= 0 {
362 n += 4;
363 }
364 if version >= 0 {
365 n += 4;
366 }
367 if version == 0 {
368 n += {
369 let prefix = crate::primitives::array::array_len_prefix_len(
370 (self.preferred_successors).len(),
371 flex,
372 );
373 let body: usize = (self.preferred_successors).iter().map(|_| 4).sum();
374 prefix + body
375 };
376 }
377 if version >= 1 {
378 n += {
379 let prefix = crate::primitives::array::array_len_prefix_len(
380 (self.preferred_candidates).len(),
381 flex,
382 );
383 let body: usize = (self.preferred_candidates)
384 .iter()
385 .map(|it| it.encoded_len(version))
386 .sum();
387 prefix + body
388 };
389 }
390 if flex {
391 let known_pairs: Vec<(u32, usize)> = Vec::new();
392 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
393 }
394 n
395 }
396}
397impl<'de> DecodeBorrow<'de> for PartitionData {
398 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
399 let flex = version >= 1;
400 let mut out = Self::default();
401 if version >= 0 {
402 out.partition_index = get_i32(buf)?;
403 }
404 if version >= 0 {
405 out.leader_id = get_i32(buf)?;
406 }
407 if version >= 0 {
408 out.leader_epoch = get_i32(buf)?;
409 }
410 if version == 0 {
411 out.preferred_successors = {
412 let n = crate::primitives::array::get_array_len(buf, flex)?;
413 let mut v = Vec::with_capacity(n);
414 for _ in 0..n {
415 v.push(get_i32(buf)?);
416 }
417 v
418 };
419 }
420 if version >= 1 {
421 out.preferred_candidates = {
422 let n = crate::primitives::array::get_array_len(buf, flex)?;
423 let mut v = Vec::with_capacity(n);
424 for _ in 0..n {
425 v.push(ReplicaInfo::decode_borrow(buf, version)?);
426 }
427 v
428 };
429 }
430 if flex {
431 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
432 }
433 Ok(out)
434 }
435}
#[cfg(test)]
impl PartitionData {
    /// Test fixture: a partition entry with every field that exists at `version`
    /// set to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            // The three integer fields share the same version gate.
            sample.partition_index = 1;
            sample.leader_id = 1;
            sample.leader_epoch = 1;
        }
        if version == 0 {
            sample.preferred_successors = vec![1];
        } else if version >= 1 {
            sample.preferred_candidates = vec![ReplicaInfo::populated(version)];
        }
        sample
    }
}
/// One preferred-candidate entry inside a [`PartitionData`]; every field is
/// serialized only when `version >= 1`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReplicaInfo {
    /// Candidate id; serialized as a 4-byte integer.
    pub candidate_id: i32,
    /// Candidate directory id; counted as 16 bytes by `encoded_len`.
    pub candidate_directory_id: crate::primitives::uuid::Uuid,
    /// Tagged-field data filled in by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
465impl ReplicaInfo {
466 pub fn to_owned(&self) -> crate::owned::end_quorum_epoch_request::ReplicaInfo {
467 crate::owned::end_quorum_epoch_request::ReplicaInfo {
468 candidate_id: (self.candidate_id),
469 candidate_directory_id: (self.candidate_directory_id),
470 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
471 }
472 }
473}
474impl Encode for ReplicaInfo {
475 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
476 let flex = version >= 1;
477 if version >= 1 {
478 put_i32(buf, self.candidate_id);
479 }
480 if version >= 1 {
481 crate::primitives::uuid::put_uuid(buf, self.candidate_directory_id);
482 }
483 if flex {
484 let tagged = WriteTaggedFields::new();
485 tagged.write(buf, &self.unknown_tagged_fields);
486 }
487 Ok(())
488 }
489 fn encoded_len(&self, version: i16) -> usize {
490 let flex = version >= 1;
491 let mut n: usize = 0;
492 if version >= 1 {
493 n += 4;
494 }
495 if version >= 1 {
496 n += 16;
497 }
498 if flex {
499 let known_pairs: Vec<(u32, usize)> = Vec::new();
500 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
501 }
502 n
503 }
504}
505impl<'de> DecodeBorrow<'de> for ReplicaInfo {
506 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
507 let flex = version >= 1;
508 let mut out = Self::default();
509 if version >= 1 {
510 out.candidate_id = get_i32(buf)?;
511 }
512 if version >= 1 {
513 out.candidate_directory_id = crate::primitives::uuid::get_uuid(buf)?;
514 }
515 if flex {
516 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
517 }
518 Ok(out)
519 }
520}
#[cfg(test)]
impl ReplicaInfo {
    /// Test fixture: a candidate entry with every field that exists at `version`
    /// set to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // Both sample fields share the same `version >= 1` gate.
        if version < 1 {
            return Self::default();
        }
        Self {
            candidate_id: 1,
            candidate_directory_id: crate::primitives::uuid::Uuid([1u8; 16]),
            ..Self::default()
        }
    }
}
/// One leader endpoint of an [`EndQuorumEpochRequest`], borrowing its strings
/// for `'a`; every field is serialized only when `version >= 1`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LeaderEndpoint<'a> {
    /// Endpoint name.
    pub name: &'a str,
    /// Endpoint host.
    pub host: &'a str,
    /// Endpoint port; serialized as a 2-byte unsigned integer.
    pub port: u16,
    /// Tagged-field data filled in by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
542impl LeaderEndpoint<'_> {
543 pub fn to_owned(&self) -> crate::owned::end_quorum_epoch_request::LeaderEndpoint {
544 crate::owned::end_quorum_epoch_request::LeaderEndpoint {
545 name: (self.name).to_string(),
546 host: (self.host).to_string(),
547 port: (self.port),
548 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
549 }
550 }
551}
552impl Encode for LeaderEndpoint<'_> {
553 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
554 let flex = version >= 1;
555 if version >= 1 {
556 if flex {
557 put_compact_string(buf, self.name);
558 } else {
559 put_string(buf, self.name);
560 }
561 }
562 if version >= 1 {
563 if flex {
564 put_compact_string(buf, self.host);
565 } else {
566 put_string(buf, self.host);
567 }
568 }
569 if version >= 1 {
570 put_u16(buf, self.port);
571 }
572 if flex {
573 let tagged = WriteTaggedFields::new();
574 tagged.write(buf, &self.unknown_tagged_fields);
575 }
576 Ok(())
577 }
578 fn encoded_len(&self, version: i16) -> usize {
579 let flex = version >= 1;
580 let mut n: usize = 0;
581 if version >= 1 {
582 n += if flex {
583 compact_string_len(self.name)
584 } else {
585 string_len(self.name)
586 };
587 }
588 if version >= 1 {
589 n += if flex {
590 compact_string_len(self.host)
591 } else {
592 string_len(self.host)
593 };
594 }
595 if version >= 1 {
596 n += 2;
597 }
598 if flex {
599 let known_pairs: Vec<(u32, usize)> = Vec::new();
600 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
601 }
602 n
603 }
604}
605impl<'de> DecodeBorrow<'de> for LeaderEndpoint<'de> {
606 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
607 let flex = version >= 1;
608 let mut out = Self::default();
609 if version >= 1 {
610 out.name = if flex {
611 get_compact_string_borrowed(buf)?
612 } else {
613 get_string_borrowed(buf)?
614 };
615 }
616 if version >= 1 {
617 out.host = if flex {
618 get_compact_string_borrowed(buf)?
619 } else {
620 get_string_borrowed(buf)?
621 };
622 }
623 if version >= 1 {
624 out.port = get_u16(buf)?;
625 }
626 if flex {
627 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
628 }
629 Ok(out)
630 }
631}
#[cfg(test)]
impl LeaderEndpoint<'_> {
    /// Test fixture: an endpoint with every field that exists at `version` set
    /// to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        // All three sample fields share the same `version >= 1` gate.
        if version < 1 {
            return Self::default();
        }
        Self {
            name: "x",
            host: "x",
            port: 1,
            ..Self::default()
        }
    }
}