spacegate-config 0.2.0-alpha.4

A library-first, lightweight, high-performance, cloud-native supported API gateway
Documentation
use futures_util::future::join_all;
use gateway::{SgListener, SgParameters, SgProtocolConfig, SgTlsConfig};
use http_route::SgHttpRouteRule;
use k8s_gateway_api::{Gateway, HttpRoute, Listener};
use k8s_openapi::api::core::v1::Secret;
use kube::{api::ListParams, Api, ResourceExt};
use spacegate_model::{
    ext::k8s::{
        crd::{
            http_spaceroute::HttpSpaceroute,
            sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
        },
        helper_struct::SgTargetKind,
    },
    PluginInstanceId,
};

use crate::{
    constants::{self, GATEWAY_CLASS_NAME},
    model::{gateway, http_route, PluginConfig, SgGateway, SgHttpRoute},
    service::Retrieve,
    BoxError, BoxResult,
};

use super::{
    convert::{filter_k8s_conv::PluginConfigConv, gateway_k8s_conv::SgParametersConv as _, route_k8s_conv::SgHttpRouteRuleConv as _},
    K8s,
};

impl Retrieve for K8s {
    async fn retrieve_config_item_gateway(&self, gateway_name: &str) -> BoxResult<Option<SgGateway>> {
        let gateway_api: Api<Gateway> = self.get_namespace_api();

        let result = if let Some(gateway_obj) = gateway_api.get_opt(gateway_name).await?.and_then(|gateway_obj| {
            if gateway_obj.spec.gateway_class_name == GATEWAY_CLASS_NAME {
                Some(gateway_obj)
            } else {
                None
            }
        }) {
            Some(self.kube_gateway_2_sg_gateway(gateway_obj).await?)
        } else {
            None
        };

        Ok(result)
    }

    async fn retrieve_config_item_route(&self, gateway_name: &str, route_name: &str) -> BoxResult<Option<SgHttpRoute>> {
        let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
        let httproute_api: Api<HttpRoute> = self.get_namespace_api();

        let result = if let Some(httpspaceroute) = http_spaceroute_api.get_opt(route_name).await?.and_then(|http_route_obj| {
            if http_route_obj
                .spec
                .inner
                .parent_refs
                .as_ref()
                .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route_obj.namespace() && parent_ref.name == gateway_name))
                .unwrap_or(false)
            {
                Some(http_route_obj)
            } else {
                None
            }
        }) {
            Some(self.kube_httpspaceroute_2_sg_route(httpspaceroute).await?)
        } else if let Some(http_route) = httproute_api.get_opt(route_name).await?.and_then(|http_route| {
            if http_route
                .spec
                .inner
                .parent_refs
                .as_ref()
                .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route.namespace() && parent_ref.name == gateway_name))
                .unwrap_or(false)
            {
                Some(http_route)
            } else {
                None
            }
        }) {
            Some(self.kube_httproute_2_sg_route(http_route).await?)
        } else {
            None
        };

        Ok(result)
    }

    async fn retrieve_config_item_route_names(&self, name: &str) -> BoxResult<Vec<String>> {
        let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
        let httproute_api: Api<HttpRoute> = self.get_namespace_api();

        let mut result: Vec<String> = http_spaceroute_api
            .list(&ListParams::default())
            .await?
            .iter()
            .filter(|route| {
                route
                    .spec
                    .inner
                    .parent_refs
                    .as_ref()
                    .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
                    .unwrap_or(false)
            })
            .map(|route| route.name_any())
            .collect();

        result.extend(
            httproute_api
                .list(&ListParams::default())
                .await?
                .iter()
                .filter(|route| {
                    route
                        .spec
                        .inner
                        .parent_refs
                        .as_ref()
                        .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
                        .unwrap_or(false)
                })
                .map(|route| route.name_any()),
        );

        Ok(result)
    }

    async fn retrieve_config_names(&self) -> BoxResult<Vec<String>> {
        let gateway_api: Api<Gateway> = self.get_namespace_api();

        let result = gateway_api.list(&ListParams::default()).await?.iter().map(|gateway| gateway.name_any()).collect();

        Ok(result)
    }

    async fn retrieve_all_plugins(&self) -> Result<Vec<PluginConfig>, BoxError> {
        let filter_api: Api<SgFilter> = self.get_namespace_api();

        let result = filter_api.list(&ListParams::default()).await?.into_iter().filter_map(PluginConfig::from_first_filter_obj).collect();
        Ok(result)
    }

    async fn retrieve_plugin(&self, id: &spacegate_model::PluginInstanceId) -> Result<Option<PluginConfig>, BoxError> {
        let filter_api: Api<SgFilter> = self.get_namespace_api();

        match &id.name {
            spacegate_model::PluginInstanceName::Anon { uid: _ } => Ok(None),
            spacegate_model::PluginInstanceName::Named { name } => {
                let result = filter_api.get_opt(name).await?.and_then(PluginConfig::from_first_filter_obj);
                Ok(result)
            }
            spacegate_model::PluginInstanceName::Mono => Ok(None),
        }
    }

    async fn retrieve_plugins_by_code(&self, code: &str) -> Result<Vec<PluginConfig>, BoxError> {
        Ok(self.retrieve_all_plugins().await?.into_iter().filter(|p| p.code() == code).collect())
    }
}

