1use crate::primitives::fixed::{get_bool, get_i32, put_bool, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// API key of this request type; reported in `ProtocolError::UnsupportedVersion`
/// and re-exported through the `ProtocolRequest` impl for `OffsetFetchRequest`.
pub const API_KEY: i16 = 9;
/// Lowest version accepted by the top-level `encode`/`decode` (inclusive).
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level `encode`/`decode` (inclusive).
pub const MAX_VERSION: i16 = 10;
/// First version for which `is_flexible` returns `true`, i.e. from which the
/// compact string/array helpers and tagged fields are used.
pub const FLEXIBLE_MIN: i16 = 6;
16#[inline]
17fn is_flexible(version: i16) -> bool {
18 version >= FLEXIBLE_MIN
19}
/// Top-level body of the offset-fetch request.
///
/// Which fields travel on the wire depends on the version handed to
/// `encode`/`decode`: a field outside its version range is skipped when writing
/// and keeps its `Default` value when reading.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequest {
    /// Group identifier; serialized only for versions up to and including 7.
    pub group_id: String,
    /// Per-topic entries; serialized only for versions up to and including 7.
    /// From version 2 the array is nullable (`None` is written as a null array);
    /// below version 2 `None` is written as an empty array.
    pub topics: Option<Vec<OffsetFetchRequestTopic>>,
    /// Per-group entries; serialized from version 8 onwards.
    pub groups: Vec<OffsetFetchRequestGroup>,
    /// Boolean flag serialized from version 7 onwards.
    /// NOTE(review): presumably requests only stable offsets — confirm against the Kafka schema.
    pub require_stable: bool,
    /// Tagged fields gathered by `read_tagged_fields` on decode and written back
    /// on encode; only touched for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
28impl Encode for OffsetFetchRequest {
29 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
30 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
31 return Err(ProtocolError::UnsupportedVersion {
32 api_key: API_KEY,
33 version,
34 });
35 }
36 let flex = is_flexible(version);
37 if (0..=7).contains(&version) {
38 if flex {
39 put_compact_string(buf, &self.group_id);
40 } else {
41 put_string(buf, &self.group_id);
42 }
43 }
44 if (0..=7).contains(&version) {
45 if version >= 2 {
46 {
47 let len = (self.topics).as_ref().map(Vec::len);
48 crate::primitives::array::put_nullable_array_len(buf, len, flex);
49 if let Some(v) = &self.topics {
50 for it in v {
51 it.encode(buf, version)?;
52 }
53 }
54 }
55 } else {
56 {
57 let v = (self.topics).as_deref().unwrap_or(&[]);
58 crate::primitives::array::put_array_len(buf, v.len(), flex);
59 for it in v {
60 it.encode(buf, version)?;
61 }
62 }
63 }
64 }
65 if version >= 8 {
66 {
67 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
68 for it in &self.groups {
69 it.encode(buf, version)?;
70 }
71 }
72 }
73 if version >= 7 {
74 put_bool(buf, self.require_stable);
75 }
76 if flex {
77 let tagged = WriteTaggedFields::new();
78 tagged.write(buf, &self.unknown_tagged_fields);
79 }
80 Ok(())
81 }
82 fn encoded_len(&self, version: i16) -> usize {
83 let flex = is_flexible(version);
84 let mut n: usize = 0;
85 if (0..=7).contains(&version) {
86 n += if flex {
87 compact_string_len(&self.group_id)
88 } else {
89 string_len(&self.group_id)
90 };
91 }
92 if (0..=7).contains(&version) {
93 n += if version >= 2 {
94 {
95 let opt: Option<&Vec<_>> = (self.topics).as_ref();
96 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
97 opt.map(std::vec::Vec::len),
98 flex,
99 );
100 let body: usize =
101 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
102 prefix + body
103 }
104 } else {
105 {
106 let v = (self.topics).as_deref().unwrap_or(&[]);
107 let prefix = crate::primitives::array::array_len_prefix_len(v.len(), flex);
108 let body: usize = v.iter().map(|it| it.encoded_len(version)).sum();
109 prefix + body
110 }
111 };
112 }
113 if version >= 8 {
114 n += {
115 let prefix =
116 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
117 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
118 prefix + body
119 };
120 }
121 if version >= 7 {
122 n += 1;
123 }
124 if flex {
125 let known_pairs: Vec<(u32, usize)> = Vec::new();
126 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
127 }
128 n
129 }
130}
131impl Decode<'_> for OffsetFetchRequest {
132 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
133 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
134 return Err(ProtocolError::UnsupportedVersion {
135 api_key: API_KEY,
136 version,
137 });
138 }
139 let flex = is_flexible(version);
140 let mut out = Self::default();
141 if (0..=7).contains(&version) {
142 out.group_id = if flex {
143 get_compact_string_owned(buf)?
144 } else {
145 get_string_owned(buf)?
146 };
147 }
148 if (0..=7).contains(&version) {
149 out.topics = if version >= 2 {
150 {
151 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
152 match opt {
153 None => None,
154 Some(n) => {
155 let mut v = Vec::with_capacity(n);
156 for _ in 0..n {
157 v.push(OffsetFetchRequestTopic::decode(buf, version)?);
158 }
159 Some(v)
160 }
161 }
162 }
163 } else {
164 Some({
165 let n = crate::primitives::array::get_array_len(buf, flex)?;
166 let mut v = Vec::with_capacity(n);
167 for _ in 0..n {
168 v.push(OffsetFetchRequestTopic::decode(buf, version)?);
169 }
170 v
171 })
172 };
173 }
174 if version >= 8 {
175 out.groups = {
176 let n = crate::primitives::array::get_array_len(buf, flex)?;
177 let mut v = Vec::with_capacity(n);
178 for _ in 0..n {
179 v.push(OffsetFetchRequestGroup::decode(buf, version)?);
180 }
181 v
182 };
183 }
184 if version >= 7 {
185 out.require_stable = get_bool(buf)?;
186 }
187 if flex {
188 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
189 }
190 Ok(out)
191 }
192}
#[cfg(test)]
impl OffsetFetchRequest {
    /// Builds a test fixture in which every field that exists at `version`
    /// holds a non-default value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        let single_group_layout = (0..=7).contains(&version);
        if single_group_layout {
            sample.group_id = "x".to_string();
            sample.topics = Some(vec![OffsetFetchRequestTopic::populated(version)]);
        }
        if version >= 8 {
            sample.groups = vec![OffsetFetchRequestGroup::populated(version)];
        }
        if version >= 7 {
            sample.require_stable = true;
        }
        sample
    }
}
/// Topic entry carried in `OffsetFetchRequest::topics`.
///
/// Its `name` and `partition_indexes` are serialized only for versions up to
/// and including 7.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopic {
    /// Topic name; written as a compact string from version 6, as a plain string before.
    pub name: String,
    /// Partition indexes, each written as an `i32`.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields gathered by `read_tagged_fields` on decode and written back
    /// on encode; only touched for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
