use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;
use crate::config::kafka_client_config::KafkaClientConfig;
pub fn get_kafka_consumer(config: &KafkaClientConfig) -> BaseConsumer {
if config.tls_key.is_empty()
&& config.tls_cert.is_empty()
&& config.tls_ca.is_empty()
{
info!("connecting with PLAINTEXT");
ClientConfig::new()
.set("bootstrap.servers", config.broker_list.join(","))
.set("security.protocol", "PLAINTEXT")
.create()
.expect("Consumer creation error")
} else {
ClientConfig::new()
.set("bootstrap.servers", config.broker_list.join(","))
.set("security.protocol", "SSL")
.set("ssl.ca.location", config.tls_ca.clone())
.set("ssl.key.location", config.tls_key.clone())
.set("ssl.certificate.location", config.tls_cert.clone())
.set("enable.ssl.certificate.verification", "true")
.create()
.expect("Consumer creation error")
}
}