1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i32, get_i64, get_i8, put_i32, put_i64, put_i8};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{Decode, DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
/// API key reported in `ProtocolError::UnsupportedVersion` errors raised by this module.
pub const API_KEY: i16 = 1;
/// Lowest version accepted by the `FetchRequest` encoder and decoder.
pub const MIN_VERSION: i16 = 4;
/// Highest version accepted by the `FetchRequest` encoder and decoder.
pub const MAX_VERSION: i16 = 18;
/// First version that uses the flexible encoding (compact lengths and tagged fields).
pub const FLEXIBLE_MIN: i16 = 12;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
25
/// Borrowed form of the Fetch request body.
///
/// `rack_id` and the topic names nested in `topics` / `forgotten_topics_data` borrow for
/// `'a`; `to_owned` produces the `crate::owned::fetch_request::FetchRequest` equivalent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchRequest<'a> {
    /// Part of the regular body only for versions up to 14. Default: -1.
    pub replica_id: i32,
    /// Encoded as a 32-bit integer. Default: 0.
    pub max_wait_ms: i32,
    /// Encoded as a 32-bit integer. Default: 0.
    pub min_bytes: i32,
    /// Encoded as a 32-bit integer. Default: `i32::MAX`.
    pub max_bytes: i32,
    /// Encoded as a single signed byte. Default: 0.
    pub isolation_level: i8,
    /// Encoded as a 32-bit integer. Default: 0.
    pub session_id: i32,
    /// Encoded as a 32-bit integer. Default: -1.
    pub session_epoch: i32,
    /// Per-topic entries, encoded as a length-prefixed array.
    pub topics: Vec<FetchTopic<'a>>,
    /// Per-topic entries, encoded as a second length-prefixed array after `topics`.
    pub forgotten_topics_data: Vec<ForgottenTopic<'a>>,
    /// Encoded from version 11 on; borrowed from the decode buffer. Default: empty string.
    pub rack_id: &'a str,
    /// Tagged field 0 (flexible versions only); left off the wire when `None`.
    /// Held as an owned `String` even in this borrowed form.
    pub cluster_id: Option<String>,
    /// Tagged field 1; left off the wire when equal to `ReplicaState::default()`.
    pub replica_state: ReplicaState,
    /// Tagged fields the decoder did not recognise, kept so the encoder can write them back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
42
43impl<'a> Default for FetchRequest<'a> {
44 fn default() -> Self {
45 Self {
46 replica_id: -1i32,
47 max_wait_ms: 0i32,
48 min_bytes: 0i32,
49 max_bytes: 2_147_483_647i32,
50 isolation_level: 0i8,
51 session_id: 0i32,
52 session_epoch: -1i32,
53 topics: Vec::new(),
54 forgotten_topics_data: Vec::new(),
55 rack_id: "",
56 cluster_id: None,
57 replica_state: Default::default(),
58 unknown_tagged_fields: Default::default(),
59 }
60 }
61}
62
63impl<'a> FetchRequest<'a> {
64 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchRequest {
65 crate::owned::fetch_request::FetchRequest {
66 replica_id: (self.replica_id),
67 max_wait_ms: (self.max_wait_ms),
68 min_bytes: (self.min_bytes),
69 max_bytes: (self.max_bytes),
70 isolation_level: (self.isolation_level),
71 session_id: (self.session_id),
72 session_epoch: (self.session_epoch),
73 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
74 forgotten_topics_data: (self.forgotten_topics_data).iter().map(|it| it.to_owned()).collect(),
75 rack_id: (self.rack_id).to_string(),
76 cluster_id: self.cluster_id.clone(),
77 replica_state: (self.replica_state).to_owned(),
78 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
79 }
80 }
81}
82
83impl<'a> Encode for FetchRequest<'a> {
84 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
85 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
86 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
87 }
88 let flex = is_flexible(version);
89 if version >= 0 && version <= 14 { put_i32(buf, self.replica_id) }
90 if version >= 0 { put_i32(buf, self.max_wait_ms) }
91 if version >= 0 { put_i32(buf, self.min_bytes) }
92 if version >= 3 { put_i32(buf, self.max_bytes) }
93 if version >= 4 { put_i8(buf, self.isolation_level) }
94 if version >= 7 { put_i32(buf, self.session_id) }
95 if version >= 7 { put_i32(buf, self.session_epoch) }
96 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
97 if version >= 7 { { crate::primitives::array::put_array_len(buf, (self.forgotten_topics_data).len(), flex); for it in &self.forgotten_topics_data { it.encode(buf, version)?; } } }
98 if version >= 11 { if flex { put_compact_string(buf, self.rack_id) } else { put_string(buf, self.rack_id) } }
99 if flex {
100 let mut tagged = WriteTaggedFields::new();
101 if !(self.cluster_id.is_none()) {
102 let payload = encode_to_bytes(if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }, |b| { if flex { put_compact_nullable_string(b, self.cluster_id.as_deref()) } else { put_nullable_string(b, self.cluster_id.as_deref()) }; Ok(()) });
103 tagged.add(0, payload);
104 }
105 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
106 let payload = encode_to_bytes(self.replica_state.encoded_len(version), |b| { self.replica_state.encode(b, version)?; Ok(()) });
107 tagged.add(1, payload);
108 }
109 tagged.write(buf, &self.unknown_tagged_fields);
110 }
111 Ok(())
112 }
113 fn encoded_len(&self, version: i16) -> usize {
114 let flex = is_flexible(version);
115 let mut n: usize = 0;
116 if version >= 0 && version <= 14 { n += 4; }
117 if version >= 0 { n += 4; }
118 if version >= 0 { n += 4; }
119 if version >= 3 { n += 4; }
120 if version >= 4 { n += 1; }
121 if version >= 7 { n += 4; }
122 if version >= 7 { n += 4; }
123 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
124 if version >= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.forgotten_topics_data).len(), flex); let body: usize = (self.forgotten_topics_data).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
125 if version >= 11 { n += if flex { compact_string_len(self.rack_id) } else { string_len(self.rack_id) }; }
126 if flex {
127 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
128 if !(self.cluster_id.is_none()) {
129 known_pairs.push((0, if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }));
130 }
131 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
132 known_pairs.push((1, self.replica_state.encoded_len(version)));
133 }
134 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
135 }
136 n
137 }
138}
139
140impl<'de> DecodeBorrow<'de> for FetchRequest<'de> {
141 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
142 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
143 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
144 }
145 let flex = is_flexible(version);
146 let mut out = Self::default();
147 if version >= 0 && version <= 14 { out.replica_id = get_i32(buf)?; }
148 if version >= 0 { out.max_wait_ms = get_i32(buf)?; }
149 if version >= 0 { out.min_bytes = get_i32(buf)?; }
150 if version >= 3 { out.max_bytes = get_i32(buf)?; }
151 if version >= 4 { out.isolation_level = get_i8(buf)?; }
152 if version >= 7 { out.session_id = get_i32(buf)?; }
153 if version >= 7 { out.session_epoch = get_i32(buf)?; }
154 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchTopic::decode_borrow(buf, version)?); } v }; }
155 if version >= 7 { out.forgotten_topics_data = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(ForgottenTopic::decode_borrow(buf, version)?); } v }; }
156 if version >= 11 { out.rack_id = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
157 if flex {
158 let mut tag_cluster_id = None;
160 let mut tag_replica_state = None;
161 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
162 match tag {
163 0 => { tag_cluster_id = Some({ let b: &mut &[u8] = payload; if flex { crate::primitives::string_bytes::get_compact_nullable_string_owned(b)? } else { crate::primitives::string_bytes::get_nullable_string_owned(b)? } }); Ok(true) }
164 1 => { tag_replica_state = Some({ let b: &mut &[u8] = payload; ReplicaState::decode_borrow(b, version)? }); Ok(true) }
165 _ => Ok(false),
166 }
167 })?;
168 if let Some(v) = tag_cluster_id { out.cluster_id = v; }
169 if let Some(v) = tag_replica_state { out.replica_state = v; }
170 }
171 Ok(out)
172 }
173}
174
/// Nested structure stored in `FetchRequest::replica_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaState {
    /// Encoded as a 32-bit integer for versions 15 and above. Default: -1.
    pub replica_id: i32,
    /// Encoded as a 64-bit integer for versions 15 and above. Default: -1.
    pub replica_epoch: i64,
    /// Tagged fields kept verbatim; read and written for versions 12 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
