kafka_threadpool/thread_process_messages_handler.rs

use std::sync::Arc;
use std::sync::Mutex;

use log::error;
use log::info;
use log::trace;

use rdkafka::message::OwnedHeaders;

use crate::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
use crate::api::drain_messages_from_locked_work_vec::drain_messages_from_locked_work_vec;
use crate::api::get_kafka_consumer::get_kafka_consumer;
use crate::api::get_kafka_producer::get_kafka_producer;
use crate::api::kafka_publish_message::KafkaPublishMessage;
use crate::api::kafka_publish_message_type::KafkaPublishMessageType;
use crate::config::kafka_client_config::KafkaClientConfig;
use crate::metadata::get_kafka_metadata::get_kafka_metadata;
use crate::msg::publish_message::convert_hashmap_headers_to_ownedheaders;
use crate::msg::publish_message::publish_message;

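/// Worker-thread handler for the kafka-threadpool publisher: drains the
/// shared lockable work vec, publishes `Data` messages (retrying failed
/// deliveries), services `LogBrokerDetails` and `LogBrokerTopicDetails`
/// metadata requests, and exits after re-queueing any `Shutdown` message
/// back onto the shared vec.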
pub async fn thread_process_messages_handler(
    cur_thread_num: u8,
    config: KafkaClientConfig,
    lockable_work_vec: Arc<Mutex<Vec<KafkaPublishMessage>>>,
) {
    let mut work_vec: Vec<KafkaPublishMessage> = Vec::with_capacity(20);
    let log_label = format!("{}-tid-{}", config.label, cur_thread_num + 1);
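    // Validate the broker list before doing any work: an empty list or an
    // empty first entry means there is nothing to connect to.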
    if config.broker_list.is_empty() {
        error!(
            "{log_label} - \
            no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
            config.broker_list
        );
        return;
    }
    if config.broker_list[0].is_empty() {
        error!(
            "{log_label} - \
            no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
            config.broker_list
        );
        return;
    }
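    // Only the first thread logs the shared connection details so the
    // startup banner is not repeated once per worker.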
    if cur_thread_num == 0 {
        info!(
            "threadpool connecting to brokers={:?} topics={:?} \
            tls ca={} key={} cert={} \
            work_vec_cap={}",
            config.broker_list,
            config.publish_topics,
            config.tls_ca,
            config.tls_key,
            config.tls_cert,
            work_vec.capacity()
        );
    }
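    // Build the producer once per thread, then loop: drain the shared work
    // vec, sleep when idle, and process any drained messages.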
    let producer = get_kafka_producer(&config);
    trace!("{log_label} - start");
    loop {
        let mut should_shutdown = false;
        work_vec = drain_messages_from_locked_work_vec(&lockable_work_vec);
        if work_vec.is_empty() {
            trace!("{log_label} - idle");
            std::thread::sleep(std::time::Duration::from_millis(
                config.idle_sleep_sec,
            ));
            continue;
        } else {
            trace!("{log_label} - processing {} msgs", work_vec.len());
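            // Process the drained messages front to back; the message type
            // selects the handling branch below.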
            while !work_vec.is_empty() {
                let msg = work_vec.remove(0);
                if msg.msg_type == KafkaPublishMessageType::Shutdown {
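                    // Shutdown: flag this thread to stop and push the
                    // shutdown message back onto the shared vec before
                    // breaking out of the processing loop.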
                    should_shutdown = true;
                    let requeue_vec: Vec<KafkaPublishMessage> =
                        vec![msg.clone()];
                    match add_messages_to_locked_work_vec(
                        &lockable_work_vec,
                        requeue_vec,
                    ) {
                        Ok(num_msgs_in_vec) => {
                            trace!(
                                "{log_label} - requeue shutdown message \
                                success with total in vec={num_msgs_in_vec}"
                            );
                        }
                        Err(e) => {
                            error!(
                                "{log_label} - failed to requeue shutdown \
                                message into vec with err={e}"
                            );
                        }
                    }
                    break;
                } else if msg.msg_type == KafkaPublishMessageType::Data {
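                    // Data: trace a short payload preview, convert any
                    // header hashmap into rdkafka OwnedHeaders, and publish
                    // with retries until delivery succeeds.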
                    let payload_sub: String =
                        msg.payload.chars().take(10).collect();
                    trace!(
                        "{log_label} pub \
                        topic={} data='{}'",
                        msg.topic,
                        payload_sub
                    );
                    let topic = msg.topic.clone();
                    let mut owned_headers: OwnedHeaders = OwnedHeaders::new();
                    if msg.headers.is_some() {
                        owned_headers = convert_hashmap_headers_to_ownedheaders(
                            msg.headers.clone().unwrap(),
                            owned_headers,
                        );
                    }
                    loop {
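                        // Retry until publish_message reports a delivery
                        // status other than -1 (its failure sentinel),
                        // sleeping between attempts.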
                        let delivery_status =
                            publish_message(&producer, &msg, &owned_headers)
                                .await;
                        if delivery_status != -1 {
                            trace!("published message topic={topic}");
                            break;
                        } else {
                            error!(
                                "failed to publish delivery status={} \
                                retrying msg={:?}",
                                delivery_status, msg
                            );
                            std::thread::sleep(
                                std::time::Duration::from_millis(
                                    config.retry_sleep_sec,
                                ),
                            );
                        }
                    }
                } else if msg.msg_type
                    == KafkaPublishMessageType::LogBrokerDetails
                {
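                    // LogBrokerDetails: build a consumer and log metadata
                    // for all brokers; KAFKA_METADATA_COUNT_MSG_OFFSETS
                    // (default "true") controls whether message offsets are
                    // counted.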
                    info!(
                        "{log_label} - get all broker config={} information",
                        config
                    );
                    {
                        let consumer = get_kafka_consumer(&config);
                        let count_msgs =
                            std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
                                .unwrap_or_else(|_| "true".to_string())
                                == *"true";
                        get_kafka_metadata(&config, consumer, count_msgs, None);
                    }
                    break;
                } else if msg.msg_type
                    == KafkaPublishMessageType::LogBrokerTopicDetails
                {
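                    // LogBrokerTopicDetails: same as above, but scoped to a
                    // single topic named in msg.payload; an empty payload is
                    // an error.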
                    if msg.payload.is_empty() {
                        error!(
                            "{log_label} - \
                            unable to get broker config={} \
                            missing topic={} in msg.payload",
                            config, msg.payload
                        );
                    } else {
                        info!(
                            "{log_label} - \
                            get broker config={} topic={} information",
                            config, msg.payload
                        );
                        let consumer = get_kafka_consumer(&config);
                        let count_msgs =
                            std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
                                .unwrap_or_else(|_| "true".to_string())
                                == *"true";
                        get_kafka_metadata(
                            &config,
                            consumer,
                            count_msgs,
                            Some(&msg.payload),
                        );
                    }
                    break;
                } else {
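                    // Any other message type is unsupported: log it and stop
                    // processing the current batch.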
                    error!(
                        "{log_label} - \
                        unsupported KafkaPublishMessageType={:?}",
                        msg.msg_type
                    );
                    break;
                }
            }
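            // A shutdown message ends the outer loop; report whether any
            // drained messages were left unprocessed.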
            if should_shutdown {
                let num_left = work_vec.len();
                if num_left == 0 {
                    trace!("{log_label} - work vec empty={num_left}");
                } else {
                    error!("{log_label} - work vec NOT empty={num_left}");
                }
                break;
            }
            work_vec.clear();
        }
    }
    info!("{log_label} - done exiting thread");
}