use crate::{reflector::ObjectRef, watcher::Error};
use core::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::Stream;
use kube_client::Resource;
use pin_project::pin_project;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
};
/// Computes a 64-bit digest of `t` with the standard library's [`DefaultHasher`].
///
/// A fresh hasher is created on every call, so no state carries over between calls.
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(t, &mut state);
    Hasher::finish(&state)
}
/// A hasher over some property of an object of type `K`.
///
/// Implementors reduce an object to an optional `u64`; two objects with equal hashes are
/// treated as having an unchanged property by consumers of the predicate.
pub trait Predicate<K> {
    /// Hashes the property of `obj` that this predicate inspects.
    ///
    /// Returns `None` when no hash can be produced for `obj`.
    fn hash_property(&self, obj: &K) -> Option<u64>;

    /// Pairs `self` with `f` in a [`Fallback`].
    ///
    /// The resulting predicate yields the hash from `self` when there is one and only
    /// consults `f` otherwise.
    fn fallback<F>(self, f: F) -> Fallback<Self, F>
    where
        Self: Sized,
        F: Predicate<K>,
    {
        Fallback(self, f)
    }

    /// Pairs `self` with `f` in a [`Combine`].
    ///
    /// The resulting predicate hashes the results of both predicates together, and yields
    /// `None` only when both of them do.
    fn combine<F>(self, f: F) -> Combine<Self, F>
    where
        Self: Sized,
        F: Predicate<K>,
    {
        Combine(self, f)
    }
}
/// Lets any function or closure of shape `Fn(&K) -> Option<u64>` act as a [`Predicate`].
impl<K, F> Predicate<K> for F
where
    F: Fn(&K) -> Option<u64>,
{
    /// Calls the wrapped function with `obj` and returns its result unchanged.
    fn hash_property(&self, obj: &K) -> Option<u64> {
        self(obj)
    }
}
/// Predicate pair where the second predicate is only used when the first yields `None`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Fallback<A, B>(pub(super) A, pub(super) B);

impl<A, B, K> Predicate<K> for Fallback<A, B>
where
    A: Predicate<K>,
    B: Predicate<K>,
{
    /// Returns the first predicate's hash if it has one, otherwise the second predicate's.
    fn hash_property(&self, obj: &K) -> Option<u64> {
        let Self(primary, secondary) = self;
        match primary.hash_property(obj) {
            // The secondary predicate is evaluated only when the primary has nothing to offer.
            None => secondary.hash_property(obj),
            found => found,
        }
    }
}
/// Predicate pair whose results are hashed together into a single value.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Combine<A, B>(pub(super) A, pub(super) B);

impl<A, B, K> Predicate<K> for Combine<A, B>
where
    A: Predicate<K>,
    B: Predicate<K>,
{
    /// Evaluates both predicates and hashes the pair of results.
    ///
    /// Returns `None` only when both predicates return `None`; if just one of them is
    /// missing, the pair (including the `None`) is still hashed.
    fn hash_property(&self, obj: &K) -> Option<u64> {
        let Self(left, right) = self;
        // Both sides are always evaluated, left first.
        let hashes = (left.hash_property(obj), right.hash_property(obj));
        if matches!(hashes, (None, None)) {
            None
        } else {
            Some(hash(&hashes))
        }
    }
}
/// Stream adapter that pairs an inner stream with a [`Predicate`] and a per-object cache.
///
/// The cache maps an [`ObjectRef`] to the `u64` most recently recorded for that object.
// NOTE(review): the reason for the blanket pedantic allow is not visible in this file;
// presumably it silences lints raised by the code `pin_project` generates — confirm.
#[allow(clippy::pedantic)]
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct PredicateFilter<St, K, P>
where
    K: Resource,
    P: Predicate<K>,
{
    /// Inner stream being filtered; structurally pinned.
    #[pin]
    stream: St,
    /// Predicate evaluated against every successfully received object.
    predicate: P,
    /// Value most recently recorded for each object reference.
    cache: HashMap<ObjectRef<K>, u64>,
}
impl<St, K, P> PredicateFilter<St, K, P>
where
    St: Stream<Item = Result<K, Error>>,
    K: Resource,
    P: Predicate<K>,
{
    /// Wraps `stream` together with `predicate`, starting with an empty cache.
    pub(super) fn new(stream: St, predicate: P) -> Self {
        let cache = HashMap::new();
        Self {
            stream,
            predicate,
            cache,
        }
    }
}
impl<St, K, P> Stream for PredicateFilter<St, K, P>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
K::DynamicType: Default + Eq + Hash,
P: Predicate<K>,
{
type Item = Result<K, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Poll::Ready(loop {
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
} else {
true
};
if let Some(old) = me.cache.get_mut(&key) {
*old = val;
} else {
me.cache.insert(key, val);
}
if changed {
Some(Ok(obj))
} else {
continue;
}
} else {
Some(Ok(obj))
}
}
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
};
})
}
}
/// Ready-made predicate functions over common object metadata.
///
/// Every function has the shape `Fn(&K) -> Option<u64>`.
pub mod predicates {
    use super::hash;
    use kube_client::{Resource, ResourceExt};

    /// Hash of `metadata.generation`, or `None` when the object has no generation set.
    pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
        let generation = obj.meta().generation?;
        Some(hash(&generation))
    }

    /// Hash of `metadata.resource_version`, or `None` when the object has none set.
    pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
        let version = obj.meta().resource_version.as_ref()?;
        Some(hash(version))
    }

    /// Hash of the value returned by `labels()`; always `Some`.
    pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
        let labels = obj.labels();
        Some(hash(labels))
    }

    /// Hash of the value returned by `annotations()`; always `Some`.
    pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
        let annotations = obj.annotations();
        Some(hash(annotations))
    }

    /// Hash of the value returned by `finalizers()`; always `Some`.
    pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
        let finalizers = obj.finalizers();
        Some(hash(finalizers))
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use std::{pin::pin, task::Poll};

    use super::{predicates, Error, PredicateFilter};
    use futures::{poll, stream, FutureExt, StreamExt};
    use kube_client::Resource;
    use serde_json::json;

    /// Feeds two pods with the same generation (separated by an error) followed by one
    /// with a new generation, and checks that the repeated generation is filtered out
    /// while the error is passed through.
    #[tokio::test]
    async fn predicate_filtering_hides_equal_predicate_values() {
        use k8s_openapi::api::core::v1::Pod;

        // Builds a pod named "blog" whose only varying field is `metadata.generation`.
        let pod_at_generation = |g: i32| -> Pod {
            serde_json::from_value(json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "blog",
                    "generation": Some(g),
                },
                "spec": {
                    "containers": [{
                        "name": "blog",
                        "image": "clux/blog:0.1.0"
                    }],
                }
            }))
            .unwrap()
        };

        let events = [
            Ok(pod_at_generation(1)),
            Err(Error::NoResourceVersion),
            Ok(pod_at_generation(1)),
            Ok(pod_at_generation(2)),
        ];
        let mut filtered = pin!(PredicateFilter::new(
            stream::iter(events),
            predicates::generation
        ));

        // The first object is always let through.
        let initial = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(initial.meta().generation, Some(1));

        // Errors are not filtered.
        assert!(matches!(
            poll!(filtered.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));

        // The repeated generation 1 is skipped, so the next item is generation 2.
        let updated = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(updated.meta().generation, Some(2));

        // Nothing is left afterwards.
        assert!(matches!(poll!(filtered.next()), Poll::Ready(None)));
    }
}