181
182impl Default for ReplicaState {
183 fn default() -> Self {
184 Self {
185 replica_id: -1i32,
186 replica_epoch: -1i64,
187 unknown_tagged_fields: Default::default(),
188 }
189 }
190}
191
192impl ReplicaState {
193 pub fn to_owned(&self) -> crate::owned::fetch_request::ReplicaState {
194 crate::owned::fetch_request::ReplicaState {
195 replica_id: (self.replica_id),
196 replica_epoch: (self.replica_epoch),
197 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
198 }
199 }
200}
201
202impl Encode for ReplicaState {
203 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
204 let flex = version >= 12;
205 if version >= 15 { put_i32(buf, self.replica_id) }
206 if version >= 15 { put_i64(buf, self.replica_epoch) }
207 if flex {
208 let tagged = WriteTaggedFields::new();
209 tagged.write(buf, &self.unknown_tagged_fields);
210 }
211 Ok(())
212 }
213 fn encoded_len(&self, version: i16) -> usize {
214 let flex = version >= 12;
215 let mut n: usize = 0;
216 if version >= 15 { n += 4; }
217 if version >= 15 { n += 8; }
218 if flex {
219 let known_pairs: Vec<(u32, usize)> = Vec::new();
220 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
221 }
222 n
223 }
224}
225
226impl<'de> DecodeBorrow<'de> for ReplicaState {
227 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
228 let flex = version >= 12;
229 let mut out = Self::default();
230 if version >= 15 { out.replica_id = get_i32(buf)?; }
231 if version >= 15 { out.replica_epoch = get_i64(buf)?; }
232 if flex {
233 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
234 Ok(false)
235 })?;
236 }
237 Ok(out)
238 }
239}
240
/// One entry of `FetchRequest::topics`, in borrowed form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchTopic<'a> {
    /// Encoded for versions up to 12; borrowed from the decode buffer. Default: empty string.
    pub topic: &'a str,
    /// Encoded as a 16-byte UUID for versions 13 and above.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Per-partition entries, encoded as a length-prefixed array.
    pub partitions: Vec<FetchPartition>,
    /// Tagged fields kept verbatim; read and written for versions 12 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
