use std::process;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::convert::TryFrom;
use tracing::info;
use tracing::debug;
use structopt::StructOpt;
use fluvio_types::print_cli_err;
use k8_client::K8Config;
use fluvio_future::openssl::TlsAcceptor;
use fluvio_future::openssl::SslVerifyMode;
use crate::services::auth::basic::BasicRbacPolicy;
use crate::error::ScError;
use crate::config::ScConfig;
/// Controller configuration paired with the optional RBAC policy that is
/// built from the `--authorization-policy` file when one is supplied.
type Config = (ScConfig, Option<BasicRbacPolicy>);
// Command-line options for the Streaming Controller (`sc-server`).
//
// NOTE: plain `//` comments are used here on purpose. structopt turns `///`
// doc comments into CLI help text, so adding them would change the program's
// `--help` output.
#[derive(Debug, StructOpt, Default)]
#[structopt(name = "sc-server", about = "Streaming Controller")]
pub struct ScOpt {
// `--bind-public`: when given, replaces the default public endpoint of `ScConfig`.
#[structopt(long)]
bind_public: Option<String>,
// `--bind-private`: when given, replaces the default private endpoint of `ScConfig`.
#[structopt(long)]
bind_private: Option<String>,
// `-n` / `--namespace`: namespace to use. When absent, the namespace from the
// loaded Kubernetes config is used instead.
#[structopt(short = "n", long = "namespace", value_name = "namespace")]
namespace: Option<String>,
// TLS-related flags, flattened into this command's argument list.
#[structopt(flatten)]
tls: TlsConfig,
// `--authorization-scopes`: path copied verbatim into `ScConfig::x509_auth_scopes`.
// The `env` attribute also lets structopt read the value from the environment.
#[structopt(
long = "authorization-scopes",
value_name = "authorization scopes path",
env
)]
x509_auth_scopes: Option<PathBuf>,
// `--authorization-policy`: path converted into a `BasicRbacPolicy` via `TryFrom`.
// The `env` attribute also lets structopt read the value from the environment.
#[structopt(
long = "authorization-policy",
value_name = "authorization policy path",
env
)]
auth_policy: Option<PathBuf>,
}
impl ScOpt {
    /// Loads the Kubernetes configuration and merges it with the CLI options.
    ///
    /// When no namespace was passed on the command line, the namespace of the
    /// loaded Kubernetes config is used.
    ///
    /// # Errors
    ///
    /// Returns an error when the Kubernetes config cannot be loaded or when
    /// `as_sc_config` rejects the options.
    // The nested tuple is exactly what `parse_cli_or_exit` hands to its callers,
    // so it is kept as-is instead of introducing a new public type.
    #[allow(clippy::type_complexity)]
    fn get_sc_and_k8_config(
        mut self,
    ) -> Result<(Config, K8Config, Option<(String, TlsConfig)>), ScError> {
        // Loading touches the environment/filesystem, so a failure is reported
        // through the regular error path instead of panicking.
        let k8_config = K8Config::load().map_err(|err| {
            IoError::new(
                ErrorKind::Other,
                format!("unable to load k8 config: {:?}", err),
            )
        })?;

        if self.namespace.is_none() {
            let k8_namespace = k8_config.namespace().to_owned();
            info!("using {} as namespace from kubernetes config", k8_namespace);
            self.namespace = Some(k8_namespace);
        }

        let (sc_config, tls_option) = self.as_sc_config()?;
        Ok((sc_config, k8_config, tls_option))
    }

