1use crate::primitives::fixed::{get_bool, get_i32, put_bool, put_i32};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes_borrowed::{
8 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
9 get_nullable_string_borrowed, get_string_borrowed,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::BufMut;
/// API key of this message type; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 9;
/// Lowest version accepted by the top-level request encoder and decoder.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level request encoder and decoder.
pub const MAX_VERSION: i16 = 10;
/// First version that uses the flexible layout (compact strings/arrays and tagged fields).
pub const FLEXIBLE_MIN: i16 = 6;
18#[inline]
19fn is_flexible(version: i16) -> bool {
20 version >= FLEXIBLE_MIN
21}
/// Borrowed form of the offset-fetch request; string fields borrow from the
/// buffer the request was decoded from.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequest<'a> {
    /// Group identifier; read and written only for versions up to 7.
    pub group_id: &'a str,
    /// Topic entries; on the wire only for versions up to 7, nullable from version 2.
    pub topics: Option<Vec<OffsetFetchRequestTopic<'a>>>,
    /// Per-group entries; on the wire only from version 8.
    pub groups: Vec<OffsetFetchRequestGroup<'a>>,
    /// Boolean flag; on the wire from version 7.
    pub require_stable: bool,
    /// Tagged fields this codec does not interpret; carried for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
30impl OffsetFetchRequest<'_> {
31 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequest {
32 crate::owned::offset_fetch_request::OffsetFetchRequest {
33 group_id: (self.group_id).to_string(),
34 topics: (self.topics)
35 .as_ref()
36 .map(|v| v.iter().map(OffsetFetchRequestTopic::to_owned).collect()),
37 groups: (self.groups)
38 .iter()
39 .map(OffsetFetchRequestGroup::to_owned)
40 .collect(),
41 require_stable: (self.require_stable),
42 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
43 }
44 }
45}
46impl Encode for OffsetFetchRequest<'_> {
47 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
48 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
49 return Err(ProtocolError::UnsupportedVersion {
50 api_key: API_KEY,
51 version,
52 });
53 }
54 let flex = is_flexible(version);
55 if (0..=7).contains(&version) {
56 if flex {
57 put_compact_string(buf, self.group_id);
58 } else {
59 put_string(buf, self.group_id);
60 }
61 }
62 if (0..=7).contains(&version) {
63 if version >= 2 {
64 {
65 let len = (self.topics).as_ref().map(Vec::len);
66 crate::primitives::array::put_nullable_array_len(buf, len, flex);
67 if let Some(v) = &self.topics {
68 for it in v {
69 it.encode(buf, version)?;
70 }
71 }
72 }
73 } else {
74 {
75 let v = (self.topics).as_deref().unwrap_or(&[]);
76 crate::primitives::array::put_array_len(buf, v.len(), flex);
77 for it in v {
78 it.encode(buf, version)?;
79 }
80 }
81 }
82 }
83 if version >= 8 {
84 {
85 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
86 for it in &self.groups {
87 it.encode(buf, version)?;
88 }
89 }
90 }
91 if version >= 7 {
92 put_bool(buf, self.require_stable);
93 }
94 if flex {
95 let tagged = WriteTaggedFields::new();
96 tagged.write(buf, &self.unknown_tagged_fields);
97 }
98 Ok(())
99 }
100 fn encoded_len(&self, version: i16) -> usize {
101 let flex = is_flexible(version);
102 let mut n: usize = 0;
103 if (0..=7).contains(&version) {
104 n += if flex {
105 compact_string_len(self.group_id)
106 } else {
107 string_len(self.group_id)
108 };
109 }
110 if (0..=7).contains(&version) {
111 n += if version >= 2 {
112 {
113 let opt: Option<&Vec<_>> = (self.topics).as_ref();
114 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
115 opt.map(std::vec::Vec::len),
116 flex,
117 );
118 let body: usize =
119 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
120 prefix + body
121 }
122 } else {
123 {
124 let v = (self.topics).as_deref().unwrap_or(&[]);
125 let prefix = crate::primitives::array::array_len_prefix_len(v.len(), flex);
126 let body: usize = v.iter().map(|it| it.encoded_len(version)).sum();
127 prefix + body
128 }
129 };
130 }
131 if version >= 8 {
132 n += {
133 let prefix =
134 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
135 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
136 prefix + body
137 };
138 }
139 if version >= 7 {
140 n += 1;
141 }
142 if flex {
143 let known_pairs: Vec<(u32, usize)> = Vec::new();
144 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
145 }
146 n
147 }
148}
149impl<'de> DecodeBorrow<'de> for OffsetFetchRequest<'de> {
150 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
151 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
152 return Err(ProtocolError::UnsupportedVersion {
153 api_key: API_KEY,
154 version,
155 });
156 }
157 let flex = is_flexible(version);
158 let mut out = Self::default();
159 if (0..=7).contains(&version) {
160 out.group_id = if flex {
161 get_compact_string_borrowed(buf)?
162 } else {
163 get_string_borrowed(buf)?
164 };
165 }
166 if (0..=7).contains(&version) {
167 out.topics = if version >= 2 {
168 {
169 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
170 match opt {
171 None => None,
172 Some(n) => {
173 let mut v = Vec::with_capacity(n);
174 for _ in 0..n {
175 v.push(OffsetFetchRequestTopic::decode_borrow(buf, version)?);
176 }
177 Some(v)
178 }
179 }
180 }
181 } else {
182 Some({
183 let n = crate::primitives::array::get_array_len(buf, flex)?;
184 let mut v = Vec::with_capacity(n);
185 for _ in 0..n {
186 v.push(OffsetFetchRequestTopic::decode_borrow(buf, version)?);
187 }
188 v
189 })
190 };
191 }
192 if version >= 8 {
193 out.groups = {
194 let n = crate::primitives::array::get_array_len(buf, flex)?;
195 let mut v = Vec::with_capacity(n);
196 for _ in 0..n {
197 v.push(OffsetFetchRequestGroup::decode_borrow(buf, version)?);
198 }
199 v
200 };
201 }
202 if version >= 7 {
203 out.require_stable = get_bool(buf)?;
204 }
205 if flex {
206 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
207 }
208 Ok(out)
209 }
210}
#[cfg(test)]
impl OffsetFetchRequest<'_> {
    /// Test fixture: a request with every field that exists in `version` set to
    /// a non-default sample value; all other fields keep their defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=7).contains(&version) {
            sample.group_id = "x";
            sample.topics = Some(vec![OffsetFetchRequestTopic::populated(version)]);
        }
        if version >= 8 {
            sample.groups = vec![OffsetFetchRequestGroup::populated(version)];
        }
        sample.require_stable = version >= 7;
        sample
    }
}
/// Topic entry of the top-level `topics` list (the layout used for versions up to 7).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopic<'a> {
    /// Topic name; read and written only for versions up to 7.
    pub name: &'a str,
    /// Partition indexes, each encoded as an `i32`; versions up to 7 only.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields this codec does not interpret; carried for versions 6 and later.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
