kafka_threadpool/msg/
publish_message.rs

1//! Publish a [`KafkaPublishMessage`](crate::api::kafka_publish_message)
2//! to a Kafka topic
3
4use rdkafka::message::OwnedHeaders;
5use rdkafka::producer::FutureProducer;
6use rdkafka::producer::FutureRecord;
7use std::collections::HashMap;
8
9use crate::api::kafka_publish_message::KafkaPublishMessage;
10
11/// now()
12///
13/// helper for setting a message timestamp
14///
15fn now() -> i64 {
16    std::time::SystemTime::now()
17        .duration_since(std::time::UNIX_EPOCH)
18        .unwrap()
19        .as_millis()
20        .try_into()
21        .unwrap()
22}
23
24/// convert_hashmap_headers_to_ownedheaders
25///
26/// Internal method to serialize the public
27/// [`KafkaPublishMessage.headers`](crate::api::kafka_publish_message::KafkaPublishMessage)
28/// into an [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders) before
29/// publishing
30///
31/// Created after finding this stack overflow:
32/// https://stackoverflow.com/questions/63015654/borrow-and-use-of-moved-value
33///
34/// # Arguments
35///
36/// * `hmap` - HashMap containing key/value pair of Strings to convert
37/// * `owned_headers` - initialized and mutable
38/// [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders)
39/// for storing headers that are compliant with ``rdkafka``
40///
41/// # Returns
42///
43/// [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders) containing
44/// all key/value pairs from ``hmap``
45///
46pub fn convert_hashmap_headers_to_ownedheaders(
47    hmap: HashMap<String, String>,
48    mut owned_headers: OwnedHeaders,
49) -> OwnedHeaders {
50    for (k, v) in hmap.iter() {
51        owned_headers = owned_headers.add(k, &v);
52    }
53    owned_headers
54}
55
56/// publish_message
57///
58/// Worker threads publish messages to kafka using this method
59///
60/// This function publishes
61/// [`KafkaPublishMessage`](crate::api::kafka_publish_message) where the ``msg_type``
62/// (of type: [`KafkaPublishMessageType`](crate::api::kafka_publish_message_type)) is set to
63/// ``Data`` or ``Sensitive``
64///
65/// This uses the
66/// [`FutureProducer.send_result() function`][rdkafka::producer::future_producer::FutureProducer::send_result]
67/// method to publish the message and immediately return without
68/// waiting on the kafka queue.
69///
70/// # Arguments
71///
72/// * `label` - calling thread's logging label
73/// * `producer` - initialized and connected
74/// [`rdkafka::producer::FutureProducer`](rdkafka::producer::FutureProducer)
75/// for publishing messages
76/// * `msg` - initialized
77/// [`KafkaPublishMessage`](crate::api::kafka_publish_message) containing
78/// all routing, metadata and payload information for the message
79///
80pub async fn publish_message(
81    producer: &FutureProducer,
82    msg: &KafkaPublishMessage,
83    owned_headers: &OwnedHeaders,
84) -> i32 {
85    // https://docs.rs/rdkafka/latest/rdkafka/producer/future_producer/struct.FutureProducer.html#method.send_result
86    let (delivery_status, _id) = producer
87        .send_result(
88            FutureRecord::to(&msg.topic)
89                .payload(&msg.payload)
90                .key(&msg.key)
91                .headers(owned_headers.to_owned())
92                .timestamp(now()),
93        )
94        .unwrap()
95        .await
96        .unwrap()
97        .unwrap();
98    delivery_status
99}