1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_bool, get_i32, put_bool, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
12 get_nullable_string_borrowed, get_string_borrowed,
13};
14use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
15use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
/// API key reported in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 9;
/// Lowest version accepted by the top-level encoder and decoder in this module.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level encoder and decoder in this module.
pub const MAX_VERSION: i16 = 10;
/// First version encoded in the flexible layout (compact strings/arrays plus tagged fields).
pub const FLEXIBLE_MIN: i16 = 6;
21
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
26
/// Borrowed form of an OffsetFetch request (API key 9, versions 1 through 10).
///
/// String fields borrow from the byte slice handed to `decode_borrow`; `to_owned`
/// produces the owned counterpart.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequest<'a> {
    /// Group identifier; encoded and decoded only for versions up to 7.
    pub group_id: &'a str,
    /// Topic list; encoded and decoded only for versions up to 7. From version 2 it is a
    /// nullable array; in version 1 a `None` value is written as an empty array.
    pub topics: Option<Vec<OffsetFetchRequestTopic<'a>>>,
    /// Per-group entries; encoded and decoded from version 8 onwards.
    pub groups: Vec<OffsetFetchRequestGroup<'a>>,
    /// Boolean flag; encoded and decoded from version 7 onwards.
    pub require_stable: bool,
    /// Tagged fields the decoder did not recognise; written back for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
35impl OffsetFetchRequest<'_> {
36 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequest {
37 crate::owned::offset_fetch_request::OffsetFetchRequest {
38 group_id: (self.group_id).to_string(),
39 topics: (self.topics)
40 .as_ref()
41 .map(|v| v.iter().map(OffsetFetchRequestTopic::to_owned).collect()),
42 groups: (self.groups)
43 .iter()
44 .map(OffsetFetchRequestGroup::to_owned)
45 .collect(),
46 require_stable: (self.require_stable),
47 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
48 }
49 }
50}
51impl Encode for OffsetFetchRequest<'_> {
52 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
53 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
54 return Err(ProtocolError::UnsupportedVersion {
55 api_key: API_KEY,
56 version,
57 });
58 }
59 let flex = is_flexible(version);
60 if (0..=7).contains(&version) {
61 if flex {
62 put_compact_string(buf, self.group_id);
63 } else {
64 put_string(buf, self.group_id);
65 }
66 }
67 if (0..=7).contains(&version) {
68 if version >= 2 {
69 {
70 let len = (self.topics).as_ref().map(Vec::len);
71 crate::primitives::array::put_nullable_array_len(buf, len, flex);
72 if let Some(v) = &self.topics {
73 for it in v {
74 it.encode(buf, version)?;
75 }
76 }
77 }
78 } else {
79 {
80 let v = (self.topics).as_deref().unwrap_or(&[]);
81 crate::primitives::array::put_array_len(buf, v.len(), flex);
82 for it in v {
83 it.encode(buf, version)?;
84 }
85 }
86 }
87 }
88 if version >= 8 {
89 {
90 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
91 for it in &self.groups {
92 it.encode(buf, version)?;
93 }
94 }
95 }
96 if version >= 7 {
97 put_bool(buf, self.require_stable);
98 }
99 if flex {
100 let tagged = WriteTaggedFields::new();
101 tagged.write(buf, &self.unknown_tagged_fields);
102 }
103 Ok(())
104 }
105 fn encoded_len(&self, version: i16) -> usize {
106 let flex = is_flexible(version);
107 let mut n: usize = 0;
108 if (0..=7).contains(&version) {
109 n += if flex {
110 compact_string_len(self.group_id)
111 } else {
112 string_len(self.group_id)
113 };
114 }
115 if (0..=7).contains(&version) {
116 n += if version >= 2 {
117 {
118 let opt: Option<&Vec<_>> = (self.topics).as_ref();
119 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
120 opt.map(std::vec::Vec::len),
121 flex,
122 );
123 let body: usize =
124 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
125 prefix + body
126 }
127 } else {
128 {
129 let v = (self.topics).as_deref().unwrap_or(&[]);
130 let prefix = crate::primitives::array::array_len_prefix_len(v.len(), flex);
131 let body: usize = v.iter().map(|it| it.encoded_len(version)).sum();
132 prefix + body
133 }
134 };
135 }
136 if version >= 8 {
137 n += {
138 let prefix =
139 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
140 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
141 prefix + body
142 };
143 }
144 if version >= 7 {
145 n += 1;
146 }
147 if flex {
148 let known_pairs: Vec<(u32, usize)> = Vec::new();
149 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
150 }
151 n
152 }
153}
154impl<'de> DecodeBorrow<'de> for OffsetFetchRequest<'de> {
155 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
156 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
157 return Err(ProtocolError::UnsupportedVersion {
158 api_key: API_KEY,
159 version,
160 });
161 }
162 let flex = is_flexible(version);
163 let mut out = Self::default();
164 if (0..=7).contains(&version) {
165 out.group_id = if flex {
166 get_compact_string_borrowed(buf)?
167 } else {
168 get_string_borrowed(buf)?
169 };
170 }
171 if (0..=7).contains(&version) {
172 out.topics = if version >= 2 {
173 {
174 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
175 match opt {
176 None => None,
177 Some(n) => {
178 let mut v = Vec::with_capacity(n);
179 for _ in 0..n {
180 v.push(OffsetFetchRequestTopic::decode_borrow(buf, version)?);
181 }
182 Some(v)
183 }
184 }
185 }
186 } else {
187 Some({
188 let n = crate::primitives::array::get_array_len(buf, flex)?;
189 let mut v = Vec::with_capacity(n);
190 for _ in 0..n {
191 v.push(OffsetFetchRequestTopic::decode_borrow(buf, version)?);
192 }
193 v
194 })
195 };
196 }
197 if version >= 8 {
198 out.groups = {
199 let n = crate::primitives::array::get_array_len(buf, flex)?;
200 let mut v = Vec::with_capacity(n);
201 for _ in 0..n {
202 v.push(OffsetFetchRequestGroup::decode_borrow(buf, version)?);
203 }
204 v
205 };
206 }
207 if version >= 7 {
208 out.require_stable = get_bool(buf)?;
209 }
210 if flex {
211 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
212 }
213 Ok(out)
214 }
215}
#[cfg(test)]
impl OffsetFetchRequest<'_> {
    /// Test fixture: a request with a non-default value in every field that exists in
    /// `version`; all other fields keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if matches!(version, 0..=7) {
            fixture.group_id = "x";
            fixture.topics = Some(vec![OffsetFetchRequestTopic::populated(version)]);
        }
        if version >= 8 {
            fixture.groups = vec![OffsetFetchRequestGroup::populated(version)];
        }
        // The default is `false`, so assigning the comparison matches a conditional set.
        fixture.require_stable = version >= 7;
        fixture
    }
}
/// Topic entry of the top-level `topics` array (request versions up to 7).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopic<'a> {
    /// Topic name; encoded and decoded only for versions up to 7.
    pub name: &'a str,
    /// Partition indexes as an array of `i32`; encoded and decoded only for versions up to 7.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields the decoder did not recognise; written back for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
