rust_with_kafka_tls/publish_messages.rs
1use log::info;
2use std::time::Duration;
3
4use rdkafka::message::OwnedHeaders;
5use rdkafka::producer::FutureProducer;
6use rdkafka::producer::FutureRecord;
7
8/// publish_messages
9///
10/// Publish messages to a kafka ``topic_name`` with an initialized
11/// [`rdkafka::producer::FutureProducer`](rdkafka::producer::FutureProducer)
12/// using client tls assets based off environment variables.
13///
14/// ## Optional - Set TLS Asset Paths
15///
16/// You can either use the included tls assets within the github repo
17/// [rust-with-strimzi-kafka-and-tls/kubernetes/tls](https://github.com/jay-johnson/rust-with-strimzi-kafka-and-tls/tree/main/kubernetes/tls)
18/// directory or export the environment variables:
19///
20/// - ``KAFKA_TLS_CLIENT_CA`` - path to the Certificate Authority file
21/// - ``KAFKA_TLS_CLIENT_KEY`` - path to the server key file
22/// - ``KAFKA_TLS_CLIENT_CERT`` - path to the server certificate file
23///
24/// # Set Broker Addresses
25///
26/// Export this environment variable to the correct broker fqdns and ports:
27///
28/// - ``KAFKA_BROKERS`` - comma delimited list of kafka brokers
29/// (format: ``fqdn1:port,fqdn2:port,fqdn3:port``)
30///
31/// # Arguments
32///
33/// * `producer` - initialized
34/// [`rdkafka::producer::FutureProducer`](rdkafka::producer::FutureProducer)
35/// that will publish messages to the kafka ``topic_name``
36/// * `topic_name` - publish messages this kafka topic
37///
38/// # Examples
39///
40/// ```bash
41/// # export KAFKA_TLS_CLIENT_CA=./kubernetes/tls/ca.pem
42/// # export KAFKA_TLS_CLIENT_KEY=./kubernetes/tls/client.pem
43/// # export KAFKA_TLS_CLIENT_CERT=./kubernetes/tls/client-key.pem
44/// # export KAFKA_BROKERS=fqdn1:port,fqdn2:port,fqdn3:port
45/// cargo build --example run-producer
46/// export RUST_BACKTRACE=1
47/// export RUST_LOG=info
48/// ./target/debug/examples/run-producer -b $KAFKA_BROKERS -t testing
49/// ```
50///
51pub async fn publish_messages(producer: &FutureProducer, topic_name: &str) {
52 // This loop is non blocking: all messages will be sent one after the other, without waiting
53 // for the results.
54 let futures = (0..5)
55 .map(|i| async move {
56 // The send operation on the topic returns a future, which will be
57 // completed once the result or failure from Kafka is received.
58 let delivery_status = producer
59 .send(
60 FutureRecord::to(topic_name)
61 .payload(&format!("Message {}", i))
62 .key(&format!("Key {}", i))
63 .headers(
64 OwnedHeaders::new()
65 .add("header_key", "header_value"),
66 ),
67 Duration::from_secs(0),
68 )
69 .await;
70
71 // This will be executed when the result is received.
72 info!("Delivery status for message {} received", i);
73 delivery_status
74 })
75 .collect::<Vec<_>>();
76
77 // This loop will wait until all delivery statuses have been received.
78 for future in futures {
79 info!("Future completed. Result: {:?}", future.await);
80 }
81}