rskafka/protocol/messages/
list_offsets.rs1use std::io::{Read, Write};
7
8use crate::protocol::{
9 api_key::ApiKey,
10 api_version::{ApiVersion, ApiVersionRange},
11 error::Error as ApiError,
12 messages::{read_versioned_array, write_versioned_array, IsolationLevel},
13 primitives::{Array, Int16, Int32, Int64, Int8, String_},
14 traits::{ReadType, WriteType},
15};
16
17use super::{
18 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
19};
20
21#[derive(Debug)]
22#[allow(missing_copy_implementations)]
23pub struct ListOffsetsRequestPartition {
24 pub partition_index: Int32,
26
27 pub timestamp: Int64,
41
42 pub max_num_offsets: Option<Int32>,
48}
49
50impl<W> WriteVersionedType<W> for ListOffsetsRequestPartition
51where
52 W: Write,
53{
54 fn write_versioned(
55 &self,
56 writer: &mut W,
57 version: ApiVersion,
58 ) -> Result<(), WriteVersionedError> {
59 let v = version.0 .0;
60 assert!(v <= 3);
61
62 self.partition_index.write(writer)?;
63 self.timestamp.write(writer)?;
64
65 if v < 1 {
66 self.max_num_offsets.unwrap_or(Int32(1)).write(writer)?;
68 }
69
70 Ok(())
71 }
72}
73
74#[derive(Debug)]
75pub struct ListOffsetsRequestTopic {
76 pub name: String_,
78
79 pub partitions: Vec<ListOffsetsRequestPartition>,
83}
84
85impl<W> WriteVersionedType<W> for ListOffsetsRequestTopic
86where
87 W: Write,
88{
89 fn write_versioned(
90 &self,
91 writer: &mut W,
92 version: ApiVersion,
93 ) -> Result<(), WriteVersionedError> {
94 let v = version.0 .0;
95 assert!(v <= 3);
96
97 self.name.write(writer)?;
98 write_versioned_array(writer, version, Some(&self.partitions))?;
99
100 Ok(())
101 }
102}
103
104#[derive(Debug)]
105pub struct ListOffsetsRequest {
106 pub replica_id: Int32,
108
109 pub isolation_level: Option<IsolationLevel>,
123
124 pub topics: Vec<ListOffsetsRequestTopic>,
128}
129
130impl<W> WriteVersionedType<W> for ListOffsetsRequest
131where
132 W: Write,
133{
134 fn write_versioned(
135 &self,
136 writer: &mut W,
137 version: ApiVersion,
138 ) -> Result<(), WriteVersionedError> {
139 let v = version.0 .0;
140 assert!(v <= 3);
141
142 self.replica_id.write(writer)?;
143
144 if v >= 2 {
145 let level: Int8 = self.isolation_level.unwrap_or_default().into();
147 level.write(writer)?;
148 }
149
150 write_versioned_array(writer, version, Some(&self.topics))?;
151
152 Ok(())
153 }
154}
155
156impl RequestBody for ListOffsetsRequest {
157 type ResponseBody = ListOffsetsResponse;
158
159 const API_KEY: ApiKey = ApiKey::ListOffsets;
160
161 const API_VERSION_RANGE: ApiVersionRange =
163 ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(3)));
164
165 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(6));
166}
167
168#[derive(Debug)]
169pub struct ListOffsetsResponsePartition {
170 pub partition_index: Int32,
172
173 pub error_code: Option<ApiError>,
175
176 pub old_style_offsets: Option<Array<Int64>>,
180
181 pub timestamp: Option<Int64>,
185
186 pub offset: Option<Int64>,
190}
191
192impl<R> ReadVersionedType<R> for ListOffsetsResponsePartition
193where
194 R: Read,
195{
196 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
197 let v = version.0 .0;
198 assert!(v <= 3);
199
200 Ok(Self {
201 partition_index: Int32::read(reader)?,
202 error_code: ApiError::new(Int16::read(reader)?.0),
203 old_style_offsets: (v < 1).then(|| Array::read(reader)).transpose()?,
204 timestamp: (v >= 1).then(|| Int64::read(reader)).transpose()?,
205 offset: (v >= 1).then(|| Int64::read(reader)).transpose()?,
206 })
207 }
208}
209
210#[derive(Debug)]
211pub struct ListOffsetsResponseTopic {
212 pub name: String_,
214
215 pub partitions: Vec<ListOffsetsResponsePartition>,
217}
218
219impl<R> ReadVersionedType<R> for ListOffsetsResponseTopic
220where
221 R: Read,
222{
223 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
224 let v = version.0 .0;
225 assert!(v <= 3);
226
227 Ok(Self {
228 name: String_::read(reader)?,
229 partitions: read_versioned_array(reader, version)?.unwrap_or_default(),
230 })
231 }
232}
233
234#[derive(Debug)]
235pub struct ListOffsetsResponse {
236 pub throttle_time_ms: Option<Int32>,
240
241 pub topics: Vec<ListOffsetsResponseTopic>,
243}
244
245impl<R> ReadVersionedType<R> for ListOffsetsResponse
246where
247 R: Read,
248{
249 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
250 let v = version.0 .0;
251 assert!(v <= 3);
252
253 Ok(Self {
254 throttle_time_ms: (v >= 2).then(|| Int32::read(reader)).transpose()?,
255 topics: read_versioned_array(reader, version)?.unwrap_or_default(),
256 })
257 }
258}