rskafka/protocol/messages/
fetch.rs1use std::io::{Read, Write};
2
3use crate::protocol::{
4 api_key::ApiKey,
5 api_version::{ApiVersion, ApiVersionRange},
6 error::Error as ApiError,
7 messages::{read_versioned_array, write_versioned_array, IsolationLevel},
8 primitives::{Int16, Int32, Int64, Int8, Records, String_},
9 traits::{ReadType, WriteType},
10};
11
12use super::{
13 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
14};
15
16#[derive(Debug)]
17#[allow(missing_copy_implementations)]
18pub struct FetchRequestPartition {
19 pub partition: Int32,
21
22 pub fetch_offset: Int64,
24
25 pub partition_max_bytes: Int32,
29}
30
31impl<W> WriteVersionedType<W> for FetchRequestPartition
32where
33 W: Write,
34{
35 fn write_versioned(
36 &self,
37 writer: &mut W,
38 version: ApiVersion,
39 ) -> Result<(), WriteVersionedError> {
40 let v = version.0 .0;
41 assert!(v <= 4);
42
43 self.partition.write(writer)?;
44 self.fetch_offset.write(writer)?;
45 self.partition_max_bytes.write(writer)?;
46
47 Ok(())
48 }
49}
50
51#[derive(Debug)]
52pub struct FetchRequestTopic {
53 pub topic: String_,
55
56 pub partitions: Vec<FetchRequestPartition>,
58}
59
60impl<W> WriteVersionedType<W> for FetchRequestTopic
61where
62 W: Write,
63{
64 fn write_versioned(
65 &self,
66 writer: &mut W,
67 version: ApiVersion,
68 ) -> Result<(), WriteVersionedError> {
69 let v = version.0 .0;
70 assert!(v <= 4);
71
72 self.topic.write(writer)?;
73 write_versioned_array(writer, version, Some(&self.partitions))?;
74
75 Ok(())
76 }
77}
78
79#[derive(Debug)]
80pub struct FetchRequest {
81 pub replica_id: Int32,
83
84 pub max_wait_ms: Int32,
86
87 pub min_bytes: Int32,
89
90 pub max_bytes: Option<Int32>,
96
97 pub isolation_level: Option<IsolationLevel>,
111
112 pub topics: Vec<FetchRequestTopic>,
114}
115
116impl<W> WriteVersionedType<W> for FetchRequest
117where
118 W: Write,
119{
120 fn write_versioned(
121 &self,
122 writer: &mut W,
123 version: ApiVersion,
124 ) -> Result<(), WriteVersionedError> {
125 let v = version.0 .0;
126 assert!(v <= 4);
127
128 self.replica_id.write(writer)?;
129 self.max_wait_ms.write(writer)?;
130 self.min_bytes.write(writer)?;
131
132 if v >= 3 {
133 self.max_bytes.unwrap_or(Int32(i32::MAX)).write(writer)?;
135 }
136
137 if v >= 4 {
138 let level: Int8 = self.isolation_level.unwrap_or_default().into();
140 level.write(writer)?;
141 }
142
143 write_versioned_array(writer, version, Some(&self.topics))?;
144
145 Ok(())
146 }
147}
148
149impl RequestBody for FetchRequest {
150 type ResponseBody = FetchResponse;
151
152 const API_KEY: ApiKey = ApiKey::Fetch;
153
154 const API_VERSION_RANGE: ApiVersionRange =
161 ApiVersionRange::new(ApiVersion(Int16(4)), ApiVersion(Int16(4)));
162
163 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(12));
164}
165
166#[derive(Debug)]
167#[allow(missing_copy_implementations)]
168pub struct FetchResponseAbortedTransaction {
169 pub producer_id: Int64,
171
172 pub first_offset: Int64,
174}
175
176impl<R> ReadVersionedType<R> for FetchResponseAbortedTransaction
177where
178 R: Read,
179{
180 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
181 let v = version.0 .0;
182 assert!(4 <= v && v <= 4);
183
184 Ok(Self {
185 producer_id: Int64::read(reader)?,
186 first_offset: Int64::read(reader)?,
187 })
188 }
189}
190
191#[derive(Debug)]
192pub struct FetchResponsePartition {
193 pub partition_index: Int32,
195
196 pub error_code: Option<ApiError>,
198
199 pub high_watermark: Int64,
201
202 pub last_stable_offset: Option<Int64>,
209
210 pub aborted_transactions: Vec<FetchResponseAbortedTransaction>,
214
215 pub records: Records,
217}
218
219impl<R> ReadVersionedType<R> for FetchResponsePartition
220where
221 R: Read,
222{
223 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
224 let v = version.0 .0;
225 assert!(v <= 4);
226
227 Ok(Self {
228 partition_index: Int32::read(reader)?,
229 error_code: ApiError::new(Int16::read(reader)?.0),
230 high_watermark: Int64::read(reader)?,
231 last_stable_offset: (v >= 4).then(|| Int64::read(reader)).transpose()?,
232 aborted_transactions: (v >= 4)
233 .then(|| read_versioned_array(reader, version))
234 .transpose()?
235 .flatten()
236 .unwrap_or_default(),
237 records: Records::read(reader)?,
238 })
239 }
240}
241
242#[derive(Debug)]
243pub struct FetchResponseTopic {
244 pub topic: String_,
246
247 pub partitions: Vec<FetchResponsePartition>,
249}
250
251impl<R> ReadVersionedType<R> for FetchResponseTopic
252where
253 R: Read,
254{
255 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
256 let v = version.0 .0;
257 assert!(v <= 4);
258
259 Ok(Self {
260 topic: String_::read(reader)?,
261 partitions: read_versioned_array(reader, version)?.unwrap_or_default(),
262 })
263 }
264}
265
266#[derive(Debug)]
267pub struct FetchResponse {
268 pub throttle_time_ms: Option<Int32>,
272
273 pub responses: Vec<FetchResponseTopic>,
275}
276
277impl<R> ReadVersionedType<R> for FetchResponse
278where
279 R: Read,
280{
281 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
282 let v = version.0 .0;
283 assert!(v <= 4);
284
285 Ok(Self {
286 throttle_time_ms: (v >= 1).then(|| Int32::read(reader)).transpose()?,
287 responses: read_versioned_array(reader, version)?.unwrap_or_default(),
288 })
289 }
290}