pub fn setup_logger(log_thread: bool, rust_log: Option<&str>)
Expand description
setup_logger
Set up a logger for message processing by consumers or producers
§Arguments
log_thread
- flag for logging the processing thread
rust_log
- optional string containing the logging level requested by the caller (for example "rdkafka=trace")
§Examples
use rust_with_kafka_tls::log_utils::setup_logger;
setup_logger(true, Some("rdkafka=trace"));
Examples found in repository?
examples/run-producer.rs (line 45)
15async fn main() {
16 let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
17 .unwrap_or_else(|_| "localhost:9092".to_string());
18 let matches = App::new("producer example")
19 .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
20 .about("Simple command line producer")
21 .arg(
22 Arg::with_name("brokers")
23 .short("b")
24 .long("brokers")
25 .help("Broker list in kafka format")
26 .takes_value(true)
27 .default_value(&comma_delimited_brokers),
28 )
29 .arg(
30 Arg::with_name("log-conf")
31 .long("log-conf")
32 .help("Configure the logging format (example: 'rdkafka=trace')")
33 .takes_value(true),
34 )
35 .arg(
36 Arg::with_name("topic")
37 .short("t")
38 .long("topic")
39 .help("Destination topic")
40 .takes_value(true)
41 .required(true),
42 )
43 .get_matches();
44
45 setup_logger(true, matches.value_of("log-conf"));
46
47 let (version_n, version_s) = get_rdkafka_version();
48 info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
49
50 let topic = matches.value_of("topic").unwrap();
51 let brokers = matches.value_of("brokers").unwrap();
52
53 let producer: &FutureProducer = &ClientConfig::new()
54 .set("bootstrap.servers", brokers)
55 .set("message.timeout.ms", "5000")
56 .set("security.protocol", "SSL")
57 .set(
58 "ssl.ca.location",
59 std::env::var("KAFKA_TLS_CLIENT_CA")
60 .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
61 )
62 .set(
63 "ssl.key.location",
64 std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
65 "./kubernetes/tls/client-key.pem".to_string()
66 }),
67 )
68 .set(
69 "ssl.certificate.location",
70 std::env::var("KAFKA_TLS_CLIENT_CERT")
71 .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
72 )
73 .set("enable.ssl.certificate.verification", "true")
74 .create()
75 .expect("Producer creation error");
76
77 info!("publishing messag to broker={brokers} topic={topic}");
78 publish_messages(&producer, &topic).await;
79}
More examples
examples/run-consumer.rs (line 57)
18async fn main() {
19 let comma_delimited_brokers = std::env::var("KAFKA_BROKERS")
20 .unwrap_or_else(|_| "localhost:9092".to_string());
21 let matches = App::new("consumer example")
22 .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
23 .about("Simple command line consumer")
24 .arg(
25 Arg::with_name("brokers")
26 .short("b")
27 .long("brokers")
28 .help("Broker list in kafka format")
29 .takes_value(true)
30 .default_value(&comma_delimited_brokers),
31 )
32 .arg(
33 Arg::with_name("group-id")
34 .short("g")
35 .long("group-id")
36 .help("Consumer group id")
37 .takes_value(true)
38 .default_value("example_consumer_group_id"),
39 )
40 .arg(
41 Arg::with_name("log-conf")
42 .long("log-conf")
43 .help("Configure the logging format (example: 'rdkafka=trace')")
44 .takes_value(true),
45 )
46 .arg(
47 Arg::with_name("topics")
48 .short("t")
49 .long("topics")
50 .help("Topic list")
51 .takes_value(true)
52 .multiple(true)
53 .required(true),
54 )
55 .get_matches();
56
57 setup_logger(true, matches.value_of("log-conf"));
58
59 let (version_n, version_s) = get_rdkafka_version();
60 info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
61
62 let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
63 let brokers = matches.value_of("brokers").unwrap();
64 let group_id = matches.value_of("group-id").unwrap();
65
66 let context = CustomContext;
67 let consumer: LoggingConsumer = ClientConfig::new()
68 .set("group.id", group_id)
69 .set("bootstrap.servers", brokers)
70 .set("enable.partition.eof", "false")
71 .set("session.timeout.ms", "6000")
72 .set("enable.auto.commit", "true")
73 .set("security.protocol", "SSL")
74 .set(
75 "ssl.ca.location",
76 std::env::var("KAFKA_TLS_CLIENT_CA")
77 .unwrap_or_else(|_| "./kubernetes/tls/ca.pem".to_string()),
78 )
79 .set(
80 "ssl.key.location",
81 std::env::var("KAFKA_TLS_CLIENT_KEY").unwrap_or_else(|_| {
82 "./kubernetes/tls/client-key.pem".to_string()
83 }),
84 )
85 .set(
86 "ssl.certificate.location",
87 std::env::var("KAFKA_TLS_CLIENT_CERT")
88 .unwrap_or_else(|_| "./kubernetes/tls/client.pem".to_string()),
89 )
90 .set("enable.ssl.certificate.verification", "true")
91 .set_log_level(RDKafkaLogLevel::Debug)
92 .create_with_context(context)
93 .expect("Consumer creation failed");
94
95 info!(
96 "building consumer brokers={brokers} group_id={group_id} topics={:?}",
97 topics
98 );
99
100 consumer
101 .subscribe(&topics)
102 .expect("Can't subscribe to specified topics");
103
104 consume_and_print(&consumer).await
105}