use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use log::info;
use crate::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
use crate::api::build_kafka_publish_message::build_kafka_publish_message;
use crate::api::drain_messages_from_locked_work_vec::drain_messages_from_locked_work_vec;
use crate::api::get_kafka_consumer::get_kafka_consumer;
use crate::api::kafka_publish_message::KafkaPublishMessage;
use crate::api::kafka_publish_message_type::KafkaPublishMessageType;
use crate::config::kafka_client_config::KafkaClientConfig;
use crate::metadata::get_kafka_metadata::get_kafka_metadata;
/// Handle used to queue messages for publication to Kafka.
///
/// Every operation in the `impl` block is gated on `config.is_enabled`;
/// when that flag is false the methods return a neutral value without
/// touching `publish_msgs`.
///
/// Cloning copies `config` and clones the `Arc`, so clones refer to the
/// same underlying message vector.
#[derive(Default, Clone)]
pub struct KafkaPublisher {
/// Client configuration; its `is_enabled` flag switches the publisher on or off.
pub config: KafkaClientConfig,
/// Shared, mutex-guarded vector of messages that have been added but not yet drained.
pub publish_msgs: Arc<Mutex<Vec<KafkaPublishMessage>>>,
}
impl KafkaPublisher {
    /// Creates a publisher with an empty message vector.
    ///
    /// The configuration is built by `KafkaClientConfig::new`, which receives
    /// the value of the `KAFKA_LOG_LABEL` environment variable, or `"ktp"`
    /// when that variable cannot be read.
    pub fn new() -> Self {
        let label = std::env::var("KAFKA_LOG_LABEL").unwrap_or_else(|_| "ktp".to_string());
        Self {
            config: KafkaClientConfig::new(&label),
            publish_msgs: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Returns the `is_enabled` flag from the client configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.is_enabled
    }

    /// Builds a `Data` message from the given parts and adds it to the
    /// shared message vector.
    ///
    /// # Returns
    ///
    /// `Ok(0)` without building anything when the publisher is disabled;
    /// otherwise the result of `add_messages_to_locked_work_vec`
    /// (presumably a message count on success - confirm against the helper).
    pub async fn add_data_msg(
        &self,
        topic: &str,
        key: &str,
        headers: Option<HashMap<String, String>>,
        payload: &str,
    ) -> Result<usize, String> {
        if !self.config.is_enabled {
            return Ok(0);
        }
        let data_msg = build_kafka_publish_message(
            KafkaPublishMessageType::Data,
            topic,
            key,
            headers,
            payload,
        );
        add_messages_to_locked_work_vec(&self.publish_msgs, vec![data_msg])
    }

    /// Adds a single pre-built message to the shared message vector.
    ///
    /// Returns `Ok(0)` when the publisher is disabled; otherwise the result
    /// of `add_messages_to_locked_work_vec`.
    pub async fn add_msg(
        &self,
        msg: KafkaPublishMessage,
    ) -> Result<usize, String> {
        if !self.config.is_enabled {
            return Ok(0);
        }
        add_messages_to_locked_work_vec(&self.publish_msgs, vec![msg])
    }

    /// Adds a batch of pre-built messages to the shared message vector.
    ///
    /// Returns `Ok(0)` when the publisher is disabled; otherwise the result
    /// of `add_messages_to_locked_work_vec`.
    pub async fn add_msgs(
        &self,
        msgs: Vec<KafkaPublishMessage>,
    ) -> Result<usize, String> {
        if !self.config.is_enabled {
            return Ok(0);
        }
        add_messages_to_locked_work_vec(&self.publish_msgs, msgs)
    }

    /// Returns the messages handed back by
    /// `drain_messages_from_locked_work_vec`, or an empty vector when the
    /// publisher is disabled.
    pub async fn drain_msgs(&self) -> Vec<KafkaPublishMessage> {
        if !self.config.is_enabled {
            return Vec::new();
        }
        drain_messages_from_locked_work_vec(&self.publish_msgs)
    }

    /// Adds a `Shutdown` message (empty topic, key and payload, no headers)
    /// to the shared message vector.
    ///
    /// # Returns
    ///
    /// * `Ok("kafka not enabled")` when the publisher is disabled.
    /// * `Ok("shutdown started")` once the message has been added.
    ///
    /// # Errors
    ///
    /// Forwards the error string from `add_messages_to_locked_work_vec`
    /// unchanged.
    pub async fn shutdown(&self) -> Result<String, String> {
        if !self.config.is_enabled {
            return Ok("kafka not enabled".to_string());
        }
        let shutdown_msg = build_kafka_publish_message(
            KafkaPublishMessageType::Shutdown,
            "",
            "",
            None,
            "",
        );
        info!("sending shutdown msg");
        add_messages_to_locked_work_vec(&self.publish_msgs, vec![shutdown_msg])
            .map(|_| "shutdown started".to_string())
    }

    /// Creates a consumer from the current configuration and passes it to
    /// `get_kafka_metadata` together with `fetch_offsets` and `topic`.
    ///
    /// When the publisher is disabled, only an info-level log line is
    /// emitted and no consumer is created.
    pub async fn get_metadata(&self, fetch_offsets: bool, topic: Option<&str>) {
        if !self.config.is_enabled {
            info!("kafka not enabled KAFKA_ENABLED={}", self.config.is_enabled);
            return;
        }
        info!("creating consumer");
        let kafka_consumer = get_kafka_consumer(&self.config);
        get_kafka_metadata(&self.config, kafka_consumer, fetch_offsets, topic)
    }
}