use crate::error::{DshApiError, DshApiResult};
use crate::{DEFAULT_PLATFORMS, ENV_VAR_PLATFORM, ENV_VAR_PLATFORMS_FILE_NAME};
use itertools::Itertools;
use log::{debug, error, info};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::LazyLock;
use std::{env, fs};
/// Describes a single DSH platform and holds the properties from which its endpoints,
/// urls and domain names are derived.
///
/// Multi-word fields are (de)serialized under kebab-case keys.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct DshPlatform {
  /// Full platform name; one of the two identifiers accepted when looking up a platform.
  name: String,
  /// Free-form description of the platform.
  description: String,
  /// Alternative identifier accepted when looking up a platform.
  alias: String,
  /// Whether the platform is flagged as a production platform.
  #[serde(rename = "is-production")]
  is_production: bool,
  /// Cloud provider of the platform.
  #[serde(rename = "cloud-provider")]
  cloud_provider: CloudProvider,
  /// Optional region of the platform.
  region: Option<String>,
  /// Base url from which the access token endpoint is derived.
  ///
  /// Serialized as `issuer-endpoint`, in line with the other multi-word fields. The
  /// snake_case key `issuer_endpoint` is still accepted when deserializing, so data
  /// written with the former key remains readable.
  #[serde(rename = "issuer-endpoint", alias = "issuer_endpoint")]
  issuer_endpoint: String,
  /// Realm, used in bucket names and robot client ids.
  realm: String,
  /// Public domain, the base of the console, api, mqtt and public vhost domains.
  #[serde(rename = "public-domain")]
  public_domain: String,
  /// Optional private domain; platforms without it do not support private vhosts.
  #[serde(rename = "private-domain")]
  private_domain: Option<String>,
}
/// Cloud provider of a platform, (de)serialized as `aws` or `azure`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum CloudProvider {
  /// Amazon Web Services.
  AWS,
  /// Microsoft Azure.
  Azure,
}
/// Zone of a vhost, (de)serialized as `private` or `public`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum VhostZone {
  /// Vhost in the platform's private domain.
  Private,
  /// Vhost in the platform's public domain.
  Public,
}
#[rustfmt::skip]
pub fn serialize_platform<S>(platform: &DshPlatform, serialize: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serialize.serialize_str(platform.name())
}
#[rustfmt::skip]
pub fn serialize_platform_alias<S>(platform: &DshPlatform, serialize: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serialize.serialize_str(platform.alias())
}
#[rustfmt::skip]
pub fn deserialize_platform<'de, D>(deserializer: D) -> Result<DshPlatform, D::Error>
where
D: Deserializer<'de>,
{
DshPlatform::from_str(&String::deserialize(deserializer)?).map_err(serde::de::Error::custom)
}
/// Separator placed between the parts of a robot client id.
const CLIENT_ID_SEPARATOR: &str = ":";
impl DshPlatform {
/// Returns the OpenID Connect token endpoint, derived from the issuer endpoint.
pub fn access_token_endpoint(&self) -> String {
  let issuer_endpoint = self.issuer_endpoint();
  format!("{issuer_endpoint}/protocol/openid-connect/token")
}
/// Returns the alias of the platform.
pub fn alias(&self) -> &str {
  &self.alias
}
/// Returns all platforms in the static platform list.
pub fn all() -> &'static [DshPlatform] {
  DSH_PLATFORMS.as_slice()
}
/// Returns the bucket name for a tenant's bucket on this platform.
///
/// For AWS the name is `<realm>-<tenant>-<bucket id>`. For Azure the access key is
/// appended as `@<access key>`.
///
/// # Errors
/// Returns an error when the platform's cloud provider is Azure and no `access_key` is given.
pub fn bucket_name(&self, tenant_name: impl Display, bucket_id: impl Display, access_key: Option<impl Display>) -> Result<String, String> {
  let base_name = format!("{}-{}-{}", self.realm, tenant_name, bucket_id);
  match (&self.cloud_provider, access_key) {
    (CloudProvider::AWS, _) => Ok(base_name),
    (CloudProvider::Azure, Some(access_key)) => Ok(format!("{}@{}", base_name, access_key)),
    (CloudProvider::Azure, None) => Err("bucket name for azure requires the bucket access secret system/objectstore/access_key_id".to_string()),
  }
}
#[deprecated]
pub fn client_id(&self) -> String {
self.robot_client_id()
}
pub fn cloud_provider(&self) -> &CloudProvider {
&self.cloud_provider
}
/// Returns the domain of the console, `console.<public domain>`.
pub fn console_domain(&self) -> String {
  ["console", self.public_domain()].join(".")
}
/// Returns the https url of the console.
#[rustfmt::skip]
pub fn console_url(&self) -> String {
  let mut url = String::from("https://");
  url.push_str(&self.console_domain());
  url
}
/// Returns the consumer group name `<tenant>_<service>_<index>`.
pub fn consumer_group(&self, tenant_name: impl Display, service_name: impl Display, index: usize) -> String {
  format!("{tenant_name}_{service_name}_{index}")
}
/// Returns the description of the platform.
#[rustfmt::skip]
pub fn description(&self) -> &str {
  self.description.as_str()
}
/// Returns the platform domain for the given vhost zone.
///
/// # Errors
/// Returns a parameter error when the private zone is requested and the platform has
/// no private domain.
#[rustfmt::skip]
pub fn domain(&self, vhost_zone: VhostZone) -> DshApiResult<&str> {
  match vhost_zone {
    VhostZone::Public => Ok(self.public_domain()),
    VhostZone::Private => self
      .private_domain()
      .ok_or_else(|| DshApiError::parameter(format!("platform '{}' does not support private vhosts", self))),
  }
}
#[rustfmt::skip]
pub fn from_domain(domain_name: &str) -> DshApiResult<Option<(Self, VhostZone)>> {
let matching_platforms: Vec<(DshPlatform, VhostZone)> = DSH_PLATFORMS
.iter()
.filter_map(|platform| {
match (
platform.public_domain.ends_with(domain_name),
platform.private_domain.as_ref().is_some_and(|private_domain| private_domain.ends_with(domain_name)),
) {
(false, false) => None,
(false, true) => Some((platform.clone(), VhostZone::Private)),
(true, false) => Some((platform.clone(), VhostZone::Public)),
(true, true) => None,
}
})
.collect_vec();
match matching_platforms.len() {
0 => Ok(None),
1 => Ok(matching_platforms.first().cloned()),
_ => Err(DshApiError::parameter("")),
}
}
/// Returns the http messaging api url for multiple messages on the given mqtt topic.
pub fn http_messaging_api_url_multi(&self, mqtt_topic: impl Display) -> String {
  let api_domain = self.rest_api_domain();
  format!("https://{api_domain}/data/v0/multi/{mqtt_topic}")
}
/// Returns the http messaging api url for a single message on the given mqtt topic.
pub fn http_messaging_api_url_single(&self, mqtt_topic: impl Display) -> String {
  let api_domain = self.rest_api_domain();
  format!("https://{api_domain}/data/v0/single/{mqtt_topic}")
}
/// Returns the internal domain of a tenant, `<tenant>.marathon.mesos`.
#[rustfmt::skip]
pub fn internal_domain(&self, tenant_name: impl Display) -> String {
  format!("{tenant_name}.marathon.mesos")
}
/// Returns the internal domain of a tenant's service, `<service>.<tenant internal domain>`.
pub fn internal_service_domain(&self, tenant_name: impl Display, service_name: impl Display) -> String {
  let tenant_internal_domain = self.internal_domain(tenant_name);
  format!("{service_name}.{tenant_internal_domain}")
}
/// Returns the issuer endpoint of the platform.
pub fn issuer_endpoint(&self) -> &str {
  &self.issuer_endpoint
}
pub fn is_production(&self) -> bool {
self.is_production
}
/// Returns the mqtt messaging api endpoint, `mqtt.<public domain>`.
pub fn mqtt_messaging_api_endpoint(&self) -> String {
  ["mqtt", self.public_domain()].join(".")
}
/// Returns the port of the mqtt messaging api, which is the same for all platforms.
pub fn mqtt_messaging_api_port(&self) -> usize {
  const MQTT_MESSAGING_API_PORT: usize = 8883;
  MQTT_MESSAGING_API_PORT
}
/// Returns the url of the endpoint that issues mqtt tokens.
pub fn mqtt_token_endpoint(&self) -> String {
  let api_domain = self.rest_api_domain();
  format!("https://{api_domain}/datastreams/v0/mqtt/token")
}
/// Returns the name of the platform.
pub fn name(&self) -> &str {
  self.name.as_str()
}
pub fn new(platform_name: &str) -> Self {
match DshPlatform::try_from(platform_name) {
Ok(dsh_platform) => dsh_platform,
Err(error) => panic!("{}", error),
}
}
/// Returns the private domain of the platform, if it has one.
pub fn private_domain(&self) -> Option<&str> {
  Option::as_deref(&self.private_domain)
}
/// Returns the common name of a proxy, `<proxy>.<proxy vhost domain>`.
///
/// # Errors
/// Fails when the platform has no domain for the requested vhost zone.
#[rustfmt::skip]
pub fn proxy_common_name(&self, proxy_name: impl Display, tenant_name: impl Display, vhost_zone: VhostZone) -> DshApiResult<String> {
  let vhost_domain = self.proxy_vhost_domain(tenant_name, vhost_zone)?;
  Ok(format!("{proxy_name}.{vhost_domain}"))
}
/// Returns the proxy consumer group name `<tenant>_<proxy>_<index>`.
pub fn proxy_consumer_group(&self, tenant_name: impl Display, proxy_name: impl Display, index: usize) -> String {
  format!("{tenant_name}_{proxy_name}_{index}")
}
/// Returns the proxy consumer group name for an acl group,
/// `<tenant>.<acl group id>_<proxy>_<index>`.
#[rustfmt::skip]
pub fn proxy_consumer_group_acl(&self, tenant_name: impl Display, proxy_name: impl Display, acl_group_id: impl Display, index: usize) -> String {
  format!("{tenant_name}.{acl_group_id}_{proxy_name}_{index}")
}
/// Returns the schema store vhost of a proxy, `<proxy>-schema-store.<proxy vhost domain>`.
///
/// # Errors
/// Fails when the platform has no domain for the requested vhost zone.
pub fn proxy_schema_store_vhost(&self, tenant_name: impl Display, proxy_name: impl Display, vhost_zone: VhostZone) -> DshApiResult<String> {
  self
    .proxy_vhost_domain(tenant_name, vhost_zone)
    .map(|vhost_domain| format!("{proxy_name}-schema-store.{vhost_domain}"))
}
/// Returns the vhost of a proxy instance, `<proxy>-<index>.<proxy vhost domain>`.
///
/// # Errors
/// Fails when the platform has no domain for the requested vhost zone.
#[rustfmt::skip]
pub fn proxy_vhost(&self, tenant_name: impl Display, proxy_name: impl Display, vhost_zone: VhostZone, index: usize) -> DshApiResult<String> {
  let vhost_domain = self.proxy_vhost_domain(tenant_name, vhost_zone)?;
  Ok(format!("{proxy_name}-{index}.{vhost_domain}"))
}
/// Returns the vhost domain for a tenant's proxies, `kafka.<tenant>.<zone domain>`.
///
/// # Errors
/// Fails when the platform has no domain for the requested vhost zone.
pub fn proxy_vhost_domain(&self, tenant_name: impl Display, vhost_zone: VhostZone) -> DshApiResult<String> {
  self.domain(vhost_zone).map(|zone_domain| format!("kafka.{tenant_name}.{zone_domain}"))
}
/// Returns the public domain of the platform.
pub fn public_domain(&self) -> &str {
  self.public_domain.as_str()
}
/// Returns the domain of a public vhost, `<vhost>.<public domain>`.
pub fn public_vhost_domain(&self, vhost_name: impl Display) -> String {
  let public_domain = self.public_domain();
  format!("{vhost_name}.{public_domain}")
}
/// Returns the realm of the platform.
pub fn realm(&self) -> &str {
  self.realm.as_str()
}
/// Returns the region of the platform, if one is set.
pub fn region(&self) -> Option<&str> {
  Option::as_deref(&self.region)
}
/// Returns the domain of the rest api, `api.<public domain>`.
pub fn rest_api_domain(&self) -> String {
  ["api", self.public_domain()].join(".")
}
/// Returns the url of the rest api resources endpoint.
pub fn rest_api_endpoint(&self) -> String {
  let api_domain = self.rest_api_domain();
  format!("https://{api_domain}/resources/v0")
}
/// Returns the url of the rest api token endpoint.
pub fn rest_token_endpoint(&self) -> String {
  let api_domain = self.rest_api_domain();
  format!("https://{api_domain}/auth/v0/token")
}
/// Returns the robot client id of the platform: `robot`, the client id separator and
/// the realm.
pub fn robot_client_id(&self) -> String {
  ["robot", self.realm()].join(CLIENT_ID_SEPARATOR)
}
/// Returns the robot client id of a tenant: the platform's robot client id, the client
/// id separator and the tenant name.
#[rustfmt::skip]
pub fn robot_tenant_client_id(&self, tenant_name: impl Display) -> String {
  let platform_client_id = self.robot_client_id();
  format!("{platform_client_id}{CLIENT_ID_SEPARATOR}{tenant_name}")
}
/// Returns the url of the tenant api specification page on the console domain.
pub fn swagger_url(&self) -> String {
  let console_domain = self.console_domain();
  format!("https://{console_domain}/tenant-api/spec?url=/tenant-api/assets/openapi.json")
}
/// Returns the console url of an app in a tenant's app catalog.
///
/// Vendor and app name are joined by an url-encoded slash (`%2F`).
#[rustfmt::skip]
pub fn tenant_app_catalog_app_url(&self, tenant_name: impl Display, vendor_name: impl Display, app_name: impl Display) -> String {
  let console_domain = self.console_domain();
  format!("https://{console_domain}/#/profiles/{tenant_name}/app-catalog/app/{vendor_name}%2F{app_name}")
}
/// Returns the console url of a tenant's app catalog.
pub fn tenant_app_catalog_url(&self, tenant_name: impl Display) -> String {
  let console_domain = self.console_domain();
  format!("https://{console_domain}/#/profiles/{tenant_name}/app-catalog")
}
/// Returns the console url of a tenant's app.
pub fn tenant_app_console_url(&self, tenant_name: impl Display, app_name: impl Display) -> String {
  let tenant_console_url = self.tenant_console_url(tenant_name);
  format!("{tenant_console_url}/services/{app_name}/app")
}
/// Returns the robot client id of a tenant: the platform's robot client id, the client
/// id separator and the tenant name.
///
/// Deprecated; builds the same value as `robot_tenant_client_id`.
#[rustfmt::skip]
#[deprecated]
pub fn tenant_client_id(&self, tenant_name: impl Display) -> String {
  let platform_client_id = self.robot_client_id();
  format!("{platform_client_id}{CLIENT_ID_SEPARATOR}{tenant_name}")
}
/// Returns the console url of a tenant's profile.
pub fn tenant_console_url(&self, tenant_name: impl Display) -> String {
  let console_url = self.console_url();
  format!("{console_url}/#/profiles/{tenant_name}")
}
/// Returns the console url of a tenant's data catalog.
pub fn tenant_data_catalog_url(&self, tenant_name: impl Display) -> String {
  let console_domain = self.console_domain();
  format!("https://{console_domain}/#/profiles/{tenant_name}/data-catalog")
}
/// Returns the domain of a tenant in the given vhost zone, `<tenant>.<zone domain>`.
///
/// # Errors
/// Fails when the platform has no domain for the requested vhost zone.
pub fn tenant_domain(&self, tenant_name: impl Display, vhost_zone: VhostZone) -> Result<String, String> {
  let zone_domain = self.domain(vhost_zone)?;
  Ok(format!("{tenant_name}.{zone_domain}"))
}
/// Returns the monitoring url of a tenant, `https://monitoring-<tenant>.<public domain>`.
pub fn tenant_monitoring_url(&self, tenant_name: impl Display) -> String {
  let public_domain = self.public_domain();
  format!("https://monitoring-{tenant_name}.{public_domain}")
}
/// Returns the private domain of a tenant, `<tenant>.<private domain>`.
///
/// # Errors
/// Fails when the platform has no private domain.
#[deprecated]
pub fn tenant_private_domain(&self, tenant_name: impl Display) -> Result<String, String> {
  self
    .private_domain()
    .map(|private_domain| format!("{}.{}", tenant_name, private_domain))
    .ok_or_else(|| format!("private domain is not set for platform {}", self.name()))
}
/// Returns the domain of a tenant's private vhost, `<vhost>.<tenant private domain>`.
///
/// # Errors
/// Fails when the tenant's private domain cannot be determined.
pub fn tenant_private_vhost_domain(&self, tenant_name: impl Display, vhost_name: impl Display) -> Result<String, String> {
  let tenant_private_domain = self.tenant_domain(tenant_name, VhostZone::Private)?;
  Ok(format!("{vhost_name}.{tenant_private_domain}"))
}
/// Returns the bootstrap server of a proxy instance,
/// `<proxy>-<index>.kafka.<tenant domain>:<port>`.
///
/// When `port` is `None`, port `9091` is used.
///
/// # Errors
/// Fails when the tenant domain for the requested vhost zone cannot be determined.
#[rustfmt::skip]
pub fn tenant_proxy_bootstrap_server(
  &self,
  tenant_name: impl Display,
  proxy_name: impl Display,
  vhost_zone: VhostZone,
  port: Option<usize>,
  index: usize,
) -> Result<String, String> {
  let tenant_domain = self.tenant_domain(tenant_name, vhost_zone)?;
  let port = port.unwrap_or(9091);
  Ok(format!("{proxy_name}-{index}.kafka.{tenant_domain}:{port}"))
}
/// Returns the bootstrap servers of a proxy for the indexes `0..number_of_servers`,
/// each with the default port.
///
/// # Errors
/// Fails with the first error encountered while building a bootstrap server.
#[rustfmt::skip]
pub fn tenant_proxy_bootstrap_servers(
  &self,
  tenant_name: impl Display,
  proxy_name: impl Display,
  vhost_zone: VhostZone,
  number_of_servers: usize,
) -> Result<Vec<String>, String> {
  let mut bootstrap_servers = Vec::new();
  for index in 0..number_of_servers {
    let bootstrap_server = self.tenant_proxy_bootstrap_server(&tenant_name, &proxy_name, vhost_zone.clone(), None, index)?;
    bootstrap_servers.push(bootstrap_server);
  }
  Ok(bootstrap_servers)
}
#[deprecated]
pub fn tenant_proxy_private_bootstrap_servers(&self, tenant_name: impl Display, proxy_name: impl Display, number_of_servers: usize) -> Result<Vec<String>, String> {
self.tenant_domain(tenant_name, VhostZone::Private).map(|tenant_private_domain| {
(0..number_of_servers)
.map(|index| format!("{}-{}.kafka.{}:9091", proxy_name, index, tenant_private_domain))
.collect_vec()
})
}
/// Returns the private schema store host of a proxy,
/// `<proxy>-schema-store.kafka.<tenant private domain>`.
///
/// # Errors
/// Fails when the tenant's private domain cannot be determined.
#[rustfmt::skip]
#[deprecated]
pub fn tenant_proxy_private_schema_store_host(&self, tenant_name: impl Display, proxy_name: impl Display) -> Result<String, String> {
  let tenant_private_domain = self.tenant_domain(tenant_name, VhostZone::Private)?;
  Ok(format!("{proxy_name}-schema-store.kafka.{tenant_private_domain}"))
}
#[deprecated]
pub fn tenant_proxy_public_bootstrap_servers(&self, tenant_name: impl Display, proxy_name: impl Display, number_of_servers: usize) -> Vec<String> {
(0..number_of_servers)
.map(|index| format!("{}-{}.kafka.{}.{}:9091", proxy_name, index, tenant_name, self.public_domain))
.collect_vec()
}
/// Returns the public schema store host of a proxy,
/// `<proxy>-schema-store.kafka.<tenant>.<public domain>`.
#[deprecated]
pub fn tenant_proxy_public_schema_store_host(&self, tenant_name: impl Display, proxy_name: impl Display) -> String {
  let public_domain = self.public_domain.as_str();
  format!("{proxy_name}-schema-store.kafka.{tenant_name}.{public_domain}")
}
/// Returns the schema store host of a proxy in the given vhost zone,
/// `<proxy>-schema-store.kafka.<tenant domain>`.
///
/// # Errors
/// Fails when the tenant domain for the requested vhost zone cannot be determined.
pub fn tenant_proxy_schema_store_host(&self, tenant_name: impl Display, proxy_name: impl Display, vhost_zone: VhostZone) -> Result<String, String> {
  let tenant_domain = self.tenant_domain(tenant_name, vhost_zone)?;
  Ok(format!("{proxy_name}-schema-store.kafka.{tenant_domain}"))
}
/// Returns the public domain of a tenant's app, `<app>.<tenant>.<public domain>`.
pub fn tenant_public_app_domain(&self, tenant_name: impl Display, app_name: impl Display) -> String {
  let public_domain = self.public_domain.as_str();
  format!("{app_name}.{tenant_name}.{public_domain}")
}
/// Returns the public domain of a tenant, `<tenant>.<public domain>`.
#[deprecated]
pub fn tenant_public_domain(&self, tenant_name: impl Display) -> String {
  let public_domain = self.public_domain.as_str();
  format!("{tenant_name}.{public_domain}")
}
/// Returns the console url of a tenant's service.
pub fn tenant_service_console_url(&self, tenant_name: impl Display, service_name: impl Display) -> String {
  let tenant_console_url = self.tenant_console_url(tenant_name);
  format!("{tenant_console_url}/services/{service_name}/service")
}
/// Returns the tracing url of the platform, `https://tracing.<public domain>`.
#[rustfmt::skip]
pub fn tracing_url(&self) -> String {
  let public_domain = self.public_domain();
  format!("https://tracing.{public_domain}")
}
/// Returns the platform selected by the environment variable named by `ENV_VAR_PLATFORM`.
///
/// The variable may contain either a platform name or a platform alias.
///
/// # Errors
/// Fails when the environment variable cannot be read or when its value does not
/// identify a known platform. In the latter case the message lists the possible values.
pub fn try_default() -> Result<Self, String> {
  let Ok(platform_name_from_env_var) = env::var(ENV_VAR_PLATFORM) else {
    return Err(format!("environment variable '{}' not set", ENV_VAR_PLATFORM));
  };
  if let Ok(platform) = DshPlatform::try_from(platform_name_from_env_var.as_str()) {
    debug!("platform '{}' (environment variable '{}')", platform, ENV_VAR_PLATFORM);
    return Ok(platform);
  }
  let possible_values = DSH_PLATFORMS
    .iter()
    .map(|dsh_platform| format!("{}/{}", dsh_platform.name(), dsh_platform.alias()))
    .join(", ");
  Err(format!(
    "environment variable {} contains invalid platform name '{}' (possible values: {})",
    ENV_VAR_PLATFORM, platform_name_from_env_var, possible_values
  ))
}
}
impl Default for DshPlatform {
fn default() -> Self {
match DshPlatform::try_default() {
Ok(dsh_platform) => {
info!("default dsh platform {} created", dsh_platform);
dsh_platform
}
Err(error) => panic!("{}", error),
}
}
}
impl Display for DshPlatform {
  /// Writes the name of the platform.
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    f.write_str(self.name())
  }
}
impl Display for CloudProvider {
  /// Writes the lowercase representation of the cloud provider.
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    let representation = match self {
      CloudProvider::AWS => "aws",
      CloudProvider::Azure => "azure",
    };
    f.write_str(representation)
  }
}
impl Display for VhostZone {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Private => f.write_str("private"),
Self::Public => f.write_str("public"),
}
}
}
impl TryFrom<&str> for DshPlatform {
type Error = String;
fn try_from(platform_name: &str) -> Result<Self, Self::Error> {
Self::from_str(platform_name)
}
}
impl FromStr for DshPlatform {
  type Err = String;

