rskafka/protocol/messages/
metadata.rs1use std::io::{Read, Write};
2
3use super::{
4 ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
5};
6use crate::protocol::api_version::ApiVersionRange;
7use crate::protocol::messages::{read_versioned_array, write_versioned_array};
8use crate::protocol::{
9 api_key::ApiKey,
10 api_version::ApiVersion,
11 error::Error,
12 primitives::*,
13 traits::{ReadType, WriteType},
14};
15
16#[derive(Debug)]
17pub struct MetadataRequest {
18 pub topics: Option<Vec<MetadataRequestTopic>>,
22
23 pub allow_auto_topic_creation: Option<Boolean>,
28}
29
30impl RequestBody for MetadataRequest {
31 type ResponseBody = MetadataResponse;
32
33 const API_KEY: ApiKey = ApiKey::Metadata;
34
35 const API_VERSION_RANGE: ApiVersionRange =
37 ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(4)));
38
39 const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(9));
40}
41
42impl<W> WriteVersionedType<W> for MetadataRequest
43where
44 W: Write,
45{
46 fn write_versioned(
47 &self,
48 writer: &mut W,
49 version: ApiVersion,
50 ) -> Result<(), WriteVersionedError> {
51 let v = version.0.0;
52 assert!(v <= 4);
53
54 if v < 4 && self.allow_auto_topic_creation.is_some() {
55 return Err(WriteVersionedError::FieldNotAvailable {
56 version,
57 field: "allow_auto_topic_creation".to_string(),
58 });
59 }
60
61 write_versioned_array(writer, version, self.topics.as_deref())?;
62 if v >= 4 {
63 match self.allow_auto_topic_creation {
64 None => Boolean(true).write(writer)?,
66 Some(b) => b.write(writer)?,
67 }
68 }
69 Ok(())
70 }
71}
72
73#[derive(Debug)]
74pub struct MetadataRequestTopic {
75 pub name: String_,
77}
78
79impl<W> WriteVersionedType<W> for MetadataRequestTopic
80where
81 W: Write,
82{
83 fn write_versioned(
84 &self,
85 writer: &mut W,
86 version: ApiVersion,
87 ) -> Result<(), WriteVersionedError> {
88 assert!(version.0.0 <= 4);
89 Ok(self.name.write(writer)?)
90 }
91}
92
93#[derive(Debug, PartialEq, Eq, Clone)]
94pub struct MetadataResponse {
95 pub throttle_time_ms: Option<Int32>,
100
101 pub brokers: Vec<MetadataResponseBroker>,
103
104 pub cluster_id: Option<NullableString>,
108
109 pub controller_id: Option<Int32>,
113
114 pub topics: Vec<MetadataResponseTopic>,
116}
117
118impl<R> ReadVersionedType<R> for MetadataResponse
119where
120 R: Read,
121{
122 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
123 let v = version.0.0;
124 assert!(v <= 4);
125
126 let throttle_time_ms = (v >= 3).then(|| Int32::read(reader)).transpose()?;
127 let brokers = read_versioned_array(reader, version)?.unwrap_or_default();
128 let cluster_id = (v >= 2).then(|| NullableString::read(reader)).transpose()?;
129 let controller_id = (v >= 1).then(|| Int32::read(reader)).transpose()?;
130 let topics = read_versioned_array(reader, version)?.unwrap_or_default();
131
132 Ok(Self {
133 throttle_time_ms,
134 brokers,
135 topics,
136 cluster_id,
137 controller_id,
138 })
139 }
140}
141
142#[derive(Debug, PartialEq, Eq, Clone)]
143pub struct MetadataResponseBroker {
144 pub node_id: Int32,
146 pub host: String_,
148 pub port: Int32,
150 pub rack: Option<NullableString>,
152}
153
154impl<R> ReadVersionedType<R> for MetadataResponseBroker
155where
156 R: Read,
157{
158 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
159 let v = version.0.0;
160 assert!(v <= 4);
161
162 let node_id = Int32::read(reader)?;
163 let host = String_::read(reader)?;
164 let port = Int32::read(reader)?;
165 let rack = (v >= 1).then(|| NullableString::read(reader)).transpose()?;
166
167 Ok(Self {
168 node_id,
169 host,
170 port,
171 rack,
172 })
173 }
174}
175
176#[derive(Debug, PartialEq, Eq, Clone)]
177pub struct MetadataResponseTopic {
178 pub error: Option<Error>,
180 pub name: String_,
182 pub is_internal: Option<Boolean>,
184 pub partitions: Vec<MetadataResponsePartition>,
186}
187
188impl<R> ReadVersionedType<R> for MetadataResponseTopic
189where
190 R: Read,
191{
192 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
193 let v = version.0.0;
194 assert!(v <= 4);
195
196 let error = Error::new(Int16::read(reader)?.0);
197 let name = String_::read(reader)?;
198 let is_internal = (v >= 1).then(|| Boolean::read(reader)).transpose()?;
199 let partitions = read_versioned_array(reader, version)?.unwrap_or_default();
200
201 Ok(Self {
202 error,
203 name,
204 is_internal,
205 partitions,
206 })
207 }
208}
209
210#[derive(Debug, PartialEq, Eq, Clone)]
211pub struct MetadataResponsePartition {
212 pub error: Option<Error>,
214 pub partition_index: Int32,
216 pub leader_id: Int32,
218 pub replica_nodes: Array<Int32>,
220 pub isr_nodes: Array<Int32>,
222}
223
224impl<R> ReadVersionedType<R> for MetadataResponsePartition
225where
226 R: Read,
227{
228 fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
229 let v = version.0.0;
230 assert!(v <= 4);
231
232 Ok(Self {
233 error: Error::new(Int16::read(reader)?.0),
234 partition_index: Int32::read(reader)?,
235 leader_id: Int32::read(reader)?,
236 replica_nodes: Array::read(reader)?,
237 isr_nodes: Array::read(reader)?,
238 })
239 }
240}