1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
/// API key reported in the `ProtocolError::UnsupportedVersion` errors raised by this
/// module (presumably Kafka's OffsetFetch API key — confirm against the protocol spec).
pub const API_KEY: i16 = 9;
/// Lowest version accepted by the top-level `OffsetFetchResponse` encoder and decoder.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level `OffsetFetchResponse` encoder and decoder.
pub const MAX_VERSION: i16 = 10;
/// First version for which `is_flexible` returns `true`, i.e. for which compact
/// length prefixes and tagged-field sections are used.
pub const FLEXIBLE_MIN: i16 = 6;
22
23#[inline]
24fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
25
/// Borrowed view of an OffsetFetch response body; string data inside the nested
/// entries borrows from the buffer handed to `decode_borrow` for `'a`.
///
/// Which fields appear on the wire depends on the version passed to `encode` /
/// `decode_borrow`. Fields that are absent for a version keep their `Default` value
/// after decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponse<'a> {
    /// On the wire for versions 3 and above.
    pub throttle_time_ms: i32,
    /// Per-topic results; on the wire for versions up to and including 7.
    pub topics: Vec<OffsetFetchResponseTopic<'a>>,
    /// Top-level error code; on the wire for versions 2 through 7.
    pub error_code: i16,
    /// Per-group results; on the wire for versions 8 and above.
    pub groups: Vec<OffsetFetchResponseGroup<'a>>,
    /// Tagged fields collected by `read_tagged_fields` on decode and written back
    /// on encode; only used for flexible versions (6 and above).
    pub unknown_tagged_fields: UnknownTaggedFields,
}
34
35impl<'a> Default for OffsetFetchResponse<'a> {
36 fn default() -> Self {
37 Self {
38 throttle_time_ms: 0i32,
39 topics: Vec::new(),
40 error_code: 0i16,
41 groups: Vec::new(),
42 unknown_tagged_fields: Default::default(),
43 }
44 }
45}
46
47impl<'a> OffsetFetchResponse<'a> {
48 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponse {
49 crate::owned::offset_fetch_response::OffsetFetchResponse {
50 throttle_time_ms: (self.throttle_time_ms),
51 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
52 error_code: (self.error_code),
53 groups: (self.groups).iter().map(|it| it.to_owned()).collect(),
54 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
55 }
56 }
57}
58
59impl<'a> Encode for OffsetFetchResponse<'a> {
60 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
61 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
62 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
63 }
64 let flex = is_flexible(version);
65 if version >= 3 { put_i32(buf, self.throttle_time_ms) }
66 if version >= 0 && version <= 7 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
67 if version >= 2 && version <= 7 { put_i16(buf, self.error_code) }
68 if version >= 8 { { crate::primitives::array::put_array_len(buf, (self.groups).len(), flex); for it in &self.groups { it.encode(buf, version)?; } } }
69 if flex {
70 let tagged = WriteTaggedFields::new();
71 tagged.write(buf, &self.unknown_tagged_fields);
72 }
73 Ok(())
74 }
75 fn encoded_len(&self, version: i16) -> usize {
76 let flex = is_flexible(version);
77 let mut n: usize = 0;
78 if version >= 3 { n += 4; }
79 if version >= 0 && version <= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
80 if version >= 2 && version <= 7 { n += 2; }
81 if version >= 8 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.groups).len(), flex); let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
82 if flex {
83 let known_pairs: Vec<(u32, usize)> = Vec::new();
84 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
85 }
86 n
87 }
88}
89
90impl<'de> DecodeBorrow<'de> for OffsetFetchResponse<'de> {
91 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
92 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
93 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
94 }
95 let flex = is_flexible(version);
96 let mut out = Self::default();
97 if version >= 3 { out.throttle_time_ms = get_i32(buf)?; }
98 if version >= 0 && version <= 7 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(OffsetFetchResponseTopic::decode_borrow(buf, version)?); } v }; }
99 if version >= 2 && version <= 7 { out.error_code = get_i16(buf)?; }
100 if version >= 8 { out.groups = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(OffsetFetchResponseGroup::decode_borrow(buf, version)?); } v }; }
101 if flex {
102 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
103 Ok(false)
104 })?;
105 }
106 Ok(out)
107 }
108}
109
/// One entry of `OffsetFetchResponse::topics`, the topic-level layout used for
/// versions up to and including 7.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponseTopic<'a> {
    /// Topic name, borrowed from the decode buffer; on the wire for versions up to 7.
    pub name: &'a str,
    /// Per-partition results; on the wire for versions up to 7.
    pub partitions: Vec<OffsetFetchResponsePartition<'a>>,
    /// Tagged fields collected on decode and written back on encode; only used
    /// for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
