use std::env;
use std::fs::read_to_string;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::fs::File;
use std::fs::create_dir_all;
use thiserror::Error;
use tracing::debug;
use dirs::home_dir;
use serde::Deserialize;
use serde::Serialize;
use fluvio_types::defaults::{CLI_CONFIG_PATH};
use crate::{FluvioConfig, FluvioError};
/// Errors raised while locating, reading, or interpreting a Fluvio config.
#[derive(Error, Debug)]
pub enum ConfigError {
/// An I/O failure. `#[error(transparent)]` defers the message to the wrapped `IoError`.
#[error(transparent)]
ConfigFileError {
#[from]
source: IoError,
},
/// The config text could not be deserialized from TOML.
#[error("Failed to deserialize Fluvio config")]
TomlError {
#[from]
source: toml::de::Error,
},
/// No current profile is set, or the current profile name has no entry in the profile map.
#[error("Config has no active profile")]
NoActiveProfile,
/// The current profile refers to a cluster that has no entry in the cluster map.
/// The `profile` field is interpolated into the message.
#[error("No cluster config for profile {profile}")]
NoClusterForProfile { profile: String },
}
/// A `Config` paired with the filesystem path it is associated with.
pub struct ConfigFile {
// Location that `save` writes to; chosen by `load` / `default_config`.
path: PathBuf,
// The in-memory configuration, exposed through `config` / `mut_config`.
config: Config,
}
impl ConfigFile {
    /// Pairs an already-built `config` with the `path` it belongs to.
    fn new(path: PathBuf, config: Config) -> Self {
        Self { path, config }
    }

    /// Creates an empty config (see `Config::new`) bound to the default path.
    ///
    /// Nothing is read from or written to disk.
    ///
    /// # Errors
    /// Fails if the default path cannot be determined.
    pub fn default_config() -> Result<Self, IoError> {
        let path = Self::default_file_path(None)?;
        Ok(Self::new(path, Config::new()))
    }

    /// Loads the config from the default location, falling back to a fresh
    /// empty config when loading fails for *any* reason (missing file,
    /// unreadable file, or invalid TOML).
    ///
    /// # Errors
    /// Fails only if the fallback cannot determine the default path.
    pub fn load_default_or_new() -> Result<Self, IoError> {
        Self::load(None).or_else(|err| {
            debug!("profile can't be loaded, creating new one: {}", err);
            Self::default_config()
        })
    }

    /// Loads and parses the config file.
    ///
    /// The file location is `optional_path` when given; otherwise it is
    /// resolved as described on `default_file_path`.
    ///
    /// # Errors
    /// Returns a `ConfigError` (converted into `FluvioError`) when the path
    /// cannot be resolved, the file cannot be read, or the TOML is invalid.
    pub fn load(optional_path: Option<String>) -> Result<Self, FluvioError> {
        let path = Self::default_file_path(optional_path)
            .map_err(|source| ConfigError::ConfigFileError { source })?;
        Self::from_file(path)
    }

    /// Reads `path` as UTF-8 text and deserializes it from TOML.
    fn from_file<T: AsRef<Path>>(path: T) -> Result<Self, FluvioError> {
        let path_ref = path.as_ref();
        let contents = read_to_string(path_ref)
            .map_err(|source| ConfigError::ConfigFileError { source })?;
        let config =
            toml::from_str(&contents).map_err(|source| ConfigError::TomlError { source })?;
        Ok(Self::new(path_ref.to_owned(), config))
    }

