use indexmap::IndexMap;
use resolve_path::PathResolveExt;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf};
use url::Url;
use crate::configuration::{ConsumerConfig, YozefuConfig};
use super::Configuration;
/// Kafka property names whose values are file locations.
///
/// `ClusterConfig::normalize_paths` looks up each of these keys in the
/// cluster's Kafka properties and rewrites the value it finds into a
/// resolved, canonical path.
pub const KAFKA_PROPERTIES_WITH_LOCATIONS: [&str; 6] = [
"ssl.ca.location",
"ssl.certificate.location",
"ssl.key.location",
"ssl.keystore.location",
"ssl.crl.location",
"ssl.engine.location",
];
/// Kafka property names that hold passwords.
// NOTE(review): not referenced anywhere in this file; presumably consulted
// elsewhere to keep these values out of logs or displayed output — confirm
// against the callers.
pub const SENSITIVE_KAFKA_PROPERTIES: [&str; 3] =
["sasl.password", "ssl.key.password", "ssl.keystore.password"];
impl Default for ClusterConfig {
fn default() -> Self {
Self {
url_template: None,
schema_registry: None,
kafka: IndexMap::new(),
consumer: None,
}
}
}
/// Configuration of a single Kafka cluster.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub struct ClusterConfig {
/// Optional URL template string.
// NOTE(review): the placeholders it supports and where it is expanded are
// not visible in this file — confirm against the code that reads it.
// Unlike the other optional fields it has no `skip_serializing_if`, so it
// is still written out when it is `None`.
pub url_template: Option<String>,
/// Optional schema registry settings; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_registry: Option<SchemaRegistryConfig>,
/// Kafka properties, property name to value, kept in insertion order.
pub kafka: IndexMap<String, String>,
/// Optional consumer settings; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub consumer: Option<ConsumerConfig>,
}
impl ClusterConfig {
    /// Rewrites every file-location Kafka property into a canonical path.
    ///
    /// For each key of [`KAFKA_PROPERTIES_WITH_LOCATIONS`] that is present in
    /// `kafka`, the value is passed through [`PathResolveExt::resolve`] and
    /// then canonicalized. A value that cannot be canonicalized (for example
    /// because the file does not exist) is left exactly as it was, so this
    /// method never fails.
    ///
    /// Consumes `self` and returns the updated configuration. Keys keep their
    /// position in the map.
    pub fn normalize_paths(mut self) -> Self {
        for key in KAFKA_PROPERTIES_WITH_LOCATIONS {
            // `self` is already owned, so the entry is updated in place:
            // no copy of the whole configuration, no re-allocated key, and
            // no fallback string built when canonicalization succeeds.
            if let Some(location) = self.kafka.get_mut(key) {
                let canonical = PathBuf::from(location.as_str()).resolve().canonicalize();
                if let Ok(canonical) = canonical {
                    *location = canonical.display().to_string();
                }
            }
        }
        self
    }
}
/// Settings for a schema registry.
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub struct SchemaRegistryConfig {
/// URL of the schema registry.
pub url: Url,
/// Header name to header value; an empty map when the field is absent from the input.
// NOTE(review): presumably sent as HTTP headers on requests to the
// registry — confirm at the call site.
#[serde(default)]
pub headers: HashMap<String, String>,
}
impl Configuration for ClusterConfig {
    /// Returns an owned copy of this cluster's Kafka properties.
    ///
    /// The result is a plain `HashMap`, so the insertion order kept by the
    /// `kafka` field is not carried over.
    fn kafka_config_map(&self) -> HashMap<String, String> {
        self.kafka
            .iter()
            .map(|(name, value)| (name.clone(), value.clone()))
            .collect()
    }
}
impl ClusterConfig {
    /// Builds a configuration whose Kafka properties are exactly
    /// `kafka_properties`, keeping only the `consumer` settings of `self`.
    ///
    /// The properties are stored in the iteration order of the given
    /// `HashMap`, which is unspecified.
    // NOTE(review): `url_template` and `schema_registry` are reset to `None`
    // rather than carried over from `self` — confirm this is intended.
    pub fn with_kafka_properties(self, kafka_properties: HashMap<String, String>) -> Self {
        let Self { consumer, .. } = self;
        Self {
            kafka: kafka_properties.into_iter().collect(),
            consumer,
            url_template: None,
            schema_registry: None,
        }
    }

    /// Sets the Kafka property `key` to `value`, replacing any previous value.
    pub fn set_kafka_property(&mut self, key: &str, value: &str) {
        let (name, setting) = (key.to_owned(), value.to_owned());
        self.kafka.insert(name, setting);
    }

    /// Consumes this configuration and wraps it in a [`YozefuConfig`] for the
    /// cluster called `cluster`.
    pub fn create(self, cluster: &str) -> YozefuConfig {
        let cluster_config = self;
        YozefuConfig::new(cluster, cluster_config)
    }
}