use serde::{Serialize, Deserialize};
use toml::Table as Metadata;
use crate::{config::TlsPolicy, FluvioError};
use super::ConfigFile;
pub type FluvioConfig = FluvioClusterConfig;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct FluvioClusterConfig {
#[serde(alias = "addr")]
pub endpoint: String,
#[serde(default)]
pub use_spu_local_address: bool,
#[serde(default)]
pub tls: TlsPolicy,
#[serde(default = "Metadata::new", skip_serializing_if = "Metadata::is_empty")]
metadata: Metadata,
#[serde(skip)]
pub client_id: Option<String>,
}
impl FluvioClusterConfig {
pub fn load() -> Result<Self, FluvioError> {
let config_file = ConfigFile::load_default_or_new()?;
let cluster_config = config_file.config().current_cluster()?;
Ok(cluster_config.to_owned())
}
pub fn load_with_profile(profile_name: &str) -> Result<Option<Self>, FluvioError> {
let config_file = ConfigFile::load_default_or_new()?;
let cluster_config = config_file.config().cluster_with_profile(profile_name);
Ok(cluster_config.cloned())
}
pub fn new(addr: impl Into<String>) -> Self {
Self {
endpoint: addr.into(),
use_spu_local_address: false,
tls: TlsPolicy::Disabled,
metadata: Metadata::new(),
client_id: None,
}
}
pub fn with_tls(mut self, tls: impl Into<TlsPolicy>) -> Self {
self.tls = tls.into();
self
}
pub fn query_metadata_by_name<'de, T>(&self, name: &str) -> Option<T>
where
T: Deserialize<'de>,
{
let metadata = self.metadata.get(name)?;
T::deserialize(metadata.clone()).ok()
}
pub fn update_metadata_by_name<S>(&mut self, name: &str, data: S) -> anyhow::Result<()>
where
S: Serialize,
{
use toml::{Value, map::Entry};
match self.metadata.entry(name) {
Entry::Vacant(entry) => {
entry.insert(Value::try_from(data)?);
}
Entry::Occupied(mut entry) => {
*entry.get_mut() = Value::try_from(data)?;
}
}
Ok(())
}
pub fn has_metadata(&self, name: &str) -> bool {
self.metadata.get(name).is_some()
}
}
impl TryFrom<FluvioClusterConfig> for fluvio_socket::ClientConfig {
type Error = anyhow::Error;
fn try_from(config: FluvioClusterConfig) -> Result<Self, Self::Error> {
let connector = fluvio_future::net::DomainConnector::try_from(config.tls.clone())?;
Ok(Self::new(
&config.endpoint,
connector,
config.use_spu_local_address,
))
}
}
#[cfg(test)]
mod test_metadata {
use fluvio_types::config_file::SaveLoadConfig;
use serde::{Deserialize, Serialize};
use crate::config::{Config, ConfigFile};
#[test]
fn test_get_metadata_path() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.custom]
name = "foo"
"#;
let profile = Config::load_str(toml).unwrap();
let config = profile.cluster("local").unwrap();
#[derive(Deserialize, Debug, PartialEq)]
struct Custom {
name: String,
}
let custom: Option<Custom> = config.query_metadata_by_name("custom");
assert_eq!(
custom,
Some(Custom {
name: "foo".to_owned()
})
);
}
#[test]
fn test_create_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
"#;
let mut profile = Config::load_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}
let preference = Preference {
connection: "wired".to_owned(),
};
config
.update_metadata_by_name("preference", preference.clone())
.expect("failed to add metadata");
let metadata = config.query_metadata_by_name("preference").expect("");
assert_eq!(preference, metadata);
}
#[test]
fn test_update_old_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.installation]
type = "local"
"#;
let mut profile = Config::load_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}
let mut install = config
.query_metadata_by_name::<Installation>("installation")
.expect("message");
assert_eq!(
install,
Installation {
typ: "local".to_owned()
}
);
"cloud".clone_into(&mut install.typ);
config
.update_metadata_by_name("installation", install)
.expect("failed to add metadata");
let metadata = config
.query_metadata_by_name::<Installation>("installation")
.expect("could not get Installation metadata");
assert_eq!("cloud", metadata.typ);
}
#[test]
fn test_update_with_new_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.installation]
type = "local"
"#;
let mut profile = Config::load_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}
let preference = Preference {
connection: "wired".to_owned(),
};
config
.update_metadata_by_name("preference", preference.clone())
.expect("failed to add metadata");
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}
let installation: Installation = config
.query_metadata_by_name("installation")
.expect("could not get installation metadata");
assert_eq!(installation.typ, "local");
let preference: Preference = config
.query_metadata_by_name("preference")
.expect("could not get preference metadata");
assert_eq!(preference.connection, "wired");
}
#[test]
fn test_profile_with_metadata() {
let config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.config();
let cluster = config
.cluster("extra")
.expect("could not find `extra` cluster in test file");
let table = toml::toml! {
[deep.nesting.example]
key = "custom field"
[installation]
type = "local"
};
assert_eq!(cluster.metadata, table);
}
#[test]
fn test_save_updated_metadata() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}
let mut config_file =
ConfigFile::load(Some("test-data/profiles/updatable_config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.mut_config();
let cluster = config
.cluster_mut("updated")
.expect("could not find `updated` cluster in test file");
let table: toml::Table = toml::toml! {
[installation]
type = "local"
};
assert_eq!(cluster.metadata, table);
cluster
.update_metadata_by_name(
"installation",
Installation {
typ: "cloud".to_owned(),
},
)
.expect("should have updated key");
let updated_table: toml::Table = toml::toml! {
[installation]
type = "cloud"
};
assert_eq!(cluster.metadata, updated_table.clone());
config_file.save().expect("failed to save config file");
let mut config_file =
ConfigFile::load(Some("test-data/profiles/updatable_config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.mut_config();
let cluster = config
.cluster_mut("updated")
.expect("could not find `updated` cluster in test file");
assert_eq!(cluster.metadata, updated_table);
cluster
.update_metadata_by_name(
"installation",
Installation {
typ: "local".to_owned(),
},
)
.expect("teardown: failed to set installation type back to local");
config_file
.save()
.expect("teardown: failed to set installation type back to local");
}
}