use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use rdkafka::{ClientConfig, TopicPartitionList};
use std::sync::{Arc, RwLock};
#[derive(Clone)]
pub struct Builder {
brokers: Arc<Vec<String>>,
inner: Arc<RwLock<Inner>>,
}
struct Inner {
client: Option<Arc<AdminClient<DefaultClientContext>>>,
producer: Option<FutureProducer>,
socket_timeout: i32,
request_timeout: i32,
}
impl Builder {
pub fn new<V: AsRef<str>>(brokers: &[V]) -> Self {
Builder {
brokers: Arc::new(brokers.iter().map(|b| b.as_ref().to_string()).collect()),
inner: Arc::new(RwLock::new(Inner {
client: None,
producer: None,
socket_timeout: 10000,
request_timeout: 10000,
})),
}
}
pub fn socket_timeout(&mut self, mut to: i32) -> i32 {
let mut wg = self.inner.write().unwrap();
std::mem::swap(&mut wg.socket_timeout, &mut to);
to
}
pub fn request_timeout(&mut self, mut to: i32) -> i32 {
let mut wg = self.inner.write().unwrap();
std::mem::swap(&mut wg.request_timeout, &mut to);
to
}
fn get_timeout(&self) -> (i32, i32) {
let g = self.inner.read().unwrap();
(g.socket_timeout, g.request_timeout)
}
fn get_client(&self) -> Result<Arc<AdminClient<DefaultClientContext>>, KafkaError> {
{
let rg = self.inner.read().unwrap();
if rg.client.is_some() {
return Ok(rg.client.clone().unwrap());
}
}
{
let mut wg = self.inner.write().unwrap();
if wg.client.is_none() {
let c: AdminClient<DefaultClientContext> = ClientConfig::new()
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", wg.socket_timeout.to_string())
.set("request.timeout.ms", wg.request_timeout.to_string())
.create()?;
wg.client = Some(Arc::new(c));
}
Ok(wg.client.clone().unwrap())
}
}
fn get_producer(&self) -> Result<FutureProducer, KafkaError> {
{
let rg = self.inner.read().unwrap();
if rg.producer.is_some() {
return Ok(rg.producer.clone().unwrap());
}
}
{
let mut wg = self.inner.write().unwrap();
if wg.producer.is_none() {
let p: FutureProducer = ClientConfig::new()
.set("retries", "3")
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", wg.socket_timeout.to_string())
.set("request.timeout.ms", wg.request_timeout.to_string())
.set("acks", "all")
.set("enable.idempotence", "true")
.set("delivery.timeout.ms", "3000")
.set("message.max.bytes", "62464")
.create()?;
wg.producer = Some(p);
}
Ok(wg.producer.clone().unwrap())
}
}
fn create_consumer(&self, group_id: &str) -> Result<StreamConsumer, KafkaError> {
let to = self.get_timeout();
let c: Result<StreamConsumer, KafkaError> = ClientConfig::new()
.set("group.id", group_id)
.set("enable.auto.commit", "false")
.set("enable.partition.eof", "true")
.set("auto.offset.reset", "earliest")
.set("fetch.min.bytes", "1")
.set("fetch.max.bytes", "62464")
.set("fetch.wait.max.ms", "500")
.set("message.max.bytes", "62464")
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", to.0.to_string())
.set("request.timeout.ms", to.1.to_string())
.create();
c
}
pub fn shared_mailbox<V: Into<String>>(
&self,
name: V,
fixed_replication: i32,
) -> Result<crate::shared::Mailbox, KafkaError> {
let client = self.get_client()?;
Ok(crate::shared::Mailbox::new(
client,
name.into(),
fixed_replication,
))
}
pub fn shared<V: Into<String>>(
&self,
name: V,
fixed_replication: i32,
) -> Result<crate::shared::Mailbox, KafkaError> {
self.shared_mailbox(name, fixed_replication)
}
pub fn exclusive_mailbox<V: Into<String>>(
&self,
name: V,
num_partitions: i32,
fixed_replication: i32,
) -> Result<crate::exclusive::Mailbox, KafkaError> {
let client = self.get_client()?;
Ok(crate::exclusive::Mailbox::new(
client,
name.into(),
num_partitions,
fixed_replication,
))
}
pub fn exclusive<V: Into<String>>(
&self,
name: V,
num_partitions: i32,
fixed_replication: i32,
) -> Result<crate::exclusive::Mailbox, KafkaError> {
self.exclusive_mailbox(name, num_partitions, fixed_replication)
}
pub fn writer(&self) -> Result<crate::write::Writer, KafkaError> {
let producer = self.get_producer()?;
Ok(crate::write::Writer::new(producer))
}
pub fn shared_reader<V: Into<String>>(
&self,
name: V,
) -> Result<crate::read::Reader, KafkaError> {
let name = name.into();
let dead_letter = crate::dead_letter(name.clone());
let retry_name = crate::retry_mailbox(name.clone());
let group1 = crate::group(name.clone());
let group2 = crate::group(retry_name.clone());
let consumer = self.create_consumer(&group1)?;
let retry = self.create_consumer(&group2)?;
let producer = self.get_producer()?;
consumer.subscribe(&[&name])?;
retry.subscribe(&[&retry_name])?;
Ok(crate::read::Reader::new(
name,
dead_letter,
retry_name,
consumer,
retry,
producer,
))
}
pub fn exclusive_reader<V: Into<String>>(
&self,
name: V,
id: i32,
) -> Result<crate::read::Reader, KafkaError> {
let name = name.into();
let dead_letter = crate::dead_letter(name.clone());
let retry_name = crate::retry_mailbox(name.clone());
let group1 = crate::group_with_no(name.clone(), id);
let group2 = crate::group_with_no(retry_name.clone(), id);
let consumer = self.create_consumer(&group1)?;
let retry = self.create_consumer(&group2)?;
let producer = self.get_producer()?;
let mut tpl = TopicPartitionList::new();
tpl.add_partition(&name, id);
consumer.assign(&tpl)?;
let mut tpl = TopicPartitionList::new();
tpl.add_partition(&retry_name, id);
retry.assign(&tpl)?;
Ok(crate::read::Reader::new(
name,
dead_letter,
retry_name,
consumer,
retry,
producer,
))
}
}