237impl OffsetFetchRequestTopic<'_> {
238 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestTopic {
239 crate::owned::offset_fetch_request::OffsetFetchRequestTopic {
240 name: (self.name).to_string(),
241 partition_indexes: (self.partition_indexes).clone(),
242 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
243 }
244 }
245}
246impl Encode for OffsetFetchRequestTopic<'_> {
247 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
248 let flex = version >= 6;
249 if (0..=7).contains(&version) {
250 if flex {
251 put_compact_string(buf, self.name);
252 } else {
253 put_string(buf, self.name);
254 }
255 }
256 if (0..=7).contains(&version) {
257 {
258 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
259 for it in &self.partition_indexes {
260 put_i32(buf, *it);
261 }
262 }
263 }
264 if flex {
265 let tagged = WriteTaggedFields::new();
266 tagged.write(buf, &self.unknown_tagged_fields);
267 }
268 Ok(())
269 }
270 fn encoded_len(&self, version: i16) -> usize {
271 let flex = version >= 6;
272 let mut n: usize = 0;
273 if (0..=7).contains(&version) {
274 n += if flex {
275 compact_string_len(self.name)
276 } else {
277 string_len(self.name)
278 };
279 }
280 if (0..=7).contains(&version) {
281 n += {
282 let prefix = crate::primitives::array::array_len_prefix_len(
283 (self.partition_indexes).len(),
284 flex,
285 );
286 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
287 prefix + body
288 };
289 }
290 if flex {
291 let known_pairs: Vec<(u32, usize)> = Vec::new();
292 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
293 }
294 n
295 }
296}
297impl<'de> DecodeBorrow<'de> for OffsetFetchRequestTopic<'de> {
298 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
299 let flex = version >= 6;
300 let mut out = Self::default();
301 if (0..=7).contains(&version) {
302 out.name = if flex {
303 get_compact_string_borrowed(buf)?
304 } else {
305 get_string_borrowed(buf)?
306 };
307 }
308 if (0..=7).contains(&version) {
309 out.partition_indexes = {
310 let n = crate::primitives::array::get_array_len(buf, flex)?;
311 let mut v = Vec::with_capacity(n);
312 for _ in 0..n {
313 v.push(get_i32(buf)?);
314 }
315 v
316 };
317 }
318 if flex {
319 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
320 }
321 Ok(out)
322 }
323}
#[cfg(test)]
impl OffsetFetchRequestTopic<'_> {
    /// Test fixture: a topic entry with every field that exists in `version`
    /// set to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=7).contains(&version) {
            sample.name = "x";
            sample.partition_indexes = vec![1i32];
        }
        sample
    }
}
/// Per-group entry of the request (the layout used from version 8).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchRequestGroup<'a> {
    /// Group identifier; on the wire from version 8.
    pub group_id: &'a str,
    /// Nullable member identifier; on the wire from version 9.
    pub member_id: Option<&'a str>,
    /// Member epoch; on the wire from version 9. `Default` sets it to `-1`.
    pub member_epoch: i32,
    /// Nullable topic list; on the wire from version 8.
    pub topics: Option<Vec<OffsetFetchRequestTopics<'a>>>,
    /// Tagged fields this codec does not interpret; carried for versions 6 and later.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
