zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use futures::StreamExt;

use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Message as KafkaMessage;

use crate::core::elasticsearch::EsClient;
use crate::core::kafka::new_consumer;

#[derive(Debug, Clone)]
pub struct MessagePayload(String);

impl MessagePayload {
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

/// generic way to turn a borrowed message into a (wrapped) string
impl<'a> From<&'a BorrowedMessage<'a>> for MessagePayload {
    fn from(bm: &'a BorrowedMessage) -> Self {
        match bm.payload_view::<str>() {
            Some(Ok(s)) => MessagePayload(String::from(s)),
            Some(Err(e)) => MessagePayload(format!("{:?}", e)),
            None => MessagePayload(String::from("")),
        }
    }
}

pub struct KafkaConsumer {
    pub topic: String,
    pub consumer: StreamConsumer,
    pub es_client: EsClient,
}

impl KafkaConsumer {
    pub fn new(
        kafka_seed: String,
        group_id: &str,
        topic: String,
        es_client: EsClient,
    ) -> Result<Self, KafkaError> {
        let consumer = new_consumer(kafka_seed, group_id, vec![&topic])?;

        Ok(KafkaConsumer {
            topic,
            consumer,
            es_client,
        })
    }

    pub fn process_message(borrowed_message: BorrowedMessage) -> (String, String) {
        let topic_name = borrowed_message.topic().to_owned();
        let partition = borrowed_message.partition();

        let message_key = String::from_utf8(borrowed_message.key().unwrap().to_vec())
            .unwrap_or_else(|_| "N/a".to_string());

        let message_payload = MessagePayload::from(&borrowed_message);
        let message_string = message_payload.as_str();

        log::debug!(
            "Kafka-Consumer: topic={}, partition={}, message_key={}",
            topic_name,
            partition,
            message_key
        );

        (topic_name, message_string.to_string())
    }

    pub async fn run(&self) {
        // let mut stream = self.consumer.start_with(Duration::from_millis(50), false);
        let mut stream = self.consumer.stream();

        loop {
            match stream.next().await {
                Some(Ok(borrowed_message)) => {
                    let (topic, message) = Self::process_message(borrowed_message);
                    match self.es_client.save2(&topic, &message).await {
                        Ok(Some(id)) => {
                            log::info!(
                                "Kafka-consumer-es-save2-success: topic={}, id={}",
                                topic,
                                id
                            );
                        }
                        Ok(None) => {
                            log::info!(
                                "Kafka-consumer-es-save2-failed: topic={}, id={}",
                                topic,
                                "N/a"
                            );
                        }
                        Err(e) => {
                            log::error!(
                                "Kafka-consumer-es-save2-errors: topic={}, error={}",
                                topic,
                                e
                            );
                        }
                    }
                }
                Some(Err(kafka_error)) => match kafka_error {
                    KafkaError::PartitionEOF(_) => {}
                    _ => log::error!(
                        "Kafka-consumer-errors: topic={}, error={}",
                        self.topic,
                        kafka_error
                    ),
                },

                None => {}
            }
        }
    }
}