1use std::ffi::CStr;
4use std::fmt;
5use std::slice;
6
7use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
9
10use crate::error::IsError;
11use crate::util::{KafkaDrop, NativePtr};
12
13pub struct MetadataBroker(RDKafkaMetadataBroker);
15
16impl MetadataBroker {
17 pub fn id(&self) -> i32 {
19 self.0.id
20 }
21
22 pub fn host(&self) -> &str {
24 unsafe {
25 CStr::from_ptr(self.0.host)
26 .to_str()
27 .expect("Broker host is not a valid UTF-8 string")
28 }
29 }
30
31 pub fn port(&self) -> i32 {
33 self.0.port
34 }
35}
36
37impl fmt::Debug for MetadataBroker {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("MetadataBroker")
40 .field("id", &self.id())
41 .field("host", &self.host())
42 .field("port", &self.port())
43 .finish()
44 }
45}
46
47pub struct MetadataPartition(RDKafkaMetadataPartition);
49
50impl MetadataPartition {
51 pub fn id(&self) -> i32 {
53 self.0.id
54 }
55
56 pub fn leader(&self) -> i32 {
58 self.0.leader
59 }
60
61 pub fn error(&self) -> Option<RDKafkaRespErr> {
65 if self.0.err.is_error() {
66 Some(self.0.err)
67 } else {
68 None
69 }
70 }
71
72 pub fn replicas(&self) -> &[i32] {
74 unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
75 }
76
77 pub fn isr(&self) -> &[i32] {
79 unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
80 }
81}
82
83impl fmt::Debug for MetadataPartition {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 let mut debug_struct = f.debug_struct("MetadataPartition");
86 debug_struct.field("id", &self.id());
87 if let Some(err) = self.error() {
88 debug_struct.field("error", &err);
89 }
90 debug_struct
91 .field("leader", &self.leader())
92 .field("replicas", &self.replicas())
93 .field("isr", &self.isr()) .finish()
95 }
96}
97
98pub struct MetadataTopic(RDKafkaMetadataTopic);
100
101impl MetadataTopic {
102 pub fn name(&self) -> &str {
104 unsafe {
105 CStr::from_ptr(self.0.topic)
106 .to_str()
107 .expect("Topic name is not a valid UTF-8 string")
108 }
109 }
110
111 pub fn partitions(&self) -> &[MetadataPartition] {
113 unsafe {
114 slice::from_raw_parts(
115 self.0.partitions as *const MetadataPartition,
116 self.0.partition_cnt as usize,
117 )
118 }
119 }
120
121 pub fn error(&self) -> Option<RDKafkaRespErr> {
124 if self.0.err.is_error() {
125 Some(self.0.err)
126 } else {
127 None
128 }
129 }
130}
131
132impl fmt::Debug for MetadataTopic {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 let mut debug_struct = f.debug_struct("MetadataTopic");
135 debug_struct.field("name", &self.name());
136 if let Some(err) = self.error() {
137 debug_struct.field("error", &err);
138 }
139 debug_struct.field("partitions", &self.partitions());
140 debug_struct.finish()
141 }
142}
143
144pub struct Metadata(NativePtr<RDKafkaMetadata>);
149
150unsafe impl KafkaDrop for RDKafkaMetadata {
151 const TYPE: &'static str = "metadata";
152 const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
153}
154
155unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
156 rdsys::rd_kafka_metadata_destroy(ptr as *const _)
157}
158
159impl Metadata {
160 pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
162 Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
163 }
164
165 pub fn orig_broker_id(&self) -> i32 {
167 self.0.orig_broker_id
168 }
169
170 pub fn orig_broker_name(&self) -> &str {
172 unsafe {
173 CStr::from_ptr(self.0.orig_broker_name)
174 .to_str()
175 .expect("Broker name is not a valid UTF-8 string")
176 }
177 }
178
179 pub fn brokers(&self) -> &[MetadataBroker] {
181 unsafe {
182 slice::from_raw_parts(
183 self.0.brokers as *const MetadataBroker,
184 self.0.broker_cnt as usize,
185 )
186 }
187 }
188
189 pub fn topics(&self) -> &[MetadataTopic] {
191 unsafe {
192 slice::from_raw_parts(
193 self.0.topics as *const MetadataTopic,
194 self.0.topic_cnt as usize,
195 )
196 }
197 }
198}
199
200impl fmt::Debug for Metadata {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 f.debug_struct("Metadata")
203 .field("orig_broker_name", &self.orig_broker_name())
204 .field("orig_broker_id", &self.orig_broker_id())
205 .field("brokers", &self.brokers())
206 .field("topics", &self.topics())
207 .finish()
208 }
209}
210
211unsafe impl Send for Metadata {}
212unsafe impl Sync for Metadata {}