rskafka/protocol/messages/
delete_records.rs

1use std::io::{Read, Write};
2
3use crate::protocol::{
4    api_key::ApiKey,
5    api_version::{ApiVersion, ApiVersionRange},
6    error::Error,
7    messages::{
8        read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
9        write_versioned_array,
10    },
11    primitives::{CompactString, CompactStringRef, Int16, Int32, Int64, String_, TaggedFields},
12    traits::{ReadType, WriteType},
13};
14
15use super::{
16    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
17};
18
19#[derive(Debug)]
20pub struct DeleteRequestPartition {
21    /// The partition index.
22    pub partition_index: Int32,
23
24    /// The deletion offset.
25    pub offset: Int64,
26
27    /// The tagged fields.
28    ///
29    /// Added in version 2
30    pub tagged_fields: Option<TaggedFields>,
31}
32
33impl<W> WriteVersionedType<W> for DeleteRequestPartition
34where
35    W: Write,
36{
37    fn write_versioned(
38        &self,
39        writer: &mut W,
40        version: ApiVersion,
41    ) -> Result<(), WriteVersionedError> {
42        let v = version.0.0;
43        assert!(v <= 2);
44
45        self.partition_index.write(writer)?;
46        self.offset.write(writer)?;
47
48        if v >= 2 {
49            match self.tagged_fields.as_ref() {
50                Some(tagged_fields) => {
51                    tagged_fields.write(writer)?;
52                }
53                None => {
54                    TaggedFields::default().write(writer)?;
55                }
56            }
57        }
58
59        Ok(())
60    }
61}
62
63#[derive(Debug)]
64pub struct DeleteRequestTopic {
65    /// The topic name.
66    pub name: String_,
67
68    /// Each partition that we want to delete records from.
69    pub partitions: Vec<DeleteRequestPartition>,
70
71    /// The tagged fields.
72    ///
73    /// Added in version 2
74    pub tagged_fields: Option<TaggedFields>,
75}
76
77impl<W> WriteVersionedType<W> for DeleteRequestTopic
78where
79    W: Write,
80{
81    fn write_versioned(
82        &self,
83        writer: &mut W,
84        version: ApiVersion,
85    ) -> Result<(), WriteVersionedError> {
86        let v = version.0.0;
87        assert!(v <= 2);
88
89        if v >= 2 {
90            CompactStringRef(&self.name.0).write(writer)?
91        } else {
92            self.name.write(writer)?;
93        }
94
95        if v >= 2 {
96            write_compact_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
97        } else {
98            write_versioned_array(writer, version, Some(self.partitions.as_slice()))?;
99        }
100
101        if v >= 2 {
102            match self.tagged_fields.as_ref() {
103                Some(tagged_fields) => {
104                    tagged_fields.write(writer)?;
105                }
106                None => {
107                    TaggedFields::default().write(writer)?;
108                }
109            }
110        }
111
112        Ok(())
113    }
114}
115
116#[derive(Debug)]
117pub struct DeleteRecordsRequest {
118    /// Each topic that we want to delete records from.
119    pub topics: Vec<DeleteRequestTopic>,
120
121    /// How long to wait for the deletion to complete, in milliseconds.
122    pub timeout_ms: Int32,
123
124    /// The tagged fields.
125    ///
126    /// Added in version 2
127    pub tagged_fields: Option<TaggedFields>,
128}
129
130impl<W> WriteVersionedType<W> for DeleteRecordsRequest
131where
132    W: Write,
133{
134    fn write_versioned(
135        &self,
136        writer: &mut W,
137        version: ApiVersion,
138    ) -> Result<(), WriteVersionedError> {
139        let v = version.0.0;
140        assert!(v <= 2);
141
142        if v >= 2 {
143            write_compact_versioned_array(writer, version, Some(self.topics.as_slice()))?;
144        } else {
145            write_versioned_array(writer, version, Some(self.topics.as_slice()))?;
146        }
147        self.timeout_ms.write(writer)?;
148
149        if v >= 2 {
150            match self.tagged_fields.as_ref() {
151                Some(tagged_fields) => {
152                    tagged_fields.write(writer)?;
153                }
154                None => {
155                    TaggedFields::default().write(writer)?;
156                }
157            }
158        }
159
160        Ok(())
161    }
162}
163
164impl RequestBody for DeleteRecordsRequest {
165    type ResponseBody = DeleteRecordsResponse;
166
167    const API_KEY: ApiKey = ApiKey::DeleteRecords;
168
169    /// All versions.
170    const API_VERSION_RANGE: ApiVersionRange =
171        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(2)));
172
173    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(2));
174}
175
176#[derive(Debug)]
177pub struct DeleteResponsePartition {
178    /// The partition index.
179    pub partition_index: Int32,
180
181    /// The partition low water mark.
182    pub low_watermark: Int64,
183
184    /// The error code, or 0 if there was no error.
185    pub error: Option<Error>,
186
187    /// The tagged fields.
188    ///
189    /// Added in version 2
190    pub tagged_fields: Option<TaggedFields>,
191}
192
193impl<R> ReadVersionedType<R> for DeleteResponsePartition
194where
195    R: Read,
196{
197    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
198        let v = version.0.0;
199        assert!(v <= 2);
200
201        let partition_index = Int32::read(reader)?;
202        let low_watermark = Int64::read(reader)?;
203        let error = Error::new(Int16::read(reader)?.0);
204        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
205
206        Ok(Self {
207            partition_index,
208            low_watermark,
209            error,
210            tagged_fields,
211        })
212    }
213}
214
215#[derive(Debug)]
216pub struct DeleteResponseTopic {
217    /// The topic name.
218    pub name: String_,
219
220    /// Each partition that we wanted to delete records from.
221    pub partitions: Vec<DeleteResponsePartition>,
222
223    /// The tagged fields.
224    ///
225    /// Added in version 2
226    pub tagged_fields: Option<TaggedFields>,
227}
228
229impl<R> ReadVersionedType<R> for DeleteResponseTopic
230where
231    R: Read,
232{
233    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
234        let v = version.0.0;
235        assert!(v <= 2);
236
237        let name = if v >= 2 {
238            String_(CompactString::read(reader)?.0)
239        } else {
240            String_::read(reader)?
241        };
242        let partitions = if v >= 2 {
243            read_compact_versioned_array(reader, version)?.unwrap_or_default()
244        } else {
245            read_versioned_array(reader, version)?.unwrap_or_default()
246        };
247        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
248
249        Ok(Self {
250            name,
251            partitions,
252            tagged_fields,
253        })
254    }
255}
256
257#[derive(Debug)]
258pub struct DeleteRecordsResponse {
259    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
260    /// request did not violate any quota.
261    pub throttle_time_ms: Int32,
262
263    /// Each topic that we wanted to delete records from.
264    pub topics: Vec<DeleteResponseTopic>,
265
266    /// The tagged fields.
267    ///
268    /// Added in version 2
269    pub tagged_fields: Option<TaggedFields>,
270}
271
272impl<R> ReadVersionedType<R> for DeleteRecordsResponse
273where
274    R: Read,
275{
276    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
277        let v = version.0.0;
278        assert!(v <= 2);
279
280        let throttle_time_ms = Int32::read(reader)?;
281        let topics = if v >= 2 {
282            read_compact_versioned_array(reader, version)?.unwrap_or_default()
283        } else {
284            read_versioned_array(reader, version)?.unwrap_or_default()
285        };
286        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
287
288        Ok(Self {
289            throttle_time_ms,
290            topics,
291            tagged_fields,
292        })
293    }
294}