pub async fn start_threadpool(label: Option<&str>) -> KafkaPublisherExpand description
Examples found in repository?
examples/get-all-metadata.rs (line 21)
18async fn main() {
19 pretty_env_logger::init_timed();
20 let label = "get-all-metadata";
21 let kafka_publisher = start_threadpool(Some(label)).await;
22
23 info!(
24 "{label} \
25 config={} \
26 getting all metadata",
27 kafka_publisher.config
28 );
29
30 kafka_publisher.get_metadata(true, None).await;
31
32 info!("shutting down");
33 // send shutdown message to all worker threads in the pool
34 match kafka_publisher.shutdown().await {
35 Ok(msg) => trace!("{msg}"),
36 Err(err_msg) => {
37 error!("publisher shutdown failed with err='{err_msg}'")
38 }
39 }
40}More examples
examples/get-metadata-for-topic.rs (line 23)
20async fn main() {
21 pretty_env_logger::init_timed();
22 let label = "get-metadata-for-topic";
23 let kafka_publisher = start_threadpool(Some(label)).await;
24
25 let topic = std::env::var("KAFKA_TOPIC").unwrap_or_else(|_| "".to_string());
26 if topic.is_empty() {
27 error!("please set a topic with: export KAFKA_TOPIC=TOPIC_NAME")
28 } else {
29 info!(
30 "{label} \
31 config={} \
32 getting all metadata",
33 kafka_publisher.config
34 );
35
36 kafka_publisher.get_metadata(true, Some(&topic)).await;
37 }
38
39 info!("shutting down");
40 // send shutdown message to all worker threads in the pool
41 match kafka_publisher.shutdown().await {
42 Ok(msg) => trace!("{msg}"),
43 Err(err_msg) => {
44 error!("publisher shutdown failed with err='{err_msg}'")
45 }
46 }
47}examples/start-threadpool.rs (line 47)
44async fn main() {
45 pretty_env_logger::init_timed();
46 let test_case = "start_threadpool";
47 let kafka_publisher = start_threadpool(Some(test_case)).await;
48
49 if !kafka_publisher.config.is_enabled {
50 info!(
51 "kafka threadpool disabled - \
52 please check the environment variable KAFKA_ENABLED \
53 is set to '1' or 'true' and retry"
54 );
55 return;
56 }
57
58 info!(
59 "{test_case} \
60 config={}",
61 kafka_publisher.config
62 );
63
64 info!("creating messages");
65 let mut new_msgs: Vec<KafkaPublishMessage> = vec![];
66 let num_to_send = 100;
67 for i in 0..num_to_send {
68 let mut headers: HashMap<String, String> = HashMap::new();
69 let payload = format!("test message {i}");
70 headers.insert(format!("header {i}"), format!("value {i}"));
71 new_msgs.push(build_kafka_publish_message(
72 KafkaPublishMessageType::Data,
73 "testing",
74 "testing",
75 Some(headers),
76 &payload,
77 ));
78 }
79
80 let num_to_publish = new_msgs.len();
81 info!(
82 "adding {num_to_publish} msgs to the \
83 lockable work vec: KafkaPublishMessage.publish_msgs"
84 );
85 match add_messages_to_locked_work_vec(
86 &kafka_publisher.publish_msgs,
87 new_msgs,
88 ) {
89 Ok(num_msgs_in_vec) => {
90 info!(
91 "added {num_to_publish} msgs with \
92 total in vec={num_msgs_in_vec}"
93 );
94 }
95 Err(e) => {
96 error!(
97 "failed to add {} msgs to \
98 locked vec with err={e}",
99 num_to_publish
100 );
101 }
102 }
103
104 // waits until threads exit or are shutdown
105 info!("waiting 3s to send shutdown");
106 std::thread::sleep(std::time::Duration::from_millis(3000));
107 // send shutdown message to all worker threads in the pool
108 match kafka_publisher.shutdown().await {
109 Ok(msg) => trace!("{msg}"),
110 Err(err_msg) => {
111 error!("publisher shutdown failed with err='{err_msg}'")
112 }
113 }
114}