use futures::{self, Canceled, Complete, Future, Poll, Oneshot};
use rdsys::rd_kafka_vtype_t::*;
use rdsys::types::*;
use rdsys;
use client::{Client, Context, EmptyContext};
use config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel, TopicConfig};
use error::{KafkaError, KafkaResult, IsError};
use message::ToBytes;
use statistics::Statistics;
use util::cstr_to_owned;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
pub trait ProducerContext: Context {
type DeliveryContext: Send + Sync;
fn delivery(&self, DeliveryReport, Self::DeliveryContext);
}
pub struct EmptyProducerContext;
impl Context for EmptyProducerContext { }
impl ProducerContext for EmptyProducerContext {
type DeliveryContext = ();
fn delivery(&self, _: DeliveryReport, _: Self::DeliveryContext) { }
}
#[derive(Debug)]
pub struct DeliveryReport {
error: RDKafkaRespErr,
partition: i32,
offset: i64,
}
impl DeliveryReport {
fn new(err: RDKafkaRespErr, partition: i32, offset: i64) -> DeliveryReport {
DeliveryReport {
error: err,
partition: partition,
offset: offset,
}
}
pub fn result(&self) -> KafkaResult<(i32, i64)> {
if self.error.is_error() {
Err(KafkaError::MessageProduction(self.error))
} else {
Ok((self.partition, self.offset))
}
}
pub fn partition(&self) -> i32 {
self.partition
}
pub fn offset(&self) -> i64 {
self.offset
}
}
unsafe extern "C" fn delivery_cb<C: ProducerContext>(
_client: *mut RDKafka, msg: *const RDKafkaMessage, _opaque: *mut c_void) {
let context = Box::from_raw(_opaque as *mut C);
let delivery_context = Box::from_raw((*msg)._private as *mut C::DeliveryContext);
let delivery_status = DeliveryReport::new((*msg).err, (*msg).partition, (*msg).offset);
trace!("Delivery event received: {:?}", delivery_status);
(*context).delivery(delivery_status, (*delivery_context));
mem::forget(context); }
pub struct BaseProducer<C: ProducerContext> {
client_arc: Arc<Client<C>>,
}
impl<C: ProducerContext> BaseProducer<C> {
fn from_client(client: Client<C>) -> BaseProducer<C> {
BaseProducer { client_arc: Arc::new(client) }
}
pub fn get_topic(&self, name: &str, config: &TopicConfig) -> KafkaResult<BaseProducerTopic<C>> {
BaseProducerTopic::new(self.clone(), name, config)
}
pub fn poll(&self, timeout_ms: i32) -> i32 {
unsafe { rdsys::rd_kafka_poll(self.client_arc.native_ptr(), timeout_ms) }
}
fn native_ptr(&self) -> *mut RDKafka {
self.client_arc.native_ptr()
}
}
impl<C: ProducerContext> Clone for BaseProducer<C> {
fn clone(&self) -> BaseProducer<C> {
BaseProducer { client_arc: self.client_arc.clone() }
}
}
impl FromClientConfig for BaseProducer<EmptyProducerContext> {
fn from_config(config: &ClientConfig) -> KafkaResult<BaseProducer<EmptyProducerContext>> {
BaseProducer::from_config_and_context(config, EmptyProducerContext)
}
}
impl<C: ProducerContext> FromClientConfigAndContext<C> for BaseProducer<C> {
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseProducer<C>> {
let native_config = config.create_native_config()?;
unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<C>)) };
let client = Client::new(config, native_config, RDKafkaType::RD_KAFKA_PRODUCER, context)?;
Ok(BaseProducer::from_client(client))
}
}
pub struct BaseProducerTopic<C: ProducerContext> {
topic_ptr: *mut RDKafkaTopic,
producer: BaseProducer<C>,
}
impl<C: ProducerContext> BaseProducerTopic<C> {
pub fn new(producer: BaseProducer<C>, name: &str, topic_config: &TopicConfig)
-> KafkaResult<BaseProducerTopic<C>> {
let name_cstring = CString::new(name.to_string())?;
let native_topic_config = topic_config.create_native_config()?;
let topic_ptr = unsafe {
rdsys::rd_kafka_topic_new(producer.native_ptr(), name_cstring.as_ptr(), native_topic_config.ptr_move())
};
if topic_ptr.is_null() {
Err(KafkaError::TopicCreation(name.to_owned()))
} else {
Ok(BaseProducerTopic { topic_ptr: topic_ptr, producer: producer })
}
}
pub fn name(&self) -> String {
unsafe {
cstr_to_owned(rdsys::rd_kafka_topic_name(self.topic_ptr))
}
}
pub fn send_copy<P, K>(
&self,
partition: Option<i32>,
payload: Option<&P>,
key: Option<&K>,
delivery_context: Option<Box<C::DeliveryContext>>,
timestamp: Option<i64>
) -> KafkaResult<()>
where K: ToBytes,
P: ToBytes {
let (payload_ptr, payload_len) = match payload.map(P::to_bytes) {
None => (ptr::null_mut(), 0),
Some(p) => (p.as_ptr() as *mut c_void, p.len()),
};
let (key_ptr, key_len) = match key.map(K::to_bytes) {
None => (ptr::null_mut(), 0),
Some(k) => (k.as_ptr() as *mut c_void, k.len()),
};
let delivery_context_ptr = match delivery_context {
Some(context) => Box::into_raw(context) as *mut c_void,
None => ptr::null_mut(),
};
let timestamp = match timestamp {
Some(t) => t,
None => 0
};
let produce_error = unsafe {
rdsys::rd_kafka_producev(
self.producer.native_ptr(),
RD_KAFKA_VTYPE_RKT, self.topic_ptr,
RD_KAFKA_VTYPE_PARTITION, partition.unwrap_or(-1),
RD_KAFKA_VTYPE_MSGFLAGS, rdsys::RD_KAFKA_MSG_F_COPY as i32,
RD_KAFKA_VTYPE_VALUE, payload_ptr, payload_len,
RD_KAFKA_VTYPE_KEY, key_ptr, key_len,
RD_KAFKA_VTYPE_OPAQUE, delivery_context_ptr,
RD_KAFKA_VTYPE_TIMESTAMP, timestamp,
RD_KAFKA_VTYPE_END
)
};
if produce_error.is_error() {
Err(KafkaError::MessageProduction(produce_error))
} else {
Ok(())
}
}
}
impl<C: ProducerContext> Drop for BaseProducerTopic<C> {
fn drop(&mut self) {
trace!("Destroy BaseProducerTopic");
unsafe {
rdsys::rd_kafka_topic_destroy(self.topic_ptr);
}
}
}
pub struct FutureProducerContext<C: Context + 'static> {
wrapped_context: C
}
impl<C: Context + 'static> Context for FutureProducerContext<C> {
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
self.wrapped_context.log(level, fac, log_message);
}
fn stats(&self, json: Statistics) {
self.wrapped_context.stats(json);
}
}
impl<C: Context + 'static> ProducerContext for FutureProducerContext<C> {
type DeliveryContext = Complete<DeliveryReport>;
fn delivery(&self, status: DeliveryReport, tx: Complete<DeliveryReport>) {
tx.complete(status);
}
}
#[must_use = "Producer polling thread will stop immediately if unused"]
struct _FutureProducer<C: Context + 'static> {
producer: BaseProducer<FutureProducerContext<C>>,
should_stop: Arc<AtomicBool>,
handle: RwLock<Option<JoinHandle<()>>>, }
impl<C: Context + 'static> _FutureProducer<C> {
fn start(&self) {
let producer_clone = self.producer.clone();
let should_stop = self.should_stop.clone();
let handle = thread::Builder::new()
.name("polling thread".to_string())
.spawn(move || {
trace!("Polling thread loop started");
while !should_stop.load(Ordering::Relaxed) {
let n = producer_clone.poll(100);
if n != 0 {
trace!("Received {} events", n);
}
}
trace!("Polling thread loop terminated");
})
.expect("Failed to start polling thread");
match self.handle.write() {
Ok(mut handle_ref) => *handle_ref = Some(handle),
Err(_) => panic!("Poison error"),
};
}
fn stop(&self) {
match self.handle.write() {
Ok(mut handle) => {
if handle.is_some() {
trace!("Stopping polling");
self.should_stop.store(true, Ordering::Relaxed);
trace!("Waiting for polling thread termination");
match handle.take().expect("No handle present in producer context").join() {
Ok(()) => trace!("Polling stopped"),
Err(e) => warn!("Failure while terminating thread: {:?}", e),
};
}
},
Err(_) => panic!("Poison error"),
};
}
}
impl<C: Context + 'static> Drop for _FutureProducer<C> {
fn drop(&mut self) {
trace!("Destroy _FutureProducer");
self.stop();
}
}
#[must_use = "Producer polling thread will stop immediately if unused"]
pub struct FutureProducer<C: Context + 'static> {
inner: Arc<_FutureProducer<C>>,
}
impl<C: Context + 'static> FutureProducer<C> {
fn base_producer(&self) -> BaseProducer<FutureProducerContext<C>> {
self.inner.producer.clone()
}
}
impl<C: Context + 'static> Clone for FutureProducer<C> {
fn clone(&self) -> FutureProducer<C> {
FutureProducer { inner: self.inner.clone() }
}
}
impl FromClientConfig for FutureProducer<EmptyContext> {
fn from_config(config: &ClientConfig) -> KafkaResult<FutureProducer<EmptyContext>> {
FutureProducer::from_config_and_context(config, EmptyContext)
}
}
impl<C: Context + 'static> FromClientConfigAndContext<C> for FutureProducer<C> {
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<FutureProducer<C>> {
let future_context = FutureProducerContext { wrapped_context: context};
let inner = _FutureProducer {
producer: BaseProducer::from_config_and_context(config, future_context)?,
should_stop: Arc::new(AtomicBool::new(false)),
handle: RwLock::new(None),
};
Ok(FutureProducer { inner: Arc::new(inner) })
}
}
pub struct DeliveryFuture {
rx: Oneshot<DeliveryReport>,
}
impl Future for DeliveryFuture {
type Item = DeliveryReport;
type Error = Canceled;
fn poll(&mut self) -> Poll<DeliveryReport, Canceled> {
self.rx.poll()
}
}
impl<C: Context + 'static> FutureProducer<C> {
pub fn get_topic(&self, name: &str, config: &TopicConfig) -> KafkaResult<FutureProducerTopic<C>> {
FutureProducerTopic::new(self.clone(), name, config)
}
pub fn start(&self) {
self.inner.start();
}
pub fn stop(&self) {
self.inner.stop();
}
}
pub struct FutureProducerTopic<C: Context + 'static> {
topic: BaseProducerTopic<FutureProducerContext<C>>,
}
impl<C: Context + 'static> FutureProducerTopic<C> {
pub fn new(producer: FutureProducer<C>, name: &str, topic_config: &TopicConfig)
-> KafkaResult<FutureProducerTopic<C>> {
let producer_topic = BaseProducerTopic::new(producer.base_producer(), name, topic_config)?;
Ok(FutureProducerTopic {topic : producer_topic})
}
pub fn send_copy<P, K>(
&self,
partition: Option<i32>,
payload: Option<&P>,
key: Option<&K>,
timestamp: Option<i64>
) -> KafkaResult<DeliveryFuture>
where K: ToBytes,
P: ToBytes {
let (tx, rx) = futures::oneshot();
self.topic.send_copy(partition, payload, key, Some(Box::new(tx)), timestamp)?;
Ok(DeliveryFuture{rx: rx})
}
pub fn name(&self) -> String {
self.topic.name()
}
}
#[cfg(test)]
mod tests {
use super::*;
use config::{ClientConfig, TopicConfig};
#[test]
fn test_base_producer_topic() {
let producer = ClientConfig::new().create::<BaseProducer<_>>().unwrap();
let producer_topic = producer.get_topic("topic_name", &TopicConfig::new()).unwrap();
assert_eq!(producer_topic.name(), "topic_name");
}
#[test]
fn test_future_producer_topic() {
let producer = ClientConfig::new().create::<FutureProducer<_>>().unwrap();
let producer_topic = producer.get_topic("topic_name", &TopicConfig::new()).unwrap();
assert_eq!(producer_topic.name(), "topic_name");
}
}