1use std::fmt::Display;
2use std::sync::Arc;
3
4use anyhow::{anyhow, Result};
5use async_trait::async_trait;
6use futures_util::future::ready;
7use futures_util::future::FutureExt;
8use futures_util::stream::once;
9use futures_util::stream::BoxStream;
10use futures_util::stream::StreamExt;
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13use serde_json::Value;
14use tracing::debug;
15use tracing::trace;
16
17use k8_diff::{Changes, Diff};
18use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
19use k8_types::options::DeleteOptions;
20use crate::diff::PatchMergeType;
21use crate::{ApplyResult, DiffableK8Obj};
22
/// Selects which namespace an operation targets.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone` so the
/// value can be logged and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NameSpace {
    /// Every namespace.
    All,
    /// The single namespace with the given name.
    Named(String),
}
28
/// Error value carrying the key of an object that could not be found.
#[derive(Debug)]
pub struct ObjectKeyNotFound {
    /// The key that was looked up.
    key: String,
}

impl ObjectKeyNotFound {
    /// Builds the error for the given `key`.
    pub fn new(key: String) -> Self {
        ObjectKeyNotFound { key }
    }
}

// No `source()` override: the error has no underlying cause.
impl std::error::Error for ObjectKeyNotFound {}

impl Display for ObjectKeyNotFound {
    /// Renders the error as `'<key>' not found`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ObjectKeyNotFound { key } = self;
        write!(f, "'{}' not found", key)
    }
}
47
48impl NameSpace {
49 pub fn is_all(&self) -> bool {
50 matches!(self, Self::All)
51 }
52
53 pub fn named(&self) -> &str {
54 match self {
55 Self::All => "all",
56 Self::Named(name) => name,
57 }
58 }
59}
60
61impl From<String> for NameSpace {
62 fn from(namespace: String) -> Self {
63 NameSpace::Named(namespace)
64 }
65}
66
67impl From<&str> for NameSpace {
68 fn from(namespace: &str) -> Self {
69 NameSpace::Named(namespace.to_owned())
70 }
71}
72
/// Optional arguments accepted by list operations.
///
/// Every field defaults to `None`. `Debug`, `PartialEq` and `Eq` are derived
/// so the options can be logged and compared directly.
// NOTE(review): the field names suggest the Kubernetes list query parameters
// of the same name — confirm against the client implementation that reads them.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ListArg {
    /// Field selector expression, if any.
    pub field_selector: Option<String>,
    /// Whether uninitialized objects are requested, if specified.
    pub include_uninitialized: Option<bool>,
    /// Label selector expression, if any.
    pub label_selector: Option<String>,
}
79
80pub type TokenStreamResult<S> = Result<Vec<Result<K8Watch<S>>>>;
83
84#[allow(clippy::redundant_closure)]
85pub fn as_token_stream_result<S, E>(events: Vec<K8Watch<S>>) -> TokenStreamResult<S>
86where
87 S: Spec,
88 S::Status: Serialize + DeserializeOwned,
89 S::Header: Serialize + DeserializeOwned,
90{
91 Ok(events.into_iter().map(|event| Ok(event)).collect())
92}
93
94#[async_trait]
95pub trait MetadataClient: Send + Sync {
96 async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
98 where
99 S: Spec,
100 M: K8Meta + Send + Sync;
101
102 async fn retrieve_items<S, N>(&self, namespace: N) -> Result<K8List<S>>
105 where
106 S: Spec,
107 N: Into<NameSpace> + Send + Sync,
108 {
109 self.retrieve_items_with_option(namespace, None).await
110 }
111
112 async fn retrieve_items_with_option<S, N>(
113 &self,
114 namespace: N,
115 option: Option<ListArg>,
116 ) -> Result<K8List<S>>
117 where
118 S: Spec,
119 N: Into<NameSpace> + Send + Sync;
120
121 fn retrieve_items_in_chunks<'a, S, N>(
123 self: Arc<Self>,
124 namespace: N,
125 limit: u32,
126 option: Option<ListArg>,
127 ) -> BoxStream<'a, K8List<S>>
128 where
129 S: Spec + 'static,
130 N: Into<NameSpace> + Send + Sync + 'static;
131
132 async fn delete_item_with_option<S, M>(
133 &self,
134 metadata: &M,
135 option: Option<DeleteOptions>,
136 ) -> Result<DeleteStatus<S>>
137 where
138 S: Spec,
139 M: K8Meta + Send + Sync;
140
141 async fn delete_item<S, M>(&self, metadata: &M) -> Result<DeleteStatus<S>>
142 where
143 S: Spec,
144 M: K8Meta + Send + Sync,
145 {
146 self.delete_item_with_option::<S, M>(metadata, None).await
147 }
148
149 async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
151 where
152 S: Spec;
153
154 async fn apply<S>(&self, value: InputK8Obj<S>) -> Result<ApplyResult<S>>
159 where
160 S: Spec,
161 {
162 debug!("{}: applying '{}' changes", S::label(), value.metadata.name);
163 trace!("{}: applying {:#?}", S::label(), value);
164 match self.retrieve_item(&value.metadata).await {
165 Ok(Some(old_item)) => {
166 let mut old_spec: S = old_item.spec;
167 old_spec.make_same(&value.spec);
168 let new_obj = serde_json::to_value(DiffableK8Obj::new(
170 value.metadata.clone(),
171 value.spec.clone(),
172 value.header.clone(),
173 ))?;
174 let old_obj = serde_json::to_value(DiffableK8Obj::new(
175 old_item.metadata,
176 old_spec,
177 old_item.header,
178 ))?;
179 let diff = old_obj.diff(&new_obj)?;
180 match diff {
181 Diff::None => {
182 debug!("{}: no diff detected, doing nothing", S::label());
183 Ok(ApplyResult::None)
184 }
185 Diff::Patch(p) => {
186 let json_diff = serde_json::to_value(p)?;
187 debug!("{}: detected diff: old vs. new obj", S::label());
188 trace!("{}: new obj: {:#?}", S::label(), &new_obj);
189 trace!("{}: old obj: {:#?}", S::label(), &old_obj);
190 trace!("{}: new/old diff: {:#?}", S::label(), json_diff);
191 let patch_result = self.patch_obj(&value.metadata, &json_diff).await?;
192 Ok(ApplyResult::Patched(patch_result))
193 }
194 _ => Err(anyhow!("unsupported diff type")),
195 }
196 }
197 Ok(None) => {
198 debug!(
199 "{}: item '{}' not found, creating ...",
200 S::label(),
201 value.metadata.name
202 );
203 let created_item = self.create_item(value).await?;
204 Ok(ApplyResult::Created(created_item))
205 }
206 Err(err) => Err(err),
207 }
208 }
209
210 async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
212 where
213 S: Spec;
214
215 async fn patch_obj<S, M>(&self, metadata: &M, patch: &Value) -> Result<K8Obj<S>>
217 where
218 S: Spec,
219 M: K8Meta + Display + Send + Sync,
220 {
221 self.patch(metadata, patch, PatchMergeType::for_spec(S::metadata()))
222 .await
223 }
224
225 async fn patch<S, M>(
227 &self,
228 metadata: &M,
229 patch: &Value,
230 merge_type: PatchMergeType,
231 ) -> Result<K8Obj<S>>
232 where
233 S: Spec,
234 M: K8Meta + Display + Send + Sync;
235
236 async fn patch_status<S, M>(
238 &self,
239 metadata: &M,
240 patch: &Value,
241 merge_type: PatchMergeType,
242 ) -> Result<K8Obj<S>>
243 where
244 S: Spec,
245 M: K8Meta + Display + Send + Sync;
246
247 async fn patch_subresource<S, M>(
248 &self,
249 metadata: &M,
250 subresource: String,
251 patch: &Value,
252 merge_type: PatchMergeType,
253 ) -> Result<K8Obj<S>>
254 where
255 S: Spec,
256 M: K8Meta + Display + Send + Sync;
257
258 fn watch_stream_since<S, N>(
260 &self,
261 namespace: N,
262 resource_version: Option<String>,
263 ) -> BoxStream<'_, TokenStreamResult<S>>
264 where
265 S: Spec + 'static,
266 N: Into<NameSpace>;
267
268 fn watch_stream_now<S>(&self, ns: String) -> BoxStream<'_, TokenStreamResult<S>>
269 where
270 S: Spec + 'static,
271 {
272 let ft_stream = async move {
273 let namespace = ns.as_ref();
274 match self.retrieve_items_with_option(namespace, None).await {
275 Ok(item_now_list) => {
276 let resource_version = item_now_list.metadata.resource_version;
277
278 let items_watch_stream =
279 self.watch_stream_since(namespace, Some(resource_version));
280
281 let items_list = item_now_list
282 .items
283 .into_iter()
284 .map(|item| Ok(K8Watch::ADDED(item)))
285 .collect();
286 let list_stream = once(ready(Ok(items_list)));
287
288 list_stream.chain(items_watch_stream).left_stream()
289 }
291 Err(err) => once(ready(Err(err))).right_stream(),
292 }
293 };
294
295 ft_stream.flatten_stream().boxed()
296 }
297
298 async fn exists<S, M>(&self, metadata: &M) -> Result<bool>
300 where
301 S: Spec,
302 M: K8Meta + Display + Send + Sync,
303 {
304 debug!("check if '{}' exists", metadata);
305 match self.retrieve_item::<S, M>(metadata).await {
306 Ok(Some(_)) => Ok(true),
307 Ok(None) => Ok(false),
308 Err(err) => Err(err),
309 }
310 }
311}