1use crate::primitives::fixed::{
4 get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16,
5};
6use crate::primitives::string_bytes::{
7 compact_string_len, get_compact_string_owned, get_string_owned, put_compact_string, put_string,
8 string_len,
9};
10use crate::primitives::string_bytes::{
11 get_bytes_owned, get_compact_bytes_owned, put_bytes, put_compact_bytes,
12};
13use crate::tagged_fields::{
14 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
15};
16use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
17use bytes::{Buf, BufMut};
/// API key of this message; reported in `ProtocolError::UnsupportedVersion` by the
/// top-level encode/decode when the requested version is out of range.
pub const API_KEY: i16 = 59;
/// Lowest version accepted by `FetchSnapshotResponse` encode/decode.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by `FetchSnapshotResponse` encode/decode.
pub const MAX_VERSION: i16 = 1;
/// Threshold used by `is_flexible`: versions at or above it are treated as flexible.
pub const FLEXIBLE_MIN: i16 = 0;
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
/// Top-level FetchSnapshot response message.
///
/// The fixed body carries `throttle_time_ms`, `error_code` and `topics`, in that
/// order. `node_endpoints` travels as tagged field 0 and is emitted only when it
/// differs from its default value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FetchSnapshotResponse {
    /// First field of the body, encoded as a 32-bit integer.
    pub throttle_time_ms: i32,
    /// Second field of the body, encoded as a 16-bit integer.
    pub error_code: i16,
    /// Length-prefixed array of per-topic entries.
    pub topics: Vec<TopicSnapshot>,
    /// Endpoint entries carried as tagged field 0; populated by the test helper
    /// and `default_json` only for version 1 and later.
    pub node_endpoints: Vec<NodeEndpoint>,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
34impl Encode for FetchSnapshotResponse {
35 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
36 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
37 return Err(ProtocolError::UnsupportedVersion {
38 api_key: API_KEY,
39 version,
40 });
41 }
42 let flex = is_flexible(version);
43 if version >= 0 {
44 put_i32(buf, self.throttle_time_ms);
45 }
46 if version >= 0 {
47 put_i16(buf, self.error_code);
48 }
49 if version >= 0 {
50 {
51 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
52 for it in &self.topics {
53 it.encode(buf, version)?;
54 }
55 }
56 }
57 if flex {
58 let mut tagged = WriteTaggedFields::new();
59 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
60 let payload = encode_to_bytes(
61 {
62 let prefix = crate::primitives::array::array_len_prefix_len(
63 (self.node_endpoints).len(),
64 flex,
65 );
66 let body: usize = (self.node_endpoints)
67 .iter()
68 .map(|it| it.encoded_len(version))
69 .sum();
70 prefix + body
71 },
72 |b| {
73 {
74 crate::primitives::array::put_array_len(
75 b,
76 (self.node_endpoints).len(),
77 flex,
78 );
79 for it in &self.node_endpoints {
80 it.encode(b, version)?;
81 }
82 };
83 Ok(())
84 },
85 );
86 tagged.add(0, payload);
87 }
88 tagged.write(buf, &self.unknown_tagged_fields);
89 }
90 Ok(())
91 }
92 fn encoded_len(&self, version: i16) -> usize {
93 let flex = is_flexible(version);
94 let mut n: usize = 0;
95 if version >= 0 {
96 n += 4;
97 }
98 if version >= 0 {
99 n += 2;
100 }
101 if version >= 0 {
102 n += {
103 let prefix =
104 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
105 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
106 prefix + body
107 };
108 }
109 if flex {
110 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
111 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
112 known_pairs.push((0, {
113 let prefix = crate::primitives::array::array_len_prefix_len(
114 (self.node_endpoints).len(),
115 flex,
116 );
117 let body: usize = (self.node_endpoints)
118 .iter()
119 .map(|it| it.encoded_len(version))
120 .sum();
121 prefix + body
122 }));
123 }
124 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
125 }
126 n
127 }
128}
129impl Decode<'_> for FetchSnapshotResponse {
130 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
131 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
132 return Err(ProtocolError::UnsupportedVersion {
133 api_key: API_KEY,
134 version,
135 });
136 }
137 let flex = is_flexible(version);
138 let mut out = Self::default();
139 if version >= 0 {
140 out.throttle_time_ms = get_i32(buf)?;
141 }
142 if version >= 0 {
143 out.error_code = get_i16(buf)?;
144 }
145 if version >= 0 {
146 out.topics = {
147 let n = crate::primitives::array::get_array_len(buf, flex)?;
148 let mut v = Vec::with_capacity(n);
149 for _ in 0..n {
150 v.push(TopicSnapshot::decode(buf, version)?);
151 }
152 v
153 };
154 }
155 if flex {
156 let mut tag_node_endpoints = None;
157 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
158 0 => {
159 tag_node_endpoints = Some({
160 let b: &mut &[u8] = payload;
161 {
162 let n = crate::primitives::array::get_array_len(b, flex)?;
163 let mut v = Vec::with_capacity(n);
164 for _ in 0..n {
165 v.push(NodeEndpoint::decode(b, version)?);
166 }
167 v
168 }
169 });
170 Ok(true)
171 }
172 _ => Ok(false),
173 })?;
174 if let Some(v) = tag_node_endpoints {
175 out.node_endpoints = v;
176 }
177 }
178 Ok(out)
179 }
180}
#[cfg(test)]
impl FetchSnapshotResponse {
    /// Builds a test fixture in which every field available at `version` holds a
    /// non-default value; fields not available at that version stay default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        let topics = vec![TopicSnapshot::populated(version)];
        let node_endpoints = if version >= 1 {
            vec![NodeEndpoint::populated(version)]
        } else {
            Vec::new()
        };
        Self {
            throttle_time_ms: 1i32,
            error_code: 1i16,
            topics,
            node_endpoints,
            ..Self::default()
        }
    }
}
/// One element of `FetchSnapshotResponse::topics`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TopicSnapshot {
    /// Encoded first, as a string (compact form when the version is flexible).
    pub name: String,
    /// Length-prefixed array of per-partition entries.
    pub partitions: Vec<PartitionSnapshot>,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
