1use std::io::{Cursor, Read, Result, Write};
4
5use serde::{Deserialize, Serialize};
6#[cfg(test)] use proptest_derive::Arbitrary;
7
8use crate::arrays::{read_array, write_array};
9use crate::markers::{ApiMessage, Response};
10use crate::readable_writable::{Readable, Writable};
11use crate::tagged_fields::{RawTaggedField, read_tagged_fields, write_tagged_fields};
12#[cfg(test)] use crate::test_utils::proptest_strategies;
13
/// Top-level response message for API key 0 at version 11 (see the `ApiMessage` impl).
///
/// NOTE(review): the name and the `test_java_*` tests suggest this mirrors the
/// Apache Kafka `ProduceResponse` schema — confirm against the protocol spec.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct ProduceResponse {
    /// Per-topic results; the first field on the wire.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
    pub responses: Vec<TopicProduceResponse>,
    /// Second field on the wire, encoded as an `i32`.
    /// Presumably a throttle duration in milliseconds — TODO confirm.
    pub throttle_time_ms: i32,
    /// Carried as tagged field 0; only written when the list is non-empty.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
    pub node_endpoints: Vec<NodeEndpoint>,
    /// Tagged fields this type does not recognise; kept on read and handed
    /// back to `write_tagged_fields` on write.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
30
31impl ApiMessage for ProduceResponse {
32 fn api_key(&self) -> i16 {
33 0
34 }
35
36 fn version(&self) -> i16 {
37 11
38 }
39}
40
/// Marker impl: tags `ProduceResponse` as a `Response`; the impl body is empty.
impl Response for ProduceResponse { }
42
43impl Default for ProduceResponse {
44 fn default() -> Self {
45 ProduceResponse {
46 responses: Vec::<TopicProduceResponse>::new(),
47 throttle_time_ms: 0_i32,
48 node_endpoints: Vec::<NodeEndpoint>::new(),
49 _unknown_tagged_fields: Vec::new(),
50 }
51 }
52}
53
54impl ProduceResponse {
55 pub fn new(responses: Vec<TopicProduceResponse>, throttle_time_ms: i32, node_endpoints: Vec<NodeEndpoint>) -> Self {
56 Self {
57 responses,
58 throttle_time_ms,
59 node_endpoints,
60 _unknown_tagged_fields: vec![],
61 }
62 }
63}
64
#[cfg(test)]
mod tests_produce_response_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let built = ProduceResponse::new(Vec::new(), 0, Vec::new());
        let expected = ProduceResponse::default();
        assert_eq!(built, expected);
    }
}
79
80impl Readable for ProduceResponse {
81 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
82 let responses = read_array::<TopicProduceResponse>(input, "responses", true)?;
83 let throttle_time_ms = i32::read(input)?;
84 let mut node_endpoints = Vec::<NodeEndpoint>::new();
85 let tagged_fields_callback = |tag: i32, tag_data: &[u8]| {
86 match tag {
87 0 => {
88 let mut cur = Cursor::new(tag_data);
89 node_endpoints = read_array::<NodeEndpoint>(&mut cur, "node_endpoints", true)?;
90 Ok(true)
91 },
92 _ => Ok(false)
93 }
94 };
95 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
96 Ok(ProduceResponse {
97 responses, throttle_time_ms, node_endpoints, _unknown_tagged_fields
98 })
99 }
100}
101
102impl Writable for ProduceResponse {
103 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
104 write_array(output, "self.responses", &self.responses, true)?;
105 self.throttle_time_ms.write(output)?;
106 let mut known_tagged_fields = Vec::<RawTaggedField>::new();
107 if !self.node_endpoints.is_empty() {
108 let mut cur = Cursor::new(Vec::<u8>::new());
109 write_array(&mut cur, "self.node_endpoints", &self.node_endpoints, true)?;
110 known_tagged_fields.push(RawTaggedField { tag: 0, data: cur.into_inner() });
111 }
112 write_tagged_fields(output, &known_tagged_fields, &self._unknown_tagged_fields)?;
113 Ok(())
114 }
115}
116
/// One element of `ProduceResponse::responses`: a topic name together with
/// the results for its partitions.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct TopicProduceResponse {
    /// Topic name; the first field on the wire, a non-optional string.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::string()"))]
    pub name: String,
    /// Per-partition results; written right after `name`.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
    pub partition_responses: Vec<PartitionProduceResponse>,
    /// Tagged fields kept on read and written back unchanged; this type
    /// defines no known tags.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
