use std::ffi::{CStr, CString};
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::slice;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use rdkafka_sys as rdsys;
use rdkafka_sys::rd_kafka_vtype_t::*;
use rdkafka_sys::types::*;
use crate::client::{Client, EventPollResult, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::consumer::ConsumerGroupMetadata;
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::log::{trace, warn};
use crate::message::{BorrowedMessage, OwnedHeaders, ToBytes};
use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};
pub use crate::message::DeliveryResult;
use super::NoCustomPartitioner;
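/// A record destined for a low-level producer.
///
/// Apart from the topic name, every field is optional; unspecified fields are
/// given sensible defaults at send time (e.g. an unassigned partition). The
/// `delivery_opaque` is handed back in the delivery callback, letting the
/// application correlate delivery reports with sends.
///
/// A minimal construction sketch (the topic name is an illustrative
/// assumption):
///
/// ```no_run
/// use rdkafka::producer::BaseRecord;
///
/// let record = BaseRecord::to("example-topic") // destination topic
///     .key(&[1, 2, 3, 4])                      // message key
///     .payload("hello")                        // message payload
///     .partition(5);                           // target partition
/// ```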
#[derive(Debug)]
pub struct BaseRecord<'a, K: ToBytes + ?Sized = (), P: ToBytes + ?Sized = (), D: IntoOpaque = ()> {
pub topic: &'a str,
pub partition: Option<i32>,
pub payload: Option<&'a P>,
pub key: Option<&'a K>,
pub timestamp: Option<i64>,
pub headers: Option<OwnedHeaders>,
pub delivery_opaque: D,
}
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
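    /// Creates a new record for the specified topic with an attached delivery
    /// opaque, which is passed back to the [`ProducerContext::delivery`]
    /// callback once the fate of the message is known.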
pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
BaseRecord {
topic,
partition: None,
payload: None,
key: None,
timestamp: None,
headers: None,
delivery_opaque,
}
}
pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
self.partition = Some(partition);
self
}
pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
self.payload = Some(payload);
self
}
pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
self.key = Some(key);
self
}
pub fn timestamp(mut self, timestamp: i64) -> BaseRecord<'a, K, P, D> {
self.timestamp = Some(timestamp);
self
}
pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
self.headers = Some(headers);
self
}
pub fn topic(mut self, topic: &'a str) -> BaseRecord<'a, K, P, D> {
self.topic = topic;
self
}
pub fn delivery_opaque(mut self, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
self.delivery_opaque = delivery_opaque;
self
}
}
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
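    /// Creates a new record for the specified topic with no delivery opaque.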
pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
BaseRecord {
topic,
partition: None,
payload: None,
key: None,
timestamp: None,
headers: None,
delivery_opaque: (),
}
}
}
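/// C trampoline registered as librdkafka's topic-level partitioner callback.
///
/// It recovers the `ProducerContext` from the topic opaque set in
/// `from_config_and_context` and delegates partitioning to the user-supplied
/// [`Partitioner`].
///
/// # Safety
///
/// Must only be registered when the context actually provides a custom
/// partitioner, and `rkt_opaque` must point to a live `C`.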
unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>(
topic: *const RDKafkaTopic,
keydata: *const c_void,
keylen: usize,
partition_cnt: i32,
rkt_opaque: *mut c_void,
_msg_opaque: *mut c_void,
) -> i32 {
    // Kafka topic names are restricted to ASCII ([a-zA-Z0-9._-]), so this
    // unchecked UTF-8 conversion is sound.
    let topic_name = CStr::from_ptr(rdsys::rd_kafka_topic_name(topic));
    let topic_name = str::from_utf8_unchecked(topic_name.to_bytes());
let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;
let key = if keydata.is_null() {
None
} else {
Some(slice::from_raw_parts(keydata as *const u8, keylen))
};
    // `rkt_opaque` points to the `ProducerContext` stored in the topic
    // configuration by `from_config_and_context`.
    let producer_context = &mut *(rkt_opaque as *mut C);
    producer_context
        .get_custom_partitioner()
        .expect("partitioner callback fired but no custom partitioner is set")
        .partition(topic_name, key, partition_cnt, is_partition_available)
}
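// A `BaseProducer` with the no-op `DefaultProducerContext` can be built
// directly from a `ClientConfig` (typically via `ClientConfig::create`).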
impl FromClientConfig for BaseProducer<DefaultProducerContext> {
fn from_config(config: &ClientConfig) -> KafkaResult<BaseProducer<DefaultProducerContext>> {
BaseProducer::from_config_and_context(config, DefaultProducerContext)
}
}
impl<C, Part> FromClientConfigAndContext<C> for BaseProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part>,
{
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<BaseProducer<C, Part>> {
let native_config = config.create_native_config()?;
let context = Arc::new(context);
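        // When a custom partitioner is configured, stash a pointer to the
        // context in the default topic config and register the C trampoline
        // (`partitioner_cb`) so it can recover the context at partition time.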
if context.get_custom_partitioner().is_some() {
let default_topic_config =
unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) };
unsafe {
rdsys::rd_kafka_topic_conf_set_opaque(
default_topic_config,
Arc::as_ptr(&context) as *mut c_void,
)
};
unsafe {
rdsys::rd_kafka_topic_conf_set_partitioner_cb(
default_topic_config,
Some(partitioner_cb::<Part, C>),
)
}
}
unsafe {
rdsys::rd_kafka_conf_set_events(
native_config.ptr(),
rdsys::RD_KAFKA_EVENT_DR
| rdsys::RD_KAFKA_EVENT_STATS
| rdsys::RD_KAFKA_EVENT_ERROR
| rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
)
};
let client = Client::new_context_arc(
config,
native_config,
RDKafkaType::RD_KAFKA_PRODUCER,
context,
)?;
Ok(BaseProducer::from_client(client))
}
}
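/// A low-level Kafka producer.
///
/// The `BaseProducer` is purely polled: delivery reports and producer-level
/// errors are only processed during calls to [`BaseProducer::poll`] (or
/// [`Producer::flush`]), so it must be polled at regular intervals.
///
/// A minimal usage sketch (the broker address and topic name are illustrative
/// assumptions):
///
/// ```no_run
/// use std::time::Duration;
/// use rdkafka::config::ClientConfig;
/// use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
///
/// let producer: BaseProducer = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .create()
///     .expect("producer creation failed");
///
/// producer
///     .send(BaseRecord::to("example-topic").key("k").payload("v"))
///     .expect("failed to enqueue message");
///
/// // Serve delivery callbacks, then flush before dropping the producer.
/// producer.poll(Duration::from_millis(100));
/// producer.flush(Duration::from_secs(1)).expect("flush failed");
/// ```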
pub struct BaseProducer<C = DefaultProducerContext, Part = NoCustomPartitioner>
where
Part: Partitioner,
C: ProducerContext<Part>,
{
client: Client<C>,
queue: NativeQueue,
_partitioner: PhantomData<Part>,
}
impl<C, Part> BaseProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part>,
{
fn from_client(client: Client<C>) -> BaseProducer<C, Part> {
let queue = client.main_queue();
BaseProducer {
client,
queue,
_partitioner: PhantomData,
}
}
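    /// Polls the producer, draining ready events until the timeout expires.
    ///
    /// Regular polling is required to serve delivery report callbacks and
    /// producer-level errors; a producer that is never polled makes no
    /// progress on callbacks.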
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
let deadline: Deadline = timeout.into().into();
loop {
let event = self.client().poll_event(&self.queue, &deadline);
if let EventPollResult::Event(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
rdsys::RD_KAFKA_EVENT_ERROR => self.handle_error_event(ev),
_ => {
let evname = unsafe {
let evname = rdsys::rd_kafka_event_name(ev.ptr());
CStr::from_ptr(evname).to_string_lossy()
};
warn!("Ignored event '{}' on base producer poll", evname);
}
}
}
if deadline.elapsed() {
break;
}
}
}
fn handle_delivery_report_event(&self, event: NativePtr<RDKafkaEvent>) {
        // Upper bound on how many messages this delivery-report event carries.
        let max_messages = unsafe { rdsys::rd_kafka_event_message_count(event.ptr()) };
        let messages: Vec<*const RDKafkaMessage> = Vec::with_capacity(max_messages);
        // Let librdkafka fill the allocation, then rebuild a `Vec` with the
        // actual count. `ManuallyDrop` ensures the buffer isn't freed twice.
        let mut messages = mem::ManuallyDrop::new(messages);
        let messages = unsafe {
            let msgs_cnt = rdsys::rd_kafka_event_message_array(
                event.ptr(),
                messages.as_mut_ptr(),
                max_messages,
            );
            Vec::from_raw_parts(messages.as_mut_ptr(), msgs_cnt, max_messages)
        };
        // Share the event so each `BorrowedMessage` keeps the underlying
        // message memory alive for the duration of the delivery callback.
        let ev = Arc::new(event);
for msg in messages {
let delivery_result =
unsafe { BorrowedMessage::from_dr_event(msg as *mut _, ev.clone(), self.client()) };
let delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr((*msg)._private) };
self.context().delivery(&delivery_result, delivery_opaque);
}
}
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) {
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
let error = KafkaError::Global(rdkafka_err.into());
let reason = unsafe {
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
};
self.context().error(error, reason.trim());
}
fn native_ptr(&self) -> *mut RDKafka {
self.client.native_ptr()
}
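    /// Sends a message to Kafka.
    ///
    /// Enqueueing is asynchronous: `Ok(())` only means the record entered
    /// librdkafka's send queue. On failure (most commonly a full queue) the
    /// record is returned alongside the error so the caller can retry.
    ///
    /// A retry sketch for the queue-full case (producer setup omitted; the
    /// topic name is an illustrative assumption):
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use rdkafka::error::{KafkaError, RDKafkaErrorCode};
    /// use rdkafka::producer::BaseRecord;
    ///
    /// let mut record = BaseRecord::to("example-topic").key("k").payload("v");
    /// loop {
    ///     match producer.send(record) {
    ///         Ok(()) => break,
    ///         Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), rec)) => {
    ///             // Give librdkafka a chance to drain the queue, then retry.
    ///             producer.poll(Duration::from_millis(100));
    ///             record = rec;
    ///         }
    ///         Err((e, _)) => panic!("unrecoverable produce error: {}", e),
    ///     }
    /// }
    /// ```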
#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
fn as_bytes(opt: Option<&(impl ?Sized + ToBytes)>) -> (*mut c_void, usize) {
match opt.map(ToBytes::to_bytes) {
None => (ptr::null_mut(), 0),
Some(p) => (p.as_ptr() as *mut c_void, p.len()),
}
}
let (payload_ptr, payload_len) = as_bytes(record.payload);
let (key_ptr, key_len) = as_bytes(record.key);
let topic_cstring = match CString::new(record.topic) {
Ok(topic) => topic,
Err(nul_error) => return Err((nul_error.into(), record)),
};
        // Ownership of the delivery opaque is handed to librdkafka here; it is
        // reclaimed below if the enqueue fails.
        let opaque_ptr = record.delivery_opaque.into_ptr();
let produce_error = unsafe {
rdsys::rd_kafka_producev(
self.native_ptr(),
RD_KAFKA_VTYPE_TOPIC,
topic_cstring.as_ptr(),
RD_KAFKA_VTYPE_PARTITION,
record.partition.unwrap_or(-1),
RD_KAFKA_VTYPE_MSGFLAGS,
rdsys::RD_KAFKA_MSG_F_COPY,
RD_KAFKA_VTYPE_VALUE,
payload_ptr,
payload_len,
RD_KAFKA_VTYPE_KEY,
key_ptr,
key_len,
RD_KAFKA_VTYPE_OPAQUE,
opaque_ptr,
RD_KAFKA_VTYPE_TIMESTAMP,
record.timestamp.unwrap_or(0),
RD_KAFKA_VTYPE_HEADERS,
record
.headers
.as_ref()
.map_or(ptr::null_mut(), OwnedHeaders::ptr),
RD_KAFKA_VTYPE_END,
)
};
        if produce_error.is_error() {
            // The enqueue failed: reclaim ownership of the delivery opaque and
            // hand the record back to the caller so it can be retried.
            record.delivery_opaque = unsafe { C::DeliveryOpaque::from_ptr(opaque_ptr) };
            Err((KafkaError::MessageProduction(produce_error.into()), record))
        } else {
            // The producer now owns the headers; skip their destructor.
            mem::forget(record.headers);
            Ok(())
        }
}
}
impl<C, Part> Producer<C, Part> for BaseProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part>,
{
fn client(&self) -> &Client<C> {
&self.client
}
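    /// Flushes any pending messages.
    ///
    /// `rd_kafka_flush` is invoked with a zero timeout to check the queue
    /// state without blocking; while messages remain and the deadline has not
    /// elapsed, the loop polls the producer so delivery callbacks are served.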
fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let deadline: Deadline = timeout.into().into();
loop {
match unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) } {
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR_NO_ERROR => {
return Ok(());
}
                timed_out @ rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__TIMED_OUT => {
                    if deadline.elapsed() {
                        return Err(KafkaError::Flush(timed_out.into()));
                    }
                    // Deadline not reached yet: serve callbacks and re-check.
                    self.poll(deadline.remaining().min(Duration::from_millis(100)));
}
e => return Err(KafkaError::Flush(e.into())),
};
}
}
fn purge(&self, flags: PurgeConfig) {
let ret = unsafe { rdsys::rd_kafka_purge(self.native_ptr(), flags.flag_bits) };
if ret.is_error() {
            panic!(
                "librdkafka's documentation states that purging a producer with \
                 valid flags cannot fail, but it returned an error: {}",
                RDKafkaErrorCode::from(ret)
            )
}
}
fn in_flight_count(&self) -> i32 {
unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
}
fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_init_transactions(
self.native_ptr(),
timeout.into().as_millis(),
))
};
if ret.is_error() {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
}
}
fn begin_transaction(&self) -> KafkaResult<()> {
let ret =
unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_begin_transaction(self.native_ptr())) };
if ret.is_error() {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
}
}
fn send_offsets_to_transaction<T: Into<Timeout>>(
&self,
offsets: &TopicPartitionList,
cgm: &ConsumerGroupMetadata,
timeout: T,
) -> KafkaResult<()> {
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_send_offsets_to_transaction(
self.native_ptr(),
offsets.ptr(),
cgm.ptr(),
timeout.into().as_millis(),
))
};
if ret.is_error() {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
}
}
fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let timeout = timeout.into();
self.flush(timeout)?;
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
self.native_ptr(),
timeout.as_millis(),
))
};
if ret.is_error() {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
}
}
fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_abort_transaction(
self.native_ptr(),
timeout.into().as_millis(),
))
};
if ret.is_error() {
Err(KafkaError::Transaction(ret))
} else {
Ok(())
}
}
}
impl<C, Part: Partitioner> Drop for BaseProducer<C, Part>
where
C: ProducerContext<Part>,
{
fn drop(&mut self) {
self.purge(PurgeConfig::default().queue().inflight());
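        // Flush (and thereby poll) after purging so the delivery callbacks of
        // the purged messages fire before the producer is destroyed.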
if let Err(err) = self.flush(Timeout::After(Duration::from_millis(500))) {
warn!(
"Failed to flush outstanding messages while dropping the producer: {:?}",
err
);
}
}
}
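/// A low-level Kafka producer with a dedicated polling thread.
///
/// The `ThreadedProducer` wraps a [`BaseProducer`] and spawns a background
/// thread that polls it every 100ms, so delivery callbacks are executed
/// without explicit `poll` calls.
///
/// A minimal usage sketch (the broker address and topic name are illustrative
/// assumptions):
///
/// ```no_run
/// use rdkafka::config::ClientConfig;
/// use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer};
///
/// let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .create()
///     .expect("producer creation failed");
///
/// producer
///     .send(BaseRecord::to("example-topic").key("k").payload("v"))
///     .expect("failed to enqueue message");
/// // No explicit poll: the background thread serves delivery callbacks.
/// ```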
#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C, Part: Partitioner = NoCustomPartitioner>
where
C: ProducerContext<Part> + 'static,
{
producer: Arc<BaseProducer<C, Part>>,
should_stop: Arc<AtomicBool>,
handle: Option<Arc<JoinHandle<()>>>,
}
impl FromClientConfig for ThreadedProducer<DefaultProducerContext, NoCustomPartitioner> {
fn from_config(config: &ClientConfig) -> KafkaResult<ThreadedProducer<DefaultProducerContext>> {
ThreadedProducer::from_config_and_context(config, DefaultProducerContext)
}
}
impl<C, Part> FromClientConfigAndContext<C> for ThreadedProducer<C, Part>
where
Part: Partitioner + Send + Sync + 'static,
C: ProducerContext<Part> + 'static,
{
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<ThreadedProducer<C, Part>> {
let producer = Arc::new(BaseProducer::from_config_and_context(config, context)?);
let should_stop = Arc::new(AtomicBool::new(false));
let thread = {
let producer = Arc::clone(&producer);
let should_stop = should_stop.clone();
thread::Builder::new()
.name("producer polling thread".to_string())
.spawn(move || {
trace!("Polling thread loop started");
loop {
producer.poll(Duration::from_millis(100));
if should_stop.load(Ordering::Relaxed) {
break;
}
}
trace!("Polling thread loop terminated");
})
.expect("Failed to start polling thread")
};
Ok(ThreadedProducer {
producer,
should_stop,
handle: Some(Arc::new(thread)),
})
}
}
impl<C, Part> ThreadedProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
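    /// Sends a message to Kafka via the shared [`BaseProducer`].
    ///
    /// No explicit polling is required: the background thread polls the
    /// producer continuously.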
#[allow(clippy::result_large_err)]
pub fn send<'a, K, P>(
&self,
record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.producer.send(record)
}
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
self.producer.poll(timeout);
}
}
impl<C, Part> Producer<C, Part> for ThreadedProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn client(&self) -> &Client<C> {
self.producer.client()
}
fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.flush(timeout)
}
fn purge(&self, flags: PurgeConfig) {
self.producer.purge(flags)
}
fn in_flight_count(&self) -> i32 {
self.producer.in_flight_count()
}
fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.init_transactions(timeout)
}
fn begin_transaction(&self) -> KafkaResult<()> {
self.producer.begin_transaction()
}
fn send_offsets_to_transaction<T: Into<Timeout>>(
&self,
offsets: &TopicPartitionList,
cgm: &ConsumerGroupMetadata,
timeout: T,
) -> KafkaResult<()> {
self.producer
.send_offsets_to_transaction(offsets, cgm, timeout)
}
fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.commit_transaction(timeout)
}
fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.abort_transaction(timeout)
}
}
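// Cloning shares the underlying producer and its polling thread; the thread
// keeps running until the last clone is dropped.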
impl<C: ProducerContext + 'static> Clone for ThreadedProducer<C> {
fn clone(&self) -> Self {
Self {
producer: Arc::clone(&self.producer),
should_stop: Arc::clone(&self.should_stop),
handle: self.handle.clone(),
}
}
}
impl<C, Part> Drop for ThreadedProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn drop(&mut self) {
trace!("Destroy ThreadedProducer");
if let Some(handle) = self.handle.take().and_then(Arc::into_inner) {
trace!("Stopping polling");
self.should_stop.store(true, Ordering::Relaxed);
trace!("Waiting for polling thread termination");
match handle.join() {
Ok(()) => trace!("Polling stopped"),
Err(e) => warn!("Failure while terminating thread: {:?}", e),
};
}
trace!("ThreadedProducer destroyed");
}
}