1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i32, get_i64, get_i8, put_i32, put_i64, put_i8};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
/// API key reported in `ProtocolError::UnsupportedVersion` and exposed through
/// `ProtocolRequest::API_KEY` for this request type.
pub const API_KEY: i16 = 1;
/// Lowest version accepted by `FetchRequest::encode` / `FetchRequest::decode`.
pub const MIN_VERSION: i16 = 4;
/// Highest version accepted by `FetchRequest::encode` / `FetchRequest::decode`.
pub const MAX_VERSION: i16 = 18;
/// First version that uses compact lengths and a trailing tagged-field section.
pub const FLEXIBLE_MIN: i16 = 12;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct FetchRequest {
25 pub replica_id: i32,
26 pub max_wait_ms: i32,
27 pub min_bytes: i32,
28 pub max_bytes: i32,
29 pub isolation_level: i8,
30 pub session_id: i32,
31 pub session_epoch: i32,
32 pub topics: Vec<FetchTopic>,
33 pub forgotten_topics_data: Vec<ForgottenTopic>,
34 pub rack_id: String,
35 pub cluster_id: Option<String>,
36 pub replica_state: ReplicaState,
37 pub unknown_tagged_fields: UnknownTaggedFields,
38}
39
40impl Default for FetchRequest {
41 fn default() -> Self {
42 Self {
43 replica_id: -1i32,
44 max_wait_ms: 0i32,
45 min_bytes: 0i32,
46 max_bytes: 2_147_483_647i32,
47 isolation_level: 0i8,
48 session_id: 0i32,
49 session_epoch: -1i32,
50 topics: Vec::new(),
51 forgotten_topics_data: Vec::new(),
52 rack_id: "".to_string(),
53 cluster_id: None,
54 replica_state: Default::default(),
55 unknown_tagged_fields: Default::default(),
56 }
57 }
58}
59
60impl Encode for FetchRequest {
61 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
62 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
63 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
64 }
65 let flex = is_flexible(version);
66 if version >= 0 && version <= 14 { put_i32(buf, self.replica_id) }
67 if version >= 0 { put_i32(buf, self.max_wait_ms) }
68 if version >= 0 { put_i32(buf, self.min_bytes) }
69 if version >= 3 { put_i32(buf, self.max_bytes) }
70 if version >= 4 { put_i8(buf, self.isolation_level) }
71 if version >= 7 { put_i32(buf, self.session_id) }
72 if version >= 7 { put_i32(buf, self.session_epoch) }
73 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
74 if version >= 7 { { crate::primitives::array::put_array_len(buf, (self.forgotten_topics_data).len(), flex); for it in &self.forgotten_topics_data { it.encode(buf, version)?; } } }
75 if version >= 11 { if flex { put_compact_string(buf, &self.rack_id) } else { put_string(buf, &self.rack_id) } }
76 if flex {
77 let mut tagged = WriteTaggedFields::new();
78 if !(self.cluster_id.is_none()) {
79 let payload = encode_to_bytes(if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }, |b| { if flex { put_compact_nullable_string(b, self.cluster_id.as_deref()) } else { put_nullable_string(b, self.cluster_id.as_deref()) }; Ok(()) });
80 tagged.add(0, payload);
81 }
82 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
83 let payload = encode_to_bytes(self.replica_state.encoded_len(version), |b| { self.replica_state.encode(b, version)?; Ok(()) });
84 tagged.add(1, payload);
85 }
86 tagged.write(buf, &self.unknown_tagged_fields);
87 }
88 Ok(())
89 }
90 fn encoded_len(&self, version: i16) -> usize {
91 let flex = is_flexible(version);
92 let mut n: usize = 0;
93 if version >= 0 && version <= 14 { n += 4; }
94 if version >= 0 { n += 4; }
95 if version >= 0 { n += 4; }
96 if version >= 3 { n += 4; }
97 if version >= 4 { n += 1; }
98 if version >= 7 { n += 4; }
99 if version >= 7 { n += 4; }
100 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
101 if version >= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.forgotten_topics_data).len(), flex); let body: usize = (self.forgotten_topics_data).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
102 if version >= 11 { n += if flex { compact_string_len(&self.rack_id) } else { string_len(&self.rack_id) }; }
103 if flex {
104 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
105 if !(self.cluster_id.is_none()) {
106 known_pairs.push((0, if flex { compact_nullable_string_len(self.cluster_id.as_deref()) } else { nullable_string_len(self.cluster_id.as_deref()) }));
107 }
108 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
109 known_pairs.push((1, self.replica_state.encoded_len(version)));
110 }
111 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
112 }
113 n
114 }
115}
116
117impl<'de> Decode<'de> for FetchRequest {
118 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
119 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
120 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
121 }
122 let flex = is_flexible(version);
123 let mut out = Self::default();
124 if version >= 0 && version <= 14 { out.replica_id = get_i32(buf)?; }
125 if version >= 0 { out.max_wait_ms = get_i32(buf)?; }
126 if version >= 0 { out.min_bytes = get_i32(buf)?; }
127 if version >= 3 { out.max_bytes = get_i32(buf)?; }
128 if version >= 4 { out.isolation_level = get_i8(buf)?; }
129 if version >= 7 { out.session_id = get_i32(buf)?; }
130 if version >= 7 { out.session_epoch = get_i32(buf)?; }
131 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchTopic::decode(buf, version)?); } v }; }
132 if version >= 7 { out.forgotten_topics_data = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(ForgottenTopic::decode(buf, version)?); } v }; }
133 if version >= 11 { out.rack_id = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
134 if flex {
135 let mut tag_cluster_id = None;
137 let mut tag_replica_state = None;
138 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
139 match tag {
140 0 => { tag_cluster_id = Some({ let b: &mut &[u8] = payload; if flex { get_compact_nullable_string_owned(b)? } else { get_nullable_string_owned(b)? } }); Ok(true) }
141 1 => { tag_replica_state = Some({ let b: &mut &[u8] = payload; ReplicaState::decode(b, version)? }); Ok(true) }
142 _ => Ok(false),
143 }
144 })?;
145 if let Some(v) = tag_cluster_id { out.cluster_id = v; }
146 if let Some(v) = tag_replica_state { out.replica_state = v; }
147 }
148 Ok(out)
149 }
150}
151
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct ReplicaState {
154 pub replica_id: i32,
155 pub replica_epoch: i64,
156 pub unknown_tagged_fields: UnknownTaggedFields,
157}
158
159impl Default for ReplicaState {
160 fn default() -> Self {
161 Self {
162 replica_id: -1i32,
163 replica_epoch: -1i64,
164 unknown_tagged_fields: Default::default(),
165 }
166 }
167}
168
169impl Encode for ReplicaState {
170 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
171 let flex = version >= 12;
172 if version >= 15 { put_i32(buf, self.replica_id) }
173 if version >= 15 { put_i64(buf, self.replica_epoch) }
174 if flex {
175 let tagged = WriteTaggedFields::new();
176 tagged.write(buf, &self.unknown_tagged_fields);
177 }
178 Ok(())
179 }
180 fn encoded_len(&self, version: i16) -> usize {
181 let flex = version >= 12;
182 let mut n: usize = 0;
183 if version >= 15 { n += 4; }
184 if version >= 15 { n += 8; }
185 if flex {
186 let known_pairs: Vec<(u32, usize)> = Vec::new();
187 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
188 }
189 n
190 }
191}
192
193impl<'de> Decode<'de> for ReplicaState {
194 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
195 let flex = version >= 12;
196 let mut out = Self::default();
197 if version >= 15 { out.replica_id = get_i32(buf)?; }
198 if version >= 15 { out.replica_epoch = get_i64(buf)?; }
199 if flex {
200 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
201 Ok(false)
202 })?;
203 }
204 Ok(out)
205 }
206}
207
208#[derive(Debug, Clone, PartialEq, Eq, Default)]
209pub struct FetchTopic {
210 pub topic: String,
211 pub topic_id: crate::primitives::uuid::Uuid,
212 pub partitions: Vec<FetchPartition>,
213 pub unknown_tagged_fields: UnknownTaggedFields,
214}
215
216impl Encode for FetchTopic {
217 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
218 let flex = version >= 12;
219 if version >= 0 && version <= 12 { if flex { put_compact_string(buf, &self.topic) } else { put_string(buf, &self.topic) } }
220 if version >= 13 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
221 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
222 if flex {
223 let tagged = WriteTaggedFields::new();
224 tagged.write(buf, &self.unknown_tagged_fields);
225 }
226 Ok(())
227 }
228 fn encoded_len(&self, version: i16) -> usize {
229 let flex = version >= 12;
230 let mut n: usize = 0;
231 if version >= 0 && version <= 12 { n += if flex { compact_string_len(&self.topic) } else { string_len(&self.topic) }; }
232 if version >= 13 { n += 16; }
233 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
234 if flex {
235 let known_pairs: Vec<(u32, usize)> = Vec::new();
236 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
237 }
238 n
239 }
240}
241
242impl<'de> Decode<'de> for FetchTopic {
243 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
244 let flex = version >= 12;
245 let mut out = Self::default();
246 if version >= 0 && version <= 12 { out.topic = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
247 if version >= 13 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
248 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchPartition::decode(buf, version)?); } v }; }
249 if flex {
250 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
251 Ok(false)
252 })?;
253 }
254 Ok(out)
255 }
256}
257
258#[derive(Debug, Clone, PartialEq, Eq)]
259pub struct FetchPartition {
260 pub partition: i32,
261 pub current_leader_epoch: i32,
262 pub fetch_offset: i64,
263 pub last_fetched_epoch: i32,
264 pub log_start_offset: i64,
265 pub partition_max_bytes: i32,
266 pub replica_directory_id: crate::primitives::uuid::Uuid,
267 pub high_watermark: i64,
268 pub unknown_tagged_fields: UnknownTaggedFields,
269}
270
271impl Default for FetchPartition {
272 fn default() -> Self {
273 Self {
274 partition: 0i32,
275 current_leader_epoch: -1i32,
276 fetch_offset: 0i64,
277 last_fetched_epoch: -1i32,
278 log_start_offset: -1i64,
279 partition_max_bytes: 0i32,
280 replica_directory_id: Default::default(),
281 high_watermark: 9_223_372_036_854_775_807i64,
282 unknown_tagged_fields: Default::default(),
283 }
284 }
285}
286
287impl Encode for FetchPartition {
288 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
289 let flex = version >= 12;
290 if version >= 0 { put_i32(buf, self.partition) }
291 if version >= 9 { put_i32(buf, self.current_leader_epoch) }
292 if version >= 0 { put_i64(buf, self.fetch_offset) }
293 if version >= 12 { put_i32(buf, self.last_fetched_epoch) }
294 if version >= 5 { put_i64(buf, self.log_start_offset) }
295 if version >= 0 { put_i32(buf, self.partition_max_bytes) }
296 if flex {
297 let mut tagged = WriteTaggedFields::new();
298 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
299 let payload = encode_to_bytes(16, |b| { crate::primitives::uuid::put_uuid(b, self.replica_directory_id); Ok(()) });
300 tagged.add(0, payload);
301 }
302 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
303 let payload = encode_to_bytes(8, |b| { put_i64(b, self.high_watermark); Ok(()) });
304 tagged.add(1, payload);
305 }
306 tagged.write(buf, &self.unknown_tagged_fields);
307 }
308 Ok(())
309 }
310 fn encoded_len(&self, version: i16) -> usize {
311 let flex = version >= 12;
312 let mut n: usize = 0;
313 if version >= 0 { n += 4; }
314 if version >= 9 { n += 4; }
315 if version >= 0 { n += 8; }
316 if version >= 12 { n += 4; }
317 if version >= 5 { n += 8; }
318 if version >= 0 { n += 4; }
319 if flex {
320 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
321 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
322 known_pairs.push((0, 16));
323 }
324 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
325 known_pairs.push((1, 8));
326 }
327 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
328 }
329 n
330 }
331}
332
333impl<'de> Decode<'de> for FetchPartition {
334 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
335 let flex = version >= 12;
336 let mut out = Self::default();
337 if version >= 0 { out.partition = get_i32(buf)?; }
338 if version >= 9 { out.current_leader_epoch = get_i32(buf)?; }
339 if version >= 0 { out.fetch_offset = get_i64(buf)?; }
340 if version >= 12 { out.last_fetched_epoch = get_i32(buf)?; }
341 if version >= 5 { out.log_start_offset = get_i64(buf)?; }
342 if version >= 0 { out.partition_max_bytes = get_i32(buf)?; }
343 if flex {
344 let mut tag_replica_directory_id = None;
346 let mut tag_high_watermark = None;
347 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
348 match tag {
349 0 => { tag_replica_directory_id = Some({ let b: &mut &[u8] = payload; crate::primitives::uuid::get_uuid(b)? }); Ok(true) }
350 1 => { tag_high_watermark = Some({ let b: &mut &[u8] = payload; get_i64(b)? }); Ok(true) }
351 _ => Ok(false),
352 }
353 })?;
354 if let Some(v) = tag_replica_directory_id { out.replica_directory_id = v; }
355 if let Some(v) = tag_high_watermark { out.high_watermark = v; }
356 }
357 Ok(out)
358 }
359}
360
361#[derive(Debug, Clone, PartialEq, Eq, Default)]
362pub struct ForgottenTopic {
363 pub topic: String,
364 pub topic_id: crate::primitives::uuid::Uuid,
365 pub partitions: Vec<i32>,
366 pub unknown_tagged_fields: UnknownTaggedFields,
367}
368
369impl Encode for ForgottenTopic {
370 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
371 let flex = version >= 12;
372 if version >= 7 && version <= 12 { if flex { put_compact_string(buf, &self.topic) } else { put_string(buf, &self.topic) } }
373 if version >= 13 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
374 if version >= 7 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { put_i32(buf, *it); } } }
375 if flex {
376 let tagged = WriteTaggedFields::new();
377 tagged.write(buf, &self.unknown_tagged_fields);
378 }
379 Ok(())
380 }
381 fn encoded_len(&self, version: i16) -> usize {
382 let flex = version >= 12;
383 let mut n: usize = 0;
384 if version >= 7 && version <= 12 { n += if flex { compact_string_len(&self.topic) } else { string_len(&self.topic) }; }
385 if version >= 13 { n += 16; }
386 if version >= 7 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|_| 4).sum(); prefix + body }; }
387 if flex {
388 let known_pairs: Vec<(u32, usize)> = Vec::new();
389 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
390 }
391 n
392 }
393}
394
395impl<'de> Decode<'de> for ForgottenTopic {
396 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
397 let flex = version >= 12;
398 let mut out = Self::default();
399 if version >= 7 && version <= 12 { out.topic = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
400 if version >= 13 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
401 if version >= 7 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
402 if flex {
403 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
404 Ok(false)
405 })?;
406 }
407 Ok(out)
408 }
409}
410
411#[must_use]
414#[allow(unused_comparisons)]
415pub fn default_json(version: i16) -> ::serde_json::Value {
416 let mut obj = ::serde_json::Map::new();
417 if version >= 12 {
418 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
419 }
420 if version <= 14 {
421 obj.insert("replicaId".to_string(), ::serde_json::json!(-1));
422 }
423 if version >= 15 {
424 obj.insert("replicaState".to_string(), { let mut m = ::serde_json::Map::new(); m.insert("replicaId".to_string(), ::serde_json::json!(-1)); m.insert("replicaEpoch".to_string(), ::serde_json::json!(-1)); ::serde_json::Value::Object(m) });
425 }
426 obj.insert("maxWaitMs".to_string(), ::serde_json::json!(0));
427 obj.insert("minBytes".to_string(), ::serde_json::json!(0));
428 if version >= 3 {
429 obj.insert("maxBytes".to_string(), ::serde_json::json!(2147483647));
430 }
431 if version >= 4 {
432 obj.insert("isolationLevel".to_string(), ::serde_json::json!(0));
433 }
434 if version >= 7 {
435 obj.insert("sessionId".to_string(), ::serde_json::json!(0));
436 }
437 if version >= 7 {
438 obj.insert("sessionEpoch".to_string(), ::serde_json::json!(-1));
439 }
440 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
441 if version >= 7 {
442 obj.insert("forgottenTopicsData".to_string(), ::serde_json::Value::Array(vec![]));
443 }
444 if version >= 11 {
445 obj.insert("rackId".to_string(), ::serde_json::Value::String("".to_string()));
446 }
447 ::serde_json::Value::Object(obj)
448}
449
450impl crate::ProtocolRequest for FetchRequest {
451 const API_KEY: i16 = API_KEY;
452 const MIN_VERSION: i16 = MIN_VERSION;
453 const MAX_VERSION: i16 = MAX_VERSION;
454 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
455 type Response = super::fetch_response::FetchResponse;
456}