131
132impl Default for TopicProduceResponse {
133 fn default() -> Self {
134 TopicProduceResponse {
135 name: String::from(""),
136 partition_responses: Vec::<PartitionProduceResponse>::new(),
137 _unknown_tagged_fields: Vec::new(),
138 }
139 }
140}
141
142impl TopicProduceResponse {
143 pub fn new<S1: AsRef<str>>(name: S1, partition_responses: Vec<PartitionProduceResponse>) -> Self {
144 Self {
145 name: name.as_ref().to_string(),
146 partition_responses,
147 _unknown_tagged_fields: vec![],
148 }
149 }
150}
151
#[cfg(test)]
mod tests_topic_produce_response_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let built = TopicProduceResponse::new(String::new(), Vec::new());
        let expected = TopicProduceResponse::default();
        assert_eq!(built, expected);
    }
}
165
166impl Readable for TopicProduceResponse {
167 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
168 let name = String::read_ext(input, "name", true)?;
169 let partition_responses = read_array::<PartitionProduceResponse>(input, "partition_responses", true)?;
170 let tagged_fields_callback = |tag: i32, _: &[u8]| {
171 match tag {
172 _ => Ok(false)
173 }
174 };
175 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
176 Ok(TopicProduceResponse {
177 name, partition_responses, _unknown_tagged_fields
178 })
179 }
180}
181
182impl Writable for TopicProduceResponse {
183 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
184 self.name.write_ext(output, "self.name", true)?;
185 write_array(output, "self.partition_responses", &self.partition_responses, true)?;
186 write_tagged_fields(output, &[], &self._unknown_tagged_fields)?;
187 Ok(())
188 }
189}
190
/// One element of `TopicProduceResponse::partition_responses`.
///
/// The fields up to `error_message` are written in declaration order;
/// `current_leader` travels in the tagged-field section.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct PartitionProduceResponse {
    /// Partition index (`i32` on the wire); defaults to 0.
    pub index: i32,
    /// Error code (`i16` on the wire); defaults to 0.
    /// Presumably 0 means "no error" — TODO confirm against the protocol spec.
    pub error_code: i16,
    /// Base offset (`i64` on the wire); defaults to 0.
    pub base_offset: i64,
    /// Log append time (`i64` on the wire); defaults to -1.
    /// Presumably milliseconds, with -1 as a "not set" sentinel — TODO confirm.
    pub log_append_time_ms: i64,
    /// Log start offset (`i64` on the wire); defaults to -1.
    pub log_start_offset: i64,
    /// Per-record error entries; defaults to an empty list.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
    pub record_errors: Vec<BatchIndexAndErrorMessage>,
    /// Optional error message; `None` by default.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::optional_string()"))]
    pub error_message: Option<String>,
    /// Carried as tagged field 0; only written when it differs from
    /// `LeaderIdAndEpoch::default()`.
    pub current_leader: LeaderIdAndEpoch,
    /// Tagged fields this type does not recognise; kept on read and written
    /// back unchanged.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
