kafka_api/schemata/
create_topic_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds a per-topic error message string.
21//
22// Version 2 adds the throttle time.
23//
24// Starting in version 3, on quota violation, brokers send out responses before throttling.
25//
26// Version 4 makes partitions/replicationFactor optional even when assignments are not present
27// (KIP-464).
28//
29// Version 5 is the first flexible version.
30// Version 5 also returns topic configs in the response (KIP-525).
31//
// Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error
// in the response if topic creation is throttled (KIP-599).
34//
35// Version 7 returns the topic ID of the newly created topic if creation is successful.
36
37#[derive(Debug, Default, Clone)]
38pub struct CreateTopicsResponse {
39    /// The duration in milliseconds for which the request was throttled due to a quota violation,
40    /// or zero if the request did not violate any quota.
41    pub throttle_time_ms: i32,
42    /// Results for each topic we tried to create.
43    pub topics: Vec<CreatableTopicResult>,
44    /// Unknown tagged fields.
45    pub unknown_tagged_fields: Vec<RawTaggedField>,
46}
47
48impl Encodable for CreateTopicsResponse {
49    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
50        if version >= 2 {
51            Int32.encode(buf, self.throttle_time_ms)?;
52        }
53        NullableArray(Struct(version), version >= 5).encode(buf, self.topics.as_slice())?;
54        if version >= 5 {
55            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
56        }
57        Ok(())
58    }
59
60    fn calculate_size(&self, version: i16) -> usize {
61        let mut res = 0;
62        if version >= 2 {
63            res += Int32::SIZE; // self.throttle_time_ms
64        }
65        res += NullableArray(Struct(version), version >= 5).calculate_size(self.topics.as_slice());
66        if version >= 5 {
67            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
68        }
69        res
70    }
71}
72
73#[derive(Debug, Default, Clone)]
74pub struct CreatableTopicResult {
75    /// The topic name.
76    pub name: String,
77    /// The unique topic ID
78    pub topic_id: uuid::Uuid,
79    /// The error code, or 0 if there was no error.
80    pub error_code: i16,
81    /// The error message, or null if there was no error.
82    pub error_message: Option<String>,
83    /// Optional topic config error returned if configs are not returned in the response.
84    pub topic_config_error_code: i16,
85    /// Number of partitions of the topic.
86    pub num_partitions: i32,
87    /// Replication factor of the topic.
88    pub replication_factor: i16,
89    /// Configuration of the topic.
90    pub configs: Vec<CreatableTopicConfigs>,
91    /// Unknown tagged fields.
92    pub unknown_tagged_fields: Vec<RawTaggedField>,
93}
94
95impl Encodable for CreatableTopicResult {
96    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
97        NullableString(version >= 5).encode(buf, self.name.as_str())?;
98        if version >= 7 {
99            Uuid.encode(buf, self.topic_id)?;
100        }
101        Int16.encode(buf, self.error_code)?;
102        if version >= 1 {
103            NullableString(version >= 5).encode(buf, self.error_message.as_deref())?;
104        }
105        if version >= 5 {
106            Int32.encode(buf, self.num_partitions)?;
107            Int16.encode(buf, self.replication_factor)?;
108            NullableArray(Struct(version), true).encode(buf, self.configs.as_slice())?;
109        }
110        if version >= 5 {
111            RawTaggedFieldList.encode_with(buf, 1, &self.unknown_tagged_fields, |buf| {
112                RawTaggedFieldWriter.write_field(buf, 0, Int16, self.topic_config_error_code)?;
113                Ok(())
114            })?;
115        }
116        Ok(())
117    }
118
119    fn calculate_size(&self, version: i16) -> usize {
120        let mut res = 0;
121        res += NullableString(version >= 5).calculate_size(self.name.as_str());
122        if version >= 7 {
123            res += Uuid::SIZE; // self.topic_id
124        }
125        res += Int16::SIZE; // self.error_code
126        if version >= 1 {
127            res += NullableString(version >= 5).calculate_size(self.error_message.as_deref());
128        }
129        if version >= 5 {
130            res += Int32::SIZE; // self.num_partitions
131            res += Int16::SIZE; // self.replication_factor
132            res += NullableArray(Struct(version), true).calculate_size(self.configs.as_slice());
133        }
134        if version >= 5 {
135            res += RawTaggedFieldList.calculate_size_with(
136                1,
137                RawTaggedFieldWriter.calculate_field_size(0, Int16, &self.topic_config_error_code),
138                &self.unknown_tagged_fields,
139            );
140        }
141        res
142    }
143}
144
145#[derive(Debug, Default, Clone)]
146pub struct CreatableTopicConfigs {
147    /// The configuration name.
148    pub name: String,
149    /// The configuration value.
150    pub value: Option<String>,
151    /// True if the configuration is read-only.
152    pub read_only: bool,
153    /// The configuration source.
154    pub config_source: i8,
155    /// True if this configuration is sensitive.
156    pub is_sensitive: bool,
157    /// Unknown tagged fields.
158    pub unknown_tagged_fields: Vec<RawTaggedField>,
159}
160
161impl Encodable for CreatableTopicConfigs {
162    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
163        if version < 5 {
164            Err(err_encode_message_unsupported(
165                version,
166                "CreatableTopicConfigs",
167            ))?
168        }
169        NullableString(true).encode(buf, self.name.as_str())?;
170        NullableString(true).encode(buf, self.value.as_deref())?;
171        Bool.encode(buf, self.read_only)?;
172        Int8.encode(buf, self.config_source)?;
173        Bool.encode(buf, self.is_sensitive)?;
174        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
175        Ok(())
176    }
177
178    fn calculate_size(&self, _version: i16) -> usize {
179        let mut res = 0;
180        res += NullableString(true).calculate_size(self.name.as_str());
181        res += NullableString(true).calculate_size(self.value.as_deref());
182        res += Bool::SIZE; // self.read_only
183        res += Int8::SIZE; // self.config_source
184        res += Bool::SIZE; // self.is_sensitive
185        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
186        res
187    }
188}