248
249impl<'a> Default for FetchTopic<'a> {
250 fn default() -> Self {
251 Self {
252 topic: "",
253 topic_id: Default::default(),
254 partitions: Vec::new(),
255 unknown_tagged_fields: Default::default(),
256 }
257 }
258}
259
260impl<'a> FetchTopic<'a> {
261 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchTopic {
262 crate::owned::fetch_request::FetchTopic {
263 topic: (self.topic).to_string(),
264 topic_id: (self.topic_id),
265 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
266 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
267 }
268 }
269}
270
271impl<'a> Encode for FetchTopic<'a> {
272 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
273 let flex = version >= 12;
274 if version >= 0 && version <= 12 { if flex { put_compact_string(buf, self.topic) } else { put_string(buf, self.topic) } }
275 if version >= 13 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
276 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
277 if flex {
278 let tagged = WriteTaggedFields::new();
279 tagged.write(buf, &self.unknown_tagged_fields);
280 }
281 Ok(())
282 }
283 fn encoded_len(&self, version: i16) -> usize {
284 let flex = version >= 12;
285 let mut n: usize = 0;
286 if version >= 0 && version <= 12 { n += if flex { compact_string_len(self.topic) } else { string_len(self.topic) }; }
287 if version >= 13 { n += 16; }
288 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
289 if flex {
290 let known_pairs: Vec<(u32, usize)> = Vec::new();
291 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
292 }
293 n
294 }
295}
296
297impl<'de> DecodeBorrow<'de> for FetchTopic<'de> {
298 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
299 let flex = version >= 12;
300 let mut out = Self::default();
301 if version >= 0 && version <= 12 { out.topic = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
302 if version >= 13 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
303 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchPartition::decode_borrow(buf, version)?); } v }; }
304 if flex {
305 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
306 Ok(false)
307 })?;
308 }
309 Ok(out)
310 }
311}
312
/// One entry of `FetchTopic::partitions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchPartition {
    /// Encoded as a 32-bit integer. Default: 0.
    pub partition: i32,
    /// Encoded for versions 9 and above. Default: -1.
    pub current_leader_epoch: i32,
    /// Encoded as a 64-bit integer. Default: 0.
    pub fetch_offset: i64,
    /// Encoded for versions 12 and above. Default: -1.
    pub last_fetched_epoch: i32,
    /// Encoded for versions 5 and above. Default: -1.
    pub log_start_offset: i64,
    /// Encoded as a 32-bit integer. Default: 0.
    pub partition_max_bytes: i32,
    /// Tagged field 0; left off the wire when equal to the default UUID.
    pub replica_directory_id: crate::primitives::uuid::Uuid,
    /// Tagged field 1; left off the wire when equal to its default, `i64::MAX`.
    pub high_watermark: i64,
    /// Tagged fields the decoder did not recognise, kept so the encoder can write them back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
