rskafka/protocol/messages/
delete_records.rs1use std::io::{Read, Write};
2
3use crate::protocol::{
4 api_key::ApiKey,
5 api_version::{ApiVersion, ApiVersionRange},
6 error::Error,
7 messages::{
8 read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
9 write_versioned_array,
10 },
11 primitives::{CompactString, CompactStringRef, Int16, Int32, Int64, String_, TaggedFields},
12 traits::{ReadType, WriteType},
13};
14
15use super::{
16 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
17};
18
19#[derive(Debug)]
20pub struct DeleteRequestPartition {
21 pub partition_index: Int32,
23
24 pub offset: Int64,
26
27 pub tagged_fields: Option<TaggedFields>,
31}
32
33impl<W> WriteVersionedType<W> for DeleteRequestPartition
34where
35 W: Write,
36{
37 fn write_versioned(
38 &self,
39 writer: &mut W,
40 version: ApiVersion,
41 ) -> Result<(), WriteVersionedError> {
42 let v = version.0.0;
43 assert!(v <= 2);
44
45 self.partition_index.write(writer)?;
46 self.offset.write(writer)?;
47
48 if v >= 2 {
49 match self.tagged_fields.as_ref() {
50 Some(tagged_fields) => {
51 tagged_fields.write(writer)?;
52 }
53 None => {
54 TaggedFields::default().write(writer)?;
55 }
56 }
57 }
58
59 Ok(())
60 }
61}
62
63#[derive(Debug)]
64pub struct DeleteRequestTopic {
65 pub name: String_,
67
68 pub partitions: Vec<DeleteRequestPartition>,
70
71 pub tagged_fields: Option<TaggedFields>,
75}
76
77impl<W> WriteVersionedType<W> for DeleteRequestTopic
78where
79 W: Write,
80{
81 fn write_versioned(
82 &self,
83 writer: &mut W,
84 version: ApiVersion,
85 ) -> Result<(), WriteVersionedError> {
86 let v = version.0.0;
87 assert!(v <= 2);
88
89 if v >= 2 {
90 CompactStringRef(&self.name.0).write(writer)?
91 } else {
92 self.name.write(writer)?;
93 }
94
95 if v >= 2 {
96 write_compact_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
97 } else {
98 write_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
99 }
100
101 if v >= 2 {
102 match self.tagged_fields.as_ref() {
103 Some(tagged_fields) => {
104 tagged_fields.write(writer)?;
105 }
106 None => {
107 TaggedFields::default().write(writer)?;
108 }
109 }
110 }
111
112 Ok(())
113 }
114}
115
116#[derive(Debug)]
117pub struct DeleteRecordsRequest {
118 pub topics: Vec<DeleteRequestTopic>,
120
121 pub timeout_ms: Int32,
123
124 pub tagged_fields: Option<TaggedFields>,
128}
129
130impl<W> WriteVersionedType<W> for DeleteRecordsRequest
131where
132 W: Write,
133{
134 fn write_versioned(
135 &self,
136 writer: &mut W,
137 version: ApiVersion,
138 ) -> Result<(), WriteVersionedError> {
139 let v = version.0.0;
140 assert!(v <= 2);
141
142 if v >= 2 {
143 write_compact_versioned_array(writer, version, Some(self.topics.as_slice()))?;
144 } else {
145 write_versioned_array(writer, version, Some(self.topics.as_slice()))?;
146 }
147 self.timeout_ms.write(writer)?;
148
149 if v >= 2 {
150 match self.tagged_fields.as_ref() {
151 Some(tagged_fields) => {
152 tagged_fields.write(writer)?;
153 }
154 None => {
155 TaggedFields::default().write(writer)?;
156 }
157 }
158 }
159
160 Ok(())
161 }
162}
163
164impl RequestBody for DeleteRecordsRequest {
165 type ResponseBody = DeleteRecordsResponse;
166
167 const API_KEY: ApiKey = ApiKey::DeleteRecords;
168
169 const API_VERSION_RANGE: ApiVersionRange =
171 ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(2)));
172
173 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(2));
174}
175
176#[derive(Debug)]
177pub struct DeleteResponsePartition {
178 pub partition_index: Int32,
180
181 pub low_watermark: Int64,
183
184 pub error: Option<Error>,
186
187 pub tagged_fields: Option<TaggedFields>,
191}
192
193impl<R> ReadVersionedType<R> for DeleteResponsePartition
194where
195 R: Read,
196{
197 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
198 let v = version.0.0;
199 assert!(v <= 2);
200
201 let partition_index = Int32::read(reader)?;
202 let low_watermark = Int64::read(reader)?;
203 let error = Error::new(Int16::read(reader)?.0);
204 let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
205
206 Ok(Self {
207 partition_index,
208 low_watermark,
209 error,
210 tagged_fields,
211 })
212 }
213}
214
215#[derive(Debug)]
216pub struct DeleteResponseTopic {
217 pub name: String_,
219
220 pub partitions: Vec<DeleteResponsePartition>,
222
223 pub tagged_fields: Option<TaggedFields>,
227}
228
229impl<R> ReadVersionedType<R> for DeleteResponseTopic
230where
231 R: Read,
232{
233 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
234 let v = version.0.0;
235 assert!(v <= 2);
236
237 let name = if v >= 2 {
238 String_(CompactString::read(reader)?.0)
239 } else {
240 String_::read(reader)?
241 };
242 let partitions = if v >= 2 {
243 read_compact_versioned_array(reader, version)?.unwrap_or_default()
244 } else {
245 read_versioned_array(reader, version)?.unwrap_or_default()
246 };
247 let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
248
249 Ok(Self {
250 name,
251 partitions,
252 tagged_fields,
253 })
254 }
255}
256
257#[derive(Debug)]
258pub struct DeleteRecordsResponse {
259 pub throttle_time_ms: Int32,
262
263 pub topics: Vec<DeleteResponseTopic>,
265
266 pub tagged_fields: Option<TaggedFields>,
270}
271
272impl<R> ReadVersionedType<R> for DeleteRecordsResponse
273where
274 R: Read,
275{
276 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
277 let v = version.0.0;
278 assert!(v <= 2);
279
280 let throttle_time_ms = Int32::read(reader)?;
281 let topics = if v >= 2 {
282 read_compact_versioned_array(reader, version)?.unwrap_or_default()
283 } else {
284 read_versioned_array(reader, version)?.unwrap_or_default()
285 };
286 let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
287
288 Ok(Self {
289 throttle_time_ms,
290 topics,
291 tagged_fields,
292 })
293 }
294}