use std::collections::HashMap;
use crate::Config;
use crate::config::OffsetReset;
/// Thin wrapper around an `rdkafka` [`rdkafka::consumer::StreamConsumer`].
///
/// The wrapped consumer is kept private; the associated functions and methods
/// on this type forward to it.
pub struct Consumer {
/// The underlying `rdkafka` stream consumer that every method delegates to.
inner: rdkafka::consumer::StreamConsumer,
}
impl Consumer {
pub fn build(config: &Config) -> Result<Consumer, rdkafka::error::KafkaError> {
let consumer: rdkafka::consumer::StreamConsumer = config.inner.create()?;
Ok(Consumer { inner: consumer })
}
pub fn consumer(&self) -> &rdkafka::consumer::StreamConsumer {
&self.inner
}
pub fn build_with_default(config: &Config) -> Result<Consumer, rdkafka::error::KafkaError> {
let mut config = config.clone();
config
.set_partition_eof(true)
.set_auto_commit(true)
.set_offset_reset(OffsetReset::Earliest);
Self::build(&config)
}
pub fn simple_subscribe(&self, topic: &str) -> Result<(), rdkafka::error::KafkaError> {
self.subscribe(&[topic])
}
pub fn subscribe(&self, topics: &[&str]) -> Result<(), rdkafka::error::KafkaError> {
rdkafka::consumer::Consumer::subscribe(&self.inner, topics)
}
pub fn unsubscribe(&self) {
rdkafka::consumer::Consumer::unsubscribe(&self.inner)
}
pub fn simple_assign(
&self,
topics: HashMap<&str, i32>,
) -> Result<(), rdkafka::error::KafkaError> {
let mut tpl = rdkafka::TopicPartitionList::new();
for (k, v) in topics {
tpl.add_partition(k, v);
}
rdkafka::consumer::Consumer::assign(&self.inner, &tpl)
}
pub async fn recv(
&self,
) -> Result<rdkafka::message::BorrowedMessage<'_>, rdkafka::error::KafkaError> {
self.inner.recv().await
}
}