use rdsys;
use rdsys::types::*;
use std::slice;
use std::str;
/// Timestamp attached to a message, tagged with how it was assigned.
///
/// The wrapped `i64` is the raw value reported by librdkafka for the message
/// (documented by librdkafka as milliseconds since the Unix epoch).
///
/// The type is plain data, so it is `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Timestamp {
    /// No timestamp is available for the message.
    NotAvailable,
    /// Timestamp of type "create time".
    CreateTime(i64),
    /// Timestamp of type "log append time".
    LogAppendTime(i64),
}
/// Wrapper around a raw `RDKafkaMessage` pointer.
///
/// The accessor methods dereference the pointer directly, and the `Drop`
/// implementation hands it to `rd_kafka_message_destroy`, so a `Message`
/// behaves as the owner of the native message it points to.
#[derive(Debug)]
pub struct Message {
// Raw pointer to the native message; stored exactly as given to `Message::new`.
ptr: *mut RDKafkaMessage,
}
// `Message` holds a raw pointer, so the compiler does not implement `Send` for it
// automatically; this impl opts in by hand.
// NOTE(review): soundness presumes the native message may be read and destroyed on a
// thread other than the one that produced it — confirm against the librdkafka docs.
unsafe impl Send for Message {}
impl<'a> Message {
    /// Wraps `ptr` in a `Message`.
    ///
    /// The pointer is stored unchanged and is not validated here; every
    /// accessor below dereferences it.
    pub fn new(ptr: *mut RDKafkaMessage) -> Message {
        let message = Message { ptr: ptr };
        message
    }

    /// Returns the raw pointer held by this message.
    pub fn ptr(&self) -> *mut RDKafkaMessage {
        self.ptr
    }

    /// Returns the `key_len` field of the underlying native message.
    pub fn key_len(&self) -> usize {
        let raw = self.ptr;
        unsafe { (*raw).key_len }
    }

    /// Returns the `len` field of the underlying native message, which is the
    /// length used when exposing the payload.
    pub fn payload_len(&self) -> usize {
        let raw = self.ptr;
        unsafe { (*raw).len }
    }

    /// Returns the key bytes, or `None` when the native key pointer is null.
    ///
    /// The slice is built from the native `key` pointer and `key_len` and
    /// borrows from `self`.
    pub fn key(&'a self) -> Option<&'a [u8]> {
        let raw = self.ptr;
        unsafe {
            let data = (*raw).key as *const u8;
            if data.is_null() {
                return None;
            }
            Some(slice::from_raw_parts(data, (*raw).key_len))
        }
    }

    /// Returns the payload bytes, or `None` when the native payload pointer is null.
    ///
    /// The slice is built from the native `payload` pointer and `len` and
    /// borrows from `self`.
    pub fn payload(&'a self) -> Option<&'a [u8]> {
        let raw = self.ptr;
        unsafe {
            let data = (*raw).payload as *const u8;
            if data.is_null() {
                return None;
            }
            Some(slice::from_raw_parts(data, (*raw).len))
        }
    }

    /// Converts the payload into a borrowed `P` through `FromBytes`.
    ///
    /// Yields `None` when there is no payload, otherwise the result of
    /// `P::from_bytes` on the payload bytes.
    pub fn payload_view<P: ?Sized + FromBytes>(&'a self) -> Option<Result<&'a P, P::Error>> {
        match self.payload() {
            Some(bytes) => Some(P::from_bytes(bytes)),
            None => None,
        }
    }

    /// Converts the key into a borrowed `K` through `FromBytes`.
    ///
    /// Yields `None` when there is no key, otherwise the result of
    /// `K::from_bytes` on the key bytes.
    pub fn key_view<K: ?Sized + FromBytes>(&'a self) -> Option<Result<&'a K, K::Error>> {
        match self.key() {
            Some(bytes) => Some(K::from_bytes(bytes)),
            None => None,
        }
    }

    /// Returns the `partition` field of the underlying native message.
    pub fn partition(&self) -> i32 {
        let raw = self.ptr;
        unsafe { (*raw).partition }
    }

    /// Returns the `offset` field of the underlying native message.
    pub fn offset(&self) -> i64 {
        let raw = self.ptr;
        unsafe { (*raw).offset }
    }

    /// Queries `rd_kafka_message_timestamp` and maps the reported timestamp
    /// type onto the matching `Timestamp` variant.
    pub fn timestamp(&self) -> Timestamp {
        // Out-parameter for the FFI call; starts as "not available" and is
        // overwritten by librdkafka with the actual timestamp type.
        let mut kind = rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
        let value = unsafe { rdsys::rd_kafka_message_timestamp(self.ptr, &mut kind) };
        match kind {
            rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE => {
                Timestamp::NotAvailable
            }
            rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_CREATE_TIME => {
                Timestamp::CreateTime(value)
            }
            rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME => {
                Timestamp::LogAppendTime(value)
            }
        }
    }
}
// Releases the native message when the wrapper goes out of scope.
impl Drop for Message {
// Logs the message at trace level, then passes the raw pointer to
// `rd_kafka_message_destroy`.
fn drop(&mut self) {
// Logged first, while the pointer value is still meaningful to print.
trace!("Destroying message {:?}", self);
// NOTE(review): the pointer is passed through without a null check — confirm that
// every `Message` is constructed from a valid, uniquely owned pointer.
unsafe { rdsys::rd_kafka_message_destroy(self.ptr) };
}
}
/// Borrowing conversion from a byte slice to a typed view.
///
/// Implementors reinterpret (and, where needed, validate) the given bytes and
/// return a reference with the same lifetime as the input slice.
pub trait FromBytes {
    /// Error returned when the bytes cannot be viewed as `Self`.
    type Error;

    /// Tries to view `bytes` as a `&Self`.
    // The parameter is named explicitly: anonymous trait-method parameters are
    // deprecated and rejected by the 2018 edition onwards.
    fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error>;
}

/// Identity conversion: any byte slice is a valid `[u8]`, so this never fails.
impl FromBytes for [u8] {
    type Error = ();

    fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
        Ok(bytes)
    }
}

/// UTF-8 validated conversion; fails with `Utf8Error` on invalid input.
impl FromBytes for str {
    type Error = str::Utf8Error;

    fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
        str::from_utf8(bytes)
    }
}
/// Exposes a value as a borrowed byte slice.
pub trait ToBytes {
    /// Returns the bytes representing `self`, borrowed from `self`.
    fn to_bytes(&self) -> &[u8];
}

/// A byte slice is returned as-is.
impl<'a> ToBytes for &'a [u8] {
    fn to_bytes(&self) -> &[u8] {
        *self
    }
}

/// A byte vector exposes its full contents.
impl ToBytes for Vec<u8> {
    fn to_bytes(&self) -> &[u8] {
        &self[..]
    }
}

/// A string slice exposes its UTF-8 bytes.
impl<'a> ToBytes for &'a str {
    fn to_bytes(&self) -> &[u8] {
        (*self).as_bytes()
    }
}

/// An owned string exposes its UTF-8 bytes.
impl ToBytes for String {
    fn to_bytes(&self) -> &[u8] {
        self.as_str().as_bytes()
    }
}

/// The unit value maps to an empty byte slice.
impl ToBytes for () {
    fn to_bytes(&self) -> &[u8] {
        let empty: &'static [u8] = &[];
        empty
    }
}