use std::ffi::CStr;
use std::slice;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::error::IsError;
use crate::util::{KafkaDrop, NativePtr};
/// Broker metadata information.
///
/// A thin wrapper around the native `RDKafkaMetadataBroker` struct.
//
// `Metadata::brokers` reinterprets a pointer to the native struct as a pointer
// to this wrapper. `repr(transparent)` is what guarantees that both types have
// the same layout, making that cast well-defined.
#[repr(transparent)]
pub struct MetadataBroker(RDKafkaMetadataBroker);

impl MetadataBroker {
    /// Returns the id of the broker.
    pub fn id(&self) -> i32 {
        self.0.id
    }

    /// Returns the host name of the broker.
    ///
    /// # Panics
    ///
    /// Panics if the host name is not a valid UTF-8 string.
    pub fn host(&self) -> &str {
        // SAFETY: assumes `host` is a non-null, NUL-terminated C string that
        // stays alive for as long as `self` is borrowed — TODO confirm against
        // the native library's contract.
        let raw_host = unsafe { CStr::from_ptr(self.0.host) };
        raw_host
            .to_str()
            .expect("Broker host is not a valid UTF-8 string")
    }

    /// Returns the port of the broker.
    pub fn port(&self) -> i32 {
        self.0.port
    }
}
/// Partition metadata information.
///
/// A thin wrapper around the native `RDKafkaMetadataPartition` struct.
//
// `MetadataTopic::partitions` reinterprets a pointer to the native struct as a
// pointer to this wrapper. `repr(transparent)` is what guarantees that both
// types have the same layout, making that cast well-defined.
#[repr(transparent)]
pub struct MetadataPartition(RDKafkaMetadataPartition);

impl MetadataPartition {
    /// Returns the id of the partition.
    pub fn id(&self) -> i32 {
        self.0.id
    }

    /// Returns the broker id of the leader broker for the partition.
    pub fn leader(&self) -> i32 {
        self.0.leader
    }

    /// Returns the error reported for the partition, or `None` when the
    /// stored error code does not represent an error.
    pub fn error(&self) -> Option<RDKafkaRespErr> {
        if self.0.err.is_error() {
            Some(self.0.err)
        } else {
            None
        }
    }

    /// Returns the broker ids stored in the native `replicas` array.
    ///
    /// Returns an empty slice when the native pointer is null or the reported
    /// count is not positive.
    pub fn replicas(&self) -> &[i32] {
        let count = self.0.replica_cnt;
        // `slice::from_raw_parts` requires a non-null, aligned pointer even
        // for zero-length slices, so an absent list must not reach it.
        if self.0.replicas.is_null() || count <= 0 {
            return &[];
        }
        // SAFETY: the pointer is non-null and `count` is positive; assumes the
        // native array holds `count` elements and outlives `self`.
        unsafe { slice::from_raw_parts(self.0.replicas, count as usize) }
    }

    /// Returns the broker ids stored in the native `isrs` (in-sync replicas)
    /// array.
    ///
    /// Returns an empty slice when the native pointer is null or the reported
    /// count is not positive.
    pub fn isr(&self) -> &[i32] {
        let count = self.0.isr_cnt;
        // Same null / non-positive guard as `replicas`: a null pointer is
        // never a valid argument to `slice::from_raw_parts`.
        if self.0.isrs.is_null() || count <= 0 {
            return &[];
        }
        // SAFETY: the pointer is non-null and `count` is positive; assumes the
        // native array holds `count` elements and outlives `self`.
        unsafe { slice::from_raw_parts(self.0.isrs, count as usize) }
    }
}
/// Topic metadata information.
///
/// A thin wrapper around the native `RDKafkaMetadataTopic` struct.
//
// `Metadata::topics` reinterprets a pointer to the native struct as a pointer
// to this wrapper. `repr(transparent)` is what guarantees that both types have
// the same layout, making that cast well-defined.
#[repr(transparent)]
pub struct MetadataTopic(RDKafkaMetadataTopic);

impl MetadataTopic {
    /// Returns the name of the topic.
    ///
    /// # Panics
    ///
    /// Panics if the topic name is not a valid UTF-8 string.
    pub fn name(&self) -> &str {
        // SAFETY: assumes `topic` is a non-null, NUL-terminated C string that
        // stays alive for as long as `self` is borrowed — TODO confirm against
        // the native library's contract.
        let raw_name = unsafe { CStr::from_ptr(self.0.topic) };
        raw_name
            .to_str()
            .expect("Topic name is not a valid UTF-8 string")
    }