346impl Default for OffsetFetchRequestGroup<'_> {
347 fn default() -> Self {
348 Self {
349 group_id: "",
350 member_id: None,
351 member_epoch: -1i32,
352 topics: None,
353 unknown_tagged_fields: Default::default(),
354 }
355 }
356}
357impl OffsetFetchRequestGroup<'_> {
358 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestGroup {
359 crate::owned::offset_fetch_request::OffsetFetchRequestGroup {
360 group_id: (self.group_id).to_string(),
361 member_id: (self.member_id).map(std::string::ToString::to_string),
362 member_epoch: (self.member_epoch),
363 topics: (self.topics)
364 .as_ref()
365 .map(|v| v.iter().map(OffsetFetchRequestTopics::to_owned).collect()),
366 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
367 }
368 }
369}
370impl Encode for OffsetFetchRequestGroup<'_> {
371 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
372 let flex = version >= 6;
373 if version >= 8 {
374 if flex {
375 put_compact_string(buf, self.group_id);
376 } else {
377 put_string(buf, self.group_id);
378 }
379 }
380 if version >= 9 {
381 if flex {
382 put_compact_nullable_string(buf, self.member_id);
383 } else {
384 put_nullable_string(buf, self.member_id);
385 }
386 }
387 if version >= 9 {
388 put_i32(buf, self.member_epoch);
389 }
390 if version >= 8 {
391 {
392 let len = (self.topics).as_ref().map(Vec::len);
393 crate::primitives::array::put_nullable_array_len(buf, len, flex);
394 if let Some(v) = &self.topics {
395 for it in v {
396 it.encode(buf, version)?;
397 }
398 }
399 }
400 }
401 if flex {
402 let tagged = WriteTaggedFields::new();
403 tagged.write(buf, &self.unknown_tagged_fields);
404 }
405 Ok(())
406 }
407 fn encoded_len(&self, version: i16) -> usize {
408 let flex = version >= 6;
409 let mut n: usize = 0;
410 if version >= 8 {
411 n += if flex {
412 compact_string_len(self.group_id)
413 } else {
414 string_len(self.group_id)
415 };
416 }
417 if version >= 9 {
418 n += if flex {
419 compact_nullable_string_len(self.member_id)
420 } else {
421 nullable_string_len(self.member_id)
422 };
423 }
424 if version >= 9 {
425 n += 4;
426 }
427 if version >= 8 {
428 n += {
429 let opt: Option<&Vec<_>> = (self.topics).as_ref();
430 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
431 opt.map(std::vec::Vec::len),
432 flex,
433 );
434 let body: usize =
435 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
436 prefix + body
437 };
438 }
439 if flex {
440 let known_pairs: Vec<(u32, usize)> = Vec::new();
441 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
442 }
443 n
444 }
445}
446impl<'de> DecodeBorrow<'de> for OffsetFetchRequestGroup<'de> {
447 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
448 let flex = version >= 6;
449 let mut out = Self::default();
450 if version >= 8 {
451 out.group_id = if flex {
452 get_compact_string_borrowed(buf)?
453 } else {
454 get_string_borrowed(buf)?
455 };
456 }
457 if version >= 9 {
458 out.member_id = if flex {
459 get_compact_nullable_string_borrowed(buf)?
460 } else {
461 get_nullable_string_borrowed(buf)?
462 };
463 }
464 if version >= 9 {
465 out.member_epoch = get_i32(buf)?;
466 }
467 if version >= 8 {
468 out.topics = {
469 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
470 match opt {
471 None => None,
472 Some(n) => {
473 let mut v = Vec::with_capacity(n);
474 for _ in 0..n {
475 v.push(OffsetFetchRequestTopics::decode_borrow(buf, version)?);
476 }
477 Some(v)
478 }
479 }
480 };
481 }
482 if flex {
483 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
484 }
485 Ok(out)
486 }
487}
#[cfg(test)]
impl OffsetFetchRequestGroup<'_> {
    /// Test fixture: a group entry with every field that exists in `version`
    /// set to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 8 {
            sample.group_id = "x";
            sample.topics = Some(vec![OffsetFetchRequestTopics::populated(version)]);
        }
        if version >= 9 {
            sample.member_id = Some("x");
            sample.member_epoch = 1i32;
        }
        sample
    }
}
/// Topic entry nested inside an [`OffsetFetchRequestGroup`] (the layout used from version 8).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopics<'a> {
    /// Topic name; on the wire only for versions 8 and 9.
    pub name: &'a str,
    /// Topic id; on the wire from version 10, counted as 16 bytes by `encoded_len`.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Partition indexes, each encoded as an `i32`; on the wire from version 8.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields this codec does not interpret; carried for versions 6 and later.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
