1use std::env;
7use std::io::Error as IoError;
8use std::io::ErrorKind;
9use std::path::{Path, PathBuf};
10use std::collections::HashMap;
11use std::fs::create_dir_all;
12
13use fluvio_types::config_file::LoadConfigError;
14use thiserror::Error;
15
16use tracing::debug;
17#[cfg(not(target_arch = "wasm32"))]
18use dirs::home_dir;
19
20#[cfg(target_arch = "wasm32")]
21fn home_dir() -> Option<PathBuf> {
22 None
23}
24
25use serde::Deserialize;
26use serde::Serialize;
27
28use fluvio_types::defaults::CLI_CONFIG_PATH;
29use fluvio_types::config_file::SaveLoadConfig;
30use crate::{FluvioClusterConfig, FluvioError};
31
32use super::TlsPolicy;
33
34fn config_file_error(msg: &str, source: IoError) -> ConfigError {
35 ConfigError::ConfigFileError {
36 msg: msg.to_owned(),
37 source,
38 }
39}
40
41#[derive(Error, Debug)]
42pub enum ConfigError {
43 #[error("config file {msg}")]
44 ConfigFileError { msg: String, source: IoError },
45 #[error("Failed to deserialize Fluvio config {msg}")]
46 TomlError {
47 msg: String,
48 source: toml::de::Error,
49 },
50 #[error("Config has no active profile")]
51 NoActiveProfile,
52 #[error("No cluster config for profile {profile}")]
53 NoClusterForProfile { profile: String },
54}
55
56pub struct ConfigFile {
57 path: PathBuf,
58 config: Config,
59}
60
61impl ConfigFile {
62 fn new(path: PathBuf, config: Config) -> Self {
63 Self { path, config }
64 }
65
66 pub fn default_config() -> Result<Self, IoError> {
68 let path = Self::default_file_path()?;
69 Ok(Self {
70 path,
71 config: Config::new(),
72 })
73 }
74
75 pub fn load_default_or_new() -> Result<Self, IoError> {
77 match Self::load(None) {
78 Ok(config_file) => Ok(config_file),
79 Err(err) => {
80 debug!("profile can't be loaded, creating new one: {}", err);
82 ConfigFile::default_config()
83 }
84 }
85 }
86
87 pub fn load(optional_path: Option<String>) -> Result<Self, FluvioError> {
89 Self::from_file(match optional_path {
90 Some(p) => PathBuf::from(p),
91 None => Self::default_file_path().map_err(|e| config_file_error("default path", e))?,
92 })
93 }
94
95 fn from_file<T: AsRef<Path>>(path: T) -> Result<Self, FluvioError> {
97 let path_ref = path.as_ref();
98 let config = Config::load_from(path_ref).map_err(|e| match e {
99 LoadConfigError::IoError(e) => {
100 config_file_error(&format!("{:?}", path_ref.as_os_str()), e)
101 }
102 LoadConfigError::TomlError(e) => ConfigError::TomlError {
103 msg: path_ref.display().to_string(),
104 source: e,
105 },
106 })?;
107
108 Ok(Self::new(path_ref.to_owned(), config))
109 }
110
111 fn default_file_path() -> Result<PathBuf, IoError> {
116 env::var("FLV_PROFILE_PATH")
117 .map(|p| Ok(PathBuf::from(p)))
118 .unwrap_or_else(|_| {
119 if let Some(mut profile_path) = home_dir() {
120 profile_path.push(CLI_CONFIG_PATH);
121 profile_path.push("config");
122 Ok(profile_path)
123 } else {
124 Err(IoError::new(
125 ErrorKind::InvalidInput,
126 "can't get profile directory",
127 ))
128 }
129 })
130 }
131
132 pub fn config(&self) -> &Config {
134 &self.config
135 }
136
137 pub fn mut_config(&mut self) -> &mut Config {
139 &mut self.config
140 }
141
142 pub fn save(&self) -> Result<(), FluvioError> {
144 create_dir_all(self.path.parent().unwrap())
145 .map_err(|e| config_file_error(&format!("parent {:?}", self.path), e))?;
146 self.config
147 .save_to(&self.path)
148 .map_err(|e| config_file_error(&format!("{:?}", &self.path), e))?;
149 Ok(())
150 }
151
152 pub fn add_or_replace_profile(
155 &mut self,
156 profile_name: &str,
157 cluster_addr: &str,
158 tls_policy: &TlsPolicy,
159 ) -> Result<(), FluvioError> {
160 let config = self.mut_config();
161
162 match config.cluster_mut(profile_name) {
165 Some(cluster) => {
166 cluster.endpoint = cluster_addr.to_string();
167 cluster.tls = tls_policy.clone();
168 }
169 None => {
170 let mut new_cluster = FluvioClusterConfig::new(cluster_addr);
171 new_cluster.tls = tls_policy.clone();
172 config.add_cluster(new_cluster, profile_name.to_string());
173 }
174 }
175
176 match config.profile_mut(profile_name) {
178 Some(profile) => {
179 profile.set_cluster(profile_name.to_string());
180 }
181 None => {
182 let profile = Profile::new(profile_name.to_string());
183 config.add_profile(profile, profile_name.to_string());
184 }
185 };
186
187 config.set_current_profile(profile_name);
188 self.save()?;
189 Ok(())
190 }
191}
192
193pub const LOCAL_PROFILE: &str = "local";
194const CONFIG_VERSION: &str = "2.0";
195
196#[derive(Debug, Default, Serialize, Deserialize)]
197pub struct Config {
198 version: String,
199 current_profile: Option<String>,
200 pub profile: HashMap<String, Profile>,
201 pub cluster: HashMap<String, FluvioClusterConfig>,
202 client_id: Option<String>,
203}
204
205impl Config {
206 pub fn new() -> Self {
207 Self {
208 version: CONFIG_VERSION.to_owned(),
209 ..Default::default()
210 }
211 }
212
213 pub fn new_with_local_cluster(domain: String) -> Self {
215 let cluster = FluvioClusterConfig::new(domain);
216 let mut config = Self::new();
217
218 config.cluster.insert(LOCAL_PROFILE.to_owned(), cluster);
219
220 let profile_name = LOCAL_PROFILE.to_owned();
221 let local_profile = Profile::new(profile_name.clone());
222 config.profile.insert(profile_name.clone(), local_profile);
223 config.set_current_profile(&profile_name);
224 config
225 }
226
227 pub fn add_cluster(&mut self, cluster: FluvioClusterConfig, name: String) {
229 self.cluster.insert(name, cluster);
230 }
231
232 pub fn add_profile(&mut self, profile: Profile, name: String) {
233 self.profile.insert(name, profile);
234 }
235
236 pub fn version(&self) -> &str {
237 &self.version
238 }
239
240 pub fn current_profile_name(&self) -> Option<&str> {
242 self.current_profile.as_deref()
243 }
244
245 pub fn set_current_profile(&mut self, profile_name: &str) -> bool {
247 if self.profile.contains_key(profile_name) {
248 self.current_profile = Some(profile_name.to_owned());
249 true
250 } else {
251 false
252 }
253 }
254
255 pub fn rename_profile(&mut self, from: &str, to: String) -> bool {
256 let profile = match self.profile.remove(from) {
258 Some(profile) => profile,
259 None => return false,
260 };
261
262 self.add_profile(profile, to.clone());
264
265 let update_current = self
267 .current_profile_name()
268 .map(|it| it == from)
269 .unwrap_or(false);
270 if update_current {
271 self.current_profile = Some(to);
272 }
273
274 true
275 }
276
277 pub fn delete_profile(&mut self, profile_name: &str) -> bool {
279 if self.profile.remove(profile_name).is_some() {
280 if let Some(old_profile) = &self.current_profile {
281 if profile_name == old_profile {
283 self.current_profile = None;
284 }
285 }
286
287 true
288 } else {
289 false
290 }
291 }
292
293 pub fn delete_cluster(&mut self, cluster_name: &str) -> Option<FluvioClusterConfig> {
314 self.cluster.remove(cluster_name)
315 }
316
317 pub fn delete_cluster_check(&mut self, cluster_name: &str) -> Result<(), Vec<&str>> {
341 let conflicts: Vec<_> = self
343 .profile
344 .iter()
345 .filter(|(_, profile)| &*profile.cluster == cluster_name)
346 .map(|(name, _)| &**name)
347 .collect();
348
349 if !conflicts.is_empty() {
350 return Err(conflicts);
351 }
352
353 Ok(())
354 }
355
356 pub fn current_profile(&self) -> Result<&Profile, FluvioError> {
358 let profile = self
359 .current_profile
360 .as_ref()
361 .and_then(|p| self.profile.get(p))
362 .ok_or(ConfigError::NoActiveProfile)?;
363 Ok(profile)
364 }
365
366 pub fn profile(&self, profile_name: &str) -> Option<&Profile> {
368 self.profile.get(profile_name)
369 }
370
371 pub fn profile_mut(&mut self, profile_name: &str) -> Option<&mut Profile> {
373 self.profile.get_mut(profile_name)
374 }
375
376 pub fn current_cluster(&self) -> Result<&FluvioClusterConfig, FluvioError> {
378 let profile = self.current_profile()?;
379 let maybe_cluster = self.cluster.get(&profile.cluster);
380 let cluster = maybe_cluster.ok_or_else(|| {
381 let profile = profile.cluster.clone();
382 ConfigError::NoClusterForProfile { profile }
383 })?;
384 Ok(cluster)
385 }
386
387 pub fn current_cluster_mut(&mut self) -> Result<&mut FluvioClusterConfig, FluvioError> {
389 let profile = self.current_profile()?.clone();
390 let maybe_cluster = self.cluster.get_mut(&profile.cluster);
391 let cluster = maybe_cluster.ok_or_else(|| {
392 let profile = profile.cluster;
393 ConfigError::NoClusterForProfile { profile }
394 })?;
395 Ok(cluster)
396 }
397
398 pub fn cluster_with_profile(&self, profile_name: &str) -> Option<&FluvioClusterConfig> {
400 self.profile
401 .get(profile_name)
402 .and_then(|profile| self.cluster.get(&profile.cluster))
403 }
404
405 pub fn cluster(&self, cluster_name: &str) -> Option<&FluvioClusterConfig> {
407 self.cluster.get(cluster_name)
408 }
409
410 pub fn cluster_mut(&mut self, cluster_name: &str) -> Option<&mut FluvioClusterConfig> {
412 self.cluster.get_mut(cluster_name)
413 }
414
415 pub fn resolve_replica_config(&self, _topic_name: &str, _partition: i32) -> Replica {
420 Replica::default()
427 }
428}
429
430#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
431pub struct Topic {
432 replica: HashMap<String, String>,
433}
434
435#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
436pub struct Profile {
437 pub cluster: String,
438 pub topic: Option<String>,
439 pub partition: Option<i32>,
440}
441
442impl Profile {
443 pub fn new(cluster: String) -> Self {
444 Self {
445 cluster,
446 ..Default::default()
447 }
448 }
449
450 pub fn set_cluster(&mut self, cluster: String) {
451 self.cluster = cluster;
452 }
453}
454
455#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
456pub struct Replica {
457 pub max_bytes: Option<i32>,
458 pub isolation: Option<String>,
459}
460
461#[cfg(test)]
462pub mod test {
463 use super::*;
464 use std::path::PathBuf;
465 use std::env::temp_dir;
466 use crate::config::{TlsPolicy, TlsConfig, TlsCerts};
467
468 #[allow(unused)]
470 fn test_default_path_env() {
471 unsafe {
472 env::set_var("FLV_PROFILE_PATH", "/user2/config");
473 }
474 assert_eq!(
475 ConfigFile::default_file_path().expect("file"),
476 PathBuf::from("/user2/config")
477 );
478 unsafe {
479 env::remove_var("FLV_PROFILE_PATH");
480 }
481 }
482
483 #[test]
484 fn test_default_path_home() {
485 let mut path = home_dir().expect("home dir must exist");
486 path.push(CLI_CONFIG_PATH);
487 path.push("config");
488 assert_eq!(ConfigFile::default_file_path().expect("file"), path);
489 }
490
491 #[test]
493 fn test_config() {
494 let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
496 .expect("failed to parse file");
497 let config = conf_file.mut_config();
498
499 assert_eq!(config.version(), "1.0");
500 assert_eq!(config.current_profile_name().unwrap(), "local");
501 let profile = config.current_profile().expect("profile should exists");
502 assert_eq!(profile.cluster, "local");
503
504 assert!(!config.set_current_profile(""));
505 assert!(config.set_current_profile("local2"));
506 assert_eq!(config.current_profile_name().unwrap(), "local2");
507
508 let cluster = config.current_cluster().expect("cluster should exist");
509 assert_eq!(cluster.endpoint, "127.0.0.1:9003");
510 config
512 .cluster_with_profile("local")
513 .expect("cluster should exists");
514 config.cluster("local").expect("cluster should exists");
516 }
517
518 #[test]
519 fn test_rename_profile() {
520 let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
521 .expect("parse failed");
522
523 let config = conf_file.mut_config();
524 assert_eq!(config.current_profile_name(), Some("local"));
525 config.rename_profile("local", "remote".to_string());
526 assert_eq!(config.current_profile_name(), Some("remote"));
527 assert!(!config.profile.contains_key("local"));
528 assert!(config.profile.contains_key("remote"));
529 }
530
531 #[test]
533 fn test_tls_save() {
534 let mut config = Config::new_with_local_cluster("localhost:9003".to_owned());
535 let inline_tls_config = TlsConfig::Inline(TlsCerts {
536 key: "ABCDEFF".to_owned(),
537 cert: "JJJJ".to_owned(),
538 ca_cert: "XXXXX".to_owned(),
539 domain: "my_domain".to_owned(),
540 });
541
542 println!("temp: {:#?}", temp_dir());
543 config.cluster_mut(LOCAL_PROFILE).unwrap().tls = inline_tls_config.into();
544 config
545 .save_to(temp_dir().join("inline.toml"))
546 .expect("save should succeed");
547
548 config.cluster_mut(LOCAL_PROFILE).unwrap().tls = TlsPolicy::Disabled;
549 config
550 .save_to(temp_dir().join("noverf.toml"))
551 .expect("save should succeed");
552 }
553
554 #[test]
555 fn test_set_tls() {
556 let mut conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
557 .expect("parse failed");
558 let config = conf_file.mut_config();
559 let cfg_path = temp_dir().join("test_config.toml");
560 config.set_current_profile("local3");
561 config
562 .save_to(cfg_path.clone())
563 .expect("save should succeed");
564 let update_conf_file =
565 ConfigFile::load(Some(cfg_path.to_string_lossy().to_string())).expect("parse failed");
566 assert_eq!(
567 update_conf_file.config().current_profile_name().unwrap(),
568 "local3"
569 );
570 }
571
572 #[test]
573 fn test_local_cluster() {
574 let config = Config::new_with_local_cluster("localhost:9003".to_owned());
575
576 assert_eq!(config.current_profile_name().unwrap(), "local");
577 let cluster = config.current_cluster().expect("cluster should exists");
578 assert_eq!(cluster.endpoint, "localhost:9003");
579 }
580}