yozefu_app/configuration/
mod.rs

1use std::collections::HashMap;
2
3use lib::Error;
4use rdkafka::{ClientConfig, config::FromClientConfig};
5
6mod cluster_config;
7mod consumer_config;
8mod global_config;
9mod internal_config;
10mod workspace;
11mod yozefu_config;
12
13pub use cluster_config::ClusterConfig;
14pub use cluster_config::KAFKA_PROPERTIES_WITH_LOCATIONS;
15pub use cluster_config::SENSITIVE_KAFKA_PROPERTIES;
16pub use cluster_config::SchemaRegistryConfig;
17pub use consumer_config::ConsumerConfig;
18pub use global_config::GlobalConfig;
19pub use global_config::TimestampFormat;
20pub use internal_config::InternalConfig;
21use tracing::debug;
22use tracing::enabled;
23pub use workspace::Workspace;
24pub use yozefu_config::YozefuConfig;
25
26pub trait Configuration {
27    /// Returns the kafka properties
28    fn kafka_config_map(&self) -> HashMap<String, String>;
29
30    /// Properties you can set for the kafka consumer: <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
31    fn create_kafka_consumer<T>(&self) -> Result<T, Error>
32    where
33        T: FromClientConfig,
34    {
35        self.client_config()
36            .create()
37            .map_err(std::convert::Into::into)
38    }
39
40    /// Properties you can set for the kafka consumer: <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
41    fn client_config(&self) -> ClientConfig {
42        Self::kafka_client_config_from_properties(self.kafka_config_map().clone())
43    }
44
45    fn kafka_client_config_from_properties(
46        kafka_properties: HashMap<String, String>,
47    ) -> ClientConfig {
48        let mut config = ClientConfig::new();
49        config.set_log_level(rdkafka::config::RDKafkaLogLevel::Emerg);
50        for (key, value) in kafka_properties {
51            config.set(key, value);
52        }
53
54        if enabled!(tracing::Level::DEBUG) {
55            config.set("debug", "consumer,cgrp,topic");
56            for (k, v) in config.config_map().iter() {
57                debug!("'{}' set to '{}'", k, v);
58            }
59        }
60
61        config
62    }
63}