325
326impl Default for FetchPartition {
327 fn default() -> Self {
328 Self {
329 partition: 0i32,
330 current_leader_epoch: -1i32,
331 fetch_offset: 0i64,
332 last_fetched_epoch: -1i32,
333 log_start_offset: -1i64,
334 partition_max_bytes: 0i32,
335 replica_directory_id: Default::default(),
336 high_watermark: 9_223_372_036_854_775_807i64,
337 unknown_tagged_fields: Default::default(),
338 }
339 }
340}
341
342impl FetchPartition {
343 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchPartition {
344 crate::owned::fetch_request::FetchPartition {
345 partition: (self.partition),
346 current_leader_epoch: (self.current_leader_epoch),
347 fetch_offset: (self.fetch_offset),
348 last_fetched_epoch: (self.last_fetched_epoch),
349 log_start_offset: (self.log_start_offset),
350 partition_max_bytes: (self.partition_max_bytes),
351 replica_directory_id: (self.replica_directory_id),
352 high_watermark: (self.high_watermark),
353 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
354 }
355 }
356}
357
358impl Encode for FetchPartition {
359 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
360 let flex = version >= 12;
361 if version >= 0 { put_i32(buf, self.partition) }
362 if version >= 9 { put_i32(buf, self.current_leader_epoch) }
363 if version >= 0 { put_i64(buf, self.fetch_offset) }
364 if version >= 12 { put_i32(buf, self.last_fetched_epoch) }
365 if version >= 5 { put_i64(buf, self.log_start_offset) }
366 if version >= 0 { put_i32(buf, self.partition_max_bytes) }
367 if flex {
368 let mut tagged = WriteTaggedFields::new();
369 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
370 let payload = encode_to_bytes(16, |b| { crate::primitives::uuid::put_uuid(b, self.replica_directory_id); Ok(()) });
371 tagged.add(0, payload);
372 }
373 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
374 let payload = encode_to_bytes(8, |b| { put_i64(b, self.high_watermark); Ok(()) });
375 tagged.add(1, payload);
376 }
377 tagged.write(buf, &self.unknown_tagged_fields);
378 }
379 Ok(())
380 }
381 fn encoded_len(&self, version: i16) -> usize {
382 let flex = version >= 12;
383 let mut n: usize = 0;
384 if version >= 0 { n += 4; }
385 if version >= 9 { n += 4; }
386 if version >= 0 { n += 8; }
387 if version >= 12 { n += 4; }
388 if version >= 5 { n += 8; }
389 if version >= 0 { n += 4; }
390 if flex {
391 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
392 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
393 known_pairs.push((0, 16));
394 }
395 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
396 known_pairs.push((1, 8));
397 }
398 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
399 }
400 n
401 }
402}
403
404impl<'de> DecodeBorrow<'de> for FetchPartition {
405 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
406 let flex = version >= 12;
407 let mut out = Self::default();
408 if version >= 0 { out.partition = get_i32(buf)?; }
409 if version >= 9 { out.current_leader_epoch = get_i32(buf)?; }
410 if version >= 0 { out.fetch_offset = get_i64(buf)?; }
411 if version >= 12 { out.last_fetched_epoch = get_i32(buf)?; }
412 if version >= 5 { out.log_start_offset = get_i64(buf)?; }
413 if version >= 0 { out.partition_max_bytes = get_i32(buf)?; }
414 if flex {
415 let mut tag_replica_directory_id = None;
417 let mut tag_high_watermark = None;
418 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
419 match tag {
420 0 => { tag_replica_directory_id = Some({ let b: &mut &[u8] = payload; crate::primitives::uuid::get_uuid(b)? }); Ok(true) }
421 1 => { tag_high_watermark = Some({ let b: &mut &[u8] = payload; get_i64(b)? }); Ok(true) }
422 _ => Ok(false),
423 }
424 })?;
425 if let Some(v) = tag_replica_directory_id { out.replica_directory_id = v; }
426 if let Some(v) = tag_high_watermark { out.high_watermark = v; }
427 }
428 Ok(out)
429 }
430}
431
/// One entry of `FetchRequest::forgotten_topics_data`, in borrowed form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgottenTopic<'a> {
    /// Encoded for versions 7 through 12; borrowed from the decode buffer. Default: empty string.
    pub topic: &'a str,
    /// Encoded as a 16-byte UUID for versions 13 and above.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Encoded for versions 7 and above as a length-prefixed array of 32-bit integers.
    pub partitions: Vec<i32>,
    /// Tagged fields kept verbatim; read and written for versions 12 and above.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
