start_threadpool/start-threadpool.rs
1//! # Build the release version
2//!
3//! ```bash
4//! cargo build --release --example start-threadpool && export RUST_BACKTRACE=1 && export RUST_LOG=info,kafka_threadpool=info && ./target/release/examples/start-threadpool
5//! ```
6//!
7//! # Build the debug version
8//!
9//! ```bash
10//! cargo build --example start-threadpool && export RUST_BACKTRACE=1 && export RUST_LOG=info,kafka_threadpool=info && ./target/debug/examples/start-threadpool
11//! ```
12//!
13extern crate pretty_env_logger;
14#[macro_use]
15extern crate log;
16
17use std::collections::HashMap;
18
19use kafka_threadpool::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
20use kafka_threadpool::api::build_kafka_publish_message::build_kafka_publish_message;
21use kafka_threadpool::api::kafka_publish_message::KafkaPublishMessage;
22use kafka_threadpool::api::kafka_publish_message_type::KafkaPublishMessageType;
23use kafka_threadpool::start_threadpool::start_threadpool;
24
25/// main
26///
27/// Supported env vars:
28///
29/// | Environment Variable Name | Purpose / Value |
30/// | -------------------------------- | ---------------------------------------------- |
31/// | KAFKA_ENABLED | toggle the kafka_threadpool on with: ``true`` or ``1`` anything else disables the threadpool |
32/// | KAFKA_LOG_LABEL | tracking label that shows up in all crate logs |
33/// | KAFKA_BROKERS | comma-delimited list of brokers (``host1:port,host2:port,host3:port``) |
34/// | KAFKA_TOPICS | comma-delimited list of supported topics |
35/// | KAFKA_PUBLISH_RETRY_INTERVAL_SEC | number of seconds to sleep before each publish retry |
36/// | KAFKA_PUBLISH_IDLE_INTERVAL_SEC | number of seconds to sleep if there are no message to process |
37/// | KAFKA_NUM_THREADS | number of threads for the threadpool |
38/// | KAFKA_TLS_CLIENT_KEY | optional - path to the kafka mTLS key |
39/// | KAFKA_TLS_CLIENT_CERT | optional - path to the kafka mTLS certificate |
40/// | KAFKA_TLS_CLIENT_CA | optional - path to the kafka mTLS certificate authority (CA) |
41/// | KAFKA_METADATA_COUNT_MSG_OFFSETS | optional - set to anything but ``true`` to bypass counting the offsets |
42///
43#[tokio::main]
44async fn main() {
45 pretty_env_logger::init_timed();
46 let test_case = "start_threadpool";
47 let kafka_publisher = start_threadpool(Some(test_case)).await;
48
49 if !kafka_publisher.config.is_enabled {
50 info!(
51 "kafka threadpool disabled - \
52 please check the environment variable KAFKA_ENABLED \
53 is set to '1' or 'true' and retry"
54 );
55 return;
56 }
57
58 info!(
59 "{test_case} \
60 config={}",
61 kafka_publisher.config
62 );
63
64 info!("creating messages");
65 let mut new_msgs: Vec<KafkaPublishMessage> = vec![];
66 let num_to_send = 100;
67 for i in 0..num_to_send {
68 let mut headers: HashMap<String, String> = HashMap::new();
69 let payload = format!("test message {i}");
70 headers.insert(format!("header {i}"), format!("value {i}"));
71 new_msgs.push(build_kafka_publish_message(
72 KafkaPublishMessageType::Data,
73 "testing",
74 "testing",
75 Some(headers),
76 &payload,
77 ));
78 }
79
80 let num_to_publish = new_msgs.len();
81 info!(
82 "adding {num_to_publish} msgs to the \
83 lockable work vec: KafkaPublishMessage.publish_msgs"
84 );
85 match add_messages_to_locked_work_vec(
86 &kafka_publisher.publish_msgs,
87 new_msgs,
88 ) {
89 Ok(num_msgs_in_vec) => {
90 info!(
91 "added {num_to_publish} msgs with \
92 total in vec={num_msgs_in_vec}"
93 );
94 }
95 Err(e) => {
96 error!(
97 "failed to add {} msgs to \
98 locked vec with err={e}",
99 num_to_publish
100 );
101 }
102 }
103
104 // waits until threads exit or are shutdown
105 info!("waiting 3s to send shutdown");
106 std::thread::sleep(std::time::Duration::from_millis(3000));
107 // send shutdown message to all worker threads in the pool
108 match kafka_publisher.shutdown().await {
109 Ok(msg) => trace!("{msg}"),
110 Err(err_msg) => {
111 error!("publisher shutdown failed with err='{err_msg}'")
112 }
113 }
114}