kafka_threadpool/
start_threadpool.rs1use log::info;
5use log::trace;
6
7use crate::api::build_kafka_client_config::build_kafka_client_config;
8use crate::kafka_publisher::KafkaPublisher;
9use crate::pool::start_threads_from_config::start_threads_from_config;
10
11pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisher {
27 trace!("start_threadpool - building config");
28 let ll =
29 std::env::var("KAFKA_LOG_LABEL").unwrap_or_else(|_| "ktp".to_string());
30 let use_label: &str = match label {
31 Some(in_label) => in_label,
32 None => &ll,
33 };
34 let config = build_kafka_client_config(use_label);
35 if config.is_enabled {
36 trace!("start_threadpool - starting threads");
37 match start_threads_from_config(config).await {
38 Ok(kafka_publisher) => {
39 info!(
40 "{use_label} - started {} kafka publish threads",
41 kafka_publisher.config.num_threads
42 );
43 kafka_publisher
44 }
45 Err(e) => {
46 let err_config = build_kafka_client_config(use_label);
48 panic!(
49 "{use_label} \
50 failed to kafka publish threads with start_threads_from_config \
51 config={} err={e} - stopping",
52 err_config);
53 }
54 }
55 } else {
56 info!(
57 "kafka threadpool disabled KAFKA_ENABLED={}",
58 config.is_enabled
59 );
60 KafkaPublisher::new()
61 }
62}