217
218impl Default for PartitionProduceResponse {
219 fn default() -> Self {
220 PartitionProduceResponse {
221 index: 0_i32,
222 error_code: 0_i16,
223 base_offset: 0_i64,
224 log_append_time_ms: -1_i64,
225 log_start_offset: -1_i64,
226 record_errors: Vec::<BatchIndexAndErrorMessage>::new(),
227 error_message: None,
228 current_leader: LeaderIdAndEpoch::default(),
229 _unknown_tagged_fields: Vec::new(),
230 }
231 }
232}
233
234impl PartitionProduceResponse {
235 pub fn new<S1: AsRef<str>>(index: i32, error_code: i16, base_offset: i64, log_append_time_ms: i64, log_start_offset: i64, record_errors: Vec<BatchIndexAndErrorMessage>, error_message: Option<S1>, current_leader: LeaderIdAndEpoch) -> Self {
236 Self {
237 index,
238 error_code,
239 base_offset,
240 log_append_time_ms,
241 log_start_offset,
242 record_errors,
243 error_message: error_message.map(|s| s.as_ref().to_string()),
244 current_leader,
245 _unknown_tagged_fields: vec![],
246 }
247 }
248}
249
#[cfg(test)]
mod tests_partition_produce_response_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let no_message: Option<String> = None;
        let built = PartitionProduceResponse::new(
            0,
            0,
            0,
            -1,
            -1,
            Vec::new(),
            no_message,
            LeaderIdAndEpoch::default(),
        );
        let expected = PartitionProduceResponse::default();
        assert_eq!(built, expected);
    }
}
269
270impl Readable for PartitionProduceResponse {
271 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
272 let index = i32::read(input)?;
273 let error_code = i16::read(input)?;
274 let base_offset = i64::read(input)?;
275 let log_append_time_ms = i64::read(input)?;
276 let log_start_offset = i64::read(input)?;
277 let record_errors = read_array::<BatchIndexAndErrorMessage>(input, "record_errors", true)?;
278 let error_message = Option::<String>::read_ext(input, "error_message", true)?;
279 let mut current_leader = LeaderIdAndEpoch::default();
280 let tagged_fields_callback = |tag: i32, tag_data: &[u8]| {
281 match tag {
282 0 => {
283 let mut cur = Cursor::new(tag_data);
284 current_leader = LeaderIdAndEpoch::read(&mut cur)?;
285 Ok(true)
286 },
287 _ => Ok(false)
288 }
289 };
290 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
291 Ok(PartitionProduceResponse {
292 index, error_code, base_offset, log_append_time_ms, log_start_offset, record_errors, error_message, current_leader, _unknown_tagged_fields
293 })
294 }
295}
296
297impl Writable for PartitionProduceResponse {
298 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
299 self.index.write(output)?;
300 self.error_code.write(output)?;
301 self.base_offset.write(output)?;
302 self.log_append_time_ms.write(output)?;
303 self.log_start_offset.write(output)?;
304 write_array(output, "self.record_errors", &self.record_errors, true)?;
305 self.error_message.write_ext(output, "self.error_message", true)?;
306 let mut known_tagged_fields = Vec::<RawTaggedField>::new();
307 if self.current_leader != LeaderIdAndEpoch::default() {
308 let mut cur = Cursor::new(Vec::<u8>::new());
309 self.current_leader.write(&mut cur)?;
310 known_tagged_fields.push(RawTaggedField { tag: 0, data: cur.into_inner() });
311 }
312 write_tagged_fields(output, &known_tagged_fields, &self._unknown_tagged_fields)?;
313 Ok(())
314 }
315}
316
/// One element of `PartitionProduceResponse::record_errors`: a batch index
/// paired with an optional message.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct BatchIndexAndErrorMessage {
    /// Batch index (`i32` on the wire); defaults to 0.
    /// Presumably the position of the failing record within its batch — TODO confirm.
    pub batch_index: i32,
    /// Optional message associated with `batch_index`; `None` by default.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::optional_string()"))]
    pub batch_index_error_message: Option<String>,
    /// Tagged fields kept on read and written back unchanged; this type
    /// defines no known tags.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
330
331impl Default for BatchIndexAndErrorMessage {
332 fn default() -> Self {
333 BatchIndexAndErrorMessage {
334 batch_index: 0_i32,
335 batch_index_error_message: None,
336 _unknown_tagged_fields: Vec::new(),
337 }
338 }
339}
340
341impl BatchIndexAndErrorMessage {
342 pub fn new<S1: AsRef<str>>(batch_index: i32, batch_index_error_message: Option<S1>) -> Self {
343 Self {
344 batch_index,
345 batch_index_error_message: batch_index_error_message.map(|s| s.as_ref().to_string()),
346 _unknown_tagged_fields: vec![],
347 }
348 }
349}
350
#[cfg(test)]
mod tests_batch_index_and_error_message_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let no_message: Option<String> = None;
        let built = BatchIndexAndErrorMessage::new(0, no_message);
        let expected = BatchIndexAndErrorMessage::default();
        assert_eq!(built, expected);
    }
}
364
365impl Readable for BatchIndexAndErrorMessage {
366 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
367 let batch_index = i32::read(input)?;
368 let batch_index_error_message = Option::<String>::read_ext(input, "batch_index_error_message", true)?;
369 let tagged_fields_callback = |tag: i32, _: &[u8]| {
370 match tag {
371 _ => Ok(false)
372 }
373 };
374 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
375 Ok(BatchIndexAndErrorMessage {
376 batch_index, batch_index_error_message, _unknown_tagged_fields
377 })
378 }
379}
380
381impl Writable for BatchIndexAndErrorMessage {
382 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
383 self.batch_index.write(output)?;
384 self.batch_index_error_message.write_ext(output, "self.batch_index_error_message", true)?;
385 write_tagged_fields(output, &[], &self._unknown_tagged_fields)?;
386 Ok(())
387 }
388}
389
/// Leader id and leader epoch pair, used as
/// `PartitionProduceResponse::current_leader`.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct LeaderIdAndEpoch {
    /// Leader id (`i32` on the wire); defaults to -1.
    /// Presumably -1 means "unknown / not set" — TODO confirm.
    pub leader_id: i32,
    /// Leader epoch (`i32` on the wire); defaults to -1.
    pub leader_epoch: i32,
    /// Tagged fields kept on read and written back unchanged; this type
    /// defines no known tags.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