207impl Encode for TopicSnapshot {
208 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
209 let flex = version >= 0;
210 if version >= 0 {
211 if flex {
212 put_compact_string(buf, &self.name);
213 } else {
214 put_string(buf, &self.name);
215 }
216 }
217 if version >= 0 {
218 {
219 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
220 for it in &self.partitions {
221 it.encode(buf, version)?;
222 }
223 }
224 }
225 if flex {
226 let tagged = WriteTaggedFields::new();
227 tagged.write(buf, &self.unknown_tagged_fields);
228 }
229 Ok(())
230 }
231 fn encoded_len(&self, version: i16) -> usize {
232 let flex = version >= 0;
233 let mut n: usize = 0;
234 if version >= 0 {
235 n += if flex {
236 compact_string_len(&self.name)
237 } else {
238 string_len(&self.name)
239 };
240 }
241 if version >= 0 {
242 n += {
243 let prefix =
244 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
245 let body: usize = (self.partitions)
246 .iter()
247 .map(|it| it.encoded_len(version))
248 .sum();
249 prefix + body
250 };
251 }
252 if flex {
253 let known_pairs: Vec<(u32, usize)> = Vec::new();
254 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
255 }
256 n
257 }
258}
259impl Decode<'_> for TopicSnapshot {
260 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
261 let flex = version >= 0;
262 let mut out = Self::default();
263 if version >= 0 {
264 out.name = if flex {
265 get_compact_string_owned(buf)?
266 } else {
267 get_string_owned(buf)?
268 };
269 }
270 if version >= 0 {
271 out.partitions = {
272 let n = crate::primitives::array::get_array_len(buf, flex)?;
273 let mut v = Vec::with_capacity(n);
274 for _ in 0..n {
275 v.push(PartitionSnapshot::decode(buf, version)?);
276 }
277 v
278 };
279 }
280 if flex {
281 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
282 }
283 Ok(out)
284 }
285}
#[cfg(test)]
impl TopicSnapshot {
    /// Builds a test fixture with a non-default name and a single populated
    /// partition; a negative `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            name: "x".to_string(),
            partitions: vec![PartitionSnapshot::populated(version)],
            ..Self::default()
        }
    }
}
/// One element of `TopicSnapshot::partitions`.
///
/// The fixed body is written in field order from `index` through
/// `unaligned_records`; `current_leader` travels as tagged field 0.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PartitionSnapshot {
    /// Encoded as a 32-bit integer.
    pub index: i32,
    /// Encoded as a 16-bit integer.
    pub error_code: i16,
    /// Nested struct encoded inline after `error_code`.
    pub snapshot_id: SnapshotId,
    /// Encoded as a 64-bit integer.
    pub size: i64,
    /// Encoded as a 64-bit integer.
    pub position: i64,
    /// Records payload, serialised into a length-prefixed byte blob.
    pub unaligned_records: crate::records::RecordsPayload,
    /// Sent as tagged field 0, and only when it differs from the default value.
    pub current_leader: LeaderIdAndEpoch,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
