Skip to main content

kube_runtime/reflector/
store.rs

1//! A reader/writer split store for reflectors
2use super::{Lookup, ObjectRef, dispatcher::Dispatcher};
3#[cfg(feature = "unstable-runtime-subscribe")]
4use crate::reflector::ReflectHandle;
5use crate::{
6    utils::delayed_init::{self, DelayedInit},
7    watcher,
8};
9use ahash::AHashMap;
10use educe::Educe;
11use parking_lot::RwLock;
12use std::{fmt::Debug, hash::Hash, sync::Arc};
13use thiserror::Error;
14
15type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
16
17/// A writable Store handle
18///
19/// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
20/// In particular, `Restarted` events will clobber the state of other connected reflectors.
21#[derive(Debug)]
22pub struct Writer<K: 'static + Lookup + Clone>
23where
24    K::DynamicType: Eq + Hash + Clone,
25{
26    store: Cache<K>,
27    buffer: AHashMap<ObjectRef<K>, Arc<K>>,
28    dyntype: K::DynamicType,
29    ready_tx: Option<delayed_init::Initializer<()>>,
30    ready_rx: Arc<DelayedInit<()>>,
31    dispatcher: Option<Dispatcher<K>>,
32}
33
34impl<K: 'static + Lookup + Clone> Writer<K>
35where
36    K::DynamicType: Eq + Hash + Clone,
37{
38    /// Creates a new Writer with the specified dynamic type.
39    ///
40    /// If the dynamic type is default-able (for example when writer is used with
41    /// `k8s_openapi` types) you can use `Default` instead.
42    pub fn new(dyntype: K::DynamicType) -> Self {
43        let (ready_tx, ready_rx) = DelayedInit::new();
44        Writer {
45            store: Default::default(),
46            buffer: Default::default(),
47            dyntype,
48            ready_tx: Some(ready_tx),
49            ready_rx: Arc::new(ready_rx),
50            dispatcher: None,
51        }
52    }
53
54    /// Creates a new Writer with the specified dynamic type and buffer size.
55    ///
56    /// When the Writer is created through `new_shared`, it will be able to
57    /// be subscribed. Stored objects will be propagated to all subscribers. The
58    /// buffer size is used for the underlying channel. An object is cleared
59    /// from the buffer only when all subscribers have seen it.
60    ///
61    /// If the dynamic type is default-able (for example when writer is used with
62    /// `k8s_openapi` types) you can use `Default` instead.
63    #[cfg(feature = "unstable-runtime-subscribe")]
64    pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
65        let (ready_tx, ready_rx) = DelayedInit::new();
66        Writer {
67            store: Default::default(),
68            buffer: Default::default(),
69            dyntype,
70            ready_tx: Some(ready_tx),
71            ready_rx: Arc::new(ready_rx),
72            dispatcher: Some(Dispatcher::new(buf_size)),
73        }
74    }
75
76    /// Return a read handle to the store
77    ///
78    /// Multiple read handles may be obtained, by either calling `as_reader` multiple times,
79    /// or by calling `Store::clone()` afterwards.
80    #[must_use]
81    pub fn as_reader(&self) -> Store<K> {
82        Store {
83            store: self.store.clone(),
84            ready_rx: self.ready_rx.clone(),
85        }
86    }
87
88    /// Return a handle to a subscriber
89    ///
90    /// Multiple subscribe handles may be obtained, by either calling
91    /// `subscribe` multiple times, or by calling `clone()`
92    ///
93    /// This function returns a `Some` when the [`Writer`] is constructed through
94    /// [`Writer::new_shared`] or [`store_shared`], and a `None` otherwise.
95    #[cfg(feature = "unstable-runtime-subscribe")]
96    pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
97        self.dispatcher
98            .as_ref()
99            .map(|dispatcher| dispatcher.subscribe(self.as_reader()))
100    }
101
102    /// Applies a single watcher event to the store
103    pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
104        match event {
105            watcher::Event::Apply(obj) => {
106                let key = obj.to_object_ref(self.dyntype.clone());
107                let obj = Arc::new(obj.clone());
108                self.store.write().insert(key, obj);
109            }
110            watcher::Event::Delete(obj) => {
111                let key = obj.to_object_ref(self.dyntype.clone());
112                self.store.write().remove(&key);
113            }
114            watcher::Event::Init => {
115                self.buffer = AHashMap::new();
116            }
117            watcher::Event::InitApply(obj) => {
118                let key = obj.to_object_ref(self.dyntype.clone());
119                let obj = Arc::new(obj.clone());
120                self.buffer.insert(key, obj);
121            }
122            watcher::Event::InitDone => {
123                let mut store = self.store.write();
124
125                // Swap the buffer into the store
126                std::mem::swap(&mut *store, &mut self.buffer);
127
128                // Clear the buffer
129                // This is preferred over self.buffer.clear(), as clear() will keep the allocated memory for reuse.
130                // This way, the old buffer is dropped.
131                self.buffer = AHashMap::new();
132
133                // Mark as ready after the Restart, "releasing" any calls to Store::wait_until_ready()
134                if let Some(ready_tx) = self.ready_tx.take() {
135                    ready_tx.init(())
136                }
137            }
138        }
139    }
140
141    /// Broadcast an event to any downstream listeners subscribed on the store
142    pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
143        if let Some(ref mut dispatcher) = self.dispatcher {
144            match event {
145                watcher::Event::Apply(obj) => {
146                    let obj_ref = obj.to_object_ref(self.dyntype.clone());
147                    // TODO (matei): should this take a timeout to log when backpressure has
148                    // been applied for too long, e.g. 10s
149                    dispatcher.broadcast(obj_ref).await;
150                }
151
152                watcher::Event::InitDone => {
153                    let obj_refs: Vec<_> = {
154                        let store = self.store.read();
155                        store.keys().cloned().collect()
156                    };
157
158                    for obj_ref in obj_refs {
159                        dispatcher.broadcast(obj_ref).await;
160                    }
161                }
162
163                _ => {}
164            }
165        }
166    }
167}
168
169impl<K> Default for Writer<K>
170where
171    K: Lookup + Clone + 'static,
172    K::DynamicType: Default + Eq + Hash + Clone,
173{
174    fn default() -> Self {
175        Self::new(K::DynamicType::default())
176    }
177}
178
179/// A readable cache of Kubernetes objects of kind `K`
180///
181/// Cloning will produce a new reference to the same backing store.
182///
183/// Cannot be constructed directly since one writer handle is required,
184/// use `Writer::as_reader()` instead.
185#[derive(Educe)]
186#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
187pub struct Store<K: 'static + Lookup>
188where
189    K::DynamicType: Hash + Eq,
190{
191    store: Cache<K>,
192    ready_rx: Arc<DelayedInit<()>>,
193}
194
195/// The error returned by `Store::wait_until_ready`
196#[derive(Debug, Error)]
197#[error("writer was dropped before store became ready")]
198pub struct WriterDropped(delayed_init::InitDropped);
199
200impl<K: 'static + Clone + Lookup> Store<K>
201where
202    K::DynamicType: Eq + Hash + Clone,
203{
204    /// Wait for the store to be populated by Kubernetes.
205    ///
206    /// Note that polling this will _not_ await the source of the stream that populates the [`Writer`].
207    /// The [`reflector`](crate::reflector()) stream must be awaited separately.
208    ///
209    /// # Errors
210    /// Returns an error if the [`Writer`] was dropped before any value was written.
211    pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
212        self.ready_rx.get().await.map_err(WriterDropped)
213    }
214
215    /// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache.
216    ///
217    /// `key.namespace` is ignored for cluster-scoped resources.
218    ///
219    /// Note that this is a cache and may be stale. Deleted objects may still exist in the cache
220    /// despite having been deleted in the cluster, and new objects may not yet exist in the cache.
221    /// If any of these are a problem for you then you should abort your reconciler and retry later.
222    /// If you use `kube_rt::controller` then you can do this by returning an error and specifying a
223    /// reasonable `error_policy`.
224    #[must_use]
225    pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
226        let store = self.store.read();
227        store
228            .get(key)
229            // Try to erase the namespace and try again, in case the object is cluster-scoped
230            .or_else(|| {
231                store.get(&{
232                    let mut cluster_key = key.clone();
233                    cluster_key.namespace = None;
234                    cluster_key
235                })
236            })
237            // Clone to let go of the entry lock ASAP
238            .cloned()
239    }
240
241    /// Return a full snapshot of the current values
242    #[must_use]
243    pub fn state(&self) -> Vec<Arc<K>> {
244        let s = self.store.read();
245        s.values().cloned().collect()
246    }
247
248    /// Retrieve a `clone()` of the entry found by the given predicate
249    #[must_use]
250    pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
251    where
252        P: Fn(&K) -> bool,
253    {
254        self.store
255            .read()
256            .values()
257            .find(|k| predicate(k.as_ref()))
258            .cloned()
259    }
260
261    /// Return the number of elements in the store
262    #[must_use]
263    pub fn len(&self) -> usize {
264        self.store.read().len()
265    }
266
267    /// Return whether the store is empty
268    #[must_use]
269    pub fn is_empty(&self) -> bool {
270        self.store.read().is_empty()
271    }
272}
273
274/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K`
275///
276/// The `Writer` should be passed to a [`reflector`](crate::reflector()),
277/// and the [`Store`] is a read-only handle.
278#[must_use]
279pub fn store<K>() -> (Store<K>, Writer<K>)
280where
281    K: Lookup + Clone + 'static,
282    K::DynamicType: Eq + Hash + Clone + Default,
283{
284    let w = Writer::<K>::default();
285    let r = w.as_reader();
286    (r, w)
287}
288
/// Build a connected (reader, writer) pair for a typed resource `K`, with a writer that
/// can be subscribed to
///
/// Subscribing to the resulting `Writer` fans out events from a watcher. Hand the
/// `Writer` to a [`reflector`](crate::reflector()); the [`Store`] is the read-only
/// handle onto the same cache.
///
/// `buf_size` is the capacity of the underlying message channel. When the buffer is
/// full, backpressure will be applied by waiting for capacity.
#[must_use]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + Hash + Clone + Default,
{
    let dyntype = <K::DynamicType as Default>::default();
    let writer = Writer::<K>::new_shared(buf_size, dyntype);
    let reader = writer.as_reader();
    (reader, writer)
}
308
#[cfg(test)]
mod tests {
    use super::{Writer, store};
    use crate::{reflector::ObjectRef, watcher};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::api::ObjectMeta;

    /// Builds a `ConfigMap` with only its name and (optional) namespace filled in.
    fn config_map(name: &str, namespace: Option<&str>) -> ConfigMap {
        let metadata = ObjectMeta {
            name: Some(name.to_string()),
            namespace: namespace.map(String::from),
            ..ObjectMeta::default()
        };
        ConfigMap {
            metadata,
            ..ConfigMap::default()
        }
    }

    // A namespaced object is found through a reference carrying the same namespace.
    #[test]
    fn should_allow_getting_namespaced_object_by_namespaced_ref() {
        let cm = config_map("obj", Some("ns"));
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = store_w.as_reader();
        let lookup = ObjectRef::from_obj(&cm);
        assert_eq!(reader.get(&lookup).as_deref(), Some(&cm));
    }

    // A namespaced object must not be reachable through a reference without namespace.
    #[test]
    fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
        let cm = config_map("obj", Some("ns"));
        let mut cluster_cm = cm.clone();
        cluster_cm.metadata.namespace = None;
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm));
        let reader = store_w.as_reader();
        let lookup = ObjectRef::from_obj(&cluster_cm);
        assert_eq!(reader.get(&lookup), None);
    }

    // A cluster-scoped object is found through a reference without namespace.
    #[test]
    fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
        let cm = config_map("obj", None);
        let (reader, mut writer) = store();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let lookup = ObjectRef::from_obj(&cm);
        assert_eq!(reader.get(&lookup).as_deref(), Some(&cm));
    }

    // A cluster-scoped object is still found when the reference carries a namespace.
    #[test]
    fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
        let cm = config_map("obj", None);
        let mut nsed_cm = cm.clone();
        nsed_cm.metadata.namespace = Some("ns".to_string());
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = store_w.as_reader();
        let lookup = ObjectRef::from_obj(&nsed_cm);
        assert_eq!(reader.get(&lookup).as_deref(), Some(&cm));
    }

    // `find` returns the object matching the predicate; `len`/`is_empty` track inserts.
    #[test]
    fn find_element_in_store() {
        let cm = config_map("obj", None);
        let mut target_cm = cm.clone();

        let (reader, mut writer) = store::<ConfigMap>();
        assert!(reader.is_empty());
        writer.apply_watcher_event(&watcher::Event::Apply(cm));

        assert_eq!(reader.len(), 1);
        let miss = reader.find(|k| k.metadata.generation == Some(1234));
        assert!(miss.is_none());

        target_cm.metadata.name = Some("obj1".to_string());
        target_cm.metadata.generation = Some(1234);
        writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone()));
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), 2);
        let found = reader.find(|k| k.metadata.generation == Some(1234));
        assert_eq!(found.as_deref(), Some(&target_cm));
    }
}
410}