rdkafka 0.8.1

Rust wrapper for librdkafka
Documentation
//! Metadata fetch API
extern crate rdkafka_sys as rdkafka;

use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
use std::marker::PhantomData;

use log::LogLevel;

use self::rdkafka::types::*;

use config::TopicConfig;
use error::{IsError, KafkaResult, KafkaError};
use util::{bytes_cstr_to_owned, cstr_to_owned};

/// Fetches metadata through the given native client handle.
///
/// Calls `rd_kafka_metadata` and, when the returned code is not an error
/// (as decided by `IsError::is_error`), wraps the pointer written by the
/// call in a [`Metadata`].
///
/// # Errors
///
/// Returns `KafkaError::MetadataFetch` carrying the native return code when
/// that code reports an error.
///
/// NOTE(review): `client` is forwarded to librdkafka unchanged; it is
/// presumably required to be a valid, live client handle — confirm with
/// callers.
pub fn get_native_metadata(client: *mut RDKafka) -> KafkaResult<Metadata> {
    // Out-parameter that the native call fills in.
    let mut native_metadata: *const RDKafkaMetadata = ptr::null();
    let out_param: *mut *const RDKafkaMetadata = &mut native_metadata;

    // NOTE(review): per the librdkafka `rd_kafka_metadata` signature the `1`
    // looks like the "all topics" flag, the null pointer means "no specific
    // topic", and `2000` is the timeout in milliseconds — confirm against the
    // bindings.
    let status = unsafe {
        rdkafka::rd_kafka_metadata(
            client,
            1,
            ptr::null_mut::<RDKafkaTopic>(),
            out_param,
            2000,
        )
    };

    if status.is_error() {
        Err(KafkaError::MetadataFetch(status))
    } else {
        Ok(Metadata(native_metadata))
    }
}

/// Wrapper around a raw pointer to a native `RDKafkaMetadata` structure.
///
/// The wrapped pointer is passed to `rd_kafka_metadata_destroy` when the
/// value is dropped (see the `Drop` implementation for this type).
#[derive(Debug)]
pub struct Metadata(*const RDKafkaMetadata);

impl Metadata {
    pub fn topic_iterator(&self) -> MetadataTopicIterator {
        MetadataTopicIterator {
            metadata: self,
            index: 0,
        }
    }

    pub fn topics(&'a self) -> &'a [RDKafkaMetadataTopic] {
        unsafe { slice::from_raw_parts(self.0.topics, self.0.topic_cnt as usize) };
    }
}

impl Drop for Metadata {
    /// Passes the wrapped native pointer to `rd_kafka_metadata_destroy`.
    fn drop(&mut self) {
        let native = self.0;
        unsafe {
            rdkafka::rd_kafka_metadata_destroy(native);
        }
    }
}

/// Iterator over the topics of a borrowed [`Metadata`].
#[derive(Debug)]
pub struct MetadataTopicIterator<'a> {
    // Metadata whose topics are being walked.
    metadata: &'a Metadata,
    // Index of the next topic to yield.
    index: isize,
}

impl<'a> Iterator for MetadataTopicIterator<'a> {
    type Item = MetadataTopic<'a>;

    /// Yields the topic at the current index and advances, or returns `None`
    /// once the index reaches the native `topic_cnt`.
    fn next(&mut self) -> Option<MetadataTopic<'a>> {
        let native = self.metadata.0;
        // The count is re-read from the native structure on every call.
        let total = unsafe { (*native).topic_cnt as isize };

        if self.index < total {
            let position = self.index;
            self.index = position + 1;
            Some(MetadataTopic {
                ptr: unsafe { (*native).topics.offset(position) },
                _p: PhantomData,
            })
        } else {
            None
        }
    }
}

/// Handle to a single native topic entry (`RDKafkaMetadataTopic`).
///
/// Values are produced by [`MetadataTopicIterator`], which gives them the
/// lifetime of its borrow of the originating [`Metadata`].
#[derive(Debug)]
pub struct MetadataTopic<'a> {
    // Raw pointer to the native topic entry.
    ptr: *mut RDKafkaMetadataTopic,
    // Carries the lifetime `'a` without storing an actual reference.
    _p: PhantomData<&'a u8>,
}

impl<'a> MetadataTopic<'a> {
    /// Returns the topic name read from the native entry's `topic` C string.
    ///
    /// # Panics
    ///
    /// Panics if the name is not valid UTF-8.
    pub fn name(&self) -> &'a str {
        let raw_name = unsafe { CStr::from_ptr((*self.ptr).topic) };
        raw_name
            .to_str()
            .expect("Topic name is not a valid UTF-8 string")
    }

    /// Returns an iterator over this topic's partitions, starting at the
    /// first partition. The length is taken from the native `partition_cnt`.
    pub fn partition_iterator(&self) -> MetadataPartitionIterator<'a> {
        let first = unsafe { (*self.ptr).partitions };
        let total = unsafe { (*self.ptr).partition_cnt } as isize;

        MetadataPartitionIterator {
            partitions: first,
            count: total,
            index: 0,
            _p: PhantomData,
        }
    }
}

/// Iterator over the partitions of a topic.
///
/// Created by `MetadataTopic::partition_iterator`, which copies the partition
/// pointer and count out of the native topic entry.
#[derive(Debug)]
pub struct MetadataPartitionIterator<'a> {
    // Pointer to the first native partition entry.
    partitions: *mut RDKafkaMetadataPartition,
    // Number of partition entries to yield.
    count: isize,
    // Index of the next partition to yield.
    index: isize,
    // Carries the lifetime `'a` without storing an actual reference.
    _p: PhantomData<&'a u8>,
}

impl<'a> Iterator for MetadataPartitionIterator<'a> {
    type Item = MetadataPartition<'a>;

    /// Yields the partition at the current index and advances, or returns
    /// `None` once the index reaches the stored count.
    fn next(&mut self) -> Option<MetadataPartition<'a>> {
        if self.index < self.count {
            let position = self.index;
            self.index = position + 1;
            let entry = unsafe { self.partitions.offset(position) };
            Some(MetadataPartition {
                ptr: entry,
                _p: PhantomData,
            })
        } else {
            None
        }
    }
}

/// Handle to a single native partition entry (`RDKafkaMetadataPartition`).
///
/// Values are produced by [`MetadataPartitionIterator`].
#[derive(Debug)]
pub struct MetadataPartition<'a> {
    // Raw pointer to the native partition entry.
    ptr: *mut RDKafkaMetadataPartition,
    // Carries the lifetime `'a` without storing an actual reference.
    _p: PhantomData<&'a u8>,
}

impl<'a> MetadataPartition<'a> {
    pub fn id(&self) -> i32 {
        unsafe { (*self.ptr).id }
    }
}