311impl Encode for PartitionSnapshot {
312 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
313 let flex = version >= 0;
314 if version >= 0 {
315 put_i32(buf, self.index);
316 }
317 if version >= 0 {
318 put_i16(buf, self.error_code);
319 }
320 if version >= 0 {
321 self.snapshot_id.encode(buf, version)?;
322 }
323 if version >= 0 {
324 put_i64(buf, self.size);
325 }
326 if version >= 0 {
327 put_i64(buf, self.position);
328 }
329 if version >= 0 {
330 {
331 let mut __rb_buf = bytes::BytesMut::new();
332 <crate::records::RecordsPayload as crate::Encode>::encode(
333 &self.unaligned_records,
334 &mut __rb_buf,
335 version,
336 )?;
337 if flex {
338 put_compact_bytes(buf, &__rb_buf);
339 } else {
340 put_bytes(buf, &__rb_buf);
341 }
342 }
343 }
344 if flex {
345 let mut tagged = WriteTaggedFields::new();
346 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
347 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| {
348 self.current_leader.encode(b, version)?;
349 Ok(())
350 });
351 tagged.add(0, payload);
352 }
353 tagged.write(buf, &self.unknown_tagged_fields);
354 }
355 Ok(())
356 }
357 fn encoded_len(&self, version: i16) -> usize {
358 let flex = version >= 0;
359 let mut n: usize = 0;
360 if version >= 0 {
361 n += 4;
362 }
363 if version >= 0 {
364 n += 2;
365 }
366 if version >= 0 {
367 n += self.snapshot_id.encoded_len(version);
368 }
369 if version >= 0 {
370 n += 8;
371 }
372 if version >= 0 {
373 n += 8;
374 }
375 if version >= 0 {
376 n += {
377 let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(
378 &self.unaligned_records,
379 version,
380 );
381 if flex {
382 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
383 } else {
384 4 + __rb_len
385 }
386 };
387 }
388 if flex {
389 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
390 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
391 known_pairs.push((0, self.current_leader.encoded_len(version)));
392 }
393 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
394 }
395 n
396 }
397}
398impl Decode<'_> for PartitionSnapshot {
399 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
400 let flex = version >= 0;
401 let mut out = Self::default();
402 if version >= 0 {
403 out.index = get_i32(buf)?;
404 }
405 if version >= 0 {
406 out.error_code = get_i16(buf)?;
407 }
408 if version >= 0 {
409 out.snapshot_id = SnapshotId::decode(buf, version)?;
410 }
411 if version >= 0 {
412 out.size = get_i64(buf)?;
413 }
414 if version >= 0 {
415 out.position = get_i64(buf)?;
416 }
417 if version >= 0 {
418 out.unaligned_records = {
419 let __rb_bytes = if flex {
420 get_compact_bytes_owned(buf)?
421 } else {
422 get_bytes_owned(buf)?
423 };
424 let mut __rb_cur: &[u8] = &__rb_bytes;
425 crate::records::RecordsPayload::decode_lenient(&mut __rb_cur, version)?
426 };
427 }
428 if flex {
429 let mut tag_current_leader = None;
430 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
431 0 => {
432 tag_current_leader = Some({
433 let b: &mut &[u8] = payload;
434 LeaderIdAndEpoch::decode(b, version)?
435 });
436 Ok(true)
437 }
438 _ => Ok(false),
439 })?;
440 if let Some(v) = tag_current_leader {
441 out.current_leader = v;
442 }
443 }
444 Ok(out)
445 }
446}
#[cfg(test)]
impl PartitionSnapshot {
    /// Builds a test fixture with non-default scalar fields, snapshot id and
    /// current leader. `unaligned_records` is left at its default, and a
    /// negative `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            index: 1i32,
            error_code: 1i16,
            snapshot_id: SnapshotId::populated(version),
            size: 1i64,
            position: 1i64,
            current_leader: LeaderIdAndEpoch::populated(version),
            ..Self::default()
        }
    }
}
/// Nested struct stored in `PartitionSnapshot::snapshot_id`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SnapshotId {
    /// Encoded first, as a 64-bit integer.
    pub end_offset: i64,
    /// Encoded second, as a 32-bit integer.
    pub epoch: i32,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