242impl OffsetFetchRequestTopic<'_> {
243 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestTopic {
244 crate::owned::offset_fetch_request::OffsetFetchRequestTopic {
245 name: (self.name).to_string(),
246 partition_indexes: (self.partition_indexes).clone(),
247 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
248 }
249 }
250}
251impl Encode for OffsetFetchRequestTopic<'_> {
252 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
253 let flex = version >= 6;
254 if (0..=7).contains(&version) {
255 if flex {
256 put_compact_string(buf, self.name);
257 } else {
258 put_string(buf, self.name);
259 }
260 }
261 if (0..=7).contains(&version) {
262 {
263 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
264 for it in &self.partition_indexes {
265 put_i32(buf, *it);
266 }
267 }
268 }
269 if flex {
270 let tagged = WriteTaggedFields::new();
271 tagged.write(buf, &self.unknown_tagged_fields);
272 }
273 Ok(())
274 }
275 fn encoded_len(&self, version: i16) -> usize {
276 let flex = version >= 6;
277 let mut n: usize = 0;
278 if (0..=7).contains(&version) {
279 n += if flex {
280 compact_string_len(self.name)
281 } else {
282 string_len(self.name)
283 };
284 }
285 if (0..=7).contains(&version) {
286 n += {
287 let prefix = crate::primitives::array::array_len_prefix_len(
288 (self.partition_indexes).len(),
289 flex,
290 );
291 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
292 prefix + body
293 };
294 }
295 if flex {
296 let known_pairs: Vec<(u32, usize)> = Vec::new();
297 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
298 }
299 n
300 }
301}
302impl<'de> DecodeBorrow<'de> for OffsetFetchRequestTopic<'de> {
303 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
304 let flex = version >= 6;
305 let mut out = Self::default();
306 if (0..=7).contains(&version) {
307 out.name = if flex {
308 get_compact_string_borrowed(buf)?
309 } else {
310 get_string_borrowed(buf)?
311 };
312 }
313 if (0..=7).contains(&version) {
314 out.partition_indexes = {
315 let n = crate::primitives::array::get_array_len(buf, flex)?;
316 let mut v = Vec::with_capacity(n);
317 for _ in 0..n {
318 v.push(get_i32(buf)?);
319 }
320 v
321 };
322 }
323 if flex {
324 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
325 }
326 Ok(out)
327 }
328}
#[cfg(test)]
impl OffsetFetchRequestTopic<'_> {
    /// Test fixture: a topic entry with non-default `name` and `partition_indexes` when
    /// `version` is at most 7, and the `Default` value otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if matches!(version, 0..=7) {
            fixture.name = "x";
            fixture.partition_indexes = vec![1];
        }
        fixture
    }
}
/// Entry of the top-level `groups` array (request versions 8 and above).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchRequestGroup<'a> {
    /// Group identifier; encoded and decoded from version 8.
    pub group_id: &'a str,
    /// Nullable member identifier; encoded and decoded from version 9. Defaults to `None`.
    pub member_id: Option<&'a str>,
    /// Member epoch; encoded and decoded from version 9. Defaults to `-1`.
    pub member_epoch: i32,
    /// Nullable topic list; encoded and decoded from version 8. Defaults to `None`.
    pub topics: Option<Vec<OffsetFetchRequestTopics<'a>>>,
    /// Tagged fields the decoder did not recognise; written back for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
