use super::{KafkaConnection, KafkaEvent};
use crate::nodes::{RunParams, produce_async};
use crate::types::*;
use rdkafka::Message;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;
use std::rc::Rc;
/// Subscribes to a Kafka topic and exposes the consumed messages as a stream node.
///
/// A [`StreamConsumer`] is created against `connection.brokers` using the given
/// consumer `group_id`, with `auto.offset.reset=earliest`, auto-commit enabled and
/// a session timeout of 6000 ms. The consumer is then subscribed to `topic`.
///
/// Every received message is converted into a [`KafkaEvent`] (topic, partition,
/// offset, optional key, payload) and emitted together with `NanoTime::now()`.
/// A message without a payload produces an empty `value`.
///
/// # Errors
///
/// Errors are reported through the produced stream rather than by this function:
/// - consumer creation or subscription failure aborts the async setup with an error;
/// - a receive error accepted by `is_transient_subscribe_error` is skipped and
///   the loop keeps polling;
/// - any other receive error is yielded as an `Err` item and ends the stream.
#[must_use]
pub fn kafka_sub(
    connection: KafkaConnection,
    topic: impl Into<String>,
    group_id: impl Into<String>,
) -> Rc<dyn Stream<Burst<KafkaEvent>>> {
    let topic = topic.into();
    let group_id = group_id.into();

    produce_async(move |_ctx: RunParams| async move {
        // Assemble the client configuration first, then build the consumer from it.
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", &connection.brokers)
            .set("group.id", &group_id)
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "true")
            .set("session.timeout.ms", "6000");

        let consumer: StreamConsumer = config
            .create()
            .map_err(|e| anyhow::anyhow!("kafka consumer create failed: {e}"))?;

        consumer
            .subscribe(&[topic.as_str()])
            .map_err(|e| anyhow::anyhow!("kafka subscribe failed: {e}"))?;

        Ok(async_stream::stream! {
            loop {
                // Resolve the receive outcome up front: either we have a message,
                // we skip a transient error, or we report a fatal one and stop.
                let received = match consumer.recv().await {
                    Ok(received) => received,
                    Err(e) if is_transient_subscribe_error(&e) => continue,
                    Err(e) => {
                        yield Err(anyhow::anyhow!("kafka consume error: {e}"));
                        break;
                    }
                };

                let event = KafkaEvent {
                    topic: received.topic().to_owned(),
                    partition: received.partition(),
                    offset: received.offset(),
                    key: received.key().map(<[u8]>::to_vec),
                    value: received.payload().unwrap_or_default().to_vec(),
                };
                yield Ok((NanoTime::now(), event));
            }
        })
    })
}
/// Returns `true` when `err` is a message-consumption error carrying the
/// `UnknownTopicOrPartition` code, and `false` for every other error.
///
/// NOTE(review): presumably treated as transient because the topic may not
/// exist yet when the subscription starts — confirm against broker behaviour.
fn is_transient_subscribe_error(err: &KafkaError) -> bool {
    // Only consumption errors can qualify; anything else is never transient here.
    let KafkaError::MessageConsumption(code) = err else {
        return false;
    };
    matches!(code, RDKafkaErrorCode::UnknownTopicOrPartition)
}