kafka_wire_protocol/schema/produce_response/
v8.rs1use std::io::{Read, Result, Write};
4
5use serde::{Deserialize, Serialize};
6#[cfg(test)] use proptest_derive::Arbitrary;
7
8use crate::arrays::{read_array, write_array};
9use crate::markers::{ApiMessage, Response};
10use crate::readable_writable::{Readable, Writable};
11#[cfg(test)] use crate::test_utils::proptest_strategies;
12
13#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
15#[cfg_attr(test, derive(Arbitrary))]
16pub struct ProduceResponse {
17 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
19 pub responses: Vec<TopicProduceResponse>,
20 pub throttle_time_ms: i32,
22}
23
24impl ApiMessage for ProduceResponse {
25 fn api_key(&self) -> i16 {
26 0
27 }
28
29 fn version(&self) -> i16 {
30 8
31 }
32}
33
34impl Response for ProduceResponse { }
35
36impl Default for ProduceResponse {
37 fn default() -> Self {
38 ProduceResponse {
39 responses: Vec::<TopicProduceResponse>::new(),
40 throttle_time_ms: 0_i32,
41 }
42 }
43}
44
45impl ProduceResponse {
46 pub fn new(responses: Vec<TopicProduceResponse>, throttle_time_ms: i32) -> Self {
47 Self {
48 responses,
49 throttle_time_ms,
50 }
51 }
52}
53
54#[cfg(test)]
55mod tests_produce_response_new_and_default {
56 use super::*;
57
58 #[test]
59 fn test() {
60 let d = ProduceResponse::new(
61 Vec::<TopicProduceResponse>::new(),
62 0_i32,
63 );
64 assert_eq!(d, ProduceResponse::default());
65 }
66}
67
68impl Readable for ProduceResponse {
69 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
70 let responses = read_array::<TopicProduceResponse>(input, "responses", false)?;
71 let throttle_time_ms = i32::read(input)?;
72 Ok(ProduceResponse {
73 responses, throttle_time_ms
74 })
75 }
76}
77
78impl Writable for ProduceResponse {
79 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
80 write_array(output, "self.responses", &self.responses, false)?;
81 self.throttle_time_ms.write(output)?;
82 Ok(())
83 }
84}
85
86#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
88#[cfg_attr(test, derive(Arbitrary))]
89pub struct TopicProduceResponse {
90 #[cfg_attr(test, proptest(strategy = "proptest_strategies::string()"))]
92 pub name: String,
93 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
95 pub partition_responses: Vec<PartitionProduceResponse>,
96}
97
98impl Default for TopicProduceResponse {
99 fn default() -> Self {
100 TopicProduceResponse {
101 name: String::from(""),
102 partition_responses: Vec::<PartitionProduceResponse>::new(),
103 }
104 }
105}
106
107impl TopicProduceResponse {
108 pub fn new<S1: AsRef<str>>(name: S1, partition_responses: Vec<PartitionProduceResponse>) -> Self {
109 Self {
110 name: name.as_ref().to_string(),
111 partition_responses,
112 }
113 }
114}
115
116#[cfg(test)]
117mod tests_topic_produce_response_new_and_default {
118 use super::*;
119
120 #[test]
121 fn test() {
122 let d = TopicProduceResponse::new(
123 String::from(""),
124 Vec::<PartitionProduceResponse>::new(),
125 );
126 assert_eq!(d, TopicProduceResponse::default());
127 }
128}
129
130impl Readable for TopicProduceResponse {
131 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
132 let name = String::read_ext(input, "name", false)?;
133 let partition_responses = read_array::<PartitionProduceResponse>(input, "partition_responses", false)?;
134 Ok(TopicProduceResponse {
135 name, partition_responses
136 })
137 }
138}
139
140impl Writable for TopicProduceResponse {
141 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
142 self.name.write_ext(output, "self.name", false)?;
143 write_array(output, "self.partition_responses", &self.partition_responses, false)?;
144 Ok(())
145 }
146}
147
148#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
150#[cfg_attr(test, derive(Arbitrary))]
151pub struct PartitionProduceResponse {
152 pub index: i32,
154 pub error_code: i16,
156 pub base_offset: i64,
158 pub log_append_time_ms: i64,
160 pub log_start_offset: i64,
162 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
164 pub record_errors: Vec<BatchIndexAndErrorMessage>,
165 #[cfg_attr(test, proptest(strategy = "proptest_strategies::optional_string()"))]
167 pub error_message: Option<String>,
168}
169
170impl Default for PartitionProduceResponse {
171 fn default() -> Self {
172 PartitionProduceResponse {
173 index: 0_i32,
174 error_code: 0_i16,
175 base_offset: 0_i64,
176 log_append_time_ms: -1_i64,
177 log_start_offset: -1_i64,
178 record_errors: Vec::<BatchIndexAndErrorMessage>::new(),
179 error_message: None,
180 }
181 }
182}
183
184impl PartitionProduceResponse {
185 pub fn new<S1: AsRef<str>>(index: i32, error_code: i16, base_offset: i64, log_append_time_ms: i64, log_start_offset: i64, record_errors: Vec<BatchIndexAndErrorMessage>, error_message: Option<S1>) -> Self {
186 Self {
187 index,
188 error_code,
189 base_offset,
190 log_append_time_ms,
191 log_start_offset,
192 record_errors,
193 error_message: error_message.map(|s| s.as_ref().to_string()),
194 }
195 }
196}
197
198#[cfg(test)]
199mod tests_partition_produce_response_new_and_default {
200 use super::*;
201
202 #[test]
203 fn test() {
204 let d = PartitionProduceResponse::new(
205 0_i32,
206 0_i16,
207 0_i64,
208 -1_i64,
209 -1_i64,
210 Vec::<BatchIndexAndErrorMessage>::new(),
211 None::<String>,
212 );
213 assert_eq!(d, PartitionProduceResponse::default());
214 }
215}
216
217impl Readable for PartitionProduceResponse {
218 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
219 let index = i32::read(input)?;
220 let error_code = i16::read(input)?;
221 let base_offset = i64::read(input)?;
222 let log_append_time_ms = i64::read(input)?;
223 let log_start_offset = i64::read(input)?;
224 let record_errors = read_array::<BatchIndexAndErrorMessage>(input, "record_errors", false)?;
225 let error_message = Option::<String>::read_ext(input, "error_message", false)?;
226 Ok(PartitionProduceResponse {
227 index, error_code, base_offset, log_append_time_ms, log_start_offset, record_errors, error_message
228 })
229 }
230}
231
232impl Writable for PartitionProduceResponse {
233 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
234 self.index.write(output)?;
235 self.error_code.write(output)?;
236 self.base_offset.write(output)?;
237 self.log_append_time_ms.write(output)?;
238 self.log_start_offset.write(output)?;
239 write_array(output, "self.record_errors", &self.record_errors, false)?;
240 self.error_message.write_ext(output, "self.error_message", false)?;
241 Ok(())
242 }
243}
244
245#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
247#[cfg_attr(test, derive(Arbitrary))]
248pub struct BatchIndexAndErrorMessage {
249 pub batch_index: i32,
251 #[cfg_attr(test, proptest(strategy = "proptest_strategies::optional_string()"))]
253 pub batch_index_error_message: Option<String>,
254}
255
256impl Default for BatchIndexAndErrorMessage {
257 fn default() -> Self {
258 BatchIndexAndErrorMessage {
259 batch_index: 0_i32,
260 batch_index_error_message: None,
261 }
262 }
263}
264
265impl BatchIndexAndErrorMessage {
266 pub fn new<S1: AsRef<str>>(batch_index: i32, batch_index_error_message: Option<S1>) -> Self {
267 Self {
268 batch_index,
269 batch_index_error_message: batch_index_error_message.map(|s| s.as_ref().to_string()),
270 }
271 }
272}
273
274#[cfg(test)]
275mod tests_batch_index_and_error_message_new_and_default {
276 use super::*;
277
278 #[test]
279 fn test() {
280 let d = BatchIndexAndErrorMessage::new(
281 0_i32,
282 None::<String>,
283 );
284 assert_eq!(d, BatchIndexAndErrorMessage::default());
285 }
286}
287
288impl Readable for BatchIndexAndErrorMessage {
289 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
290 let batch_index = i32::read(input)?;
291 let batch_index_error_message = Option::<String>::read_ext(input, "batch_index_error_message", false)?;
292 Ok(BatchIndexAndErrorMessage {
293 batch_index, batch_index_error_message
294 })
295 }
296}
297
298impl Writable for BatchIndexAndErrorMessage {
299 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
300 self.batch_index.write(output)?;
301 self.batch_index_error_message.write_ext(output, "self.batch_index_error_message", false)?;
302 Ok(())
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309 use proptest::prelude::*;
310
311 #[test]
312 fn test_java_default() {
313 crate::test_utils::test_java_default::<ProduceResponse>("ProduceResponse", 8);
314 }
315
316 proptest! {
317 #[test]
318 fn test_serde(data: ProduceResponse) {
319 crate::test_utils::test_serde(&data)?;
320 }
321 }
322
323 proptest! {
324 #[test]
325 fn test_java_arbitrary(data: ProduceResponse) {
326 crate::test_utils::test_java_arbitrary(&data, "ProduceResponse", 8);
327 }
328 }
329}