kafka_api/schemata/
create_topic_request.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds validateOnly.
21//
22// Version 4 makes partitions/replicationFactor optional even when assignments are not present
23// (KIP-464)
24//
25// Version 5 is the first flexible version.
26// Version 5 also returns topic configs in the response (KIP-525).
27//
28// Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error
29// in the response if the topics creation is throttled (KIP-599).
30//
31// Version 7 is the same as version 6.
32
33#[derive(Debug, Default, Clone)]
34pub struct CreateTopicsRequest {
35    /// The topics to create.
36    pub topics: Vec<CreatableTopic>,
37    /// How long to wait in milliseconds before timing out the request.
38    pub timeout_ms: i32,
39    /// If true, check that the topics can be created as specified, but don't create anything.
40    pub validate_only: bool,
41    /// Unknown tagged fields.
42    pub unknown_tagged_fields: Vec<RawTaggedField>,
43}
44
45impl Decodable for CreateTopicsRequest {
46    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
47        let mut res = CreateTopicsRequest {
48            topics: NullableArray(Struct(version), version >= 5)
49                .decode(buf)?
50                .ok_or_else(|| err_decode_message_null("name"))?,
51            timeout_ms: Int32.decode(buf)?,
52            ..Default::default()
53        };
54        if version >= 1 {
55            res.validate_only = Bool.decode(buf)?;
56        }
57        if version >= 5 {
58            res.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
59        }
60        Ok(res)
61    }
62}
63
64#[derive(Debug, Default, Clone)]
65pub struct CreatableTopic {
66    /// The topic name.
67    pub name: String,
68    /// The number of partitions to create in the topic, or -1 if we are either specifying a manual
69    /// partition assignment or using the default partitions.
70    pub num_partitions: i32,
71    /// The number of replicas to create for each partition in the topic, or -1 if we are either
72    /// specifying a manual partition assignment or using the default replication factor.
73    pub replication_factor: i16,
74    /// The manual partition assignment, or the empty array if we are using automatic assignment.
75    pub assignments: Vec<CreatableReplicaAssignment>,
76    /// The custom topic configurations to set.
77    pub configs: Vec<CreatableTopicConfig>,
78    /// Unknown tagged fields.
79    pub unknown_tagged_fields: Vec<RawTaggedField>,
80}
81
82impl Decodable for CreatableTopic {
83    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
84        let mut res = CreatableTopic {
85            name: NullableString(version >= 5)
86                .decode(buf)?
87                .ok_or_else(|| err_decode_message_null("name"))?,
88            num_partitions: Int32.decode(buf)?,
89            replication_factor: Int16.decode(buf)?,
90            assignments: NullableArray(Struct(version), version >= 5)
91                .decode(buf)?
92                .ok_or_else(|| err_decode_message_null("assignments"))?,
93            configs: NullableArray(Struct(version), version >= 5)
94                .decode(buf)?
95                .ok_or_else(|| err_decode_message_null("assignments"))?,
96            ..Default::default()
97        };
98        if version >= 5 {
99            res.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
100        }
101        Ok(res)
102    }
103}
104
105#[derive(Debug, Default, Clone)]
106pub struct CreatableTopicConfig {
107    /// The configuration name.
108    pub name: String,
109    /// The configuration value.
110    pub value: Option<String>,
111    /// Unknown tagged fields.
112    pub unknown_tagged_fields: Vec<RawTaggedField>,
113}
114
115impl Decodable for CreatableTopicConfig {
116    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
117        let mut res = CreatableTopicConfig {
118            name: NullableString(version >= 5)
119                .decode(buf)?
120                .ok_or_else(|| err_decode_message_null("name"))?,
121            value: NullableString(version >= 5).decode(buf)?,
122            ..Default::default()
123        };
124        if version >= 5 {
125            res.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
126        }
127        Ok(res)
128    }
129}
130
131#[derive(Debug, Default, Clone)]
132pub struct CreatableReplicaAssignment {
133    /// The partition index.
134    pub partition_index: i32,
135    /// The brokers to place the partition on.
136    pub broker_ids: Vec<i32>,
137    /// Unknown tagged fields.
138    pub unknown_tagged_fields: Vec<RawTaggedField>,
139}
140
141impl Decodable for CreatableReplicaAssignment {
142    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
143        let mut res = CreatableReplicaAssignment {
144            partition_index: Int32.decode(buf)?,
145            ..Default::default()
146        };
147        res.broker_ids = NullableArray(Int32, version >= 5)
148            .decode(buf)?
149            .ok_or_else(|| err_decode_message_null("broker_ids"))?;
150        if version >= 5 {
151            res.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
152        }
153        Ok(res)
154    }
155}