yozefu_app/configuration/
mod.rs1use std::collections::HashMap;
2
3use lib::Error;
4use rdkafka::{ClientConfig, config::FromClientConfig};
5
6mod cluster_config;
7mod consumer_config;
8mod global_config;
9mod internal_config;
10mod workspace;
11mod yozefu_config;
12
13pub use cluster_config::ClusterConfig;
14pub use cluster_config::KAFKA_PROPERTIES_WITH_LOCATIONS;
15pub use cluster_config::SENSITIVE_KAFKA_PROPERTIES;
16pub use cluster_config::SchemaRegistryConfig;
17pub use consumer_config::ConsumerConfig;
18pub use global_config::GlobalConfig;
19pub use internal_config::InternalConfig;
20use tracing::debug;
21use tracing::enabled;
22pub use workspace::Workspace;
23pub use yozefu_config::YozefuConfig;
24
25pub trait Configuration {
26 fn kafka_config_map(&self) -> HashMap<String, String>;
28
29 fn create_kafka_consumer<T>(&self) -> Result<T, Error>
31 where
32 T: FromClientConfig,
33 {
34 self.client_config()
35 .create()
36 .map_err(std::convert::Into::into)
37 }
38
39 fn client_config(&self) -> ClientConfig {
41 Self::kafka_client_config_from_properties(self.kafka_config_map().clone())
42 }
43
44 fn kafka_client_config_from_properties(
45 kafka_properties: HashMap<String, String>,
46 ) -> ClientConfig {
47 let mut config = ClientConfig::new();
48 config.set_log_level(rdkafka::config::RDKafkaLogLevel::Emerg);
49 for (key, value) in kafka_properties {
50 config.set(key, value);
51 }
52
53 if enabled!(tracing::Level::DEBUG) {
54 config.set("debug", "consumer,cgrp,topic");
55 for (k, v) in config.config_map().iter() {
56 debug!("'{}' set to '{}'", k, v);
57 }
58 }
59
60 config
61 }
62}