479impl Encode for SnapshotId {
480 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
481 let flex = version >= 0;
482 if version >= 0 {
483 put_i64(buf, self.end_offset);
484 }
485 if version >= 0 {
486 put_i32(buf, self.epoch);
487 }
488 if flex {
489 let tagged = WriteTaggedFields::new();
490 tagged.write(buf, &self.unknown_tagged_fields);
491 }
492 Ok(())
493 }
494 fn encoded_len(&self, version: i16) -> usize {
495 let flex = version >= 0;
496 let mut n: usize = 0;
497 if version >= 0 {
498 n += 8;
499 }
500 if version >= 0 {
501 n += 4;
502 }
503 if flex {
504 let known_pairs: Vec<(u32, usize)> = Vec::new();
505 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
506 }
507 n
508 }
509}
510impl Decode<'_> for SnapshotId {
511 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
512 let flex = version >= 0;
513 let mut out = Self::default();
514 if version >= 0 {
515 out.end_offset = get_i64(buf)?;
516 }
517 if version >= 0 {
518 out.epoch = get_i32(buf)?;
519 }
520 if flex {
521 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
522 }
523 Ok(out)
524 }
525}
#[cfg(test)]
impl SnapshotId {
    /// Builds a test fixture with both scalar fields set to `1`; a negative
    /// `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                end_offset: 1i64,
                epoch: 1i32,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// Nested struct stored in `PartitionSnapshot::current_leader`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LeaderIdAndEpoch {
    /// Encoded first, as a 32-bit integer.
    pub leader_id: i32,
    /// Encoded second, as a 32-bit integer.
    pub leader_epoch: i32,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
546impl Encode for LeaderIdAndEpoch {
547 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
548 let flex = version >= 0;
549 if version >= 0 {
550 put_i32(buf, self.leader_id);
551 }
552 if version >= 0 {
553 put_i32(buf, self.leader_epoch);
554 }
555 if flex {
556 let tagged = WriteTaggedFields::new();
557 tagged.write(buf, &self.unknown_tagged_fields);
558 }
559 Ok(())
560 }
561 fn encoded_len(&self, version: i16) -> usize {
562 let flex = version >= 0;
563 let mut n: usize = 0;
564 if version >= 0 {
565 n += 4;
566 }
567 if version >= 0 {
568 n += 4;
569 }
570 if flex {
571 let known_pairs: Vec<(u32, usize)> = Vec::new();
572 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
573 }
574 n
575 }
576}
577impl Decode<'_> for LeaderIdAndEpoch {
578 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
579 let flex = version >= 0;
580 let mut out = Self::default();
581 if version >= 0 {
582 out.leader_id = get_i32(buf)?;
583 }
584 if version >= 0 {
585 out.leader_epoch = get_i32(buf)?;
586 }
587 if flex {
588 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
589 }
590 Ok(out)
591 }
592}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Builds a test fixture with both scalar fields set to `1`; a negative
    /// `version` yields the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                leader_id: 1i32,
                leader_epoch: 1i32,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// One element of `FetchSnapshotResponse::node_endpoints`.
///
/// All three data fields are encoded only for version 1 and later.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct NodeEndpoint {
    /// Encoded first, as a 32-bit integer.
    pub node_id: i32,
    /// Encoded as a string (compact form when the version is flexible).
    pub host: String,
    /// Encoded last, as an unsigned 16-bit integer.
    pub port: u16,
    /// Tagged fields not recognised by this codec, kept so they can be written back.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
614impl Encode for NodeEndpoint {
615 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
616 let flex = version >= 0;
617 if version >= 1 {
618 put_i32(buf, self.node_id);
619 }
620 if version >= 1 {
621 if flex {
622 put_compact_string(buf, &self.host);
623 } else {
624 put_string(buf, &self.host);
625 }
626 }
627 if version >= 1 {
628 put_u16(buf, self.port);
629 }
630 if flex {
631 let tagged = WriteTaggedFields::new();
632 tagged.write(buf, &self.unknown_tagged_fields);
633 }
634 Ok(())
635 }
636 fn encoded_len(&self, version: i16) -> usize {
637 let flex = version >= 0;
638 let mut n: usize = 0;
639 if version >= 1 {
640 n += 4;
641 }
642 if version >= 1 {
643 n += if flex {
644 compact_string_len(&self.host)
645 } else {
646 string_len(&self.host)
647 };
648 }
649 if version >= 1 {
650 n += 2;
651 }
652 if flex {
653 let known_pairs: Vec<(u32, usize)> = Vec::new();
654 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
655 }
656 n
657 }
658}
659impl Decode<'_> for NodeEndpoint {
660 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
661 let flex = version >= 0;
662 let mut out = Self::default();
663 if version >= 1 {
664 out.node_id = get_i32(buf)?;
665 }
666 if version >= 1 {
667 out.host = if flex {
668 get_compact_string_owned(buf)?
669 } else {
670 get_string_owned(buf)?
671 };
672 }
673 if version >= 1 {
674 out.port = get_u16(buf)?;
675 }
676 if flex {
677 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
678 }
679 Ok(out)
680 }
681}
#[cfg(test)]
impl NodeEndpoint {
    /// Builds a test fixture with every data field set to a non-default value
    /// for version 1 and later; earlier versions yield the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 1 {
            return Self::default();
        }
        Self {
            node_id: 1i32,
            host: "x".to_string(),
            port: 1u16,
            ..Self::default()
        }
    }
}
699#[must_use]
702#[allow(unused_comparisons)]
703pub fn default_json(version: i16) -> ::serde_json::Value {
704 let mut obj = ::serde_json::Map::new();
705 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
706 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
707 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
708 if version >= 1 {
709 obj.insert(
710 "nodeEndpoints".to_string(),
711 ::serde_json::Value::Array(vec![]),
712 );
713 }
714 ::serde_json::Value::Object(obj)
715}