use crate::MetaMail;
use crate::util::{
dead_letter, decode_dead_letter, decode_retry_mailbox, group, group_with_no, retry_mailbox,
};
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, Consumer, StreamConsumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::metadata::Metadata;
use rdkafka::producer::FutureProducer;
use rdkafka::{ClientConfig, TopicPartitionList};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::Duration;
/// Entry point for creating mailboxes, writers and readers against a fixed
/// list of Kafka brokers.
///
/// Both fields are `Arc`s, so cloning a `Builder` is cheap and every clone
/// shares the same broker list and the same mutable [`Inner`] state.
#[derive(Clone)]
pub struct Builder {
/// Broker addresses; joined with `,` and passed as `bootstrap.servers`.
brokers: Arc<Vec<String>>,
/// Shared, lock-protected state: cached clients and timeout settings.
inner: Arc<RwLock<Inner>>,
}
/// Mutable state shared by every clone of a [`Builder`].
struct Inner {
/// Admin client; `None` until `Builder::get_client` first creates it.
client: Option<Arc<AdminClient<DefaultClientContext>>>,
/// Producer; `None` until `Builder::get_producer` first creates it.
producer: Option<FutureProducer>,
/// Value passed to clients as `socket.timeout.ms`.
socket_timeout: i32,
/// Value passed to clients as `request.timeout.ms`.
request_timeout: i32,
/// Value passed to clients as `socket.connection.setup.timeout.ms`.
connection_timeout: i32,
}
impl Builder {
pub fn new<V: AsRef<str>>(brokers: &[V]) -> Self {
Builder {
brokers: Arc::new(brokers.iter().map(|b| b.as_ref().to_string()).collect()),
inner: Arc::new(RwLock::new(Inner {
client: None,
producer: None,
socket_timeout: 10000,
request_timeout: 10000,
connection_timeout: 10000,
})),
}
}
pub fn socket_timeout(&self, mut to: i32) -> i32 {
let mut wg = self.inner.write().unwrap();
std::mem::swap(&mut wg.socket_timeout, &mut to);
to
}
pub fn request_timeout(&self, mut to: i32) -> i32 {
let mut wg = self.inner.write().unwrap();
std::mem::swap(&mut wg.request_timeout, &mut to);
to
}
pub fn connection_timeout(&self, mut to: i32) -> i32 {
let mut wg = self.inner.write().unwrap();
std::mem::swap(&mut wg.connection_timeout, &mut to);
to
}
fn get_timeout(&self) -> (i32, i32, i32) {
let g = self.inner.read().unwrap();
(g.socket_timeout, g.request_timeout, g.connection_timeout)
}
fn get_client(&self) -> Result<Arc<AdminClient<DefaultClientContext>>, KafkaError> {
{
let rg = self.inner.read().unwrap();
if rg.client.is_some() {
return Ok(rg.client.clone().unwrap());
}
}
{
let mut wg = self.inner.write().unwrap();
if wg.client.is_none() {
let c: AdminClient<DefaultClientContext> = ClientConfig::new()
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", wg.socket_timeout.to_string())
.set("request.timeout.ms", wg.request_timeout.to_string())
.set(
"socket.connection.setup.timeout.ms",
wg.connection_timeout.to_string(),
)
.create()?;
wg.client = Some(Arc::new(c));
}
Ok(wg.client.clone().unwrap())
}
}
fn get_producer(&self) -> Result<FutureProducer, KafkaError> {
{
let rg = self.inner.read().unwrap();
if rg.producer.is_some() {
return Ok(rg.producer.clone().unwrap());
}
}
{
let mut wg = self.inner.write().unwrap();
if wg.producer.is_none() {
let p: FutureProducer = ClientConfig::new()
.set("retries", "3")
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", wg.socket_timeout.to_string())
.set("request.timeout.ms", wg.request_timeout.to_string())
.set(
"socket.connection.setup.timeout.ms",
wg.connection_timeout.to_string(),
)
.set("acks", "all")
.set("enable.idempotence", "true")
.set("delivery.timeout.ms", "3000")
.set("message.max.bytes", "62464")
.create()?;
wg.producer = Some(p);
}
Ok(wg.producer.clone().unwrap())
}
}
fn create_consumer(&self, group_id: &str) -> Result<StreamConsumer, KafkaError> {
let to = self.get_timeout();
let c: Result<StreamConsumer, KafkaError> = ClientConfig::new()
.set("group.id", group_id)
.set("enable.auto.commit", "false")
.set("enable.partition.eof", "true")
.set("auto.offset.reset", "earliest")
.set("fetch.min.bytes", "1")
.set("fetch.max.bytes", "62464")
.set("fetch.wait.max.ms", "500")
.set("message.max.bytes", "62464")
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.set("socket.timeout.ms", to.0.to_string())
.set("request.timeout.ms", to.1.to_string())
.set("socket.connection.setup.timeout.ms", to.2.to_string())
.create();
c
}
pub fn shared_mailbox<V: Into<String>>(
&self,
name: V,
fixed_replication: i32,
) -> Result<crate::shared::Mailbox, KafkaError> {
let client = self.get_client()?;
Ok(crate::shared::Mailbox::new(
client,
name.into(),
fixed_replication,
))
}
pub fn shared<V: Into<String>>(
&self,
name: V,
fixed_replication: i32,
) -> Result<crate::shared::Mailbox, KafkaError> {
self.shared_mailbox(name, fixed_replication)
}
pub fn exclusive_mailbox<V: Into<String>>(
&self,
name: V,
num_partitions: i32,
fixed_replication: i32,
) -> Result<crate::exclusive::Mailbox, KafkaError> {
let client = self.get_client()?;
Ok(crate::exclusive::Mailbox::new(
client,
name.into(),
num_partitions,
fixed_replication,
))
}
pub fn exclusive<V: Into<String>>(
&self,
name: V,
num_partitions: i32,
fixed_replication: i32,
) -> Result<crate::exclusive::Mailbox, KafkaError> {
self.exclusive_mailbox(name, num_partitions, fixed_replication)
}
pub fn writer(&self) -> Result<crate::write::Writer, KafkaError> {
let producer = self.get_producer()?;
Ok(crate::write::Writer::new(producer))
}
pub fn shared_reader<V: AsRef<str>>(&self, name: V) -> Result<crate::read::Reader, KafkaError> {
let name = name.as_ref();
let dead_letter = dead_letter(name);
let retry_name = retry_mailbox(name);
let group1 = group(name);
let group2 = group(&retry_name);
let consumer = self.create_consumer(&group1)?;
let retry = self.create_consumer(&group2)?;
let producer = self.get_producer()?;
consumer.subscribe(&[&name])?;
retry.subscribe(&[&retry_name])?;
Ok(crate::read::Reader::new(
name.to_string(),
dead_letter,
retry_name,
consumer,
retry,
producer,
))
}
pub fn exclusive_reader<V: AsRef<str>>(
&self,
name: V,
id: i32,
) -> Result<crate::read::Reader, KafkaError> {
let name = name.as_ref();
let dead_letter = dead_letter(name);
let retry_name = retry_mailbox(name);
let group1 = group_with_no(name, id);
let group2 = group_with_no(&retry_name, id);
let consumer = self.create_consumer(&group1)?;
let retry = self.create_consumer(&group2)?;
let producer = self.get_producer()?;
let mut tpl = TopicPartitionList::new();
tpl.add_partition(&name, id);
consumer.assign(&tpl)?;
let mut tpl = TopicPartitionList::new();
tpl.add_partition(&retry_name, id);
retry.assign(&tpl)?;
Ok(crate::read::Reader::new(
name.to_string(),
dead_letter,
retry_name,
consumer,
retry,
producer,
))
}
pub fn all_metas(&self) -> Result<Vec<MetaMail>, KafkaError> {
let metas = self.real_metas()?;
let mut meta_mails = vec![];
for topic in metas.topics() {
let mut partitions = vec![];
for p in topic.partitions() {
partitions.push((p.id(), p.replicas().len()));
}
meta_mails.push(MetaMail {
name: topic.name().to_string(),
partitions,
})
}
Ok(meta_mails)
}
pub fn metas<V: AsRef<str>>(
&self,
names: &[V],
) -> Result<HashMap<String, Vec<MetaMail>>, KafkaError> {
if names.is_empty() {
return Ok(HashMap::new());
}
let metas = self.real_metas()?;
let mut name_set = HashSet::new();
for name in names {
name_set.insert(name.as_ref());
}
let mut meta_mails: HashMap<String, Vec<MetaMail>> = HashMap::new();
for topic in metas.topics() {
let name = {
let name = topic.name();
if name_set.contains(name) {
Some(name)
} else {
let name2 = decode_retry_mailbox(name);
if name_set.contains(name2) {
Some(name2)
} else {
let name3 = decode_dead_letter(name);
if name_set.contains(name3) {
Some(name3)
} else {
None
}
}
}
};
if let Some(name) = name {
let mut partitions = vec![];
for p in topic.partitions() {
partitions.push((p.id(), p.replicas().len()));
}
let mm = MetaMail {
name: topic.name().to_string(),
partitions,
};
match meta_mails.entry(name.to_string()) {
Entry::Occupied(mut e) => {
e.get_mut().push(mm);
}
Entry::Vacant(e) => {
e.insert(vec![mm]);
}
}
}
}
Ok(meta_mails)
}
fn real_metas(&self) -> KafkaResult<Metadata> {
let to = self.get_timeout();
let c: Result<BaseConsumer, KafkaError> = ClientConfig::new()
.set("socket.timeout.ms", to.0.to_string())
.set("request.timeout.ms", to.1.to_string())
.set("socket.connection.setup.timeout.ms", to.2.to_string())
.set(
"bootstrap.servers",
self.brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(","),
)
.create();
let c = c?;
c.fetch_metadata(None, Duration::from_secs(5))
}
}