kafka_threadpool/
start_threadpool.rs

1//! Start the threadpool and return a
2//! [`KafkaPublisher`](crate::kafka_publisher::KafkaPublisher)
3//!
4use log::info;
5use log::trace;
6
7use crate::api::build_kafka_client_config::build_kafka_client_config;
8use crate::kafka_publisher::KafkaPublisher;
9use crate::pool::start_threads_from_config::start_threads_from_config;
10
11/// start_threadpool
12///
13/// # Arguments
14///
15/// * `label` - optional tracking log label
16/// (``ktp`` is the default if not set)
17///
18/// # Examples
19///
20/// ```rust
21/// use kafka_threadpool::start_threadpool::start_threadpool;
22/// let log_label = "ktp";
23/// let kafka_publisher = start_threadpool(log_label).await;
24/// ```
25///
26pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisher {
27    trace!("start_threadpool - building config");
28    let ll =
29        std::env::var("KAFKA_LOG_LABEL").unwrap_or_else(|_| "ktp".to_string());
30    let use_label: &str = match label {
31        Some(in_label) => in_label,
32        None => &ll,
33    };
34    let config = build_kafka_client_config(use_label);
35    if config.is_enabled {
36        trace!("start_threadpool - starting threads");
37        match start_threads_from_config(config).await {
38            Ok(kafka_publisher) => {
39                info!(
40                    "{use_label} - started {} kafka publish threads",
41                    kafka_publisher.config.num_threads
42                );
43                kafka_publisher
44            }
45            Err(e) => {
46                // rebuild the config after using (move-ing) it
47                let err_config = build_kafka_client_config(use_label);
48                panic!(
49                    "{use_label} \
50                    failed to kafka publish threads with start_threads_from_config \
51                    config={} err={e} - stopping",
52                    err_config);
53            }
54        }
55    } else {
56        info!(
57            "kafka threadpool disabled KAFKA_ENABLED={}",
58            config.is_enabled
59        );
60        KafkaPublisher::new()
61    }
62}