    /// Resolves the config file location, in order of precedence:
    ///
    /// 1. the explicit `path` argument,
    /// 2. the `FLV_PROFILE_PATH` environment variable,
    /// 3. `<home>/<CLI_CONFIG_PATH>/config`.
    ///
    /// # Errors
    /// Returns `ErrorKind::InvalidInput` when none of the above applies
    /// because the home directory cannot be determined.
    fn default_file_path(path: Option<String>) -> Result<PathBuf, IoError> {
        if let Some(explicit) = path {
            return Ok(PathBuf::from(explicit));
        }
        // An unset (or non-unicode) variable falls through to the home directory.
        if let Ok(from_env) = env::var("FLV_PROFILE_PATH") {
            return Ok(PathBuf::from(from_env));
        }
        let mut profile_path = home_dir().ok_or_else(|| {
            IoError::new(ErrorKind::InvalidInput, "can't get profile directory")
        })?;
        profile_path.push(CLI_CONFIG_PATH);
        profile_path.push("config");
        Ok(profile_path)
    }

    /// Read-only access to the wrapped config.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Mutable access to the wrapped config; call `save` to persist changes.
    pub fn mut_config(&mut self) -> &mut Config {
        &mut self.config
    }

    /// Writes the config to this file's path, creating parent directories
    /// as needed.
    ///
    /// # Errors
    /// Returns `ConfigError::ConfigFileError` (as `FluvioError`) if the
    /// directories cannot be created or the file cannot be written.
    pub fn save(&self) -> Result<(), FluvioError> {
        // `parent()` is `None` for an empty or root path (e.g. an empty
        // `FLV_PROFILE_PATH`). There is nothing to create in that case, and
        // the write below reports the bad path as an I/O error rather than
        // panicking here.
        if let Some(parent) = self.path.parent() {
            create_dir_all(parent).map_err(|source| ConfigError::ConfigFileError { source })?;
        }
        self.config
            .save_to(&self.path)
            .map_err(|source| ConfigError::ConfigFileError { source })?;
        Ok(())
    }
}
/// Name given to both the profile and the cluster that
/// `Config::new_with_local_cluster` creates.
pub const LOCAL_PROFILE: &str = "local";
// Version string stamped into configs built by `Config::new`.
const CONFIG_VERSION: &str = "2.0";
/// In-memory form of the Fluvio config: named profiles, named clusters, and
/// the currently selected profile.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct Config {
// Version string; `Config::new` sets it to `CONFIG_VERSION`.
version: String,
// Name of the selected profile (a key of `profile`), if any.
current_profile: Option<String>,
/// Profiles keyed by profile name.
pub profile: HashMap<String, Profile>,
/// Cluster settings keyed by cluster name; `Profile::cluster` is looked up here.
pub cluster: HashMap<String, FluvioConfig>,
// Not touched by the code visible in this file.
// NOTE(review): presumably a client identifier — confirm against callers.
client_id: Option<String>,
}
impl Config {
    /// Creates an empty config stamped with `CONFIG_VERSION`: no profiles,
    /// no clusters, and no current profile.
    pub fn new() -> Self {
        Self {
            version: CONFIG_VERSION.to_owned(),
            ..Default::default()
        }
    }

    /// Creates a config holding one cluster and one profile, both named
    /// `LOCAL_PROFILE`, with that profile selected as current.
    ///
    /// `domain` is passed to `FluvioConfig::new`.
    pub fn new_with_local_cluster(domain: String) -> Self {
        let mut config = Self::new();
        config
            .cluster
            .insert(LOCAL_PROFILE.to_owned(), FluvioConfig::new(domain));
        config.profile.insert(
            LOCAL_PROFILE.to_owned(),
            Profile::new(LOCAL_PROFILE.to_owned()),
        );
        config.set_current_profile(LOCAL_PROFILE);
        config
    }

    /// Adds `cluster` under `name`, replacing any cluster already stored
    /// under that name.
    pub fn add_cluster(&mut self, cluster: FluvioConfig, name: String) {
        self.cluster.insert(name, cluster);
    }

    /// Adds `profile` under `name`, replacing any profile already stored
    /// under that name.
    pub fn add_profile(&mut self, profile: Profile, name: String) {
        self.profile.insert(name, profile);
    }

