1use super::{Lookup, ObjectRef, dispatcher::Dispatcher};
3#[cfg(feature = "unstable-runtime-subscribe")]
4use crate::reflector::ReflectHandle;
5use crate::{
6 utils::delayed_init::{self, DelayedInit},
7 watcher,
8};
9use ahash::AHashMap;
10use educe::Educe;
11use parking_lot::RwLock;
12use std::{fmt::Debug, hash::Hash, sync::Arc};
13use thiserror::Error;
14
/// Shared, lock-protected map from an object's [`ObjectRef`] to the cached object.
///
/// The outer `Arc` lets the [`Writer`] and every [`Store`] handle point at the same map;
/// each value is an `Arc<K>` so lookups can hand out objects without copying them.
type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
16
/// The writable side of the cache: applies watcher events to the shared map.
///
/// Read-only [`Store`] handles onto the same map are created with `as_reader`.
#[derive(Debug)]
pub struct Writer<K: 'static + Lookup + Clone>
where
    K::DynamicType: Eq + Hash + Clone,
{
    /// The map shared with every `Store` handle created via `as_reader`.
    store: Cache<K>,
    /// Staging map filled by `InitApply` events and swapped into `store` on `InitDone`.
    buffer: AHashMap<ObjectRef<K>, Arc<K>>,
    /// Dynamic type information, cloned into each `ObjectRef` key built from an event.
    dyntype: K::DynamicType,
    /// Readiness initializer; taken and fired the first time `InitDone` is applied.
    ready_tx: Option<delayed_init::Initializer<()>>,
    /// Readiness receiver, shared with every `Store` handle.
    ready_rx: Arc<DelayedInit<()>>,
    /// Event dispatcher; `new` leaves this as `None`, `new_shared` sets it.
    dispatcher: Option<Dispatcher<K>>,
}
33
34impl<K: 'static + Lookup + Clone> Writer<K>
35where
36 K::DynamicType: Eq + Hash + Clone,
37{
38 pub fn new(dyntype: K::DynamicType) -> Self {
43 let (ready_tx, ready_rx) = DelayedInit::new();
44 Writer {
45 store: Default::default(),
46 buffer: Default::default(),
47 dyntype,
48 ready_tx: Some(ready_tx),
49 ready_rx: Arc::new(ready_rx),
50 dispatcher: None,
51 }
52 }
53
54 #[cfg(feature = "unstable-runtime-subscribe")]
64 pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
65 let (ready_tx, ready_rx) = DelayedInit::new();
66 Writer {
67 store: Default::default(),
68 buffer: Default::default(),
69 dyntype,
70 ready_tx: Some(ready_tx),
71 ready_rx: Arc::new(ready_rx),
72 dispatcher: Some(Dispatcher::new(buf_size)),
73 }
74 }
75
76 #[must_use]
81 pub fn as_reader(&self) -> Store<K> {
82 Store {
83 store: self.store.clone(),
84 ready_rx: self.ready_rx.clone(),
85 }
86 }
87
88 #[cfg(feature = "unstable-runtime-subscribe")]
96 pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
97 self.dispatcher
98 .as_ref()
99 .map(|dispatcher| dispatcher.subscribe(self.as_reader()))
100 }
101
102 pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
104 match event {
105 watcher::Event::Apply(obj) => {
106 let key = obj.to_object_ref(self.dyntype.clone());
107 let obj = Arc::new(obj.clone());
108 self.store.write().insert(key, obj);
109 }
110 watcher::Event::Delete(obj) => {
111 let key = obj.to_object_ref(self.dyntype.clone());
112 self.store.write().remove(&key);
113 }
114 watcher::Event::Init => {
115 self.buffer = AHashMap::new();
116 }
117 watcher::Event::InitApply(obj) => {
118 let key = obj.to_object_ref(self.dyntype.clone());
119 let obj = Arc::new(obj.clone());
120 self.buffer.insert(key, obj);
121 }
122 watcher::Event::InitDone => {
123 let mut store = self.store.write();
124
125 std::mem::swap(&mut *store, &mut self.buffer);
127
128 self.buffer = AHashMap::new();
132
133 if let Some(ready_tx) = self.ready_tx.take() {
135 ready_tx.init(())
136 }
137 }
138 }
139 }
140
141 pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
143 if let Some(ref mut dispatcher) = self.dispatcher {
144 match event {
145 watcher::Event::Apply(obj) => {
146 let obj_ref = obj.to_object_ref(self.dyntype.clone());
147 dispatcher.broadcast(obj_ref).await;
150 }
151
152 watcher::Event::InitDone => {
153 let obj_refs: Vec<_> = {
154 let store = self.store.read();
155 store.keys().cloned().collect()
156 };
157
158 for obj_ref in obj_refs {
159 dispatcher.broadcast(obj_ref).await;
160 }
161 }
162
163 _ => {}
164 }
165 }
166 }
167}
168
169impl<K> Default for Writer<K>
170where
171 K: Lookup + Clone + 'static,
172 K::DynamicType: Default + Eq + Hash + Clone,
173{
174 fn default() -> Self {
175 Self::new(K::DynamicType::default())
176 }
177}
178
/// A read-only handle onto the cache maintained by a [`Writer`].
///
/// Both fields are `Arc`s, so cloning a `Store` yields another handle onto the same
/// underlying map and readiness signal.
#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
pub struct Store<K: 'static + Lookup>
where
    K::DynamicType: Hash + Eq,
{
    /// The map shared with the `Writer` that created this handle.
    store: Cache<K>,
    /// Readiness receiver awaited by `wait_until_ready`.
    ready_rx: Arc<DelayedInit<()>>,
}
194
/// Error returned by `Store::wait_until_ready` when waiting on the readiness signal fails.
///
/// Wraps the underlying [`delayed_init::InitDropped`] error.
#[derive(Debug, Error)]
#[error("writer was dropped before store became ready")]
pub struct WriterDropped(delayed_init::InitDropped);
199
200impl<K: 'static + Clone + Lookup> Store<K>
201where
202 K::DynamicType: Eq + Hash + Clone,
203{
204 pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
212 self.ready_rx.get().await.map_err(WriterDropped)
213 }
214
215 #[must_use]
225 pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
226 let store = self.store.read();
227 store
228 .get(key)
229 .or_else(|| {
231 store.get(&{
232 let mut cluster_key = key.clone();
233 cluster_key.namespace = None;
234 cluster_key
235 })
236 })
237 .cloned()
239 }
240
241 #[must_use]
243 pub fn state(&self) -> Vec<Arc<K>> {
244 let s = self.store.read();
245 s.values().cloned().collect()
246 }
247
248 #[must_use]
250 pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
251 where
252 P: Fn(&K) -> bool,
253 {
254 self.store
255 .read()
256 .values()
257 .find(|k| predicate(k.as_ref()))
258 .cloned()
259 }
260
261 #[must_use]
263 pub fn len(&self) -> usize {
264 self.store.read().len()
265 }
266
267 #[must_use]
269 pub fn is_empty(&self) -> bool {
270 self.store.read().is_empty()
271 }
272}
273
274#[must_use]
279pub fn store<K>() -> (Store<K>, Writer<K>)
280where
281 K: Lookup + Clone + 'static,
282 K::DynamicType: Eq + Hash + Clone + Default,
283{
284 let w = Writer::<K>::default();
285 let r = w.as_reader();
286 (r, w)
287}
288
/// Creates a connected `(Store, Writer)` pair whose writer has a dispatcher attached.
///
/// `buf_size` is forwarded to [`Writer::new_shared`]; the dynamic type of `K` is its
/// default value.
#[must_use]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + Hash + Clone + Default,
{
    let writer = Writer::<K>::new_shared(buf_size, Default::default());
    let reader = writer.as_reader();
    (reader, writer)
}
308
#[cfg(test)]
mod tests {
    use super::{Writer, store};
    use crate::{reflector::ObjectRef, watcher};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::api::ObjectMeta;

