rskafka/protocol/messages/
delete_topics.rs

1use std::io::{Read, Write};
2
3use crate::protocol::{
4    api_key::ApiKey,
5    api_version::{ApiVersion, ApiVersionRange},
6    error::Error,
7    messages::{
8        ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError,
9        WriteVersionedType, read_compact_versioned_array, read_versioned_array,
10    },
11    primitives::{
12        Array, CompactArrayRef, CompactNullableString, CompactString, CompactStringRef, Int16,
13        Int32, String_, TaggedFields,
14    },
15    traits::{ReadType, WriteType},
16};
17
18#[derive(Debug)]
19pub struct DeleteTopicsRequest {
20    /// The names of the topics to delete.
21    pub topic_names: Array<String_>,
22
23    /// The length of time in milliseconds to wait for the deletions to complete.
24    pub timeout_ms: Int32,
25
26    /// The tagged fields.
27    ///
28    /// Added in version 4.
29    pub tagged_fields: Option<TaggedFields>,
30}
31
32impl RequestBody for DeleteTopicsRequest {
33    type ResponseBody = DeleteTopicsResponse;
34
35    const API_KEY: ApiKey = ApiKey::DeleteTopics;
36
37    /// Enough for now.
38    const API_VERSION_RANGE: ApiVersionRange =
39        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(5)));
40
41    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(4));
42}
43
44impl<W> WriteVersionedType<W> for DeleteTopicsRequest
45where
46    W: Write,
47{
48    fn write_versioned(
49        &self,
50        writer: &mut W,
51        version: ApiVersion,
52    ) -> Result<(), WriteVersionedError> {
53        let v = version.0.0;
54        assert!(v <= 5);
55
56        if v >= 4 {
57            if let Some(topic_names) = self.topic_names.0.as_ref() {
58                let topic_names: Vec<_> = topic_names
59                    .iter()
60                    .map(|name| CompactStringRef(name.0.as_str()))
61                    .collect();
62                CompactArrayRef(Some(&topic_names)).write(writer)?;
63            } else {
64                CompactArrayRef::<CompactStringRef<'_>>(None).write(writer)?;
65            }
66        } else {
67            self.topic_names.write(writer)?;
68        };
69
70        self.timeout_ms.write(writer)?;
71
72        if v >= 4 {
73            match self.tagged_fields.as_ref() {
74                Some(tagged_fields) => {
75                    tagged_fields.write(writer)?;
76                }
77                None => {
78                    TaggedFields::default().write(writer)?;
79                }
80            }
81        }
82
83        Ok(())
84    }
85}
86
87#[derive(Debug)]
88pub struct DeleteTopicsResponse {
89    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
90    /// request did not violate any quota.
91    ///
92    /// Added in version 1.
93    pub throttle_time_ms: Option<Int32>,
94
95    /// The results for each topic we tried to delete.
96    pub responses: Vec<DeleteTopicsResponseTopic>,
97
98    /// The tagged fields.
99    ///
100    /// Added in version 4.
101    pub tagged_fields: Option<TaggedFields>,
102}
103
104impl<R> ReadVersionedType<R> for DeleteTopicsResponse
105where
106    R: Read,
107{
108    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
109        let v = version.0.0;
110        assert!(v <= 5);
111
112        let throttle_time_ms = (v >= 1).then(|| Int32::read(reader)).transpose()?;
113        let responses = if v >= 4 {
114            read_compact_versioned_array(reader, version)?.unwrap_or_default()
115        } else {
116            read_versioned_array(reader, version)?.unwrap_or_default()
117        };
118        let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
119
120        Ok(Self {
121            throttle_time_ms,
122            responses,
123            tagged_fields,
124        })
125    }
126}
127
128#[derive(Debug)]
129pub struct DeleteTopicsResponseTopic {
130    /// The topic name.
131    pub name: String_,
132
133    /// The error code, or 0 if there was no error.
134    pub error: Option<Error>,
135
136    /// The error message, or null if there was no error.
137    ///
138    /// Added in version 5.
139    pub error_message: Option<CompactNullableString>,
140
141    /// The tagged fields.
142    ///
143    /// Added in version 4.
144    pub tagged_fields: Option<TaggedFields>,
145}
146
147impl<R> ReadVersionedType<R> for DeleteTopicsResponseTopic
148where
149    R: Read,
150{
151    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
152        let v = version.0.0;
153        assert!(v <= 5);
154
155        let name = if v >= 4 {
156            String_(CompactString::read(reader)?.0)
157        } else {
158            String_::read(reader)?
159        };
160        let error = Error::new(Int16::read(reader)?.0);
161        let error_message = (v >= 5)
162            .then(|| CompactNullableString::read(reader))
163            .transpose()?;
164        let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
165
166        Ok(Self {
167            name,
168            error,
169            error_message,
170            tagged_fields,
171        })
172    }
173}