    /// Converts the CLI options into the controller configuration.
    ///
    /// Returns the `(ScConfig, policy)` pair plus, when `--tls` is set, the
    /// address the TLS proxy should listen on together with the TLS options.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` when no namespace has been set.
    /// * Whatever the `BasicRbacPolicy` conversion reports for the policy file.
    /// * `ErrorKind::NotFound` when TLS is enabled without a non-TLS public
    ///   address.
    // Consumes `self` despite the `as_` prefix; the name is kept for callers
    // elsewhere in the file.
    #[allow(clippy::wrong_self_convention)]
    fn as_sc_config(self) -> Result<(Config, Option<(String, TlsConfig)>), IoError> {
        let mut config = ScConfig::default();

        if let Some(public_addr) = self.bind_public {
            config.public_endpoint = public_addr;
        }
        if let Some(private_addr) = self.bind_private {
            config.private_endpoint = private_addr;
        }

        config.namespace = self.namespace.ok_or_else(|| {
            IoError::new(ErrorKind::InvalidInput, "namespace must be specified")
        })?;
        config.x509_auth_scopes = self.x509_auth_scopes;

        let policy = self
            .auth_policy
            .map(BasicRbacPolicy::try_from)
            .transpose()?;

        let tls = self.tls;
        if !tls.tls {
            return Ok(((config, policy), None));
        }

        // With TLS enabled the configured public address is handed to the TLS
        // proxy, and the controller itself binds to the non-TLS address.
        let proxy_addr = config.public_endpoint.clone();
        debug!("using tls proxy addr: {}", proxy_addr);
        config.public_endpoint = tls.bind_non_tls_public.clone().ok_or_else(|| {
            IoError::new(
                ErrorKind::NotFound,
                "non tls addr for public must be specified",
            )
        })?;

        Ok(((config, policy), Some((proxy_addr, tls))))
    }

    /// Builds the full configuration, or prints the error and terminates the
    /// process with exit status `-1` when that fails.
    pub fn parse_cli_or_exit(self) -> (Config, K8Config, Option<(String, TlsConfig)>) {
        self.get_sc_and_k8_config().unwrap_or_else(|err| {
            print_cli_err!(err);
            process::exit(-1)
        })
    }
}
// TLS-related command-line flags; flattened into `ScOpt`.
//
// NOTE: plain `//` comments are used here on purpose. structopt turns `///`
// doc comments into CLI help text, so adding them would change the program's
// `--help` output.
#[derive(Debug, StructOpt, Clone, Default)]
pub struct TlsConfig {
// `--tls`: enables TLS. When set, `bind_non_tls_public` must be provided too.
#[structopt(long)]
tls: bool,
// `--server-cert`: server certificate PEM file; required to build the acceptor.
#[structopt(long)]
pub server_cert: Option<String>,
// `--server-key`: server private key PEM file; required to build the acceptor.
#[structopt(long)]
pub server_key: Option<String>,
// `--enable-client-cert`: switches the acceptor to peer verification, which
// requires `ca_cert`.
#[structopt(long)]
pub enable_client_cert: bool,
// `--ca-cert`: CA PEM file; only read when `enable_client_cert` is set.
#[structopt(long)]
pub ca_cert: Option<String>,
// `--bind-non-tls-public`: address that becomes the controller's public
// endpoint when TLS is enabled.
#[structopt(long)]
bind_non_tls_public: Option<String>,
}
impl TlsConfig {
    /// Builds a `TlsAcceptor` from the certificate paths held in this config.
    ///
    /// The server certificate and key are always required. When
    /// `enable_client_cert` is set, the CA certificate is required as well and
    /// the acceptor is put into peer-verification mode; otherwise the acceptor
    /// is built without a CA.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::NotFound` when a required path is missing, and the
    /// builder's own error (converted with `into_io_error`) when any builder
    /// step fails.
    pub fn try_build_tls_acceptor(&self) -> Result<TlsAcceptor, IoError> {
        let cert_file = match &self.server_cert {
            Some(path) => path,
            None => return Err(IoError::new(ErrorKind::NotFound, "missing server cert")),
        };
        info!("using server crt: {}", cert_file);

        let key_file = match &self.server_key {
            Some(path) => path,
            None => return Err(IoError::new(ErrorKind::NotFound, "missing server key")),
        };
        info!("using server key: {}", key_file);

        let acceptor_builder = if self.enable_client_cert {
            // The CA path is validated before a builder is created.
            let ca_file = match &self.ca_cert {
                Some(path) => path,
                None => return Err(IoError::new(ErrorKind::NotFound, "missing ca cert")),
            };
            info!("using client cert CA path: {}", ca_file);

            let verifying = TlsAcceptor::builder()
                .map_err(|e| e.into_io_error())?
                .with_ssl_verify_mode(SslVerifyMode::PEER);
            verifying
                .with_ca_from_pem_file(ca_file)
                .map_err(|e| e.into_io_error())?
        } else {
            info!("using tls anonymous access");
            TlsAcceptor::builder().map_err(|e| e.into_io_error())?
        };

        let acceptor_builder = acceptor_builder
            .with_certifiate_and_key_from_pem_files(cert_file, key_file)
            .map_err(|e| e.into_io_error())?;

        Ok(acceptor_builder.build())
    }
}