use std::sync::Arc;
use std::sync::Mutex;
use log::error;
use log::info;
use log::trace;
use rdkafka::message::OwnedHeaders;
use crate::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
use crate::api::drain_messages_from_locked_work_vec::drain_messages_from_locked_work_vec;
use crate::api::get_kafka_consumer::get_kafka_consumer;
use crate::api::get_kafka_producer::get_kafka_producer;
use crate::api::kafka_publish_message::KafkaPublishMessage;
use crate::api::kafka_publish_message_type::KafkaPublishMessageType;
use crate::config::kafka_client_config::KafkaClientConfig;
use crate::metadata::get_kafka_metadata::get_kafka_metadata;
use crate::msg::publish_message::convert_hashmap_headers_to_ownedheaders;
use crate::msg::publish_message::publish_message;
pub async fn thread_process_messages_handler(
cur_thread_num: u8,
config: KafkaClientConfig,
lockable_work_vec: Arc<Mutex<Vec<KafkaPublishMessage>>>,
) {
let mut work_vec: Vec<KafkaPublishMessage> = Vec::with_capacity(20);
let log_label = format!("{}-tid-{}", config.label, cur_thread_num + 1);
if config.broker_list.is_empty() {
error!(
"{log_label} - \
no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
config.broker_list
);
return;
}
if config.broker_list[0].is_empty() {
error!(
"{log_label} - \
no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
config.broker_list
);
return;
}
if cur_thread_num == 0 {
info!(
"threadpool connecting to brokers={:?} topics={:?} \
tls ca={} key={} cert={} \
work_vec_cap={}",
config.broker_list,
config.publish_topics,
config.tls_ca,
config.tls_key,
config.tls_ca,
work_vec.capacity()
);
}
let producer = get_kafka_producer(&config);
trace!("{log_label} - start");
loop {
let mut should_shutdown = false;
work_vec = drain_messages_from_locked_work_vec(&lockable_work_vec);
if work_vec.is_empty() {
trace!("{log_label} - idle");
std::thread::sleep(std::time::Duration::from_millis(
config.idle_sleep_sec,
));
continue;
} else {
trace!("{log_label} - processing {} msgs", work_vec.len());
while !work_vec.is_empty() {
let msg = work_vec.remove(0);
if msg.msg_type == KafkaPublishMessageType::Shutdown {
should_shutdown = true;
let requeue_vec: Vec<KafkaPublishMessage> =
vec![msg.clone()];
match add_messages_to_locked_work_vec(
&lockable_work_vec,
requeue_vec,
) {
Ok(num_msgs_in_vec) => {
trace!(
"{log_label} - requeue shutdown message \
success with total in vec={num_msgs_in_vec}"
);
}
Err(e) => {
error!(
"{log_label} - failed to requeue shutdown \
message into vec with err={e}"
);
}
}
break;
} else if msg.msg_type == KafkaPublishMessageType::Data {
let payload_sub = msg.payload[..10].to_string();
trace!(
"{log_label} pub \
topic={} data='{}'",
msg.topic,
payload_sub
);
let topic = msg.topic.clone();
let mut owned_headers: OwnedHeaders = OwnedHeaders::new();
if msg.headers.is_some() {
owned_headers = convert_hashmap_headers_to_ownedheaders(
msg.headers.clone().unwrap(),
owned_headers,
);
}
loop {
let delivery_status =
publish_message(&producer, &msg, &owned_headers)
.await;
if delivery_status != -1 {
trace!("published message topic={topic}");
break;
} else {
error!(
"failed to publish delivery status={} \
retrying msg={:?}",
delivery_status, msg
);
std::thread::sleep(
std::time::Duration::from_millis(
config.retry_sleep_sec,
),
);
}
}
} else if msg.msg_type
== KafkaPublishMessageType::LogBrokerDetails
{
info!(
"{log_label} - get all broker config={} information",
config
);
{
let consumer = get_kafka_consumer(&config);
let count_msgs =
std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
.unwrap_or_else(|_| "true".to_string())
== *"true";
get_kafka_metadata(&config, consumer, count_msgs, None);
}
break;
} else if msg.msg_type
== KafkaPublishMessageType::LogBrokerTopicDetails
{
if msg.payload.is_empty() {
error!(
"{log_label} - \
unable to get broker config={} \
missing topic={} in msg.payload",
config, msg.payload
);
} else {
info!(
"{log_label} - \
get broker config={} topic={} information",
config, msg.payload
);
let consumer = get_kafka_consumer(&config);
let count_msgs =
std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
.unwrap_or_else(|_| "true".to_string())
== *"true";
get_kafka_metadata(
&config,
consumer,
count_msgs,
Some(&msg.payload),
);
}
break;
} else {
error!(
"{log_label} - \
unsupported KafkaPublishMessageType={:?}",
msg.msg_type
);
break;
}
}
if should_shutdown {
let num_left = work_vec.len();
if num_left == 0 {
trace!("{log_label} - work vec empty={num_left}");
} else {
error!("{log_label} - work vec NOT empty={num_left}");
}
break;
}
work_vec.clear();
}
}
info!("{log_label} - done exiting thread");
}