spacegate_config/service/k8s/
retrieve.rs

1use futures_util::future::join_all;
2use gateway::{SgListener, SgParameters, SgProtocolConfig, SgTlsConfig};
3use http_route::SgHttpRouteRule;
4use k8s_gateway_api::{Gateway, HttpRoute, Listener};
5use k8s_openapi::api::core::v1::Secret;
6use kube::{api::ListParams, Api, ResourceExt};
7use spacegate_model::{
8    ext::k8s::{
9        crd::{
10            http_spaceroute::HttpSpaceroute,
11            sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
12        },
13        helper_struct::SgTargetKind,
14    },
15    PluginInstanceId,
16};
17
18use crate::{
19    constants::{self, GATEWAY_CLASS_NAME},
20    model::{gateway, http_route, PluginConfig, SgGateway, SgHttpRoute},
21    service::Retrieve,
22    BoxError, BoxResult,
23};
24
25use super::{
26    convert::{filter_k8s_conv::PluginConfigConv, gateway_k8s_conv::SgParametersConv as _, route_k8s_conv::SgHttpRouteRuleConv as _},
27    K8s,
28};
29
30impl Retrieve for K8s {
31    async fn retrieve_config_item_gateway(&self, gateway_name: &str) -> BoxResult<Option<SgGateway>> {
32        let gateway_api: Api<Gateway> = self.get_namespace_api();
33
34        let result = if let Some(gateway_obj) = gateway_api.get_opt(gateway_name).await?.and_then(|gateway_obj| {
35            if gateway_obj.spec.gateway_class_name == GATEWAY_CLASS_NAME {
36                Some(gateway_obj)
37            } else {
38                None
39            }
40        }) {
41            Some(self.kube_gateway_2_sg_gateway(gateway_obj).await?)
42        } else {
43            None
44        };
45
46        Ok(result)
47    }
48
49    async fn retrieve_config_item_route(&self, gateway_name: &str, route_name: &str) -> BoxResult<Option<SgHttpRoute>> {
50        let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
51        let httproute_api: Api<HttpRoute> = self.get_namespace_api();
52
53        let result = if let Some(httpspaceroute) = http_spaceroute_api.get_opt(route_name).await?.and_then(|http_route_obj| {
54            if http_route_obj
55                .spec
56                .inner
57                .parent_refs
58                .as_ref()
59                .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route_obj.namespace() && parent_ref.name == gateway_name))
60                .unwrap_or(false)
61            {
62                Some(http_route_obj)
63            } else {
64                None
65            }
66        }) {
67            Some(self.kube_httpspaceroute_2_sg_route(httpspaceroute).await?)
68        } else if let Some(http_route) = httproute_api.get_opt(route_name).await?.and_then(|http_route| {
69            if http_route
70                .spec
71                .inner
72                .parent_refs
73                .as_ref()
74                .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route.namespace() && parent_ref.name == gateway_name))
75                .unwrap_or(false)
76            {
77                Some(http_route)
78            } else {
79                None
80            }
81        }) {
82            Some(self.kube_httproute_2_sg_route(http_route).await?)
83        } else {
84            None
85        };
86
87        Ok(result)
88    }
89
90    async fn retrieve_config_item_route_names(&self, name: &str) -> BoxResult<Vec<String>> {
91        let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
92        let httproute_api: Api<HttpRoute> = self.get_namespace_api();
93
94        let mut result: Vec<String> = http_spaceroute_api
95            .list(&ListParams::default())
96            .await?
97            .iter()
98            .filter(|route| {
99                route
100                    .spec
101                    .inner
102                    .parent_refs
103                    .as_ref()
104                    .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
105                    .unwrap_or(false)
106            })
107            .map(|route| route.name_any())
108            .collect();
109
110        result.extend(
111            httproute_api
112                .list(&ListParams::default())
113                .await?
114                .iter()
115                .filter(|route| {
116                    route
117                        .spec
118                        .inner
119                        .parent_refs
120                        .as_ref()
121                        .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
122                        .unwrap_or(false)
123                })
124                .map(|route| route.name_any()),
125        );
126
127        Ok(result)
128    }
129
130    async fn retrieve_config_names(&self) -> BoxResult<Vec<String>> {
131        let gateway_api: Api<Gateway> = self.get_namespace_api();
132
133        let result = gateway_api.list(&ListParams::default()).await?.iter().map(|gateway| gateway.name_any()).collect();
134
135        Ok(result)
136    }
137
138    async fn retrieve_all_plugins(&self) -> Result<Vec<PluginConfig>, BoxError> {
139        let filter_api: Api<SgFilter> = self.get_namespace_api();
140
141        let result = filter_api.list(&ListParams::default()).await?.into_iter().filter_map(PluginConfig::from_first_filter_obj).collect();
142        Ok(result)
143    }
144
145    async fn retrieve_plugin(&self, id: &spacegate_model::PluginInstanceId) -> Result<Option<PluginConfig>, BoxError> {
146        let filter_api: Api<SgFilter> = self.get_namespace_api();
147
148        match &id.name {
149            spacegate_model::PluginInstanceName::Anon { uid: _ } => Ok(None),
150            spacegate_model::PluginInstanceName::Named { name } => {
151                let result = filter_api.get_opt(name).await?.and_then(PluginConfig::from_first_filter_obj);
152                Ok(result)
153            }
154            spacegate_model::PluginInstanceName::Mono => Ok(None),
155        }
156    }
157
158    async fn retrieve_plugins_by_code(&self, code: &str) -> Result<Vec<PluginConfig>, BoxError> {
159        Ok(self.retrieve_all_plugins().await?.into_iter().filter(|p| p.code() == code).collect())
160    }
161}
162
163impl K8s {
164    pub(crate) const HTTP2_KEY: &'static str = "http2";
165    pub(crate) const HTTP2_ENABLE: &'static str = "true";
166    // query is http2 enabled?
167    fn retrieve_http2_config(tls_config: &k8s_gateway_api::GatewayTlsConfig) -> bool {
168        if let Some(options) = &tls_config.options {
169            if let Some(Self::HTTP2_ENABLE) = options.get(Self::HTTP2_KEY).map(String::as_str) {
170                return true;
171            }
172        }
173        false
174    }
175    async fn kube_gateway_2_sg_gateway(&self, gateway_obj: Gateway) -> BoxResult<SgGateway> {
176        let gateway_name = gateway_obj.name_any();
177        let plugins = self
178            .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
179                kind: SgTargetKind::Gateway.into(),
180                name: gateway_name.clone(),
181                namespace: gateway_obj.namespace(),
182            })
183            .await?;
184        let result = SgGateway {
185            name: gateway_name,
186            parameters: SgParameters::from_kube_gateway(&gateway_obj),
187            listeners: self.retrieve_config_item_listeners(&gateway_obj.spec.listeners).await?,
188            plugins,
189        };
190        Ok(result)
191    }
192
193    async fn kube_httpspaceroute_2_sg_route(&self, httpspace_route: HttpSpaceroute) -> BoxResult<SgHttpRoute> {
194        let route_name = httpspace_route.name_any();
195        let kind = if let Some(kind) = httpspace_route.annotations().get(constants::RAW_HTTP_ROUTE_KIND) {
196            kind.clone()
197        } else {
198            SgTargetKind::Httpspaceroute.into()
199        };
200        let priority = httpspace_route.annotations().get(crate::constants::ANNOTATION_RESOURCE_PRIORITY).and_then(|a| a.parse::<i16>().ok()).unwrap_or(0);
201        let plugins = self
202            .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
203                kind,
204                name: route_name.clone(),
205                namespace: httpspace_route.namespace(),
206            })
207            .await?;
208        Ok(SgHttpRoute {
209            hostnames: httpspace_route.spec.hostnames.clone(),
210            plugins,
211            rules: httpspace_route
212                .spec
213                .rules
214                .map(|r_vec| r_vec.into_iter().map(SgHttpRouteRule::from_kube_httproute).collect::<Result<Vec<_>, BoxError>>())
215                .transpose()?
216                .unwrap_or_default(),
217            priority,
218            route_name,
219        })
220    }
221
222    async fn kube_httproute_2_sg_route(&self, http_route: HttpRoute) -> BoxResult<SgHttpRoute> {
223        self.kube_httpspaceroute_2_sg_route(http_route.into()).await
224    }
225
226    async fn retrieve_config_item_filters(&self, target: K8sSgFilterSpecTargetRef) -> BoxResult<Vec<PluginInstanceId>> {
227        let kind = target.kind;
228        let name = target.name;
229        let namespace = target.namespace.unwrap_or(self.namespace.to_string());
230
231        let filter_api: Api<SgFilter> = self.get_all_api();
232        let plugin_ids: Vec<PluginInstanceId> = filter_api
233            .list(&ListParams::default())
234            .await
235            .map_err(Box::new)?
236            .into_iter()
237            .filter(|filter_obj| {
238                filter_obj.spec.target_refs.iter().any(|target_ref| {
239                    target_ref.kind.eq_ignore_ascii_case(&kind)
240                        && target_ref.name.eq_ignore_ascii_case(&name)
241                        && target_ref.namespace.as_deref().unwrap_or("default").eq_ignore_ascii_case(&namespace)
242                })
243            })
244            .flat_map(|filter_obj| PluginConfig::from_first_filter_obj(filter_obj).map(|f| f.into()))
245            .collect();
246
247        if !plugin_ids.is_empty() {
248            let mut filter_vec = String::new();
249            plugin_ids.clone().into_iter().for_each(|id| filter_vec.push_str(&format!("plugin:{{id:{}}},", id)));
250            tracing::trace!("[SG.Common] {namespace}.{kind}.{name} filter found: {}", filter_vec.trim_end_matches(','));
251        }
252
253        if plugin_ids.is_empty() {
254            Ok(vec![])
255        } else {
256            Ok(plugin_ids)
257        }
258    }
259
260    async fn retrieve_config_item_listeners(&self, listeners: &[Listener]) -> BoxResult<Vec<SgListener>> {
261        join_all(
262            listeners
263                .iter()
264                .map(|listener| async move {
265                    let sg_listener = SgListener {
266                        name: listener.name.clone(),
267                        ip: None,
268                        port: listener.port,
269                        protocol: match listener.protocol.to_lowercase().as_str() {
270                            "http" => SgProtocolConfig::Http,
271                            "https" => {
272                                if let Some(tls_config) = &listener.tls {
273                                    if let Some(certificate_ref) = tls_config.certificate_refs.as_ref().and_then(|vec| vec.first()) {
274                                        let secret_api: Api<Secret> = self.get_namespace_api();
275                                        if let Some(secret_obj) = secret_api.get_opt(&certificate_ref.name).await? {
276                                            let tls = if let Some(secret_data) = secret_obj.data {
277                                                if let Some(tls_crt) = secret_data.get("tls.crt") {
278                                                    if let Some(tls_key) = secret_data.get("tls.key") {
279                                                        Some(SgTlsConfig {
280                                                            mode: tls_config.mode.clone().into(),
281                                                            key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"),
282                                                            cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"),
283                                                            http2: Some(Self::retrieve_http2_config(tls_config)),
284                                                        })
285                                                    } else {
286                                                        tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.key is empty");
287                                                        None
288                                                    }
289                                                } else {
290                                                    tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
291                                                    None
292                                                }
293                                            } else {
294                                                tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.data is empty");
295                                                None
296                                            };
297                                            if let Some(tls) = tls {
298                                                SgProtocolConfig::Https { tls }
299                                            } else {
300                                                SgProtocolConfig::Http
301                                            }
302                                        } else {
303                                            tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
304                                            SgProtocolConfig::Http
305                                        }
306                                    } else {
307                                        tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
308                                        SgProtocolConfig::Http
309                                    }
310                                } else {
311                                    tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls is empty");
312                                    SgProtocolConfig::Http
313                                }
314                            }
315                            _ => return Err("Unsupported protocol".into()),
316                        },
317                        hostname: listener.hostname.clone(),
318                    };
319                    Ok(sg_listener)
320                })
321                .collect::<Vec<_>>(),
322        )
323        .await
324        .into_iter()
325        .collect()
326    }
327}