rskafka/protocol/messages/
create_topics.rs1use std::io::{Read, Write};
2
3use super::{
4 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
5};
6use crate::protocol::api_version::ApiVersionRange;
7use crate::protocol::error::Error;
8use crate::protocol::messages::{
9 read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
10 write_versioned_array,
11};
12use crate::protocol::{
13 api_key::ApiKey,
14 api_version::ApiVersion,
15 primitives::*,
16 traits::{ReadType, WriteType},
17};
18
19#[derive(Debug)]
20pub struct CreateTopicsRequest {
21 pub topics: Vec<CreateTopicRequest>,
23
24 pub timeout_ms: Int32,
26
27 pub validate_only: Option<Boolean>,
31
32 pub tagged_fields: Option<TaggedFields>,
36}
37
38impl RequestBody for CreateTopicsRequest {
39 type ResponseBody = CreateTopicsResponse;
40
41 const API_KEY: ApiKey = ApiKey::CreateTopics;
42
43 const API_VERSION_RANGE: ApiVersionRange =
45 ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(5)));
46
47 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(5));
48}
49
50impl<W> WriteVersionedType<W> for CreateTopicsRequest
51where
52 W: Write,
53{
54 fn write_versioned(
55 &self,
56 writer: &mut W,
57 version: ApiVersion,
58 ) -> Result<(), WriteVersionedError> {
59 let v = version.0 .0;
60 assert!(v <= 5);
61
62 if self.validate_only.is_some() && v < 1 {
63 return Err(WriteVersionedError::FieldNotAvailable {
64 version,
65 field: "validate_only".to_string(),
66 });
67 }
68
69 if v >= 5 {
70 write_compact_versioned_array(writer, version, Some(self.topics.as_slice()))?;
71 } else {
72 write_versioned_array(writer, version, Some(self.topics.as_slice()))?;
73 }
74 self.timeout_ms.write(writer)?;
75
76 if v >= 1 {
77 match self.validate_only {
78 Some(b) => b.write(writer)?,
79 None => Boolean(false).write(writer)?,
80 }
81 }
82
83 if v >= 5 {
84 match self.tagged_fields.as_ref() {
85 Some(tagged_fields) => {
86 tagged_fields.write(writer)?;
87 }
88 None => {
89 TaggedFields::default().write(writer)?;
90 }
91 }
92 }
93
94 Ok(())
95 }
96}
97
98#[derive(Debug)]
99pub struct CreateTopicRequest {
100 pub name: String_,
102
103 pub num_partitions: Int32,
108
109 pub replication_factor: Int16,
114
115 pub assignments: Vec<CreateTopicAssignment>,
117
118 pub configs: Vec<CreateTopicConfig>,
120
121 pub tagged_fields: Option<TaggedFields>,
125}
126
127impl<W> WriteVersionedType<W> for CreateTopicRequest
128where
129 W: Write,
130{
131 fn write_versioned(
132 &self,
133 writer: &mut W,
134 version: ApiVersion,
135 ) -> Result<(), WriteVersionedError> {
136 let v = version.0 .0;
137 assert!(v <= 5);
138
139 if v >= 5 {
140 CompactStringRef(&self.name.0).write(writer)?
141 } else {
142 self.name.write(writer)?;
143 }
144
145 self.num_partitions.write(writer)?;
146 self.replication_factor.write(writer)?;
147
148 if v >= 5 {
149 write_compact_versioned_array(writer, version, Some(&self.assignments))?;
150 } else {
151 write_versioned_array(writer, version, Some(&self.assignments))?;
152 }
153
154 if v >= 5 {
155 write_compact_versioned_array(writer, version, Some(&self.configs))?;
156 } else {
157 write_versioned_array(writer, version, Some(&self.configs))?;
158 }
159
160 if v >= 5 {
161 match self.tagged_fields.as_ref() {
162 Some(tagged_fields) => {
163 tagged_fields.write(writer)?;
164 }
165 None => {
166 TaggedFields::default().write(writer)?;
167 }
168 }
169 }
170
171 Ok(())
172 }
173}
174
175#[derive(Debug)]
176pub struct CreateTopicAssignment {
177 pub partition_index: Int32,
179
180 pub broker_ids: Array<Int32>,
182
183 pub tagged_fields: Option<TaggedFields>,
187}
188
189impl<W> WriteVersionedType<W> for CreateTopicAssignment
190where
191 W: Write,
192{
193 fn write_versioned(
194 &self,
195 writer: &mut W,
196 version: ApiVersion,
197 ) -> Result<(), WriteVersionedError> {
198 let v = version.0 .0;
199 assert!(v <= 5);
200
201 self.partition_index.write(writer)?;
202
203 if v >= 5 {
204 CompactArrayRef(self.broker_ids.0.as_deref()).write(writer)?;
205 } else {
206 self.broker_ids.write(writer)?;
207 }
208
209 if v >= 5 {
210 match self.tagged_fields.as_ref() {
211 Some(tagged_fields) => {
212 tagged_fields.write(writer)?;
213 }
214 None => {
215 TaggedFields::default().write(writer)?;
216 }
217 }
218 }
219
220 Ok(())
221 }
222}
223
224#[derive(Debug)]
225pub struct CreateTopicConfig {
226 pub name: String_,
228
229 pub value: NullableString,
231
232 pub tagged_fields: Option<TaggedFields>,
236}
237
238impl<W> WriteVersionedType<W> for CreateTopicConfig
239where
240 W: Write,
241{
242 fn write_versioned(
243 &self,
244 writer: &mut W,
245 version: ApiVersion,
246 ) -> Result<(), WriteVersionedError> {
247 let v = version.0 .0;
248 assert!(v <= 5);
249
250 if v >= 5 {
251 CompactStringRef(&self.name.0).write(writer)?;
252 } else {
253 self.name.write(writer)?;
254 }
255
256 if v >= 5 {
257 CompactNullableStringRef(self.value.0.as_deref()).write(writer)?;
258 } else {
259 self.value.write(writer)?;
260 }
261
262 if v >= 5 {
263 match self.tagged_fields.as_ref() {
264 Some(tagged_fields) => {
265 tagged_fields.write(writer)?;
266 }
267 None => {
268 TaggedFields::default().write(writer)?;
269 }
270 }
271 }
272
273 Ok(())
274 }
275}
276
277#[derive(Debug)]
278pub struct CreateTopicsResponse {
279 pub throttle_time_ms: Option<Int32>,
284
285 pub topics: Vec<CreateTopicResponse>,
287
288 pub tagged_fields: Option<TaggedFields>,
292}
293
294impl<R> ReadVersionedType<R> for CreateTopicsResponse
295where
296 R: Read,
297{
298 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
299 let v = version.0 .0;
300 assert!(v <= 5);
301
302 let throttle_time_ms = (v >= 2).then(|| Int32::read(reader)).transpose()?;
303 let topics = if v >= 5 {
304 read_compact_versioned_array(reader, version)?.unwrap_or_default()
305 } else {
306 read_versioned_array(reader, version)?.unwrap_or_default()
307 };
308 let tagged_fields = (v >= 5).then(|| TaggedFields::read(reader)).transpose()?;
309
310 Ok(Self {
311 throttle_time_ms,
312 topics,
313 tagged_fields,
314 })
315 }
316}
317
318#[derive(Debug)]
319pub struct CreateTopicResponseConfig {
320 pub name: CompactString,
322
323 pub value: CompactNullableString,
325
326 pub read_only: Boolean,
328
329 pub config_source: Int8,
331
332 pub is_sensitive: Boolean,
334
335 pub tagged_fields: TaggedFields,
337}
338
339impl<R> ReadVersionedType<R> for CreateTopicResponseConfig
340where
341 R: Read,
342{
343 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
344 let v = version.0 .0;
345 assert!(v == 5);
346
347 Ok(Self {
348 name: CompactString::read(reader)?,
349 value: CompactNullableString::read(reader)?,
350 read_only: Boolean::read(reader)?,
351 config_source: Int8::read(reader)?,
352 is_sensitive: Boolean::read(reader)?,
353 tagged_fields: TaggedFields::read(reader)?,
354 })
355 }
356}
357
358#[derive(Debug)]
359pub struct CreateTopicResponse {
360 pub name: String_,
362
363 pub error: Option<Error>,
365
366 pub error_message: Option<NullableString>,
370
371 pub num_partitions: Option<Int32>,
375
376 pub replication_factor: Option<Int16>,
380
381 pub configs: Vec<CreateTopicResponseConfig>,
385
386 pub tagged_fields: Option<TaggedFields>,
390}
391
392impl<R> ReadVersionedType<R> for CreateTopicResponse
393where
394 R: Read,
395{
396 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
397 let v = version.0 .0;
398 assert!(v <= 5);
399
400 let name = if v >= 5 {
401 String_(CompactString::read(reader)?.0)
402 } else {
403 String_::read(reader)?
404 };
405 let error = Error::new(Int16::read(reader)?.0);
406 let error_message = (v >= 1)
407 .then(|| {
408 if v >= 5 {
409 Ok(NullableString(CompactNullableString::read(reader)?.0))
410 } else {
411 NullableString::read(reader)
412 }
413 })
414 .transpose()?;
415 let num_partitions = (v >= 5).then(|| Int32::read(reader)).transpose()?;
416 let replication_factor = (v >= 5).then(|| Int16::read(reader)).transpose()?;
417 let configs = (v >= 5)
418 .then(|| read_compact_versioned_array(reader, version))
419 .transpose()?
420 .flatten()
421 .unwrap_or_default();
422 let tagged_fields = (v >= 5).then(|| TaggedFields::read(reader)).transpose()?;
423
424 Ok(Self {
425 name,
426 error,
427 error_message,
428 num_partitions,
429 replication_factor,
430 configs,
431 tagged_fields,
432 })
433 }
434}