yozefu_app/configuration/
cluster_config.rs1use indexmap::IndexMap;
4use resolve_path::PathResolveExt;
5use serde::{Deserialize, Serialize};
6use std::{collections::HashMap, path::PathBuf};
7use url::Url;
8
9use crate::configuration::{ConsumerConfig, YozefuConfig};
10
11use super::Configuration;
12
/// Kafka property names whose values are file-system locations.
///
/// `ClusterConfig::normalize_paths` walks this list, in this order, and
/// rewrites the value of every listed property that is present in the
/// configuration.
pub const KAFKA_PROPERTIES_WITH_LOCATIONS: [&str; 6] = [
    // Certificate / key material.
    "ssl.ca.location",
    "ssl.certificate.location",
    "ssl.key.location",
    "ssl.keystore.location",
    // Revocation list and engine library.
    "ssl.crl.location",
    "ssl.engine.location",
];
22
/// Kafka property names whose values are passwords.
///
/// NOTE(review): presumably used to mask these values before they are shown
/// or logged; the consumers of this list are not visible here, so confirm at
/// the call sites.
pub const SENSITIVE_KAFKA_PROPERTIES: [&str; 3] = [
    "sasl.password",
    "ssl.key.password",
    "ssl.keystore.password",
];
26
27impl Default for ClusterConfig {
28 fn default() -> Self {
29 Self {
30 url_template: None,
31 schema_registry: None,
32 kafka: IndexMap::new(),
33 consumer: None,
34 }
35 }
36}
37
38#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)]
40#[cfg_attr(test, derive(schemars::JsonSchema))]
41pub struct ClusterConfig {
42 pub url_template: Option<String>,
44 #[serde(skip_serializing_if = "Option::is_none")]
46 pub schema_registry: Option<SchemaRegistryConfig>,
47 pub kafka: IndexMap<String, String>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub consumer: Option<ConsumerConfig>,
51}
52
53impl ClusterConfig {
54 pub fn normalize_paths(self) -> Self {
57 let mut cloned = self.clone();
58 for key in KAFKA_PROPERTIES_WITH_LOCATIONS {
59 if let Some(path) = cloned.kafka.get(key) {
60 let normalized_path = PathBuf::from(path)
61 .resolve()
62 .canonicalize()
63 .map(|d| d.display().to_string())
64 .unwrap_or(path.to_string());
65 cloned.kafka.insert(key.to_string(), normalized_path);
66 }
67 }
68 cloned
69 }
70
71 }
82
83#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
85#[cfg_attr(test, derive(schemars::JsonSchema))]
86pub struct SchemaRegistryConfig {
87 pub url: Url,
89 #[serde(default)]
91 pub headers: HashMap<String, String>,
92}
93
94impl Configuration for ClusterConfig {
95 fn kafka_config_map(&self) -> HashMap<String, String> {
96 let mut properties = HashMap::new();
97 properties.extend(self.kafka.clone());
98 properties
99 }
100}
101
102impl ClusterConfig {
103 pub fn with_kafka_properties(self, kafka_properties: HashMap<String, String>) -> Self {
104 Self {
105 url_template: None,
106 schema_registry: None,
107 kafka: indexmap::IndexMap::from_iter(kafka_properties),
108 consumer: self.consumer,
109 }
110 }
111
112 pub fn set_kafka_property(&mut self, key: &str, value: &str) {
113 self.kafka.insert(key.to_string(), value.to_string());
114 }
115
116 pub fn create(self, cluster: &str) -> YozefuConfig {
117 YozefuConfig::new(cluster, self)
118 }
119}
120