219impl Encode for OffsetFetchRequestTopic {
220 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
221 let flex = version >= 6;
222 if (0..=7).contains(&version) {
223 if flex {
224 put_compact_string(buf, &self.name);
225 } else {
226 put_string(buf, &self.name);
227 }
228 }
229 if (0..=7).contains(&version) {
230 {
231 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
232 for it in &self.partition_indexes {
233 put_i32(buf, *it);
234 }
235 }
236 }
237 if flex {
238 let tagged = WriteTaggedFields::new();
239 tagged.write(buf, &self.unknown_tagged_fields);
240 }
241 Ok(())
242 }
243 fn encoded_len(&self, version: i16) -> usize {
244 let flex = version >= 6;
245 let mut n: usize = 0;
246 if (0..=7).contains(&version) {
247 n += if flex {
248 compact_string_len(&self.name)
249 } else {
250 string_len(&self.name)
251 };
252 }
253 if (0..=7).contains(&version) {
254 n += {
255 let prefix = crate::primitives::array::array_len_prefix_len(
256 (self.partition_indexes).len(),
257 flex,
258 );
259 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
260 prefix + body
261 };
262 }
263 if flex {
264 let known_pairs: Vec<(u32, usize)> = Vec::new();
265 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
266 }
267 n
268 }
269}
270impl Decode<'_> for OffsetFetchRequestTopic {
271 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
272 let flex = version >= 6;
273 let mut out = Self::default();
274 if (0..=7).contains(&version) {
275 out.name = if flex {
276 get_compact_string_owned(buf)?
277 } else {
278 get_string_owned(buf)?
279 };
280 }
281 if (0..=7).contains(&version) {
282 out.partition_indexes = {
283 let n = crate::primitives::array::get_array_len(buf, flex)?;
284 let mut v = Vec::with_capacity(n);
285 for _ in 0..n {
286 v.push(get_i32(buf)?);
287 }
288 v
289 };
290 }
291 if flex {
292 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
293 }
294 Ok(out)
295 }
296}
#[cfg(test)]
impl OffsetFetchRequestTopic {
    /// Builds a test fixture whose fields hold non-default values when they
    /// exist at `version` (versions up to 7); otherwise returns the default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=7).contains(&version) {
            sample.name = "x".to_string();
            sample.partition_indexes = vec![1i32];
        }
        sample
    }
}
/// Group entry carried in `OffsetFetchRequest::groups` (request version 8+).
///
/// `Default` is implemented by hand because `member_epoch` defaults to `-1`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchRequestGroup {
    /// Group identifier; serialized from version 8.
    pub group_id: String,
    /// Nullable member identifier; serialized from version 9.
    pub member_id: Option<String>,
    /// Member epoch; serialized from version 9, defaults to `-1`.
    pub member_epoch: i32,
    /// Nullable array of topic entries; serialized from version 8.
    pub topics: Option<Vec<OffsetFetchRequestTopics>>,
    /// Tagged fields gathered by `read_tagged_fields` on decode and written back
    /// on encode; only touched for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