    /// Serializes the config to TOML and writes it to `path`, creating or
    /// truncating the file.
    ///
    /// # Errors
    /// A serialization failure is reported as `ErrorKind::Other` carrying
    /// the serializer's message; file errors are returned unchanged.
    fn save_to<T: AsRef<Path>>(&self, path: T) -> Result<(), IoError> {
        let path_ref = path.as_ref();
        debug!("saving config: {:#?} to: {:#?}", self, path_ref);
        let toml =
            toml::to_vec(self).map_err(|err| IoError::new(ErrorKind::Other, err.to_string()))?;
        let mut file = File::create(path_ref)?;
        file.write_all(&toml)
    }

    /// The version string stored in this config.
    pub fn version(&self) -> &str {
        &self.version
    }

    /// Name of the currently selected profile, if one is set.
    pub fn current_profile_name(&self) -> Option<&str> {
        self.current_profile.as_deref()
    }

    /// Selects `profile_name` as the current profile.
    ///
    /// Returns `false`, leaving the selection unchanged, when no profile
    /// with that name exists.
    pub fn set_current_profile(&mut self, profile_name: &str) -> bool {
        let known = self.profile.contains_key(profile_name);
        if known {
            self.current_profile = Some(profile_name.to_owned());
        }
        known
    }

    /// Renames profile `from` to `to`, keeping it selected if it was the
    /// current profile.
    ///
    /// Returns `false` when `from` does not exist. A profile already stored
    /// under `to` is replaced.
    pub fn rename_profile(&mut self, from: &str, to: String) -> bool {
        let profile = match self.profile.remove(from) {
            Some(profile) => profile,
            None => return false,
        };
        // Only pay for the extra `String` when the selection must follow the rename.
        if self.current_profile.as_deref() == Some(from) {
            self.current_profile = Some(to.clone());
        }
        self.profile.insert(to, profile);
        true
    }

    /// Removes `profile_name`, clearing the current selection if it pointed
    /// at that profile.
    ///
    /// Returns `false` when no such profile exists.
    pub fn delete_profile(&mut self, profile_name: &str) -> bool {
        if self.profile.remove(profile_name).is_none() {
            return false;
        }
        if self.current_profile.as_deref() == Some(profile_name) {
            self.current_profile = None;
        }
        true
    }

    /// Removes and returns the cluster stored under `cluster_name`.
    ///
    /// Profiles that reference the cluster are not touched; use
    /// `delete_cluster_check` first to find them.
    pub fn delete_cluster(&mut self, cluster_name: &str) -> Option<FluvioConfig> {
        self.cluster.remove(cluster_name)
    }

    /// Checks whether any profile still references `cluster_name`.
    ///
    /// Returns `Ok(())` when none does, otherwise `Err` with the names of
    /// the referencing profiles (in map iteration order). Nothing is
    /// modified.
    pub fn delete_cluster_check(&mut self, cluster_name: &str) -> Result<(), Vec<&str>> {
        let conflicts: Vec<&str> = self
            .profile
            .iter()
            .filter(|(_, profile)| profile.cluster == cluster_name)
            .map(|(name, _)| name.as_str())
            .collect();
        if conflicts.is_empty() {
            Ok(())
        } else {
            Err(conflicts)
        }
    }

    /// Returns the currently selected profile.
    ///
    /// # Errors
    /// `ConfigError::NoActiveProfile` when no profile is selected or the
    /// selected name is not in the profile map.
    pub fn current_profile(&self) -> Result<&Profile, FluvioError> {
        let profile = self
            .current_profile
            .as_ref()
            .and_then(|name| self.profile.get(name))
            .ok_or(ConfigError::NoActiveProfile)?;
        Ok(profile)
    }

    /// Mutable access to the profile stored under `profile_name`.
    pub fn profile_mut(&mut self, profile_name: &str) -> Option<&mut Profile> {
        self.profile.get_mut(profile_name)
    }

