start_threadpool/
start-threadpool.rs

1//! # Build the release version
2//!
3//! ```bash
4//! cargo build --release --example start-threadpool && export RUST_BACKTRACE=1 && export RUST_LOG=info,kafka_threadpool=info && ./target/release/examples/start-threadpool
5//! ```
6//!
7//! # Build the debug version
8//!
9//! ```bash
10//! cargo build --example start-threadpool && export RUST_BACKTRACE=1 && export RUST_LOG=info,kafka_threadpool=info && ./target/debug/examples/start-threadpool
11//! ```
12//!
13extern crate pretty_env_logger;
14#[macro_use]
15extern crate log;
16
17use std::collections::HashMap;
18
19use kafka_threadpool::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
20use kafka_threadpool::api::build_kafka_publish_message::build_kafka_publish_message;
21use kafka_threadpool::api::kafka_publish_message::KafkaPublishMessage;
22use kafka_threadpool::api::kafka_publish_message_type::KafkaPublishMessageType;
23use kafka_threadpool::start_threadpool::start_threadpool;
24
25/// main
26///
27/// Supported env vars:
28///
29/// | Environment Variable Name        | Purpose / Value                                |
30/// | -------------------------------- | ---------------------------------------------- |
31/// | KAFKA_ENABLED                    | toggle the kafka_threadpool on with: ``true`` or ``1`` anything else disables the threadpool |
32/// | KAFKA_LOG_LABEL                  | tracking label that shows up in all crate logs |
33/// | KAFKA_BROKERS                    | comma-delimited list of brokers (``host1:port,host2:port,host3:port``) |
34/// | KAFKA_TOPICS                     | comma-delimited list of supported topics |
35/// | KAFKA_PUBLISH_RETRY_INTERVAL_SEC | number of seconds to sleep before each publish retry |
36/// | KAFKA_PUBLISH_IDLE_INTERVAL_SEC  | number of seconds to sleep if there are no message to process |
37/// | KAFKA_NUM_THREADS                | number of threads for the threadpool |
38/// | KAFKA_TLS_CLIENT_KEY             | optional - path to the kafka mTLS key |
39/// | KAFKA_TLS_CLIENT_CERT            | optional - path to the kafka mTLS certificate |
40/// | KAFKA_TLS_CLIENT_CA              | optional - path to the kafka mTLS certificate authority (CA) |
41/// | KAFKA_METADATA_COUNT_MSG_OFFSETS | optional - set to anything but ``true`` to bypass counting the offsets |
42///
43#[tokio::main]
44async fn main() {
45    pretty_env_logger::init_timed();
46    let test_case = "start_threadpool";
47    let kafka_publisher = start_threadpool(Some(test_case)).await;
48
49    if !kafka_publisher.config.is_enabled {
50        info!(
51            "kafka threadpool disabled - \
52            please check the environment variable KAFKA_ENABLED \
53            is set to '1' or 'true' and retry"
54        );
55        return;
56    }
57
58    info!(
59        "{test_case} \
60        config={}",
61        kafka_publisher.config
62    );
63
64    info!("creating messages");
65    let mut new_msgs: Vec<KafkaPublishMessage> = vec![];
66    let num_to_send = 100;
67    for i in 0..num_to_send {
68        let mut headers: HashMap<String, String> = HashMap::new();
69        let payload = format!("test message {i}");
70        headers.insert(format!("header {i}"), format!("value {i}"));
71        new_msgs.push(build_kafka_publish_message(
72            KafkaPublishMessageType::Data,
73            "testing",
74            "testing",
75            Some(headers),
76            &payload,
77        ));
78    }
79
80    let num_to_publish = new_msgs.len();
81    info!(
82        "adding {num_to_publish} msgs to the \
83        lockable work vec: KafkaPublishMessage.publish_msgs"
84    );
85    match add_messages_to_locked_work_vec(
86        &kafka_publisher.publish_msgs,
87        new_msgs,
88    ) {
89        Ok(num_msgs_in_vec) => {
90            info!(
91                "added {num_to_publish} msgs with \
92                total in vec={num_msgs_in_vec}"
93            );
94        }
95        Err(e) => {
96            error!(
97                "failed to add {} msgs to \
98                locked vec with err={e}",
99                num_to_publish
100            );
101        }
102    }
103
104    // waits until threads exit or are shutdown
105    info!("waiting 3s to send shutdown");
106    std::thread::sleep(std::time::Duration::from_millis(3000));
107    // send shutdown message to all worker threads in the pool
108    match kafka_publisher.shutdown().await {
109        Ok(msg) => trace!("{msg}"),
110        Err(err_msg) => {
111            error!("publisher shutdown failed with err='{err_msg}'")
112        }
113    }
114}