rskafka/protocol/messages/
metadata.rs

1use std::io::{Read, Write};
2
3use super::{
4    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
5};
6use crate::protocol::api_version::ApiVersionRange;
7use crate::protocol::messages::{read_versioned_array, write_versioned_array};
8use crate::protocol::{
9    api_key::ApiKey,
10    api_version::ApiVersion,
11    error::Error,
12    primitives::*,
13    traits::{ReadType, WriteType},
14};
15
16#[derive(Debug)]
17pub struct MetadataRequest {
18    /// The topics to fetch metadata for
19    ///
20    /// Requests data for all topics if None
21    pub topics: Option<Vec<MetadataRequestTopic>>,
22
23    /// If this is true, the broker may auto-create topics that we requested
24    /// which do not already exist, if it is configured to do so.
25    ///
26    /// Added in version 4
27    pub allow_auto_topic_creation: Option<Boolean>,
28}
29
30impl RequestBody for MetadataRequest {
31    type ResponseBody = MetadataResponse;
32
33    const API_KEY: ApiKey = ApiKey::Metadata;
34
35    /// At the time of writing this is the same subset supported by rdkafka
36    const API_VERSION_RANGE: ApiVersionRange =
37        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(4)));
38
39    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(9));
40}
41
42impl<W> WriteVersionedType<W> for MetadataRequest
43where
44    W: Write,
45{
46    fn write_versioned(
47        &self,
48        writer: &mut W,
49        version: ApiVersion,
50    ) -> Result<(), WriteVersionedError> {
51        let v = version.0.0;
52        assert!(v <= 4);
53
54        if v < 4 && self.allow_auto_topic_creation.is_some() {
55            return Err(WriteVersionedError::FieldNotAvailable {
56                version,
57                field: "allow_auto_topic_creation".to_string(),
58            });
59        }
60
61        write_versioned_array(writer, version, self.topics.as_deref())?;
62        if v >= 4 {
63            match self.allow_auto_topic_creation {
64                // The default behaviour is to allow topic creation
65                None => Boolean(true).write(writer)?,
66                Some(b) => b.write(writer)?,
67            }
68        }
69        Ok(())
70    }
71}
72
73#[derive(Debug)]
74pub struct MetadataRequestTopic {
75    /// The topic name
76    pub name: String_,
77}
78
79impl<W> WriteVersionedType<W> for MetadataRequestTopic
80where
81    W: Write,
82{
83    fn write_versioned(
84        &self,
85        writer: &mut W,
86        version: ApiVersion,
87    ) -> Result<(), WriteVersionedError> {
88        assert!(version.0.0 <= 4);
89        Ok(self.name.write(writer)?)
90    }
91}
92
93#[derive(Debug, PartialEq, Eq, Clone)]
94pub struct MetadataResponse {
95    /// The duration in milliseconds for which the request was throttled due to
96    /// a quota violation, or zero if the request did not violate any quota.
97    ///
98    /// Added in version 3
99    pub throttle_time_ms: Option<Int32>,
100
101    /// Each broker in the response
102    pub brokers: Vec<MetadataResponseBroker>,
103
104    /// The cluster ID that responding broker belongs to.
105    ///
106    /// Added in version 2
107    pub cluster_id: Option<NullableString>,
108
109    /// The ID of the controller broker.
110    ///
111    /// Added in version 1
112    pub controller_id: Option<Int32>,
113
114    /// Each topic in the response
115    pub topics: Vec<MetadataResponseTopic>,
116}
117
118impl<R> ReadVersionedType<R> for MetadataResponse
119where
120    R: Read,
121{
122    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
123        let v = version.0.0;
124        assert!(v <= 4);
125
126        let throttle_time_ms = (v >= 3).then(|| Int32::read(reader)).transpose()?;
127        let brokers = read_versioned_array(reader, version)?.unwrap_or_default();
128        let cluster_id = (v >= 2).then(|| NullableString::read(reader)).transpose()?;
129        let controller_id = (v >= 1).then(|| Int32::read(reader)).transpose()?;
130        let topics = read_versioned_array(reader, version)?.unwrap_or_default();
131
132        Ok(Self {
133            throttle_time_ms,
134            brokers,
135            topics,
136            cluster_id,
137            controller_id,
138        })
139    }
140}
141
142#[derive(Debug, PartialEq, Eq, Clone)]
143pub struct MetadataResponseBroker {
144    /// The broker ID
145    pub node_id: Int32,
146    /// The broker hostname
147    pub host: String_,
148    /// The broker port
149    pub port: Int32,
150    /// Added in version 1
151    pub rack: Option<NullableString>,
152}
153
154impl<R> ReadVersionedType<R> for MetadataResponseBroker
155where
156    R: Read,
157{
158    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
159        let v = version.0.0;
160        assert!(v <= 4);
161
162        let node_id = Int32::read(reader)?;
163        let host = String_::read(reader)?;
164        let port = Int32::read(reader)?;
165        let rack = (v >= 1).then(|| NullableString::read(reader)).transpose()?;
166
167        Ok(Self {
168            node_id,
169            host,
170            port,
171            rack,
172        })
173    }
174}
175
176#[derive(Debug, PartialEq, Eq, Clone)]
177pub struct MetadataResponseTopic {
178    /// The topic error if any
179    pub error: Option<Error>,
180    /// The topic name
181    pub name: String_,
182    /// True if the topic is internal
183    pub is_internal: Option<Boolean>,
184    /// Each partition in the topic
185    pub partitions: Vec<MetadataResponsePartition>,
186}
187
188impl<R> ReadVersionedType<R> for MetadataResponseTopic
189where
190    R: Read,
191{
192    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
193        let v = version.0.0;
194        assert!(v <= 4);
195
196        let error = Error::new(Int16::read(reader)?.0);
197        let name = String_::read(reader)?;
198        let is_internal = (v >= 1).then(|| Boolean::read(reader)).transpose()?;
199        let partitions = read_versioned_array(reader, version)?.unwrap_or_default();
200
201        Ok(Self {
202            error,
203            name,
204            is_internal,
205            partitions,
206        })
207    }
208}
209
210#[derive(Debug, PartialEq, Eq, Clone)]
211pub struct MetadataResponsePartition {
212    /// The partition error if any
213    pub error: Option<Error>,
214    /// The partition index
215    pub partition_index: Int32,
216    /// The ID of the leader broker
217    pub leader_id: Int32,
218    /// The set of all nodes that host this partition
219    pub replica_nodes: Array<Int32>,
220    /// The set of all nodes that are in sync with the leader for this partition
221    pub isr_nodes: Array<Int32>,
222}
223
224impl<R> ReadVersionedType<R> for MetadataResponsePartition
225where
226    R: Read,
227{
228    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
229        let v = version.0.0;
230        assert!(v <= 4);
231
232        Ok(Self {
233            error: Error::new(Int16::read(reader)?.0),
234            partition_index: Int32::read(reader)?,
235            leader_id: Int32::read(reader)?,
236            replica_nodes: Array::read(reader)?,
237            isr_nodes: Array::read(reader)?,
238        })
239    }
240}