spacegate_config/service/k8s/
update.rs

1use std::collections::HashSet;
2
3use k8s_gateway_api::{Gateway, HttpRoute};
4use k8s_openapi::api::core::v1::Secret;
5use kube::{
6    api::{DeleteParams, PostParams},
7    Api, ResourceExt,
8};
9use spacegate_model::{
10    ext::k8s::crd::{
11        http_spaceroute::HttpSpaceroute,
12        sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
13    },
14    BoxError, BoxResult, PluginInstanceId,
15};
16
17use crate::service::{Retrieve as _, Update};
18
19use super::{
20    convert::{filter_k8s_conv::PluginIdConv as _, gateway_k8s_conv::SgGatewayConv as _, route_k8s_conv::SgHttpRouteConv, ToTarget},
21    K8s,
22};
23
24impl Update for K8s {
25    async fn update_config_item_gateway(&self, gateway_name: &str, gateway: crate::model::SgGateway) -> BoxResult<()> {
26        let (mut gateway, secret, update_plugin_ids) = gateway.to_kube_gateway(&self.namespace);
27
28        let gateway_api: Api<Gateway> = self.get_namespace_api();
29        let old_gateway = self
30            .retrieve_config_item_gateway(gateway_name)
31            .await?
32            .map(|g| g.to_kube_gateway(&self.namespace))
33            .ok_or_else(|| -> BoxError { format!("[Sg.Config] gateway [{gateway_name}] not found ,update failed").into() })?;
34
35        gateway.metadata.resource_version = gateway_api.get_metadata(gateway_name).await?.resource_version();
36        gateway_api.replace(gateway_name, &PostParams::default(), &gateway).await?;
37
38        let secret_api: Api<Secret> = self.get_namespace_api();
39
40        if let Some(old_secret) = old_gateway.1 {
41            if let Some(mut secret) = secret {
42                if old_secret.name_any() == secret.name_any() {
43                    secret.metadata.resource_version = old_secret.resource_version();
44                    secret_api.replace(&secret.name_any(), &PostParams::default(), &secret).await?;
45                } else {
46                    secret_api.create(&PostParams::default(), &secret).await?;
47                }
48            } else {
49                secret_api.delete(&old_secret.name_any(), &DeleteParams::default()).await?;
50            }
51        } else if let Some(secret) = secret {
52            secret_api.create(&PostParams::default(), &secret).await?;
53        }
54
55        self.update_plugin_ids_changes(old_gateway.2, update_plugin_ids, gateway.to_target_ref()).await?;
56        Ok(())
57    }
58
59    async fn update_config_item_route(&self, gateway_name: &str, route_name: &str, route: crate::model::SgHttpRoute) -> BoxResult<()> {
60        let (mut http_spaceroute, update_plugin_ids) = route.to_kube_httproute(gateway_name, route_name, &self.namespace);
61
62        let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
63        let http_route_api: Api<HttpRoute> = self.get_namespace_api();
64
65        let old_sg_httproute = self.retrieve_config_item_route(gateway_name, route_name).await?;
66
67        if let Some(old_route) = http_spaceroute_api.get_metadata_opt(&http_spaceroute.name_any()).await? {
68            http_spaceroute.metadata.resource_version = old_route.resource_version();
69            http_spaceroute_api.replace(&http_spaceroute.name_any(), &PostParams::default(), &http_spaceroute).await?;
70        } else if http_route_api.get_metadata_opt(&http_spaceroute.name_any()).await?.is_some() {
71            http_route_api.delete(&http_spaceroute.name_any(), &DeleteParams::default()).await?;
72            http_spaceroute_api.create(&PostParams::default(), &http_spaceroute).await?;
73        } else {
74            return Err(format!("raw http route {route_name} not found").into());
75        };
76
77        self.update_plugin_ids_changes(
78            old_sg_httproute.map(|r| r.to_kube_httproute(gateway_name, route_name, &self.namespace).1).unwrap_or_default(),
79            update_plugin_ids,
80            http_spaceroute.to_target_ref(),
81        )
82        .await?;
83
84        Ok(())
85    }
86
87    async fn update_plugin(&self, id: &spacegate_model::PluginInstanceId, value: serde_json::Value) -> BoxResult<()> {
88        let filter = id.to_singe_filter(value, None, &self.namespace);
89
90        if let Some(filter) = filter {
91            let filter_api: Api<SgFilter> = self.get_namespace_api();
92            if let Some(old_filter) = filter_api.get_opt(&filter.name).await? {
93                let name = &old_filter.name_any();
94                let mut update_filter: SgFilter = filter.into();
95                update_filter.metadata.resource_version = old_filter.resource_version();
96                update_filter.spec.target_refs = old_filter.spec.target_refs;
97                filter_api.replace(name, &PostParams::default(), &update_filter).await?;
98            } else {
99                return Err(format!("raw filter {id:?} not found").into());
100            };
101        }
102
103        Ok(())
104    }
105}
106
107impl K8s {
108    pub(crate) async fn update_plugin_ids_changes(&self, old: Vec<PluginInstanceId>, update: Vec<PluginInstanceId>, target: K8sSgFilterSpecTargetRef) -> BoxResult<()> {
109        if old.is_empty() && update.is_empty() {
110            return Ok(());
111        }
112
113        let old_set: HashSet<_> = old.into_iter().collect();
114        let update_set: HashSet<_> = update.into_iter().collect();
115
116        let add_vec: Vec<_> = update_set.difference(&old_set).collect();
117        for add_id in add_vec {
118            add_id.add_filter_target(target.clone(), self).await?;
119        }
120        let delete_vec: Vec<_> = old_set.difference(&update_set).collect();
121        for delete_id in delete_vec {
122            delete_id.remove_filter_target(target.clone(), self).await?;
123        }
124
125        Ok(())
126    }
127}