// Source: fluvio/config/config.rs

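//!
//! # Main configuration file
//!
//! Defines the on-disk client configuration: named profiles, the cluster
//! definitions they refer to, and which profile is currently active.
//!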
6use std::env;
7use std::io::Error as IoError;
8use std::io::ErrorKind;
9use std::path::{Path, PathBuf};
10use std::collections::HashMap;
11use std::fs::create_dir_all;
12
13use fluvio_types::config_file::LoadConfigError;
14use thiserror::Error;
15
16use tracing::debug;
17#[cfg(not(target_arch = "wasm32"))]
18use dirs::home_dir;
19
/// Stand-in for `dirs::home_dir` on `wasm32` targets, where that import is
/// compiled out.
///
/// Always returns `None`, so any lookup that depends on a home directory
/// reports that no directory is available.
#[cfg(target_arch = "wasm32")]
fn home_dir() -> Option<PathBuf> {
    None
}
24
25use serde::Deserialize;
26use serde::Serialize;
27
28use fluvio_types::defaults::CLI_CONFIG_PATH;
29use fluvio_types::config_file::SaveLoadConfig;
30use crate::{FluvioClusterConfig, FluvioError};
31
32use super::TlsPolicy;
33
34fn config_file_error(msg: &str, source: IoError) -> ConfigError {
35    ConfigError::ConfigFileError {
36        msg: msg.to_owned(),
37        source,
38    }
39}
40
41#[derive(Error, Debug)]
42pub enum ConfigError {
43    #[error("config file {msg}")]
44    ConfigFileError { msg: String, source: IoError },
45    #[error("Failed to deserialize Fluvio config {msg}")]
46    TomlError {
47        msg: String,
48        source: toml::de::Error,
49    },
50    #[error("Config has no active profile")]
51    NoActiveProfile,
52    #[error("No cluster config for profile {profile}")]
53    NoClusterForProfile { profile: String },
54}
55
56pub struct ConfigFile {
57    path: PathBuf,
58    config: Config,
59}
60
61impl ConfigFile {
62    fn new(path: PathBuf, config: Config) -> Self {
63        Self { path, config }
64    }
65
66    /// create default profile
67    pub fn default_config() -> Result<Self, IoError> {
68        let path = Self::default_file_path()?;
69        Ok(Self {
70            path,
71            config: Config::new(),
72        })
73    }
74
75    /// load from default location if not found, create new one
76    pub fn load_default_or_new() -> Result<Self, IoError> {
77        match Self::load(None) {
78            Ok(config_file) => Ok(config_file),
79            Err(err) => {
80                // if doesn't exist, we create new profile
81                debug!("profile can't be loaded, creating new one: {}", err);
82                ConfigFile::default_config()
83            }
84        }
85    }
86
87    /// try to load from default locations
88    pub fn load(optional_path: Option<String>) -> Result<Self, FluvioError> {
89        Self::from_file(match optional_path {
90            Some(p) => PathBuf::from(p),
91            None => Self::default_file_path().map_err(|e| config_file_error("default path", e))?,
92        })
93    }
94
95    /// read from file
96    fn from_file<T: AsRef<Path>>(path: T) -> Result<Self, FluvioError> {
97        let path_ref = path.as_ref();
98        let config = Config::load_from(path_ref).map_err(|e| match e {
99            LoadConfigError::IoError(e) => {
100                config_file_error(&format!("{:?}", path_ref.as_os_str()), e)
101            }
102            LoadConfigError::TomlError(e) => ConfigError::TomlError {
103                msg: path_ref.display().to_string(),
104                source: e,
105            },
106        })?;
107
108        Ok(Self::new(path_ref.to_owned(), config))
109    }
110
111    /// find default path where config is stored.  precedent is:
112    /// 1) supplied path
113    /// 2) environment variable in FLV_PROFILE_PATH
114    /// 3) home directory ~/.fluvio/config
115    fn default_file_path() -> Result<PathBuf, IoError> {
116        env::var("FLV_PROFILE_PATH")
117            .map(|p| Ok(PathBuf::from(p)))
118            .unwrap_or_else(|_| {
119                if let Some(mut profile_path) = home_dir() {
120                    profile_path.push(CLI_CONFIG_PATH);
121                    profile_path.push("config");
122                    Ok(profile_path)
123                } else {
124                    Err(IoError::new(
125                        ErrorKind::InvalidInput,
126                        "can't get profile directory",
127                    ))
128                }
129            })
130    }
131
132    /// Return a reference to the internal Config
133    pub fn config(&self) -> &Config {
134        &self.config
135    }
136
137    /// Return a mutable reference to the internal Config
138    pub fn mut_config(&mut self) -> &mut Config {
139        &mut self.config
140    }
141
142    /// Save to file
143    pub fn save(&self) -> Result<(), FluvioError> {
144        create_dir_all(self.path.parent().unwrap())
145            .map_err(|e| config_file_error(&format!("parent {:?}", self.path), e))?;
146        self.config
147            .save_to(&self.path)
148            .map_err(|e| config_file_error(&format!("{:?}", &self.path), e))?;
149        Ok(())
150    }
151
152    /// add or update profile with a simple cluster address
153    /// this will create or replace cluster config if it doesn't exists
154    pub fn add_or_replace_profile(
155        &mut self,
156        profile_name: &str,
157        cluster_addr: &str,
158        tls_policy: &TlsPolicy,
159    ) -> Result<(), FluvioError> {
160        let config = self.mut_config();
161
162        // if cluster exists, just update extern addr
163        // if not create new config
164        match config.cluster_mut(profile_name) {
165            Some(cluster) => {
166                cluster.endpoint = cluster_addr.to_string();
167                cluster.tls = tls_policy.clone();
168            }
169            None => {
170                let mut new_cluster = FluvioClusterConfig::new(cluster_addr);
171                new_cluster.tls = tls_policy.clone();
172                config.add_cluster(new_cluster, profile_name.to_string());
173            }
174        }
175
176        // add profile or exist
177        match config.profile_mut(profile_name) {
178            Some(profile) => {
179                profile.set_cluster(profile_name.to_string());
180            }
181            None => {
182                let profile = Profile::new(profile_name.to_string());
183                config.add_profile(profile, profile_name.to_string());
184            }
185        };
186
187        config.set_current_profile(profile_name);
188        self.save()?;
189        Ok(())
190    }
191}
192
/// Name given to both the profile and the cluster created by
/// `Config::new_with_local_cluster`.
pub const LOCAL_PROFILE: &str = "local";
/// Version string stamped into configs created by `Config::new`.
const CONFIG_VERSION: &str = "2.0";
195
196#[derive(Debug, Default, Serialize, Deserialize)]
197pub struct Config {
198    version: String,
199    current_profile: Option<String>,
200    pub profile: HashMap<String, Profile>,
201    pub cluster: HashMap<String, FluvioClusterConfig>,
202    client_id: Option<String>,
203}
204
205impl Config {
206    pub fn new() -> Self {
207        Self {
208            version: CONFIG_VERSION.to_owned(),
209            ..Default::default()
210        }
211    }
212
213    /// create new config with a single local cluster
214    pub fn new_with_local_cluster(domain: String) -> Self {
215        let cluster = FluvioClusterConfig::new(domain);
216        let mut config = Self::new();
217
218        config.cluster.insert(LOCAL_PROFILE.to_owned(), cluster);
219
220        let profile_name = LOCAL_PROFILE.to_owned();
221        let local_profile = Profile::new(profile_name.clone());
222        config.profile.insert(profile_name.clone(), local_profile);
223        config.set_current_profile(&profile_name);
224        config
225    }
226
227    /// add new cluster
228    pub fn add_cluster(&mut self, cluster: FluvioClusterConfig, name: String) {
229        self.cluster.insert(name, cluster);
230    }
231
232    pub fn add_profile(&mut self, profile: Profile, name: String) {
233        self.profile.insert(name, profile);
234    }
235
236    pub fn version(&self) -> &str {
237        &self.version
238    }
239
240    /// current profile
241    pub fn current_profile_name(&self) -> Option<&str> {
242        self.current_profile.as_deref()
243    }
244
245    /// set current profile, if profile doesn't exists return false
246    pub fn set_current_profile(&mut self, profile_name: &str) -> bool {
247        if self.profile.contains_key(profile_name) {
248            self.current_profile = Some(profile_name.to_owned());
249            true
250        } else {
251            false
252        }
253    }
254
255    pub fn rename_profile(&mut self, from: &str, to: String) -> bool {
256        // Remove the profile from its old name, or return if it didn't exist
257        let profile = match self.profile.remove(from) {
258            Some(profile) => profile,
259            None => return false,
260        };
261
262        // Re-add the profile under its new name
263        self.add_profile(profile, to.clone());
264
265        // If the renamed profile was current, we need to update the current name
266        let update_current = self
267            .current_profile_name()
268            .map(|it| it == from)
269            .unwrap_or(false);
270        if update_current {
271            self.current_profile = Some(to);
272        }
273
274        true
275    }
276
277    /// delete profile
278    pub fn delete_profile(&mut self, profile_name: &str) -> bool {
279        if self.profile.remove(profile_name).is_some() {
280            if let Some(old_profile) = &self.current_profile {
281                // check if it same as current profile, then remove it
282                if profile_name == old_profile {
283                    self.current_profile = None;
284                }
285            }
286
287            true
288        } else {
289            false
290        }
291    }
292
293    /// Deletes the named cluster, whether it is being used or not.
294    ///
295    /// You may want to check if the named cluster is active or not using
296    /// `delete_cluster_check`. Otherwise, you may remove a cluster that
297    /// is being used by the active profile.
298    ///
299    /// # Example
300    ///
301    /// ```
302    /// # use fluvio::FluvioClusterConfig;
303    /// # use fluvio::config::{Config, Profile};
304    /// let mut config = Config::new();
305    /// let cluster = FluvioClusterConfig::new("https://cloud.fluvio.io".to_string());
306    /// config.add_cluster(cluster, "fluvio-cloud".to_string());
307    /// let profile = Profile::new("fluvio-cloud".to_string());
308    /// config.add_profile(profile, "fluvio-cloud".to_string());
309    ///
310    /// config.delete_cluster("fluvio-cloud").unwrap();
311    /// assert!(config.cluster("fluvio-cloud").is_none());
312    /// ```
313    pub fn delete_cluster(&mut self, cluster_name: &str) -> Option<FluvioClusterConfig> {
314        self.cluster.remove(cluster_name)
315    }
316
317    /// Checks whether it's safe to delete the named cluster
318    ///
319    /// If there are any profiles that reference the named cluster,
320    /// they are considered conflicts and the cluster is unsafe to delete.
321    /// When conflicts exist, the conflicting profile names are returned
322    /// in the `Err()` return value.
323    ///
324    /// If there are no profile conflicts, this returns with `Ok(())`.
325    ///
326    /// # Example
327    ///
328    /// ```
329    /// # use fluvio::FluvioClusterConfig;
330    /// # use fluvio::config::{Config, Profile};
331    /// let mut config = Config::new();
332    /// let cluster = FluvioClusterConfig::new("https://cloud.fluvio.io".to_string());
333    /// config.add_cluster(cluster, "fluvio-cloud".to_string());
334    /// let profile = Profile::new("fluvio-cloud".to_string());
335    /// config.add_profile(profile, "fluvio-cloud".to_string());
336    ///
337    /// let conflicts = config.delete_cluster_check("fluvio-cloud").unwrap_err();
338    /// assert_eq!(conflicts, vec!["fluvio-cloud"]);
339    /// ```
340    pub fn delete_cluster_check(&mut self, cluster_name: &str) -> Result<(), Vec<&str>> {
341        // Find all profiles that reference the named cluster
342        let conflicts: Vec<_> = self
343            .profile
344            .iter()
345            .filter(|(_, profile)| &*profile.cluster == cluster_name)
346            .map(|(name, _)| &**name)
347            .collect();
348
349        if !conflicts.is_empty() {
350            return Err(conflicts);
351        }
352
353        Ok(())
354    }
355
356    /// Returns a reference to the current Profile if there is one.
357    pub fn current_profile(&self) -> Result<&Profile, FluvioError> {
358        let profile = self
359            .current_profile
360            .as_ref()
361            .and_then(|p| self.profile.get(p))
362            .ok_or(ConfigError::NoActiveProfile)?;
363        Ok(profile)
364    }
365
366    /// Returns a reference to the current Profile if there is one.
367    pub fn profile(&self, profile_name: &str) -> Option<&Profile> {
368        self.profile.get(profile_name)
369    }
370
371    /// Returns a mutable reference to the current Profile if there is one.
372    pub fn profile_mut(&mut self, profile_name: &str) -> Option<&mut Profile> {
373        self.profile.get_mut(profile_name)
374    }
375
376    /// Returns the FluvioClusterConfig belonging to the current profile.
377    pub fn current_cluster(&self) -> Result<&FluvioClusterConfig, FluvioError> {
378        let profile = self.current_profile()?;
379        let maybe_cluster = self.cluster.get(&profile.cluster);
380        let cluster = maybe_cluster.ok_or_else(|| {
381            let profile = profile.cluster.clone();
382            ConfigError::NoClusterForProfile { profile }
383        })?;
384        Ok(cluster)
385    }
386
387    /// Returns the mutable reference to FluvioClusterConfig belonging to the current profile.
388    pub fn current_cluster_mut(&mut self) -> Result<&mut FluvioClusterConfig, FluvioError> {
389        let profile = self.current_profile()?.clone();
390        let maybe_cluster = self.cluster.get_mut(&profile.cluster);
391        let cluster = maybe_cluster.ok_or_else(|| {
392            let profile = profile.cluster;
393            ConfigError::NoClusterForProfile { profile }
394        })?;
395        Ok(cluster)
396    }
397
398    /// Returns the FluvioClusterConfig belonging to the named profile.
399    pub fn cluster_with_profile(&self, profile_name: &str) -> Option<&FluvioClusterConfig> {
400        self.profile
401            .get(profile_name)
402            .and_then(|profile| self.cluster.get(&profile.cluster))
403    }
404
405    /// Returns a reference to the named FluvioClusterConfig.
406    pub fn cluster(&self, cluster_name: &str) -> Option<&FluvioClusterConfig> {
407        self.cluster.get(cluster_name)
408    }
409
410    /// Returns a mutable reference to the named FluvioClusterConfig.
411    pub fn cluster_mut(&mut self, cluster_name: &str) -> Option<&mut FluvioClusterConfig> {
412        self.cluster.get_mut(cluster_name)
413    }
414
415    /// look up replica config
416    /// this will iterate and find all configuration that can resolve config
417    /// 1) match all config that matches criteria including asterisk
418    /// 2) apply in terms of precedent
419    pub fn resolve_replica_config(&self, _topic_name: &str, _partition: i32) -> Replica {
420        /*
421        for (key, val) in self.topic.iter() {
422            println!("key: {:#?}, value: {:#?}",key,val);
423        }
424        */
425
426        Replica::default()
427    }
428}
429
430#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
431pub struct Topic {
432    replica: HashMap<String, String>,
433}
434
435#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
436pub struct Profile {
437    pub cluster: String,
438    pub topic: Option<String>,
439    pub partition: Option<i32>,
440}
441
442impl Profile {
443    pub fn new(cluster: String) -> Self {
444        Self {
445            cluster,
446            ..Default::default()
447        }
448    }
449
450    pub fn set_cluster(&mut self, cluster: String) {
451        self.cluster = cluster;
452    }
453}
454
455#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
456pub struct Replica {
457    pub max_bytes: Option<i32>,
458    pub isolation: Option<String>,
459}
460
#[cfg(test)]
pub mod test {
    use super::*;
    use std::path::PathBuf;
    use std::env::temp_dir;
    use crate::config::{TlsPolicy, TlsConfig, TlsCerts};

    /// `FLV_PROFILE_PATH`, when set, overrides the default config location.
    ///
    /// Ignored by default because it mutates the process-wide environment,
    /// which would race with `test_default_path_home` under the parallel test
    /// runner. Run it on its own with `--ignored --test-threads=1`.
    #[test]
    #[ignore = "mutates the process environment; run alone with --ignored --test-threads=1"]
    fn test_default_path_env() {
        // SAFETY: only sound while no other thread touches the environment,
        // which is why this test must be run single-threaded (see above).
        unsafe {
            env::set_var("FLV_PROFILE_PATH", "/user2/config");
        }
        assert_eq!(
            ConfigFile::default_file_path().expect("file"),
            PathBuf::from("/user2/config")
        );
        // SAFETY: same single-threaded requirement as the `set_var` above.
        unsafe {
            env::remove_var("FLV_PROFILE_PATH");
        }
    }

    /// Without an override, the default path lives under the home directory.
    ///
    /// Assumes `FLV_PROFILE_PATH` is not set in the environment running the tests.
    #[test]
    fn test_default_path_home() {
        let mut path = home_dir().expect("home dir must exist");
        path.push(CLI_CONFIG_PATH);
        path.push("config");
        assert_eq!(ConfigFile::default_file_path().expect("file"), path);
    }

    /// test basic reading
    #[test]
    fn test_config() {
        // test read & parse
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("failed to parse file");
        let config = conf_file.mut_config();

        assert_eq!(config.version(), "1.0");
        assert_eq!(config.current_profile_name().unwrap(), "local");
        let profile = config.current_profile().expect("profile should exists");
        assert_eq!(profile.cluster, "local");

        // Unknown names are rejected; known names switch the selection.
        assert!(!config.set_current_profile(""));
        assert!(config.set_current_profile("local2"));
        assert_eq!(config.current_profile_name().unwrap(), "local2");

        let cluster = config.current_cluster().expect("cluster should exist");
        assert_eq!(cluster.endpoint, "127.0.0.1:9003");
        // access from profile
        config
            .cluster_with_profile("local")
            .expect("cluster should exists");
        // access from cluster
        config.cluster("local").expect("cluster should exists");
    }

    /// Renaming the current profile moves the entry and keeps it selected.
    #[test]
    fn test_rename_profile() {
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("parse failed");

        let config = conf_file.mut_config();
        assert_eq!(config.current_profile_name(), Some("local"));
        assert!(config.rename_profile("local", "remote".to_string()));
        assert_eq!(config.current_profile_name(), Some("remote"));
        assert!(!config.profile.contains_key("local"));
        assert!(config.profile.contains_key("remote"));
    }

    /// test TOML save generation
    #[test]
    fn test_tls_save() {
        let mut config = Config::new_with_local_cluster("localhost:9003".to_owned());
        let inline_tls_config = TlsConfig::Inline(TlsCerts {
            key: "ABCDEFF".to_owned(),
            cert: "JJJJ".to_owned(),
            ca_cert: "XXXXX".to_owned(),
            domain: "my_domain".to_owned(),
        });

        println!("temp: {:#?}", temp_dir());
        config.cluster_mut(LOCAL_PROFILE).unwrap().tls = inline_tls_config.into();
        config
            .save_to(temp_dir().join("inline.toml"))
            .expect("save should succeed");

        config.cluster_mut(LOCAL_PROFILE).unwrap().tls = TlsPolicy::Disabled;
        config
            .save_to(temp_dir().join("noverf.toml"))
            .expect("save should succeed");
    }

    /// A changed current profile survives a save / reload round trip.
    #[test]
    fn test_set_tls() {
        let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
            .expect("parse failed");
        let config = conf_file.mut_config();
        let cfg_path = temp_dir().join("test_config.toml");
        assert!(config.set_current_profile("local3"));
        config
            .save_to(cfg_path.clone())
            .expect("save should succeed");
        let update_conf_file =
            ConfigFile::load(Some(cfg_path.to_string_lossy().to_string())).expect("parse failed");
        assert_eq!(
            update_conf_file.config().current_profile_name().unwrap(),
            "local3"
        );
    }

    /// `new_with_local_cluster` yields a usable "local" profile and cluster.
    #[test]
    fn test_local_cluster() {
        let config = Config::new_with_local_cluster("localhost:9003".to_owned());

        assert_eq!(config.current_profile_name().unwrap(), "local");
        let cluster = config.current_cluster().expect("cluster should exists");
        assert_eq!(cluster.endpoint, "localhost:9003");
    }
}