spacegate_config/service/k8s/
discovery.rs

1use k8s_gateway_api::GatewayClass;
2use k8s_openapi::api::{
3    apps::v1::DaemonSet,
4    core::v1::{Pod, Service},
5};
6use kube::{api::ListParams, Api, ResourceExt};
7use spacegate_model::{constants::DEFAULT_API_PORT, BackendHost, BoxResult, K8sServiceData};
8
9use crate::service::{Discovery, Instance};
10
11use super::K8s;
12
13pub struct K8sGatewayInstance {
14    name: String,
15    uri: String,
16}
17impl Instance for K8sGatewayInstance {
18    fn api_url(&self) -> &str {
19        &self.uri
20    }
21    fn id(&self) -> &str {
22        &self.name
23    }
24}
25impl Discovery for K8s {
26    async fn instances(&self) -> BoxResult<Vec<impl Instance>> {
27        let gateway_class_api: Api<GatewayClass> = self.get_all_api();
28
29        let instance = if let Some(mut gateway_class) = gateway_class_api.get_opt(spacegate_model::constants::GATEWAY_CLASS_NAME).await? {
30            gateway_class.labels_mut().remove(spacegate_model::constants::KUBE_OBJECT_INSTANCE).unwrap_or(spacegate_model::constants::GATEWAY_DEFAULT_INSTANCE.to_string())
31        } else {
32            return Err("gateway class not found".into());
33        };
34
35        let instance_split: Vec<_> = instance.split('.').collect();
36        let (ds_api, pod_api, ds_name): (Api<DaemonSet>, Api<Pod>, String) = if instance_split.len() == 2 {
37            let ns = instance_split.get(1).expect("unexpected");
38            let ds_api: Api<DaemonSet> = self.get_specify_namespace_api(ns);
39            let pod_api: Api<Pod> = self.get_specify_namespace_api(ns);
40            let instance_name = instance_split.first().expect("unexpected");
41            (ds_api, pod_api, instance_name.to_string())
42        } else {
43            let ds_api: Api<DaemonSet> = self.get_namespace_api();
44            let pod_api: Api<Pod> = self.get_namespace_api();
45            let instance_name = instance;
46            (ds_api, pod_api, instance_name)
47        };
48
49        let ds_instance = if let Some(ds) = ds_api.get_opt(&ds_name).await? {
50            ds
51        } else {
52            return Err("spacegate instance not found".into());
53        };
54
55        let pods = pod_api.list(&ListParams::default()).await?;
56        let pods = pods.items;
57        let instance_list = pods
58            .into_iter()
59            .filter_map(|p| {
60                let ip = p.status.as_ref().and_then(|s| s.host_ip.as_ref())?;
61                let port = DEFAULT_API_PORT;
62                for owner_ref in p.owner_references() {
63                    let instance_name = ds_instance.name_any();
64
65                    if owner_ref.uid == ds_instance.uid().unwrap_or_default() && owner_ref.name == instance_name {
66                        return Some(K8sGatewayInstance {
67                            name: instance_name,
68                            uri: format!("{ip}:{port}"),
69                        });
70                    }
71                }
72                None
73            })
74            .collect();
75
76        Ok(instance_list)
77    }
78
79    async fn backends(&self) -> BoxResult<Vec<BackendHost>> {
80        let service_api: Api<Service> = self.get_all_api();
81        let result = service_api
82            .list(&ListParams::default())
83            .await?
84            .into_iter()
85            .map(|s| {
86                BackendHost::K8sService(K8sServiceData {
87                    name: s.name_any(),
88                    namespace: s.namespace(),
89                })
90            })
91            .collect();
92        Ok(result)
93    }
94}