use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::borrow::Cow;
use std::process::Command;
use std::time::Duration;
use std::net::SocketAddr;
use std::process::Stdio;
use std::fs::File;
use tracing::{info, warn, debug, error, instrument};
use fluvio::{Fluvio, FluvioConfig};
use fluvio::metadata::spg::SpuGroupSpec;
use fluvio::metadata::spu::SpuSpec;
use fluvio::config::{TlsPolicy, TlsConfig, TlsPaths, ConfigFile, Profile};
use flv_util::cmd::CommandExt;
use fluvio_future::timer::sleep;
use fluvio_future::net::{TcpStream, resolve};
use k8_client::K8Client;
use k8_config::K8Config;
use k8_client::metadata::MetadataClient;
use k8_client::core::service::ServiceSpec;
use k8_client::core::metadata::InputObjectMeta;
use crate::helm::{HelmClient, Chart, InstalledChart};
use crate::check::{
UnrecoverableCheck, InstallCheck, HelmVersion, AlreadyInstalled, SysChart, LoadableConfig,
LoadBalancer, CheckFailed, RecoverableCheck, CheckResults,
};
use crate::error::K8InstallError;
use crate::{
ClusterError, StartStatus, DEFAULT_NAMESPACE, DEFAULT_CHART_SYS_REPO, DEFAULT_CHART_APP_REPO,
CheckStatus,
};
use crate::start::{check_and_fix, ChartLocation, DEFAULT_CHART_REMOTE};
/// Default image registry; used as the initial `image_registry`, which is passed to helm as `image.registry`.
const DEFAULT_REGISTRY: &str = "infinyon";
/// Directory name of the app chart inside a local chart home (`ChartLocation::Local`).
const DEFAULT_APP_NAME: &str = "fluvio-app";
/// Directory name of the sys chart inside a local chart home (`ChartLocation::Local`).
const DEFAULT_SYS_NAME: &str = "fluvio-sys";
/// Chart reference installed for the sys chart when the chart location is remote.
const DEFAULT_CHART_SYS_NAME: &str = "fluvio/fluvio-sys";
/// Default app chart name; also the chart whose versions `ClusterInstaller::versions` lists.
const DEFAULT_CHART_APP_NAME: &str = "fluvio/fluvio-app";
/// Default name of the managed SPU group created after install.
const DEFAULT_GROUP_NAME: &str = "main";
/// Default value for the `cloud` helm setting.
const DEFAULT_CLOUD_NAME: &str = "minikube";
/// Milliseconds to sleep after spawning `minikube tunnel`.
const DELAY: u64 = 3000;
/// Configuration collected before building a [`ClusterInstaller`].
///
/// Obtain one from [`ClusterInstaller::new`], adjust it with the `with_*` methods, then call
/// [`ClusterInstallerBuilder::build`].
#[derive(Debug)]
pub struct ClusterInstallerBuilder {
/// Kubernetes namespace passed to helm installs and to the `kubectl` secret commands.
namespace: String,
/// Value for the `image.tag` helm setting; when `None`, `chart_version` is used instead.
image_tag: Option<String>,
/// Value for the `image.registry` helm setting.
image_registry: String,
/// App chart name; checked for existence and installed when the chart location is remote.
chart_name: String,
/// App chart version passed to helm (and the fallback image tag).
chart_version: String,
/// Where charts are taken from: a remote repository or a local chart directory.
chart_location: ChartLocation,
/// Name of the managed SPU group created after the SC comes up.
group_name: String,
/// Value for the `cloud` helm setting on both the sys and app charts.
cloud: String,
/// Whether to write a Fluvio profile pointing at the installed cluster.
save_profile: bool,
/// Whether a missing system chart may be installed during pre-install fixes.
install_sys: bool,
/// Set by `with_update_context`. NOTE(review): not read anywhere in this file.
update_context: bool,
/// Spec for the managed SPU group; `replicas` is also the number of ready SPUs waited for.
spu_spec: SpuGroupSpec,
/// Optional log setting passed to helm as `scLog`.
rust_log: Option<String>,
/// Server TLS policy: `Verified` uploads TLS secrets; `Anonymous` or `Verified` sets `tls=true`.
server_tls_policy: TlsPolicy,
/// Client TLS policy stored in the saved profile and used to connect when creating SPUs.
client_tls_policy: TlsPolicy,
/// Optional config map name passed to helm as `authorizationConfigMap`.
authorization_config_map: Option<String>,
/// When true, `install_fluvio` skips the pre-install checks entirely.
skip_checks: bool,
}
impl ClusterInstallerBuilder {
pub fn build(self) -> Result<ClusterInstaller, ClusterError> {
Ok(ClusterInstaller {
config: self,
kube_client: K8Client::default().map_err(K8InstallError::K8ClientError)?,
helm_client: HelmClient::new().map_err(K8InstallError::HelmError)?,
})
}
pub fn with_namespace<S: Into<String>>(mut self, namespace: S) -> Self {
self.namespace = namespace.into();
self
}
pub fn with_image_tag<S: Into<String>>(mut self, image_tag: S) -> Self {
self.image_tag = Some(image_tag.into());
self
}
pub fn with_image_registry<S: Into<String>>(mut self, image_registry: S) -> Self {
self.image_registry = image_registry.into();
self
}
pub fn with_chart_version<S: Into<String>>(mut self, chart_version: S) -> Self {
self.chart_version = chart_version.into();
self
}
pub fn with_local_chart<S: Into<PathBuf>>(mut self, local_chart_location: S) -> Self {
self.chart_location = ChartLocation::Local(local_chart_location.into());
self
}
pub fn with_remote_chart<S: Into<String>>(mut self, remote_chart_location: S) -> Self {
self.chart_location = ChartLocation::Remote(remote_chart_location.into());
self
}
pub fn with_group_name<S: Into<String>>(mut self, group_name: S) -> Self {
self.group_name = group_name.into();
self
}
pub fn with_save_profile(mut self, save_profile: bool) -> Self {
self.save_profile = save_profile;
self
}
pub fn with_skip_checks(mut self, skip_checks: bool) -> Self {
self.skip_checks = skip_checks;
self
}
pub fn with_system_chart(mut self, install_sys: bool) -> Self {
self.install_sys = install_sys;
self
}
pub fn with_update_context(mut self, update_context: bool) -> Self {
self.update_context = update_context;
self
}
pub fn with_spu_replicas(mut self, spu_replicas: u16) -> Self {
self.spu_spec.replicas = spu_replicas;
self
}
pub fn with_rust_log<S: Into<String>>(mut self, rust_log: S) -> Self {
self.rust_log = Some(rust_log.into());
self
}
pub fn with_tls<C: Into<TlsPolicy>, S: Into<TlsPolicy>>(
mut self,
client: C,
server: S,
) -> Self {
let client_policy = client.into();
let server_policy = server.into();
use TlsPolicy::*;
use std::mem::discriminant;
match (&client_policy, &server_policy) {
_ if discriminant(&client_policy) != discriminant(&server_policy) => {
warn!("Client TLS policy type is different than the Server TLS policy type!");
}
(Verified(client), Verified(server)) if client.domain() != server.domain() => {
warn!(
client_domain = client.domain(),
server_domain = server.domain(),
"Client TLS config has a different domain than the Server TLS config!"
);
}
_ => (),
}
self.client_tls_policy = client_policy;
self.server_tls_policy = server_policy;
self
}
pub fn with_cloud<S: Into<String>>(mut self, cloud: S) -> Self {
self.cloud = cloud.into();
self
}
pub fn with_authorization_config_map<S: Into<String>>(
mut self,
authorization_config_map: S,
) -> Self {
self.authorization_config_map = Some(authorization_config_map.into());
self
}
}
/// Installs Fluvio onto a Kubernetes cluster by driving helm and the Kubernetes API.
///
/// Built from a [`ClusterInstallerBuilder`] obtained via `ClusterInstaller::new()`.
#[derive(Debug)]
pub struct ClusterInstaller {
/// Settings collected by the builder.
config: ClusterInstallerBuilder,
/// Client used to query services and SPUs in the target namespace.
kube_client: K8Client,
/// Client used to add repos and install the sys/app charts.
helm_client: HelmClient,
}
impl ClusterInstaller {
/// Starts configuring an installation, returning a builder populated with defaults.
///
/// Defaults: the crate's default namespace and registry, the crate version as chart version,
/// the remote default chart repository, one SPU replica starting at id 0, TLS disabled,
/// system chart installation enabled, and checks enabled.
// `new` intentionally returns the builder type rather than `Self`.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> ClusterInstallerBuilder {
    ClusterInstallerBuilder {
        namespace: DEFAULT_NAMESPACE.to_owned(),
        image_tag: None,
        image_registry: DEFAULT_REGISTRY.to_owned(),
        chart_name: DEFAULT_CHART_APP_NAME.to_owned(),
        chart_version: crate::VERSION.trim().to_owned(),
        chart_location: ChartLocation::Remote(DEFAULT_CHART_REMOTE.to_owned()),
        group_name: DEFAULT_GROUP_NAME.to_owned(),
        cloud: DEFAULT_CLOUD_NAME.to_owned(),
        save_profile: false,
        install_sys: true,
        update_context: false,
        spu_spec: SpuGroupSpec {
            replicas: 1,
            min_id: 0,
            ..SpuGroupSpec::default()
        },
        rust_log: None,
        server_tls_policy: TlsPolicy::Disabled,
        client_tls_policy: TlsPolicy::Disabled,
        authorization_config_map: None,
        skip_checks: false,
    }
}
/// Lists the versions helm knows for the default app chart.
///
/// # Errors
/// Returns a helm error if the helm client cannot be created or the lookup fails.
pub fn versions() -> Result<Vec<Chart>, ClusterError> {
    let charts = HelmClient::new()
        .and_then(|helm| helm.versions(DEFAULT_CHART_APP_NAME))
        .map_err(K8InstallError::HelmError)?;
    Ok(charts)
}
/// Returns the installed charts that match the sys chart release name.
///
/// # Errors
/// Fails if the helm client cannot be created or the query fails.
pub fn sys_charts() -> Result<Vec<InstalledChart>, K8InstallError> {
    let installed = HelmClient::new()?.get_installed_chart_by_name(DEFAULT_CHART_SYS_REPO)?;
    Ok(installed)
}
/// Runs the pre-install checks, attempting automatic fixes via `pre_install_fix`.
///
/// Checks run: loadable config, helm version, sys chart, already-installed, load balancer.
pub async fn setup(&self) -> CheckResults {
    let checks: Vec<Box<dyn InstallCheck>> = vec![
        Box::new(LoadableConfig),
        Box::new(HelmVersion),
        Box::new(SysChart),
        Box::new(AlreadyInstalled),
        Box::new(LoadBalancer),
    ];
    check_and_fix(&checks, |err| self.pre_install_fix(err)).await
}
/// Spawns `minikube tunnel` in the background, then waits `DELAY` ms.
///
/// Both stdout and stderr of the tunnel are written to `/tmp/tunnel.out`.
async fn _try_minikube_tunnel(&self) -> Result<(), K8InstallError> {
    let stdout_log = File::create("/tmp/tunnel.out")?;
    let stderr_log = stdout_log.try_clone()?;

    let mut tunnel = Command::new("minikube");
    tunnel
        .arg("tunnel")
        .stdout(Stdio::from(stdout_log))
        .stderr(Stdio::from(stderr_log));
    // The returned `Child` is dropped right away; dropping a `Child` neither kills
    // nor waits on the process, so the tunnel keeps running.
    tunnel.spawn()?;

    sleep(Duration::from_millis(DELAY)).await;
    Ok(())
}
#[instrument(skip(self, error))]
pub(crate) async fn pre_install_fix(
&self,
error: RecoverableCheck,
) -> Result<(), UnrecoverableCheck> {
match error {
RecoverableCheck::MissingSystemChart if self.config.install_sys => {
println!("Fluvio system chart not installed. Attempting to install");
self._install_sys()
.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?;
}
RecoverableCheck::MinikubeTunnelNotFoundRetry => {
println!(
"Load balancer service is not available, trying to bring up minikube tunnel"
);
self._try_minikube_tunnel()
.await
.map_err(|_| UnrecoverableCheck::FailedRecovery(error))?;
}
unhandled => {
warn!("Pre-install was unable to autofix an error");
return Err(UnrecoverableCheck::FailedRecovery(unhandled));
}
}
Ok(())
}
/// Installs Fluvio (unless already present) and waits for it to come up.
///
/// Steps:
/// 1. Unless `skip_checks`, runs the pre-install checks. If any check reports
///    "already installed", returns the existing SC address immediately; any other
///    failure aborts.
/// 2. Installs the app chart and waits for the SC service address.
/// 3. Optionally saves a profile.
/// 4. If replicas > 0, creates the managed SPU group and waits for the SPUs.
#[instrument(
    skip(self),
    fields(namespace = &*self.config.namespace),
)]
pub async fn install_fluvio(&self) -> Result<StartStatus, ClusterError> {
    let checks = if self.config.skip_checks {
        None
    } else {
        let check_results = self.setup().await;
        if check_results.0.iter().any(|result| result.is_err()) {
            return Err(K8InstallError::PrecheckErrored(check_results).into());
        }
        let statuses = check_results.into_statuses();

        // An "already installed" failure takes precedence over every other failure.
        let already_installed = statuses
            .0
            .iter()
            .any(|status| matches!(status, CheckStatus::Fail(CheckFailed::AlreadyInstalled)));
        if already_installed {
            debug!("Fluvio is already installed. Getting SC address");
            let address = self.wait_for_sc_service(&self.config.namespace).await?;
            return Ok(StartStatus {
                address,
                checks: Some(statuses),
            });
        }

        let failed = statuses
            .0
            .iter()
            .any(|status| matches!(status, CheckStatus::Fail(_)));
        if failed {
            return Err(K8InstallError::FailedPrecheck(statuses).into());
        }
        Some(statuses)
    };

    self.install_app()?;

    let namespace = self.config.namespace.as_str();
    let address = self
        .wait_for_sc_service(namespace)
        .await
        .map_err(|_| K8InstallError::UnableToDetectService)?;
    info!(addr = %address, "Fluvio SC is up:");

    if self.config.save_profile {
        self.update_profile(address.clone())?;
    }

    let replicas = self.config.spu_spec.replicas;
    if replicas > 0 {
        debug!("waiting for SC to spin up");
        sleep(Duration::from_millis(2000)).await;
        let cluster =
            FluvioConfig::new(address.clone()).with_tls(self.config.client_tls_policy.clone());
        self.create_managed_spu_group(&cluster).await?;
        self.wait_for_spu(namespace, replicas).await?;
    }

    Ok(StartStatus { address, checks })
}
#[doc(hidden)]
#[instrument(skip(self))]
pub fn _install_sys(&self) -> Result<(), K8InstallError> {
let install_settings = &[("cloud", &*self.config.cloud)];
match &self.config.chart_location {
ChartLocation::Remote(chart_location) => {
debug!(
chart_location = &**chart_location,
"Using remote helm chart:"
);
self.helm_client
.repo_add(DEFAULT_CHART_APP_REPO, chart_location)?;
self.helm_client.repo_update()?;
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_SYS_REPO,
DEFAULT_CHART_SYS_NAME,
None,
install_settings,
)?;
}
ChartLocation::Local(chart_home) => {
let chart_location = chart_home.join(DEFAULT_SYS_NAME);
let chart_string = chart_location.to_string_lossy();
debug!(
chart_location = chart_string.as_ref(),
"Using local helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_SYS_REPO,
chart_string.as_ref(),
None,
install_settings,
)?;
}
}
info!("Fluvio sys chart has been installed");
Ok(())
}
#[instrument(skip(self))]
fn install_app(&self) -> Result<(), K8InstallError> {
debug!(
"Installing fluvio with the following configuration: {:#?}",
&self.config
);
if let TlsPolicy::Verified(tls) = &self.config.server_tls_policy {
self.upload_tls_secrets(tls)?;
}
let fluvio_tag = self
.config
.image_tag
.as_ref()
.unwrap_or(&self.config.chart_version)
.to_owned();
let mut install_settings: Vec<(_, &str)> = vec![
("image.registry", &self.config.image_registry),
("image.tag", &fluvio_tag),
("cloud", &self.config.cloud),
];
if let TlsPolicy::Anonymous | TlsPolicy::Verified(_) = self.config.server_tls_policy {
install_settings.push(("tls", "true"));
}
if let Some(log) = &self.config.rust_log {
install_settings.push(("scLog", log));
}
if let Some(authorization_config_map) = &self.config.authorization_config_map {
install_settings.push(("authorizationConfigMap", authorization_config_map));
}
match &self.config.chart_location {
ChartLocation::Remote(chart_location) => {
self.helm_client
.repo_add(DEFAULT_CHART_APP_REPO, chart_location)?;
self.helm_client.repo_update()?;
if !self
.helm_client
.chart_version_exists(&self.config.chart_name, &self.config.chart_version)?
{
return Err(K8InstallError::HelmChartNotFound(format!(
"{}:{}",
&self.config.chart_name, &self.config.chart_version
)));
}
debug!(
chart_location = &**chart_location,
"Using remote helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_APP_REPO,
&self.config.chart_name,
Some(&self.config.chart_version),
&install_settings,
)?;
}
ChartLocation::Local(chart_home) => {
let chart_location = chart_home.join(DEFAULT_APP_NAME);
let chart_string = chart_location.to_string_lossy();
debug!(
chart_location = chart_string.as_ref(),
"Using local helm chart:"
);
self.helm_client.install(
&self.config.namespace,
DEFAULT_CHART_APP_REPO,
chart_string.as_ref(),
Some(&self.config.chart_version),
&install_settings,
)?;
}
}
info!("Fluvio app chart has been installed");
Ok(())
}
/// Uploads server TLS material as Kubernetes secrets.
///
/// File-based configs are used in place. Inline configs are first written out via
/// `try_into_temp_files`.
fn upload_tls_secrets(&self, tls: &TlsConfig) -> Result<(), IoError> {
    let resolved: Cow<TlsPaths> = match tls {
        TlsConfig::Files(on_disk) => Cow::Borrowed(on_disk),
        TlsConfig::Inline(inline) => Cow::Owned(inline.try_into_temp_files()?),
    };
    self.upload_tls_secrets_from_files(&resolved)
}
/// Looks up the `fluvio-sc-public` service and builds a `host:port` address from it.
///
/// Returns `Ok(None)` when the service does not exist (HTTP 404), or when it has no
/// load-balancer ingress address or no port with a target port yet.
// NOTE(review): the port used is the first port's `target_port` (the pod-side port), not the
// service's own port; this only matters if the chart ever makes the two differ. Confirm.
#[instrument(skip(self, ns))]
async fn discover_sc_address(&self, ns: &str) -> Result<Option<String>, K8InstallError> {
    use k8_client::http::status::StatusCode;

    let service_meta = InputObjectMeta::named("fluvio-sc-public", ns);
    let svc = match self
        .kube_client
        .retrieve_item::<ServiceSpec, _>(&service_meta)
        .await
    {
        Ok(found) => found,
        Err(k8_client::ClientError::Client(status)) if status == StatusCode::NOT_FOUND => {
            info!("no SC service found");
            return Ok(None);
        }
        Err(other) => return Err(K8InstallError::from(other)),
    };

    let ingress_host = svc
        .status
        .load_balancer
        .ingress
        .iter()
        .next()
        .and_then(|ingress| ingress.host_or_ip().to_owned());
    let port = svc
        .spec
        .ports
        .iter()
        .next()
        .and_then(|service_port| service_port.target_port);

    let sock_addr = match (ingress_host, port) {
        (Some(host), Some(target_port)) => Some(format!("{}:{}", host, target_port)),
        _ => None,
    };
    Ok(sock_addr)
}
/// Polls for the SC service address, then waits until its port accepts connections.
///
/// Makes up to 12 discovery attempts with exponential backoff (1 s, 2 s, 4 s, ...) between
/// them. The backoff is skipped after the final attempt, since no further check follows it.
///
/// # Errors
/// Returns `SCServiceTimeout` if no address is discovered. Errors from discovery or from the
/// port check are propagated.
#[instrument(skip(self, ns))]
async fn wait_for_sc_service(&self, ns: &str) -> Result<String, K8InstallError> {
    const MAX_ATTEMPTS: u32 = 12;
    info!("waiting for SC service");
    for attempt in 0..MAX_ATTEMPTS {
        if let Some(sock_addr) = self.discover_sc_address(ns).await? {
            info!(%sock_addr, "found SC service load balancer, discovered SC address");
            self.wait_for_sc_port_check(&sock_addr).await?;
            return Ok(sock_addr);
        }
        // Sleeping after the last attempt would only delay the timeout error.
        if attempt + 1 == MAX_ATTEMPTS {
            break;
        }
        let sleep_ms = 1000 * 2u64.pow(attempt);
        info!(
            attempt = attempt,
            "no SC service found, sleeping for {} ms", sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(K8InstallError::SCServiceTimeout)
}
/// Waits until a TCP connection to the SC address succeeds.
///
/// Each of up to 12 attempts first resolves the address via `wait_for_sc_dns`, which has its
/// own retry loop, then tries to connect. Failed attempts back off exponentially
/// (1 s, 2 s, 4 s, ...), except after the last one.
///
/// # Errors
/// Returns `SCPortCheckTimeout` if no connection succeeds. DNS timeouts are propagated.
async fn wait_for_sc_port_check(&self, sock_addr_str: &str) -> Result<(), K8InstallError> {
    const MAX_ATTEMPTS: u32 = 12;
    info!(sock_addr = %sock_addr_str, "waiting for SC port check");
    for attempt in 0..MAX_ATTEMPTS {
        let sock_addr = self.wait_for_sc_dns(sock_addr_str).await?;
        if TcpStream::connect(&*sock_addr).await.is_ok() {
            info!(sock_addr = %sock_addr_str, "finished SC port check");
            return Ok(());
        }
        // Sleeping after the last attempt would only delay the timeout error.
        if attempt + 1 == MAX_ATTEMPTS {
            break;
        }
        let sleep_ms = 1000 * 2u64.pow(attempt);
        info!(
            attempt = attempt,
            "sc port closed, sleeping for {} ms", sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    error!(sock_addr = %sock_addr_str, "timeout for SC port check");
    Err(K8InstallError::SCPortCheckTimeout)
}
/// Resolves the SC `host:port` string to socket addresses, retrying on failure.
///
/// Makes up to 12 attempts with exponential backoff (1 s, 2 s, 4 s, ...) between them. The
/// backoff is skipped after the final attempt.
///
/// # Errors
/// Returns `SCDNSTimeout` if resolution never succeeds.
async fn wait_for_sc_dns(
    &self,
    sock_addr_string: &str,
) -> Result<Vec<SocketAddr>, K8InstallError> {
    const MAX_ATTEMPTS: u32 = 12;
    info!("waiting for SC dns resolution: {}", sock_addr_string);
    for attempt in 0..MAX_ATTEMPTS {
        match resolve(sock_addr_string).await {
            Ok(sock_addr) => {
                debug!("finished SC dns resolution: {}", sock_addr_string);
                return Ok(sock_addr);
            }
            Err(err) if attempt + 1 < MAX_ATTEMPTS => {
                let sleep_ms = 1000 * 2u64.pow(attempt);
                info!(
                    attempt = attempt,
                    "SC dns resolution failed {}, sleeping for {} ms", err, sleep_ms
                );
                sleep(Duration::from_millis(sleep_ms)).await;
            }
            Err(err) => {
                // Final attempt: report the failure without a pointless trailing sleep.
                info!(
                    attempt = attempt,
                    "SC dns resolution failed {}, giving up", err
                );
            }
        }
    }
    error!("timedout sc dns: {}", sock_addr_string);
    Err(K8InstallError::SCDNSTimeout)
}
/// Waits until at least `spu` SPUs in `ns` are online and have a public ingress endpoint.
///
/// Makes up to 12 polls with exponential backoff (1 s, 2 s, 4 s, ...) between them. The
/// backoff is skipped after the final poll. Returns `Ok(true)` once enough SPUs are ready.
///
/// The check is "at least", not "exactly": the namespace may hold more online SPUs than were
/// requested, and an exact match would then never succeed.
///
/// # Errors
/// Returns `SPUTimeout` if not enough SPUs become ready. Kubernetes API errors are propagated.
#[instrument(skip(self, ns))]
async fn wait_for_spu(&self, ns: &str, spu: u16) -> Result<bool, K8InstallError> {
    const MAX_ATTEMPTS: u32 = 12;
    let expected_spu = usize::from(spu);
    info!("waiting for SPU");
    for attempt in 0..MAX_ATTEMPTS {
        debug!("retrieving spu specs");
        let items = self.kube_client.retrieve_items::<SpuSpec, _>(ns).await?;
        let spu_count = items.items.len();
        let ready_spu = items
            .items
            .iter()
            .filter(|spu_obj| {
                !spu_obj.spec.public_endpoint.ingress.is_empty() && spu_obj.status.is_online()
            })
            .count();

        if ready_spu >= expected_spu {
            info!(spu_count, "All SPUs are ready");
            return Ok(true);
        }

        debug!(
            total_expected_spu = expected_spu,
            spu_count,
            ready_spu,
            attempt = attempt,
            "Not all SPUs are ready. Waiting",
        );
        // Sleeping after the last poll would only delay the timeout error.
        if attempt + 1 == MAX_ATTEMPTS {
            break;
        }
        let sleep_ms = 1000 * 2u64.pow(attempt);
        info!(
            attempt = attempt,
            "{} of {} spu ready, sleeping for {} ms", ready_spu, spu, sleep_ms
        );
        sleep(Duration::from_millis(sleep_ms)).await;
    }
    Err(K8InstallError::SPUTimeout)
}
/// Replaces the `fluvio-ca` and `fluvio-tls` secrets in the configured namespace using `kubectl`.
///
/// Both secrets are deleted first (ignoring absence), then recreated from the CA cert and the
/// server cert/key. What happens on a non-zero `kubectl` exit is up to `CommandExt::inherit`;
/// this function does not inspect it.
///
/// # Errors
/// Returns `InvalidInput` if any certificate or key path is not valid UTF-8.
#[instrument(skip(self, paths))]
fn upload_tls_secrets_from_files(&self, paths: &TlsPaths) -> Result<(), IoError> {
    /// Borrows `path` as UTF-8, failing with `InvalidInput` carrying `message` otherwise.
    fn utf8_path<'p, P>(path: &'p P, message: &'static str) -> Result<&'p str, IoError>
    where
        P: AsRef<std::path::Path> + ?Sized,
    {
        path.as_ref()
            .to_str()
            .ok_or_else(|| IoError::new(ErrorKind::InvalidInput, message))
    }

    let ca_cert = utf8_path(&paths.ca_cert, "ca_cert must be a valid path")?;
    let server_cert = utf8_path(&paths.cert, "server_cert must be a valid path")?;
    let server_key = utf8_path(&paths.key, "server_key must be a valid path")?;
    debug!("Using TLS from paths: {:?}", paths);

    let namespace = self.config.namespace.as_str();
    // Run in order: delete both secrets, then recreate them. `--namespace` is appended to each.
    let kubectl_calls: [&[&str]; 4] = [
        &["delete", "secret", "fluvio-ca", "--ignore-not-found=true"],
        &["delete", "secret", "fluvio-tls", "--ignore-not-found=true"],
        &["create", "secret", "generic", "fluvio-ca", "--from-file", ca_cert],
        &[
            "create",
            "secret",
            "tls",
            "fluvio-tls",
            "--cert",
            server_cert,
            "--key",
            server_key,
        ],
    ];
    for call_args in kubectl_calls.iter() {
        Command::new("kubectl")
            .args(*call_args)
            .args(&["--namespace", namespace])
            .inherit();
    }
    Ok(())
}
/// Points the Fluvio profile named after the current kube context at `external_addr`.
///
/// The cluster entry (address and client TLS policy) and the profile are created if missing,
/// otherwise updated. The profile is then made current and the config file saved.
///
/// # Errors
/// Fails if the config file cannot be loaded or saved, or if the profile name cannot be
/// computed.
fn update_profile(&self, external_addr: String) -> Result<(), K8InstallError> {
    debug!("updating profile for: {}", external_addr);
    let mut config_file = ConfigFile::load_default_or_new()?;
    let config = config_file.mut_config();
    let profile_name = self.compute_profile_name()?;
    let client_tls = self.config.client_tls_policy.clone();

    if let Some(cluster) = config.cluster_mut(&profile_name) {
        cluster.addr = external_addr;
        cluster.tls = client_tls;
    } else {
        let mut new_cluster = FluvioConfig::new(external_addr);
        new_cluster.tls = client_tls;
        config.add_cluster(new_cluster, profile_name.clone());
    }

    if let Some(profile) = config.profile_mut(&profile_name) {
        profile.set_cluster(profile_name.clone());
    } else {
        config.add_profile(Profile::new(profile_name.clone()), profile_name.clone());
    }

    config.set_current_profile(&profile_name);
    config_file.save()?;
    Ok(())
}
/// Returns the name of the current kube context, used as the Fluvio profile name.
///
/// # Errors
/// Fails if the Kubernetes config cannot be loaded, if it is an in-pod config, or if the
/// kubeconfig has no current context.
fn compute_profile_name(&self) -> Result<String, K8InstallError> {
    let kc_config = match K8Config::load()? {
        K8Config::Pod(_) => {
            return Err(K8InstallError::Other(
                "Pod config is not valid here".to_owned(),
            ));
        }
        K8Config::KubeConfig(config) => config,
    };
    kc_config
        .config
        .current_context()
        .map(|ctx| ctx.name.to_owned())
        // Fixed typo in user-facing message ("fount" -> "found").
        .ok_or_else(|| K8InstallError::Other("No context found".to_owned()))
}
#[instrument(
skip(self, cluster),
fields(cluster_addr = & * cluster.addr)
)]
async fn create_managed_spu_group(&self, cluster: &FluvioConfig) -> Result<(), K8InstallError> {
debug!("trying to create managed spu: {:#?}", cluster);
let name = self.config.group_name.clone();
let fluvio = Fluvio::connect_with_config(cluster).await?;
let mut admin = fluvio.admin().await;
admin
.create(name, false, self.config.spu_spec.clone())
.await?;
Ok(())
}
}