use std::sync::Arc;
use std::sync::Mutex;
use log::info;
use crate::config::kafka_client_config::KafkaClientConfig;
use crate::kafka_publisher::KafkaPublisher;
use crate::thread_process_messages_handler::thread_process_messages_handler;
/// Builds a [`KafkaPublisher`] from `config` and launches its worker tasks.
///
/// When `config.is_enabled` is `false`, nothing is spawned and the value
/// produced by `KafkaPublisher::new()` is returned instead.
///
/// Otherwise a publisher is created that holds a clone of `config` and an
/// empty, mutex-guarded message vector. One Tokio task is then spawned per
/// index in `0..config.num_threads`; each task runs
/// `thread_process_messages_handler` with its index, its own clone of the
/// configuration, and a shared handle to the publisher's `publish_msgs`
/// vector.
///
/// The spawned tasks are detached: their join handles are discarded, so this
/// function does not wait for them to finish.
///
/// # Errors
///
/// The code in this function never constructs an `Err`; the `Result` return
/// type is part of the signature callers already depend on.
///
/// # Panics
///
/// `tokio::spawn` panics when invoked outside of a Tokio runtime context.
pub async fn start_threads_from_config(
    config: KafkaClientConfig,
) -> Result<KafkaPublisher, String> {
    // Guard clause: a disabled client gets a publisher without any workers.
    if !config.is_enabled {
        info!("{} - kafka-threadpool disabled", config.label);
        return Ok(KafkaPublisher::new());
    }

    info!("{} - starting threads={}", config.label, config.num_threads);

    let publisher = KafkaPublisher {
        config: config.clone(),
        publish_msgs: Arc::new(Mutex::new(Vec::new())),
    };

    (0..publisher.config.num_threads).for_each(|worker_id| {
        info!("{} - creating thread={worker_id}", config.label);
        // Each task must own its inputs because the spawned future is moved
        // onto the runtime and outlives this stack frame.
        let worker_config = publisher.config.clone();
        let shared_msgs = Arc::clone(&publisher.publish_msgs);
        tokio::spawn(async move {
            thread_process_messages_handler(worker_id, worker_config, shared_msgs).await;
        });
    });

    Ok(publisher)
}