    /// Returns the partition metadata entries of the topic.
    ///
    /// Returns an empty slice when the native pointer is null or the reported
    /// count is not positive.
    pub fn partitions(&self) -> &[MetadataPartition] {
        let count = self.0.partition_cnt;
        // `slice::from_raw_parts` requires a non-null, aligned pointer even
        // for zero-length slices, so an absent list must not reach it.
        if self.0.partitions.is_null() || count <= 0 {
            return &[];
        }
        // SAFETY: the pointer is non-null and `count` is positive; assumes the
        // native array holds `count` elements and outlives `self`. The
        // elements are viewed through the `MetadataPartition` newtype wrapper.
        unsafe {
            slice::from_raw_parts(
                self.0.partitions as *const MetadataPartition,
                count as usize,
            )
        }
    }

    /// Returns the error reported for the topic, or `None` when the stored
    /// error code does not represent an error.
    pub fn error(&self) -> Option<RDKafkaRespErr> {
        if self.0.err.is_error() {
            Some(self.0.err)
        } else {
            None
        }
    }
}
/// Metadata container.
///
/// Wraps a `NativePtr` to a native `RDKafkaMetadata` object and exposes its
/// contents (originating broker, brokers, topics) through accessor methods.
pub struct Metadata(NativePtr<RDKafkaMetadata>);
// Registers how a native `RDKafkaMetadata` object is released: `TYPE` is a
// human-readable label and `DROP` is the destructor callback (`drop_metadata`).
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped —
// confirm against `crate::util`.
unsafe impl KafkaDrop for RDKafkaMetadata {
const TYPE: &'static str = "metadata";
const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
}
/// Destructor callback for native metadata objects.
///
/// Forwards the pointer to `rd_kafka_metadata_destroy`.
///
/// # Safety
///
/// NOTE(review): `ptr` is presumably required to be a metadata object obtained
/// from the native library that has not been destroyed yet — confirm against
/// the `rd_kafka_metadata_destroy` contract.
unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
    // The native destructor takes a const pointer; `*mut` coerces to `*const`.
    let metadata: *const RDKafkaMetadata = ptr;
    rdsys::rd_kafka_metadata_destroy(metadata)
}
impl Metadata {
    /// Creates a new `Metadata` container from a pointer to the native
    /// metadata structure.
    ///
    /// # Panics
    ///
    /// Panics if `NativePtr::from_ptr` returns `None` for `ptr`.
    ///
    /// # Safety
    ///
    /// NOTE(review): `ptr` is presumably required to point to a live native
    /// metadata object whose ownership is transferred to the returned value —
    /// confirm against `NativePtr::from_ptr`.
    pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
        Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
    }

    /// Returns the id of the broker originating this metadata.
    pub fn orig_broker_id(&self) -> i32 {
        self.0.orig_broker_id
    }

    /// Returns the name of the broker originating this metadata.
    ///
    /// # Panics
    ///
    /// Panics if the broker name is not a valid UTF-8 string.
    pub fn orig_broker_name(&self) -> &str {
        // SAFETY: assumes `orig_broker_name` is a non-null, NUL-terminated C
        // string that stays alive for as long as `self` is borrowed — TODO
        // confirm against the native library's contract.
        let raw_name = unsafe { CStr::from_ptr(self.0.orig_broker_name) };
        raw_name
            .to_str()
            .expect("Broker name is not a valid UTF-8 string")
    }

    /// Returns the metadata entries for the brokers.
    ///
    /// Returns an empty slice when the native pointer is null or the reported
    /// count is not positive.
    pub fn brokers(&self) -> &[MetadataBroker] {
        let count = self.0.broker_cnt;
        // `slice::from_raw_parts` requires a non-null, aligned pointer even
        // for zero-length slices, so an absent list must not reach it.
        if self.0.brokers.is_null() || count <= 0 {
            return &[];
        }
        // SAFETY: the pointer is non-null and `count` is positive; assumes the
        // native array holds `count` elements and outlives `self`. The
        // elements are viewed through the `MetadataBroker` newtype wrapper.
        unsafe {
            slice::from_raw_parts(self.0.brokers as *const MetadataBroker, count as usize)
        }
    }

    /// Returns the metadata entries for the topics.
    ///
    /// Returns an empty slice when the native pointer is null or the reported
    /// count is not positive.
    pub fn topics(&self) -> &[MetadataTopic] {
        let count = self.0.topic_cnt;
        // Same null / non-positive guard as `brokers`: a null pointer is never
        // a valid argument to `slice::from_raw_parts`.
        if self.0.topics.is_null() || count <= 0 {
            return &[];
        }
        // SAFETY: the pointer is non-null and `count` is positive; assumes the
        // native array holds `count` elements and outlives `self`. The
        // elements are viewed through the `MetadataTopic` newtype wrapper.
        unsafe { slice::from_raw_parts(self.0.topics as *const MetadataTopic, count as usize) }
    }
}
// Marks `Metadata` as safe to move to, and share between, threads.
// NOTE(review): every accessor visible in this file takes `&self` and only
// reads from the native structure; soundness additionally assumes the native
// library never mutates the structure after handing it over — confirm.
unsafe impl Send for Metadata {}
unsafe impl Sync for Metadata {}