    /// Builds a `ConfigMap` named `obj`, placed in `namespace` when one is given.
    fn config_map(namespace: Option<&str>) -> ConfigMap {
        ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: namespace.map(String::from),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        }
    }

    #[test]
    fn should_allow_getting_namespaced_object_by_namespaced_ref() {
        let cm = config_map(Some("ns"));
        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = writer.as_reader();
        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
        let cm = config_map(Some("ns"));
        // Same object, but referenced without its namespace.
        let cluster_cm = config_map(None);
        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm));
        let reader = writer.as_reader();
        assert_eq!(reader.get(&ObjectRef::from_obj(&cluster_cm)), None);
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
        let cm = config_map(None);
        let (reader, mut writer) = store();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
        let cm = config_map(None);
        // Same object, but referenced as if it lived in a namespace.
        let nsed_cm = config_map(Some("ns"));
        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = writer.as_reader();
        assert_eq!(reader.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn find_element_in_store() {
        let cm = config_map(None);
        let mut target_cm = cm.clone();

        let (reader, mut writer) = store::<ConfigMap>();
        assert!(reader.is_empty());
        writer.apply_watcher_event(&watcher::Event::Apply(cm));

        // Only the first object is present and it has no generation set.
        assert_eq!(reader.len(), 1);
        assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());

        // A second, distinctly named object carrying the generation we search for.
        target_cm.metadata.name = Some("obj1".to_string());
        target_cm.metadata.generation = Some(1234);
        writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone()));
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), 2);
        let found = reader.find(|k| k.metadata.generation == Some(1234));
        assert_eq!(found.as_deref(), Some(&target_cm));
    }
}