    /// Returns the cluster referenced by the currently selected profile.
    ///
    /// # Errors
    /// `ConfigError::NoActiveProfile` when there is no usable current
    /// profile; `ConfigError::NoClusterForProfile` when the profile's
    /// cluster is not in the cluster map.
    pub fn current_cluster(&self) -> Result<&FluvioConfig, FluvioError> {
        let profile = self.current_profile()?;
        let cluster = self.cluster.get(&profile.cluster).ok_or_else(|| {
            // The error text reads "No cluster config for profile {profile}",
            // so report the *profile* name. `current_profile()` succeeded
            // above, so the name is always present here.
            ConfigError::NoClusterForProfile {
                profile: self.current_profile.clone().unwrap_or_default(),
            }
        })?;
        Ok(cluster)
    }

    /// Returns the cluster referenced by profile `profile_name`, or `None`
    /// when either the profile or its cluster is missing.
    pub fn cluster_with_profile(&self, profile_name: &str) -> Option<&FluvioConfig> {
        self.profile
            .get(profile_name)
            .and_then(|profile| self.cluster.get(&profile.cluster))
    }

    /// Returns the cluster stored under `cluster_name`.
    pub fn cluster(&self, cluster_name: &str) -> Option<&FluvioConfig> {
        self.cluster.get(cluster_name)
    }

    /// Mutable access to the cluster stored under `cluster_name`.
    pub fn cluster_mut(&mut self, cluster_name: &str) -> Option<&mut FluvioConfig> {
        self.cluster.get_mut(cluster_name)
    }

    /// Returns the replica settings for a topic/partition.
    ///
    /// Currently a stub: both arguments are ignored and
    /// `Replica::default()` is always returned.
    pub fn resolve_replica_config(&self, _topic_name: &str, _partition: i32) -> Replica {
        Replica::default()
    }
}
/// Topic-level settings holding a string-to-string `replica` map.
///
/// Not referenced by the other code visible in this file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Topic {
// NOTE(review): key/value meaning is not established here — confirm against users of this type.
replica: HashMap<String, String>,
}
/// A named profile: which cluster to use, plus optional topic/partition values.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct Profile {
/// Name of the cluster this profile uses; `Config` looks it up in its cluster map.
pub cluster: String,
/// Optional topic name. NOTE(review): presumably a default topic — confirm against callers.
pub topic: Option<String>,
/// Optional partition number. NOTE(review): presumably a default partition — confirm against callers.
pub partition: Option<i32>,
}
impl Profile {
    /// Builds a profile that points at `cluster` and leaves the optional
    /// topic and partition unset.
    pub fn new(cluster: String) -> Self {
        Self {
            cluster,
            topic: None,
            partition: None,
        }
    }

    /// Points this profile at a different cluster; the other fields are
    /// left as they are.
    pub fn set_cluster(&mut self, cluster: String) {
        self.cluster = cluster;
    }
}
/// Replica-level settings; `Config::resolve_replica_config` returns the default value of this type.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct Replica {
/// Optional byte limit. NOTE(review): exact semantics are not established in this file — confirm.
pub max_bytes: Option<i32>,
/// Optional isolation setting, stored as free-form text. NOTE(review): valid values are not established here — confirm.
pub isolation: Option<String>,
}
#[cfg(test)]
pub mod test {
    use super::*;
    use std::path::PathBuf;
    use std::env::temp_dir;
    use crate::config::{TlsPolicy, TlsConfig, TlsCerts};

    /// An explicit path argument is returned unchanged.
    #[test]
    fn test_default_path_arg() {
        assert_eq!(
            ConfigFile::default_file_path(Some("/user1/test".to_string())).expect("file"),
            PathBuf::from("/user1/test")
        );
    }