319impl Default for OffsetFetchRequestGroup {
320 fn default() -> Self {
321 Self {
322 group_id: String::new(),
323 member_id: None,
324 member_epoch: -1i32,
325 topics: None,
326 unknown_tagged_fields: Default::default(),
327 }
328 }
329}
330impl Encode for OffsetFetchRequestGroup {
331 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
332 let flex = version >= 6;
333 if version >= 8 {
334 if flex {
335 put_compact_string(buf, &self.group_id);
336 } else {
337 put_string(buf, &self.group_id);
338 }
339 }
340 if version >= 9 {
341 if flex {
342 put_compact_nullable_string(buf, self.member_id.as_deref());
343 } else {
344 put_nullable_string(buf, self.member_id.as_deref());
345 }
346 }
347 if version >= 9 {
348 put_i32(buf, self.member_epoch);
349 }
350 if version >= 8 {
351 {
352 let len = (self.topics).as_ref().map(Vec::len);
353 crate::primitives::array::put_nullable_array_len(buf, len, flex);
354 if let Some(v) = &self.topics {
355 for it in v {
356 it.encode(buf, version)?;
357 }
358 }
359 }
360 }
361 if flex {
362 let tagged = WriteTaggedFields::new();
363 tagged.write(buf, &self.unknown_tagged_fields);
364 }
365 Ok(())
366 }
367 fn encoded_len(&self, version: i16) -> usize {
368 let flex = version >= 6;
369 let mut n: usize = 0;
370 if version >= 8 {
371 n += if flex {
372 compact_string_len(&self.group_id)
373 } else {
374 string_len(&self.group_id)
375 };
376 }
377 if version >= 9 {
378 n += if flex {
379 compact_nullable_string_len(self.member_id.as_deref())
380 } else {
381 nullable_string_len(self.member_id.as_deref())
382 };
383 }
384 if version >= 9 {
385 n += 4;
386 }
387 if version >= 8 {
388 n += {
389 let opt: Option<&Vec<_>> = (self.topics).as_ref();
390 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
391 opt.map(std::vec::Vec::len),
392 flex,
393 );
394 let body: usize =
395 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
396 prefix + body
397 };
398 }
399 if flex {
400 let known_pairs: Vec<(u32, usize)> = Vec::new();
401 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
402 }
403 n
404 }
405}
406impl Decode<'_> for OffsetFetchRequestGroup {
407 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
408 let flex = version >= 6;
409 let mut out = Self::default();
410 if version >= 8 {
411 out.group_id = if flex {
412 get_compact_string_owned(buf)?
413 } else {
414 get_string_owned(buf)?
415 };
416 }
417 if version >= 9 {
418 out.member_id = if flex {
419 get_compact_nullable_string_owned(buf)?
420 } else {
421 get_nullable_string_owned(buf)?
422 };
423 }
424 if version >= 9 {
425 out.member_epoch = get_i32(buf)?;
426 }
427 if version >= 8 {
428 out.topics = {
429 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
430 match opt {
431 None => None,
432 Some(n) => {
433 let mut v = Vec::with_capacity(n);
434 for _ in 0..n {
435 v.push(OffsetFetchRequestTopics::decode(buf, version)?);
436 }
437 Some(v)
438 }
439 }
440 };
441 }
442 if flex {
443 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
444 }
445 Ok(out)
446 }
447}
#[cfg(test)]
impl OffsetFetchRequestGroup {
    /// Builds a test fixture in which every field that exists at `version`
    /// holds a non-default value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 8 {
            sample.group_id = "x".to_string();
            sample.topics = Some(vec![OffsetFetchRequestTopics::populated(version)]);
        }
        if version >= 9 {
            sample.member_id = Some("x".to_string());
            sample.member_epoch = 1i32;
        }
        sample
    }
}
/// Topic entry carried in `OffsetFetchRequestGroup::topics` (request version 8+).
///
/// The topic is identified by `name` in versions 8 and 9 and by `topic_id`
/// from version 10; the field outside its range is not serialized.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OffsetFetchRequestTopics {
    /// Topic name; serialized only for versions 8 and 9.
    pub name: String,
    /// Topic UUID; serialized from version 10.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Partition indexes, each written as an `i32`; serialized from version 8.
    pub partition_indexes: Vec<i32>,
    /// Tagged fields gathered by `read_tagged_fields` on decode and written back
    /// on encode; only touched for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