116
117impl<'a> Default for OffsetFetchResponseTopic<'a> {
118 fn default() -> Self {
119 Self {
120 name: "",
121 partitions: Vec::new(),
122 unknown_tagged_fields: Default::default(),
123 }
124 }
125}
126
127impl<'a> OffsetFetchResponseTopic<'a> {
128 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
129 crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
130 name: (self.name).to_string(),
131 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
132 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
133 }
134 }
135}
136
137impl<'a> Encode for OffsetFetchResponseTopic<'a> {
138 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
139 let flex = version >= 6;
140 if version >= 0 && version <= 7 { if flex { put_compact_string(buf, self.name) } else { put_string(buf, self.name) } }
141 if version >= 0 && version <= 7 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
142 if flex {
143 let tagged = WriteTaggedFields::new();
144 tagged.write(buf, &self.unknown_tagged_fields);
145 }
146 Ok(())
147 }
148 fn encoded_len(&self, version: i16) -> usize {
149 let flex = version >= 6;
150 let mut n: usize = 0;
151 if version >= 0 && version <= 7 { n += if flex { compact_string_len(self.name) } else { string_len(self.name) }; }
152 if version >= 0 && version <= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
153 if flex {
154 let known_pairs: Vec<(u32, usize)> = Vec::new();
155 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
156 }
157 n
158 }
159}
160
161impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopic<'de> {
162 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
163 let flex = version >= 6;
164 let mut out = Self::default();
165 if version >= 0 && version <= 7 { out.name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
166 if version >= 0 && version <= 7 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(OffsetFetchResponsePartition::decode_borrow(buf, version)?); } v }; }
167 if flex {
168 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
169 Ok(false)
170 })?;
171 }
172 Ok(out)
173 }
174}
175
/// One entry of `OffsetFetchResponseTopic::partitions`, part of the topic-level
/// layout used for versions up to and including 7.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponsePartition<'a> {
    /// On the wire for versions up to 7.
    pub partition_index: i32,
    /// On the wire for versions up to 7.
    pub committed_offset: i64,
    /// On the wire for versions 5 through 7; defaults to `-1` otherwise.
    pub committed_leader_epoch: i32,
    /// Nullable string borrowed from the decode buffer; on the wire for versions up to 7.
    pub metadata: Option<&'a str>,
    /// Per-partition error code; on the wire for versions up to 7.
    pub error_code: i16,
    /// Tagged fields collected on decode and written back on encode; only used
    /// for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
185
186impl<'a> Default for OffsetFetchResponsePartition<'a> {
187 fn default() -> Self {
188 Self {
189 partition_index: 0i32,
190 committed_offset: 0i64,
191 committed_leader_epoch: -1i32,
192 metadata: None,
193 error_code: 0i16,
194 unknown_tagged_fields: Default::default(),
195 }
196 }
197}
198
199impl<'a> OffsetFetchResponsePartition<'a> {
200 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
201 crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
202 partition_index: (self.partition_index),
203 committed_offset: (self.committed_offset),
204 committed_leader_epoch: (self.committed_leader_epoch),
205 metadata: (self.metadata).map(|s| s.to_string()),
206 error_code: (self.error_code),
207 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
208 }
209 }
210}
211
212impl<'a> Encode for OffsetFetchResponsePartition<'a> {
213 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
214 let flex = version >= 6;
215 if version >= 0 && version <= 7 { put_i32(buf, self.partition_index) }
216 if version >= 0 && version <= 7 { put_i64(buf, self.committed_offset) }
217 if version >= 5 && version <= 7 { put_i32(buf, self.committed_leader_epoch) }
218 if version >= 0 && version <= 7 { if flex { put_compact_nullable_string(buf, self.metadata) } else { put_nullable_string(buf, self.metadata) } }
219 if version >= 0 && version <= 7 { put_i16(buf, self.error_code) }
220 if flex {
221 let tagged = WriteTaggedFields::new();
222 tagged.write(buf, &self.unknown_tagged_fields);
223 }
224 Ok(())
225 }
226 fn encoded_len(&self, version: i16) -> usize {
227 let flex = version >= 6;
228 let mut n: usize = 0;
229 if version >= 0 && version <= 7 { n += 4; }
230 if version >= 0 && version <= 7 { n += 8; }
231 if version >= 5 && version <= 7 { n += 4; }
232 if version >= 0 && version <= 7 { n += if flex { compact_nullable_string_len(self.metadata) } else { nullable_string_len(self.metadata) }; }
233 if version >= 0 && version <= 7 { n += 2; }
234 if flex {
235 let known_pairs: Vec<(u32, usize)> = Vec::new();
236 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
237 }
238 n
239 }
240}
241
242impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartition<'de> {
243 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
244 let flex = version >= 6;
245 let mut out = Self::default();
246 if version >= 0 && version <= 7 { out.partition_index = get_i32(buf)?; }
247 if version >= 0 && version <= 7 { out.committed_offset = get_i64(buf)?; }
248 if version >= 5 && version <= 7 { out.committed_leader_epoch = get_i32(buf)?; }
249 if version >= 0 && version <= 7 { out.metadata = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
250 if version >= 0 && version <= 7 { out.error_code = get_i16(buf)?; }
251 if flex {
252 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
253 Ok(false)
254 })?;
255 }
256 Ok(out)
257 }
258}
259
/// One entry of `OffsetFetchResponse::groups`, the group-level layout used for
/// versions 8 and above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponseGroup<'a> {
    /// Group identifier, borrowed from the decode buffer; on the wire for versions 8 and above.
    pub group_id: &'a str,
    /// Per-topic results for this group; on the wire for versions 8 and above.
    pub topics: Vec<OffsetFetchResponseTopics<'a>>,
    /// Group-level error code; on the wire for versions 8 and above.
    pub error_code: i16,
    /// Tagged fields collected on decode and written back on encode; only used
    /// for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
