spacegate_config/service/k8s/
update.rs1use std::collections::HashSet;
2
3use k8s_gateway_api::{Gateway, HttpRoute};
4use k8s_openapi::api::core::v1::Secret;
5use kube::{
6 api::{DeleteParams, PostParams},
7 Api, ResourceExt,
8};
9use spacegate_model::{
10 ext::k8s::crd::{
11 http_spaceroute::HttpSpaceroute,
12 sg_filter::{K8sSgFilterSpecTargetRef, SgFilter},
13 },
14 BoxError, BoxResult, PluginInstanceId,
15};
16
17use crate::service::{Retrieve as _, Update};
18
19use super::{
20 convert::{filter_k8s_conv::PluginIdConv as _, gateway_k8s_conv::SgGatewayConv as _, route_k8s_conv::SgHttpRouteConv, ToTarget},
21 K8s,
22};
23
24impl Update for K8s {
25 async fn update_config_item_gateway(&self, gateway_name: &str, gateway: crate::model::SgGateway) -> BoxResult<()> {
26 let (mut gateway, secret, update_plugin_ids) = gateway.to_kube_gateway(&self.namespace);
27
28 let gateway_api: Api<Gateway> = self.get_namespace_api();
29 let old_gateway = self
30 .retrieve_config_item_gateway(gateway_name)
31 .await?
32 .map(|g| g.to_kube_gateway(&self.namespace))
33 .ok_or_else(|| -> BoxError { format!("[Sg.Config] gateway [{gateway_name}] not found ,update failed").into() })?;
34
35 gateway.metadata.resource_version = gateway_api.get_metadata(gateway_name).await?.resource_version();
36 gateway_api.replace(gateway_name, &PostParams::default(), &gateway).await?;
37
38 let secret_api: Api<Secret> = self.get_namespace_api();
39
40 if let Some(old_secret) = old_gateway.1 {
41 if let Some(mut secret) = secret {
42 if old_secret.name_any() == secret.name_any() {
43 secret.metadata.resource_version = old_secret.resource_version();
44 secret_api.replace(&secret.name_any(), &PostParams::default(), &secret).await?;
45 } else {
46 secret_api.create(&PostParams::default(), &secret).await?;
47 }
48 } else {
49 secret_api.delete(&old_secret.name_any(), &DeleteParams::default()).await?;
50 }
51 } else if let Some(secret) = secret {
52 secret_api.create(&PostParams::default(), &secret).await?;
53 }
54
55 self.update_plugin_ids_changes(old_gateway.2, update_plugin_ids, gateway.to_target_ref()).await?;
56 Ok(())
57 }
58
59 async fn update_config_item_route(&self, gateway_name: &str, route_name: &str, route: crate::model::SgHttpRoute) -> BoxResult<()> {
60 let (mut http_spaceroute, update_plugin_ids) = route.to_kube_httproute(gateway_name, route_name, &self.namespace);
61
62 let http_spaceroute_api: Api<HttpSpaceroute> = self.get_namespace_api();
63 let http_route_api: Api<HttpRoute> = self.get_namespace_api();
64
65 let old_sg_httproute = self.retrieve_config_item_route(gateway_name, route_name).await?;
66
67 if let Some(old_route) = http_spaceroute_api.get_metadata_opt(&http_spaceroute.name_any()).await? {
68 http_spaceroute.metadata.resource_version = old_route.resource_version();
69 http_spaceroute_api.replace(&http_spaceroute.name_any(), &PostParams::default(), &http_spaceroute).await?;
70 } else if http_route_api.get_metadata_opt(&http_spaceroute.name_any()).await?.is_some() {
71 http_route_api.delete(&http_spaceroute.name_any(), &DeleteParams::default()).await?;
72 http_spaceroute_api.create(&PostParams::default(), &http_spaceroute).await?;
73 } else {
74 return Err(format!("raw http route {route_name} not found").into());
75 };
76
77 self.update_plugin_ids_changes(
78 old_sg_httproute.map(|r| r.to_kube_httproute(gateway_name, route_name, &self.namespace).1).unwrap_or_default(),
79 update_plugin_ids,
80 http_spaceroute.to_target_ref(),
81 )
82 .await?;
83
84 Ok(())
85 }
86
87 async fn update_plugin(&self, id: &spacegate_model::PluginInstanceId, value: serde_json::Value) -> BoxResult<()> {
88 let filter = id.to_singe_filter(value, None, &self.namespace);
89
90 if let Some(filter) = filter {
91 let filter_api: Api<SgFilter> = self.get_namespace_api();
92 if let Some(old_filter) = filter_api.get_opt(&filter.name).await? {
93 let name = &old_filter.name_any();
94 let mut update_filter: SgFilter = filter.into();
95 update_filter.metadata.resource_version = old_filter.resource_version();
96 update_filter.spec.target_refs = old_filter.spec.target_refs;
97 filter_api.replace(name, &PostParams::default(), &update_filter).await?;
98 } else {
99 return Err(format!("raw filter {id:?} not found").into());
100 };
101 }
102
103 Ok(())
104 }
105}
106
107impl K8s {
108 pub(crate) async fn update_plugin_ids_changes(&self, old: Vec<PluginInstanceId>, update: Vec<PluginInstanceId>, target: K8sSgFilterSpecTargetRef) -> BoxResult<()> {
109 if old.is_empty() && update.is_empty() {
110 return Ok(());
111 }
112
113 let old_set: HashSet<_> = old.into_iter().collect();
114 let update_set: HashSet<_> = update.into_iter().collect();
115
116 let add_vec: Vec<_> = update_set.difference(&old_set).collect();
117 for add_id in add_vec {
118 add_id.add_filter_target(target.clone(), self).await?;
119 }
120 let delete_vec: Vec<_> = old_set.difference(&update_set).collect();
121 for delete_id in delete_vec {
122 delete_id.remove_filter_target(target.clone(), self).await?;
123 }
124
125 Ok(())
126 }
127}