publish_messages

Function publish_messages 

Source
pub async fn publish_messages(producer: &FutureProducer, topic_name: &str)
Expand description

publish_messages

Publish messages to a kafka topic_name with an initialized rdkafka::producer::FutureProducer using client tls assets based off environment variables.

§Optional - Set TLS Asset Paths

You can either use the included tls assets within the github repo rust-with-strimzi-kafka-and-tls/kubernetes/tls directory or export the environment variables:

  • KAFKA_TLS_CLIENT_CA - path to the Certificate Authority file
  • KAFKA_TLS_CLIENT_KEY - path to the client key file
  • KAFKA_TLS_CLIENT_CERT - path to the client certificate file

§Set Broker Addresses

Export this environment variable to the correct broker fqdns and ports:

  • KAFKA_BROKERS - comma delimited list of kafka brokers (format: fqdn1:port,fqdn2:port,fqdn3:port)

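§Arguments

  • producer - initialized rdkafka::producer::FutureProducer used to publish the messages
  • topic_name - name of the kafka topic to publish the messages to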

§Examples

# export KAFKA_TLS_CLIENT_CA=./kubernetes/tls/ca.pem
# export KAFKA_TLS_CLIENT_KEY=./kubernetes/tls/client-key.pem
# export KAFKA_TLS_CLIENT_CERT=./kubernetes/tls/client.pem
# export KAFKA_BROKERS=fqdn1:port,fqdn2:port,fqdn3:port
cargo build --example run-producer
export RUST_BACKTRACE=1
export RUST_LOG=info
./target/debug/examples/run-producer -b $KAFKA_BROKERS -t testing
Examples found in repository?
examples/run-producer.rs (line 78)
15async fn main() {
16    let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
17        .unwrap_or_else(|_| "localhost:9092".to_string());
18    let matches = App::new("producer example")
19        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
20        .about("Simple command line producer")
21        .arg(
22            Arg::with_name("brokers")
23                .short("b")
24                .long("brokers")
25                .help("Broker list in kafka format")
26                .takes_value(true)
27                .default_value(&comma_delimited_brokers),
28        )
29        .arg(
30            Arg::with_name("log-conf")
31                .long("log-conf")
32                .help("Configure the logging format (example: 'rdkafka=trace')")
33                .takes_value(true),
34        )
35        .arg(
36            Arg::with_name("topic")
37                .short("t")
38                .long("topic")
39                .help("Destination topic")
40                .takes_value(true)
41                .required(true),
42        )
43        .get_matches();
44
45    setup_logger(true, matches.value_of("log-conf"));
46
47    let (version_n, version_s) = get_rdkafka_version();
48    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
49
50    let topic = matches.value_of("topic").unwrap();
51    let brokers = matches.value_of("brokers").unwrap();
52
53    let producer: &FutureProducer = &ClientConfig::new()
54        .set("bootstrap.servers", brokers)
55        .set("message.timeout.ms", "5000")
56        .set("security.protocol", "SSL")
57        .set(
58            "ssl.ca.location",
59            std::env::var("KAFKA_TLS_CLIENT_CA")
60                .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
61        )
62        .set(
63            "ssl.key.location",
64            std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
65                "./kubernetes/tls/client-key.pem".to_string()
66            }),
67        )
68        .set(
69            "ssl.certificate.location",
70            std::env::var("KAFKA_TLS_CLIENT_CERT")
71                .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
72        )
73        .set("enable.ssl.certificate.verification", "true")
74        .create()
75        .expect("Producer creation error");
76
77    info!("publishing messag to broker={brokers} topic={topic}");
78    publish_messages(&producer, &topic).await;
79}