267
268impl<'a> Default for OffsetFetchResponseGroup<'a> {
269 fn default() -> Self {
270 Self {
271 group_id: "",
272 topics: Vec::new(),
273 error_code: 0i16,
274 unknown_tagged_fields: Default::default(),
275 }
276 }
277}
278
279impl<'a> OffsetFetchResponseGroup<'a> {
280 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
281 crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
282 group_id: (self.group_id).to_string(),
283 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
284 error_code: (self.error_code),
285 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
286 }
287 }
288}
289
290impl<'a> Encode for OffsetFetchResponseGroup<'a> {
291 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
292 let flex = version >= 6;
293 if version >= 8 { if flex { put_compact_string(buf, self.group_id) } else { put_string(buf, self.group_id) } }
294 if version >= 8 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
295 if version >= 8 { put_i16(buf, self.error_code) }
296 if flex {
297 let tagged = WriteTaggedFields::new();
298 tagged.write(buf, &self.unknown_tagged_fields);
299 }
300 Ok(())
301 }
302 fn encoded_len(&self, version: i16) -> usize {
303 let flex = version >= 6;
304 let mut n: usize = 0;
305 if version >= 8 { n += if flex { compact_string_len(self.group_id) } else { string_len(self.group_id) }; }
306 if version >= 8 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
307 if version >= 8 { n += 2; }
308 if flex {
309 let known_pairs: Vec<(u32, usize)> = Vec::new();
310 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
311 }
312 n
313 }
314}
315
316impl<'de> DecodeBorrow<'de> for OffsetFetchResponseGroup<'de> {
317 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
318 let flex = version >= 6;
319 let mut out = Self::default();
320 if version >= 8 { out.group_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
321 if version >= 8 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(OffsetFetchResponseTopics::decode_borrow(buf, version)?); } v }; }
322 if version >= 8 { out.error_code = get_i16(buf)?; }
323 if flex {
324 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
325 Ok(false)
326 })?;
327 }
328 Ok(out)
329 }
330}
331
/// One entry of `OffsetFetchResponseGroup::topics`, part of the group-level
/// layout used for versions 8 and above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponseTopics<'a> {
    /// Topic name, borrowed from the decode buffer; on the wire for versions 8 and 9 only.
    pub name: &'a str,
    /// Topic id; on the wire for versions 10 and above, where it takes the place of `name`.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Per-partition results; on the wire for versions 8 and above.
    pub partitions: Vec<OffsetFetchResponsePartitions<'a>>,
    /// Tagged fields collected on decode and written back on encode; only used
    /// for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
