1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16};
6use crate::primitives::string_bytes::{
7 compact_string_len, put_compact_string, put_string, string_len,
8};
9use crate::primitives::string_bytes_borrowed::{
10 get_compact_string_borrowed, get_string_borrowed,
11};
12use crate::primitives::string_bytes::{put_bytes, put_compact_bytes};
13use crate::primitives::string_bytes_borrowed::{get_bytes_borrowed, get_compact_bytes_borrowed};
14use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
15use crate::{Decode, DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
17pub const API_KEY: i16 = 59;
18pub const MIN_VERSION: i16 = 0;
19pub const MAX_VERSION: i16 = 1;
20pub const FLEXIBLE_MIN: i16 = 0;
21
22#[inline]
23fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct FetchSnapshotResponse<'a> {
27 pub throttle_time_ms: i32,
28 pub error_code: i16,
29 pub topics: Vec<TopicSnapshot<'a>>,
30 pub node_endpoints: Vec<crate::owned::fetch_snapshot_response::NodeEndpoint>,
31 pub unknown_tagged_fields: UnknownTaggedFields,
32}
33
34impl<'a> Default for FetchSnapshotResponse<'a> {
35 fn default() -> Self {
36 Self {
37 throttle_time_ms: 0i32,
38 error_code: 0i16,
39 topics: Vec::new(),
40 node_endpoints: Vec::new(),
41 unknown_tagged_fields: Default::default(),
42 }
43 }
44}
45
46impl<'a> FetchSnapshotResponse<'a> {
47 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::FetchSnapshotResponse {
48 crate::owned::fetch_snapshot_response::FetchSnapshotResponse {
49 throttle_time_ms: (self.throttle_time_ms),
50 error_code: (self.error_code),
51 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
52 node_endpoints: self.node_endpoints.clone(),
53 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
54 }
55 }
56}
57
58impl<'a> Encode for FetchSnapshotResponse<'a> {
59 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
60 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
61 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
62 }
63 let flex = is_flexible(version);
64 if version >= 0 { put_i32(buf, self.throttle_time_ms) }
65 if version >= 0 { put_i16(buf, self.error_code) }
66 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
67 if flex {
68 let mut tagged = WriteTaggedFields::new();
69 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
70 let payload = encode_to_bytes({ let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }, |b| { { crate::primitives::array::put_array_len(b, (self.node_endpoints).len(), flex); for it in &self.node_endpoints { it.encode(b, version)?; } }; Ok(()) });
71 tagged.add(0, payload);
72 }
73 tagged.write(buf, &self.unknown_tagged_fields);
74 }
75 Ok(())
76 }
77 fn encoded_len(&self, version: i16) -> usize {
78 let flex = is_flexible(version);
79 let mut n: usize = 0;
80 if version >= 0 { n += 4; }
81 if version >= 0 { n += 2; }
82 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
83 if flex {
84 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
85 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
86 known_pairs.push((0, { let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }));
87 }
88 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
89 }
90 n
91 }
92}
93
94impl<'de> DecodeBorrow<'de> for FetchSnapshotResponse<'de> {
95 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
96 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
97 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
98 }
99 let flex = is_flexible(version);
100 let mut out = Self::default();
101 if version >= 0 { out.throttle_time_ms = get_i32(buf)?; }
102 if version >= 0 { out.error_code = get_i16(buf)?; }
103 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(TopicSnapshot::decode_borrow(buf, version)?); } v }; }
104 if flex {
105 let mut tag_node_endpoints = None;
107 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
108 match tag {
109 0 => { tag_node_endpoints = Some({ let b: &mut &[u8] = payload; { let n = crate::primitives::array::get_array_len(b, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(crate::owned::fetch_snapshot_response::NodeEndpoint::decode(b, version)?); } v } }); Ok(true) }
110 _ => Ok(false),
111 }
112 })?;
113 if let Some(v) = tag_node_endpoints { out.node_endpoints = v; }
114 }
115 Ok(out)
116 }
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct TopicSnapshot<'a> {
121 pub name: &'a str,
122 pub partitions: Vec<PartitionSnapshot<'a>>,
123 pub unknown_tagged_fields: UnknownTaggedFields,
124}
125
126impl<'a> Default for TopicSnapshot<'a> {
127 fn default() -> Self {
128 Self {
129 name: "",
130 partitions: Vec::new(),
131 unknown_tagged_fields: Default::default(),
132 }
133 }
134}
135
136impl<'a> TopicSnapshot<'a> {
137 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::TopicSnapshot {
138 crate::owned::fetch_snapshot_response::TopicSnapshot {
139 name: (self.name).to_string(),
140 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
141 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
142 }
143 }
144}
145
146impl<'a> Encode for TopicSnapshot<'a> {
147 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
148 let flex = version >= 0;
149 if version >= 0 { if flex { put_compact_string(buf, self.name) } else { put_string(buf, self.name) } }
150 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
151 if flex {
152 let tagged = WriteTaggedFields::new();
153 tagged.write(buf, &self.unknown_tagged_fields);
154 }
155 Ok(())
156 }
157 fn encoded_len(&self, version: i16) -> usize {
158 let flex = version >= 0;
159 let mut n: usize = 0;
160 if version >= 0 { n += if flex { compact_string_len(self.name) } else { string_len(self.name) }; }
161 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
162 if flex {
163 let known_pairs: Vec<(u32, usize)> = Vec::new();
164 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
165 }
166 n
167 }
168}
169
170impl<'de> DecodeBorrow<'de> for TopicSnapshot<'de> {
171 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
172 let flex = version >= 0;
173 let mut out = Self::default();
174 if version >= 0 { out.name = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
175 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionSnapshot::decode_borrow(buf, version)?); } v }; }
176 if flex {
177 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
178 Ok(false)
179 })?;
180 }
181 Ok(out)
182 }
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct PartitionSnapshot<'a> {
187 pub index: i32,
188 pub error_code: i16,
189 pub snapshot_id: SnapshotId,
190 pub size: i64,
191 pub position: i64,
192 pub unaligned_records: crate::records::RecordsPayloadBorrowed<'a>,
193 pub current_leader: LeaderIdAndEpoch,
194 pub unknown_tagged_fields: UnknownTaggedFields,
195}
196
197impl<'a> Default for PartitionSnapshot<'a> {
198 fn default() -> Self {
199 Self {
200 index: 0i32,
201 error_code: 0i16,
202 snapshot_id: Default::default(),
203 size: 0i64,
204 position: 0i64,
205 unaligned_records: Default::default(),
206 current_leader: Default::default(),
207 unknown_tagged_fields: Default::default(),
208 }
209 }
210}
211
212impl<'a> PartitionSnapshot<'a> {
213 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::PartitionSnapshot {
214 crate::owned::fetch_snapshot_response::PartitionSnapshot {
215 index: (self.index),
216 error_code: (self.error_code),
217 snapshot_id: (self.snapshot_id).to_owned(),
218 size: (self.size),
219 position: (self.position),
220 unaligned_records: (self.unaligned_records).to_owned().expect("records to_owned"),
221 current_leader: (self.current_leader).to_owned(),
222 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
223 }
224 }
225}
226
227impl<'a> Encode for PartitionSnapshot<'a> {
228 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
229 let flex = version >= 0;
230 if version >= 0 { put_i32(buf, self.index) }
231 if version >= 0 { put_i16(buf, self.error_code) }
232 if version >= 0 { self.snapshot_id.encode(buf, version)? }
233 if version >= 0 { put_i64(buf, self.size) }
234 if version >= 0 { put_i64(buf, self.position) }
235 if version >= 0 { { let mut __rb_buf = bytes::BytesMut::new(); <crate::records::RecordsPayloadBorrowed as crate::Encode>::encode(&self.unaligned_records, &mut __rb_buf, version)?; if flex { put_compact_bytes(buf, &__rb_buf) } else { put_bytes(buf, &__rb_buf) } } }
236 if flex {
237 let mut tagged = WriteTaggedFields::new();
238 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
239 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| { self.current_leader.encode(b, version)?; Ok(()) });
240 tagged.add(0, payload);
241 }
242 tagged.write(buf, &self.unknown_tagged_fields);
243 }
244 Ok(())
245 }
246 fn encoded_len(&self, version: i16) -> usize {
247 let flex = version >= 0;
248 let mut n: usize = 0;
249 if version >= 0 { n += 4; }
250 if version >= 0 { n += 2; }
251 if version >= 0 { n += self.snapshot_id.encoded_len(version); }
252 if version >= 0 { n += 8; }
253 if version >= 0 { n += 8; }
254 if version >= 0 { n += { let __rb_len = <crate::records::RecordsPayloadBorrowed as crate::Encode>::encoded_len(&(self.unaligned_records), version); if flex { crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len) } else { 4 + __rb_len } }; }
255 if flex {
256 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
257 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
258 known_pairs.push((0, self.current_leader.encoded_len(version)));
259 }
260 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
261 }
262 n
263 }
264}
265
266impl<'de> DecodeBorrow<'de> for PartitionSnapshot<'de> {
267 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
268 let flex = version >= 0;
269 let mut out = Self::default();
270 if version >= 0 { out.index = get_i32(buf)?; }
271 if version >= 0 { out.error_code = get_i16(buf)?; }
272 if version >= 0 { out.snapshot_id = SnapshotId::decode_borrow(buf, version)?; }
273 if version >= 0 { out.size = get_i64(buf)?; }
274 if version >= 0 { out.position = get_i64(buf)?; }
275 if version >= 0 { out.unaligned_records = { let __rb_slice = if flex { get_compact_bytes_borrowed(buf)? } else { get_bytes_borrowed(buf)? }; let mut __rb_cur = __rb_slice; <crate::records::RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut __rb_cur, version)? }; }
276 if flex {
277 let mut tag_current_leader = None;
279 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
280 match tag {
281 0 => { tag_current_leader = Some({ let b: &mut &[u8] = payload; LeaderIdAndEpoch::decode_borrow(b, version)? }); Ok(true) }
282 _ => Ok(false),
283 }
284 })?;
285 if let Some(v) = tag_current_leader { out.current_leader = v; }
286 }
287 Ok(out)
288 }
289}
290
291#[derive(Debug, Clone, PartialEq, Eq)]
292pub struct SnapshotId {
293 pub end_offset: i64,
294 pub epoch: i32,
295 pub unknown_tagged_fields: UnknownTaggedFields,
296}
297
298impl Default for SnapshotId {
299 fn default() -> Self {
300 Self {
301 end_offset: 0i64,
302 epoch: 0i32,
303 unknown_tagged_fields: Default::default(),
304 }
305 }
306}
307
308impl SnapshotId {
309 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::SnapshotId {
310 crate::owned::fetch_snapshot_response::SnapshotId {
311 end_offset: (self.end_offset),
312 epoch: (self.epoch),
313 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
314 }
315 }
316}
317
318impl Encode for SnapshotId {
319 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
320 let flex = version >= 0;
321 if version >= 0 { put_i64(buf, self.end_offset) }
322 if version >= 0 { put_i32(buf, self.epoch) }
323 if flex {
324 let tagged = WriteTaggedFields::new();
325 tagged.write(buf, &self.unknown_tagged_fields);
326 }
327 Ok(())
328 }
329 fn encoded_len(&self, version: i16) -> usize {
330 let flex = version >= 0;
331 let mut n: usize = 0;
332 if version >= 0 { n += 8; }
333 if version >= 0 { n += 4; }
334 if flex {
335 let known_pairs: Vec<(u32, usize)> = Vec::new();
336 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
337 }
338 n
339 }
340}
341
342impl<'de> DecodeBorrow<'de> for SnapshotId {
343 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
344 let flex = version >= 0;
345 let mut out = Self::default();
346 if version >= 0 { out.end_offset = get_i64(buf)?; }
347 if version >= 0 { out.epoch = get_i32(buf)?; }
348 if flex {
349 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
350 Ok(false)
351 })?;
352 }
353 Ok(out)
354 }
355}
356
357#[derive(Debug, Clone, PartialEq, Eq)]
358pub struct LeaderIdAndEpoch {
359 pub leader_id: i32,
360 pub leader_epoch: i32,
361 pub unknown_tagged_fields: UnknownTaggedFields,
362}
363
364impl Default for LeaderIdAndEpoch {
365 fn default() -> Self {
366 Self {
367 leader_id: 0i32,
368 leader_epoch: 0i32,
369 unknown_tagged_fields: Default::default(),
370 }
371 }
372}
373
374impl LeaderIdAndEpoch {
375 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::LeaderIdAndEpoch {
376 crate::owned::fetch_snapshot_response::LeaderIdAndEpoch {
377 leader_id: (self.leader_id),
378 leader_epoch: (self.leader_epoch),
379 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
380 }
381 }
382}
383
384impl Encode for LeaderIdAndEpoch {
385 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
386 let flex = version >= 0;
387 if version >= 0 { put_i32(buf, self.leader_id) }
388 if version >= 0 { put_i32(buf, self.leader_epoch) }
389 if flex {
390 let tagged = WriteTaggedFields::new();
391 tagged.write(buf, &self.unknown_tagged_fields);
392 }
393 Ok(())
394 }
395 fn encoded_len(&self, version: i16) -> usize {
396 let flex = version >= 0;
397 let mut n: usize = 0;
398 if version >= 0 { n += 4; }
399 if version >= 0 { n += 4; }
400 if flex {
401 let known_pairs: Vec<(u32, usize)> = Vec::new();
402 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
403 }
404 n
405 }
406}
407
408impl<'de> DecodeBorrow<'de> for LeaderIdAndEpoch {
409 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
410 let flex = version >= 0;
411 let mut out = Self::default();
412 if version >= 0 { out.leader_id = get_i32(buf)?; }
413 if version >= 0 { out.leader_epoch = get_i32(buf)?; }
414 if flex {
415 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
416 Ok(false)
417 })?;
418 }
419 Ok(out)
420 }
421}
422
423#[derive(Debug, Clone, PartialEq, Eq)]
424pub struct NodeEndpoint<'a> {
425 pub node_id: i32,
426 pub host: &'a str,
427 pub port: u16,
428 pub unknown_tagged_fields: UnknownTaggedFields,
429}
430
431impl<'a> Default for NodeEndpoint<'a> {
432 fn default() -> Self {
433 Self {
434 node_id: 0i32,
435 host: "",
436 port: 0u16,
437 unknown_tagged_fields: Default::default(),
438 }
439 }
440}
441
442impl<'a> NodeEndpoint<'a> {
443 pub fn to_owned(&self) -> crate::owned::fetch_snapshot_response::NodeEndpoint {
444 crate::owned::fetch_snapshot_response::NodeEndpoint {
445 node_id: (self.node_id),
446 host: (self.host).to_string(),
447 port: (self.port),
448 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
449 }
450 }
451}
452
453impl<'a> Encode for NodeEndpoint<'a> {
454 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
455 let flex = version >= 0;
456 if version >= 1 { put_i32(buf, self.node_id) }
457 if version >= 1 { if flex { put_compact_string(buf, self.host) } else { put_string(buf, self.host) } }
458 if version >= 1 { put_u16(buf, self.port) }
459 if flex {
460 let tagged = WriteTaggedFields::new();
461 tagged.write(buf, &self.unknown_tagged_fields);
462 }
463 Ok(())
464 }
465 fn encoded_len(&self, version: i16) -> usize {
466 let flex = version >= 0;
467 let mut n: usize = 0;
468 if version >= 1 { n += 4; }
469 if version >= 1 { n += if flex { compact_string_len(self.host) } else { string_len(self.host) }; }
470 if version >= 1 { n += 2; }
471 if flex {
472 let known_pairs: Vec<(u32, usize)> = Vec::new();
473 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
474 }
475 n
476 }
477}
478
479impl<'de> DecodeBorrow<'de> for NodeEndpoint<'de> {
480 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
481 let flex = version >= 0;
482 let mut out = Self::default();
483 if version >= 1 { out.node_id = get_i32(buf)?; }
484 if version >= 1 { out.host = if flex { get_compact_string_borrowed(buf)? } else { get_string_borrowed(buf)? }; }
485 if version >= 1 { out.port = get_u16(buf)?; }
486 if flex {
487 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
488 Ok(false)
489 })?;
490 }
491 Ok(out)
492 }
493}