1use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key reported in `ProtocolError::UnsupportedVersion` for this message type.
pub const API_KEY: i16 = 9;
/// Lowest message version accepted by the top-level `encode` / `decode_borrow`.
pub const MIN_VERSION: i16 = 1;
/// Highest message version accepted by the top-level `encode` / `decode_borrow`.
pub const MAX_VERSION: i16 = 10;
/// First version that uses the flexible layout (compact lengths and tagged fields).
pub const FLEXIBLE_MIN: i16 = 6;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct OffsetFetchResponse<'a> {
24 pub throttle_time_ms: i32,
25 pub topics: Vec<OffsetFetchResponseTopic<'a>>,
26 pub error_code: i16,
27 pub groups: Vec<OffsetFetchResponseGroup<'a>>,
28 pub unknown_tagged_fields: UnknownTaggedFields,
29}
30impl OffsetFetchResponse<'_> {
31 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponse {
32 crate::owned::offset_fetch_response::OffsetFetchResponse {
33 throttle_time_ms: (self.throttle_time_ms),
34 topics: (self.topics)
35 .iter()
36 .map(OffsetFetchResponseTopic::to_owned)
37 .collect(),
38 error_code: (self.error_code),
39 groups: (self.groups)
40 .iter()
41 .map(OffsetFetchResponseGroup::to_owned)
42 .collect(),
43 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
44 }
45 }
46}
47impl Encode for OffsetFetchResponse<'_> {
48 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
49 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
50 return Err(ProtocolError::UnsupportedVersion {
51 api_key: API_KEY,
52 version,
53 });
54 }
55 let flex = is_flexible(version);
56 if version >= 3 {
57 put_i32(buf, self.throttle_time_ms);
58 }
59 if (0..=7).contains(&version) {
60 {
61 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
62 for it in &self.topics {
63 it.encode(buf, version)?;
64 }
65 }
66 }
67 if (2..=7).contains(&version) {
68 put_i16(buf, self.error_code);
69 }
70 if version >= 8 {
71 {
72 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
73 for it in &self.groups {
74 it.encode(buf, version)?;
75 }
76 }
77 }
78 if flex {
79 let tagged = WriteTaggedFields::new();
80 tagged.write(buf, &self.unknown_tagged_fields);
81 }
82 Ok(())
83 }
84 fn encoded_len(&self, version: i16) -> usize {
85 let flex = is_flexible(version);
86 let mut n: usize = 0;
87 if version >= 3 {
88 n += 4;
89 }
90 if (0..=7).contains(&version) {
91 n += {
92 let prefix =
93 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
94 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
95 prefix + body
96 };
97 }
98 if (2..=7).contains(&version) {
99 n += 2;
100 }
101 if version >= 8 {
102 n += {
103 let prefix =
104 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
105 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
106 prefix + body
107 };
108 }
109 if flex {
110 let known_pairs: Vec<(u32, usize)> = Vec::new();
111 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
112 }
113 n
114 }
115}
116impl<'de> DecodeBorrow<'de> for OffsetFetchResponse<'de> {
117 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
118 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
119 return Err(ProtocolError::UnsupportedVersion {
120 api_key: API_KEY,
121 version,
122 });
123 }
124 let flex = is_flexible(version);
125 let mut out = Self::default();
126 if version >= 3 {
127 out.throttle_time_ms = get_i32(buf)?;
128 }
129 if (0..=7).contains(&version) {
130 out.topics = {
131 let n = crate::primitives::array::get_array_len(buf, flex)?;
132 let mut v = Vec::with_capacity(n);
133 for _ in 0..n {
134 v.push(OffsetFetchResponseTopic::decode_borrow(buf, version)?);
135 }
136 v
137 };
138 }
139 if (2..=7).contains(&version) {
140 out.error_code = get_i16(buf)?;
141 }
142 if version >= 8 {
143 out.groups = {
144 let n = crate::primitives::array::get_array_len(buf, flex)?;
145 let mut v = Vec::with_capacity(n);
146 for _ in 0..n {
147 v.push(OffsetFetchResponseGroup::decode_borrow(buf, version)?);
148 }
149 v
150 };
151 }
152 if flex {
153 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
154 }
155 Ok(out)
156 }
157}
#[cfg(test)]
impl OffsetFetchResponse<'_> {
    /// Test fixture: a response in which every field that exists at `version`
    /// holds a non-default sample value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 3 {
            sample.throttle_time_ms = 1;
        }
        if matches!(version, 0..=7) {
            sample.topics = vec![OffsetFetchResponseTopic::populated(version)];
            if version >= 2 {
                sample.error_code = 1;
            }
        }
        if version >= 8 {
            sample.groups = vec![OffsetFetchResponseGroup::populated(version)];
        }
        sample
    }
}
178#[derive(Debug, Clone, PartialEq, Eq, Default)]
179pub struct OffsetFetchResponseTopic<'a> {
180 pub name: &'a str,
181 pub partitions: Vec<OffsetFetchResponsePartition<'a>>,
182 pub unknown_tagged_fields: UnknownTaggedFields,
183}
184impl OffsetFetchResponseTopic<'_> {
185 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
186 crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
187 name: (self.name).to_string(),
188 partitions: (self.partitions)
189 .iter()
190 .map(OffsetFetchResponsePartition::to_owned)
191 .collect(),
192 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
193 }
194 }
195}
196impl Encode for OffsetFetchResponseTopic<'_> {
197 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
198 let flex = version >= 6;
199 if (0..=7).contains(&version) {
200 if flex {
201 put_compact_string(buf, self.name);
202 } else {
203 put_string(buf, self.name);
204 }
205 }
206 if (0..=7).contains(&version) {
207 {
208 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
209 for it in &self.partitions {
210 it.encode(buf, version)?;
211 }
212 }
213 }
214 if flex {
215 let tagged = WriteTaggedFields::new();
216 tagged.write(buf, &self.unknown_tagged_fields);
217 }
218 Ok(())
219 }
220 fn encoded_len(&self, version: i16) -> usize {
221 let flex = version >= 6;
222 let mut n: usize = 0;
223 if (0..=7).contains(&version) {
224 n += if flex {
225 compact_string_len(self.name)
226 } else {
227 string_len(self.name)
228 };
229 }
230 if (0..=7).contains(&version) {
231 n += {
232 let prefix =
233 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
234 let body: usize = (self.partitions)
235 .iter()
236 .map(|it| it.encoded_len(version))
237 .sum();
238 prefix + body
239 };
240 }
241 if flex {
242 let known_pairs: Vec<(u32, usize)> = Vec::new();
243 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
244 }
245 n
246 }
247}
248impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopic<'de> {
249 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
250 let flex = version >= 6;
251 let mut out = Self::default();
252 if (0..=7).contains(&version) {
253 out.name = if flex {
254 get_compact_string_borrowed(buf)?
255 } else {
256 get_string_borrowed(buf)?
257 };
258 }
259 if (0..=7).contains(&version) {
260 out.partitions = {
261 let n = crate::primitives::array::get_array_len(buf, flex)?;
262 let mut v = Vec::with_capacity(n);
263 for _ in 0..n {
264 v.push(OffsetFetchResponsePartition::decode_borrow(buf, version)?);
265 }
266 v
267 };
268 }
269 if flex {
270 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
271 }
272 Ok(out)
273 }
274}
#[cfg(test)]
impl OffsetFetchResponseTopic<'_> {
    /// Test fixture: a topic whose fields hold sample values when they exist at
    /// `version` (0 through 7), and defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if matches!(version, 0..=7) {
            sample.name = "x";
            sample.partitions = vec![OffsetFetchResponsePartition::populated(version)];
        }
        sample
    }
}
289#[derive(Debug, Clone, PartialEq, Eq)]
290pub struct OffsetFetchResponsePartition<'a> {
291 pub partition_index: i32,
292 pub committed_offset: i64,
293 pub committed_leader_epoch: i32,
294 pub metadata: Option<&'a str>,
295 pub error_code: i16,
296 pub unknown_tagged_fields: UnknownTaggedFields,
297}
298impl Default for OffsetFetchResponsePartition<'_> {
299 fn default() -> Self {
300 Self {
301 partition_index: 0i32,
302 committed_offset: 0i64,
303 committed_leader_epoch: -1i32,
304 metadata: None,
305 error_code: 0i16,
306 unknown_tagged_fields: Default::default(),
307 }
308 }
309}
310impl OffsetFetchResponsePartition<'_> {
311 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
312 crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
313 partition_index: (self.partition_index),
314 committed_offset: (self.committed_offset),
315 committed_leader_epoch: (self.committed_leader_epoch),
316 metadata: (self.metadata).map(std::string::ToString::to_string),
317 error_code: (self.error_code),
318 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
319 }
320 }
321}
322impl Encode for OffsetFetchResponsePartition<'_> {
323 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
324 let flex = version >= 6;
325 if (0..=7).contains(&version) {
326 put_i32(buf, self.partition_index);
327 }
328 if (0..=7).contains(&version) {
329 put_i64(buf, self.committed_offset);
330 }
331 if (5..=7).contains(&version) {
332 put_i32(buf, self.committed_leader_epoch);
333 }
334 if (0..=7).contains(&version) {
335 if flex {
336 put_compact_nullable_string(buf, self.metadata);
337 } else {
338 put_nullable_string(buf, self.metadata);
339 }
340 }
341 if (0..=7).contains(&version) {
342 put_i16(buf, self.error_code);
343 }
344 if flex {
345 let tagged = WriteTaggedFields::new();
346 tagged.write(buf, &self.unknown_tagged_fields);
347 }
348 Ok(())
349 }
350 fn encoded_len(&self, version: i16) -> usize {
351 let flex = version >= 6;
352 let mut n: usize = 0;
353 if (0..=7).contains(&version) {
354 n += 4;
355 }
356 if (0..=7).contains(&version) {
357 n += 8;
358 }
359 if (5..=7).contains(&version) {
360 n += 4;
361 }
362 if (0..=7).contains(&version) {
363 n += if flex {
364 compact_nullable_string_len(self.metadata)
365 } else {
366 nullable_string_len(self.metadata)
367 };
368 }
369 if (0..=7).contains(&version) {
370 n += 2;
371 }
372 if flex {
373 let known_pairs: Vec<(u32, usize)> = Vec::new();
374 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
375 }
376 n
377 }
378}
379impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartition<'de> {
380 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
381 let flex = version >= 6;
382 let mut out = Self::default();
383 if (0..=7).contains(&version) {
384 out.partition_index = get_i32(buf)?;
385 }
386 if (0..=7).contains(&version) {
387 out.committed_offset = get_i64(buf)?;
388 }
389 if (5..=7).contains(&version) {
390 out.committed_leader_epoch = get_i32(buf)?;
391 }
392 if (0..=7).contains(&version) {
393 out.metadata = if flex {
394 get_compact_nullable_string_borrowed(buf)?
395 } else {
396 get_nullable_string_borrowed(buf)?
397 };
398 }
399 if (0..=7).contains(&version) {
400 out.error_code = get_i16(buf)?;
401 }
402 if flex {
403 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
404 }
405 Ok(out)
406 }
407}
#[cfg(test)]
impl OffsetFetchResponsePartition<'_> {
    /// Test fixture: a partition whose fields hold sample values when they exist
    /// at `version`, and defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if matches!(version, 0..=7) {
            sample.partition_index = 1;
            sample.committed_offset = 1;
            if version >= 5 {
                sample.committed_leader_epoch = 1;
            }
            sample.metadata = Some("x");
            sample.error_code = 1;
        }
        sample
    }
}
431#[derive(Debug, Clone, PartialEq, Eq, Default)]
432pub struct OffsetFetchResponseGroup<'a> {
433 pub group_id: &'a str,
434 pub topics: Vec<OffsetFetchResponseTopics<'a>>,
435 pub error_code: i16,
436 pub unknown_tagged_fields: UnknownTaggedFields,
437}
438impl OffsetFetchResponseGroup<'_> {
439 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
440 crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
441 group_id: (self.group_id).to_string(),
442 topics: (self.topics)
443 .iter()
444 .map(OffsetFetchResponseTopics::to_owned)
445 .collect(),
446 error_code: (self.error_code),
447 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
448 }
449 }
450}
451impl Encode for OffsetFetchResponseGroup<'_> {
452 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
453 let flex = version >= 6;
454 if version >= 8 {
455 if flex {
456 put_compact_string(buf, self.group_id);
457 } else {
458 put_string(buf, self.group_id);
459 }
460 }
461 if version >= 8 {
462 {
463 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
464 for it in &self.topics {
465 it.encode(buf, version)?;
466 }
467 }
468 }
469 if version >= 8 {
470 put_i16(buf, self.error_code);
471 }
472 if flex {
473 let tagged = WriteTaggedFields::new();
474 tagged.write(buf, &self.unknown_tagged_fields);
475 }
476 Ok(())
477 }
478 fn encoded_len(&self, version: i16) -> usize {
479 let flex = version >= 6;
480 let mut n: usize = 0;
481 if version >= 8 {
482 n += if flex {
483 compact_string_len(self.group_id)
484 } else {
485 string_len(self.group_id)
486 };
487 }
488 if version >= 8 {
489 n += {
490 let prefix =
491 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
492 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
493 prefix + body
494 };
495 }
496 if version >= 8 {
497 n += 2;
498 }
499 if flex {
500 let known_pairs: Vec<(u32, usize)> = Vec::new();
501 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
502 }
503 n
504 }
505}
506impl<'de> DecodeBorrow<'de> for OffsetFetchResponseGroup<'de> {
507 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
508 let flex = version >= 6;
509 let mut out = Self::default();
510 if version >= 8 {
511 out.group_id = if flex {
512 get_compact_string_borrowed(buf)?
513 } else {
514 get_string_borrowed(buf)?
515 };
516 }
517 if version >= 8 {
518 out.topics = {
519 let n = crate::primitives::array::get_array_len(buf, flex)?;
520 let mut v = Vec::with_capacity(n);
521 for _ in 0..n {
522 v.push(OffsetFetchResponseTopics::decode_borrow(buf, version)?);
523 }
524 v
525 };
526 }
527 if version >= 8 {
528 out.error_code = get_i16(buf)?;
529 }
530 if flex {
531 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
532 }
533 Ok(out)
534 }
535}
#[cfg(test)]
impl OffsetFetchResponseGroup<'_> {
    /// Test fixture: a group whose fields hold sample values when they exist at
    /// `version` (8 and above), and defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 8 {
            sample.group_id = "x";
            sample.topics = vec![OffsetFetchResponseTopics::populated(version)];
            sample.error_code = 1;
        }
        sample
    }
}
553#[derive(Debug, Clone, PartialEq, Eq, Default)]
554pub struct OffsetFetchResponseTopics<'a> {
555 pub name: &'a str,
556 pub topic_id: crate::primitives::uuid::Uuid,
557 pub partitions: Vec<OffsetFetchResponsePartitions<'a>>,
558 pub unknown_tagged_fields: UnknownTaggedFields,
559}
560impl OffsetFetchResponseTopics<'_> {
561 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
562 crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
563 name: (self.name).to_string(),
564 topic_id: (self.topic_id),
565 partitions: (self.partitions)
566 .iter()
567 .map(OffsetFetchResponsePartitions::to_owned)
568 .collect(),
569 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
570 }
571 }
572}
573impl Encode for OffsetFetchResponseTopics<'_> {
574 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
575 let flex = version >= 6;
576 if (8..=9).contains(&version) {
577 if flex {
578 put_compact_string(buf, self.name);
579 } else {
580 put_string(buf, self.name);
581 }
582 }
583 if version >= 10 {
584 crate::primitives::uuid::put_uuid(buf, self.topic_id);
585 }
586 if version >= 8 {
587 {
588 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
589 for it in &self.partitions {
590 it.encode(buf, version)?;
591 }
592 }
593 }
594 if flex {
595 let tagged = WriteTaggedFields::new();
596 tagged.write(buf, &self.unknown_tagged_fields);
597 }
598 Ok(())
599 }
600 fn encoded_len(&self, version: i16) -> usize {
601 let flex = version >= 6;
602 let mut n: usize = 0;
603 if (8..=9).contains(&version) {
604 n += if flex {
605 compact_string_len(self.name)
606 } else {
607 string_len(self.name)
608 };
609 }
610 if version >= 10 {
611 n += 16;
612 }
613 if version >= 8 {
614 n += {
615 let prefix =
616 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
617 let body: usize = (self.partitions)
618 .iter()
619 .map(|it| it.encoded_len(version))
620 .sum();
621 prefix + body
622 };
623 }
624 if flex {
625 let known_pairs: Vec<(u32, usize)> = Vec::new();
626 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
627 }
628 n
629 }
630}
631impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopics<'de> {
632 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
633 let flex = version >= 6;
634 let mut out = Self::default();
635 if (8..=9).contains(&version) {
636 out.name = if flex {
637 get_compact_string_borrowed(buf)?
638 } else {
639 get_string_borrowed(buf)?
640 };
641 }
642 if version >= 10 {
643 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
644 }
645 if version >= 8 {
646 out.partitions = {
647 let n = crate::primitives::array::get_array_len(buf, flex)?;
648 let mut v = Vec::with_capacity(n);
649 for _ in 0..n {
650 v.push(OffsetFetchResponsePartitions::decode_borrow(buf, version)?);
651 }
652 v
653 };
654 }
655 if flex {
656 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
657 }
658 Ok(out)
659 }
660}
#[cfg(test)]
impl OffsetFetchResponseTopics<'_> {
    /// Test fixture: a topic whose fields hold sample values when they exist at
    /// `version`, and defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 8 {
            // Versions 8 and 9 carry the name; version 10 onwards carries the id.
            if version >= 10 {
                sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
            } else {
                sample.name = "x";
            }
            sample.partitions = vec![OffsetFetchResponsePartitions::populated(version)];
        }
        sample
    }
}
678#[derive(Debug, Clone, PartialEq, Eq)]
679pub struct OffsetFetchResponsePartitions<'a> {
680 pub partition_index: i32,
681 pub committed_offset: i64,
682 pub committed_leader_epoch: i32,
683 pub metadata: Option<&'a str>,
684 pub error_code: i16,
685 pub unknown_tagged_fields: UnknownTaggedFields,
686}
687impl Default for OffsetFetchResponsePartitions<'_> {
688 fn default() -> Self {
689 Self {
690 partition_index: 0i32,
691 committed_offset: 0i64,
692 committed_leader_epoch: -1i32,
693 metadata: None,
694 error_code: 0i16,
695 unknown_tagged_fields: Default::default(),
696 }
697 }
698}
699impl OffsetFetchResponsePartitions<'_> {
700 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
701 crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
702 partition_index: (self.partition_index),
703 committed_offset: (self.committed_offset),
704 committed_leader_epoch: (self.committed_leader_epoch),
705 metadata: (self.metadata).map(std::string::ToString::to_string),
706 error_code: (self.error_code),
707 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
708 }
709 }
710}
711impl Encode for OffsetFetchResponsePartitions<'_> {
712 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
713 let flex = version >= 6;
714 if version >= 8 {
715 put_i32(buf, self.partition_index);
716 }
717 if version >= 8 {
718 put_i64(buf, self.committed_offset);
719 }
720 if version >= 8 {
721 put_i32(buf, self.committed_leader_epoch);
722 }
723 if version >= 8 {
724 if flex {
725 put_compact_nullable_string(buf, self.metadata);
726 } else {
727 put_nullable_string(buf, self.metadata);
728 }
729 }
730 if version >= 8 {
731 put_i16(buf, self.error_code);
732 }
733 if flex {
734 let tagged = WriteTaggedFields::new();
735 tagged.write(buf, &self.unknown_tagged_fields);
736 }
737 Ok(())
738 }
739 fn encoded_len(&self, version: i16) -> usize {
740 let flex = version >= 6;
741 let mut n: usize = 0;
742 if version >= 8 {
743 n += 4;
744 }
745 if version >= 8 {
746 n += 8;
747 }
748 if version >= 8 {
749 n += 4;
750 }
751 if version >= 8 {
752 n += if flex {
753 compact_nullable_string_len(self.metadata)
754 } else {
755 nullable_string_len(self.metadata)
756 };
757 }
758 if version >= 8 {
759 n += 2;
760 }
761 if flex {
762 let known_pairs: Vec<(u32, usize)> = Vec::new();
763 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
764 }
765 n
766 }
767}
768impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartitions<'de> {
769 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
770 let flex = version >= 6;
771 let mut out = Self::default();
772 if version >= 8 {
773 out.partition_index = get_i32(buf)?;
774 }
775 if version >= 8 {
776 out.committed_offset = get_i64(buf)?;
777 }
778 if version >= 8 {
779 out.committed_leader_epoch = get_i32(buf)?;
780 }
781 if version >= 8 {
782 out.metadata = if flex {
783 get_compact_nullable_string_borrowed(buf)?
784 } else {
785 get_nullable_string_borrowed(buf)?
786 };
787 }
788 if version >= 8 {
789 out.error_code = get_i16(buf)?;
790 }
791 if flex {
792 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
793 }
794 Ok(out)
795 }
796}
#[cfg(test)]
impl OffsetFetchResponsePartitions<'_> {
    /// Test fixture: a partition whose fields hold sample values when they exist
    /// at `version` (8 and above), and defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 8 {
            sample.partition_index = 1;
            sample.committed_offset = 1;
            sample.committed_leader_epoch = 1;
            sample.metadata = Some("x");
            sample.error_code = 1;
        }
        sample
    }
}