rskafka/protocol/messages/
create_topics.rs

1use std::io::{Read, Write};
2
3use super::{
4    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
5};
6use crate::protocol::api_version::ApiVersionRange;
7use crate::protocol::error::Error;
8use crate::protocol::messages::{
9    read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
10    write_versioned_array,
11};
12use crate::protocol::{
13    api_key::ApiKey,
14    api_version::ApiVersion,
15    primitives::*,
16    traits::{ReadType, WriteType},
17};
18
19#[derive(Debug)]
20pub struct CreateTopicsRequest {
21    /// The topics to create
22    pub topics: Vec<CreateTopicRequest>,
23
24    /// How long to wait in milliseconds before timing out the request.
25    pub timeout_ms: Int32,
26
27    /// If true, check that the topics can be created as specified, but don't create anything.
28    ///
29    /// Added in version 1
30    pub validate_only: Option<Boolean>,
31
32    /// The tagged fields.
33    ///
34    /// Added in version 5
35    pub tagged_fields: Option<TaggedFields>,
36}
37
38impl RequestBody for CreateTopicsRequest {
39    type ResponseBody = CreateTopicsResponse;
40
41    const API_KEY: ApiKey = ApiKey::CreateTopics;
42
43    /// Enough for now.
44    const API_VERSION_RANGE: ApiVersionRange =
45        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(5)));
46
47    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(5));
48}
49
50impl<W> WriteVersionedType<W> for CreateTopicsRequest
51where
52    W: Write,
53{
54    fn write_versioned(
55        &self,
56        writer: &mut W,
57        version: ApiVersion,
58    ) -> Result<(), WriteVersionedError> {
59        let v = version.0 .0;
60        assert!(v <= 5);
61
62        if self.validate_only.is_some() && v < 1 {
63            return Err(WriteVersionedError::FieldNotAvailable {
64                version,
65                field: "validate_only".to_string(),
66            });
67        }
68
69        if v >= 5 {
70            write_compact_versioned_array(writer, version, Some(self.topics.as_slice()))?;
71        } else {
72            write_versioned_array(writer, version, Some(self.topics.as_slice()))?;
73        }
74        self.timeout_ms.write(writer)?;
75
76        if v >= 1 {
77            match self.validate_only {
78                Some(b) => b.write(writer)?,
79                None => Boolean(false).write(writer)?,
80            }
81        }
82
83        if v >= 5 {
84            match self.tagged_fields.as_ref() {
85                Some(tagged_fields) => {
86                    tagged_fields.write(writer)?;
87                }
88                None => {
89                    TaggedFields::default().write(writer)?;
90                }
91            }
92        }
93
94        Ok(())
95    }
96}
97
98#[derive(Debug)]
99pub struct CreateTopicRequest {
100    /// The topic name
101    pub name: String_,
102
103    /// The number of partitions to create in the topic, or -1 if we are either
104    /// specifying a manual partition assignment or using the default partitions.
105    ///
106    /// Note: default partition count requires broker version >= 2.4.0 (KIP-464)
107    pub num_partitions: Int32,
108
109    /// The number of replicas to create for each partition in the topic, or -1 if we are either
110    /// specifying a manual partition assignment or using the default replication factor.
111    ///
112    /// Note: default replication factor requires broker version >= 2.4.0 (KIP-464)
113    pub replication_factor: Int16,
114
115    /// The manual partition assignment, or the empty array if we are using automatic assignment.
116    pub assignments: Vec<CreateTopicAssignment>,
117
118    /// The custom topic configurations to set.
119    pub configs: Vec<CreateTopicConfig>,
120
121    /// The tagged fields.
122    ///
123    /// Added in version 5
124    pub tagged_fields: Option<TaggedFields>,
125}
126
127impl<W> WriteVersionedType<W> for CreateTopicRequest
128where
129    W: Write,
130{
131    fn write_versioned(
132        &self,
133        writer: &mut W,
134        version: ApiVersion,
135    ) -> Result<(), WriteVersionedError> {
136        let v = version.0 .0;
137        assert!(v <= 5);
138
139        if v >= 5 {
140            CompactStringRef(&self.name.0).write(writer)?
141        } else {
142            self.name.write(writer)?;
143        }
144
145        self.num_partitions.write(writer)?;
146        self.replication_factor.write(writer)?;
147
148        if v >= 5 {
149            write_compact_versioned_array(writer, version, Some(&self.assignments))?;
150        } else {
151            write_versioned_array(writer, version, Some(&self.assignments))?;
152        }
153
154        if v >= 5 {
155            write_compact_versioned_array(writer, version, Some(&self.configs))?;
156        } else {
157            write_versioned_array(writer, version, Some(&self.configs))?;
158        }
159
160        if v >= 5 {
161            match self.tagged_fields.as_ref() {
162                Some(tagged_fields) => {
163                    tagged_fields.write(writer)?;
164                }
165                None => {
166                    TaggedFields::default().write(writer)?;
167                }
168            }
169        }
170
171        Ok(())
172    }
173}
174
175#[derive(Debug)]
176pub struct CreateTopicAssignment {
177    /// The partition index
178    pub partition_index: Int32,
179
180    /// The brokers to place the partition on
181    pub broker_ids: Array<Int32>,
182
183    /// The tagged fields.
184    ///
185    /// Added in version 5
186    pub tagged_fields: Option<TaggedFields>,
187}
188
189impl<W> WriteVersionedType<W> for CreateTopicAssignment
190where
191    W: Write,
192{
193    fn write_versioned(
194        &self,
195        writer: &mut W,
196        version: ApiVersion,
197    ) -> Result<(), WriteVersionedError> {
198        let v = version.0 .0;
199        assert!(v <= 5);
200
201        self.partition_index.write(writer)?;
202
203        if v >= 5 {
204            CompactArrayRef(self.broker_ids.0.as_deref()).write(writer)?;
205        } else {
206            self.broker_ids.write(writer)?;
207        }
208
209        if v >= 5 {
210            match self.tagged_fields.as_ref() {
211                Some(tagged_fields) => {
212                    tagged_fields.write(writer)?;
213                }
214                None => {
215                    TaggedFields::default().write(writer)?;
216                }
217            }
218        }
219
220        Ok(())
221    }
222}
223
224#[derive(Debug)]
225pub struct CreateTopicConfig {
226    /// The configuration name.
227    pub name: String_,
228
229    /// The configuration value.
230    pub value: NullableString,
231
232    /// The tagged fields.
233    ///
234    /// Added in version 5
235    pub tagged_fields: Option<TaggedFields>,
236}
237
238impl<W> WriteVersionedType<W> for CreateTopicConfig
239where
240    W: Write,
241{
242    fn write_versioned(
243        &self,
244        writer: &mut W,
245        version: ApiVersion,
246    ) -> Result<(), WriteVersionedError> {
247        let v = version.0 .0;
248        assert!(v <= 5);
249
250        if v >= 5 {
251            CompactStringRef(&self.name.0).write(writer)?;
252        } else {
253            self.name.write(writer)?;
254        }
255
256        if v >= 5 {
257            CompactNullableStringRef(self.value.0.as_deref()).write(writer)?;
258        } else {
259            self.value.write(writer)?;
260        }
261
262        if v >= 5 {
263            match self.tagged_fields.as_ref() {
264                Some(tagged_fields) => {
265                    tagged_fields.write(writer)?;
266                }
267                None => {
268                    TaggedFields::default().write(writer)?;
269                }
270            }
271        }
272
273        Ok(())
274    }
275}
276
277#[derive(Debug)]
278pub struct CreateTopicsResponse {
279    /// The duration in milliseconds for which the request was throttled due to a quota
280    /// violation, or zero if the request did not violate any quota.
281    ///
282    /// Added in version 2
283    pub throttle_time_ms: Option<Int32>,
284
285    /// Results for each topic we tried to create.
286    pub topics: Vec<CreateTopicResponse>,
287
288    /// The tagged fields.
289    ///
290    /// Added in version 5
291    pub tagged_fields: Option<TaggedFields>,
292}
293
294impl<R> ReadVersionedType<R> for CreateTopicsResponse
295where
296    R: Read,
297{
298    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
299        let v = version.0 .0;
300        assert!(v <= 5);
301
302        let throttle_time_ms = (v >= 2).then(|| Int32::read(reader)).transpose()?;
303        let topics = if v >= 5 {
304            read_compact_versioned_array(reader, version)?.unwrap_or_default()
305        } else {
306            read_versioned_array(reader, version)?.unwrap_or_default()
307        };
308        let tagged_fields = (v >= 5).then(|| TaggedFields::read(reader)).transpose()?;
309
310        Ok(Self {
311            throttle_time_ms,
312            topics,
313            tagged_fields,
314        })
315    }
316}
317
318#[derive(Debug)]
319pub struct CreateTopicResponseConfig {
320    /// The configuration name.
321    pub name: CompactString,
322
323    /// The configuration value.
324    pub value: CompactNullableString,
325
326    /// True if the configuration is read-only.
327    pub read_only: Boolean,
328
329    /// The configuration source.
330    pub config_source: Int8,
331
332    /// True if this configuration is sensitive.
333    pub is_sensitive: Boolean,
334
335    /// The tagged fields.
336    pub tagged_fields: TaggedFields,
337}
338
339impl<R> ReadVersionedType<R> for CreateTopicResponseConfig
340where
341    R: Read,
342{
343    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
344        let v = version.0 .0;
345        assert!(v == 5);
346
347        Ok(Self {
348            name: CompactString::read(reader)?,
349            value: CompactNullableString::read(reader)?,
350            read_only: Boolean::read(reader)?,
351            config_source: Int8::read(reader)?,
352            is_sensitive: Boolean::read(reader)?,
353            tagged_fields: TaggedFields::read(reader)?,
354        })
355    }
356}
357
358#[derive(Debug)]
359pub struct CreateTopicResponse {
360    /// The topic name.
361    pub name: String_,
362
363    /// The error code, or 0 if there was no error.
364    pub error: Option<Error>,
365
366    /// The error message
367    ///
368    /// Added in version 1
369    pub error_message: Option<NullableString>,
370
371    /// Number of partitions of the topic.
372    ///
373    /// Added in version 5
374    pub num_partitions: Option<Int32>,
375
376    /// Replication factor of the topic.
377    ///
378    /// Added in version 5.
379    pub replication_factor: Option<Int16>,
380
381    /// Configuration of the topic.
382    ///
383    /// Added in version 5
384    pub configs: Vec<CreateTopicResponseConfig>,
385
386    /// The tagged fields.
387    ///
388    /// Added in version 5
389    pub tagged_fields: Option<TaggedFields>,
390}
391
392impl<R> ReadVersionedType<R> for CreateTopicResponse
393where
394    R: Read,
395{
396    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
397        let v = version.0 .0;
398        assert!(v <= 5);
399
400        let name = if v >= 5 {
401            String_(CompactString::read(reader)?.0)
402        } else {
403            String_::read(reader)?
404        };
405        let error = Error::new(Int16::read(reader)?.0);
406        let error_message = (v >= 1)
407            .then(|| {
408                if v >= 5 {
409                    Ok(NullableString(CompactNullableString::read(reader)?.0))
410                } else {
411                    NullableString::read(reader)
412                }
413            })
414            .transpose()?;
415        let num_partitions = (v >= 5).then(|| Int32::read(reader)).transpose()?;
416        let replication_factor = (v >= 5).then(|| Int16::read(reader)).transpose()?;
417        let configs = (v >= 5)
418            .then(|| read_compact_versioned_array(reader, version))
419            .transpose()?
420            .flatten()
421            .unwrap_or_default();
422        let tagged_fields = (v >= 5).then(|| TaggedFields::read(reader)).transpose()?;
423
424        Ok(Self {
425            name,
426            error,
427            error_message,
428            num_partitions,
429            replication_factor,
430            configs,
431            tagged_fields,
432        })
433    }
434}