yozefu_app/configuration/
mod.rs1use std::collections::HashMap;
2
3use lib::Error;
4use rdkafka::{ClientConfig, config::FromClientConfig};
5
6mod cluster_config;
7mod consumer_config;
8mod global_config;
9mod internal_config;
10mod workspace;
11mod yozefu_config;
12
13pub use cluster_config::ClusterConfig;
14pub use cluster_config::KAFKA_PROPERTIES_WITH_LOCATIONS;
15pub use cluster_config::SENSITIVE_KAFKA_PROPERTIES;
16pub use cluster_config::SchemaRegistryConfig;
17pub use consumer_config::ConsumerConfig;
18pub use global_config::GlobalConfig;
19pub use global_config::TimestampFormat;
20pub use internal_config::InternalConfig;
21use tracing::debug;
22use tracing::enabled;
23pub use workspace::Workspace;
24pub use yozefu_config::YozefuConfig;
25
26pub trait Configuration {
27 fn kafka_config_map(&self) -> HashMap<String, String>;
29
30 fn create_kafka_consumer<T>(&self) -> Result<T, Error>
32 where
33 T: FromClientConfig,
34 {
35 self.client_config()
36 .create()
37 .map_err(std::convert::Into::into)
38 }
39
40 fn client_config(&self) -> ClientConfig {
42 Self::kafka_client_config_from_properties(self.kafka_config_map().clone())
43 }
44
45 fn kafka_client_config_from_properties(
46 kafka_properties: HashMap<String, String>,
47 ) -> ClientConfig {
48 let mut config = ClientConfig::new();
49 config.set_log_level(rdkafka::config::RDKafkaLogLevel::Emerg);
50 for (key, value) in kafka_properties {
51 config.set(key, value);
52 }
53
54 if enabled!(tracing::Level::DEBUG) {
55 config.set("debug", "consumer,cgrp,topic");
56 for (k, v) in config.config_map().iter() {
57 debug!("'{}' set to '{}'", k, v);
58 }
59 }
60
61 config
62 }
63}