Skip to main content

rdkafka/
metadata.rs

1//! Cluster metadata.
2
3use std::ffi::CStr;
4use std::fmt;
5use std::slice;
6
7use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
9
10use crate::error::IsError;
11use crate::util::{KafkaDrop, NativePtr};
12
13/// Broker metadata information.
14pub struct MetadataBroker(RDKafkaMetadataBroker);
15
16impl MetadataBroker {
17    /// Returns the id of the broker.
18    pub fn id(&self) -> i32 {
19        self.0.id
20    }
21
22    /// Returns the host name of the broker.
23    pub fn host(&self) -> &str {
24        unsafe {
25            CStr::from_ptr(self.0.host)
26                .to_str()
27                .expect("Broker host is not a valid UTF-8 string")
28        }
29    }
30
31    /// Returns the port of the broker.
32    pub fn port(&self) -> i32 {
33        self.0.port
34    }
35}
36
37impl fmt::Debug for MetadataBroker {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("MetadataBroker")
40            .field("id", &self.id())
41            .field("host", &self.host())
42            .field("port", &self.port())
43            .finish()
44    }
45}
46
47/// Partition metadata information.
48pub struct MetadataPartition(RDKafkaMetadataPartition);
49
50impl MetadataPartition {
51    /// Returns the id of the partition.
52    pub fn id(&self) -> i32 {
53        self.0.id
54    }
55
56    /// Returns the broker id of the leader broker for the partition.
57    pub fn leader(&self) -> i32 {
58        self.0.leader
59    }
60
61    // TODO: return result?
62    /// Returns the metadata error for the partition, or `None` if there is no
63    /// error.
64    pub fn error(&self) -> Option<RDKafkaRespErr> {
65        if self.0.err.is_error() {
66            Some(self.0.err)
67        } else {
68            None
69        }
70    }
71
72    /// Returns the broker IDs of the replicas.
73    pub fn replicas(&self) -> &[i32] {
74        unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
75    }
76
77    /// Returns the broker IDs of the in-sync replicas.
78    pub fn isr(&self) -> &[i32] {
79        unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
80    }
81}
82
83impl fmt::Debug for MetadataPartition {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        let mut debug_struct = f.debug_struct("MetadataPartition");
86        debug_struct.field("id", &self.id());
87        if let Some(err) = self.error() {
88            debug_struct.field("error", &err);
89        }
90        debug_struct
91            .field("leader", &self.leader())
92            .field("replicas", &self.replicas())
93            .field("isr", &self.isr()) // In-Sync Replicas
94            .finish()
95    }
96}
97
98/// Topic metadata information.
99pub struct MetadataTopic(RDKafkaMetadataTopic);
100
101impl MetadataTopic {
102    /// Returns the name of the topic.
103    pub fn name(&self) -> &str {
104        unsafe {
105            CStr::from_ptr(self.0.topic)
106                .to_str()
107                .expect("Topic name is not a valid UTF-8 string")
108        }
109    }
110
111    /// Returns the partition metadata information for all the partitions.
112    pub fn partitions(&self) -> &[MetadataPartition] {
113        unsafe {
114            slice::from_raw_parts(
115                self.0.partitions as *const MetadataPartition,
116                self.0.partition_cnt as usize,
117            )
118        }
119    }
120
121    /// Returns the metadata error for the topic, or `None` if there was no
122    /// error.
123    pub fn error(&self) -> Option<RDKafkaRespErr> {
124        if self.0.err.is_error() {
125            Some(self.0.err)
126        } else {
127            None
128        }
129    }
130}
131
132impl fmt::Debug for MetadataTopic {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        let mut debug_struct = f.debug_struct("MetadataTopic");
135        debug_struct.field("name", &self.name());
136        if let Some(err) = self.error() {
137            debug_struct.field("error", &err);
138        }
139        debug_struct.field("partitions", &self.partitions());
140        debug_struct.finish()
141    }
142}
143
144/// Metadata container.
145///
146/// This structure wraps the metadata pointer returned by rdkafka-sys, and
147/// deallocates all the native resources when dropped.
148pub struct Metadata(NativePtr<RDKafkaMetadata>);
149
150unsafe impl KafkaDrop for RDKafkaMetadata {
151    const TYPE: &'static str = "metadata";
152    const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
153}
154
155unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
156    rdsys::rd_kafka_metadata_destroy(ptr as *const _)
157}
158
159impl Metadata {
160    /// Creates a new Metadata container given a pointer to the native rdkafka-sys metadata.
161    pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
162        Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
163    }
164
165    /// Returns the ID of the broker originating this metadata.
166    pub fn orig_broker_id(&self) -> i32 {
167        self.0.orig_broker_id
168    }
169
170    /// Returns the hostname of the broker originating this metadata.
171    pub fn orig_broker_name(&self) -> &str {
172        unsafe {
173            CStr::from_ptr(self.0.orig_broker_name)
174                .to_str()
175                .expect("Broker name is not a valid UTF-8 string")
176        }
177    }
178
179    /// Returns the metadata information for all the brokers in the cluster.
180    pub fn brokers(&self) -> &[MetadataBroker] {
181        unsafe {
182            slice::from_raw_parts(
183                self.0.brokers as *const MetadataBroker,
184                self.0.broker_cnt as usize,
185            )
186        }
187    }
188
189    /// Returns the metadata information for all the topics in the cluster.
190    pub fn topics(&self) -> &[MetadataTopic] {
191        unsafe {
192            slice::from_raw_parts(
193                self.0.topics as *const MetadataTopic,
194                self.0.topic_cnt as usize,
195            )
196        }
197    }
198}
199
200impl fmt::Debug for Metadata {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        f.debug_struct("Metadata")
203            .field("orig_broker_name", &self.orig_broker_name())
204            .field("orig_broker_id", &self.orig_broker_id())
205            .field("brokers", &self.brokers())
206            .field("topics", &self.topics())
207            .finish()
208    }
209}
210
211unsafe impl Send for Metadata {}
212unsafe impl Sync for Metadata {}