zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::time::Duration;

use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use super::make_producer;
use super::KafkaSettings;

/// Kafka producer wrapper that pairs a `FutureProducer` with the queue timeout
/// used for every send issued through it.
#[derive(Clone)]
pub struct KafkaClient {
    /// Producer used to deliver records to Kafka.
    pub producer: FutureProducer,
    /// Timeout in seconds; converted to a `Duration` and passed to
    /// `FutureProducer::send` as its queue timeout on each send.
    pub queue_timeout: u32,
}

impl KafkaClient {
    pub fn build(settings: &KafkaSettings) -> Self {
        Self {
            producer: make_producer(settings),
            queue_timeout: settings.queue_timeout_secs.unwrap(),
        }
    }

    pub async fn send_message<T>(&self, topic: &str, message: &T) -> Option<String>
    where
        T: serde::Serialize,
    {
        let msg = match serde_json::to_string(message) {
            Ok(val) => val,
            Err(e) => {
                log::error!("kafka-message-serde-failed: error={:?}", e);
                return None;
            }
        };

        let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
        let message_id = uuid::Uuid::now_v7().to_string();
        let record = FutureRecord::to(topic).key(&message_id).payload(&msg);

        match self.producer.send(record, queue_timeout).await {
            Ok(_) => Some(message_id),
            Err(e) => {
                log::error!("kafka-message-send-failed: error={}", e.0);
                None
            }
        }
    }

    pub async fn msg(&self, topic: &str, msg: &str) -> Option<String> {
        let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
        let message_id = uuid::Uuid::now_v7().to_string();
        let record = FutureRecord::to(topic).key(&message_id).payload(msg);

        match self.producer.send(record, queue_timeout).await {
            Ok(_) => Some(message_id),
            Err(e) => {
                log::error!("kafka-message-send-failed: error={:?}", e);
                None
            }
        }
    }

    pub async fn publish(&self, topic: &str, buf: &[u8]) -> Option<String> {
        match std::str::from_utf8(buf) {
            Ok(utf8_string) => {
                let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
                let message_id = uuid::Uuid::now_v7().to_string();
                let record = FutureRecord::to(topic)
                    .key(&message_id)
                    .payload(utf8_string);

                match self.producer.send(record, queue_timeout).await {
                    Ok(_) => Some(message_id),
                    Err(e) => {
                        log::error!("kafka-message-send-failed: error={:?}", e);
                        None
                    }
                }
            }
            Err(e) => {
                log::error!("kafka-message-send-failed: error={:?}", e);
                None
            }
        }
    }
}