use super::{Lookup, ObjectRef, dispatcher::Dispatcher};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::reflector::ReflectHandle;
use crate::{
utils::delayed_init::{self, DelayedInit},
watcher,
};
use ahash::AHashMap;
use educe::Educe;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use thiserror::Error;
type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
#[derive(Debug)]
pub struct Writer<K: 'static + Lookup + Clone>
where
K::DynamicType: Eq + Hash + Clone,
{
store: Cache<K>,
buffer: AHashMap<ObjectRef<K>, Arc<K>>,
dyntype: K::DynamicType,
ready_tx: Option<delayed_init::Initializer<()>>,
ready_rx: Arc<DelayedInit<()>>,
dispatcher: Option<Dispatcher<K>>,
}
impl<K: 'static + Lookup + Clone> Writer<K>
where
K::DynamicType: Eq + Hash + Clone,
{
pub fn new(dyntype: K::DynamicType) -> Self {
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
buffer: Default::default(),
dyntype,
ready_tx: Some(ready_tx),
ready_rx: Arc::new(ready_rx),
dispatcher: None,
}
}
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
buffer: Default::default(),
dyntype,
ready_tx: Some(ready_tx),
ready_rx: Arc::new(ready_rx),
dispatcher: Some(Dispatcher::new(buf_size)),
}
}
#[must_use]
pub fn as_reader(&self) -> Store<K> {
Store {
store: self.store.clone(),
ready_rx: self.ready_rx.clone(),
}
}
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
self.dispatcher
.as_ref()
.map(|dispatcher| dispatcher.subscribe(self.as_reader()))
}
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Apply(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
}
watcher::Event::Delete(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Init => {
self.buffer = AHashMap::new();
}
watcher::Event::InitApply(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.buffer.insert(key, obj);
}
watcher::Event::InitDone => {
let mut store = self.store.write();
std::mem::swap(&mut *store, &mut self.buffer);
self.buffer = AHashMap::new();
if let Some(ready_tx) = self.ready_tx.take() {
ready_tx.init(())
}
}
}
}
pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
if let Some(ref mut dispatcher) = self.dispatcher {
match event {
watcher::Event::Apply(obj) => {
let obj_ref = obj.to_object_ref(self.dyntype.clone());
dispatcher.broadcast(obj_ref).await;
}
watcher::Event::InitDone => {
let obj_refs: Vec<_> = {
let store = self.store.read();
store.keys().cloned().collect()
};
for obj_ref in obj_refs {
dispatcher.broadcast(obj_ref).await;
}
}
_ => {}
}
}
}
}
impl<K> Default for Writer<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Default + Eq + Hash + Clone,
{
fn default() -> Self {
Self::new(K::DynamicType::default())
}
}
#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
pub struct Store<K: 'static + Lookup>
where
K::DynamicType: Hash + Eq,
{
store: Cache<K>,
ready_rx: Arc<DelayedInit<()>>,
}
#[derive(Debug, Error)]
#[error("writer was dropped before store became ready")]
pub struct WriterDropped(delayed_init::InitDropped);
impl<K: 'static + Clone + Lookup> Store<K>
where
K::DynamicType: Eq + Hash + Clone,
{
pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
self.ready_rx.get().await.map_err(WriterDropped)
}
#[must_use]
pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
let store = self.store.read();
store
.get(key)
.or_else(|| {
store.get(&{
let mut cluster_key = key.clone();
cluster_key.namespace = None;
cluster_key
})
})
.cloned()
}
#[must_use]
pub fn state(&self) -> Vec<Arc<K>> {
let s = self.store.read();
s.values().cloned().collect()
}
#[must_use]
pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
where
P: Fn(&K) -> bool,
{
self.store
.read()
.values()
.find(|k| predicate(k.as_ref()))
.cloned()
}
#[must_use]
pub fn len(&self) -> usize {
self.store.read().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.store.read().is_empty()
}
}
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::default();
let r = w.as_reader();
(r, w)
}
#[must_use]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::new_shared(buf_size, Default::default());
let r = w.as_reader();
(r, w)
}
#[cfg(test)]
mod tests {
use super::{Writer, store};
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::api::ObjectMeta;
#[test]
fn should_allow_getting_namespaced_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[test]
fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Apply(cm));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}
#[test]
fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let (store, mut writer) = store();
writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[test]
fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut nsed_cm = cm.clone();
nsed_cm.metadata.namespace = Some("ns".to_string());
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
}
#[test]
fn find_element_in_store() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut target_cm = cm.clone();
let (reader, mut writer) = store::<ConfigMap>();
assert!(reader.is_empty());
writer.apply_watcher_event(&watcher::Event::Apply(cm));
assert_eq!(reader.len(), 1);
assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());
target_cm.metadata.name = Some("obj1".to_string());
target_cm.metadata.generation = Some(1234);
writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone()));
assert!(!reader.is_empty());
assert_eq!(reader.len(), 2);
let found = reader.find(|k| k.metadata.generation == Some(1234));
assert_eq!(found.as_deref(), Some(&target_cm));
}
}