use std::ffi::CStr;
use std::fmt;
use std::slice;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::error::IsError;
use crate::util::{KafkaDrop, NativePtr};
pub struct MetadataBroker(RDKafkaMetadataBroker);
impl MetadataBroker {
pub fn id(&self) -> i32 {
self.0.id
}
pub fn host(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.host)
.to_str()
.expect("Broker host is not a valid UTF-8 string")
}
}
pub fn port(&self) -> i32 {
self.0.port
}
}
impl fmt::Debug for MetadataBroker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MetadataBroker")
.field("id", &self.id())
.field("host", &self.host())
.field("port", &self.port())
.finish()
}
}
pub struct MetadataPartition(RDKafkaMetadataPartition);
impl MetadataPartition {
pub fn id(&self) -> i32 {
self.0.id
}
pub fn leader(&self) -> i32 {
self.0.leader
}
pub fn error(&self) -> Option<RDKafkaRespErr> {
if self.0.err.is_error() {
Some(self.0.err)
} else {
None
}
}
pub fn replicas(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
}
pub fn isr(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
}
}
impl fmt::Debug for MetadataPartition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetadataPartition");
debug_struct.field("id", &self.id());
if let Some(err) = self.error() {
debug_struct.field("error", &err);
}
debug_struct
.field("leader", &self.leader())
.field("replicas", &self.replicas())
.field("isr", &self.isr()) .finish()
}
}
pub struct MetadataTopic(RDKafkaMetadataTopic);
impl MetadataTopic {
pub fn name(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.topic)
.to_str()
.expect("Topic name is not a valid UTF-8 string")
}
}
pub fn partitions(&self) -> &[MetadataPartition] {
unsafe {
slice::from_raw_parts(
self.0.partitions as *const MetadataPartition,
self.0.partition_cnt as usize,
)
}
}
pub fn error(&self) -> Option<RDKafkaRespErr> {
if self.0.err.is_error() {
Some(self.0.err)
} else {
None
}
}
}
impl fmt::Debug for MetadataTopic {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetadataTopic");
debug_struct.field("name", &self.name());
if let Some(err) = self.error() {
debug_struct.field("error", &err);
}
debug_struct.field("partitions", &self.partitions());
debug_struct.finish()
}
}
pub struct Metadata(NativePtr<RDKafkaMetadata>);
unsafe impl KafkaDrop for RDKafkaMetadata {
const TYPE: &'static str = "metadata";
const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
}
unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
rdsys::rd_kafka_metadata_destroy(ptr as *const _)
}
impl Metadata {
pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
}
pub fn orig_broker_id(&self) -> i32 {
self.0.orig_broker_id
}
pub fn orig_broker_name(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.orig_broker_name)
.to_str()
.expect("Broker name is not a valid UTF-8 string")
}
}
pub fn brokers(&self) -> &[MetadataBroker] {
unsafe {
slice::from_raw_parts(
self.0.brokers as *const MetadataBroker,
self.0.broker_cnt as usize,
)
}
}
pub fn topics(&self) -> &[MetadataTopic] {
unsafe {
slice::from_raw_parts(
self.0.topics as *const MetadataTopic,
self.0.topic_cnt as usize,
)
}
}
}
impl fmt::Debug for Metadata {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Metadata")
.field("orig_broker_name", &self.orig_broker_name())
.field("orig_broker_id", &self.orig_broker_id())
.field("brokers", &self.brokers())
.field("topics", &self.topics())
.finish()
}
}
unsafe impl Send for Metadata {}
unsafe impl Sync for Metadata {}