mod dispatcher;
mod object_ref;
pub mod store;
pub use self::{
dispatcher::ReflectHandle,
object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef},
};
use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
pub use store::{Store, store};
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Lookup + Clone,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
let mut stream = Box::pin(stream);
stream! {
while let Some(event) = stream.next().await {
match event {
Ok(ev) => {
writer.apply_watcher_event(&ev);
writer.dispatch_event(&ev).await;
yield Ok(ev);
},
Err(ev) => yield Err(ev)
}
}
}
}
#[cfg(test)]
mod tests {
use super::{ObjectRef, reflector, store};
use crate::watcher;
use futures::{StreamExt, TryStreamExt, stream};
use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
use rand::{
RngExt,
distr::{Bernoulli, Uniform},
};
use std::collections::{BTreeMap, HashMap};
#[tokio::test]
async fn reflector_applied_should_add_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))]))
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[tokio::test]
async fn reflector_applied_should_update_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let updated_cm = ConfigMap {
data: Some({
let mut data = BTreeMap::new();
data.insert("data".to_string(), "present!".to_string());
data
}),
..cm.clone()
};
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Apply(updated_cm.clone())),
]),
)
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm));
}
#[tokio::test]
async fn reflector_deleted_should_remove_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Delete(cm.clone())),
]),
)
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
}
#[tokio::test]
async fn reflector_restarted_should_clear_objects() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm_a = ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let cm_b = ConfigMap {
metadata: ObjectMeta {
name: Some("b".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Apply(cm_a.clone())),
Ok(watcher::Event::Init),
Ok(watcher::Event::InitApply(cm_b.clone())),
Ok(watcher::Event::InitDone),
]),
)
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None);
assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b));
}
#[tokio::test]
async fn reflector_store_should_not_contain_duplicates() {
let mut rng = rand::rng();
let item_dist = Uniform::new(0_u8, 100).unwrap();
let deleted_dist = Bernoulli::new(0.40).unwrap();
let store_w = store::Writer::default();
let store = store_w.as_reader();
reflector(
store_w,
stream::iter((0_u32..100_000).map(|num| {
let item = rng.sample(item_dist);
let deleted = rng.sample(deleted_dist);
let obj = ConfigMap {
metadata: ObjectMeta {
name: Some(item.to_string()),
resource_version: Some(num.to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
Ok(if deleted {
watcher::Event::Delete(obj)
} else {
watcher::Event::Apply(obj)
})
})),
)
.map_ok(|_| ())
.try_collect::<()>()
.await
.unwrap();
let mut seen_objects = HashMap::new();
for obj in store.state() {
assert_eq!(seen_objects.get(obj.metadata.name.as_ref().unwrap()), None);
seen_objects.insert(obj.metadata.name.clone().unwrap(), obj);
}
}
}