rust_with_kafka_tls/
consume_and_print.rs

1use log::info;
2use log::warn;
3use rdkafka::consumer::CommitMode;
4use rdkafka::consumer::Consumer;
5use rdkafka::message::Headers;
6use rdkafka::message::Message;
7
8use crate::custom_context::LoggingConsumer;
9
10/// consume_and_print
11///
12/// Consume messages from kafka with an initalized
13/// [`rdkafka::consumer::Consumer`](rdkafka::consumer::Consumer)
14/// using client tls assets based off environment variables.
15///
16/// # Optional - Set TLS Asset Paths
17///
18/// You can either use the included tls assets within the github repo
19/// [rust-with-strimzi-kafka-and-tls/kubernetes/tls](https://github.com/jay-johnson/rust-with-strimzi-kafka-and-tls/tree/main/kubernetes/tls)
20/// directory or export the environment variables:
21///
22/// - ``KAFKA_TLS_CLIENT_CA`` - path to the Certificate Authority file
23/// - ``KAFKA_TLS_CLIENT_KEY`` - path to the server key file
24/// - ``KAFKA_TLS_CLIENT_CERT`` - path to the server certificate file
25///
26/// # Set Broker Addresses
27///
28/// Export this environment variable to the correct broker fqdns and ports:
29///
30/// - ``KAFKA_BROKERS`` - comma delimited list of kafka brokers
31///   (format: ``fqdn1:port,fqdn2:port,fqdn3:port``)
32///
33/// # Arguments
34///
35/// * `consumer` - initialized
36/// [`rdkafka::consumer::Consumer`](rdkafka::consumer::Consumer)
37/// that is already subscribed to a list of ``topics`` with a ``group_id``
38///
39/// # Examples
40///
41/// ```bash
42/// # export KAFKA_TLS_CLIENT_CA=./kubernetes/tls/ca.pem
43/// # export KAFKA_TLS_CLIENT_KEY=./kubernetes/tls/client.pem
44/// # export KAFKA_TLS_CLIENT_CERT=./kubernetes/tls/client-key.pem
45/// # export KAFKA_BROKERS=fqdn1:port,fqdn2:port,fqdn3:port
46/// cargo build --example run-consumer
47/// export RUST_BACKTRACE=1
48/// export RUST_LOG=info
49/// ./target/debug/examples/run-consumer -b $KAFKA_BROKERS -g rust-consumer-testing -t testing
50/// ```
51///
52pub async fn consume_and_print(consumer: &LoggingConsumer) {
53    loop {
54        match consumer.recv().await {
55            Err(e) => warn!("Kafka error: {}", e),
56            Ok(m) => {
57                let payload = match m.payload_view::<str>() {
58                    None => "",
59                    Some(Ok(s)) => s,
60                    Some(Err(e)) => {
61                        warn!(
62                            "Error while deserializing message payload: {:?}",
63                            e
64                        );
65                        ""
66                    }
67                };
68                let mut header_str = String::from("");
69                if let Some(headers) = m.headers() {
70                    for i in 0..headers.count() {
71                        let header = headers.get(i).unwrap();
72                        let new_str = format!(
73                            "{i}:{:#?}=>{:?}",
74                            header.0,
75                            // https://doc.rust-lang.org/stable/std/str/fn.from_utf8.html
76                            std::str::from_utf8(header.1).unwrap()
77                        );
78                        if header_str.is_empty() {
79                            header_str = new_str;
80                        } else {
81                            header_str = header_str + ", " + &new_str;
82                        }
83                    }
84                }
85                let mut found_key = "";
86                if m.key().is_some() {
87                    found_key = std::str::from_utf8(m.key().unwrap()).unwrap();
88                }
89                info!(
90                    "key='{}' payload='{}', \
91                    topic={} partition={}, \
92                    offset={} timestamp={:?} \
93                    headers=[{header_str}]",
94                    // https://doc.rust-lang.org/stable/std/str/fn.from_utf8.html
95                    found_key,
96                    payload,
97                    m.topic(),
98                    m.partition(),
99                    m.offset(),
100                    m.timestamp()
101                );
102                consumer.commit_message(&m, CommitMode::Async).unwrap();
103            }
104        };
105    }
106}