use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Message as KafkaMessage;
use crate::core::elasticsearch::EsClient;
use crate::core::kafka::new_consumer;
#[derive(Debug, Clone)]
pub struct MessagePayload(String);
impl MessagePayload {
pub fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl<'a> From<&'a BorrowedMessage<'a>> for MessagePayload {
fn from(bm: &'a BorrowedMessage) -> Self {
match bm.payload_view::<str>() {
Some(Ok(s)) => MessagePayload(String::from(s)),
Some(Err(e)) => MessagePayload(format!("{:?}", e)),
None => MessagePayload(String::from("")),
}
}
}
pub struct KafkaConsumer {
pub topic: String,
pub consumer: StreamConsumer,
pub es_client: EsClient,
}
impl KafkaConsumer {
pub fn new(
kafka_seed: String,
group_id: &str,
topic: String,
es_client: EsClient,
) -> Result<Self, KafkaError> {
let consumer = new_consumer(kafka_seed, group_id, vec![&topic])?;
Ok(KafkaConsumer {
topic,
consumer,
es_client,
})
}
pub fn process_message(borrowed_message: BorrowedMessage) -> (String, String) {
let topic_name = borrowed_message.topic().to_owned();
let partition = borrowed_message.partition();
let message_key = String::from_utf8(borrowed_message.key().unwrap().to_vec())
.unwrap_or_else(|_| "N/a".to_string());
let message_payload = MessagePayload::from(&borrowed_message);
let message_string = message_payload.as_str();
log::debug!(
"Kafka-Consumer: topic={}, partition={}, message_key={}",
topic_name,
partition,
message_key
);
(topic_name, message_string.to_string())
}
pub async fn run(&self) {
let mut stream = self.consumer.stream();
loop {
match stream.next().await {
Some(Ok(borrowed_message)) => {
let (topic, message) = Self::process_message(borrowed_message);
match self.es_client.save2(&topic, &message).await {
Ok(Some(id)) => {
log::info!(
"Kafka-consumer-es-save2-success: topic={}, id={}",
topic,
id
);
}
Ok(None) => {
log::info!(
"Kafka-consumer-es-save2-failed: topic={}, id={}",
topic,
"N/a"
);
}
Err(e) => {
log::error!(
"Kafka-consumer-es-save2-errors: topic={}, error={}",
topic,
e
);
}
}
}
Some(Err(kafka_error)) => match kafka_error {
KafkaError::PartitionEOF(_) => {}
_ => log::error!(
"Kafka-consumer-errors: topic={}, error={}",
self.topic,
kafka_error
),
},
None => {}
}
}
}
}