kafka_threadpool/
thread_process_messages_handler.rs

1//! Handler that each tokio-spawned thread uses to process all messages. This
2//! function is the thread context state machine.
3//!
4use std::sync::Arc;
5use std::sync::Mutex;
6
7use log::error;
8use log::info;
9use log::trace;
10
11use rdkafka::message::OwnedHeaders;
12
13use crate::api::add_messages_to_locked_work_vec::add_messages_to_locked_work_vec;
14use crate::api::drain_messages_from_locked_work_vec::drain_messages_from_locked_work_vec;
15use crate::api::get_kafka_consumer::get_kafka_consumer;
16use crate::api::get_kafka_producer::get_kafka_producer;
17use crate::api::kafka_publish_message::KafkaPublishMessage;
18use crate::api::kafka_publish_message_type::KafkaPublishMessageType;
19use crate::config::kafka_client_config::KafkaClientConfig;
20use crate::metadata::get_kafka_metadata::get_kafka_metadata;
21use crate::msg::publish_message::convert_hashmap_headers_to_ownedheaders;
22use crate::msg::publish_message::publish_message;
23
24/// thread_process_messages_handler
25///
26/// Each tokio-spawned thread calls this method
27///
28/// # Arguments
29///
30/// * `cur_thread_num` - thread counter assigned by
31/// [`start_threads_from_config`]
32/// * `config` - initialized [`KafkaClientConfig`] for this thread
33/// * `lockable_work_vec` - shared work vec of
34/// [`KafkaPublishMessage`] messages to process within a lockable
35/// [`Arc<Mutex<lockable_work_vec>>`] thread-safe object
36///
37pub async fn thread_process_messages_handler(
38    cur_thread_num: u8,
39    config: KafkaClientConfig,
40    lockable_work_vec: Arc<Mutex<Vec<KafkaPublishMessage>>>,
41) {
42    // THREAD CONTEXT - start
43    let mut work_vec: Vec<KafkaPublishMessage> = Vec::with_capacity(20);
44    let log_label = format!("{}-tid-{}", config.label, cur_thread_num + 1);
45    // connect to the kafka cluster before starting
46    if config.broker_list.is_empty() {
47        error!(
48            "{log_label} - \
49            no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
50            config.broker_list
51        );
52        return;
53    }
54    if config.broker_list[0].is_empty() {
55        error!(
56            "{log_label} - \
57            no brokers to connect to KAFKA_BROKERS={:?} - stopping thread",
58            config.broker_list
59        );
60        return;
61    }
62    if cur_thread_num == 0 {
63        info!(
64            "threadpool connecting to brokers={:?} topics={:?} \
65            tls ca={} key={} cert={} \
66            work_vec_cap={}",
67            config.broker_list,
68            config.publish_topics,
69            config.tls_ca,
70            config.tls_key,
71            config.tls_ca,
72            work_vec.capacity()
73        );
74    }
75    let producer = get_kafka_producer(&config);
76    trace!("{log_label} - start");
77    // In a loop, read data from the socket and write the data back.
78    loop {
79        let mut should_shutdown = false;
80        work_vec = drain_messages_from_locked_work_vec(&lockable_work_vec);
81        if work_vec.is_empty() {
82            trace!("{log_label} - idle");
83            std::thread::sleep(std::time::Duration::from_millis(
84                config.idle_sleep_sec,
85            ));
86            continue;
87        } else {
88            trace!("{log_label} - processing {} msgs", work_vec.len());
89            // publish the messages with a retry timer
90            while !work_vec.is_empty() {
91                let msg = work_vec.remove(0);
92                if msg.msg_type == KafkaPublishMessageType::Shutdown {
93                    should_shutdown = true;
94                    // requeue shutdown message for other threads
95                    let requeue_vec: Vec<KafkaPublishMessage> =
96                        vec![msg.clone()];
97                    match add_messages_to_locked_work_vec(
98                        &lockable_work_vec,
99                        requeue_vec,
100                    ) {
101                        Ok(num_msgs_in_vec) => {
102                            trace!(
103                                "{log_label} - requeue shutdown message \
104                                success with total in vec={num_msgs_in_vec}"
105                            );
106                        }
107                        Err(e) => {
108                            error!(
109                                "{log_label} - failed to requeue shutdown \
110                                message into vec with err={e}"
111                            );
112                        }
113                    }
114                    // success ends the retry loop
115                    break;
116                } else if msg.msg_type == KafkaPublishMessageType::Data {
117                    let payload_sub = msg.payload[..10].to_string();
118                    trace!(
119                        "{log_label} pub \
120                        topic={} data='{}'",
121                        msg.topic,
122                        payload_sub
123                    );
124                    let topic = msg.topic.clone();
125                    let mut owned_headers: OwnedHeaders = OwnedHeaders::new();
126                    if msg.headers.is_some() {
127                        owned_headers = convert_hashmap_headers_to_ownedheaders(
128                            msg.headers.clone().unwrap(),
129                            owned_headers,
130                        );
131                    }
132                    // success ends the retry loop
133                    loop {
134                        let delivery_status =
135                            publish_message(&producer, &msg, &owned_headers)
136                                .await;
137                        if delivery_status != -1 {
138                            trace!("published message topic={topic}");
139                            break;
140                        } else {
141                            error!(
142                                "failed to publish delivery status={} \
143                                retrying msg={:?}",
144                                delivery_status, msg
145                            );
146                            std::thread::sleep(
147                                std::time::Duration::from_millis(
148                                    config.retry_sleep_sec,
149                                ),
150                            );
151                        }
152                    }
153                } else if msg.msg_type
154                    == KafkaPublishMessageType::LogBrokerDetails
155                {
156                    info!(
157                        "{log_label} - get all broker config={} information",
158                        config
159                    );
160                    {
161                        let consumer = get_kafka_consumer(&config);
162                        let count_msgs =
163                            std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
164                                .unwrap_or_else(|_| "true".to_string())
165                                == *"true";
166                        get_kafka_metadata(&config, consumer, count_msgs, None);
167                    }
168                    break;
169                } else if msg.msg_type
170                    == KafkaPublishMessageType::LogBrokerTopicDetails
171                {
172                    if msg.payload.is_empty() {
173                        error!(
174                            "{log_label} - \
175                            unable to get broker config={} \
176                            missing topic={} in msg.payload",
177                            config, msg.payload
178                        );
179                    } else {
180                        info!(
181                            "{log_label} - \
182                            get broker config={} topic={} information",
183                            config, msg.payload
184                        );
185                        let consumer = get_kafka_consumer(&config);
186                        let count_msgs =
187                            std::env::var("KAFKA_METADATA_COUNT_MSG_OFFSETS")
188                                .unwrap_or_else(|_| "true".to_string())
189                                == *"true";
190                        get_kafka_metadata(
191                            &config,
192                            consumer,
193                            count_msgs,
194                            Some(&msg.payload),
195                        );
196                    }
197                    break;
198                } else {
199                    error!(
200                        "{log_label} - \
201                        unsupported KafkaPublishMessageType={:?}",
202                        msg.msg_type
203                    );
204                    break;
205                }
206            }
207            // after processing everything in the vec - break the main thread loop if shutting down
208            if should_shutdown {
209                let num_left = work_vec.len();
210                if num_left == 0 {
211                    trace!("{log_label} - work vec empty={num_left}");
212                } else {
213                    error!("{log_label} - work vec NOT empty={num_left}");
214                }
215                break;
216            }
217            // if everything published, clear the temp drained vec
218            work_vec.clear();
219        }
220    }
221    info!("{log_label} - done exiting thread");
222    // THREAD CONTEXT - end
223}