k8_metadata_client/
client.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use anyhow::{anyhow, Result};
5use async_trait::async_trait;
6use futures_util::future::ready;
7use futures_util::future::FutureExt;
8use futures_util::stream::once;
9use futures_util::stream::BoxStream;
10use futures_util::stream::StreamExt;
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13use serde_json::Value;
14use tracing::debug;
15use tracing::trace;
16
17use k8_diff::{Changes, Diff};
18use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
19use k8_types::options::DeleteOptions;
20use crate::diff::PatchMergeType;
21use crate::{ApplyResult, DiffableK8Obj};
22
/// Scope of a request: either every namespace or one namespace identified by name.
///
/// `Debug`, `PartialEq` and `Eq` are derived so values can be logged with `{:?}` and
/// compared directly in conditions and assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NameSpace {
    /// No specific namespace is selected.
    All,
    /// A single namespace, identified by its name.
    Named(String),
}
28
/// Error value reporting that the object identified by a key could not be found.
#[derive(Debug)]
pub struct ObjectKeyNotFound {
    // Key that failed to resolve; echoed verbatim in the `Display` output.
    key: String,
}

impl ObjectKeyNotFound {
    /// Builds the error for the given `key`.
    pub fn new(key: String) -> Self {
        ObjectKeyNotFound { key }
    }
}

// Marker impl only: the derived `Debug` and the `Display` impl below supply everything
// that `std::error::Error` requires.
impl std::error::Error for ObjectKeyNotFound {}

