use std::{collections::HashMap, time::Duration};
use log::{error, info};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
consumer::{BaseConsumer, Consumer, StreamConsumer},
message::{Header, Headers, OwnedHeaders, OwnedMessage},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message, TopicPartitionList,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::{sync::Mutex, time::timeout};
use crate::errors::msrs::MsRsError;
use super::transport::{RequestResponseChannel, TransportV2};
use async_trait::async_trait;
/// Location of a reply message inside its Kafka topic.
///
/// The reply listener sends this small, plain-data value to the task waiting
/// for a response; the waiter then uses it to fetch the actual message by
/// partition and offset.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KafkaResponseMessageMeta {
    /// Offset of the reply message within its partition.
    offset: i64,
    /// Partition of the reply topic that holds the message.
    partition: i32,
}
/// Kafka-backed transport that implements request/response messaging on top
/// of a `<method>` request topic and a `<method>.reply` reply topic.
pub struct KafkaV2 {
/// Value passed as `bootstrap.servers` to every Kafka client this type creates.
kafka_brokers: String,
/// Requests still waiting for a reply, keyed by the correlation id sent in the
/// `kafka_correlationId` header. The sender is used to hand over the location
/// of the reply message.
response_map: Mutex<HashMap<String, tokio::sync::oneshot::Sender<KafkaResponseMessageMeta>>>,
/// Partition ids of each reply topic, keyed by reply topic name; filled in by
/// `listen_responses` and read by `send`.
listen_partitions: Mutex<HashMap<String, Vec<i32>>>,
}
impl KafkaV2 {
pub fn new(kafka_brokers: String) -> Self {
KafkaV2 {
kafka_brokers,
response_map: Mutex::new(HashMap::new()),
listen_partitions: Mutex::new(HashMap::new()),
}
}
async fn get_message_by_offset(
&self,
topic: String,
partition: i32,
offset: i64,
) -> Result<OwnedMessage, MsRsError> {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", self.kafka_brokers.clone())
.set("group.id", format!("group_{}", topic))
.create()
.expect("Consumer creation failed");
let mut tpl = TopicPartitionList::new();
let _ = tpl.add_partition_offset(&topic, partition, rdkafka::Offset::Offset(offset));
consumer
.assign(&tpl)
.expect("Failed to assign topic partitions");
let result = consumer.recv().await.map_err(|err| {
error!("Error when receiving message by offset: {}", err);
MsRsError::InternalError(err.to_string())
})?;
Ok(result.detach())
}
async fn create_topic_if_not_exists(&self, client: &ClientConfig, topic_name: &str) {
let metadata = client
.create::<BaseConsumer>()
.unwrap()
.fetch_metadata(None, Duration::from_secs(5))
.expect("fetch_metadata failed");
let topic_exists = metadata
.topics()
.iter()
.any(|topic| topic.name() == topic_name);
if !topic_exists {
let admin: AdminClient<_> = client.create().expect("Admin client creation failed");
let new_topic = NewTopic {
name: topic_name,
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
};
admin
.create_topics(&[new_topic], &AdminOptions::new())
.await
.expect("Topic creation failed");
info!("Topic {} has been created", topic_name);
}
}
}
#[async_trait]
impl TransportV2 for KafkaV2 {
async fn listen_responses(&self, method: String) {
let mut client = ClientConfig::new();
client = client
.set("bootstrap.servers", &self.kafka_brokers)
.to_owned();
let reply_topic = format!("{method}.reply");
self.create_topic_if_not_exists(&client, &reply_topic).await;
let consumer: StreamConsumer = client
.set("group.id", format!("group_{}", reply_topic))
.create()
.unwrap();
consumer.subscribe(&[&reply_topic]).unwrap();
let partitions = consumer
.fetch_metadata(Some(&reply_topic), Duration::from_secs(5))
.map(|metadata| {
let topic = metadata.topics().first().unwrap();
topic
.partitions()
.iter()
.map(|p| p.id())
.collect::<Vec<i32>>()
})
.map_err(|err| {
error!("Error fetching metadata: {err}");
MsRsError::InternalError(err.to_string())
})
.unwrap();
self.listen_partitions
.lock()
.await
.insert(reply_topic, partitions);
loop {
let message = consumer.recv().await.unwrap();
if let Some(headers) = message.headers() {
for header in headers.iter() {
let key = header.key.to_string();
if key == "kafka_correlationId" {
let value = std::str::from_utf8(header.value.unwrap())
.unwrap()
.to_string();
let tx = self.response_map.lock().await.remove(&value);
if tx.is_none() {
error!(
"Not found handler for request with id: {}, skipping...",
value
);
continue;
}
let _ = tx.unwrap().send(KafkaResponseMessageMeta {
offset: message.offset().clone(),
partition: message.partition().clone(),
});
}
}
}
}
}
async fn send_event<Event>(&self, event_name: String, event: Event) -> Result<(), MsRsError>
where
Event: Serialize + Send,
{
let mut client = ClientConfig::new();
client = client
.set("bootstrap.servers", &self.kafka_brokers)
.to_owned();
self.create_topic_if_not_exists(&client, &event_name).await;
let mut record = FutureRecord::to(&event_name);
let event_serialized = serde_json::to_string(&event).unwrap();
record = record.payload(&event_serialized).key("");
client
.create::<FutureProducer>()
.unwrap()
.send(record, Duration::from_secs(0))
.await
.map_err(|err| {
error!("An error while sending message : {:?}", err);
MsRsError::SendError(err.0.to_string())
})?;
Ok(())
}
async fn send<Request, Response>(
&self,
method: String,
req: Request,
) -> Result<Response, MsRsError>
where
Request: Serialize + Send,
Response: DeserializeOwned + Send,
{
let reply_topic = format!("{method}.reply");
let request_id: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect();
let partition_id = self
.listen_partitions
.lock()
.await
.get(&reply_topic)
.expect("Check listen_response called")
.first()
.unwrap()
.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let _res = self
.response_map
.lock()
.await
.insert(request_id.clone(), tx);
let mut client = ClientConfig::new();
client = client
.set("bootstrap.servers", &self.kafka_brokers)
.to_owned();
self.create_topic_if_not_exists(&client, &method).await;
self.create_topic_if_not_exists(&client, &reply_topic).await;
let mut record = FutureRecord::to(&method);
record = record.headers({
let headers = OwnedHeaders::new()
.insert(Header {
key: "kafka_correlationId",
value: Some(&request_id.clone()),
})
.insert(Header {
key: "kafka_replyPartition",
value: Some(&partition_id.to_string()),
})
.insert(Header {
key: "kafka_nest-is-disposed",
value: Some(&String::from("1")),
});
headers
});
let payload = serde_json::to_string(&req).unwrap();
record = record.payload(&payload).key("");
client
.create::<FutureProducer>()
.unwrap()
.send(record, Duration::from_secs(0))
.await
.map_err(|err| {
error!("An error while sending message : {:?}", err);
MsRsError::SendError(err.0.to_string())
})?;
let result = timeout(Duration::from_secs(30), rx).await.map_err(|err| {
error!("Timeout error {err}");
MsRsError::SendError("timeout-error".to_string())
})?;
let result = result.map_err(|_err| MsRsError::SendError("recv-error".to_string()))?;
let message = self
.get_message_by_offset(reply_topic, result.partition, result.offset)
.await?;
if let Some(headers) = message.headers() {
for header in headers.iter() {
let key = header.key.to_string();
if key == "msrs_err" {
let value = std::str::from_utf8(header.value.unwrap())
.unwrap()
.to_string();
return Err(MsRsError::RpcError(value));
}
}
}
let response = message.payload_view::<str>().unwrap().map_err(|err| {
error!("Error when reading response: {err}");
MsRsError::SendError("reading-response-error".to_string())
})?;
let response: Response = serde_json::from_str(response).map_err(|err| {
error!("Error when parsing response: {err}");
MsRsError::SendError("parsing-response-error".to_string())
})?;
Ok(response)
}
async fn listen<Request, Response>(
&self,
method: String,
channel: RequestResponseChannel<Request, Response>,
) -> Result<(), MsRsError>
where
Request: Serialize + DeserializeOwned + Send,
Response: Serialize + DeserializeOwned + Send,
{
let reply_topic = format!("{method}.reply");
let mut client = ClientConfig::new();
client = client
.set("bootstrap.servers", &self.kafka_brokers)
.to_owned();
self.create_topic_if_not_exists(&client, &method).await;
self.create_topic_if_not_exists(&client, &reply_topic).await;
let consumer: StreamConsumer = client
.set("group.id", format!("group_{}", method))
.create()
.unwrap();
consumer.subscribe(&[&method]).unwrap();
let producer: FutureProducer = client.create().unwrap();
loop {
let message = consumer.recv().await;
if message.is_err() {
error!(
"An error while receiving kafka message: {:?}",
message.err()
);
continue;
}
let message = message.unwrap();
let mut id: String = "".to_string();
let mut partition_id: String = "0".to_string();
if let Some(headers) = message.headers() {
for header in headers.iter() {
let key = header.key.to_string();
if key == "kafka_correlationId" {
if header.value.is_none() {
error!("Korellation header is empty, skipping...");
continue;
}
let id_res = std::str::from_utf8(header.value.unwrap());
if id_res.is_err() {
error!("Error parsing kafka correlation header: {:?}", id_res.err());
continue;
}
id = id_res.unwrap().to_string();
}
if key == "kafka_replyPartition" {
if header.value.is_none() {
error!("Korellation header is empty, skipping...");
continue;
}
let partition_id_res = std::str::from_utf8(header.value.unwrap());
if partition_id_res.is_err() {
error!(
"Error parsing kafka correlation header: {:?}",
partition_id_res.err()
);
continue;
}
partition_id = partition_id_res.unwrap().to_string();
}
}
}
let request = message.payload_view::<str>();
if request.is_none() {
error!("Kafka message is empty, skipping...",);
continue;
}
let request = request.unwrap();
if request.is_err() {
error!("An error fetching message {:?}, skipping...", request.err());
continue;
}
let request = serde_json::from_str::<Request>(request.unwrap());
if request.is_err() {
error!("An error parsing message {:?}, skipping...", request.err());
continue;
}
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = channel.send((request.unwrap(), tx)).await;
let response = timeout(Duration::from_secs(30), rx).await;
if response.is_err() {
error!("Timeout when waiting response for {method}, skipping...");
continue;
}
let response = response.unwrap();
if response.is_err() {
error!(
"An error occurs while receiving response for {method}, err: {:?}, skipping...",
response.err()
);
continue;
}
let response = response.unwrap();
let mut record = FutureRecord::to(&reply_topic);
let mut headers = OwnedHeaders::new();
headers = headers
.insert(Header {
key: "kafka_correlationId",
value: Some(&id),
})
.insert(Header {
key: "kafka_nest-is-disposed",
value: Some(&String::from("1")),
});
let partition_id = partition_id.parse::<i32>();
if partition_id.is_err() {
error!(
"Error while parsing partition number from header..: {:?}",
partition_id.err(),
);
continue;
}
record = record.partition(partition_id.unwrap());
if response.is_err() {
let error = response.err().unwrap().to_string();
headers = headers
.insert(Header {
key: "kafka_nest-err",
value: Some(&format!("{{\"type\":\"rpc\",\"message\":\"{error}\"}}")),
})
.insert(Header {
key: "msrs_err",
value: Some(&error),
});
record = record.headers(headers);
record = record.payload("").key("");
let _ = producer.send(record, Duration::from_secs(0)).await;
} else {
let response = serde_json::to_string(&(response.unwrap()));
if response.is_err() {
error!("Error while serializing response..: {:?}", response.err(),);
continue;
}
let response = response.unwrap();
record = record.headers(headers);
record = record.payload(&response).key("");
let _ = producer.send(record, Duration::from_secs(0)).await;
}
}
}
}