1use std::io::{Read, Result, Write};
4
5use serde::{Deserialize, Serialize};
6#[cfg(test)] use proptest_derive::Arbitrary;
7
8use crate::arrays::{read_array, write_array};
9use crate::markers::{ApiMessage, Request};
10use crate::readable_writable::{Readable, Writable};
11#[cfg(test)] use crate::test_utils::proptest_strategies;
12
13#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
15#[cfg_attr(test, derive(Arbitrary))]
16pub struct FetchRequest {
17 pub replica_id: i32,
19 pub max_wait_ms: i32,
21 pub min_bytes: i32,
23 pub max_bytes: i32,
25 pub isolation_level: i8,
27 pub session_id: i32,
29 pub session_epoch: i32,
31 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
33 pub topics: Vec<FetchTopic>,
34 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
36 pub forgotten_topics_data: Vec<ForgottenTopic>,
37}
38
39impl ApiMessage for FetchRequest {
40 fn api_key(&self) -> i16 {
41 1
42 }
43
44 fn version(&self) -> i16 {
45 9
46 }
47}
48
49impl Request for FetchRequest { }
50
51impl Default for FetchRequest {
52 fn default() -> Self {
53 FetchRequest {
54 replica_id: -1_i32,
55 max_wait_ms: 0_i32,
56 min_bytes: 0_i32,
57 max_bytes: 0x7fffffff_i32,
58 isolation_level: 0_i8,
59 session_id: 0_i32,
60 session_epoch: -1_i32,
61 topics: Vec::<FetchTopic>::new(),
62 forgotten_topics_data: Vec::<ForgottenTopic>::new(),
63 }
64 }
65}
66
67impl FetchRequest {
68 pub fn new(replica_id: i32, max_wait_ms: i32, min_bytes: i32, max_bytes: i32, isolation_level: i8, session_id: i32, session_epoch: i32, topics: Vec<FetchTopic>, forgotten_topics_data: Vec<ForgottenTopic>) -> Self {
69 Self {
70 replica_id,
71 max_wait_ms,
72 min_bytes,
73 max_bytes,
74 isolation_level,
75 session_id,
76 session_epoch,
77 topics,
78 forgotten_topics_data,
79 }
80 }
81}
82
#[cfg(test)]
mod tests_fetch_request_new_and_default {
    use super::*;

    /// `FetchRequest::new` fed with the default values must equal
    /// `FetchRequest::default()`.
    #[test]
    fn test() {
        let no_topics: Vec<FetchTopic> = Vec::new();
        let no_forgotten: Vec<ForgottenTopic> = Vec::new();

        let built = FetchRequest::new(-1, 0, 0, 0x7fff_ffff, 0, 0, -1, no_topics, no_forgotten);

        assert_eq!(built, FetchRequest::default());
    }
}
103
104impl Readable for FetchRequest {
105 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
106 let replica_id = i32::read(input)?;
107 let max_wait_ms = i32::read(input)?;
108 let min_bytes = i32::read(input)?;
109 let max_bytes = i32::read(input)?;
110 let isolation_level = i8::read(input)?;
111 let session_id = i32::read(input)?;
112 let session_epoch = i32::read(input)?;
113 let topics = read_array::<FetchTopic>(input, "topics", false)?;
114 let forgotten_topics_data = read_array::<ForgottenTopic>(input, "forgotten_topics_data", false)?;
115 Ok(FetchRequest {
116 replica_id, max_wait_ms, min_bytes, max_bytes, isolation_level, session_id, session_epoch, topics, forgotten_topics_data
117 })
118 }
119}
120
121impl Writable for FetchRequest {
122 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
123 self.replica_id.write(output)?;
124 self.max_wait_ms.write(output)?;
125 self.min_bytes.write(output)?;
126 self.max_bytes.write(output)?;
127 self.isolation_level.write(output)?;
128 self.session_id.write(output)?;
129 self.session_epoch.write(output)?;
130 write_array(output, "self.topics", &self.topics, false)?;
131 write_array(output, "self.forgotten_topics_data", &self.forgotten_topics_data, false)?;
132 Ok(())
133 }
134}
135
136#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
138#[cfg_attr(test, derive(Arbitrary))]
139pub struct FetchTopic {
140 #[cfg_attr(test, proptest(strategy = "proptest_strategies::string()"))]
142 pub topic: String,
143 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
145 pub partitions: Vec<FetchPartition>,
146}
147
148impl Default for FetchTopic {
149 fn default() -> Self {
150 FetchTopic {
151 topic: String::from(""),
152 partitions: Vec::<FetchPartition>::new(),
153 }
154 }
155}
156
157impl FetchTopic {
158 pub fn new<S1: AsRef<str>>(topic: S1, partitions: Vec<FetchPartition>) -> Self {
159 Self {
160 topic: topic.as_ref().to_string(),
161 partitions,
162 }
163 }
164}
165
#[cfg(test)]
mod tests_fetch_topic_new_and_default {
    use super::*;