439
440impl<'a> Default for ForgottenTopic<'a> {
441 fn default() -> Self {
442 Self {
443 topic: "",
444 topic_id: Default::default(),
445 partitions: Vec::new(),
446 unknown_tagged_fields: Default::default(),
447 }
448 }
449}
450
451impl<'a> ForgottenTopic<'a> {
452 pub fn to_owned(&self) -> crate::owned::fetch_request::ForgottenTopic {
453 crate::owned::fetch_request::ForgottenTopic {
454 topic: (self.topic).to_string(),
455 topic_id: (self.topic_id),
456 partitions: (self.partitions).clone(),
457 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
458 }
459 }
460}
461
462impl<'a> Encode for ForgottenTopic<'a> {
463 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
464 let flex = version >= 12;
465 if version >= 7 && version <= 12 { if flex { put_compact_string(buf, self.topic) } else { put_string(buf, self.topic) } }
466 if version >= 13 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
467 if version >= 7 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { put_i32(buf, *it); } } }
468 if flex {
469 let tagged = WriteTaggedFields::new();
470 tagged.write(buf, &self.unknown_tagged_fields);
471 }
472 Ok(())
473 }
474 fn encoded_len(&self, version: i16) -> usize {
475 let flex = version >= 12;
476 let mut n: usize = 0;
477 if version >= 7 && version <= 12 { n += if flex { compact_string_len(self.topic) } else { string_len(self.topic) }; }
478 if version >= 13 { n += 16; }
479 if version >= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|_| 4).sum(); prefix + body }; }
480 if flex {
481 let known_pairs: Vec<(u32, usize)> = Vec::new();
482 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
483 }
484 n
485 }
486}
487
488impl<'de> DecodeBorrow<'de> for ForgottenTopic<'de> {
489 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
490 let flex = version >= 12;
491 let mut out = Self::default();
492 if version >= 7 && version <= 12 { out.topic = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
493 if version >= 13 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
494 if version >= 7 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
495 if flex {
496 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
497 Ok(false)
498 })?;
499 }
500 Ok(out)
501 }
502}