339
340impl<'a> Default for OffsetFetchResponseTopics<'a> {
341 fn default() -> Self {
342 Self {
343 name: "",
344 topic_id: Default::default(),
345 partitions: Vec::new(),
346 unknown_tagged_fields: Default::default(),
347 }
348 }
349}
350
351impl<'a> OffsetFetchResponseTopics<'a> {
352 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
353 crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
354 name: (self.name).to_string(),
355 topic_id: (self.topic_id),
356 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
357 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
358 }
359 }
360}
361
362impl<'a> Encode for OffsetFetchResponseTopics<'a> {
363 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
364 let flex = version >= 6;
365 if version >= 8 && version <= 9 { if flex { put_compact_string(buf, self.name) } else { put_string(buf, self.name) } }
366 if version >= 10 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
367 if version >= 8 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
368 if flex {
369 let tagged = WriteTaggedFields::new();
370 tagged.write(buf, &self.unknown_tagged_fields);
371 }
372 Ok(())
373 }
374 fn encoded_len(&self, version: i16) -> usize {
375 let flex = version >= 6;
376 let mut n: usize = 0;
377 if version >= 8 && version <= 9 { n += if flex { compact_string_len(self.name) } else { string_len(self.name) }; }
378 if version >= 10 { n += 16; }
379 if version >= 8 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
380 if flex {
381 let known_pairs: Vec<(u32, usize)> = Vec::new();
382 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
383 }
384 n
385 }
386}
387
388impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopics<'de> {
389 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
390 let flex = version >= 6;
391 let mut out = Self::default();
392 if version >= 8 && version <= 9 { out.name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
393 if version >= 10 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
394 if version >= 8 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(OffsetFetchResponsePartitions::decode_borrow(buf, version)?); } v }; }
395 if flex {
396 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
397 Ok(false)
398 })?;
399 }
400 Ok(out)
401 }
402}
403
/// One entry of `OffsetFetchResponseTopics::partitions`, part of the group-level
/// layout used for versions 8 and above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffsetFetchResponsePartitions<'a> {
    /// On the wire for versions 8 and above.
    pub partition_index: i32,
    /// On the wire for versions 8 and above.
    pub committed_offset: i64,
    /// On the wire for versions 8 and above; defaults to `-1`.
    pub committed_leader_epoch: i32,
    /// Nullable string borrowed from the decode buffer; on the wire for versions 8 and above.
    pub metadata: Option<&'a str>,
    /// Per-partition error code; on the wire for versions 8 and above.
    pub error_code: i16,
    /// Tagged fields collected on decode and written back on encode; only used
    /// for versions 6 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
413
414impl<'a> Default for OffsetFetchResponsePartitions<'a> {
415 fn default() -> Self {
416 Self {
417 partition_index: 0i32,
418 committed_offset: 0i64,
419 committed_leader_epoch: -1i32,
420 metadata: None,
421 error_code: 0i16,
422 unknown_tagged_fields: Default::default(),
423 }
424 }
425}
426
427impl<'a> OffsetFetchResponsePartitions<'a> {
428 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
429 crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
430 partition_index: (self.partition_index),
431 committed_offset: (self.committed_offset),
432 committed_leader_epoch: (self.committed_leader_epoch),
433 metadata: (self.metadata).map(|s| s.to_string()),
434 error_code: (self.error_code),
435 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
436 }
437 }
438}
439
440impl<'a> Encode for OffsetFetchResponsePartitions<'a> {
441 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
442 let flex = version >= 6;
443 if version >= 8 { put_i32(buf, self.partition_index) }
444 if version >= 8 { put_i64(buf, self.committed_offset) }
445 if version >= 8 { put_i32(buf, self.committed_leader_epoch) }
446 if version >= 8 { if flex { put_compact_nullable_string(buf, self.metadata) } else { put_nullable_string(buf, self.metadata) } }
447 if version >= 8 { put_i16(buf, self.error_code) }
448 if flex {
449 let tagged = WriteTaggedFields::new();
450 tagged.write(buf, &self.unknown_tagged_fields);
451 }
452 Ok(())
453 }
454 fn encoded_len(&self, version: i16) -> usize {
455 let flex = version >= 6;
456 let mut n: usize = 0;
457 if version >= 8 { n += 4; }
458 if version >= 8 { n += 8; }
459 if version >= 8 { n += 4; }
460 if version >= 8 { n += if flex { compact_nullable_string_len(self.metadata) } else { nullable_string_len(self.metadata) }; }
461 if version >= 8 { n += 2; }
462 if flex {
463 let known_pairs: Vec<(u32, usize)> = Vec::new();
464 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
465 }
466 n
467 }
468}
469
470impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartitions<'de> {
471 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
472 let flex = version >= 6;
473 let mut out = Self::default();
474 if version >= 8 { out.partition_index = get_i32(buf)?; }
475 if version >= 8 { out.committed_offset = get_i64(buf)?; }
476 if version >= 8 { out.committed_leader_epoch = get_i32(buf)?; }
477 if version >= 8 { out.metadata = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
478 if version >= 8 { out.error_code = get_i16(buf)?; }
479 if flex {
480 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
481 Ok(false)
482 })?;
483 }
484 Ok(out)
485 }
486}