515impl OffsetFetchRequestTopics<'_> {
516 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestTopics {
517 crate::owned::offset_fetch_request::OffsetFetchRequestTopics {
518 name: (self.name).to_string(),
519 topic_id: (self.topic_id),
520 partition_indexes: (self.partition_indexes).clone(),
521 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
522 }
523 }
524}
525impl Encode for OffsetFetchRequestTopics<'_> {
526 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
527 let flex = version >= 6;
528 if (8..=9).contains(&version) {
529 if flex {
530 put_compact_string(buf, self.name);
531 } else {
532 put_string(buf, self.name);
533 }
534 }
535 if version >= 10 {
536 crate::primitives::uuid::put_uuid(buf, self.topic_id);
537 }
538 if version >= 8 {
539 {
540 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
541 for it in &self.partition_indexes {
542 put_i32(buf, *it);
543 }
544 }
545 }
546 if flex {
547 let tagged = WriteTaggedFields::new();
548 tagged.write(buf, &self.unknown_tagged_fields);
549 }
550 Ok(())
551 }
552 fn encoded_len(&self, version: i16) -> usize {
553 let flex = version >= 6;
554 let mut n: usize = 0;
555 if (8..=9).contains(&version) {
556 n += if flex {
557 compact_string_len(self.name)
558 } else {
559 string_len(self.name)
560 };
561 }
562 if version >= 10 {
563 n += 16;
564 }
565 if version >= 8 {
566 n += {
567 let prefix = crate::primitives::array::array_len_prefix_len(
568 (self.partition_indexes).len(),
569 flex,
570 );
571 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
572 prefix + body
573 };
574 }
575 if flex {
576 let known_pairs: Vec<(u32, usize)> = Vec::new();
577 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
578 }
579 n
580 }
581}
582impl<'de> DecodeBorrow<'de> for OffsetFetchRequestTopics<'de> {
583 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
584 let flex = version >= 6;
585 let mut out = Self::default();
586 if (8..=9).contains(&version) {
587 out.name = if flex {
588 get_compact_string_borrowed(buf)?
589 } else {
590 get_string_borrowed(buf)?
591 };
592 }
593 if version >= 10 {
594 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
595 }
596 if version >= 8 {
597 out.partition_indexes = {
598 let n = crate::primitives::array::get_array_len(buf, flex)?;
599 let mut v = Vec::with_capacity(n);
600 for _ in 0..n {
601 v.push(get_i32(buf)?);
602 }
603 v
604 };
605 }
606 if flex {
607 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
608 }
609 Ok(out)
610 }
611}
#[cfg(test)]
impl OffsetFetchRequestTopics<'_> {
    /// Test fixture: a nested topic entry with every field that exists in
    /// `version` set to a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        match version {
            8..=9 => sample.name = "x",
            v if v >= 10 => sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]),
            _ => {}
        }
        if version >= 8 {
            sample.partition_indexes = vec![1i32];
        }
        sample
    }
}