402
403impl Default for LeaderIdAndEpoch {
404 fn default() -> Self {
405 LeaderIdAndEpoch {
406 leader_id: -1_i32,
407 leader_epoch: -1_i32,
408 _unknown_tagged_fields: Vec::new(),
409 }
410 }
411}
412
413impl LeaderIdAndEpoch {
414 pub fn new(leader_id: i32, leader_epoch: i32) -> Self {
415 Self {
416 leader_id,
417 leader_epoch,
418 _unknown_tagged_fields: vec![],
419 }
420 }
421}
422
#[cfg(test)]
mod tests_leader_id_and_epoch_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let built = LeaderIdAndEpoch::new(-1, -1);
        let expected = LeaderIdAndEpoch::default();
        assert_eq!(built, expected);
    }
}
436
437impl Readable for LeaderIdAndEpoch {
438 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
439 let leader_id = i32::read(input)?;
440 let leader_epoch = i32::read(input)?;
441 let tagged_fields_callback = |tag: i32, _: &[u8]| {
442 match tag {
443 _ => Ok(false)
444 }
445 };
446 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
447 Ok(LeaderIdAndEpoch {
448 leader_id, leader_epoch, _unknown_tagged_fields
449 })
450 }
451}
452
453impl Writable for LeaderIdAndEpoch {
454 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
455 self.leader_id.write(output)?;
456 self.leader_epoch.write(output)?;
457 write_tagged_fields(output, &[], &self._unknown_tagged_fields)?;
458 Ok(())
459 }
460}
461
/// One element of `ProduceResponse::node_endpoints`: node id, host, port
/// and an optional rack.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct NodeEndpoint {
    /// Node id (`i32` on the wire); defaults to 0.
    pub node_id: i32,
    /// Host name, a non-optional string; defaults to the empty string.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::string()"))]
    pub host: String,
    /// Port (`i32` on the wire); defaults to 0.
    pub port: i32,
    /// Optional rack; `None` by default.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::optional_string()"))]
    pub rack: Option<String>,
    /// Tagged fields kept on read and written back unchanged; this type
    /// defines no known tags.
    #[cfg_attr(test, proptest(strategy = "proptest_strategies::unknown_tagged_fields()"))]
    pub _unknown_tagged_fields: Vec<RawTaggedField>,
}
480
481impl Default for NodeEndpoint {
482 fn default() -> Self {
483 NodeEndpoint {
484 node_id: 0_i32,
485 host: String::from(""),
486 port: 0_i32,
487 rack: None,
488 _unknown_tagged_fields: Vec::new(),
489 }
490 }
491}
492
493impl NodeEndpoint {
494 pub fn new<S1: AsRef<str>, S2: AsRef<str>>(node_id: i32, host: S1, port: i32, rack: Option<S2>) -> Self {
495 Self {
496 node_id,
497 host: host.as_ref().to_string(),
498 port,
499 rack: rack.map(|s| s.as_ref().to_string()),
500 _unknown_tagged_fields: vec![],
501 }
502 }
503}
504
#[cfg(test)]
mod tests_node_endpoint_new_and_default {
    use super::*;

    /// `new` called with the default value of every field must equal `default()`.
    #[test]
    fn test() {
        let no_rack: Option<String> = None;
        let built = NodeEndpoint::new(0, String::new(), 0, no_rack);
        let expected = NodeEndpoint::default();
        assert_eq!(built, expected);
    }
}
520
521impl Readable for NodeEndpoint {
522 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
523 let node_id = i32::read(input)?;
524 let host = String::read_ext(input, "host", true)?;
525 let port = i32::read(input)?;
526 let rack = Option::<String>::read_ext(input, "rack", true)?;
527 let tagged_fields_callback = |tag: i32, _: &[u8]| {
528 match tag {
529 _ => Ok(false)
530 }
531 };
532 let _unknown_tagged_fields = read_tagged_fields(input, tagged_fields_callback)?;
533 Ok(NodeEndpoint {
534 node_id, host, port, rack, _unknown_tagged_fields
535 })
536 }
537}
538
539impl Writable for NodeEndpoint {
540 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
541 self.node_id.write(output)?;
542 self.host.write_ext(output, "self.host", true)?;
543 self.port.write(output)?;
544 self.rack.write_ext(output, "self.rack", true)?;
545 write_tagged_fields(output, &[], &self._unknown_tagged_fields)?;
546 Ok(())
547 }
548}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils;
    use proptest::prelude::*;

    /// Runs the shared `test_java_default` helper for `ProduceResponse`, version 11.
    #[test]
    fn test_java_default() {
        test_utils::test_java_default::<ProduceResponse>("ProduceResponse", 11);
    }

    proptest! {
        /// Runs the shared `test_serde` helper on arbitrary values.
        #[test]
        fn test_serde(data: ProduceResponse) {
            test_utils::test_serde(&data)?;
        }

        /// Runs the shared `test_java_arbitrary` helper on arbitrary values.
        #[test]
        fn test_java_arbitrary(data: ProduceResponse) {
            test_utils::test_java_arbitrary(&data, "ProduceResponse", 11);
        }
    }
}