    /// `FetchTopic::new` fed with the default values must equal
    /// `FetchTopic::default()`.
    #[test]
    fn test() {
        let empty_name = String::new();
        let no_partitions: Vec<FetchPartition> = Vec::new();

        let built = FetchTopic::new(empty_name, no_partitions);

        assert_eq!(built, FetchTopic::default());
    }
}
179
180impl Readable for FetchTopic {
181 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
182 let topic = String::read_ext(input, "topic", false)?;
183 let partitions = read_array::<FetchPartition>(input, "partitions", false)?;
184 Ok(FetchTopic {
185 topic, partitions
186 })
187 }
188}
189
190impl Writable for FetchTopic {
191 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
192 self.topic.write_ext(output, "self.topic", false)?;
193 write_array(output, "self.partitions", &self.partitions, false)?;
194 Ok(())
195 }
196}
197
198#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
200#[cfg_attr(test, derive(Arbitrary))]
201pub struct FetchPartition {
202 pub partition: i32,
204 pub current_leader_epoch: i32,
206 pub fetch_offset: i64,
208 pub log_start_offset: i64,
210 pub partition_max_bytes: i32,
212}
213
214impl Default for FetchPartition {
215 fn default() -> Self {
216 FetchPartition {
217 partition: 0_i32,
218 current_leader_epoch: -1_i32,
219 fetch_offset: 0_i64,
220 log_start_offset: -1_i64,
221 partition_max_bytes: 0_i32,
222 }
223 }
224}
225
226impl FetchPartition {
227 pub fn new(partition: i32, current_leader_epoch: i32, fetch_offset: i64, log_start_offset: i64, partition_max_bytes: i32) -> Self {
228 Self {
229 partition,
230 current_leader_epoch,
231 fetch_offset,
232 log_start_offset,
233 partition_max_bytes,
234 }
235 }
236}
237
#[cfg(test)]
mod tests_fetch_partition_new_and_default {
    use super::*;

    /// `FetchPartition::new` fed with the default values must equal
    /// `FetchPartition::default()`.
    #[test]
    fn test() {
        let built = FetchPartition::new(0, -1, 0, -1, 0);

        assert_eq!(built, FetchPartition::default());
    }
}
254
255impl Readable for FetchPartition {
256 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
257 let partition = i32::read(input)?;
258 let current_leader_epoch = i32::read(input)?;
259 let fetch_offset = i64::read(input)?;
260 let log_start_offset = i64::read(input)?;
261 let partition_max_bytes = i32::read(input)?;
262 Ok(FetchPartition {
263 partition, current_leader_epoch, fetch_offset, log_start_offset, partition_max_bytes
264 })
265 }
266}
267
268impl Writable for FetchPartition {
269 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
270 self.partition.write(output)?;
271 self.current_leader_epoch.write(output)?;
272 self.fetch_offset.write(output)?;
273 self.log_start_offset.write(output)?;
274 self.partition_max_bytes.write(output)?;
275 Ok(())
276 }
277}
278
279#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
281#[cfg_attr(test, derive(Arbitrary))]
282pub struct ForgottenTopic {
283 #[cfg_attr(test, proptest(strategy = "proptest_strategies::string()"))]
285 pub topic: String,
286 #[cfg_attr(test, proptest(strategy = "proptest_strategies::vec()"))]
288 pub partitions: Vec<i32>,
289}
290
291impl Default for ForgottenTopic {
292 fn default() -> Self {
293 ForgottenTopic {
294 topic: String::from(""),
295 partitions: Vec::<i32>::new(),
296 }
297 }
298}
299
300impl ForgottenTopic {
301 pub fn new<S1: AsRef<str>>(topic: S1, partitions: Vec<i32>) -> Self {
302 Self {
303 topic: topic.as_ref().to_string(),
304 partitions,
305 }
306 }
307}
308
#[cfg(test)]
mod tests_forgotten_topic_new_and_default {
    use super::*;

    /// `ForgottenTopic::new` fed with the default values must equal
    /// `ForgottenTopic::default()`.
    #[test]
    fn test() {
        let empty_name = String::new();
        let no_partitions: Vec<i32> = Vec::new();

        let built = ForgottenTopic::new(empty_name, no_partitions);

        assert_eq!(built, ForgottenTopic::default());
    }
}
322
323impl Readable for ForgottenTopic {
324 fn read(#[allow(unused)] input: &mut impl Read) -> Result<Self> {
325 let topic = String::read_ext(input, "topic", false)?;
326 let partitions = read_array::<i32>(input, "partitions", false)?;
327 Ok(ForgottenTopic {
328 topic, partitions
329 })
330 }
331}
332
333impl Writable for ForgottenTopic {
334 fn write(&self, #[allow(unused)] output: &mut impl Write) -> Result<()> {
335 self.topic.write_ext(output, "self.topic", false)?;
336 write_array(output, "self.partitions", &self.partitions, false)?;
337 Ok(())
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Runs the shared `test_java_default` helper for `FetchRequest` at
    /// version 9 (presumably comparing against a Java reference — see
    /// `crate::test_utils`).
    #[test]
    fn test_java_default() {
        crate::test_utils::test_java_default::<FetchRequest>("FetchRequest", 9);
    }

    proptest! {
        // Runs the shared `test_serde` helper on arbitrary requests.
        #[test]
        fn test_serde(data: FetchRequest) {
            crate::test_utils::test_serde(&data)?;
        }

        // Runs the shared `test_java_arbitrary` helper on arbitrary requests
        // at version 9.
        #[test]
        fn test_java_arbitrary(data: FetchRequest) {
            crate::test_utils::test_java_arbitrary(&data, "FetchRequest", 9);
        }
    }
}