rust_with_kafka_tls/consume_and_print.rs
1use log::info;
2use log::warn;
3use rdkafka::consumer::CommitMode;
4use rdkafka::consumer::Consumer;
5use rdkafka::message::Headers;
6use rdkafka::message::Message;
7
8use crate::custom_context::LoggingConsumer;
9
10/// consume_and_print
11///
12/// Consume messages from kafka with an initalized
13/// [`rdkafka::consumer::Consumer`](rdkafka::consumer::Consumer)
14/// using client tls assets based off environment variables.
15///
16/// # Optional - Set TLS Asset Paths
17///
18/// You can either use the included tls assets within the github repo
19/// [rust-with-strimzi-kafka-and-tls/kubernetes/tls](https://github.com/jay-johnson/rust-with-strimzi-kafka-and-tls/tree/main/kubernetes/tls)
20/// directory or export the environment variables:
21///
22/// - ``KAFKA_TLS_CLIENT_CA`` - path to the Certificate Authority file
23/// - ``KAFKA_TLS_CLIENT_KEY`` - path to the server key file
24/// - ``KAFKA_TLS_CLIENT_CERT`` - path to the server certificate file
25///
26/// # Set Broker Addresses
27///
28/// Export this environment variable to the correct broker fqdns and ports:
29///
30/// - ``KAFKA_BROKERS`` - comma delimited list of kafka brokers
31/// (format: ``fqdn1:port,fqdn2:port,fqdn3:port``)
32///
33/// # Arguments
34///
35/// * `consumer` - initialized
36/// [`rdkafka::consumer::Consumer`](rdkafka::consumer::Consumer)
37/// that is already subscribed to a list of ``topics`` with a ``group_id``
38///
39/// # Examples
40///
41/// ```bash
42/// # export KAFKA_TLS_CLIENT_CA=./kubernetes/tls/ca.pem
43/// # export KAFKA_TLS_CLIENT_KEY=./kubernetes/tls/client.pem
44/// # export KAFKA_TLS_CLIENT_CERT=./kubernetes/tls/client-key.pem
45/// # export KAFKA_BROKERS=fqdn1:port,fqdn2:port,fqdn3:port
46/// cargo build --example run-consumer
47/// export RUST_BACKTRACE=1
48/// export RUST_LOG=info
49/// ./target/debug/examples/run-consumer -b $KAFKA_BROKERS -g rust-consumer-testing -t testing
50/// ```
51///
52pub async fn consume_and_print(consumer: &LoggingConsumer) {
53 loop {
54 match consumer.recv().await {
55 Err(e) => warn!("Kafka error: {}", e),
56 Ok(m) => {
57 let payload = match m.payload_view::<str>() {
58 None => "",
59 Some(Ok(s)) => s,
60 Some(Err(e)) => {
61 warn!(
62 "Error while deserializing message payload: {:?}",
63 e
64 );
65 ""
66 }
67 };
68 let mut header_str = String::from("");
69 if let Some(headers) = m.headers() {
70 for i in 0..headers.count() {
71 let header = headers.get(i).unwrap();
72 let new_str = format!(
73 "{i}:{:#?}=>{:?}",
74 header.0,
75 // https://doc.rust-lang.org/stable/std/str/fn.from_utf8.html
76 std::str::from_utf8(header.1).unwrap()
77 );
78 if header_str.is_empty() {
79 header_str = new_str;
80 } else {
81 header_str = header_str + ", " + &new_str;
82 }
83 }
84 }
85 let mut found_key = "";
86 if m.key().is_some() {
87 found_key = std::str::from_utf8(m.key().unwrap()).unwrap();
88 }
89 info!(
90 "key='{}' payload='{}', \
91 topic={} partition={}, \
92 offset={} timestamp={:?} \
93 headers=[{header_str}]",
94 // https://doc.rust-lang.org/stable/std/str/fn.from_utf8.html
95 found_key,
96 payload,
97 m.topic(),
98 m.partition(),
99 m.offset(),
100 m.timestamp()
101 );
102 consumer.commit_message(&m, CommitMode::Async).unwrap();
103 }
104 };
105 }
106}