use rdkafka::error::KafkaError;
use rdkafka::config::ClientConfig;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, StreamConsumer};
/// Creates a Kafka [`StreamConsumer`] and subscribes it to `topics`.
///
/// The broker list and consumer group are taken from the arguments; every
/// other client property is fixed by the `.set(...)` calls in the body, and
/// the client log level is set to [`RDKafkaLogLevel::Debug`].
///
/// An info-level log line recording the brokers, group id and topics is
/// emitted before the consumer is built.
///
/// # Errors
///
/// Returns a [`KafkaError`] if the consumer cannot be created from the
/// configuration, or if the subscription to `topics` is rejected.
pub fn new_consumer(
    brokers: String,
    group_id: &str,
    topics: Vec<&str>,
) -> Result<StreamConsumer, KafkaError> {
    log::info!(
        "Kafka-subscribing: brokers={}, group_id={}, topics={:?}",
        brokers,
        group_id,
        topics
    );

    let stream_consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", &brokers)
        .set("auto.offset.reset", "latest")
        .set("enable.partition.eof", "true")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()?;

    // `subscribe` only needs a borrowed slice, so lend the vector directly
    // instead of cloning it into a temporary first.
    stream_consumer.subscribe(&topics)?;

    Ok(stream_consumer)
}