351impl Default for OffsetFetchRequestGroup<'_> {
352 fn default() -> Self {
353 Self {
354 group_id: "",
355 member_id: None,
356 member_epoch: -1i32,
357 topics: None,
358 unknown_tagged_fields: Default::default(),
359 }
360 }
361}
362impl OffsetFetchRequestGroup<'_> {
363 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestGroup {
364 crate::owned::offset_fetch_request::OffsetFetchRequestGroup {
365 group_id: (self.group_id).to_string(),
366 member_id: (self.member_id).map(std::string::ToString::to_string),
367 member_epoch: (self.member_epoch),
368 topics: (self.topics)
369 .as_ref()
370 .map(|v| v.iter().map(OffsetFetchRequestTopics::to_owned).collect()),
371 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
372 }
373 }
374}
375impl Encode for OffsetFetchRequestGroup<'_> {
376 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
377 let flex = version >= 6;
378 if version >= 8 {
379 if flex {
380 put_compact_string(buf, self.group_id);
381 } else {
382 put_string(buf, self.group_id);
383 }
384 }
385 if version >= 9 {
386 if flex {
387 put_compact_nullable_string(buf, self.member_id);
388 } else {
389 put_nullable_string(buf, self.member_id);
390 }
391 }
392 if version >= 9 {
393 put_i32(buf, self.member_epoch);
394 }
395 if version >= 8 {
396 {
397 let len = (self.topics).as_ref().map(Vec::len);
398 crate::primitives::array::put_nullable_array_len(buf, len, flex);
399 if let Some(v) = &self.topics {
400 for it in v {
401 it.encode(buf, version)?;
402 }
403 }
404 }
405 }
406 if flex {
407 let tagged = WriteTaggedFields::new();
408 tagged.write(buf, &self.unknown_tagged_fields);
409 }
410 Ok(())
411 }
412 fn encoded_len(&self, version: i16) -> usize {
413 let flex = version >= 6;
414 let mut n: usize = 0;
415 if version >= 8 {
416 n += if flex {
417 compact_string_len(self.group_id)
418 } else {
419 string_len(self.group_id)
420 };
421 }
422 if version >= 9 {
423 n += if flex {
424 compact_nullable_string_len(self.member_id)
425 } else {
426 nullable_string_len(self.member_id)
427 };
428 }
429 if version >= 9 {
430 n += 4;
431 }
432 if version >= 8 {
433 n += {
434 let opt: Option<&Vec<_>> = (self.topics).as_ref();
435 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
436 opt.map(std::vec::Vec::len),
437 flex,
438 );
439 let body: usize =
440 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
441 prefix + body
442 };
443 }
444 if flex {
445 let known_pairs: Vec<(u32, usize)> = Vec::new();
446 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
447 }
448 n
449 }
450}
451impl<'de> DecodeBorrow<'de> for OffsetFetchRequestGroup<'de> {
452 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
453 let flex = version >= 6;
454 let mut out = Self::default();
455 if version >= 8 {
456 out.group_id = if flex {
457 get_compact_string_borrowed(buf)?
458 } else {
459 get_string_borrowed(buf)?
460 };
461 }
462 if version >= 9 {
463 out.member_id = if flex {
464 get_compact_nullable_string_borrowed(buf)?
465 } else {
466 get_nullable_string_borrowed(buf)?
467 };
468 }
469 if version >= 9 {
470 out.member_epoch = get_i32(buf)?;
471 }
472 if version >= 8 {
473 out.topics = {
474 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
475 match opt {
476 None => None,
477 Some(n) => {
478 let mut v = Vec::with_capacity(n);
479 for _ in 0..n {
480 v.push(OffsetFetchRequestTopics::decode_borrow(buf, version)?);
481 }
482 Some(v)
483 }
484 }
485 };
486 }
487 if flex {
488 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
489 }
490 Ok(out)
491 }
492}
#[cfg(test)]
impl OffsetFetchRequestGroup<'_> {
    /// Test fixture: a group entry with a non-default value in every field that exists
    /// in `version`; all other fields keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 8 {
            fixture.group_id = "x";
            fixture.topics = Some(vec![OffsetFetchRequestTopics::populated(version)]);
        }
        if version >= 9 {
            fixture.member_id = Some("x");
            fixture.member_epoch = 1;
        }
        fixture
    }
}
/// Topic entry nested inside an `OffsetFetchRequestGroup` (request versions 8 and above).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopics<'a> {
    /// Topic name; encoded and decoded only for versions 8 and 9.
    pub name: &'a str,
    /// Topic UUID; encoded and decoded from version 10.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Partition indexes as an array of `i32`; encoded and decoded from version 8.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields the decoder did not recognise; written back for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
