spacegate_config/service/k8s/
retrieve.rs1use futures_util::future::join_all;
2use gateway::{SgListener, SgParameters, SgProtocolConfig, SgTlsConfig};
3use http_route::SgHttpRouteRule;
4use k8s_gateway_api::{Gateway, HttpRoute, Listener};
5use k8s_openapi::api::core::v1::Secret;
6use kube::{api::ListParams, Api, ResourceExt};
7use spacegate_model::{
8 ext::k8s::{
9 crd::{
10 http_spaceroute::HttpSpaceroute,
11 sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
12 },
13 helper_struct::SgTargetKind,
14 },
15 PluginInstanceId,
16};
17
18use crate::{
19 constants::{self, GATEWAY_CLASS_NAME},
20 model::{gateway, http_route, PluginConfig, SgGateway, SgHttpRoute},
21 service::Retrieve,
22 BoxError, BoxResult,
23};
24
25use super::{
26 convert::{filter_k8s_conv::PluginConfigConv, gateway_k8s_conv::SgParametersConv as _, route_k8s_conv::SgHttpRouteRuleConv as _},
27 K8s,
28};
29
30impl Retrieve for K8s {
31 async fn retrieve_config_item_gateway(&self, gateway_name: &str) -> BoxResult<Option<SgGateway>> {
32 let gateway_api: Api<Gateway> = self.get_namespace_api();
33
34 let result = if let Some(gateway_obj) = gateway_api.get_opt(gateway_name).await?.and_then(|gateway_obj| {
35 if gateway_obj.spec.gateway_class_name == GATEWAY_CLASS_NAME {
36 Some(gateway_obj)
37 } else {
38 None
39 }
40 }) {
41 Some(self.kube_gateway_2_sg_gateway(gateway_obj).await?)
42 } else {
43 None
44 };
45
46 Ok(result)
47 }
48
49 async fn retrieve_config_item_route(&self, gateway_name: &str, route_name: &str) -> BoxResult<Option<SgHttpRoute>> {
50 let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
51 let httproute_api: Api<HttpRoute> = self.get_namespace_api();
52
53 let result = if let Some(httpspaceroute) = http_spaceroute_api.get_opt(route_name).await?.and_then(|http_route_obj| {
54 if http_route_obj
55 .spec
56 .inner
57 .parent_refs
58 .as_ref()
59 .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route_obj.namespace() && parent_ref.name == gateway_name))
60 .unwrap_or(false)
61 {
62 Some(http_route_obj)
63 } else {
64 None
65 }
66 }) {
67 Some(self.kube_httpspaceroute_2_sg_route(httpspaceroute).await?)
68 } else if let Some(http_route) = httproute_api.get_opt(route_name).await?.and_then(|http_route| {
69 if http_route
70 .spec
71 .inner
72 .parent_refs
73 .as_ref()
74 .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == http_route.namespace() && parent_ref.name == gateway_name))
75 .unwrap_or(false)
76 {
77 Some(http_route)
78 } else {
79 None
80 }
81 }) {
82 Some(self.kube_httproute_2_sg_route(http_route).await?)
83 } else {
84 None
85 };
86
87 Ok(result)
88 }
89
90 async fn retrieve_config_item_route_names(&self, name: &str) -> BoxResult<Vec<String>> {
91 let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
92 let httproute_api: Api<HttpRoute> = self.get_namespace_api();
93
94 let mut result: Vec<String> = http_spaceroute_api
95 .list(&ListParams::default())
96 .await?
97 .iter()
98 .filter(|route| {
99 route
100 .spec
101 .inner
102 .parent_refs
103 .as_ref()
104 .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
105 .unwrap_or(false)
106 })
107 .map(|route| route.name_any())
108 .collect();
109
110 result.extend(
111 httproute_api
112 .list(&ListParams::default())
113 .await?
114 .iter()
115 .filter(|route| {
116 route
117 .spec
118 .inner
119 .parent_refs
120 .as_ref()
121 .map(|parent_refs| parent_refs.iter().any(|parent_ref| parent_ref.namespace == route.namespace() && parent_ref.name == name))
122 .unwrap_or(false)
123 })
124 .map(|route| route.name_any()),
125 );
126
127 Ok(result)
128 }
129
130 async fn retrieve_config_names(&self) -> BoxResult<Vec<String>> {
131 let gateway_api: Api<Gateway> = self.get_namespace_api();
132
133 let result = gateway_api.list(&ListParams::default()).await?.iter().map(|gateway| gateway.name_any()).collect();
134
135 Ok(result)
136 }
137
138 async fn retrieve_all_plugins(&self) -> Result<Vec<PluginConfig>, BoxError> {
139 let filter_api: Api<SgFilter> = self.get_namespace_api();
140
141 let result = filter_api.list(&ListParams::default()).await?.into_iter().filter_map(PluginConfig::from_first_filter_obj).collect();
142 Ok(result)
143 }
144
145 async fn retrieve_plugin(&self, id: &spacegate_model::PluginInstanceId) -> Result<Option<PluginConfig>, BoxError> {
146 let filter_api: Api<SgFilter> = self.get_namespace_api();
147
148 match &id.name {
149 spacegate_model::PluginInstanceName::Anon { uid: _ } => Ok(None),
150 spacegate_model::PluginInstanceName::Named { name } => {
151 let result = filter_api.get_opt(name).await?.and_then(PluginConfig::from_first_filter_obj);
152 Ok(result)
153 }
154 spacegate_model::PluginInstanceName::Mono => Ok(None),
155 }
156 }
157
158 async fn retrieve_plugins_by_code(&self, code: &str) -> Result<Vec<PluginConfig>, BoxError> {
159 Ok(self.retrieve_all_plugins().await?.into_iter().filter(|p| p.code() == code).collect())
160 }
161}
162
163impl K8s {
164 pub(crate) const HTTP2_KEY: &'static str = "http2";
165 pub(crate) const HTTP2_ENABLE: &'static str = "true";
166 fn retrieve_http2_config(tls_config: &k8s_gateway_api::GatewayTlsConfig) -> bool {
168 if let Some(options) = &tls_config.options {
169 if let Some(Self::HTTP2_ENABLE) = options.get(Self::HTTP2_KEY).map(String::as_str) {
170 return true;
171 }
172 }
173 false
174 }
175 async fn kube_gateway_2_sg_gateway(&self, gateway_obj: Gateway) -> BoxResult<SgGateway> {
176 let gateway_name = gateway_obj.name_any();
177 let plugins = self
178 .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
179 kind: SgTargetKind::Gateway.into(),
180 name: gateway_name.clone(),
181 namespace: gateway_obj.namespace(),
182 })
183 .await?;
184 let result = SgGateway {
185 name: gateway_name,
186 parameters: SgParameters::from_kube_gateway(&gateway_obj),
187 listeners: self.retrieve_config_item_listeners(&gateway_obj.spec.listeners).await?,
188 plugins,
189 };
190 Ok(result)
191 }
192
193 async fn kube_httpspaceroute_2_sg_route(&self, httpspace_route: HttpSpaceroute) -> BoxResult<SgHttpRoute> {
194 let route_name = httpspace_route.name_any();
195 let kind = if let Some(kind) = httpspace_route.annotations().get(constants::RAW_HTTP_ROUTE_KIND) {
196 kind.clone()
197 } else {
198 SgTargetKind::Httpspaceroute.into()
199 };
200 let priority = httpspace_route.annotations().get(crate::constants::ANNOTATION_RESOURCE_PRIORITY).and_then(|a| a.parse::<i16>().ok()).unwrap_or(0);
201 let plugins = self
202 .retrieve_config_item_filters(K8sSgFilterSpecTargetRef {
203 kind,
204 name: route_name.clone(),
205 namespace: httpspace_route.namespace(),
206 })
207 .await?;
208 Ok(SgHttpRoute {
209 hostnames: httpspace_route.spec.hostnames.clone(),
210 plugins,
211 rules: httpspace_route
212 .spec
213 .rules
214 .map(|r_vec| r_vec.into_iter().map(SgHttpRouteRule::from_kube_httproute).collect::<Result<Vec<_>, BoxError>>())
215 .transpose()?
216 .unwrap_or_default(),
217 priority,
218 route_name,
219 })
220 }
221
222 async fn kube_httproute_2_sg_route(&self, http_route: HttpRoute) -> BoxResult<SgHttpRoute> {
223 self.kube_httpspaceroute_2_sg_route(http_route.into()).await
224 }
225
226 async fn retrieve_config_item_filters(&self, target: K8sSgFilterSpecTargetRef) -> BoxResult<Vec<PluginInstanceId>> {
227 let kind = target.kind;
228 let name = target.name;
229 let namespace = target.namespace.unwrap_or(self.namespace.to_string());
230
231 let filter_api: Api<SgFilter> = self.get_all_api();
232 let plugin_ids: Vec<PluginInstanceId> = filter_api
233 .list(&ListParams::default())
234 .await
235 .map_err(Box::new)?
236 .into_iter()
237 .filter(|filter_obj| {
238 filter_obj.spec.target_refs.iter().any(|target_ref| {
239 target_ref.kind.eq_ignore_ascii_case(&kind)
240 && target_ref.name.eq_ignore_ascii_case(&name)
241 && target_ref.namespace.as_deref().unwrap_or("default").eq_ignore_ascii_case(&namespace)
242 })
243 })
244 .flat_map(|filter_obj| PluginConfig::from_first_filter_obj(filter_obj).map(|f| f.into()))
245 .collect();
246
247 if !plugin_ids.is_empty() {
248 let mut filter_vec = String::new();
249 plugin_ids.clone().into_iter().for_each(|id| filter_vec.push_str(&format!("plugin:{{id:{}}},", id)));
250 tracing::trace!("[SG.Common] {namespace}.{kind}.{name} filter found: {}", filter_vec.trim_end_matches(','));
251 }
252
253 if plugin_ids.is_empty() {
254 Ok(vec![])
255 } else {
256 Ok(plugin_ids)
257 }
258 }
259
260 async fn retrieve_config_item_listeners(&self, listeners: &[Listener]) -> BoxResult<Vec<SgListener>> {
261 join_all(
262 listeners
263 .iter()
264 .map(|listener| async move {
265 let sg_listener = SgListener {
266 name: listener.name.clone(),
267 ip: None,
268 port: listener.port,
269 protocol: match listener.protocol.to_lowercase().as_str() {
270 "http" => SgProtocolConfig::Http,
271 "https" => {
272 if let Some(tls_config) = &listener.tls {
273 if let Some(certificate_ref) = tls_config.certificate_refs.as_ref().and_then(|vec| vec.first()) {
274 let secret_api: Api<Secret> = self.get_namespace_api();
275 if let Some(secret_obj) = secret_api.get_opt(&certificate_ref.name).await? {
276 let tls = if let Some(secret_data) = secret_obj.data {
277 if let Some(tls_crt) = secret_data.get("tls.crt") {
278 if let Some(tls_key) = secret_data.get("tls.key") {
279 Some(SgTlsConfig {
280 mode: tls_config.mode.clone().into(),
281 key: String::from_utf8(tls_key.0.clone()).expect("[SG.Config] Gateway tls secret [tls.key] is not valid utf8"),
282 cert: String::from_utf8(tls_crt.0.clone()).expect("[SG.Config] Gateway tls secret [tls.cert] is not valid utf8"),
283 http2: Some(Self::retrieve_http2_config(tls_config)),
284 })
285 } else {
286 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.key is empty");
287 None
288 }
289 } else {
290 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
291 None
292 }
293 } else {
294 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.data is empty");
295 None
296 };
297 if let Some(tls) = tls {
298 SgProtocolConfig::Https { tls }
299 } else {
300 SgProtocolConfig::Http
301 }
302 } else {
303 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
304 SgProtocolConfig::Http
305 }
306 } else {
307 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls.certificate_refs is empty");
308 SgProtocolConfig::Http
309 }
310 } else {
311 tracing::warn!("[SG.Config] Gateway [spec.listener.protocol=https] tls is empty");
312 SgProtocolConfig::Http
313 }
314 }
315 _ => return Err("Unsupported protocol".into()),
316 },
317 hostname: listener.hostname.clone(),
318 };
319 Ok(sg_listener)
320 })
321 .collect::<Vec<_>>(),
322 )
323 .await
324 .into_iter()
325 .collect()
326 }
327}