extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use std::collections::HashMap;
use kafka_threadpool::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
use kafka_threadpool::api::build_kafka_publish_message::build_kafka_publish_message;
use kafka_threadpool::api::kafka_publish_message::KafkaPublishMessage;
use kafka_threadpool::api::kafka_publish_message_type::KafkaPublishMessageType;
use kafka_threadpool::start_threadpool::start_threadpool;
#[tokio::main]
async fn main() {
pretty_env_logger::init_timed();
let test_case = "start_threadpool";
let kafka_publisher = start_threadpool(Some(test_case)).await;
if !kafka_publisher.config.is_enabled {
info!(
"kafka threadpool disabled - \
please check the environment variable KAFKA_ENABLED \
is set to '1' or 'true' and retry"
);
return;
}
info!(
"{test_case} \
config={}",
kafka_publisher.config
);
info!("creating messages");
let mut new_msgs: Vec<KafkaPublishMessage> = vec![];
let num_to_send = 100;
for i in 0..num_to_send {
let mut headers: HashMap<String, String> = HashMap::new();
let payload = format!("test message {i}");
headers.insert(format!("header {i}"), format!("value {i}"));
new_msgs.push(build_kafka_publish_message(
KafkaPublishMessageType::Data,
"testing",
"testing",
Some(headers),
&payload,
));
}
let num_to_publish = new_msgs.len();
info!(
"adding {num_to_publish} msgs to the \
lockable work vec: KafkaPublishMessage.publish_msgs"
);
match add_messages_to_locked_work_vec(
&kafka_publisher.publish_msgs,
new_msgs,
) {
Ok(num_msgs_in_vec) => {
info!(
"added {num_to_publish} msgs with \
total in vec={num_msgs_in_vec}"
);
}
Err(e) => {
error!(
"failed to add {} msgs to \
locked vec with err={e}",
num_to_publish
);
}
}
info!("waiting 3s to send shutdown");
std::thread::sleep(std::time::Duration::from_millis(3000));
match kafka_publisher.shutdown().await {
Ok(msg) => trace!("{msg}"),
Err(err_msg) => {
error!("publisher shutdown failed with err='{err_msg}'")
}
}
}