1use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{
10 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
11};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::{Buf, BufMut};
/// API key identifying this message type.
pub const API_KEY: i16 = 0;
/// Lowest version accepted by the top-level `encode`/`decode`.
pub const MIN_VERSION: i16 = 3;
/// Highest version accepted by the top-level `encode`/`decode`.
pub const MAX_VERSION: i16 = 13;
/// First version for which the flexible code paths (compact lengths, tagged fields) are used.
pub const FLEXIBLE_MIN: i16 = 9;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
22#[derive(Debug, Clone, PartialEq, Eq, Default)]
23pub struct ProduceResponse {
24 pub responses: Vec<TopicProduceResponse>,
25 pub throttle_time_ms: i32,
26 pub node_endpoints: Vec<NodeEndpoint>,
27 pub unknown_tagged_fields: UnknownTaggedFields,
28}
29impl Encode for ProduceResponse {
30 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
31 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
32 return Err(ProtocolError::UnsupportedVersion {
33 api_key: API_KEY,
34 version,
35 });
36 }
37 let flex = is_flexible(version);
38 if version >= 0 {
39 {
40 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
41 for it in &self.responses {
42 it.encode(buf, version)?;
43 }
44 }
45 }
46 if version >= 1 {
47 put_i32(buf, self.throttle_time_ms);
48 }
49 if flex {
50 let mut tagged = WriteTaggedFields::new();
51 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
52 let payload = encode_to_bytes(
53 {
54 let prefix = crate::primitives::array::array_len_prefix_len(
55 (self.node_endpoints).len(),
56 flex,
57 );
58 let body: usize = (self.node_endpoints)
59 .iter()
60 .map(|it| it.encoded_len(version))
61 .sum();
62 prefix + body
63 },
64 |b| {
65 {
66 crate::primitives::array::put_array_len(
67 b,
68 (self.node_endpoints).len(),
69 flex,
70 );
71 for it in &self.node_endpoints {
72 it.encode(b, version)?;
73 }
74 };
75 Ok(())
76 },
77 );
78 tagged.add(0, payload);
79 }
80 tagged.write(buf, &self.unknown_tagged_fields);
81 }
82 Ok(())
83 }
84 fn encoded_len(&self, version: i16) -> usize {
85 let flex = is_flexible(version);
86 let mut n: usize = 0;
87 if version >= 0 {
88 n += {
89 let prefix =
90 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
91 let body: usize = (self.responses)
92 .iter()
93 .map(|it| it.encoded_len(version))
94 .sum();
95 prefix + body
96 };
97 }
98 if version >= 1 {
99 n += 4;
100 }
101 if flex {
102 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
103 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
104 known_pairs.push((0, {
105 let prefix = crate::primitives::array::array_len_prefix_len(
106 (self.node_endpoints).len(),
107 flex,
108 );
109 let body: usize = (self.node_endpoints)
110 .iter()
111 .map(|it| it.encoded_len(version))
112 .sum();
113 prefix + body
114 }));
115 }
116 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
117 }
118 n
119 }
120}
121impl Decode<'_> for ProduceResponse {
122 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
123 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
124 return Err(ProtocolError::UnsupportedVersion {
125 api_key: API_KEY,
126 version,
127 });
128 }
129 let flex = is_flexible(version);
130 let mut out = Self::default();
131 if version >= 0 {
132 out.responses = {
133 let n = crate::primitives::array::get_array_len(buf, flex)?;
134 let mut v = Vec::with_capacity(n);
135 for _ in 0..n {
136 v.push(TopicProduceResponse::decode(buf, version)?);
137 }
138 v
139 };
140 }
141 if version >= 1 {
142 out.throttle_time_ms = get_i32(buf)?;
143 }
144 if flex {
145 let mut tag_node_endpoints = None;
146 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
147 0 => {
148 tag_node_endpoints = Some({
149 let b: &mut &[u8] = payload;
150 {
151 let n = crate::primitives::array::get_array_len(b, flex)?;
152 let mut v = Vec::with_capacity(n);
153 for _ in 0..n {
154 v.push(NodeEndpoint::decode(b, version)?);
155 }
156 v
157 }
158 });
159 Ok(true)
160 }
161 _ => Ok(false),
162 })?;
163 if let Some(v) = tag_node_endpoints {
164 out.node_endpoints = v;
165 }
166 }
167 Ok(out)
168 }
169}
#[cfg(test)]
impl ProduceResponse {
    /// Test fixture: starts from `Self::default()` and gives each field a non-default value
    /// when `version` reaches that field's threshold (0, 1 and 10 respectively).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 0 {
            fixture
                .responses
                .push(TopicProduceResponse::populated(version));
        }
        if version >= 1 {
            fixture.throttle_time_ms = 1;
        }
        if version >= 10 {
            fixture
                .node_endpoints
                .push(NodeEndpoint::populated(version));
        }
        fixture
    }
}
187#[derive(Debug, Clone, PartialEq, Eq, Default)]
188pub struct TopicProduceResponse {
189 pub name: String,
190 pub topic_id: crate::primitives::uuid::Uuid,
191 pub partition_responses: Vec<PartitionProduceResponse>,
192 pub unknown_tagged_fields: UnknownTaggedFields,
193}
194impl Encode for TopicProduceResponse {
195 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
196 let flex = version >= 9;
197 if (0..=12).contains(&version) {
198 if flex {
199 put_compact_string(buf, &self.name);
200 } else {
201 put_string(buf, &self.name);
202 }
203 }
204 if version >= 13 {
205 crate::primitives::uuid::put_uuid(buf, self.topic_id);
206 }
207 if version >= 0 {
208 {
209 crate::primitives::array::put_array_len(
210 buf,
211 (self.partition_responses).len(),
212 flex,
213 );
214 for it in &self.partition_responses {
215 it.encode(buf, version)?;
216 }
217 }
218 }
219 if flex {
220 let tagged = WriteTaggedFields::new();
221 tagged.write(buf, &self.unknown_tagged_fields);
222 }
223 Ok(())
224 }
225 fn encoded_len(&self, version: i16) -> usize {
226 let flex = version >= 9;
227 let mut n: usize = 0;
228 if (0..=12).contains(&version) {
229 n += if flex {
230 compact_string_len(&self.name)
231 } else {
232 string_len(&self.name)
233 };
234 }
235 if version >= 13 {
236 n += 16;
237 }
238 if version >= 0 {
239 n += {
240 let prefix = crate::primitives::array::array_len_prefix_len(
241 (self.partition_responses).len(),
242 flex,
243 );
244 let body: usize = (self.partition_responses)
245 .iter()
246 .map(|it| it.encoded_len(version))
247 .sum();
248 prefix + body
249 };
250 }
251 if flex {
252 let known_pairs: Vec<(u32, usize)> = Vec::new();
253 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
254 }
255 n
256 }
257}
258impl Decode<'_> for TopicProduceResponse {
259 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
260 let flex = version >= 9;
261 let mut out = Self::default();
262 if (0..=12).contains(&version) {
263 out.name = if flex {
264 get_compact_string_owned(buf)?
265 } else {
266 get_string_owned(buf)?
267 };
268 }
269 if version >= 13 {
270 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
271 }
272 if version >= 0 {
273 out.partition_responses = {
274 let n = crate::primitives::array::get_array_len(buf, flex)?;
275 let mut v = Vec::with_capacity(n);
276 for _ in 0..n {
277 v.push(PartitionProduceResponse::decode(buf, version)?);
278 }
279 v
280 };
281 }
282 if flex {
283 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
284 }
285 Ok(out)
286 }
287}
#[cfg(test)]
impl TopicProduceResponse {
    /// Test fixture: starts from `Self::default()` and sets the fields that exist at `version`
    /// (`name` for 0..=12, `topic_id` for >= 13, one partition entry for >= 0).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if matches!(version, 0..=12) {
            fixture.name = String::from("x");
        }
        if version >= 13 {
            fixture.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 0 {
            fixture
                .partition_responses
                .push(PartitionProduceResponse::populated(version));
        }
        fixture
    }
}
305#[derive(Debug, Clone, PartialEq, Eq)]
306pub struct PartitionProduceResponse {
307 pub index: i32,
308 pub error_code: i16,
309 pub base_offset: i64,
310 pub log_append_time_ms: i64,
311 pub log_start_offset: i64,
312 pub record_errors: Vec<BatchIndexAndErrorMessage>,
313 pub error_message: Option<String>,
314 pub current_leader: LeaderIdAndEpoch,
315 pub unknown_tagged_fields: UnknownTaggedFields,
316}
317impl Default for PartitionProduceResponse {
318 fn default() -> Self {
319 Self {
320 index: 0i32,
321 error_code: 0i16,
322 base_offset: 0i64,
323 log_append_time_ms: -1i64,
324 log_start_offset: -1i64,
325 record_errors: Vec::new(),
326 error_message: None,
327 current_leader: Default::default(),
328 unknown_tagged_fields: Default::default(),
329 }
330 }
331}
332impl Encode for PartitionProduceResponse {
333 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
334 let flex = version >= 9;
335 if version >= 0 {
336 put_i32(buf, self.index);
337 }
338 if version >= 0 {
339 put_i16(buf, self.error_code);
340 }
341 if version >= 0 {
342 put_i64(buf, self.base_offset);
343 }
344 if version >= 2 {
345 put_i64(buf, self.log_append_time_ms);
346 }
347 if version >= 5 {
348 put_i64(buf, self.log_start_offset);
349 }
350 if version >= 8 {
351 {
352 crate::primitives::array::put_array_len(buf, (self.record_errors).len(), flex);
353 for it in &self.record_errors {
354 it.encode(buf, version)?;
355 }
356 }
357 }
358 if version >= 8 {
359 if flex {
360 put_compact_nullable_string(buf, self.error_message.as_deref());
361 } else {
362 put_nullable_string(buf, self.error_message.as_deref());
363 }
364 }
365 if flex {
366 let mut tagged = WriteTaggedFields::new();
367 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
368 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| {
369 self.current_leader.encode(b, version)?;
370 Ok(())
371 });
372 tagged.add(0, payload);
373 }
374 tagged.write(buf, &self.unknown_tagged_fields);
375 }
376 Ok(())
377 }
378 fn encoded_len(&self, version: i16) -> usize {
379 let flex = version >= 9;
380 let mut n: usize = 0;
381 if version >= 0 {
382 n += 4;
383 }
384 if version >= 0 {
385 n += 2;
386 }
387 if version >= 0 {
388 n += 8;
389 }
390 if version >= 2 {
391 n += 8;
392 }
393 if version >= 5 {
394 n += 8;
395 }
396 if version >= 8 {
397 n += {
398 let prefix = crate::primitives::array::array_len_prefix_len(
399 (self.record_errors).len(),
400 flex,
401 );
402 let body: usize = (self.record_errors)
403 .iter()
404 .map(|it| it.encoded_len(version))
405 .sum();
406 prefix + body
407 };
408 }
409 if version >= 8 {
410 n += if flex {
411 compact_nullable_string_len(self.error_message.as_deref())
412 } else {
413 nullable_string_len(self.error_message.as_deref())
414 };
415 }
416 if flex {
417 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
418 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
419 known_pairs.push((0, self.current_leader.encoded_len(version)));
420 }
421 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
422 }
423 n
424 }
425}
426impl Decode<'_> for PartitionProduceResponse {
427 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
428 let flex = version >= 9;
429 let mut out = Self::default();
430 if version >= 0 {
431 out.index = get_i32(buf)?;
432 }
433 if version >= 0 {
434 out.error_code = get_i16(buf)?;
435 }
436 if version >= 0 {
437 out.base_offset = get_i64(buf)?;
438 }
439 if version >= 2 {
440 out.log_append_time_ms = get_i64(buf)?;
441 }
442 if version >= 5 {
443 out.log_start_offset = get_i64(buf)?;
444 }
445 if version >= 8 {
446 out.record_errors = {
447 let n = crate::primitives::array::get_array_len(buf, flex)?;
448 let mut v = Vec::with_capacity(n);
449 for _ in 0..n {
450 v.push(BatchIndexAndErrorMessage::decode(buf, version)?);
451 }
452 v
453 };
454 }
455 if version >= 8 {
456 out.error_message = if flex {
457 get_compact_nullable_string_owned(buf)?
458 } else {
459 get_nullable_string_owned(buf)?
460 };
461 }
462 if flex {
463 let mut tag_current_leader = None;
464 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
465 0 => {
466 tag_current_leader = Some({
467 let b: &mut &[u8] = payload;
468 LeaderIdAndEpoch::decode(b, version)?
469 });
470 Ok(true)
471 }
472 _ => Ok(false),
473 })?;
474 if let Some(v) = tag_current_leader {
475 out.current_leader = v;
476 }
477 }
478 Ok(out)
479 }
480}
#[cfg(test)]
impl PartitionProduceResponse {
    /// Test fixture: starts from `Self::default()` and overwrites each field whose version
    /// threshold (0, 2, 5, 8 or 10) is reached by `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 0 {
            fixture.index = 1;
            fixture.error_code = 1;
            fixture.base_offset = 1;
        }
        if version >= 2 {
            fixture.log_append_time_ms = 1;
        }
        if version >= 5 {
            fixture.log_start_offset = 1;
        }
        if version >= 8 {
            fixture.record_errors = vec![BatchIndexAndErrorMessage::populated(version)];
            fixture.error_message = Some(String::from("x"));
        }
        if version >= 10 {
            fixture.current_leader = LeaderIdAndEpoch::populated(version);
        }
        fixture
    }
}
513#[derive(Debug, Clone, PartialEq, Eq, Default)]
514pub struct BatchIndexAndErrorMessage {
515 pub batch_index: i32,
516 pub batch_index_error_message: Option<String>,
517 pub unknown_tagged_fields: UnknownTaggedFields,
518}
519impl Encode for BatchIndexAndErrorMessage {
520 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
521 let flex = version >= 9;
522 if version >= 8 {
523 put_i32(buf, self.batch_index);
524 }
525 if version >= 8 {
526 if flex {
527 put_compact_nullable_string(buf, self.batch_index_error_message.as_deref());
528 } else {
529 put_nullable_string(buf, self.batch_index_error_message.as_deref());
530 }
531 }
532 if flex {
533 let tagged = WriteTaggedFields::new();
534 tagged.write(buf, &self.unknown_tagged_fields);
535 }
536 Ok(())
537 }
538 fn encoded_len(&self, version: i16) -> usize {
539 let flex = version >= 9;
540 let mut n: usize = 0;
541 if version >= 8 {
542 n += 4;
543 }
544 if version >= 8 {
545 n += if flex {
546 compact_nullable_string_len(self.batch_index_error_message.as_deref())
547 } else {
548 nullable_string_len(self.batch_index_error_message.as_deref())
549 };
550 }
551 if flex {
552 let known_pairs: Vec<(u32, usize)> = Vec::new();
553 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
554 }
555 n
556 }
557}
558impl Decode<'_> for BatchIndexAndErrorMessage {
559 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
560 let flex = version >= 9;
561 let mut out = Self::default();
562 if version >= 8 {
563 out.batch_index = get_i32(buf)?;
564 }
565 if version >= 8 {
566 out.batch_index_error_message = if flex {
567 get_compact_nullable_string_owned(buf)?
568 } else {
569 get_nullable_string_owned(buf)?
570 };
571 }
572 if flex {
573 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
574 }
575 Ok(out)
576 }
577}
#[cfg(test)]
impl BatchIndexAndErrorMessage {
    /// Test fixture: `Self::default()` below version 8, otherwise both fields set to
    /// non-default values.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 8 {
            fixture.batch_index = 1;
            fixture.batch_index_error_message = Some(String::from("x"));
        }
        fixture
    }
}
592#[derive(Debug, Clone, PartialEq, Eq)]
593pub struct LeaderIdAndEpoch {
594 pub leader_id: i32,
595 pub leader_epoch: i32,
596 pub unknown_tagged_fields: UnknownTaggedFields,
597}
598impl Default for LeaderIdAndEpoch {
599 fn default() -> Self {
600 Self {
601 leader_id: -1i32,
602 leader_epoch: -1i32,
603 unknown_tagged_fields: Default::default(),
604 }
605 }
606}
607impl Encode for LeaderIdAndEpoch {
608 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
609 let flex = version >= 9;
610 if version >= 10 {
611 put_i32(buf, self.leader_id);
612 }
613 if version >= 10 {
614 put_i32(buf, self.leader_epoch);
615 }
616 if flex {
617 let tagged = WriteTaggedFields::new();
618 tagged.write(buf, &self.unknown_tagged_fields);
619 }
620 Ok(())
621 }
622 fn encoded_len(&self, version: i16) -> usize {
623 let flex = version >= 9;
624 let mut n: usize = 0;
625 if version >= 10 {
626 n += 4;
627 }
628 if version >= 10 {
629 n += 4;
630 }
631 if flex {
632 let known_pairs: Vec<(u32, usize)> = Vec::new();
633 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
634 }
635 n
636 }
637}
638impl Decode<'_> for LeaderIdAndEpoch {
639 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
640 let flex = version >= 9;
641 let mut out = Self::default();
642 if version >= 10 {
643 out.leader_id = get_i32(buf)?;
644 }
645 if version >= 10 {
646 out.leader_epoch = get_i32(buf)?;
647 }
648 if flex {
649 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
650 }
651 Ok(out)
652 }
653}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Test fixture: `Self::default()` below version 10, otherwise both fields set to `1`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 10 {
            fixture.leader_id = 1;
            fixture.leader_epoch = 1;
        }
        fixture
    }
}
668#[derive(Debug, Clone, PartialEq, Eq, Default)]
669pub struct NodeEndpoint {
670 pub node_id: i32,
671 pub host: String,
672 pub port: i32,
673 pub rack: Option<String>,
674 pub unknown_tagged_fields: UnknownTaggedFields,
675}
676impl Encode for NodeEndpoint {
677 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
678 let flex = version >= 9;
679 if version >= 10 {
680 put_i32(buf, self.node_id);
681 }
682 if version >= 10 {
683 if flex {
684 put_compact_string(buf, &self.host);
685 } else {
686 put_string(buf, &self.host);
687 }
688 }
689 if version >= 10 {
690 put_i32(buf, self.port);
691 }
692 if version >= 10 {
693 if flex {
694 put_compact_nullable_string(buf, self.rack.as_deref());
695 } else {
696 put_nullable_string(buf, self.rack.as_deref());
697 }
698 }
699 if flex {
700 let tagged = WriteTaggedFields::new();
701 tagged.write(buf, &self.unknown_tagged_fields);
702 }
703 Ok(())
704 }
705 fn encoded_len(&self, version: i16) -> usize {
706 let flex = version >= 9;
707 let mut n: usize = 0;
708 if version >= 10 {
709 n += 4;
710 }
711 if version >= 10 {
712 n += if flex {
713 compact_string_len(&self.host)
714 } else {
715 string_len(&self.host)
716 };
717 }
718 if version >= 10 {
719 n += 4;
720 }
721 if version >= 10 {
722 n += if flex {
723 compact_nullable_string_len(self.rack.as_deref())
724 } else {
725 nullable_string_len(self.rack.as_deref())
726 };
727 }
728 if flex {
729 let known_pairs: Vec<(u32, usize)> = Vec::new();
730 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
731 }
732 n
733 }
734}
735impl Decode<'_> for NodeEndpoint {
736 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
737 let flex = version >= 9;
738 let mut out = Self::default();
739 if version >= 10 {
740 out.node_id = get_i32(buf)?;
741 }
742 if version >= 10 {
743 out.host = if flex {
744 get_compact_string_owned(buf)?
745 } else {
746 get_string_owned(buf)?
747 };
748 }
749 if version >= 10 {
750 out.port = get_i32(buf)?;
751 }
752 if version >= 10 {
753 out.rack = if flex {
754 get_compact_nullable_string_owned(buf)?
755 } else {
756 get_nullable_string_owned(buf)?
757 };
758 }
759 if flex {
760 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
761 }
762 Ok(out)
763 }
764}
#[cfg(test)]
impl NodeEndpoint {
    /// Test fixture: `Self::default()` below version 10, otherwise every field set to a
    /// non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 10 {
            fixture.node_id = 1;
            fixture.host = String::from("x");
            fixture.port = 1;
            fixture.rack = Some(String::from("x"));
        }
        fixture
    }
}
785#[must_use]
788#[allow(unused_comparisons)]
789pub fn default_json(version: i16) -> ::serde_json::Value {
790 let mut obj = ::serde_json::Map::new();
791 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
792 if version >= 1 {
793 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
794 }
795 if version >= 10 {
796 obj.insert(
797 "nodeEndpoints".to_string(),
798 ::serde_json::Value::Array(vec![]),
799 );
800 }
801 ::serde_json::Value::Object(obj)
802}