yozefu_app/configuration/cluster_config.rs

//! Module defining the configuration structure of the application

use indexmap::IndexMap;
use resolve_path::PathResolveExt;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf};
use url::Url;

use crate::configuration::{ConsumerConfig, YozefuConfig};

use super::Configuration;

/// List of kafka properties that are a file location.
pub const KAFKA_PROPERTIES_WITH_LOCATIONS: [&str; 6] = [
    "ssl.ca.location",
    "ssl.certificate.location",
    "ssl.key.location",
    "ssl.keystore.location",
    "ssl.crl.location",
    "ssl.engine.location",
];

/// List of kafka properties that hold sensitive values.
pub const SENSITIVE_KAFKA_PROPERTIES: [&str; 3] =
    ["sasl.password", "ssl.key.password", "ssl.keystore.password"];

impl Default for ClusterConfig {
    fn default() -> Self {
        Self {
            url_template: None,
            schema_registry: None,
            kafka: IndexMap::new(),
            consumer: None,
        }
    }
}

/// Specific configuration for a cluster
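///
/// # Example
///
/// A minimal sketch of deserializing a cluster entry, assuming `serde_json` is
/// available; the JSON shape, the URL template and the broker address shown here
/// are illustrative, not the application's documented configuration format:
///
/// ```ignore
/// let json = r#"{
///     "url_template": "https://my-kafka-ui.example.com/topics/{topic}",
///     "kafka": {
///         "bootstrap.servers": "localhost:9092",
///         "security.protocol": "plaintext"
///     }
/// }"#;
/// let cluster: ClusterConfig = serde_json::from_str(json).unwrap();
/// assert_eq!(cluster.kafka.get("bootstrap.servers").unwrap(), "localhost:9092");
/// ```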
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub struct ClusterConfig {
    /// A placeholder URL used when you want to open a kafka record in the browser
    pub url_template: Option<String>,
    /// Schema registry configuration
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_registry: Option<SchemaRegistryConfig>,
    /// Kafka consumer properties for this cluster, see <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md> for more details
    pub kafka: IndexMap<String, String>,
    /// Consumer configuration for this cluster
    #[serde(skip_serializing_if = "Option::is_none")]
    pub consumer: Option<ConsumerConfig>,
}

impl ClusterConfig {
    /// Normalize all properties that are file locations.
    /// For instance, `~/certificates/ca.pem` will be resolved to `/home/user/certificates/ca.pem`.
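    ///
    /// # Example
    ///
    /// A minimal sketch; the certificate path is illustrative, and resolution falls
    /// back to the original value when the file cannot be canonicalized:
    ///
    /// ```ignore
    /// let mut config = ClusterConfig::default();
    /// config.set_kafka_property("ssl.ca.location", "~/certificates/ca.pem");
    /// let config = config.normalize_paths();
    /// // `ssl.ca.location` now holds an absolute path such as `/home/user/certificates/ca.pem`.
    /// ```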
    pub fn normalize_paths(mut self) -> Self {
        for key in KAFKA_PROPERTIES_WITH_LOCATIONS {
            if let Some(path) = self.kafka.get(key) {
                let normalized_path = PathBuf::from(path)
                    .resolve()
                    .canonicalize()
                    .map(|d| d.display().to_string())
                    .unwrap_or(path.to_string());
                self.kafka.insert(key.to_string(), normalized_path);
            }
        }
        self
    }
}

/// Schema registry configuration of a given cluster
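///
/// # Example
///
/// A minimal sketch; the registry URL and the header value are illustrative:
///
/// ```ignore
/// use std::collections::HashMap;
/// use url::Url;
///
/// let registry = SchemaRegistryConfig {
///     url: Url::parse("http://localhost:8081").unwrap(),
///     headers: HashMap::from([(
///         "Authorization".to_string(),
///         "Bearer <token>".to_string(),
///     )]),
/// };
/// ```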
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub struct SchemaRegistryConfig {
    /// URL of the schema registry
    pub url: Url,
    /// HTTP headers to be used when communicating with the schema registry
    #[serde(default)]
    pub headers: HashMap<String, String>,
}

impl Configuration for ClusterConfig {
    fn kafka_config_map(&self) -> HashMap<String, String> {
        self.kafka.clone().into_iter().collect()
    }
}

impl ClusterConfig {
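    /// Replaces the kafka properties of this configuration, keeping the consumer
    /// settings but resetting the url template and the schema registry configuration.
    ///
    /// # Example
    ///
    /// A minimal sketch with an illustrative bootstrap server:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let config = ClusterConfig::default().with_kafka_properties(HashMap::from([(
    ///     "bootstrap.servers".to_string(),
    ///     "localhost:9092".to_string(),
    /// )]));
    /// assert_eq!(config.kafka.get("bootstrap.servers").unwrap(), "localhost:9092");
    /// ```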
    pub fn with_kafka_properties(self, kafka_properties: HashMap<String, String>) -> Self {
        Self {
            url_template: None,
            schema_registry: None,
            kafka: IndexMap::from_iter(kafka_properties),
            consumer: self.consumer,
        }
    }

    /// Sets a single kafka property, overwriting any existing value.
    pub fn set_kafka_property(&mut self, key: &str, value: &str) {
        self.kafka.insert(key.to_string(), value.to_string());
    }
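    /// Builds a [`YozefuConfig`] for the given cluster name from this cluster configuration.
    ///
    /// # Example
    ///
    /// A minimal sketch with an illustrative cluster name:
    ///
    /// ```ignore
    /// let yozefu_config = ClusterConfig::default().create("localhost");
    /// ```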
    pub fn create(self, cluster: &str) -> YozefuConfig {
        YozefuConfig::new(cluster, self)
    }
}