1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//! start the threadpool with a logging label
//!
use log::info;
use log::trace;

use crate::api::build_kafka_client_config::build_kafka_client_config;
use crate::kafka_publisher::KafkaPublisher;
use crate::pool::start_threads_from_config::start_threads_from_config;

/// start_threadpool
///
/// # Arguments
///
/// * `label` - optional tracking log label
/// (``ktp`` is the default if not set)
///
/// # Examples
///
/// ```rust
/// use kafka_threadpool::start_threadpool::start_threadpool;
/// let log_label = "ktp";
/// let kafka_publisher = start_threadpool(log_label);
/// ```
///
pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisher {
    trace!("start_threadpool - building config");
    let ll =
        std::env::var("KAFKA_LOG_LABEL").unwrap_or_else(|_| "ktp".to_string());
    let use_label: &str = match label {
        Some(in_label) => in_label,
        None => &ll,
    };
    let config = build_kafka_client_config(use_label);
    trace!("start_threadpool - starting threads");
    match start_threads_from_config(config).await {
        Ok(kafka_publisher) => {
            info!(
                "{use_label} - started {} kafka publish threads",
                kafka_publisher.config.num_threads
            );
            kafka_publisher
        }
        Err(e) => {
            // rebuild the config after using (move-ing) it
            let err_config = build_kafka_client_config(use_label);
            panic!(
                "{use_label} failed to kafka publish threads with start_threads_from_config \
                config={} err={e} - stopping",
                err_config);
        }
    }
}