1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//! Start the threadpool and return a
//! [`KafkaPublisher`](crate::kafka_publisher::KafkaPublisher)
//!
use log::info;
use log::trace;

use crate::api::build_kafka_client_config::build_kafka_client_config;
use crate::kafka_publisher::KafkaPublisher;
use crate::pool::start_threads_from_config::start_threads_from_config;

/// start_threadpool
///
/// # Arguments
///
/// * `label` - optional tracking log label
/// (``ktp`` is the default if not set)
///
/// # Examples
///
/// ```rust
/// use kafka_threadpool::start_threadpool::start_threadpool;
/// let log_label = "ktp";
/// let kafka_publisher = start_threadpool(log_label).await;
/// ```
///
pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisher {
    trace!("start_threadpool - building config");
    let ll =
        std::env::var("KAFKA_LOG_LABEL").unwrap_or_else(|_| "ktp".to_string());
    let use_label: &str = match label {
        Some(in_label) => in_label,
        None => &ll,
    };
    let config = build_kafka_client_config(use_label);
    if config.is_enabled {
        trace!("start_threadpool - starting threads");
        match start_threads_from_config(config).await {
            Ok(kafka_publisher) => {
                info!(
                    "{use_label} - started {} kafka publish threads",
                    kafka_publisher.config.num_threads
                );
                kafka_publisher
            }
            Err(e) => {
                // rebuild the config after using (move-ing) it
                let err_config = build_kafka_client_config(use_label);
                panic!(
                    "{use_label} \
                    failed to kafka publish threads with start_threads_from_config \
                    config={} err={e} - stopping",
                    err_config);
            }
        }
    } else {
        info!(
            "kafka threadpool disabled KAFKA_ENABLED={}",
            config.is_enabled
        );
        KafkaPublisher::new()
    }
}