1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{
6 get_i16, get_i32, get_i64, get_u16, put_i16, put_i32, put_i64, put_u16,
7};
8use crate::primitives::string_bytes::{
9 compact_string_len, get_compact_string_owned, get_string_owned, put_compact_string, put_string,
10 string_len,
11};
12use crate::primitives::string_bytes::{
13 get_bytes_owned, get_compact_bytes_owned, put_bytes, put_compact_bytes,
14};
15use crate::tagged_fields::{
16 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
17};
18use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
19
/// API key placed in `ProtocolError::UnsupportedVersion` errors raised by this module.
pub const API_KEY: i16 = 59;
/// Lowest version accepted by the top-level encode/decode entry points.
pub const MIN_VERSION: i16 = 0;
/// Highest version accepted by the top-level encode/decode entry points.
pub const MAX_VERSION: i16 = 1;
/// First version for which [`is_flexible`] reports `true`.
pub const FLEXIBLE_MIN: i16 = 0;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
29
30#[derive(Debug, Clone, PartialEq, Eq, Default)]
31pub struct FetchSnapshotResponse {
32 pub throttle_time_ms: i32,
33 pub error_code: i16,
34 pub topics: Vec<TopicSnapshot>,
35 pub node_endpoints: Vec<NodeEndpoint>,
36 pub unknown_tagged_fields: UnknownTaggedFields,
37}
38impl Encode for FetchSnapshotResponse {
39 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
40 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
41 return Err(ProtocolError::UnsupportedVersion {
42 api_key: API_KEY,
43 version,
44 });
45 }
46 let flex = is_flexible(version);
47 if version >= 0 {
48 put_i32(buf, self.throttle_time_ms);
49 }
50 if version >= 0 {
51 put_i16(buf, self.error_code);
52 }
53 if version >= 0 {
54 {
55 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
56 for it in &self.topics {
57 it.encode(buf, version)?;
58 }
59 }
60 }
61 if flex {
62 let mut tagged = WriteTaggedFields::new();
63 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
64 let payload = encode_to_bytes(
65 {
66 let prefix = crate::primitives::array::array_len_prefix_len(
67 (self.node_endpoints).len(),
68 flex,
69 );
70 let body: usize = (self.node_endpoints)
71 .iter()
72 .map(|it| it.encoded_len(version))
73 .sum();
74 prefix + body
75 },
76 |b| {
77 {
78 crate::primitives::array::put_array_len(
79 b,
80 (self.node_endpoints).len(),
81 flex,
82 );
83 for it in &self.node_endpoints {
84 it.encode(b, version)?;
85 }
86 };
87 Ok(())
88 },
89 );
90 tagged.add(0, payload);
91 }
92 tagged.write(buf, &self.unknown_tagged_fields);
93 }
94 Ok(())
95 }
96 fn encoded_len(&self, version: i16) -> usize {
97 let flex = is_flexible(version);
98 let mut n: usize = 0;
99 if version >= 0 {
100 n += 4;
101 }
102 if version >= 0 {
103 n += 2;
104 }
105 if version >= 0 {
106 n += {
107 let prefix =
108 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
109 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
110 prefix + body
111 };
112 }
113 if flex {
114 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
115 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
116 known_pairs.push((0, {
117 let prefix = crate::primitives::array::array_len_prefix_len(
118 (self.node_endpoints).len(),
119 flex,
120 );
121 let body: usize = (self.node_endpoints)
122 .iter()
123 .map(|it| it.encoded_len(version))
124 .sum();
125 prefix + body
126 }));
127 }
128 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
129 }
130 n
131 }
132}
133impl Decode<'_> for FetchSnapshotResponse {
134 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
135 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
136 return Err(ProtocolError::UnsupportedVersion {
137 api_key: API_KEY,
138 version,
139 });
140 }
141 let flex = is_flexible(version);
142 let mut out = Self::default();
143 if version >= 0 {
144 out.throttle_time_ms = get_i32(buf)?;
145 }
146 if version >= 0 {
147 out.error_code = get_i16(buf)?;
148 }
149 if version >= 0 {
150 out.topics = {
151 let n = crate::primitives::array::get_array_len(buf, flex)?;
152 let mut v = Vec::with_capacity(n);
153 for _ in 0..n {
154 v.push(TopicSnapshot::decode(buf, version)?);
155 }
156 v
157 };
158 }
159 if flex {
160 let mut tag_node_endpoints = None;
161 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
162 0 => {
163 tag_node_endpoints = Some({
164 let b: &mut &[u8] = payload;
165 {
166 let n = crate::primitives::array::get_array_len(b, flex)?;
167 let mut v = Vec::with_capacity(n);
168 for _ in 0..n {
169 v.push(NodeEndpoint::decode(b, version)?);
170 }
171 v
172 }
173 });
174 Ok(true)
175 }
176 _ => Ok(false),
177 })?;
178 if let Some(v) = tag_node_endpoints {
179 out.node_endpoints = v;
180 }
181 }
182 Ok(out)
183 }
184}
#[cfg(test)]
impl FetchSnapshotResponse {
    /// Test fixture: fills every field that is set for `version` with a
    /// small non-default value and leaves the rest at their defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            throttle_time_ms: 1,
            error_code: 1,
            topics: vec![TopicSnapshot::populated(version)],
            // Only populated from version 1 onwards.
            node_endpoints: if version >= 1 {
                vec![NodeEndpoint::populated(version)]
            } else {
                Vec::new()
            },
            ..Self::default()
        }
    }
}
205#[derive(Debug, Clone, PartialEq, Eq, Default)]
206pub struct TopicSnapshot {
207 pub name: String,
208 pub partitions: Vec<PartitionSnapshot>,
209 pub unknown_tagged_fields: UnknownTaggedFields,
210}
211impl Encode for TopicSnapshot {
212 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
213 let flex = version >= 0;
214 if version >= 0 {
215 if flex {
216 put_compact_string(buf, &self.name);
217 } else {
218 put_string(buf, &self.name);
219 }
220 }
221 if version >= 0 {
222 {
223 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
224 for it in &self.partitions {
225 it.encode(buf, version)?;
226 }
227 }
228 }
229 if flex {
230 let tagged = WriteTaggedFields::new();
231 tagged.write(buf, &self.unknown_tagged_fields);
232 }
233 Ok(())
234 }
235 fn encoded_len(&self, version: i16) -> usize {
236 let flex = version >= 0;
237 let mut n: usize = 0;
238 if version >= 0 {
239 n += if flex {
240 compact_string_len(&self.name)
241 } else {
242 string_len(&self.name)
243 };
244 }
245 if version >= 0 {
246 n += {
247 let prefix =
248 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
249 let body: usize = (self.partitions)
250 .iter()
251 .map(|it| it.encoded_len(version))
252 .sum();
253 prefix + body
254 };
255 }
256 if flex {
257 let known_pairs: Vec<(u32, usize)> = Vec::new();
258 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
259 }
260 n
261 }
262}
263impl Decode<'_> for TopicSnapshot {
264 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
265 let flex = version >= 0;
266 let mut out = Self::default();
267 if version >= 0 {
268 out.name = if flex {
269 get_compact_string_owned(buf)?
270 } else {
271 get_string_owned(buf)?
272 };
273 }
274 if version >= 0 {
275 out.partitions = {
276 let n = crate::primitives::array::get_array_len(buf, flex)?;
277 let mut v = Vec::with_capacity(n);
278 for _ in 0..n {
279 v.push(PartitionSnapshot::decode(buf, version)?);
280 }
281 v
282 };
283 }
284 if flex {
285 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
286 }
287 Ok(out)
288 }
289}
#[cfg(test)]
impl TopicSnapshot {
    /// Test fixture: a topic named `"x"` holding one populated partition;
    /// the default value for a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            name: String::from("x"),
            partitions: vec![PartitionSnapshot::populated(version)],
            ..Self::default()
        }
    }
}
304#[derive(Debug, Clone, PartialEq, Eq, Default)]
305pub struct PartitionSnapshot {
306 pub index: i32,
307 pub error_code: i16,
308 pub snapshot_id: SnapshotId,
309 pub size: i64,
310 pub position: i64,
311 pub unaligned_records: crate::records::RecordsPayload,
312 pub current_leader: LeaderIdAndEpoch,
313 pub unknown_tagged_fields: UnknownTaggedFields,
314}
315impl Encode for PartitionSnapshot {
316 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
317 let flex = version >= 0;
318 if version >= 0 {
319 put_i32(buf, self.index);
320 }
321 if version >= 0 {
322 put_i16(buf, self.error_code);
323 }
324 if version >= 0 {
325 self.snapshot_id.encode(buf, version)?;
326 }
327 if version >= 0 {
328 put_i64(buf, self.size);
329 }
330 if version >= 0 {
331 put_i64(buf, self.position);
332 }
333 if version >= 0 {
334 {
335 let mut __rb_buf = bytes::BytesMut::new();
336 <crate::records::RecordsPayload as crate::Encode>::encode(
337 &self.unaligned_records,
338 &mut __rb_buf,
339 version,
340 )?;
341 if flex {
342 put_compact_bytes(buf, &__rb_buf);
343 } else {
344 put_bytes(buf, &__rb_buf);
345 }
346 }
347 }
348 if flex {
349 let mut tagged = WriteTaggedFields::new();
350 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
351 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| {
352 self.current_leader.encode(b, version)?;
353 Ok(())
354 });
355 tagged.add(0, payload);
356 }
357 tagged.write(buf, &self.unknown_tagged_fields);
358 }
359 Ok(())
360 }
361 fn encoded_len(&self, version: i16) -> usize {
362 let flex = version >= 0;
363 let mut n: usize = 0;
364 if version >= 0 {
365 n += 4;
366 }
367 if version >= 0 {
368 n += 2;
369 }
370 if version >= 0 {
371 n += self.snapshot_id.encoded_len(version);
372 }
373 if version >= 0 {
374 n += 8;
375 }
376 if version >= 0 {
377 n += 8;
378 }
379 if version >= 0 {
380 n += {
381 let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(
382 &self.unaligned_records,
383 version,
384 );
385 if flex {
386 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
387 } else {
388 4 + __rb_len
389 }
390 };
391 }
392 if flex {
393 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
394 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
395 known_pairs.push((0, self.current_leader.encoded_len(version)));
396 }
397 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
398 }
399 n
400 }
401}
402impl Decode<'_> for PartitionSnapshot {
403 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
404 let flex = version >= 0;
405 let mut out = Self::default();
406 if version >= 0 {
407 out.index = get_i32(buf)?;
408 }
409 if version >= 0 {
410 out.error_code = get_i16(buf)?;
411 }
412 if version >= 0 {
413 out.snapshot_id = SnapshotId::decode(buf, version)?;
414 }
415 if version >= 0 {
416 out.size = get_i64(buf)?;
417 }
418 if version >= 0 {
419 out.position = get_i64(buf)?;
420 }
421 if version >= 0 {
422 out.unaligned_records = {
423 let __rb_bytes = if flex {
424 get_compact_bytes_owned(buf)?
425 } else {
426 get_bytes_owned(buf)?
427 };
428 let mut __rb_cur: &[u8] = &__rb_bytes;
429 crate::records::RecordsPayload::decode_lenient(&mut __rb_cur, version)?
430 };
431 }
432 if flex {
433 let mut tag_current_leader = None;
434 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
435 0 => {
436 tag_current_leader = Some({
437 let b: &mut &[u8] = payload;
438 LeaderIdAndEpoch::decode(b, version)?
439 });
440 Ok(true)
441 }
442 _ => Ok(false),
443 })?;
444 if let Some(v) = tag_current_leader {
445 out.current_leader = v;
446 }
447 }
448 Ok(out)
449 }
450}
#[cfg(test)]
impl PartitionSnapshot {
    /// Test fixture: sets the scalar fields to `1` and the nested structs to
    /// their own `populated` values; `unaligned_records` stays at its default.
    /// Returns the default value for a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            index: 1,
            error_code: 1,
            snapshot_id: SnapshotId::populated(version),
            size: 1,
            position: 1,
            current_leader: LeaderIdAndEpoch::populated(version),
            ..Self::default()
        }
    }
}
477#[derive(Debug, Clone, PartialEq, Eq, Default)]
478pub struct SnapshotId {
479 pub end_offset: i64,
480 pub epoch: i32,
481 pub unknown_tagged_fields: UnknownTaggedFields,
482}
483impl Encode for SnapshotId {
484 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
485 let flex = version >= 0;
486 if version >= 0 {
487 put_i64(buf, self.end_offset);
488 }
489 if version >= 0 {
490 put_i32(buf, self.epoch);
491 }
492 if flex {
493 let tagged = WriteTaggedFields::new();
494 tagged.write(buf, &self.unknown_tagged_fields);
495 }
496 Ok(())
497 }
498 fn encoded_len(&self, version: i16) -> usize {
499 let flex = version >= 0;
500 let mut n: usize = 0;
501 if version >= 0 {
502 n += 8;
503 }
504 if version >= 0 {
505 n += 4;
506 }
507 if flex {
508 let known_pairs: Vec<(u32, usize)> = Vec::new();
509 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
510 }
511 n
512 }
513}
514impl Decode<'_> for SnapshotId {
515 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
516 let flex = version >= 0;
517 let mut out = Self::default();
518 if version >= 0 {
519 out.end_offset = get_i64(buf)?;
520 }
521 if version >= 0 {
522 out.epoch = get_i32(buf)?;
523 }
524 if flex {
525 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
526 }
527 Ok(out)
528 }
529}
#[cfg(test)]
impl SnapshotId {
    /// Test fixture: both scalar fields set to `1`; the default value for a
    /// negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            end_offset: 1,
            epoch: 1,
            ..Self::default()
        }
    }
}
544#[derive(Debug, Clone, PartialEq, Eq, Default)]
545pub struct LeaderIdAndEpoch {
546 pub leader_id: i32,
547 pub leader_epoch: i32,
548 pub unknown_tagged_fields: UnknownTaggedFields,
549}
550impl Encode for LeaderIdAndEpoch {
551 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
552 let flex = version >= 0;
553 if version >= 0 {
554 put_i32(buf, self.leader_id);
555 }
556 if version >= 0 {
557 put_i32(buf, self.leader_epoch);
558 }
559 if flex {
560 let tagged = WriteTaggedFields::new();
561 tagged.write(buf, &self.unknown_tagged_fields);
562 }
563 Ok(())
564 }
565 fn encoded_len(&self, version: i16) -> usize {
566 let flex = version >= 0;
567 let mut n: usize = 0;
568 if version >= 0 {
569 n += 4;
570 }
571 if version >= 0 {
572 n += 4;
573 }
574 if flex {
575 let known_pairs: Vec<(u32, usize)> = Vec::new();
576 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
577 }
578 n
579 }
580}
581impl Decode<'_> for LeaderIdAndEpoch {
582 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
583 let flex = version >= 0;
584 let mut out = Self::default();
585 if version >= 0 {
586 out.leader_id = get_i32(buf)?;
587 }
588 if version >= 0 {
589 out.leader_epoch = get_i32(buf)?;
590 }
591 if flex {
592 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
593 }
594 Ok(out)
595 }
596}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Test fixture: both scalar fields set to `1`; the default value for a
    /// negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            leader_id: 1,
            leader_epoch: 1,
            ..Self::default()
        }
    }
}
611#[derive(Debug, Clone, PartialEq, Eq, Default)]
612pub struct NodeEndpoint {
613 pub node_id: i32,
614 pub host: String,
615 pub port: u16,
616 pub unknown_tagged_fields: UnknownTaggedFields,
617}
618impl Encode for NodeEndpoint {
619 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
620 let flex = version >= 0;
621 if version >= 1 {
622 put_i32(buf, self.node_id);
623 }
624 if version >= 1 {
625 if flex {
626 put_compact_string(buf, &self.host);
627 } else {
628 put_string(buf, &self.host);
629 }
630 }
631 if version >= 1 {
632 put_u16(buf, self.port);
633 }
634 if flex {
635 let tagged = WriteTaggedFields::new();
636 tagged.write(buf, &self.unknown_tagged_fields);
637 }
638 Ok(())
639 }
640 fn encoded_len(&self, version: i16) -> usize {
641 let flex = version >= 0;
642 let mut n: usize = 0;
643 if version >= 1 {
644 n += 4;
645 }
646 if version >= 1 {
647 n += if flex {
648 compact_string_len(&self.host)
649 } else {
650 string_len(&self.host)
651 };
652 }
653 if version >= 1 {
654 n += 2;
655 }
656 if flex {
657 let known_pairs: Vec<(u32, usize)> = Vec::new();
658 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
659 }
660 n
661 }
662}
663impl Decode<'_> for NodeEndpoint {
664 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
665 let flex = version >= 0;
666 let mut out = Self::default();
667 if version >= 1 {
668 out.node_id = get_i32(buf)?;
669 }
670 if version >= 1 {
671 out.host = if flex {
672 get_compact_string_owned(buf)?
673 } else {
674 get_string_owned(buf)?
675 };
676 }
677 if version >= 1 {
678 out.port = get_u16(buf)?;
679 }
680 if flex {
681 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
682 }
683 Ok(out)
684 }
685}
#[cfg(test)]
impl NodeEndpoint {
    /// Test fixture: `node_id` 1, host `"x"`, port 1 from version 1 onwards;
    /// the default value for earlier versions.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 1 {
            return Self::default();
        }
        Self {
            node_id: 1,
            host: String::from("x"),
            port: 1,
            ..Self::default()
        }
    }
}
703
704#[must_use]
707#[allow(unused_comparisons)]
708pub fn default_json(version: i16) -> ::serde_json::Value {
709 let mut obj = ::serde_json::Map::new();
710 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
711 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
712 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
713 if version >= 1 {
714 obj.insert(
715 "nodeEndpoints".to_string(),
716 ::serde_json::Value::Array(vec![]),
717 );
718 }
719 ::serde_json::Value::Object(obj)
720}