pub async fn publish_messages(producer: &FutureProducer, topic_name: &str)
Expand description
publish_messages
Publish messages to a kafka topic_name
with an initialized
rdkafka::producer::FutureProducer
using client tls assets based off environment variables.
§Optional - Set TLS Asset Paths
You can either use the included tls assets within the github repo rust-with-strimzi-kafka-and-tls/kubernetes/tls directory or export the environment variables:
KAFKA_TLS_CLIENT_CA
- path to the Certificate Authority file
KAFKA_TLS_CLIENT_KEY
- path to the client key file
KAFKA_TLS_CLIENT_CERT
- path to the client certificate file
§Set Broker Addresses
Export this environment variable to the correct broker fqdns and ports:
KAFKA_BROKERS
- comma delimited list of kafka brokers (format: fqdn1:port,fqdn2:port,fqdn3:port)
§Arguments
producer
- initialized rdkafka::producer::FutureProducer
that will publish messages to the kafka topic_name
topic_name
- publish messages to this kafka topic
§Examples
# export KAFKA_TLS_CLIENT_CA=./kubernetes/tls/ca.pem
# export KAFKA_TLS_CLIENT_KEY=./kubernetes/tls/client-key.pem
# export KAFKA_TLS_CLIENT_CERT=./kubernetes/tls/client.pem
# export KAFKA_BROKERS=fqdn1:port,fqdn2:port,fqdn3:port
cargo build --example run-producer
export RUST_BACKTRACE=1
export RUST_LOG=info
./target/debug/examples/run-producer -b $KAFKA_BROKERS -t testing
Examples found in repository?
examples/run-producer.rs (line 78)
15async fn main() {
16 let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
17 .unwrap_or_else(|_| "localhost:9092".to_string());
18 let matches = App::new("producer example")
19 .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
20 .about("Simple command line producer")
21 .arg(
22 Arg::with_name("brokers")
23 .short("b")
24 .long("brokers")
25 .help("Broker list in kafka format")
26 .takes_value(true)
27 .default_value(&comma_delimited_brokers),
28 )
29 .arg(
30 Arg::with_name("log-conf")
31 .long("log-conf")
32 .help("Configure the logging format (example: 'rdkafka=trace')")
33 .takes_value(true),
34 )
35 .arg(
36 Arg::with_name("topic")
37 .short("t")
38 .long("topic")
39 .help("Destination topic")
40 .takes_value(true)
41 .required(true),
42 )
43 .get_matches();
44
45 setup_logger(true, matches.value_of("log-conf"));
46
47 let (version_n, version_s) = get_rdkafka_version();
48 info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
49
50 let topic = matches.value_of("topic").unwrap();
51 let brokers = matches.value_of("brokers").unwrap();
52
53 let producer: &FutureProducer = &ClientConfig::new()
54 .set("bootstrap.servers", brokers)
55 .set("message.timeout.ms", "5000")
56 .set("security.protocol", "SSL")
57 .set(
58 "ssl.ca.location",
59 std::env::var("KAFKA_TLS_CLIENT_CA")
60 .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
61 )
62 .set(
63 "ssl.key.location",
64 std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
65 "./kubernetes/tls/client-key.pem".to_string()
66 }),
67 )
68 .set(
69 "ssl.certificate.location",
70 std::env::var("KAFKA_TLS_CLIENT_CERT")
71 .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
72 )
73 .set("enable.ssl.certificate.verification", "true")
74 .create()
75 .expect("Producer creation error");
76
77 info!("publishing messag to broker={brokers} topic={topic}");
78 publish_messages(&producer, &topic).await;
79}