1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i32, put_bool, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
14pub const API_KEY: i16 = 9;
15pub const MIN_VERSION: i16 = 1;
16pub const MAX_VERSION: i16 = 10;
17pub const FLEXIBLE_MIN: i16 = 6;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct OffsetFetchRequest {
26 pub group_id: String,
27 pub topics: Option<Vec<OffsetFetchRequestTopic>>,
28 pub groups: Vec<OffsetFetchRequestGroup>,
29 pub require_stable: bool,
30 pub unknown_tagged_fields: UnknownTaggedFields,
31}
32impl Encode for OffsetFetchRequest {
33 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
34 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
35 return Err(ProtocolError::UnsupportedVersion {
36 api_key: API_KEY,
37 version,
38 });
39 }
40 let flex = is_flexible(version);
41 if (0..=7).contains(&version) {
42 if flex {
43 put_compact_string(buf, &self.group_id);
44 } else {
45 put_string(buf, &self.group_id);
46 }
47 }
48 if (0..=7).contains(&version) {
49 if version >= 2 {
50 {
51 let len = (self.topics).as_ref().map(Vec::len);
52 crate::primitives::array::put_nullable_array_len(buf, len, flex);
53 if let Some(v) = &self.topics {
54 for it in v {
55 it.encode(buf, version)?;
56 }
57 }
58 }
59 } else {
60 {
61 let v = (self.topics).as_deref().unwrap_or(&[]);
62 crate::primitives::array::put_array_len(buf, v.len(), flex);
63 for it in v {
64 it.encode(buf, version)?;
65 }
66 }
67 }
68 }
69 if version >= 8 {
70 {
71 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
72 for it in &self.groups {
73 it.encode(buf, version)?;
74 }
75 }
76 }
77 if version >= 7 {
78 put_bool(buf, self.require_stable);
79 }
80 if flex {
81 let tagged = WriteTaggedFields::new();
82 tagged.write(buf, &self.unknown_tagged_fields);
83 }
84 Ok(())
85 }
86 fn encoded_len(&self, version: i16) -> usize {
87 let flex = is_flexible(version);
88 let mut n: usize = 0;
89 if (0..=7).contains(&version) {
90 n += if flex {
91 compact_string_len(&self.group_id)
92 } else {
93 string_len(&self.group_id)
94 };
95 }
96 if (0..=7).contains(&version) {
97 n += if version >= 2 {
98 {
99 let opt: Option<&Vec<_>> = (self.topics).as_ref();
100 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
101 opt.map(std::vec::Vec::len),
102 flex,
103 );
104 let body: usize =
105 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
106 prefix + body
107 }
108 } else {
109 {
110 let v = (self.topics).as_deref().unwrap_or(&[]);
111 let prefix = crate::primitives::array::array_len_prefix_len(v.len(), flex);
112 let body: usize = v.iter().map(|it| it.encoded_len(version)).sum();
113 prefix + body
114 }
115 };
116 }
117 if version >= 8 {
118 n += {
119 let prefix =
120 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
121 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
122 prefix + body
123 };
124 }
125 if version >= 7 {
126 n += 1;
127 }
128 if flex {
129 let known_pairs: Vec<(u32, usize)> = Vec::new();
130 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
131 }
132 n
133 }
134}
135impl Decode<'_> for OffsetFetchRequest {
136 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
137 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
138 return Err(ProtocolError::UnsupportedVersion {
139 api_key: API_KEY,
140 version,
141 });
142 }
143 let flex = is_flexible(version);
144 let mut out = Self::default();
145 if (0..=7).contains(&version) {
146 out.group_id = if flex {
147 get_compact_string_owned(buf)?
148 } else {
149 get_string_owned(buf)?
150 };
151 }
152 if (0..=7).contains(&version) {
153 out.topics = if version >= 2 {
154 {
155 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
156 match opt {
157 None => None,
158 Some(n) => {
159 let mut v = Vec::with_capacity(n);
160 for _ in 0..n {
161 v.push(OffsetFetchRequestTopic::decode(buf, version)?);
162 }
163 Some(v)
164 }
165 }
166 }
167 } else {
168 Some({
169 let n = crate::primitives::array::get_array_len(buf, flex)?;
170 let mut v = Vec::with_capacity(n);
171 for _ in 0..n {
172 v.push(OffsetFetchRequestTopic::decode(buf, version)?);
173 }
174 v
175 })
176 };
177 }
178 if version >= 8 {
179 out.groups = {
180 let n = crate::primitives::array::get_array_len(buf, flex)?;
181 let mut v = Vec::with_capacity(n);
182 for _ in 0..n {
183 v.push(OffsetFetchRequestGroup::decode(buf, version)?);
184 }
185 v
186 };
187 }
188 if version >= 7 {
189 out.require_stable = get_bool(buf)?;
190 }
191 if flex {
192 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
193 }
194 Ok(out)
195 }
196}
197#[cfg(test)]
198impl OffsetFetchRequest {
199 #[must_use]
200 pub fn populated(version: i16) -> Self {
201 let mut m = Self::default();
202 if (0..=7).contains(&version) {
203 m.group_id = "x".to_string();
204 }
205 if (0..=7).contains(&version) {
206 m.topics = Some(vec![OffsetFetchRequestTopic::populated(version)]);
207 }
208 if version >= 8 {
209 m.groups = vec![OffsetFetchRequestGroup::populated(version)];
210 }
211 if version >= 7 {
212 m.require_stable = true;
213 }
214 m
215 }
216}
217#[derive(Debug, Clone, PartialEq, Eq, Default)]
218pub struct OffsetFetchRequestTopic {
219 pub name: String,
220 pub partition_indexes: Vec<i32>,
221 pub unknown_tagged_fields: UnknownTaggedFields,
222}
223impl Encode for OffsetFetchRequestTopic {
224 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
225 let flex = version >= 6;
226 if (0..=7).contains(&version) {
227 if flex {
228 put_compact_string(buf, &self.name);
229 } else {
230 put_string(buf, &self.name);
231 }
232 }
233 if (0..=7).contains(&version) {
234 {
235 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
236 for it in &self.partition_indexes {
237 put_i32(buf, *it);
238 }
239 }
240 }
241 if flex {
242 let tagged = WriteTaggedFields::new();
243 tagged.write(buf, &self.unknown_tagged_fields);
244 }
245 Ok(())
246 }
247 fn encoded_len(&self, version: i16) -> usize {
248 let flex = version >= 6;
249 let mut n: usize = 0;
250 if (0..=7).contains(&version) {
251 n += if flex {
252 compact_string_len(&self.name)
253 } else {
254 string_len(&self.name)
255 };
256 }
257 if (0..=7).contains(&version) {
258 n += {
259 let prefix = crate::primitives::array::array_len_prefix_len(
260 (self.partition_indexes).len(),
261 flex,
262 );
263 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
264 prefix + body
265 };
266 }
267 if flex {
268 let known_pairs: Vec<(u32, usize)> = Vec::new();
269 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
270 }
271 n
272 }
273}
274impl Decode<'_> for OffsetFetchRequestTopic {
275 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
276 let flex = version >= 6;
277 let mut out = Self::default();
278 if (0..=7).contains(&version) {
279 out.name = if flex {
280 get_compact_string_owned(buf)?
281 } else {
282 get_string_owned(buf)?
283 };
284 }
285 if (0..=7).contains(&version) {
286 out.partition_indexes = {
287 let n = crate::primitives::array::get_array_len(buf, flex)?;
288 let mut v = Vec::with_capacity(n);
289 for _ in 0..n {
290 v.push(get_i32(buf)?);
291 }
292 v
293 };
294 }
295 if flex {
296 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
297 }
298 Ok(out)
299 }
300}
301#[cfg(test)]
302impl OffsetFetchRequestTopic {
303 #[must_use]
304 pub fn populated(version: i16) -> Self {
305 let mut m = Self::default();
306 if (0..=7).contains(&version) {
307 m.name = "x".to_string();
308 }
309 if (0..=7).contains(&version) {
310 m.partition_indexes = vec![1i32];
311 }
312 m
313 }
314}
315#[derive(Debug, Clone, PartialEq, Eq)]
316pub struct OffsetFetchRequestGroup {
317 pub group_id: String,
318 pub member_id: Option<String>,
319 pub member_epoch: i32,
320 pub topics: Option<Vec<OffsetFetchRequestTopics>>,
321 pub unknown_tagged_fields: UnknownTaggedFields,
322}
323impl Default for OffsetFetchRequestGroup {
324 fn default() -> Self {
325 Self {
326 group_id: String::new(),
327 member_id: None,
328 member_epoch: -1i32,
329 topics: None,
330 unknown_tagged_fields: Default::default(),
331 }
332 }
333}
334impl Encode for OffsetFetchRequestGroup {
335 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
336 let flex = version >= 6;
337 if version >= 8 {
338 if flex {
339 put_compact_string(buf, &self.group_id);
340 } else {
341 put_string(buf, &self.group_id);
342 }
343 }
344 if version >= 9 {
345 if flex {
346 put_compact_nullable_string(buf, self.member_id.as_deref());
347 } else {
348 put_nullable_string(buf, self.member_id.as_deref());
349 }
350 }
351 if version >= 9 {
352 put_i32(buf, self.member_epoch);
353 }
354 if version >= 8 {
355 {
356 let len = (self.topics).as_ref().map(Vec::len);
357 crate::primitives::array::put_nullable_array_len(buf, len, flex);
358 if let Some(v) = &self.topics {
359 for it in v {
360 it.encode(buf, version)?;
361 }
362 }
363 }
364 }
365 if flex {
366 let tagged = WriteTaggedFields::new();
367 tagged.write(buf, &self.unknown_tagged_fields);
368 }
369 Ok(())
370 }
371 fn encoded_len(&self, version: i16) -> usize {
372 let flex = version >= 6;
373 let mut n: usize = 0;
374 if version >= 8 {
375 n += if flex {
376 compact_string_len(&self.group_id)
377 } else {
378 string_len(&self.group_id)
379 };
380 }
381 if version >= 9 {
382 n += if flex {
383 compact_nullable_string_len(self.member_id.as_deref())
384 } else {
385 nullable_string_len(self.member_id.as_deref())
386 };
387 }
388 if version >= 9 {
389 n += 4;
390 }
391 if version >= 8 {
392 n += {
393 let opt: Option<&Vec<_>> = (self.topics).as_ref();
394 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
395 opt.map(std::vec::Vec::len),
396 flex,
397 );
398 let body: usize =
399 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
400 prefix + body
401 };
402 }
403 if flex {
404 let known_pairs: Vec<(u32, usize)> = Vec::new();
405 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
406 }
407 n
408 }
409}
410impl Decode<'_> for OffsetFetchRequestGroup {
411 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
412 let flex = version >= 6;
413 let mut out = Self::default();
414 if version >= 8 {
415 out.group_id = if flex {
416 get_compact_string_owned(buf)?
417 } else {
418 get_string_owned(buf)?
419 };
420 }
421 if version >= 9 {
422 out.member_id = if flex {
423 get_compact_nullable_string_owned(buf)?
424 } else {
425 get_nullable_string_owned(buf)?
426 };
427 }
428 if version >= 9 {
429 out.member_epoch = get_i32(buf)?;
430 }
431 if version >= 8 {
432 out.topics = {
433 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
434 match opt {
435 None => None,
436 Some(n) => {
437 let mut v = Vec::with_capacity(n);
438 for _ in 0..n {
439 v.push(OffsetFetchRequestTopics::decode(buf, version)?);
440 }
441 Some(v)
442 }
443 }
444 };
445 }
446 if flex {
447 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
448 }
449 Ok(out)
450 }
451}
452#[cfg(test)]
453impl OffsetFetchRequestGroup {
454 #[must_use]
455 pub fn populated(version: i16) -> Self {
456 let mut m = Self::default();
457 if version >= 8 {
458 m.group_id = "x".to_string();
459 }
460 if version >= 9 {
461 m.member_id = Some("x".to_string());
462 }
463 if version >= 9 {
464 m.member_epoch = 1i32;
465 }
466 if version >= 8 {
467 m.topics = Some(vec![OffsetFetchRequestTopics::populated(version)]);
468 }
469 m
470 }
471}
472#[derive(Debug, Clone, PartialEq, Eq, Default)]
473pub struct OffsetFetchRequestTopics {
474 pub name: String,
475 pub topic_id: crate::primitives::uuid::Uuid,
476 pub partition_indexes: Vec<i32>,
477 pub unknown_tagged_fields: UnknownTaggedFields,
478}
479impl Encode for OffsetFetchRequestTopics {
480 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
481 let flex = version >= 6;
482 if (8..=9).contains(&version) {
483 if flex {
484 put_compact_string(buf, &self.name);
485 } else {
486 put_string(buf, &self.name);
487 }
488 }
489 if version >= 10 {
490 crate::primitives::uuid::put_uuid(buf, self.topic_id);
491 }
492 if version >= 8 {
493 {
494 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
495 for it in &self.partition_indexes {
496 put_i32(buf, *it);
497 }
498 }
499 }
500 if flex {
501 let tagged = WriteTaggedFields::new();
502 tagged.write(buf, &self.unknown_tagged_fields);
503 }
504 Ok(())
505 }
506 fn encoded_len(&self, version: i16) -> usize {
507 let flex = version >= 6;
508 let mut n: usize = 0;
509 if (8..=9).contains(&version) {
510 n += if flex {
511 compact_string_len(&self.name)
512 } else {
513 string_len(&self.name)
514 };
515 }
516 if version >= 10 {
517 n += 16;
518 }
519 if version >= 8 {
520 n += {
521 let prefix = crate::primitives::array::array_len_prefix_len(
522 (self.partition_indexes).len(),
523 flex,
524 );
525 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
526 prefix + body
527 };
528 }
529 if flex {
530 let known_pairs: Vec<(u32, usize)> = Vec::new();
531 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
532 }
533 n
534 }
535}
536impl Decode<'_> for OffsetFetchRequestTopics {
537 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
538 let flex = version >= 6;
539 let mut out = Self::default();
540 if (8..=9).contains(&version) {
541 out.name = if flex {
542 get_compact_string_owned(buf)?
543 } else {
544 get_string_owned(buf)?
545 };
546 }
547 if version >= 10 {
548 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
549 }
550 if version >= 8 {
551 out.partition_indexes = {
552 let n = crate::primitives::array::get_array_len(buf, flex)?;
553 let mut v = Vec::with_capacity(n);
554 for _ in 0..n {
555 v.push(get_i32(buf)?);
556 }
557 v
558 };
559 }
560 if flex {
561 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
562 }
563 Ok(out)
564 }
565}
566#[cfg(test)]
567impl OffsetFetchRequestTopics {
568 #[must_use]
569 pub fn populated(version: i16) -> Self {
570 let mut m = Self::default();
571 if (8..=9).contains(&version) {
572 m.name = "x".to_string();
573 }
574 if version >= 10 {
575 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
576 }
577 if version >= 8 {
578 m.partition_indexes = vec![1i32];
579 }
580 m
581 }
582}
583
584#[must_use]
587#[allow(unused_comparisons)]
588pub fn default_json(version: i16) -> ::serde_json::Value {
589 let mut obj = ::serde_json::Map::new();
590 if version <= 7 {
591 obj.insert(
592 "groupId".to_string(),
593 ::serde_json::Value::String(String::new()),
594 );
595 }
596 if version <= 7 {
597 obj.insert(
598 "topics".to_string(),
599 if (2..=7).contains(&version) {
600 ::serde_json::Value::Null
601 } else {
602 ::serde_json::Value::Array(vec![])
603 },
604 );
605 }
606 if version >= 8 {
607 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
608 }
609 if version >= 7 {
610 obj.insert(
611 "requireStable".to_string(),
612 ::serde_json::Value::Bool(false),
613 );
614 }
615 ::serde_json::Value::Object(obj)
616}
617
618impl crate::ProtocolRequest for OffsetFetchRequest {
619 const API_KEY: i16 = API_KEY;
620 const MIN_VERSION: i16 = MIN_VERSION;
621 const MAX_VERSION: i16 = MAX_VERSION;
622 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
623 type Response = super::offset_fetch_response::OffsetFetchResponse;
624}