use async_trait::async_trait;
use log::{debug, error, info};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, StreamConsumer};
use rdkafka::message::{Header, Headers, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use serde;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use tokio;
use crate::errors::msrs::MsRsError;
use crate::transport::transport::{MsMessage, Transport};
/// Kafka-backed implementation of the `Transport` trait.
pub struct KafkaTransport {
/// Producer used by `send` to publish records.
producer: FutureProducer,
/// Consumer used to fetch cluster metadata when checking whether a topic exists.
base_consumer: BaseConsumer,
/// Client configuration kept so that further clients (admin client, stream
/// consumers) can be created on demand.
client: ClientConfig,
}
/// Error payload parsed from the `kafka_nest-err` header of a reply message.
#[derive(Serialize, Deserialize)]
pub struct NestError {
    /// Error kind, (de)serialized under the JSON key `type`.
    #[serde(rename = "type")]
    typ: String,
    /// Human-readable error text.
    ///
    /// The field keeps its historical (misspelled) name so that serialized
    /// output and existing field accesses are unchanged, but the correctly
    /// spelled JSON key `message` is accepted on input as well.
    #[serde(alias = "message")]
    messsage: String,
}
impl KafkaTransport {
pub fn new(brokers: &str, group_id: &str) -> Self {
let mut client = ClientConfig::new();
client = client
.set("bootstrap.servers", brokers.to_string())
.set("message.timeout.ms", "5000")
.set("group.id", group_id)
.to_owned();
KafkaTransport {
client: client.to_owned(),
producer: client
.set("group.id", "base")
.create()
.expect("Producer creation failed"),
base_consumer: client
.set("group.id", "base")
.create()
.expect("Consumer creation failed"),
}
}
async fn create_topic_if_not_exists(&self, topic_name: &str) {
let metadata = self
.base_consumer
.fetch_metadata(None, Duration::from_secs(5))
.expect("fetch_metadata failed");
let topic_exists = metadata
.topics()
.iter()
.any(|topic| topic.name() == topic_name);
if !topic_exists {
let admin: AdminClient<_> = self.client.create().expect("Admin client creation failed");
let new_topic = NewTopic {
name: topic_name,
num_partitions: 10,
replication: TopicReplication::Fixed(1),
config: vec![],
};
admin
.create_topics(&[new_topic], &AdminOptions::new())
.await
.expect("Topic creation failed");
println!("Topic {} has been created", topic_name);
}
}
}
#[async_trait]
impl Transport for KafkaTransport {
async fn send(self: &Self, event: &str, data: MsMessage) -> Result<(), MsRsError> {
self.create_topic_if_not_exists(event).await;
let mut record = FutureRecord::to(event);
if let Some(headers_map) = data.0 {
let correlation_id = headers_map
.iter()
.find(|header| header.0 == "kafka_correlationId");
let partition_header = headers_map
.iter()
.find(|header| header.0 == "kafka_replyPartition");
let default_attempt = "1".to_string();
let attempt_header = headers_map
.iter()
.find(|header| header.0 == "msrs_attempt")
.map(|v| v.1)
.unwrap_or(&default_attempt);
let mut headers = OwnedHeaders::new();
let filtered_headers = vec![
"kafka_replyPartition",
"kafka_replyTopic",
"kafka_nest-is-disposed",
"kafka_correlationId",
];
for header_from_map in headers_map.clone() {
if filtered_headers
.iter()
.any(|v| v.to_string() == *header_from_map.0)
{
continue;
}
headers = headers.insert(Header {
key: &header_from_map.0,
value: Some(&header_from_map.1),
})
}
record = record.headers({
if let Some(correlation_header) = correlation_id {
headers = headers
.insert(Header {
key: &correlation_header.0,
value: Some(correlation_header.1),
})
.insert(Header {
key: "kafka_nest-is-disposed",
value: Some(&String::from("1")),
})
.insert(Header {
key: "msrs_attempt",
value: Some(attempt_header),
});
}
headers
});
if let Some(partition) = partition_header {
record = record.partition(partition.1.parse::<i32>().map_err(|err| {
error!(
"Error while parsing partition number from header..: {}",
err
);
MsRsError::SendError(err.to_string())
})?);
}
}
record = record.payload(&data.1).key("");
self.producer
.send(record, Duration::from_secs(0))
.await
.map_err(|err| {
error!("An error while sending message : {:?}", err);
MsRsError::SendError(err.0.to_string())
})?;
Ok(())
}
async fn listen_response(
&self,
event: &str,
predicate: Box<dyn Fn(MsMessage) -> bool + Send + Sync>,
) -> Result<MsMessage, MsRsError> {
let event_reply = format!("{}.reply", event.to_string().clone());
self.create_topic_if_not_exists(&event_reply).await;
let consumer: StreamConsumer = self.client.create().map_err(|e| {
error!("Error while creating consumer: {}", e);
MsRsError::ResoponseError(e.to_string())
})?;
consumer.subscribe(&[&event_reply]).map_err(|e| {
error!(
"Error while subscribing on {} consumer: {}",
&event_reply, e
);
MsRsError::ResoponseError(e.to_string())
})?;
loop {
match consumer.recv().await {
Err(e) => {
error!("Kafka error: {}", e);
return Err(MsRsError::ResoponseError(e.to_string()));
}
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
error!("Error while deserializing message payload: {:?}", e);
return Err(MsRsError::ResoponseError(e.to_string()));
}
};
debug!("Received message: {}", payload);
let mut headers_map = HashMap::new();
let mut reattempt_possible = true;
if let Some(headers) = m.headers() {
for header in headers.iter() {
let key = header.key.to_string();
let value = std::str::from_utf8(header.value.unwrap())
.unwrap()
.to_string();
if key == "msrs_attempt" {
let attempt: usize = value.parse().unwrap_or(1);
if attempt >= 5 {
info!("More than 5 attempts to handle message, skipping message: {:?}", m);
reattempt_possible = false;
} else {
headers_map.insert(key, (attempt + 1).to_string());
continue;
}
}
if key == "msrs-err" || key == "kafka_nest-err" {
if key == "kafka_nest-err" {
let error_code = serde_json::from_str::<NestError>(&value)
.map_err(|err| {
error!(
"An error occurs, when parsing nestjs error {}: {}",
value, err
);
MsRsError::RpcError("parse-nest-error".to_string())
})?
.messsage;
return Err(MsRsError::RpcError(error_code));
}
return Err(MsRsError::RpcError(value));
}
headers_map.insert(key, value);
}
}
debug!("Extracted headers from message: {:?}", headers_map);
consumer
.commit_message(&m, CommitMode::Async)
.map_err(|err| {
error!("An error occurs, when commiting message {:?}: {}", &m, err);
return MsRsError::ResoponseError(err.to_string());
})?;
consumer.unsubscribe();
let payload = payload.to_string().clone();
let predicate_headers = headers_map.clone();
let message = MsMessage(Some(headers_map), payload.clone());
let result = predicate(MsMessage(Some(predicate_headers), payload.clone()));
if result {
info!("Predicate returned true, got reply, returning...");
return Ok(message);
} else {
if reattempt_possible {
info!("Resend message to the same topic...");
self.send(&event_reply, message).await?;
}
}
}
};
}
}
async fn consume(&self, event: &str) -> Result<MsMessage, MsRsError> {
self.create_topic_if_not_exists(event).await;
self.create_topic_if_not_exists(&format!("{}.reply", event))
.await;
let consumer: StreamConsumer = self.client.create().map_err(|e| {
error!("Error while creating consumer: {}", e);
MsRsError::SubscribeError(e.to_string())
})?;
consumer.subscribe(&[&event]).map_err(|e| {
error!("Error while subscribing on {} consumer: {}", &event, e);
MsRsError::SubscribeError(e.to_string())
})?;
match consumer.recv().await {
Err(e) => {
error!("Kafka error: {}", e);
return Err(MsRsError::ReceiveError(e.to_string()));
}
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
error!("Error while deserializing message payload: {:?}", e);
return Err(MsRsError::ReceiveError(e.to_string()));
}
};
debug!("Received message: {}", payload);
let mut headers_map = HashMap::new();
if let Some(headers) = m.headers() {
for header in headers.iter() {
headers_map.insert(
header.key.to_string(),
std::str::from_utf8(header.value.unwrap())
.unwrap()
.to_string(),
);
}
}
debug!("Extracted headers from message: {:?}", headers_map);
consumer
.commit_message(&m, CommitMode::Async)
.map_err(|err| {
error!("Error while commiting message {:?}: {}", m, err);
MsRsError::CommitError(err.to_string())
})?;
debug!("Commited message: {:?}", m);
let payload = payload.to_string().clone();
return Ok(MsMessage(Some(headers_map), payload));
}
};
}
}
/// Payload type accepted and returned by the `_handle_get_transactions`
/// handler used in `test_kafka`.
#[derive(Debug, Deserialize, Serialize)]
struct TestData {
/// Identifier carried by the payload.
id: String,
}
/// Handler used by `test_kafka`: hands the received payload straight back as
/// the successful, non-empty result.
async fn _handle_get_transactions(data: TestData) -> Result<Option<TestData>, MsRsError> {
    let echoed = Some(data);
    Ok(echoed)
}
/// Wires a `KafkaTransport` into a `Client` and listens on
/// `billing.get-transactions` with the echo handler.
///
/// NOTE(review): presumably needs a reachable broker at `localhost:9092` —
/// confirm before running in CI.
#[tokio::test]
async fn test_kafka() {
    use crate::communication::client::Client;
    use std::sync::Arc;

    let kafka = KafkaTransport::new("localhost:9092", "base");
    let shared_transport = Arc::new(kafka);
    let client = Client::new(shared_transport.clone(), false);

    let handler = Box::new(_handle_get_transactions);
    client
        .listen("billing.get-transactions", handler)
        .await
        .unwrap();
}