start_threadpool

Function start_threadpool 

Source
pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisher
Expand description

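Start the Kafka publish threadpool and return the KafkaPublisher handle used to queue messages, request metadata, and shut down the worker threads.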

§Arguments

  • label - optional label used to tag log messages (defaults to ktp when None)

§Examples

use kafka_threadpool::start_threadpool::start_threadpool;
let log_label = "ktp";
let kafka_publisher = start_threadpool(Some(log_label)).await;
Examples found in repository?
examples/get-all-metadata.rs (line 21)
18async fn main() {
19    pretty_env_logger::init_timed();
20    let label = "get-all-metadata";
21    let kafka_publisher = start_threadpool(Some(label)).await;
22
23    info!(
24        "{label} \
25        config={} \
26        getting all metadata",
27        kafka_publisher.config
28    );
29
30    kafka_publisher.get_metadata(true, None).await;
31
32    info!("shutting down");
33    // send shutdown message to all worker threads in the pool
34    match kafka_publisher.shutdown().await {
35        Ok(msg) => trace!("{msg}"),
36        Err(err_msg) => {
37            error!("publisher shutdown failed with err='{err_msg}'")
38        }
39    }
40}
More examples
Hide additional examples
examples/get-metadata-for-topic.rs (line 23)
20async fn main() {
21    pretty_env_logger::init_timed();
22    let label = "get-metadata-for-topic";
23    let kafka_publisher = start_threadpool(Some(label)).await;
24
25    let topic = std::env::var("KAFKA_TOPIC").unwrap_or_else(|_| "".to_string());
26    if topic.is_empty() {
27        error!("please set a topic with: export KAFKA_TOPIC=TOPIC_NAME")
28    } else {
29        info!(
30            "{label} \
31            config={} \
32            getting all metadata",
33            kafka_publisher.config
34        );
35
36        kafka_publisher.get_metadata(true, Some(&topic)).await;
37    }
38
39    info!("shutting down");
40    // send shutdown message to all worker threads in the pool
41    match kafka_publisher.shutdown().await {
42        Ok(msg) => trace!("{msg}"),
43        Err(err_msg) => {
44            error!("publisher shutdown failed with err='{err_msg}'")
45        }
46    }
47}
examples/start-threadpool.rs (line 47)
44async fn main() {
45    pretty_env_logger::init_timed();
46    let test_case = "start_threadpool";
47    let kafka_publisher = start_threadpool(Some(test_case)).await;
48
49    if !kafka_publisher.config.is_enabled {
50        info!(
51            "kafka threadpool disabled - \
52            please check the environment variable KAFKA_ENABLED \
53            is set to '1' or 'true' and retry"
54        );
55        return;
56    }
57
58    info!(
59        "{test_case} \
60        config={}",
61        kafka_publisher.config
62    );
63
64    info!("creating messages");
65    let mut new_msgs: Vec<KafkaPublishMessage> = vec![];
66    let num_to_send = 100;
67    for i in 0..num_to_send {
68        let mut headers: HashMap<String, String> = HashMap::new();
69        let payload = format!("test message {i}");
70        headers.insert(format!("header {i}"), format!("value {i}"));
71        new_msgs.push(build_kafka_publish_message(
72            KafkaPublishMessageType::Data,
73            "testing",
74            "testing",
75            Some(headers),
76            &payload,
77        ));
78    }
79
80    let num_to_publish = new_msgs.len();
81    info!(
82        "adding {num_to_publish} msgs to the \
83        lockable work vec: KafkaPublishMessage.publish_msgs"
84    );
85    match add_messages_to_locked_work_vec(
86        &kafka_publisher.publish_msgs,
87        new_msgs,
88    ) {
89        Ok(num_msgs_in_vec) => {
90            info!(
91                "added {num_to_publish} msgs with \
92                total in vec={num_msgs_in_vec}"
93            );
94        }
95        Err(e) => {
96            error!(
97                "failed to add {} msgs to \
98                locked vec with err={e}",
99                num_to_publish
100            );
101        }
102    }
103
104    // waits until threads exit or are shutdown
105    info!("waiting 3s to send shutdown");
106    std::thread::sleep(std::time::Duration::from_millis(3000));
107    // send shutdown message to all worker threads in the pool
108    match kafka_publisher.shutdown().await {
109        Ok(msg) => trace!("{msg}"),
110        Err(err_msg) => {
111            error!("publisher shutdown failed with err='{err_msg}'")
112        }
113    }
114}