use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::borrow::Cow;
use std::process::Command;
use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use tracing::{info, warn, debug, trace, instrument};
use fluvio::{Fluvio, FluvioConfig};
use fluvio::metadata::spg::SpuGroupSpec;
use fluvio::metadata::spu::SpuSpec;
use fluvio::config::{TlsPolicy, TlsConfig, TlsPaths, ConfigFile, Profile};
use flv_util::cmd::CommandExt;
use fluvio_future::timer::sleep;
use fluvio_future::net::{TcpStream, resolve};
use k8_client::K8Client;
use k8_config::K8Config;
use k8_client::metadata::MetadataClient;
use k8_obj_core::service::ServiceSpec;
use k8_obj_metadata::InputObjectMeta;
use semver::Version;
use crate::ClusterError;
use crate::helm::{HelmClient, Chart, InstalledChart};
use crate::check::get_cluster_server_host;
// Defaults used by `ClusterInstaller::new()` and by the install / pre-install steps.
/// Namespace the builder starts with when `with_namespace` is not called.
const DEFAULT_NAMESPACE: &str = "default";
/// Image registry the builder starts with (passed to helm as `image.registry`).
const DEFAULT_REGISTRY: &str = "infinyon";
/// Name under which the system chart is installed and later looked up among installed charts.
const DEFAULT_CHART_SYS_REPO: &str = "fluvio-sys";
/// Chart reference used for the system chart when installing from a remote repository.
const DEFAULT_CHART_SYS_NAME: &str = "fluvio/fluvio-sys";
/// Name used both for `helm repo add` and for installing / looking up the app chart.
const DEFAULT_CHART_APP_REPO: &str = "fluvio";
/// Chart reference the builder starts with for the app chart; also queried by `versions()`.
const DEFAULT_CHART_APP_NAME: &str = "fluvio/fluvio-app";
/// Remote chart location the builder starts with.
const DEFAULT_CHART_REMOTE: &str = "https://charts.fluvio.io";
/// Name of the managed SPU group created after install unless overridden.
const DEFAULT_GROUP_NAME: &str = "main";
/// Value of the `cloud` helm setting the builder starts with.
const DEFAULT_CLOUD_NAME: &str = "minikube";
/// Minimum helm version accepted by the pre-install check.
const DEFAULT_HELM_VERSION: &str = "3.2.0";
/// Where the helm charts are taken from during installation.
#[derive(Debug)]
enum ChartLocation {
/// A chart on the local filesystem; the (lossily stringified) path is handed to
/// `HelmClient::install` as the chart to install.
Local(PathBuf),
/// A remote chart repository location; it is registered through `HelmClient::repo_add`
/// and refreshed with `repo_update` before installing.
Remote(String),
}
/// Collects the settings for a Fluvio installation; obtained from `ClusterInstaller::new()`
/// and turned into a `ClusterInstaller` with `build()`.
#[derive(Debug)]
pub struct ClusterInstallerBuilder {
/// Namespace passed to helm and kubectl, and used when looking up the SC service and SPUs.
namespace: String,
/// Optional image tag (`image.tag` helm setting); when `None` the chart version is used.
image_tag: Option<String>,
/// Image registry (`image.registry` helm setting).
image_registry: String,
/// Chart reference installed when the chart location is remote.
chart_name: String,
/// Chart version passed to helm; also the fallback image tag.
chart_version: String,
/// Whether charts come from a local path or a remote repository.
chart_location: ChartLocation,
/// Name given to the managed SPU group created after installation.
group_name: String,
/// Value of the `cloud` helm setting.
cloud: String,
/// When true, `install_fluvio` writes the discovered SC address into the Fluvio config file.
save_profile: bool,
/// Spec of the managed SPU group; `replicas == 0` skips group creation entirely.
spu_spec: SpuGroupSpec,
/// Optional value for the `scLog` helm setting.
rust_log: Option<String>,
/// TLS policy for the server side: drives the `tls` helm setting and the secret upload.
server_tls_policy: TlsPolicy,
/// TLS policy used when this installer connects to the SC as a client.
client_tls_policy: TlsPolicy,
}
impl ClusterInstallerBuilder {
pub fn build(self) -> Result<ClusterInstaller, ClusterError> {
Ok(ClusterInstaller {
config: self,
kube_client: K8Client::default()?,
helm_client: HelmClient::new()?,
})
}
pub fn with_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
self.namespace = namespace.into();
self
}
pub fn with_image_tag<S: Into<String>>(mut self, image_tag: S) -> Self {
self.image_tag = Some(image_tag.into());
self
}
pub fn with_image_registry<S: Into<String>>(mut self, image_registry: S) -> Self {
self.image_registry = image_registry.into();
self
}
pub fn with_chart_version<S: Into<String>>(mut self, chart_version: S) -> Self {
self.chart_version = chart_version.into();
self
}
pub fn with_local_chart<S: Into<PathBuf>>(mut self, local_chart_location: S) -> Self {
self.chart_location = ChartLocation::Local(local_chart_location.into());
self
}
pub fn with_remote_chart<S: Into<String>>(mut self, remote_chart_location: S) -> Self {
self.chart_location = ChartLocation::Remote(remote_chart_location.into());
self
}
pub fn with_group_name<S: Into<String>>(mut self, group_name: S) -> Self {
self.group_name = group_name.into();
self
}
pub fn with_save_profile(mut self, save_profile: bool) -> Self {
self.save_profile = save_profile;
self
}
pub fn with_spu_replicas(mut self, spu_replicas: u16) -> Self {
self.spu_spec.replicas = spu_replicas;
self
}
pub fn with_rust_log<S: Into<String>>(mut self, rust_log: S) -> Self {
self.rust_log = Some(rust_log.into());
self
}
pub fn with_tls<C: Into<TlsPolicy>, S: Into<TlsPolicy>>(
mut self,
client: C,
server: S,
) -> Self {
let client_policy = client.into();
let server_policy = server.into();
use TlsPolicy::*;
use std::mem::discriminant;
match (&client_policy, &server_policy) {
_ if discriminant(&client_policy) != discriminant(&server_policy) => {
warn!("Client TLS policy type is different than the Server TLS policy type!");
}
(Verified(client), Verified(server)) if client.domain() != server.domain() => {
warn!(
client_domain = client.domain(),
server_domain = server.domain(),
"Client TLS config has a different domain than the Server TLS config!"
);
}
_ => (),
}
self.client_tls_policy = client_policy;
self.server_tls_policy = server_policy;
self
}
pub fn with_cloud<S: Into<String>>(mut self, cloud: S) -> Self {
self.cloud = cloud.into();
self
}
}
/// Installs Fluvio on Kubernetes using the settings gathered by `ClusterInstallerBuilder`.
#[derive(Debug)]
pub struct ClusterInstaller {
/// The finished builder, kept as the installer's configuration.
config: ClusterInstallerBuilder,
/// Client used to query Kubernetes for the SC service and the SPU objects.
kube_client: K8Client,
/// Client used for all helm operations (version check, repo management, install).
helm_client: HelmClient,
}
impl ClusterInstaller {
/// Starts a new installation by returning a builder pre-populated with the defaults:
/// remote chart, default namespace/registry/group/cloud, a single SPU replica,
/// profile saving off and TLS disabled on both sides.
// `new` intentionally hands back the builder rather than `Self`, hence the lint exemption.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> ClusterInstallerBuilder {
    ClusterInstallerBuilder {
        namespace: DEFAULT_NAMESPACE.to_owned(),
        image_tag: None,
        image_registry: DEFAULT_REGISTRY.to_owned(),
        chart_name: DEFAULT_CHART_APP_NAME.to_owned(),
        chart_version: crate::VERSION.to_string(),
        chart_location: ChartLocation::Remote(DEFAULT_CHART_REMOTE.to_owned()),
        group_name: DEFAULT_GROUP_NAME.to_owned(),
        cloud: DEFAULT_CLOUD_NAME.to_owned(),
        save_profile: false,
        spu_spec: SpuGroupSpec {
            replicas: 1,
            min_id: 0,
            ..SpuGroupSpec::default()
        },
        rust_log: None,
        server_tls_policy: TlsPolicy::Disabled,
        client_tls_policy: TlsPolicy::Disabled,
    }
}
/// Returns the charts helm reports for the default Fluvio app chart name.
///
/// # Errors
///
/// Fails if the helm client cannot be created or the query fails.
pub fn versions() -> Result<Vec<Chart>, ClusterError> {
    Ok(HelmClient::new()?.versions(DEFAULT_CHART_APP_NAME)?)
}
/// Returns the installed charts helm reports under the Fluvio system chart name.
///
/// # Errors
///
/// Fails if the helm client cannot be created or the query fails.
pub fn sys_charts() -> Result<Vec<InstalledChart>, ClusterError> {
    Ok(HelmClient::new()?.get_installed_chart_by_name(DEFAULT_CHART_SYS_REPO)?)
}
fn pre_install_check(&self) -> Result<(), ClusterError> {
let version_text_trimmed = self.helm_client.get_helm_version()?;
if Version::parse(&version_text_trimmed) < Version::parse(DEFAULT_HELM_VERSION) {
return Err(ClusterError::Other(format!(
"Helm version {} is not compatible with fluvio platform, please install version >= {}",
version_text_trimmed, DEFAULT_HELM_VERSION
)));
}
let sys_charts = self
.helm_client
.get_installed_chart_by_name(DEFAULT_CHART_SYS_REPO)?;
if sys_charts.is_empty() {
return Err(ClusterError::Other(
"Fluvio system chart is not installed, please install fluvio-sys first".to_string(),
));
} else if sys_charts.len() > 1 {
return Err(ClusterError::Other(
"Multiple fluvio system charts found".to_string(),
));
}
let app_charts = self
.helm_client
.get_installed_chart_by_name(DEFAULT_CHART_APP_REPO)?;
if !app_charts.is_empty() {
return Err(ClusterError::Other(
"Fluvio cluster is already installed".to_string(),
));
}
let k8_config = K8Config::load()?;
match k8_config {
K8Config::Pod(_) => {
}
K8Config::KubeConfig(config) => {
let server_host = match get_cluster_server_host(config) {
Ok(server) => server,
Err(e) => {
return Err(ClusterError::Other(format!(
"error fetching server from kube context {}",
e.to_string()
)))
}
};
if !server_host.trim().is_empty() {
if IpAddr::from_str(&server_host).is_ok() {
return Err(ClusterError::Other(
format!("Cluster in kube context cannot use IP address, please use minikube context: {}", server_host),
));
};
} else {
return Err(ClusterError::Other(
"Cluster in kubectl context cannot have empty hostname".to_owned(),
));
}
}
};
Ok(())
}
/// Installs the Fluvio app chart and waits for the cluster to come up.
///
/// Runs the pre-install check, installs the app chart, waits for the SC service to be
/// discoverable and reachable, optionally saves a profile, and — when the configured
/// replica count is non-zero — creates the managed SPU group and waits for its SPUs.
///
/// Returns the discovered SC address.
///
/// # Errors
///
/// Propagates any failure from the individual steps; a failure to detect the SC service
/// is reported as `ClusterError::Other`.
#[instrument(
    skip(self),
    fields(namespace = &*self.config.namespace),
)]
pub async fn install_fluvio(&self) -> Result<String, ClusterError> {
    self.pre_install_check()?;
    self.install_app()?;

    let namespace = &self.config.namespace;
    let sc_address = self
        .wait_for_sc_service(namespace)
        .await
        .map_err(|err| {
            warn!("Unable to detect Fluvio service. If you're running on Minikube, make sure you have the tunnel up!");
            ClusterError::Other(format!("Unable to detect Fluvio service: {}", err))
        })?;
    info!(addr = &*sc_address, "Fluvio SC is up");

    if self.config.save_profile {
        self.update_profile(sc_address.clone())?;
    }

    let replicas = self.config.spu_spec.replicas;
    if replicas > 0 {
        // Fixed pause before talking to the freshly started SC.
        sleep(Duration::from_secs(2)).await;

        let cluster = FluvioConfig::new(sc_address.clone())
            .with_tls(self.config.client_tls_policy.clone());
        self.create_managed_spu_group(&cluster).await?;
        self.wait_for_spu(namespace, replicas).await?;
    }

    Ok(sc_address)
}
#[doc(hidden)]
#[instrument(skip(self))]
pub fn _install_sys(&self) -> Result<(), ClusterError> {
let install_settings = &[("cloud", &*self.config.cloud)];
match &self.config.chart_location {
ChartLocation::Remote(chart_location) => {
debug!(
chart_location = &**chart_location,
"Using remote helm chart:"
);
self.helm_client
.repo_add(DEFAULT_CHART_APP_REPO, chart_location)?;
self.helm_client.repo_update()?;
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_SYS_REPO,
DEFAULT_CHART_SYS_NAME,
None,
install_settings,
)?;
}
ChartLocation::Local(chart_location) => {
let chart_location = chart_location.to_string_lossy();
debug!(
chart_location = chart_location.as_ref(),
"Using local helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_SYS_REPO,
chart_location.as_ref(),
None,
install_settings,
)?;
}
}
info!("Fluvio sys chart has been installed");
Ok(())
}
#[instrument(skip(self))]
fn install_app(&self) -> Result<(), ClusterError> {
trace!(
"Installing fluvio with the following configuration: {:#?}",
&self.config
);
if let TlsPolicy::Verified(tls) = &self.config.server_tls_policy {
self.upload_tls_secrets(tls)?;
}
let fluvio_tag = self
.config
.image_tag
.as_ref()
.unwrap_or(&self.config.chart_version)
.to_owned();
let mut install_settings: Vec<(_, &str)> = vec![
("image.registry", &self.config.image_registry),
("image.tag", &fluvio_tag),
("cloud", &self.config.cloud),
];
if let TlsPolicy::Anonymous | TlsPolicy::Verified(_) = self.config.server_tls_policy {
install_settings.push(("tls", "true"));
}
if let Some(log) = &self.config.rust_log {
install_settings.push(("scLog", log));
}
match &self.config.chart_location {
ChartLocation::Remote(chart_location) => {
self.helm_client
.repo_add(DEFAULT_CHART_APP_REPO, chart_location)?;
self.helm_client.repo_update()?;
if !self
.helm_client
.chart_version_exists(&self.config.chart_name, &self.config.chart_version)?
{
return Err(ClusterError::Other(format!(
"{}:{} not found in helm repo",
&self.config.chart_name, &self.config.chart_version,
)));
}
debug!(
chart_location = &**chart_location,
"Using remote helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_APP_REPO,
&self.config.chart_name,
Some(&self.config.chart_version),
&install_settings,
)?;
}
ChartLocation::Local(chart_location) => {
let chart_location = chart_location.to_string_lossy();
debug!(
chart_location = chart_location.as_ref(),
"Using local helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_APP_REPO,
chart_location.as_ref(),
Some(&self.config.chart_version),
&install_settings,
)?;
}
}
info!("Fluvio app chart has been installed");
Ok(())
}
/// Uploads the TLS material described by `tls` as Kubernetes secrets.
///
/// File-based configs are used as they are; inline certificates are first written to
/// temporary files via `try_into_temp_files`.
///
/// # Errors
///
/// Fails if the temporary files cannot be produced or the upload step reports an error.
fn upload_tls_secrets(&self, tls: &TlsConfig) -> Result<(), IoError> {
    match tls {
        TlsConfig::Files(paths) => self.upload_tls_secrets_from_files(paths),
        TlsConfig::Inline(certs) => {
            // Keep the temp-file handle alive until the upload has finished.
            let temp_paths = certs.try_into_temp_files()?;
            self.upload_tls_secrets_from_files(&temp_paths)
        }
    }
}
/// Looks up the `flv-sc-public` service in namespace `ns` and derives the SC address.
///
/// Returns `Ok(None)` when the service does not exist yet, has no load-balancer
/// ingress entry with a host or IP, or has no port with a target port. Otherwise the
/// result is `"<host-or-ip>:<target-port>"` built from the first ingress entry and the
/// first port.
///
/// # Errors
///
/// Any Kubernetes client error other than "not found" is converted and returned.
#[instrument(skip(self, ns))]
async fn discover_sc_address(&self, ns: &str) -> Result<Option<String>, ClusterError> {
    use k8_client::http::StatusCode;

    let lookup = self
        .kube_client
        .retrieve_item::<ServiceSpec, _>(&InputObjectMeta::named("flv-sc-public", ns))
        .await;
    let svc = match lookup {
        Err(k8_client::ClientError::Client(status)) if status == StatusCode::NOT_FOUND => {
            info!("no SC service found");
            return Ok(None);
        }
        other => other?,
    };

    let first_ingress_host = svc
        .status
        .load_balancer
        .ingress
        .iter()
        .next()
        .and_then(|ingress| ingress.host_or_ip().to_owned());

    let sock_addr = match first_ingress_host {
        Some(addr) => svc
            .spec
            .ports
            .iter()
            .next()
            .and_then(|port| port.target_port)
            .map(|target_port| format!("{}:{}", addr, target_port)),
        None => None,
    };

    Ok(sock_addr)
}
/// Polls until the SC service in namespace `ns` exposes an address and its port accepts
/// connections, then returns that address.
///
/// Up to 12 discovery attempts are made with exponential backoff (1 s, 2 s, 4 s, …)
/// between them. No sleep follows the final attempt: nothing would be re-checked after
/// it, so waiting would only delay the timeout error.
///
/// # Errors
///
/// Returns `ClusterError::SCServiceTimeout` when every attempt comes back empty, and
/// propagates errors from discovery and from the port check.
#[instrument(skip(self, ns))]
async fn wait_for_sc_service(&self, ns: &str) -> Result<String, ClusterError> {
    const MAX_ATTEMPTS: u32 = 12;

    info!("waiting for SC service");
    for retry in 0..MAX_ATTEMPTS {
        if let Some(sock_addr) = self.discover_sc_address(ns).await? {
            info!(%sock_addr, "found SC service load balancer, discovered SC address");
            self.wait_for_sc_port_check(&sock_addr).await?;
            return Ok(sock_addr);
        }

        if retry + 1 == MAX_ATTEMPTS {
            info!(attempt = retry, "no SC service found, giving up");
            break;
        }

        let sleep_ms = 1000 * 2u64.pow(retry);
        info!(
            attempt = retry,
            "no SC service found, sleeping for {} ms", sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(ClusterError::SCServiceTimeout)
}
/// Waits until a TCP connection to the SC address can be established.
///
/// Each of the up to 12 attempts first resolves `sock_addr_str` (which has its own
/// retry loop) and then tries to connect. Attempts are separated by exponential
/// backoff (1 s, 2 s, 4 s, …); no sleep follows the final attempt because nothing
/// would be retried afterwards.
///
/// # Errors
///
/// Returns `ClusterError::SCPortCheckTimeout` when no attempt connects, and propagates
/// DNS resolution timeouts.
async fn wait_for_sc_port_check(&self, sock_addr_str: &str) -> Result<(), ClusterError> {
    const MAX_ATTEMPTS: u32 = 12;

    info!(sock_addr = %sock_addr_str, "waiting for SC port check");
    for retry in 0..MAX_ATTEMPTS {
        let sock_addr = self.wait_for_sc_dns(sock_addr_str).await?;
        if TcpStream::connect(&*sock_addr).await.is_ok() {
            return Ok(());
        }

        if retry + 1 == MAX_ATTEMPTS {
            info!(attempt = retry, "sc port closed, giving up");
            break;
        }

        let sleep_ms = 1000 * 2u64.pow(retry);
        info!(attempt = retry, "sc port closed, sleeping for {} ms", sleep_ms);
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(ClusterError::SCPortCheckTimeout)
}
/// Resolves `sock_addr_string` to socket addresses, retrying while resolution fails.
///
/// Up to 12 attempts are made with exponential backoff (1 s, 2 s, 4 s, …) between
/// them; no sleep follows the final attempt because nothing would be retried afterwards.
///
/// # Errors
///
/// Returns `ClusterError::SCDNSTimeout` when every attempt fails.
async fn wait_for_sc_dns(
    &self,
    sock_addr_string: &str,
) -> Result<Vec<SocketAddr>, ClusterError> {
    const MAX_ATTEMPTS: u32 = 12;

    info!("waiting for SC dns resolution");
    for retry in 0..MAX_ATTEMPTS {
        let err = match resolve(sock_addr_string).await {
            Ok(sock_addr) => return Ok(sock_addr),
            Err(err) => err,
        };

        if retry + 1 == MAX_ATTEMPTS {
            info!(attempt = retry, "SC dns resolution failed {}, giving up", err);
            break;
        }

        let sleep_ms = 1000 * 2u64.pow(retry);
        info!(
            attempt = retry,
            "SC dns resoultion failed {}, sleeping for {} ms", err, sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(ClusterError::SCDNSTimeout)
}
/// Waits until exactly `spu` SPUs in namespace `ns` are ready.
///
/// An SPU counts as ready when its public endpoint has at least one ingress entry and
/// its status reports online. Up to 12 checks are made with exponential backoff
/// (1 s, 2 s, 4 s, …) between them; no sleep follows the final check because nothing
/// would be re-checked afterwards.
///
/// Returns `Ok(true)` once the ready count equals `spu`.
///
/// # Errors
///
/// Returns `ClusterError::SPUTimeout` when the count never matches, and propagates
/// Kubernetes client errors.
#[instrument(skip(self, ns))]
async fn wait_for_spu(&self, ns: &str, spu: u16) -> Result<bool, ClusterError> {
    const MAX_ATTEMPTS: u32 = 12;

    info!("waiting for SPU");
    for retry in 0..MAX_ATTEMPTS {
        debug!("retrieving spu specs");
        let items = self.kube_client.retrieve_items::<SpuSpec, _>(ns).await?;
        let spu_count = items.items.len();
        let ready_spu = items
            .items
            .iter()
            .filter(|spu_obj| {
                !spu_obj.spec.public_endpoint.ingress.is_empty() && spu_obj.status.is_online()
            })
            .count();

        if usize::from(spu) == ready_spu {
            info!(spu_count, "All SPUs are ready");
            return Ok(true);
        }

        // `total_expected_spu` reports the requested count; the number of SPU objects
        // actually present is logged separately as `found_spu`.
        debug!(
            total_expected_spu = spu,
            found_spu = spu_count,
            ready_spu,
            attempt = retry,
            "Not all SPUs are ready. Waiting",
        );

        if retry + 1 == MAX_ATTEMPTS {
            info!(
                attempt = retry,
                "{} of {} spu ready, giving up", ready_spu, spu
            );
            break;
        }

        let sleep_ms = 1000 * 2u64.pow(retry);
        info!(
            attempt = retry,
            "{} of {} spu ready, sleeping for {} ms", ready_spu, spu, sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(ClusterError::SPUTimeout)
}
/// Replaces the `fluvio-ca` and `fluvio-tls` secrets in the configured namespace with
/// the certificate files referenced by `paths`, by invoking `kubectl`.
///
/// Existing secrets are deleted first (ignoring "not found"), then `fluvio-ca` is
/// created from the CA certificate and `fluvio-tls` from the certificate/key pair.
///
/// NOTE(review): the outcome of the `kubectl` invocations is not inspected here —
/// confirm whether `CommandExt::inherit` surfaces failures on its own.
///
/// # Errors
///
/// Returns an `InvalidInput` error when any of the three paths is not valid UTF-8.
#[instrument(skip(self, paths))]
fn upload_tls_secrets_from_files(&self, paths: &TlsPaths) -> Result<(), IoError> {
    let invalid_path = |message: &'static str| IoError::new(ErrorKind::InvalidInput, message);

    let ca_cert = paths
        .ca_cert
        .to_str()
        .ok_or_else(|| invalid_path("ca_cert must be a valid path"))?;
    let server_cert = paths
        .cert
        .to_str()
        .ok_or_else(|| invalid_path("server_cert must be a valid path"))?;
    let server_key = paths
        .key
        .to_str()
        .ok_or_else(|| invalid_path("server_key must be a valid path"))?;
    debug!("Using TLS from paths: {:?}", paths);

    // Runs `kubectl <args> --namespace <configured namespace>`.
    let namespace: &str = &self.config.namespace;
    let kubectl = |args: &[&str]| {
        Command::new("kubectl")
            .args(args)
            .args(&["--namespace", namespace])
            .inherit();
    };

    kubectl(&["delete", "secret", "fluvio-ca", "--ignore-not-found=true"]);
    kubectl(&["delete", "secret", "fluvio-tls", "--ignore-not-found=true"]);
    kubectl(&[
        "create",
        "secret",
        "generic",
        "fluvio-ca",
        "--from-file",
        ca_cert,
    ]);
    kubectl(&[
        "create",
        "secret",
        "tls",
        "fluvio-tls",
        "--cert",
        server_cert,
        "--key",
        server_key,
    ]);

    Ok(())
}
/// Records the installed cluster in the Fluvio config file and makes it current.
///
/// The profile and cluster entry are both named after the current kube context. An
/// existing cluster entry gets its address and TLS policy overwritten; otherwise a new
/// one is added. An existing profile is pointed at the cluster; otherwise a new profile
/// is created. The file is then saved.
///
/// The stored TLS policy is the *client* policy: the profile is what clients use to
/// connect, and it is the same policy this installer uses for its own connection to
/// the SC.
///
/// # Errors
///
/// Fails if the config file cannot be loaded or saved, or the profile name cannot be
/// determined.
fn update_profile(&self, external_addr: String) -> Result<(), ClusterError> {
    let mut config_file = ConfigFile::load_default_or_new()?;
    let config = config_file.mut_config();
    let profile_name = self.compute_profile_name()?;
    let client_tls = self.config.client_tls_policy.clone();

    match config.cluster_mut(&profile_name) {
        Some(cluster) => {
            cluster.addr = external_addr;
            cluster.tls = client_tls;
        }
        None => {
            let mut local_cluster = FluvioConfig::new(external_addr);
            local_cluster.tls = client_tls;
            config.add_cluster(local_cluster, profile_name.clone());
        }
    }

    match config.profile_mut(&profile_name) {
        Some(profile) => {
            profile.set_cluster(profile_name.clone());
        }
        None => {
            let profile = Profile::new(profile_name.clone());
            config.add_profile(profile, profile_name.clone());
        }
    };

    config.set_current_profile(&profile_name);
    config_file.save()?;
    Ok(())
}
/// Derives the profile name from the name of the current kube context.
///
/// # Errors
///
/// Fails if the Kubernetes configuration cannot be loaded, if it is the in-pod
/// configuration, or if the kube config has no current context.
fn compute_profile_name(&self) -> Result<String, ClusterError> {
    match K8Config::load()? {
        K8Config::Pod(_) => Err(ClusterError::Other(
            "Pod config is not valid here".to_owned(),
        )),
        K8Config::KubeConfig(kc_config) => {
            let context_name = kc_config
                .config
                .current_context()
                .map(|ctx| ctx.name.to_owned());
            context_name.ok_or_else(|| ClusterError::Other("No context fount".to_owned()))
        }
    }
}
/// Connects to the cluster described by `cluster` and creates the managed SPU group
/// using the configured group name and SPU spec.
///
/// # Errors
///
/// Fails if the connection cannot be established or the create request is rejected.
#[instrument(
    skip(self, cluster),
    fields(cluster_addr = &*cluster.addr)
)]
async fn create_managed_spu_group(&self, cluster: &FluvioConfig) -> Result<(), ClusterError> {
    let mut fluvio = Fluvio::connect_with_config(cluster).await?;
    let mut admin = fluvio.admin().await;

    let group_name = self.config.group_name.clone();
    let group_spec = self.config.spu_spec.clone();
    admin.create(group_name, false, group_spec).await?;

    Ok(())
}
}