use log::{trace, warn};
use rdkafka::client::Client;
use rdkafka::config::{FromClientConfig, FromClientConfigAndContext};
use rdkafka::consumer::ConsumerGroupMetadata;
use rdkafka::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
use rdkafka::message::{DeliveryResult, OwnedMessage, ToBytes};
use rdkafka::producer::{
BaseProducer, BaseRecord, DefaultProducerContext, NoCustomPartitioner, Partitioner, Producer,
ProducerContext, PurgeConfig,
};
use rdkafka::util::Timeout;
use rdkafka::{ClientConfig, ClientContext, TopicPartitionList};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
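
/// A [`ProducerContext`] that forwards failed deliveries to a user-supplied
/// callback; successful deliveries are ignored.
///
/// A minimal sketch of pairing it with [`HpProducer`] (the logging callback
/// and `config` are placeholders):
///
/// ```ignore
/// let context = HpProducerContext::new(|err, msg| {
///     warn!("delivery failed: {} (message: {:?})", err, msg);
/// });
/// let producer: HpProducer<_> = HpProducer::from_config_and_context(&config, context)?;
/// ```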
pub struct HpProducerContext<F> {
    on_delivery_failure: F,
}
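
impl<F> HpProducerContext<F> {
    /// Wraps `on_delivery_failure`, which receives the error and a detached
    /// copy of the message whenever a delivery fails.
    pub fn new(on_delivery_failure: F) -> Self {
        Self { on_delivery_failure }
    }
}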
impl<F> ClientContext for HpProducerContext<F> where F: Send + Sync {}
impl<F> ProducerContext for HpProducerContext<F>
where
F: Fn(KafkaError, OwnedMessage) + Send + Sync,
{
type DeliveryOpaque = ();
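    /// Invoked by librdkafka for every delivery report; failures are passed
    /// to the callback together with a detached copy of the failed message.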
fn delivery(
&self,
delivery_result: &DeliveryResult<'_>,
_delivery_opaque: Self::DeliveryOpaque,
) {
match delivery_result {
Ok(_) => {}
            Err((e, record)) => (self.on_delivery_failure)(e.clone(), record.detach()),
}
}
}
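
/// A high-performance producer: a [`BaseProducer`] polled continuously from a
/// dedicated background thread, plus an async [`send`](HpProducer::send) that
/// retries while librdkafka's queue is full.
///
/// A minimal usage sketch (the broker address and topic are placeholders, and
/// a Tokio runtime is assumed):
///
/// ```ignore
/// let producer: HpProducer<_> = ClientConfig::new()
///     .set("bootstrap.servers", "localhost:9092")
///     .create()?;
/// producer
///     .send(BaseRecord::to("my-topic").key("k").payload("hello"), Timeout::Never)
///     .await
///     .map_err(|(err, _record)| err)?;
/// ```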
#[must_use = "The high-performance producer will stop immediately if unused"]
pub struct HpProducer<C, Part: Partitioner = NoCustomPartitioner>
where
C: ProducerContext<Part> + 'static,
{
producer: Arc<BaseProducer<C, Part>>,
should_stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}
impl FromClientConfig for HpProducer<DefaultProducerContext, NoCustomPartitioner> {
fn from_config(config: &ClientConfig) -> KafkaResult<HpProducer<DefaultProducerContext>> {
HpProducer::from_config_and_context(config, DefaultProducerContext)
}
}
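
/// Creates the producer and spawns the background polling thread, which keeps
/// serving delivery callbacks until the producer is dropped.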
impl<C, Part> FromClientConfigAndContext<C> for HpProducer<C, Part>
where
Part: Partitioner + Send + Sync + 'static,
C: ProducerContext<Part> + 'static,
{
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<HpProducer<C, Part>> {
let producer = Arc::new(BaseProducer::from_config_and_context(config, context)?);
let should_stop = Arc::new(AtomicBool::new(false));
let thread = {
let producer = Arc::clone(&producer);
let should_stop = should_stop.clone();
thread::Builder::new()
.name("producer polling thread".to_string())
.spawn(move || {
trace!("Polling thread loop started");
loop {
let n = producer.poll(Duration::from_millis(100));
if n == 0 {
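                            // No events were served; if a stop has been
                            // requested, exit the loop.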
if should_stop.load(Ordering::Relaxed) {
break;
}
} else {
trace!("Received {} events", n);
}
}
trace!("Polling thread loop terminated");
})
.expect("Failed to start polling thread")
};
Ok(HpProducer {
producer,
should_stop,
handle: Some(thread),
})
}
}
impl<C, Part> HpProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
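    /// Enqueues `record`, retrying every 100ms while librdkafka reports
    /// [`RDKafkaErrorCode::QueueFull`], until `queue_timeout` elapses. Any
    /// other error (or an expired timeout) returns the error together with
    /// the unsent record.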
pub async fn send<'a, K, P, T>(
&self,
record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
queue_timeout: T,
) -> Result<(), (KafkaError, BaseRecord<'a, K, P, C::DeliveryOpaque>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
T: Into<Timeout>,
{
let start_time = Instant::now();
let queue_timeout = queue_timeout.into();
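        // Retry only while the caller-supplied timeout has not elapsed.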
let can_retry = || match queue_timeout {
Timeout::Never => true,
Timeout::After(t) if start_time.elapsed() < t => true,
_ => false,
};
let mut base_record = record;
loop {
match self.producer.send(base_record) {
Err((e, record))
if e == KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
&& can_retry() =>
{
base_record = record;
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(_) => break Ok(()),
Err(e) => {
break Err(e);
}
}
}
}
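
    /// Polls the wrapped producer once. Rarely needed, since the background
    /// thread polls continuously.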
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
self.producer.poll(timeout);
}
}
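
/// All [`Producer`] methods are delegated to the wrapped [`BaseProducer`].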
impl<C, Part> Producer<C, Part> for HpProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn client(&self) -> &Client<C> {
self.producer.client()
}
fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.flush(timeout)
}
fn purge(&self, flags: PurgeConfig) {
self.producer.purge(flags)
}
fn in_flight_count(&self) -> i32 {
self.producer.in_flight_count()
}
fn init_transactions<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.init_transactions(timeout)
}
fn begin_transaction(&self) -> KafkaResult<()> {
self.producer.begin_transaction()
}
fn send_offsets_to_transaction<T: Into<Timeout>>(
&self,
offsets: &TopicPartitionList,
cgm: &ConsumerGroupMetadata,
timeout: T,
) -> KafkaResult<()> {
self.producer
.send_offsets_to_transaction(offsets, cgm, timeout)
}
fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.commit_transaction(timeout)
}
fn abort_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
self.producer.abort_transaction(timeout)
}
}
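
/// Signals the polling thread to stop and joins it. Pending messages are not
/// flushed here; call [`Producer::flush`] first if delivery must be confirmed
/// before dropping.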
impl<C, Part> Drop for HpProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part> + 'static,
{
fn drop(&mut self) {
trace!("Destroy HpProducer");
if let Some(handle) = self.handle.take() {
trace!("Stopping polling");
self.should_stop.store(true, Ordering::Relaxed);
trace!("Waiting for polling thread termination");
match handle.join() {
Ok(()) => trace!("Polling stopped"),
                Err(e) => warn!("Failed to join polling thread: {:?}", e),
};
}
trace!("HpProducer destroyed");
}
}