kube_client/api/
entry.rs

1//! API helpers for get-or-create and get-and-modify patterns
2//!
3//! [`Api::entry`] is the primary entry point for this API.
4
5// Import used in docs
6#[allow(unused_imports)] use std::collections::HashMap;
7use std::fmt::Debug;
8
9use crate::{Api, Error, Result};
10use kube_core::{Resource, params::PostParams};
11use serde::{Serialize, de::DeserializeOwned};
12
13impl<K: Resource + Clone + DeserializeOwned + Debug> Api<K> {
14    /// Gets a given object's "slot" on the Kubernetes API, designed for "get-or-create" and "get-and-modify" patterns
15    ///
16    /// This is similar to [`HashMap::entry`], but the [`Entry`] must be [`OccupiedEntry::commit`]ed for changes to be persisted.
17    ///
18    /// # Usage
19    ///
20    /// ```rust,no_run
21    /// # use std::collections::BTreeMap;
22    /// # use k8s_openapi::api::core::v1::ConfigMap;
23    /// # async fn wrapper() -> Result <(), Box<dyn std::error::Error>> {
24    /// let kube = kube::Client::try_default().await?;
25    /// let cms = kube::Api::<ConfigMap>::namespaced(kube, "default");
26    /// cms
27    ///     // Try to get `entry-example` if it exists
28    ///     .entry("entry-example").await?
29    ///     // Modify object if it already exists
30    ///     .and_modify(|cm| {
31    ///         cm.data
32    ///             .get_or_insert_with(BTreeMap::default)
33    ///             .insert("already-exists".to_string(), "true".to_string());
34    ///     })
35    ///     // Provide a default object if it does not exist
36    ///     .or_insert(|| ConfigMap::default())
37    ///     // Modify the object unconditionally now that we have provided a default value
38    ///     .and_modify(|cm| {
39    ///         cm.data
40    ///             .get_or_insert_with(BTreeMap::default)
41    ///             .insert("modified".to_string(), "true".to_string());
42    ///     })
43    ///     // Save changes
44    ///     .commit(&kube::api::PostParams::default()).await?;
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub async fn entry<'a>(&'a self, name: &'a str) -> Result<Entry<'a, K>> {
49        Ok(match self.get_opt(name).await? {
50            Some(object) => Entry::Occupied(OccupiedEntry {
51                api: self,
52                dirtiness: Dirtiness::Clean,
53                name,
54                object,
55            }),
56            None => Entry::Vacant(VacantEntry { api: self, name }),
57        })
58    }
59}
60
61#[derive(Debug)]
62/// A view into a single object, with enough context to create or update it
63///
64/// See [`Api::entry`] for more information.
65pub enum Entry<'a, K> {
66    /// An object that either exists on the server, or has been created locally (and is awaiting synchronization)
67    Occupied(OccupiedEntry<'a, K>),
68    /// An object that does not exist
69    Vacant(VacantEntry<'a, K>),
70}
71
72impl<'a, K> Entry<'a, K> {
73    /// Borrow the object, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
74    pub fn get(&self) -> Option<&K> {
75        match self {
76            Entry::Occupied(entry) => Some(entry.get()),
77            Entry::Vacant(_) => None,
78        }
79    }
80
81    /// Borrow the object mutably, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
82    ///
83    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
84    pub fn get_mut(&mut self) -> Option<&mut K> {
85        match self {
86            Entry::Occupied(entry) => Some(entry.get_mut()),
87            Entry::Vacant(_) => None,
88        }
89    }
90
91    /// Let `f` modify the object, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
92    ///
93    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
94    pub fn and_modify(self, f: impl FnOnce(&mut K)) -> Self {
95        match self {
96            Entry::Occupied(entry) => Entry::Occupied(entry.and_modify(f)),
97            entry @ Entry::Vacant(_) => entry,
98        }
99    }
100
101    /// Create a new object if it does not already exist
102    ///
103    /// [`OccupiedEntry::commit`] must be called afterwards for the change to be persisted.
104    pub fn or_insert(self, default: impl FnOnce() -> K) -> OccupiedEntry<'a, K>
105    where
106        K: Resource,
107    {
108        match self {
109            Entry::Occupied(entry) => entry,
110            Entry::Vacant(entry) => entry.insert(default()),
111        }
112    }
113}
114
115/// A view into a single object that exists
116///
117/// The object may exist because it existed at the time of call to [`Api::entry`],
118/// or because it was created by [`Entry::or_insert`].
119#[derive(Debug)]
120pub struct OccupiedEntry<'a, K> {
121    api: &'a Api<K>,
122    dirtiness: Dirtiness,
123    name: &'a str,
124    object: K,
125}
126
127#[derive(Debug)]
128enum Dirtiness {
129    /// The object has not been modified (locally) since the last API operation
130    Clean,
131    /// The object exists in the API, but has been modified locally
132    Dirty,
133    /// The object does not yet exist in the API, but was created locally
134    New,
135}
136
137impl<K> OccupiedEntry<'_, K> {
138    /// Borrow the object
139    pub fn get(&self) -> &K {
140        &self.object
141    }
142
143    /// Borrow the object mutably
144    ///
145    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
146    pub fn get_mut(&mut self) -> &mut K {
147        self.dirtiness = match self.dirtiness {
148            Dirtiness::Clean => Dirtiness::Dirty,
149            Dirtiness::Dirty => Dirtiness::Dirty,
150            Dirtiness::New => Dirtiness::New,
151        };
152        &mut self.object
153    }
154
155    /// Let `f` modify the object
156    ///
157    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
158    pub fn and_modify(mut self, f: impl FnOnce(&mut K)) -> Self {
159        f(self.get_mut());
160        self
161    }
162
163    /// Take ownership over the object
164    pub fn into_object(self) -> K {
165        self.object
166    }
167
168    /// Save the object to the Kubernetes API, if any changes have been made
169    ///
170    /// The [`OccupiedEntry`] is updated with the new object (including changes made by the API server, such as
171    /// `.metadata.resource_version`).
172    ///
173    /// # Errors
174    ///
175    /// This function can fail due to transient errors, or due to write conflicts (for example: if another client
176    /// created the object between the calls to [`Api::entry`] and `OccupiedEntry::commit`, or because another
177    /// client modified the object in the meantime).
178    ///
179    /// Any retries should be coarse-grained enough to also include the call to [`Api::entry`], so that the latest
180    /// state can be fetched.
181    #[tracing::instrument(skip(self))]
182    pub async fn commit(&mut self, pp: &PostParams) -> Result<(), CommitError>
183    where
184        K: Resource + DeserializeOwned + Serialize + Clone + Debug,
185    {
186        self.prepare_for_commit()?;
187        match self.dirtiness {
188            Dirtiness::New => {
189                self.object = self
190                    .api
191                    .create(pp, &self.object)
192                    .await
193                    .map_err(CommitError::Save)?
194            }
195            Dirtiness::Dirty => {
196                self.object = self
197                    .api
198                    .replace(self.name, pp, &self.object)
199                    .await
200                    .map_err(CommitError::Save)?;
201            }
202            Dirtiness::Clean => (),
203        };
204        if !pp.dry_run {
205            self.dirtiness = Dirtiness::Clean;
206        }
207        Ok(())
208    }
209
210    /// Validate that [`Self::object`] is valid, and refers to the same object as the original [`Api::entry`] call
211    ///
212    /// Defaults `ObjectMeta::name` and `ObjectMeta::namespace` if unset.
213    fn prepare_for_commit(&mut self) -> Result<(), CommitValidationError>
214    where
215        K: Resource,
216    {
217        // Access `Self::object` directly rather than using `Self::get_mut` to avoid flagging the object as dirty
218        let meta = self.object.meta_mut();
219        match &mut meta.name {
220            name @ None => *name = Some(self.name.to_string()),
221            Some(name) if name != self.name => {
222                return Err(CommitValidationError::NameMismatch {
223                    object_name: name.clone(),
224                    expected: self.name.to_string(),
225                });
226            }
227            Some(_) => (),
228        }
229        match &mut meta.namespace {
230            ns @ None => ns.clone_from(&self.api.namespace),
231            Some(ns) if Some(ns.as_str()) != self.api.namespace.as_deref() => {
232                return Err(CommitValidationError::NamespaceMismatch {
233                    object_namespace: Some(ns.clone()),
234                    expected: self.api.namespace.clone(),
235                });
236            }
237            Some(_) => (),
238        }
239        if let Some(generate_name) = &meta.generate_name {
240            return Err(CommitValidationError::GenerateName {
241                object_generate_name: generate_name.clone(),
242            });
243        }
244        Ok(())
245    }
246}
247
248#[derive(Debug, thiserror::Error)]
249/// Commit errors
250pub enum CommitError {
251    /// Pre-commit validation failed
252    #[error("failed to validate object for saving")]
253    Validate(#[from] CommitValidationError),
254    /// Failed to submit the new object to the Kubernetes API
255    #[error("failed to save object")]
256    Save(#[source] Error),
257}
258
259#[derive(Debug, thiserror::Error)]
260/// Pre-commit validation errors
261pub enum CommitValidationError {
262    /// `ObjectMeta::name` does not match the name passed to [`Api::entry`]
263    #[error(
264        ".metadata.name does not match the name passed to Api::entry (got: {object_name:?}, expected: {expected:?})"
265    )]
266    NameMismatch {
267        /// The name of the object (`ObjectMeta::name`)
268        object_name: String,
269        /// The name passed to [`Api::entry`]
270        expected: String,
271    },
272    /// `ObjectMeta::namespace` does not match the namespace of the [`Api`]
273    #[error(
274        ".metadata.namespace does not match the namespace of the Api (got: {object_namespace:?}, expected: {expected:?})"
275    )]
276    NamespaceMismatch {
277        /// The name of the object (`ObjectMeta::namespace`)
278        object_namespace: Option<String>,
279        /// The namespace of the [`Api`]
280        expected: Option<String>,
281    },
282    /// `ObjectMeta::generate_name` must not be set
283    #[error(".metadata.generate_name must not be set (got: {object_generate_name:?})")]
284    GenerateName {
285        /// The set name generation template of the object (`ObjectMeta::generate_name`)
286        object_generate_name: String,
287    },
288}
289
290/// A view of an object that does not yet exist
291///
292/// Created by [`Api::entry`], as a variant of [`Entry`]
293#[derive(Debug)]
294pub struct VacantEntry<'a, K> {
295    api: &'a Api<K>,
296    name: &'a str,
297}
298
299impl<'a, K> VacantEntry<'a, K> {
300    /// Create a new object
301    ///
302    /// [`OccupiedEntry::commit`] must be called afterwards for the change to be persisted.
303    #[tracing::instrument(skip(self, object))]
304    pub fn insert(self, object: K) -> OccupiedEntry<'a, K>
305    where
306        K: Resource,
307    {
308        OccupiedEntry {
309            api: self.api,
310            dirtiness: Dirtiness::New,
311            name: self.name,
312            object,
313        }
314    }
315}
316
317#[cfg(test)]
318mod tests {
319    use std::collections::BTreeMap;
320
321    use k8s_openapi::api::core::v1::ConfigMap;
322    use kube_core::{
323        ObjectMeta,
324        params::{DeleteParams, PostParams},
325    };
326
327    use crate::{
328        Api, Client, Error,
329        api::entry::{CommitError, Entry},
330    };
331
332    #[tokio::test]
333    #[ignore = "needs cluster (gets and writes cms)"]
334    async fn entry_create_missing_object() -> Result<(), Box<dyn std::error::Error>> {
335        let client = Client::try_default().await?;
336        let api = Api::<ConfigMap>::default_namespaced(client);
337
338        let object_name = "entry-missing-cm";
339        if api.get_opt(object_name).await?.is_some() {
340            api.delete(object_name, &DeleteParams::default()).await?;
341        }
342
343        let entry = api.entry(object_name).await?;
344        let entry2 = api.entry(object_name).await?;
345        assert_eq!(entry.get(), None);
346        assert_eq!(entry2.get(), None);
347
348        // Create object cleanly
349        let mut entry = entry.or_insert(|| ConfigMap {
350            data: Some([("key".to_string(), "value".to_string())].into()),
351            ..ConfigMap::default()
352        });
353        entry.commit(&PostParams::default()).await?;
354        assert_eq!(
355            entry
356                .get()
357                .data
358                .as_ref()
359                .and_then(|data| data.get("key"))
360                .map(String::as_str),
361            Some("value")
362        );
363        let fetched_obj = api.get(object_name).await?;
364        assert_eq!(
365            fetched_obj
366                .data
367                .as_ref()
368                .and_then(|data| data.get("key"))
369                .map(String::as_str),
370            Some("value")
371        );
372
373        // Update object
374        entry
375            .get_mut()
376            .data
377            .get_or_insert_with(BTreeMap::default)
378            .insert("key".to_string(), "value2".to_string());
379        entry.commit(&PostParams::default()).await?;
380        assert_eq!(
381            entry
382                .get()
383                .data
384                .as_ref()
385                .and_then(|data| data.get("key"))
386                .map(String::as_str),
387            Some("value2")
388        );
389        let fetched_obj = api.get(object_name).await?;
390        assert_eq!(
391            fetched_obj
392                .data
393                .as_ref()
394                .and_then(|data| data.get("key"))
395                .map(String::as_str),
396            Some("value2")
397        );
398
399        // Object was already created in parallel, fail with a conflict error
400        let mut entry2 = entry2.or_insert(|| ConfigMap {
401            data: Some([("key".to_string(), "value3".to_string())].into()),
402            ..ConfigMap::default()
403        });
404        assert!(
405            matches!(dbg!(entry2.commit(&PostParams::default()).await), Err(CommitError::Save(Error::Api(status))) if status.is_already_exists())
406        );
407
408        // Cleanup
409        api.delete(object_name, &DeleteParams::default()).await?;
410        Ok(())
411    }
412
413    #[tokio::test]
414    #[ignore = "needs cluster (gets and writes cms)"]
415    async fn entry_update_existing_object() -> Result<(), Box<dyn std::error::Error>> {
416        let client = Client::try_default().await?;
417        let api = Api::<ConfigMap>::default_namespaced(client);
418
419        let object_name = "entry-existing-cm";
420        if api.get_opt(object_name).await?.is_some() {
421            api.delete(object_name, &DeleteParams::default()).await?;
422        }
423        api.create(&PostParams::default(), &ConfigMap {
424            metadata: ObjectMeta {
425                namespace: api.namespace.clone(),
426                name: Some(object_name.to_string()),
427                ..ObjectMeta::default()
428            },
429            data: Some([("key".to_string(), "value".to_string())].into()),
430            ..ConfigMap::default()
431        })
432        .await?;
433
434        let mut entry = match api.entry(object_name).await? {
435            Entry::Occupied(entry) => entry,
436            entry => panic!("entry for existing object must be occupied: {entry:?}"),
437        };
438        let mut entry2 = match api.entry(object_name).await? {
439            Entry::Occupied(entry) => entry,
440            entry => panic!("entry for existing object must be occupied: {entry:?}"),
441        };
442
443        // Entry is up-to-date, modify cleanly
444        entry
445            .get_mut()
446            .data
447            .get_or_insert_with(BTreeMap::default)
448            .insert("key".to_string(), "value2".to_string());
449        entry.commit(&PostParams::default()).await?;
450        assert_eq!(
451            entry
452                .get()
453                .data
454                .as_ref()
455                .and_then(|data| data.get("key"))
456                .map(String::as_str),
457            Some("value2")
458        );
459        let fetched_obj = api.get(object_name).await?;
460        assert_eq!(
461            fetched_obj
462                .data
463                .as_ref()
464                .and_then(|data| data.get("key"))
465                .map(String::as_str),
466            Some("value2")
467        );
468
469        // Object was already updated in parallel, fail with a conflict error
470        entry2
471            .get_mut()
472            .data
473            .get_or_insert_with(BTreeMap::default)
474            .insert("key".to_string(), "value3".to_string());
475        assert!(
476            matches!(entry2.commit(&PostParams::default()).await, Err(CommitError::Save(Error::Api(status))) if status.is_conflict())
477        );
478
479        // Cleanup
480        api.delete(object_name, &DeleteParams::default()).await?;
481        Ok(())
482    }
483
484    #[tokio::test]
485    #[ignore = "needs cluster (gets and writes cms)"]
486    async fn entry_create_dry_run() -> Result<(), Box<dyn std::error::Error>> {
487        let client = Client::try_default().await?;
488        let api = Api::<ConfigMap>::default_namespaced(client);
489
490        let object_name = "entry-cm-dry";
491        if api.get_opt(object_name).await?.is_some() {
492            api.delete(object_name, &DeleteParams::default()).await?;
493        }
494
495        let pp_dry = PostParams {
496            dry_run: true,
497            ..Default::default()
498        };
499
500        let entry = api.entry(object_name).await?;
501        assert_eq!(entry.get(), None);
502
503        // Create object dry-run
504        let mut entry = entry.or_insert(|| ConfigMap {
505            data: Some([("key".to_string(), "value".to_string())].into()),
506            ..ConfigMap::default()
507        });
508        entry.commit(&pp_dry).await?;
509        assert_eq!(
510            entry
511                .get()
512                .data
513                .as_ref()
514                .and_then(|data| data.get("key"))
515                .map(String::as_str),
516            Some("value")
517        );
518        let fetched_obj = api.get_opt(object_name).await?;
519        assert_eq!(fetched_obj, None);
520
521        // Commit object creation properly
522        entry.commit(&PostParams::default()).await?;
523        assert_eq!(
524            entry
525                .get()
526                .data
527                .as_ref()
528                .and_then(|data| data.get("key"))
529                .map(String::as_str),
530            Some("value")
531        );
532        let fetched_obj = api.get(object_name).await?;
533        assert_eq!(
534            fetched_obj
535                .data
536                .as_ref()
537                .and_then(|data| data.get("key"))
538                .map(String::as_str),
539            Some("value")
540        );
541
542        // Update object dry-run
543        entry
544            .get_mut()
545            .data
546            .get_or_insert_with(BTreeMap::default)
547            .insert("key".to_string(), "value2".to_string());
548        entry.commit(&pp_dry).await?;
549        assert_eq!(
550            entry
551                .get()
552                .data
553                .as_ref()
554                .and_then(|data| data.get("key"))
555                .map(String::as_str),
556            Some("value2")
557        );
558        let fetched_obj = api.get(object_name).await?;
559        assert_eq!(
560            fetched_obj
561                .data
562                .as_ref()
563                .and_then(|data| data.get("key"))
564                .map(String::as_str),
565            Some("value")
566        );
567
568        // Commit object update properly
569        entry.commit(&PostParams::default()).await?;
570        assert_eq!(
571            entry
572                .get()
573                .data
574                .as_ref()
575                .and_then(|data| data.get("key"))
576                .map(String::as_str),
577            Some("value2")
578        );
579        let fetched_obj = api.get(object_name).await?;
580        assert_eq!(
581            fetched_obj
582                .data
583                .as_ref()
584                .and_then(|data| data.get("key"))
585                .map(String::as_str),
586            Some("value2")
587        );
588
589        // Cleanup
590        api.delete(object_name, &DeleteParams::default()).await?;
591        Ok(())
592    }
593}