  /// Resolves a platform from its name or alias.
  ///
  /// # Errors
  /// Fails when no platform has the given name or alias. The message lists all
  /// possible values as `<name>/<alias>`.
  fn from_str(platform_name: &str) -> Result<Self, Self::Err> {
    DSH_PLATFORMS
      .iter()
      .find(|dsh_platform| [dsh_platform.name(), dsh_platform.alias()].contains(&platform_name))
      .cloned()
      .ok_or_else(|| {
        let possible_values = DSH_PLATFORMS
          .iter()
          .map(|dsh_platform| format!("{}/{}", dsh_platform.name(), dsh_platform.alias()))
          .join(", ");
        format!("invalid platform name '{}' (possible values: {})", platform_name, possible_values)
      })
  }
}
impl FromStr for VhostZone {
  type Err = DshApiError;

  /// Parses `private` or `public` into a vhost zone.
  ///
  /// # Errors
  /// Returns a parameter error for any other value.
  fn from_str(representation: &str) -> DshApiResult<Self> {
    let vhost_zone = match representation {
      "private" => Self::Private,
      "public" => Self::Public,
      unknown => return Err(DshApiError::parameter(format!("invalid vhost zone '{}'", unknown))),
    };
    Ok(vhost_zone)
  }
}
impl FromStr for CloudProvider {
  type Err = DshApiError;

