//! Publish a [`KafkaPublishMessage`](crate::api::kafka_publish_message)
//! to a Kafka topic
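use rdkafka::message::OwnedHeaders;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use std::collections::HashMap;
use crate::api::kafka_publish_message::KafkaPublishMessage;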
/// now()
///
/// helper for setting a message timestamp
///
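/**
# Example sketch: timestamp helper

A minimal sketch of a helper like this, not the crate's actual `now()`.
It assumes the timestamp is wanted as milliseconds since the Unix epoch,
the unit Kafka uses for record timestamps. The name `now_millis` is
hypothetical.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper: current wall-clock time in milliseconds since the
// Unix epoch. Falls back to 0 if the system clock reads before the epoch.
fn now_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as i64)
        .unwrap_or(0)
}
```

A value like this could be passed to `FutureRecord::timestamp`, which
takes an `i64`.
*/
///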
/// convert_hashmap_headers_to_ownedheaders
///
/// Internal method to serialize the public
/// [`KafkaPublishMessage.headers`](crate::api::kafka_publish_message::KafkaPublishMessage)
/// into an [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders) before
/// publishing
///
/// Written after finding this Stack Overflow question:
/// <https://stackoverflow.com/questions/63015654/borrow-and-use-of-moved-value>
///
/// # Arguments
///
/// * `hmap` - `HashMap` of `String` key/value pairs to convert
/// * `owned_headers` - initialized, mutable
/// [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders)
/// that receives the headers in a form ``rdkafka`` accepts
///
/// # Returns
///
/// [`rdkafka::message::OwnedHeaders`](rdkafka::message::OwnedHeaders) containing
/// all key/value pairs from ``hmap``
///
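/**
# Example sketch: folding a map into a consuming builder

A sketch of the ownership pattern behind a conversion like this, not the
crate's actual implementation. It assumes the header collection is
extended through a method that takes `self` by value and returns the
updated collection, which is the situation the "use of moved value"
question linked above deals with. `Headers`, `Headers::add` and
`convert_headers` are hypothetical stand-ins, so the sketch needs only
the standard library.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a header collection whose `add` consumes
// `self` and returns the updated value.
#[derive(Debug, Default, PartialEq)]
struct Headers(Vec<(String, String)>);

impl Headers {
    fn add(mut self, key: &str, value: &str) -> Self {
        self.0.push((key.to_owned(), value.to_owned()));
        self
    }
}

// Fold every key/value pair into the collection. Each step moves the
// accumulator into `add` and takes ownership of the value it returns,
// so a moved-from binding is never reused.
fn convert_headers(hmap: &HashMap<String, String>, headers: Headers) -> Headers {
    hmap.iter()
        .fold(headers, |acc, (key, value)| acc.add(key, value))
}
```

`HashMap` iteration order is unspecified, so the resulting header order
should not be relied on.
*/
///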
/// publish_message
///
/// Worker threads call this function to publish messages to Kafka.
///
/// Publishes a
/// [`KafkaPublishMessage`](crate::api::kafka_publish_message) whose ``msg_type``
/// (a [`KafkaPublishMessageType`](crate::api::kafka_publish_message_type)) is
/// ``Data`` or ``Sensitive``.
///
/// This uses
/// [`FutureProducer::send_result`](rdkafka::producer::future_producer::FutureProducer::send_result)
/// to publish the message and return immediately without
/// waiting on the Kafka queue.
///
/// # Arguments
///
/// * `label` - calling thread's logging label
/// * `producer` - initialized and connected
/// [`rdkafka::producer::FutureProducer`](rdkafka::producer::FutureProducer)
/// for publishing messages
/// * `msg` - initialized
/// [`KafkaPublishMessage`](crate::api::kafka_publish_message) containing
/// all routing, metadata and payload information for the message
///
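/**
# Example sketch: enqueue without blocking

An analogy for the "return immediately" behaviour described above, using
a bounded standard-library channel instead of a Kafka producer. This is
not `rdkafka` code. `Record` and `enqueue_now` are hypothetical names.
Like `FutureProducer::send_result`, the sketch hands the record back to
the caller when it cannot be enqueued right away.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Hypothetical message type standing in for a record to publish.
#[derive(Debug, PartialEq)]
struct Record {
    payload: String,
}

// Try to enqueue without blocking. On failure the record is returned to
// the caller together with the reason, so it can be retried or dropped.
fn enqueue_now(
    queue: &SyncSender<Record>,
    record: Record,
) -> Result<(), (String, Record)> {
    match queue.try_send(record) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(rec)) => Err(("queue full".to_owned(), rec)),
        Err(TrySendError::Disconnected(rec)) => Err(("queue closed".to_owned(), rec)),
    }
}
```

With a queue of capacity 1, a second `enqueue_now` call made before the
receiver drains the first record fails at once with `"queue full"`.
*/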
pub async