use std::ffi::{CStr, CString};
use std::fmt;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr;
use std::str;
use std::time::SystemTime;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Timestamp {
NotAvailable,
CreateTime(i64),
LogAppendTime(i64),
}
impl Timestamp {
pub fn to_millis(&self) -> Option<i64> {
match *self {
Timestamp::NotAvailable | Timestamp::CreateTime(-1) | Timestamp::LogAppendTime(-1) => {
None
}
Timestamp::CreateTime(t) | Timestamp::LogAppendTime(t) => Some(t),
}
}
pub fn now() -> Timestamp {
Timestamp::from(SystemTime::now())
}
}
impl From<i64> for Timestamp {
fn from(system_time: i64) -> Timestamp {
Timestamp::CreateTime(system_time)
}
}
impl From<SystemTime> for Timestamp {
fn from(system_time: SystemTime) -> Timestamp {
Timestamp::CreateTime(millis_to_epoch(system_time))
}
}
pub trait Headers {
fn count(&self) -> usize;
fn get(&self, idx: usize) -> Option<(&str, &[u8])>;
fn get_as<V: FromBytes + ?Sized>(&self, idx: usize) -> Option<(&str, Result<&V, V::Error>)> {
self.get(idx)
.map(|(name, value)| (name, V::from_bytes(value)))
}
}
pub trait Message {
type Headers: Headers;
fn key(&self) -> Option<&[u8]>;
fn payload(&self) -> Option<&[u8]>;
fn topic(&self) -> &str;
fn partition(&self) -> i32;
fn offset(&self) -> i64;
fn timestamp(&self) -> Timestamp;
fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>> {
self.payload().map(P::from_bytes)
}
fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>> {
self.key().map(K::from_bytes)
}
fn headers(&self) -> Option<&Self::Headers>;
}
pub struct BorrowedHeaders;
impl BorrowedHeaders {
unsafe fn from_native_ptr<T>(
_owner: &T,
headers_ptr: *mut rdsys::rd_kafka_headers_t,
) -> &BorrowedHeaders {
&*(headers_ptr as *mut BorrowedHeaders)
}
fn as_native_ptr(&self) -> *const RDKafkaHeaders {
self as *const BorrowedHeaders as *const RDKafkaHeaders
}
pub fn detach(&self) -> OwnedHeaders {
OwnedHeaders {
ptr: unsafe {
NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.as_native_ptr())).unwrap()
},
}
}
}
impl Headers for BorrowedHeaders {
fn count(&self) -> usize {
unsafe { rdsys::rd_kafka_header_cnt(self.as_native_ptr()) }
}
fn get(&self, idx: usize) -> Option<(&str, &[u8])> {
let mut value_ptr = ptr::null();
let mut name_ptr = ptr::null();
let mut value_size = 0;
let err = unsafe {
rdsys::rd_kafka_header_get_all(
self.as_native_ptr(),
idx,
&mut name_ptr,
&mut value_ptr,
&mut value_size,
)
};
if err.is_error() {
None
} else {
unsafe {
Some((
CStr::from_ptr(name_ptr).to_str().unwrap(),
util::ptr_to_slice(value_ptr, value_size),
))
}
}
}
}
pub struct BorrowedMessage<'a> {
ptr: NativePtr<RDKafkaMessage>,
_owner: PhantomData<&'a u8>,
}
unsafe impl KafkaDrop for RDKafkaMessage {
const TYPE: &'static str = "message";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy;
}
impl<'a> fmt::Debug for BorrowedMessage<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Message {{ ptr: {:?} }}", self.ptr())
}
}
impl<'a> BorrowedMessage<'a> {
pub(crate) unsafe fn from_consumer<C>(
ptr: NativePtr<RDKafkaMessage>,
_consumer: &'a C,
) -> KafkaResult<BorrowedMessage<'a>> {
if ptr.err.is_error() {
let err = match ptr.err {
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
KafkaError::PartitionEOF((*ptr).partition)
}
e => KafkaError::MessageConsumption(e.into()),
};
Err(err)
} else {
Ok(BorrowedMessage {
ptr,
_owner: PhantomData,
})
}
}
pub(crate) unsafe fn from_dr_callback<O>(
ptr: *mut RDKafkaMessage,
_owner: &'a O,
) -> DeliveryResult<'a> {
let borrowed_message = BorrowedMessage {
ptr: NativePtr::from_ptr(ptr).unwrap(),
_owner: PhantomData,
};
if (*ptr).err.is_error() {
Err((
KafkaError::MessageProduction((*ptr).err.into()),
borrowed_message,
))
} else {
Ok(borrowed_message)
}
}
pub fn ptr(&self) -> *mut RDKafkaMessage {
self.ptr.ptr()
}
pub fn topic_ptr(&self) -> *mut RDKafkaTopic {
self.ptr.rkt
}
pub fn key_len(&self) -> usize {
self.ptr.key_len
}
pub fn payload_len(&self) -> usize {
self.ptr.len
}
pub fn detach(&self) -> OwnedMessage {
OwnedMessage {
key: self.key().map(|k| k.to_vec()),
payload: self.payload().map(|p| p.to_vec()),
topic: self.topic().to_owned(),
timestamp: self.timestamp(),
partition: self.partition(),
offset: self.offset(),
headers: self.headers().map(BorrowedHeaders::detach),
}
}
}
impl<'a> Message for BorrowedMessage<'a> {
type Headers = BorrowedHeaders;
fn key(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
}
fn payload(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
}
fn topic(&self) -> &str {
unsafe {
CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
.to_str()
.expect("Topic name is not valid UTF-8")
}
}
fn partition(&self) -> i32 {
self.ptr.partition
}
fn offset(&self) -> i64 {
self.ptr.offset
}
fn timestamp(&self) -> Timestamp {
let mut timestamp_type = rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
let timestamp =
unsafe { rdsys::rd_kafka_message_timestamp(self.ptr.ptr(), &mut timestamp_type) };
if timestamp == -1 {
Timestamp::NotAvailable
} else {
match timestamp_type {
rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE => {
Timestamp::NotAvailable
}
rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_CREATE_TIME => {
Timestamp::CreateTime(timestamp)
}
rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME => {
Timestamp::LogAppendTime(timestamp)
}
}
}
}
fn headers(&self) -> Option<&BorrowedHeaders> {
let mut native_headers_ptr = ptr::null_mut();
unsafe {
let err = rdsys::rd_kafka_message_headers(self.ptr.ptr(), &mut native_headers_ptr);
match err.into() {
RDKafkaErrorCode::NoError => {
Some(BorrowedHeaders::from_native_ptr(self, native_headers_ptr))
}
RDKafkaErrorCode::NoEnt => None,
_ => None,
}
}
}
}
unsafe impl<'a> Send for BorrowedMessage<'a> {}
unsafe impl<'a> Sync for BorrowedMessage<'a> {}
#[derive(Debug)]
pub struct OwnedHeaders {
ptr: NativePtr<RDKafkaHeaders>,
}
unsafe impl KafkaDrop for RDKafkaHeaders {
const TYPE: &'static str = "headers";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_headers_destroy;
}
unsafe impl Send for OwnedHeaders {}
unsafe impl Sync for OwnedHeaders {}
impl OwnedHeaders {
pub fn new() -> OwnedHeaders {
OwnedHeaders::new_with_capacity(5)
}
pub fn new_with_capacity(initial_capacity: usize) -> OwnedHeaders {
OwnedHeaders {
ptr: unsafe {
NativePtr::from_ptr(rdsys::rd_kafka_headers_new(initial_capacity)).unwrap()
},
}
}
pub fn add<V: ToBytes + ?Sized>(self, name: &str, value: &V) -> OwnedHeaders {
let name_cstring = CString::new(name.to_owned()).unwrap();
let value_bytes = value.to_bytes();
let err = unsafe {
rdsys::rd_kafka_header_add(
self.ptr(),
name_cstring.as_ptr(),
name_cstring.as_bytes().len() as isize,
value_bytes.as_ptr() as *mut c_void,
value_bytes.len() as isize,
)
};
assert!(!err.is_error());
self
}
pub(crate) fn ptr(&self) -> *mut RDKafkaHeaders {
self.ptr.ptr()
}
pub fn as_borrowed(&self) -> &BorrowedHeaders {
unsafe { &*(self.ptr() as *mut RDKafkaHeaders as *mut BorrowedHeaders) }
}
}
impl Default for OwnedHeaders {
fn default() -> OwnedHeaders {
OwnedHeaders::new()
}
}
impl Headers for OwnedHeaders {
fn count(&self) -> usize {
unsafe { rdsys::rd_kafka_header_cnt(self.ptr()) }
}
fn get(&self, idx: usize) -> Option<(&str, &[u8])> {
self.as_borrowed().get(idx)
}
}
impl Clone for OwnedHeaders {
fn clone(&self) -> Self {
OwnedHeaders {
ptr: unsafe { NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.ptr())).unwrap() },
}
}
}
#[derive(Debug, Clone)]
pub struct OwnedMessage {
payload: Option<Vec<u8>>,
key: Option<Vec<u8>>,
topic: String,
timestamp: Timestamp,
partition: i32,
offset: i64,
headers: Option<OwnedHeaders>,
}
impl OwnedMessage {
pub fn new(
payload: Option<Vec<u8>>,
key: Option<Vec<u8>>,
topic: String,
timestamp: Timestamp,
partition: i32,
offset: i64,
headers: Option<OwnedHeaders>,
) -> OwnedMessage {
OwnedMessage {
payload,
key,
topic,
timestamp,
partition,
offset,
headers,
}
}
}
impl Message for OwnedMessage {
type Headers = OwnedHeaders;
fn key(&self) -> Option<&[u8]> {
match self.key {
Some(ref k) => Some(k.as_slice()),
None => None,
}
}
fn payload(&self) -> Option<&[u8]> {
match self.payload {
Some(ref p) => Some(p.as_slice()),
None => None,
}
}
fn topic(&self) -> &str {
self.topic.as_ref()
}
fn partition(&self) -> i32 {
self.partition
}
fn offset(&self) -> i64 {
self.offset
}
fn timestamp(&self) -> Timestamp {
self.timestamp
}
fn headers(&self) -> Option<&OwnedHeaders> {
self.headers.as_ref()
}
}
pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
pub trait FromBytes {
type Error;
fn from_bytes(_: &[u8]) -> Result<&Self, Self::Error>;
}
impl FromBytes for [u8] {
type Error = ();
fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
Ok(bytes)
}
}
impl FromBytes for str {
type Error = str::Utf8Error;
fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
str::from_utf8(bytes)
}
}
pub trait ToBytes {
fn to_bytes(&self) -> &[u8];
}
impl ToBytes for [u8] {
fn to_bytes(&self) -> &[u8] {
self
}
}
impl ToBytes for str {
fn to_bytes(&self) -> &[u8] {
self.as_bytes()
}
}
impl ToBytes for Vec<u8> {
fn to_bytes(&self) -> &[u8] {
self.as_slice()
}
}
impl ToBytes for String {
fn to_bytes(&self) -> &[u8] {
self.as_bytes()
}
}
impl<'a, T: ToBytes> ToBytes for &'a T {
fn to_bytes(&self) -> &[u8] {
(*self).to_bytes()
}
}
impl ToBytes for () {
fn to_bytes(&self) -> &[u8] {
&[]
}
}
macro_rules! array_impls {
($($N:expr)+) => {
$(
impl ToBytes for [u8; $N] {
fn to_bytes(&self) -> &[u8] { self }
}
)+
}
}
array_impls! {
0 1 2 3 4 5 6 7 8 9
10 11 12 13 14 15 16 17 18 19
20 21 22 23 24 25 26 27 28 29
30 31 32
}
#[cfg(test)]
mod test {
use super::*;
use std::time::SystemTime;
#[test]
fn test_timestamp_creation() {
let now = SystemTime::now();
let t1 = Timestamp::now();
let t2 = Timestamp::from(now);
let expected = Timestamp::CreateTime(util::millis_to_epoch(now));
assert_eq!(t2, expected);
assert!(t1.to_millis().unwrap() - t2.to_millis().unwrap() < 10);
}
#[test]
fn test_timestamp_conversion() {
assert_eq!(Timestamp::CreateTime(100).to_millis(), Some(100));
assert_eq!(Timestamp::LogAppendTime(100).to_millis(), Some(100));
assert_eq!(Timestamp::CreateTime(-1).to_millis(), None);
assert_eq!(Timestamp::LogAppendTime(-1).to_millis(), None);
assert_eq!(Timestamp::NotAvailable.to_millis(), None);
let t: Timestamp = 100.into();
assert_eq!(t, Timestamp::CreateTime(100));
}
#[test]
fn test_headers() {
let owned = OwnedHeaders::new()
.add("key1", "value1")
.add("key2", "value2");
assert_eq!(
owned.get(0),
Some(("key1", &[118, 97, 108, 117, 101, 49][..]))
);
assert_eq!(owned.get_as::<str>(1), Some(("key2", Ok("value2"))));
}
}