475impl Encode for OffsetFetchRequestTopics {
476 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
477 let flex = version >= 6;
478 if (8..=9).contains(&version) {
479 if flex {
480 put_compact_string(buf, &self.name);
481 } else {
482 put_string(buf, &self.name);
483 }
484 }
485 if version >= 10 {
486 crate::primitives::uuid::put_uuid(buf, self.topic_id);
487 }
488 if version >= 8 {
489 {
490 crate::primitives::array::put_array_len(buf, (self.partition_indexes).len(), flex);
491 for it in &self.partition_indexes {
492 put_i32(buf, *it);
493 }
494 }
495 }
496 if flex {
497 let tagged = WriteTaggedFields::new();
498 tagged.write(buf, &self.unknown_tagged_fields);
499 }
500 Ok(())
501 }
502 fn encoded_len(&self, version: i16) -> usize {
503 let flex = version >= 6;
504 let mut n: usize = 0;
505 if (8..=9).contains(&version) {
506 n += if flex {
507 compact_string_len(&self.name)
508 } else {
509 string_len(&self.name)
510 };
511 }
512 if version >= 10 {
513 n += 16;
514 }
515 if version >= 8 {
516 n += {
517 let prefix = crate::primitives::array::array_len_prefix_len(
518 (self.partition_indexes).len(),
519 flex,
520 );
521 let body: usize = (self.partition_indexes).iter().map(|_| 4).sum();
522 prefix + body
523 };
524 }
525 if flex {
526 let known_pairs: Vec<(u32, usize)> = Vec::new();
527 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
528 }
529 n
530 }
531}
532impl Decode<'_> for OffsetFetchRequestTopics {
533 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
534 let flex = version >= 6;
535 let mut out = Self::default();
536 if (8..=9).contains(&version) {
537 out.name = if flex {
538 get_compact_string_owned(buf)?
539 } else {
540 get_string_owned(buf)?
541 };
542 }
543 if version >= 10 {
544 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
545 }
546 if version >= 8 {
547 out.partition_indexes = {
548 let n = crate::primitives::array::get_array_len(buf, flex)?;
549 let mut v = Vec::with_capacity(n);
550 for _ in 0..n {
551 v.push(get_i32(buf)?);
552 }
553 v
554 };
555 }
556 if flex {
557 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
558 }
559 Ok(out)
560 }
561}
#[cfg(test)]
impl OffsetFetchRequestTopics {
    /// Builds a test fixture in which every field that exists at `version`
    /// holds a non-default value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        // `name` and `topic_id` live in disjoint version ranges.
        match version {
            8..=9 => sample.name = "x".to_string(),
            10.. => sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]),
            _ => {}
        }
        if version >= 8 {
            sample.partition_indexes = vec![1i32];
        }
        sample
    }
}
579#[must_use]
582#[allow(unused_comparisons)]
583pub fn default_json(version: i16) -> ::serde_json::Value {
584 let mut obj = ::serde_json::Map::new();
585 if version <= 7 {
586 obj.insert(
587 "groupId".to_string(),
588 ::serde_json::Value::String(String::new()),
589 );
590 }
591 if version <= 7 {
592 obj.insert(
593 "topics".to_string(),
594 if (2..=7).contains(&version) {
595 ::serde_json::Value::Null
596 } else {
597 ::serde_json::Value::Array(vec![])
598 },
599 );
600 }
601 if version >= 8 {
602 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
603 }
604 if version >= 7 {
605 obj.insert(
606 "requireStable".to_string(),
607 ::serde_json::Value::Bool(false),
608 );
609 }
610 ::serde_json::Value::Object(obj)
611}
/// Associates `OffsetFetchRequest` with its API key, version bounds, and
/// response type by forwarding the module-level constants to the trait.
impl crate::ProtocolRequest for OffsetFetchRequest {
    // The right-hand sides resolve to the module-level constants of the same name.
    const API_KEY: i16 = API_KEY;
    const MIN_VERSION: i16 = MIN_VERSION;
    const MAX_VERSION: i16 = MAX_VERSION;
    const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
    /// Response message type paired with this request.
    type Response = super::offset_fetch_response::OffsetFetchResponse;
}