msrs 0.1.31

Micro Microservices framework for rust. Supports different transports
Documentation
use async_trait::async_trait;
use log::{debug, error, info};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, StreamConsumer};
use rdkafka::message::{Header, Headers, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use serde;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use tokio;

use crate::errors::msrs::MsRsError;
use crate::transport::transport::{MsMessage, Transport};

pub struct KafkaTransport {
    producer: FutureProducer,
    base_consumer: BaseConsumer,
    client: ClientConfig,
}

#[derive(Serialize, Deserialize)]
pub struct NestError {
    #[serde(rename(serialize = "type", deserialize = "type"))]
    typ: String,
    messsage: String,
}

impl KafkaTransport {
    pub fn new(brokers: &str, group_id: &str) -> Self {
        let mut client = ClientConfig::new();
        client = client
            .set("bootstrap.servers", brokers.to_string())
            .set("message.timeout.ms", "5000")
            .set("group.id", group_id)
            .to_owned();
        KafkaTransport {
            client: client.to_owned(),
            producer: client
                .set("group.id", "base")
                .create()
                .expect("Producer creation failed"),
            base_consumer: client
                .set("group.id", "base")
                .create()
                .expect("Consumer creation failed"),
        }
    }

    async fn create_topic_if_not_exists(&self, topic_name: &str) {
        let metadata = self
            .base_consumer
            .fetch_metadata(None, Duration::from_secs(5))
            .expect("fetch_metadata failed");

        let topic_exists = metadata
            .topics()
            .iter()
            .any(|topic| topic.name() == topic_name);

        if !topic_exists {
            let admin: AdminClient<_> = self.client.create().expect("Admin client creation failed");

            let new_topic = NewTopic {
                name: topic_name,
                num_partitions: 10,
                replication: TopicReplication::Fixed(1),
                config: vec![],
            };

            admin
                .create_topics(&[new_topic], &AdminOptions::new())
                .await
                .expect("Topic creation failed");

            println!("Topic {} has been created", topic_name);
        }
    }
}

#[async_trait]
impl Transport for KafkaTransport {
    async fn send(self: &Self, event: &str, data: MsMessage) -> Result<(), MsRsError> {
        self.create_topic_if_not_exists(event).await;

        let mut record = FutureRecord::to(event);
        if let Some(headers_map) = data.0 {
            let correlation_id = headers_map
                .iter()
                .find(|header| header.0 == "kafka_correlationId");
            let partition_header = headers_map
                .iter()
                .find(|header| header.0 == "kafka_replyPartition");
            let default_attempt = "1".to_string();
            let attempt_header = headers_map
                .iter()
                .find(|header| header.0 == "msrs_attempt")
                .map(|v| v.1)
                .unwrap_or(&default_attempt);

            let mut headers = OwnedHeaders::new();
            let filtered_headers = vec![
                "kafka_replyPartition",
                "kafka_replyTopic",
                "kafka_nest-is-disposed",
                "kafka_correlationId",
            ];

            for header_from_map in headers_map.clone() {
                if filtered_headers
                    .iter()
                    .any(|v| v.to_string() == *header_from_map.0)
                {
                    continue;
                }

                headers = headers.insert(Header {
                    key: &header_from_map.0,
                    value: Some(&header_from_map.1),
                })
            }

            record = record.headers({
                if let Some(correlation_header) = correlation_id {
                    headers = headers
                        .insert(Header {
                            key: &correlation_header.0,
                            value: Some(correlation_header.1),
                        })
                        // Needed to handle from NestJS side (should be moved to external parameters in future)
                        .insert(Header {
                            // TODO: Move all headernames to constants
                            key: "kafka_nest-is-disposed",
                            value: Some(&String::from("1")),
                        })
                        .insert(Header {
                            key: "msrs_attempt",
                            value: Some(attempt_header),
                        });
                }

                headers
            });

            if let Some(partition) = partition_header {
                record = record.partition(partition.1.parse::<i32>().map_err(|err| {
                    error!(
                        "Error while parsing partition number from header..: {}",
                        err
                    );

                    MsRsError::SendError(err.to_string())
                })?);
            }
        }

        record = record.payload(&data.1).key("");

        self.producer
            .send(record, Duration::from_secs(0))
            .await
            .map_err(|err| {
                error!("An error while sending message : {:?}", err);

                MsRsError::SendError(err.0.to_string())
            })?;

        Ok(())
    }

    async fn listen_response(
        &self,
        event: &str,
        predicate: Box<dyn Fn(MsMessage) -> bool + Send + Sync>,
    ) -> Result<MsMessage, MsRsError> {
        let event_reply = format!("{}.reply", event.to_string().clone());
        self.create_topic_if_not_exists(&event_reply).await;

        let consumer: StreamConsumer = self.client.create().map_err(|e| {
            error!("Error while creating consumer: {}", e);

            MsRsError::ResoponseError(e.to_string())
        })?;

        consumer.subscribe(&[&event_reply]).map_err(|e| {
            error!(
                "Error while subscribing on {} consumer: {}",
                &event_reply, e
            );

            MsRsError::ResoponseError(e.to_string())
        })?;

        loop {
            match consumer.recv().await {
                Err(e) => {
                    error!("Kafka error: {}", e);

                    return Err(MsRsError::ResoponseError(e.to_string()));
                }
                Ok(m) => {
                    let payload = match m.payload_view::<str>() {
                        None => "",
                        Some(Ok(s)) => s,
                        Some(Err(e)) => {
                            error!("Error while deserializing message payload: {:?}", e);

                            return Err(MsRsError::ResoponseError(e.to_string()));
                        }
                    };
                    debug!("Received message: {}", payload);

                    let mut headers_map = HashMap::new();
                    let mut reattempt_possible = true;
                    if let Some(headers) = m.headers() {
                        for header in headers.iter() {
                            let key = header.key.to_string();
                            let value = std::str::from_utf8(header.value.unwrap())
                                .unwrap()
                                .to_string();

                            if key == "msrs_attempt" {
                                let attempt: usize = value.parse().unwrap_or(1);

                                if attempt >= 5 {
                                    info!("More than 5 attempts to handle message, skipping message: {:?}", m);
                                    reattempt_possible = false;
                                } else {
                                    headers_map.insert(key, (attempt + 1).to_string());

                                    continue;
                                }
                            }

                            if key == "msrs-err" || key == "kafka_nest-err" {
                                if key == "kafka_nest-err" {
                                    let error_code = serde_json::from_str::<NestError>(&value)
                                        .map_err(|err| {
                                            error!(
                                                "An error occurs, when parsing nestjs error {}: {}",
                                                value, err
                                            );
                                            MsRsError::RpcError("parse-nest-error".to_string())
                                        })?
                                        .messsage;

                                    return Err(MsRsError::RpcError(error_code));
                                }
                                return Err(MsRsError::RpcError(value));
                            }

                            headers_map.insert(key, value);
                        }
                    }

                    debug!("Extracted headers from message: {:?}", headers_map);
                    consumer
                        .commit_message(&m, CommitMode::Async)
                        .map_err(|err| {
                            error!("An error occurs, when commiting message {:?}: {}", &m, err);

                            return MsRsError::ResoponseError(err.to_string());
                        })?;
                    consumer.unsubscribe();

                    let payload = payload.to_string().clone();
                    let predicate_headers = headers_map.clone();
                    let message = MsMessage(Some(headers_map), payload.clone());
                    let result = predicate(MsMessage(Some(predicate_headers), payload.clone()));

                    if result {
                        info!("Predicate returned true, got reply, returning...");
                        return Ok(message);
                    } else {
                        if reattempt_possible {
                            info!("Resend message to the same topic...");
                            self.send(&event_reply, message).await?;
                        }
                    }
                }
            };
        }
    }

    async fn consume(&self, event: &str) -> Result<MsMessage, MsRsError> {
        self.create_topic_if_not_exists(event).await;
        self.create_topic_if_not_exists(&format!("{}.reply", event))
            .await;
        let consumer: StreamConsumer = self.client.create().map_err(|e| {
            error!("Error while creating consumer: {}", e);

            MsRsError::SubscribeError(e.to_string())
        })?;

        consumer.subscribe(&[&event]).map_err(|e| {
            error!("Error while subscribing on {} consumer: {}", &event, e);

            MsRsError::SubscribeError(e.to_string())
        })?;

        match consumer.recv().await {
            Err(e) => {
                error!("Kafka error: {}", e);
                return Err(MsRsError::ReceiveError(e.to_string()));
            }
            Ok(m) => {
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        error!("Error while deserializing message payload: {:?}", e);

                        return Err(MsRsError::ReceiveError(e.to_string()));
                    }
                };

                debug!("Received message: {}", payload);
                let mut headers_map = HashMap::new();
                if let Some(headers) = m.headers() {
                    for header in headers.iter() {
                        headers_map.insert(
                            header.key.to_string(),
                            std::str::from_utf8(header.value.unwrap())
                                .unwrap()
                                .to_string(),
                        );
                    }
                }
                debug!("Extracted headers from message: {:?}", headers_map);
                consumer
                    .commit_message(&m, CommitMode::Async)
                    .map_err(|err| {
                        error!("Error while commiting message {:?}: {}", m, err);

                        MsRsError::CommitError(err.to_string())
                    })?;
                debug!("Commited message: {:?}", m);

                let payload = payload.to_string().clone();

                return Ok(MsMessage(Some(headers_map), payload));
            }
        };
    }
}

#[derive(Debug, Deserialize, Serialize)]
struct TestData {
    id: String,
}

async fn _handle_get_transactions(data: TestData) -> Result<Option<TestData>, MsRsError> {
    Ok(Some(data))
}

#[tokio::test]
async fn test_kafka() {
    use crate::communication::client::Client;
    use std::sync::Arc;

    let transport = Arc::new(KafkaTransport::new(&"localhost:9092", "base"));
    let event = Client::new(transport.clone(), false);

    event
        .listen(
            "billing.get-transactions",
            Box::new(_handle_get_transactions),
        )
        .await
        .unwrap();
}