1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::convert::TryInto;
use tracing::*;
use fluvio::config::*;
use k8_client::K8Client;
use k8_obj_core::service::ServiceSpec;
use k8_obj_metadata::InputObjectMeta;
use k8_client::metadata::MetadataClient;
use k8_client::K8Config;
use crate::{CliError, profile::sync::K8Opt};
fn compute_profile_name() -> Result<String, CliError> {
let k8_config = K8Config::load()?;
let kc_config = match k8_config {
K8Config::Pod(_) => return Err(CliError::Other("Pod config is not valid here".to_owned())),
K8Config::KubeConfig(config) => config,
};
if let Some(ctx) = kc_config.config.current_context() {
Ok(ctx.name.to_owned())
} else {
Err(CliError::Other("no context found".to_owned()))
}
}
pub async fn set_k8_context(opt: K8Opt, external_addr: String) -> Result<Profile, CliError> {
let mut config_file = ConfigFile::load_default_or_new()?;
let config = config_file.mut_config();
let profile_name = if let Some(name) = &opt.name {
name.to_owned()
} else {
compute_profile_name()?
};
match config.cluster_mut(&profile_name) {
Some(cluster) => {
cluster.addr = external_addr;
cluster.tls = opt.tls.try_into()?;
}
None => {
let mut local_cluster = FluvioConfig::new(external_addr);
local_cluster.tls = opt.tls.try_into()?;
config.add_cluster(local_cluster, profile_name.clone());
}
};
let new_profile = match config.profile_mut(&profile_name) {
Some(profile) => {
profile.set_cluster(profile_name.clone());
profile.clone()
}
None => {
let profile = Profile::new(profile_name.clone());
config.add_profile(profile.clone(), profile_name.clone());
profile
}
};
assert!(config.set_current_profile(&profile_name));
config_file.save()?;
println!("k8 profile set");
Ok(new_profile)
}
pub async fn discover_fluvio_addr(namespace: Option<&str>) -> Result<Option<String>, CliError> {
use k8_client::http::StatusCode;
let ns = namespace.unwrap_or("default");
let svc = match K8Client::default()?
.retrieve_item::<ServiceSpec, _>(&InputObjectMeta::named("flv-sc-public", ns))
.await
{
Ok(svc) => svc,
Err(err) => match err {
k8_client::ClientError::Client(status) if status == StatusCode::NOT_FOUND => {
return Ok(None)
}
_ => {
return Err(CliError::Other(format!(
"unable to look up fluvio service in k8: {}",
err
)))
}
},
};
debug!("fluvio svc: {:#?}", svc);
let ingress_addr = match svc.status.load_balancer.ingress.iter().find(|_| true) {
Some(ingress) => ingress.host_or_ip().map(|addr| addr.to_owned()),
None => None,
};
Ok(if let Some(external_address) = ingress_addr {
if let Some(port) = svc.spec.ports.iter().find(|_| true) {
if let Some(target_port) = port.target_port {
Some(format!("{}:{}", external_address, target_port))
} else {
None
}
} else {
None
}
} else {
None
})
}