impl K8s {
    pub(crate) const HTTP2_KEY: &'static str = "http2";
    pub(crate) const HTTP2_ENABLE: &'static str = "true";
    // query is http2 enabled?
    fn retrieve_http2_config(tls_config: &k8s_gateway_api::GatewayTlsConfig) -> bool {
        if let Some(options) = &tls_config.options {
            if let Some(Self::HTTP2_ENABLE) = options.get(Self::HTTP2_KEY).map(String::as_str) {
                return true;
            }
        }
        false
    }
    async fn kube_gateway_2_sg_gateway(&self, gateway_obj: Gateway) -> BoxResult<SgGateway> {
        let gateway_name = gateway_obj.name_any();
        let plugins = self
            .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
                kind: SgTargetKind::Gateway.into(),
                name: gateway_name.clone(),
                namespace: gateway_obj.namespace(),
            })
            .await?;
        let result = SgGateway {
            name: gateway_name,
            parameters: SgParameters::from_kube_gateway(&gateway_obj),
            listeners: self.retrieve_config_item_listeners(&gateway_obj.spec.listeners).await?,
            plugins,
        };
        Ok(result)
    }

    async fn kube_httpspaceroute_2_sg_route(&self, httpspace_route: HttpSpaceroute) -> BoxResult<SgHttpRoute> {
        let route_name = httpspace_route.name_any();
        let kind = if let Some(kind) = httpspace_route.annotations().get(constants::RAW_HTTP_ROUTE_KIND) {
            kind.clone()
        } else {
            SgTargetKind::Httpspaceroute.into()
        };
        let priority = httpspace_route.annotations().get(crate::constants::ANNOTATION_RESOURCE_PRIORITY).and_then(|a| a.parse::<i16>().ok()).unwrap_or(0);
        let plugins = self
            .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
                kind,
                name: route_name.clone(),
                namespace: httpspace_route.namespace(),
            })
            .await?;
        Ok(SgHttpRoute {
            hostnames: httpspace_route.spec.hostnames.clone(),
            plugins,
            rules: httpspace_route
                .spec
                .rules
                .map(|r_vec| r_vec.into_iter().map(SgHttpRouteRule::from_kube_httproute).collect::<Result<Vec<_>, BoxError>>())
                .transpose()?
                .unwrap_or_default(),
            priority,
            route_name,
        })
    }

    async fn kube_httproute_2_sg_route(&self, http_route: HttpRoute) -> BoxResult<SgHttpRoute> {
        self.kube_httpspaceroute_2_sg_route(http_route.into()).await
    }

    async fn retrieve_config_item_filters(&self, target: K8sSgFilterSpecTargetRef) -> BoxResult<Vec<PluginInstanceId>> {
        let kind = target.kind;
        let name = target.name;
        let namespace = target.namespace.unwrap_or(self.namespace.to_string());

        let filter_api: Api<SgFilter> = self.get_all_api();
        let plugin_ids: Vec<PluginInstanceId> = filter_api
            .list(&ListParams::default())
            .await
            .map_err(Box::new)?
            .into_iter()
            .filter(|filter_obj| {
                filter_obj.spec.target_refs.iter().any(|target_ref| {
                    target_ref.kind.eq_ignore_ascii_case(&kind)
                        && target_ref.name.eq_ignore_ascii_case(&name)
                        && target_ref.namespace.as_deref().unwrap_or("default").eq_ignore_ascii_case(&namespace)
                })
            })
            .flat_map(|filter_obj| PluginConfig::from_first_filter_obj(filter_obj).map(|f| f.into()))
            .collect();

        if !plugin_ids.is_empty() {
            let mut filter_vec = String::new();
            plugin_ids.clone().into_iter().for_each(|id| filter_vec.push_str(&format!("plugin:{{id:{}}},", id)));
            tracing::trace!("[SG.Common] {namespace}.{kind}.{name} filter found: {}", filter_vec.trim_end_matches(','));
        }

        if plugin_ids.is_empty() {
            Ok(vec![])
        } else {
            Ok(plugin_ids)
        }
    }

    async fn retrieve_config_item_listeners(&self, listeners: &[Listener]) -> BoxResult<Vec<SgListener>> {
        join_all(
            listeners
                .iter()
                .map(|listener| async move {
                    let sg_listener = SgListener {
                        name: listener.name.clone(),
                        ip: None,
                        port: listener.port,
                        protocol: match listener.protocol.to_lowercase().as_str() {
                            "http" => SgProtocolConfig::Http,
                            "https" => {
                                if let Some(tls_config) = &listener.tls {
                                    if let Some(certificate_ref) = tls_config.certificate_refs.as_ref().and_then(|vec| vec.first()) {
                                        let secret_api: Api<Secret> = self.get_namespace_api();
                                        if let Some(secret_obj) = secret_api.get_opt(&certificate_ref.name).await? {
                                            let tls = if let Some(secret_data) = secret_obj.data {
                                                if let Some(tls_crt) = secret_data.get("tls.crt") {
                                                    if let Some(tls_key) = secret_data.get("tls.key") {
                                                        Some(SgTlsConfig {
                                                            mode: tls_config.mode.clone().into(),
                                                            key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"),
                                                            cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"),
                                                            http2: Some(Self::retrieve_http2_config(tls_config)),
                                                        })
                                                    } else {
                                                        tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.key is empty");
                                                        None
                                                    }
                                                } else {
                                                    tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
                                                    None
                                                }
                                            } else {
                                                tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.data is empty");
                                                None
                                            };
                                            if let Some(tls) = tls {
                                                SgProtocolConfig::Https { tls }
                                            } else {
                                                SgProtocolConfig::Http
                                            }
                                        } else {
                                            tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
                                            SgProtocolConfig::Http
                                        }
                                    } else {
                                        tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
                                        SgProtocolConfig::Http
                                    }
                                } else {
                                    tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls is empty");
                                    SgProtocolConfig::Http
                                }
                            }
                            _ => return Err("Unsupported protocol".into()),
                        },
                        hostname: listener.hostname.clone(),
                    };
                    Ok(sg_listener)
                })
                .collect::<Vec<_>>(),
        )
        .await
        .into_iter()
        .collect()
    }
}