spacegate_config/service/k8s/
discovery.rs1use k8s_gateway_api::GatewayClass;
2use k8s_openapi::api::{
3 apps::v1::DaemonSet,
4 core::v1::{Pod, Service},
5};
6use kube::{api::ListParams, Api, ResourceExt};
7use spacegate_model::{constants::DEFAULT_API_PORT, BackendHost, BoxResult, K8sServiceData};
8
9use crate::service::{Discovery, Instance};
10
11use super::K8s;
12
/// A spacegate gateway instance discovered through Kubernetes.
///
/// Values of this type are produced by the `Discovery` implementation for `K8s`
/// and exposed to callers through the `Instance` trait.
#[derive(Debug, Clone)]
pub struct K8sGatewayInstance {
    /// Identifier reported by `Instance::id`.
    name: String,
    /// Address reported by `Instance::api_url`.
    uri: String,
}
17impl Instance for K8sGatewayInstance {
18 fn api_url(&self) -> &str {
19 &self.uri
20 }
21 fn id(&self) -> &str {
22 &self.name
23 }
24}
25impl Discovery for K8s {
26 async fn instances(&self) -> BoxResult<Vec<impl Instance>> {
27 let gateway_class_api: Api<GatewayClass> = self.get_all_api();
28
29 let instance = if let Some(mut gateway_class) = gateway_class_api.get_opt(spacegate_model::constants::GATEWAY_CLASS_NAME).await? {
30 gateway_class.labels_mut().remove(spacegate_model::constants::KUBE_OBJECT_INSTANCE).unwrap_or(spacegate_model::constants::GATEWAY_DEFAULT_INSTANCE.to_string())
31 } else {
32 return Err("gateway class not found".into());
33 };
34
35 let instance_split: Vec<_> = instance.split('.').collect();
36 let (ds_api, pod_api, ds_name): (Api<DaemonSet>, Api<Pod>, String) = if instance_split.len() == 2 {
37 let ns = instance_split.get(1).expect("unexpected");
38 let ds_api: Api<DaemonSet> = self.get_specify_namespace_api(ns);
39 let pod_api: Api<Pod> = self.get_specify_namespace_api(ns);
40 let instance_name = instance_split.first().expect("unexpected");
41 (ds_api, pod_api, instance_name.to_string())
42 } else {
43 let ds_api: Api<DaemonSet> = self.get_namespace_api();
44 let pod_api: Api<Pod> = self.get_namespace_api();
45 let instance_name = instance;
46 (ds_api, pod_api, instance_name)
47 };
48
49 let ds_instance = if let Some(ds) = ds_api.get_opt(&ds_name).await? {
50 ds
51 } else {
52 return Err("spacegate instance not found".into());
53 };
54
55 let pods = pod_api.list(&ListParams::default()).await?;
56 let pods = pods.items;
57 let instance_list = pods
58 .into_iter()
59 .filter_map(|p| {
60 let ip = p.status.as_ref().and_then(|s| s.host_ip.as_ref())?;
61 let port = DEFAULT_API_PORT;
62 for owner_ref in p.owner_references() {
63 let instance_name = ds_instance.name_any();
64
65 if owner_ref.uid == ds_instance.uid().unwrap_or_default() && owner_ref.name == instance_name {
66 return Some(K8sGatewayInstance {
67 name: instance_name,
68 uri: format!("{ip}:{port}"),
69 });
70 }
71 }
72 None
73 })
74 .collect();
75
76 Ok(instance_list)
77 }
78
79 async fn backends(&self) -> BoxResult<Vec<BackendHost>> {
80 let service_api: Api<Service> = self.get_all_api();
81 let result = service_api
82 .list(&ListParams::default())
83 .await?
84 .into_iter()
85 .map(|s| {
86 BackendHost::K8sService(K8sServiceData {
87 name: s.name_any(),
88 namespace: s.namespace(),
89 })
90 })
91 .collect();
92 Ok(result)
93 }
94}