  /// Parses `aws` or `azure` into a cloud provider.
  ///
  /// # Errors
  /// Returns a parameter error for any other value.
  fn from_str(representation: &str) -> DshApiResult<Self> {
    let cloud_provider = match representation {
      "aws" => Self::AWS,
      "azure" => Self::Azure,
      unknown => return Err(DshApiError::parameter(format!("invalid cloud provider '{}'", unknown))),
    };
    Ok(cloud_provider)
  }
}
/// Lazily initialized list of all known platforms, sorted by platform name.
///
/// When the environment variable named by `ENV_VAR_PLATFORMS_FILE_NAME` can be read, the
/// list is parsed as json from the file it points to and checked for duplicate names and
/// aliases. Otherwise the list is parsed from the `DEFAULT_PLATFORMS` json.
///
/// # Panics
/// Initialization panics (after logging the message as an error) when
/// * the platforms file cannot be read,
/// * the platforms file does not contain a valid platform list,
/// * the platforms file contains duplicate names and/or aliases,
/// * the default platform list cannot be parsed.
static DSH_PLATFORMS: LazyLock<Vec<DshPlatform>> = LazyLock::new(|| match env::var(ENV_VAR_PLATFORMS_FILE_NAME) {
  Ok(platform_file_name_from_env_var) => match fs::read_to_string(&platform_file_name_from_env_var) {
    Ok(platform_list_from_file) => match serde_json::from_str::<Vec<DshPlatform>>(platform_list_from_file.as_str()) {
      Ok(mut dsh_platforms_from_file) => {
        if let Err(validation_error) = check_for_duplicate_names_or_aliases(&dsh_platforms_from_file) {
          error!("{}", validation_error);
          panic!("{}", validation_error)
        }
        dsh_platforms_from_file.sort_by(|platform_a, platform_b| platform_a.name.cmp(&platform_b.name));
        info!("dsh platform list read from '{}'", platform_file_name_from_env_var);
        dsh_platforms_from_file
      }
      Err(parse_error) => {
        let message = format!("invalid platforms file '{}' ({})", platform_file_name_from_env_var, parse_error);
        error!("{}", message);
        panic!("{}", message)
      }
    },
    Err(file_error) => {
      let message = format!("unable to read platforms file '{}' ({})", platform_file_name_from_env_var, file_error);
      error!("{}", message);
      panic!("{}", message)
    }
  },
  Err(_) => match serde_json::from_str::<Vec<DshPlatform>>(DEFAULT_PLATFORMS) {
    Ok(mut default_dsh_platforms) => {
      default_dsh_platforms.sort_by(|platform_a, platform_b| platform_a.name.cmp(&platform_b.name));
      debug!("default platform list");
      default_dsh_platforms
    }
    Err(parse_error) => {
      // Report why the default list is unusable instead of panicking without a message.
      let message = format!("invalid default platform list ({})", parse_error);
      error!("{}", message);
      panic!("{}", message)
    }
  },
});
#[allow(suspicious_double_ref_op)]
fn check_for_duplicate_names_or_aliases(platforms: &Vec<DshPlatform>) -> Result<(), String> {
let mut names_and_aliases: Vec<&str> = vec![];
for platform in platforms {
names_and_aliases.push(platform.name.as_str());
names_and_aliases.push(platform.alias.as_str());
}
names_and_aliases.sort();
let mut duplicates = Vec::new();
for (name_or_alias, chunk) in &names_and_aliases.into_iter().chunk_by(|b| b.clone()) {
if chunk.collect_vec().len() > 1 {
duplicates.push(name_or_alias);
}
}
if !duplicates.is_empty() {
Err(format!(
"platforms file contains duplicate names and/or aliases ({})",
duplicates.into_iter().join(", ")
))
} else {
Ok(())
}
}