rskafka/protocol/messages/
delete_topics.rs1use std::io::{Read, Write};
2
3use crate::protocol::{
4 api_key::ApiKey,
5 api_version::{ApiVersion, ApiVersionRange},
6 error::Error,
7 messages::{
8 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError,
9 WriteVersionedType, read_compact_versioned_array, read_versioned_array,
10 },
11 primitives::{
12 Array, CompactArrayRef, CompactNullableString, CompactString, CompactStringRef, Int16,
13 Int32, String_, TaggedFields,
14 },
15 traits::{ReadType, WriteType},
16};
17
18#[derive(Debug)]
19pub struct DeleteTopicsRequest {
20 pub topic_names: Array<String_>,
22
23 pub timeout_ms: Int32,
25
26 pub tagged_fields: Option<TaggedFields>,
30}
31
32impl RequestBody for DeleteTopicsRequest {
33 type ResponseBody = DeleteTopicsResponse;
34
35 const API_KEY: ApiKey = ApiKey::DeleteTopics;
36
37 const API_VERSION_RANGE: ApiVersionRange =
39 ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(5)));
40
41 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(4));
42}
43
44impl<W> WriteVersionedType<W> for DeleteTopicsRequest
45where
46 W: Write,
47{
48 fn write_versioned(
49 &self,
50 writer: &mut W,
51 version: ApiVersion,
52 ) -> Result<(), WriteVersionedError> {
53 let v = version.0.0;
54 assert!(v <= 5);
55
56 if v >= 4 {
57 if let Some(topic_names) = self.topic_names.0.as_ref() {
58 let topic_names: Vec<_> = topic_names
59 .iter()
60 .map(|name| CompactStringRef(name.0.as_str()))
61 .collect();
62 CompactArrayRef(Some(&topic_names)).write(writer)?;
63 } else {
64 CompactArrayRef::<CompactStringRef<'_>>(None).write(writer)?;
65 }
66 } else {
67 self.topic_names.write(writer)?;
68 };
69
70 self.timeout_ms.write(writer)?;
71
72 if v >= 4 {
73 match self.tagged_fields.as_ref() {
74 Some(tagged_fields) => {
75 tagged_fields.write(writer)?;
76 }
77 None => {
78 TaggedFields::default().write(writer)?;
79 }
80 }
81 }
82
83 Ok(())
84 }
85}
86
87#[derive(Debug)]
88pub struct DeleteTopicsResponse {
89 pub throttle_time_ms: Option<Int32>,
94
95 pub responses: Vec<DeleteTopicsResponseTopic>,
97
98 pub tagged_fields: Option<TaggedFields>,
102}
103
104impl<R> ReadVersionedType<R> for DeleteTopicsResponse
105where
106 R: Read,
107{
108 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
109 let v = version.0.0;
110 assert!(v <= 5);
111
112 let throttle_time_ms = (v >= 1).then(|| Int32::read(reader)).transpose()?;
113 let responses = if v >= 4 {
114 read_compact_versioned_array(reader, version)?.unwrap_or_default()
115 } else {
116 read_versioned_array(reader, version)?.unwrap_or_default()
117 };
118 let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
119
120 Ok(Self {
121 throttle_time_ms,
122 responses,
123 tagged_fields,
124 })
125 }
126}
127
128#[derive(Debug)]
129pub struct DeleteTopicsResponseTopic {
130 pub name: String_,
132
133 pub error: Option<Error>,
135
136 pub error_message: Option<CompactNullableString>,
140
141 pub tagged_fields: Option<TaggedFields>,
145}
146
147impl<R> ReadVersionedType<R> for DeleteTopicsResponseTopic
148where
149 R: Read,
150{
151 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
152 let v = version.0.0;
153 assert!(v <= 5);
154
155 let name = if v >= 4 {
156 String_(CompactString::read(reader)?.0)
157 } else {
158 String_::read(reader)?
159 };
160 let error = Error::new(Int16::read(reader)?.0);
161 let error_message = (v >= 5)
162 .then(|| CompactNullableString::read(reader))
163 .transpose()?;
164 let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
165
166 Ok(Self {
167 name,
168 error,
169 error_message,
170 tagged_fields,
171 })
172 }
173}