Function setup_logger

Source
pub fn setup_logger(log_thread: bool, rust_log: Option<&str>)
Expand description

setup_logger

Set up a logger for message processing by consumers or producers

§Arguments

  • log_thread - flag for logging the processing thread
  • rust_log - optional string containing the logging level for the function caller (for example "rdkafka=trace")

§Examples

use rust_with_kafka_tls::log_utils::setup_logger;
setup_logger(true, Some("rdkafka=trace"));
Examples found in repository?
examples/run-producer.rs (line 45)
15async fn main() {
16    let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
17        .unwrap_or_else(|_| "localhost:9092".to_string());
18    let matches = App::new("producer example")
19        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
20        .about("Simple command line producer")
21        .arg(
22            Arg::with_name("brokers")
23                .short("b")
24                .long("brokers")
25                .help("Broker list in kafka format")
26                .takes_value(true)
27                .default_value(&comma_delimited_brokers),
28        )
29        .arg(
30            Arg::with_name("log-conf")
31                .long("log-conf")
32                .help("Configure the logging format (example: 'rdkafka=trace')")
33                .takes_value(true),
34        )
35        .arg(
36            Arg::with_name("topic")
37                .short("t")
38                .long("topic")
39                .help("Destination topic")
40                .takes_value(true)
41                .required(true),
42        )
43        .get_matches();
44
45    setup_logger(true, matches.value_of("log-conf"));
46
47    let (version_n, version_s) = get_rdkafka_version();
48    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
49
50    let topic = matches.value_of("topic").unwrap();
51    let brokers = matches.value_of("brokers").unwrap();
52
53    let producer: &FutureProducer = &ClientConfig::new()
54        .set("bootstrap.servers", brokers)
55        .set("message.timeout.ms", "5000")
56        .set("security.protocol", "SSL")
57        .set(
58            "ssl.ca.location",
59            std::env::var("KAFKA_TLS_CLIENT_CA")
60                .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
61        )
62        .set(
63            "ssl.key.location",
64            std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
65                "./kubernetes/tls/client-key.pem".to_string()
66            }),
67        )
68        .set(
69            "ssl.certificate.location",
70            std::env::var("KAFKA_TLS_CLIENT_CERT")
71                .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
72        )
73        .set("enable.ssl.certificate.verification", "true")
74        .create()
75        .expect("Producer creation error");
76
77    info!("publishing messag to broker={brokers} topic={topic}");
78    publish_messages(&producer, &topic).await;
79}
More examples
Hide additional examples
examples/run-consumer.rs (line 57)
18async fn main() {
19    let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
20        .unwrap_or_else(|_| "localhost:9092".to_string());
21    let matches = App::new("consumer example")
22        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
23        .about("Simple command line consumer")
24        .arg(
25            Arg::with_name("brokers")
26                .short("b")
27                .long("brokers")
28                .help("Broker list in kafka format")
29                .takes_value(true)
30                .default_value(&comma_delimited_brokers),
31        )
32        .arg(
33            Arg::with_name("group-id")
34                .short("g")
35                .long("group-id")
36                .help("Consumer group id")
37                .takes_value(true)
38                .default_value("example_consumer_group_id"),
39        )
40        .arg(
41            Arg::with_name("log-conf")
42                .long("log-conf")
43                .help("Configure the logging format (example: 'rdkafka=trace')")
44                .takes_value(true),
45        )
46        .arg(
47            Arg::with_name("topics")
48                .short("t")
49                .long("topics")
50                .help("Topic list")
51                .takes_value(true)
52                .multiple(true)
53                .required(true),
54        )
55        .get_matches();
56
57    setup_logger(true, matches.value_of("log-conf"));
58
59    let (version_n, version_s) = get_rdkafka_version();
60    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
61
62    let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
63    let brokers = matches.value_of("brokers").unwrap();
64    let group_id = matches.value_of("group-id").unwrap();
65
66    let context = CustomContext;
67    let consumer: LoggingConsumer = ClientConfig::new()
68        .set("group.id", group_id)
69        .set("bootstrap.servers", brokers)
70        .set("enable.partition.eof", "false")
71        .set("session.timeout.ms", "6000")
72        .set("enable.auto.commit", "true")
73        .set("security.protocol", "SSL")
74        .set(
75            "ssl.ca.location",
76            std::env::var("KAFKA_TLS_CLIENT_CA")
77                .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
78        )
79        .set(
80            "ssl.key.location",
81            std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
82                "./kubernetes/tls/client-key.pem".to_string()
83            }),
84        )
85        .set(
86            "ssl.certificate.location",
87            std::env::var("KAFKA_TLS_CLIENT_CERT")
88                .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
89        )
90        .set("enable.ssl.certificate.verification", "true")
91        .set_log_level(RDKafkaLogLevel::Debug)
92        .create_with_context(context)
93        .expect("Consumer creation failed");
94
95    info!(
96        "building consumer brokers={brokers} group_id={group_id} topics={:?}",
97        topics
98    );
99
100    consumer
101        .subscribe(&topics)
102        .expect("Can't subscribe to specified topics");
103
104    consume_and_print(&consumer).await
105}