    // NOTE(review): this is not marked `#[test]`, so the harness never runs
    // it; `#[allow(unused)]` only silences the dead-code warning. Presumably
    // disabled because it mutates the process-wide environment that
    // `test_default_path_home` also depends on — confirm before re-enabling.
    #[allow(unused)]
    fn test_default_path_env() {
        env::set_var("FLV_PROFILE_PATH", "/user2/config");
        assert_eq!(
            ConfigFile::default_file_path(None).expect("file"),
            PathBuf::from("/user2/config")
        );
        env::remove_var("FLV_PROFILE_PATH");
    }

    /// With no argument the path falls back to `<home>/<CLI_CONFIG_PATH>/config`.
    /// This only holds while `FLV_PROFILE_PATH` is unset.
    #[test]
    fn test_default_path_home() {
        let mut path = home_dir().expect("home dir must exist");
        path.push(CLI_CONFIG_PATH);
        path.push("config");
        assert_eq!(ConfigFile::default_file_path(None).expect("file"), path);
    }

    /// Loads the fixture config and walks through profile selection.
    #[test]
    fn test_config() {
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("parse failed");
        let config = conf_file.mut_config();

        assert_eq!(config.version(), "1.0");
        assert_eq!(config.current_profile_name().unwrap(), "local");
        let profile = config.current_profile().expect("profile should exists");
        assert_eq!(profile.cluster, "local");

        // Unknown profiles are rejected; known ones become current.
        assert!(!config.set_current_profile("dummy"));
        assert!(config.set_current_profile("local2"));
        assert_eq!(config.current_profile_name().unwrap(), "local2");

        let cluster = config.current_cluster().expect("cluster should exist");
        assert_eq!(cluster.addr, "127.0.0.1:9003");
    }

    /// Renaming the current profile moves the map entry and keeps it selected.
    #[test]
    fn test_rename_profile() {
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("parse failed");
        let config = conf_file.mut_config();

        assert_eq!(config.current_profile_name(), Some("local"));
        config.rename_profile("local", "remote".to_string());
        assert_eq!(config.current_profile_name(), Some("remote"));
        assert!(!config.profile.contains_key("local"));
        assert!(config.profile.contains_key("remote"));
    }

    /// A config can be saved both with inline TLS certs and with TLS disabled.
    #[test]
    fn test_tls_save() {
        let mut config = Config::new_with_local_cluster("localhost:9003".to_owned());
        let inline_tls_config = TlsConfig::Inline(TlsCerts {
            key: "ABCDEFF".to_owned(),
            cert: "JJJJ".to_owned(),
            ca_cert: "XXXXX".to_owned(),
            domain: "my_domain".to_owned(),
        });

        println!("temp: {:#?}", temp_dir());
        config.cluster_mut(LOCAL_PROFILE).unwrap().tls = inline_tls_config.into();
        config
            .save_to(temp_dir().join("inline.toml"))
            .expect("save should succeed");

        config.cluster_mut(LOCAL_PROFILE).unwrap().tls = TlsPolicy::Disabled;
        config
            .save_to(temp_dir().join("noverf.toml"))
            .expect("save should succeed");
    }

    /// A changed profile selection survives a save/load round trip.
    #[test]
    fn test_set_tls() {
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("parse failed");
        let config = conf_file.mut_config();
        config.set_current_profile("local3");

        // Use the platform temp directory rather than a hard-coded `/tmp`,
        // which does not exist on every platform.
        let saved_path = temp_dir().join("test_config.toml");
        config
            .save_to(&saved_path)
            .expect("save should succeed");

        let update_conf_file =
            ConfigFile::load(Some(saved_path.to_string_lossy().into_owned()))
                .expect("parse failed");
        assert_eq!(
            update_conf_file.config().current_profile_name().unwrap(),
            "local3"
        );
    }

    /// The local-cluster constructor selects the `local` profile and its cluster.
    #[test]
    fn test_local_cluster() {
        let config = Config::new_with_local_cluster("localhost:9003".to_owned());

        assert_eq!(config.current_profile_name().unwrap(), "local");
        let cluster = config.current_cluster().expect("cluster should exists");
        assert_eq!(cluster.addr, "localhost:9003");
    }
}