520impl OffsetFetchRequestTopics<'_> {
521 pub fn to_owned(&self) -> crate::owned::offset_fetch_request::OffsetFetchRequestTopics {
522 crate::owned::offset_fetch_request::OffsetFetchRequestTopics {
523 name: (self.name).to_string(),
524 topic_id: (self.topic_id),
525 partition_indexes: (self.partition_indexes).clone(),
526 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
527 }
528 }
529}
530impl Encode for OffsetFetchRequestTopics<'_> {
531 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
532 let flex = version >= 6;
533 if (8..=9).contains(&version) {
534 if flex {
535 put_compact_string(buf, self.name);
536 } else {
537 put_string(buf, self.name);
538 }
539 }
540 if version >= 10 {
541 crate::primitives::uuid::put_uuid(buf, self.topic_id);
542 }
543 if version >= 8 {
544 {
545 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
546 for it in &self.partition_indexes {
547 put_i32(buf, *it);
548 }
549 }
550 }
551 if flex {
552 let tagged = WriteTaggedFields::new();
553 tagged.write(buf, &self.unknown_tagged_fields);
554 }
555 Ok(())
556 }
557 fn encoded_len(&self, version: i16) -> usize {
558 let flex = version >= 6;
559 let mut n: usize = 0;
560 if (8..=9).contains(&version) {
561 n += if flex {
562 compact_string_len(self.name)
563 } else {
564 string_len(self.name)
565 };
566 }
567 if version >= 10 {
568 n += 16;
569 }
570 if version >= 8 {
571 n += {
572 let prefix = crate::primitives::array::array_len_prefix_len(
573 (self.partition_indexes).len(),
574 flex,
575 );
576 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
577 prefix + body
578 };
579 }
580 if flex {
581 let known_pairs: Vec<(u32, usize)> = Vec::new();
582 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
583 }
584 n
585 }
586}
587impl<'de> DecodeBorrow<'de> for OffsetFetchRequestTopics<'de> {
588 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
589 let flex = version >= 6;
590 let mut out = Self::default();
591 if (8..=9).contains(&version) {
592 out.name = if flex {
593 get_compact_string_borrowed(buf)?
594 } else {
595 get_string_borrowed(buf)?
596 };
597 }
598 if version >= 10 {
599 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
600 }
601 if version >= 8 {
602 out.partition_indexes = {
603 let n = crate::primitives::array::get_array_len(buf, flex)?;
604 let mut v = Vec::with_capacity(n);
605 for _ in 0..n {
606 v.push(get_i32(buf)?);
607 }
608 v
609 };
610 }
611 if flex {
612 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
613 }
614 Ok(out)
615 }
616}
#[cfg(test)]
impl OffsetFetchRequestTopics<'_> {
    /// Test fixture: a topic entry with a non-default value in every field that exists
    /// in `version`; all other fields keep their `Default` value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 8 {
            fixture.partition_indexes = vec![1];
            if version >= 10 {
                // From version 10 the topic is identified by UUID instead of name.
                fixture.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
            } else {
                fixture.name = "x";
            }
        }
        fixture
    }
}