impl Display for ObjectKeyNotFound {
    /// Renders the error as `'<key>' not found`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { key } = self;
        write!(out, "'{}' not found", key)
    }
}
47
48impl NameSpace {
49    pub fn is_all(&self) -> bool {
50        matches!(self, Self::All)
51    }
52
53    pub fn named(&self) -> &str {
54        match self {
55            Self::All => "all",
56            Self::Named(name) => name,
57        }
58    }
59}
60
61impl From<String> for NameSpace {
62    fn from(namespace: String) -> Self {
63        NameSpace::Named(namespace)
64    }
65}
66
67impl From<&str> for NameSpace {
68    fn from(namespace: &str) -> Self {
69        NameSpace::Named(namespace.to_owned())
70    }
71}
72
/// Optional arguments for list requests; every field defaults to `None`.
///
/// `Debug`, `PartialEq` and `Eq` are derived so option sets can be logged and compared.
///
/// NOTE(review): the field names mirror the Kubernetes list query parameters
/// (`fieldSelector`, `includeUninitialized`, `labelSelector`); how each value is encoded
/// into a request is decided by the client implementation and should be confirmed there.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ListArg {
    /// Field selector expression, if any.
    pub field_selector: Option<String>,
    /// Whether uninitialized objects should be included, if specified.
    pub include_uninitialized: Option<bool>,
    /// Label selector expression, if any.
    pub label_selector: Option<String>,
}
79
// For error mapping, see the `From` trait: https://doc.rust-lang.org/nightly/core/convert/trait.From.html
81
82pub type TokenStreamResult<S> = Result<Vec<Result<K8Watch<S>>>>;
83
84#[allow(clippy::redundant_closure)]
85pub fn as_token_stream_result<S, E>(events: Vec<K8Watch<S>>) -> TokenStreamResult<S>
86where
87    S: Spec,
88    S::Status: Serialize + DeserializeOwned,
89    S::Header: Serialize + DeserializeOwned,
90{
91    Ok(events.into_iter().map(|event| Ok(event)).collect())
92}
93
94#[async_trait]
95pub trait MetadataClient: Send + Sync {
96    /// retrieval a single item
97    async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
98    where
99        S: Spec,
100        M: K8Meta + Send + Sync;
101
102    /// retrieve all items a single chunk
103    /// this may cause client to hang if there are too many items
104    async fn retrieve_items<S, N>(&self, namespace: N) -> Result<K8List<S>>
105    where
106        S: Spec,
107        N: Into<NameSpace> + Send + Sync,
108    {
109        self.retrieve_items_with_option(namespace, None).await
110    }
111
112    async fn retrieve_items_with_option<S, N>(
113        &self,
114        namespace: N,
115        option: Option<ListArg>,
116    ) -> Result<K8List<S>>
117    where
118        S: Spec,
119        N: Into<NameSpace> + Send + Sync;
120
121    /// returns stream of items in chunks
122    fn retrieve_items_in_chunks<'a, S, N>(
123        self: Arc<Self>,
124        namespace: N,
125        limit: u32,
126        option: Option<ListArg>,
127    ) -> BoxStream<'a, K8List<S>>
128    where
129        S: Spec + 'static,
130        N: Into<NameSpace> + Send + Sync + 'static;
131
132    async fn delete_item_with_option<S, M>(
133        &self,
134        metadata: &M,
135        option: Option<DeleteOptions>,
136    ) -> Result<DeleteStatus<S>>
137    where
138        S: Spec,
139        M: K8Meta + Send + Sync;
140
141    async fn delete_item<S, M>(&self, metadata: &M) -> Result<DeleteStatus<S>>
142    where
143        S: Spec,
144        M: K8Meta + Send + Sync,
145    {
146        self.delete_item_with_option::<S, M>(metadata, None).await
147    }
148
149    /// create new object
150    async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
151    where
152        S: Spec;
153
154    /// apply object, this is similar to ```kubectl apply```
155    /// for now, this doesn't do any optimization
156    /// if object doesn't exist, it will be created
157    /// if object exist, it will be patched by using strategic merge diff
158    async fn apply<S>(&self, value: InputK8Obj<S>) -> Result<ApplyResult<S>>
159    where
160        S: Spec,
161    {
162        debug!("{}: applying '{}' changes", S::label(), value.metadata.name);
163        trace!("{}: applying {:#?}", S::label(), value);
164        match self.retrieve_item(&value.metadata).await {
165            Ok(Some(old_item)) => {
166                let mut old_spec: S = old_item.spec;
167                old_spec.make_same(&value.spec);
168                // we don't care about status
169                let new_obj = serde_json::to_value(DiffableK8Obj::new(
170                    value.metadata.clone(),
171                    value.spec.clone(),
172                    value.header.clone(),
173                ))?;
174                let old_obj = serde_json::to_value(DiffableK8Obj::new(
175                    old_item.metadata,
176                    old_spec,
177                    old_item.header,
178                ))?;
179                let diff = old_obj.diff(&new_obj)?;
180                match diff {
181                    Diff::None => {
182                        debug!("{}: no diff detected, doing nothing", S::label());
183                        Ok(ApplyResult::None)
184                    }
185                    Diff::Patch(p) => {
186                        let json_diff = serde_json::to_value(p)?;
187                        debug!("{}: detected diff: old vs. new obj", S::label());
188                        trace!("{}: new obj: {:#?}", S::label(), &new_obj);
189                        trace!("{}: old obj: {:#?}", S::label(), &old_obj);
190                        trace!("{}: new/old diff: {:#?}", S::label(), json_diff);
191                        let patch_result = self.patch_obj(&value.metadata, &json_diff).await?;
192                        Ok(ApplyResult::Patched(patch_result))
193                    }
194                    _ => Err(anyhow!("unsupported diff type")),
195                }
196            }
197            Ok(None) => {
198                debug!(
199                    "{}: item '{}' not found, creating ...",
200                    S::label(),
201                    value.metadata.name
202                );
203                let created_item = self.create_item(value).await?;
204                Ok(ApplyResult::Created(created_item))
205            }
206            Err(err) => Err(err),
207        }
208    }
209
210    /// update status
211    async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
212    where
213        S: Spec;
214
215    /// patch existing obj
216    async fn patch_obj<S, M>(&self, metadata: &M, patch: &Value) -> Result<K8Obj<S>>
217    where
218        S: Spec,
219        M: K8Meta + Display + Send + Sync,
220    {
221        self.patch(metadata, patch, PatchMergeType::for_spec(S::metadata()))
222            .await
223    }
224
225    /// patch object with arbitrary patch
226    async fn patch<S, M>(
227        &self,
228        metadata: &M,
229        patch: &Value,
230        merge_type: PatchMergeType,
231    ) -> Result<K8Obj<S>>
232    where
233        S: Spec,
234        M: K8Meta + Display + Send + Sync;
235
236    /// patch status
237    async fn patch_status<S, M>(
238        &self,
239        metadata: &M,
240        patch: &Value,
241        merge_type: PatchMergeType,
242    ) -> Result<K8Obj<S>>
243    where
244        S: Spec,
245        M: K8Meta + Display + Send + Sync;
246
247    async fn patch_subresource<S, M>(
248        &self,
249        metadata: &M,
250        subresource: String,
251        patch: &Value,
252        merge_type: PatchMergeType,
253    ) -> Result<K8Obj<S>>
254    where
255        S: Spec,
256        M: K8Meta + Display + Send + Sync;
257
258    /// stream items since resource versions
259    fn watch_stream_since<S, N>(
260        &self,
261        namespace: N,
262        resource_version: Option<String>,
263    ) -> BoxStream<'_, TokenStreamResult<S>>
264    where
265        S: Spec + 'static,
266        N: Into<NameSpace>;
267
268    fn watch_stream_now<S>(&self, ns: String) -> BoxStream<'_, TokenStreamResult<S>>
269    where
270        S: Spec + 'static,
271    {
272        let ft_stream = async move {
273            let namespace = ns.as_ref();
274            match self.retrieve_items_with_option(namespace, None).await {
275                Ok(item_now_list) => {
276                    let resource_version = item_now_list.metadata.resource_version;
277
278                    let items_watch_stream =
279                        self.watch_stream_since(namespace, Some(resource_version));
280
281                    let items_list = item_now_list
282                        .items
283                        .into_iter()
284                        .map(|item| Ok(K8Watch::ADDED(item)))
285                        .collect();
286                    let list_stream = once(ready(Ok(items_list)));
287
288                    list_stream.chain(items_watch_stream).left_stream()
289                    // list_stream
290                }
291                Err(err) => once(ready(Err(err))).right_stream(),
292            }
293        };
294
295        ft_stream.flatten_stream().boxed()
296    }
297
298    /// Check if the object exists, return true or false.
299    async fn exists<S, M>(&self, metadata: &M) -> Result<bool>
300    where
301        S: Spec,
302        M: K8Meta + Display + Send + Sync,
303    {
304        debug!("check if '{}' exists", metadata);
305        match self.retrieve_item::<S, M>(metadata).await {
306            Ok(Some(_)) => Ok(true),
307            Ok(None) => Ok(false),
308            Err(err) => Err(err),
309        }
310    }
311}