use super::ObjectRef;
use crate::watcher;
use dashmap::DashMap;
use derivative::Derivative;
use k8s_openapi::Resource;
use kube::api::Meta;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
#[derive(Debug, Derivative)]
#[derivative(Default(bound = ""))]
pub struct Writer<K: 'static + Resource> {
store: Arc<DashMap<ObjectRef<K>, K>>,
}
impl<K: 'static + Meta + Clone> Writer<K> {
#[must_use]
pub fn as_reader(&self) -> Store<K> {
Store {
store: self.store.clone(),
}
}
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Applied(obj) => {
self.store.insert(ObjectRef::from_obj(&obj), obj.clone());
}
watcher::Event::Deleted(obj) => {
self.store.remove(&ObjectRef::from_obj(&obj));
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| (ObjectRef::from_obj(obj), obj))
.collect::<HashMap<_, _>>();
self.store.retain(|key, _old_value| new_objs.contains_key(key));
for (key, obj) in new_objs {
self.store.insert(key, obj.clone());
}
}
}
}
}
#[derive(Debug, Derivative)]
#[derivative(Clone)]
pub struct Store<K: 'static + Resource> {
store: Arc<DashMap<ObjectRef<K>, K>>,
}
impl<K: 'static + Clone + Resource> Store<K> {
#[must_use]
pub fn get(&self, key: &ObjectRef<K>) -> Option<K> {
self.store
.get(key)
.or_else(|| {
self.store.get(&{
let mut cluster_key = key.clone();
cluster_key.namespace = None;
cluster_key
})
})
.map(|entry| entry.value().clone())
}
#[must_use]
pub fn state(&self) -> Vec<K> {
self.store.iter().map(|eg| eg.value().clone()).collect()
}
}
#[cfg(test)]
mod tests {
use super::Writer;
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
#[test]
fn should_allow_getting_namespaced_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)), Some(cm));
}
#[test]
fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}
#[test]
fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)), Some(cm));
}
#[test]
fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut nsed_cm = cm.clone();
nsed_cm.metadata.namespace = Some("ns".to_string());
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)), Some(cm));
}
}