extern crate rdkafka_sys as rdkafka;
use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
use std::marker::PhantomData;
use log::LogLevel;
use self::rdkafka::types::*;
use config::TopicConfig;
use error::{IsError, KafkaResult, KafkaError};
use util::{bytes_cstr_to_owned, cstr_to_owned};
pub fn get_native_metadata(client: *mut RDKafka) -> KafkaResult<Metadata> {
let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
let ret = unsafe {
rdkafka::rd_kafka_metadata(client, 1, ptr::null::<u8>() as *mut RDKafkaTopic, &mut metadata_ptr as *mut *const RDKafkaMetadata, 2000)
};
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret));
}
Ok(Metadata(metadata_ptr))
}
#[derive(Debug)]
pub struct Metadata(*const RDKafkaMetadata);
impl Metadata {
pub fn topic_iterator(&self) -> MetadataTopicIterator {
MetadataTopicIterator {
metadata: self,
index: 0,
}
}
pub fn topics(&'a self) -> &'a [RDKafkaMetadataTopic] {
unsafe { slice::from_raw_parts(self.0.topics, self.0.topic_cnt as usize) };
}
}
impl Drop for Metadata {
fn drop(&mut self) {
unsafe { rdkafka::rd_kafka_metadata_destroy(self.0) };
}
}
#[derive(Debug)]
pub struct MetadataTopicIterator<'a> {
metadata: &'a Metadata,
index: isize,
}
impl<'a> Iterator for MetadataTopicIterator<'a> {
type Item = MetadataTopic<'a>;
fn next(&mut self) -> Option<MetadataTopic<'a>> {
let count = unsafe { (*self.metadata.0).topic_cnt as isize };
if self.index >= count {
return None
}
let topic_info = MetadataTopic {
ptr: unsafe { (*self.metadata.0).topics.offset(self.index) },
_p: PhantomData
};
self.index += 1;
Some(topic_info)
}
}
#[derive(Debug)]
pub struct MetadataTopic<'a> {
ptr: *mut RDKafkaMetadataTopic,
_p: PhantomData<&'a u8>,
}
impl<'a> MetadataTopic<'a> {
pub fn name(&self) -> &'a str {
unsafe {
CStr::from_ptr((*self.ptr).topic)
.to_str()
.expect("Topic name is not a valid UTF-8 string")
}
}
pub fn partition_iterator(&self) -> MetadataPartitionIterator<'a> {
MetadataPartitionIterator {
partitions: unsafe { (*self.ptr).partitions },
count: unsafe { (*self.ptr).partition_cnt } as isize,
index: 0,
_p: PhantomData,
}
}
}
pub struct MetadataPartitionIterator<'a> {
partitions: *mut RDKafkaMetadataPartition,
count: isize,
index: isize,
_p: PhantomData<&'a u8>
}
impl<'a> Iterator for MetadataPartitionIterator<'a> {
type Item = MetadataPartition<'a>;
fn next(&mut self) -> Option<MetadataPartition<'a>> {
if self.index >= self.count {
return None
}
let partition_metadata_ptr = unsafe { self.partitions.offset(self.index) };
self.index += 1;
Some(MetadataPartition{ptr: partition_metadata_ptr, _p: PhantomData})
}
}
pub struct MetadataPartition<'a> {
ptr: *mut RDKafkaMetadataPartition,
_p: PhantomData<&'a u8>
}
impl<'a> MetadataPartition<'a> {
pub fn id(&self) -> i32 {
unsafe { (*self.ptr).id }
}
}