use crate::watcher::Error;
use core::{
pin::Pin,
task::{Context, Poll, ready},
};
use futures::Stream;
use kube_client::{Resource, api::ObjectMeta};
use pin_project::pin_project;
use std::{
collections::{HashMap, hash_map::DefaultHasher},
hash::{Hash, Hasher},
marker::PhantomData,
time::{Duration, Instant},
};
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
let mut hasher = DefaultHasher::new();
t.hash(&mut hasher);
hasher.finish()
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PredicateCacheKey {
name: String,
namespace: Option<String>,
uid: Option<String>,
}
impl From<&ObjectMeta> for PredicateCacheKey {
fn from(meta: &ObjectMeta) -> Self {
Self {
name: meta.name.clone().unwrap_or_default(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
}
}
}
pub trait Predicate<K> {
fn hash_property(&self, obj: &K) -> Option<u64>;
fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
where
Self: Sized,
{
Fallback(self, f)
}
fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
where
Self: Sized,
{
Combine(self, f)
}
}
impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
fn hash_property(&self, obj: &K) -> Option<u64> {
(self)(obj)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Fallback<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Predicate<K> for Fallback<A, B>
where
A: Predicate<K>,
B: Predicate<K>,
{
fn hash_property(&self, obj: &K) -> Option<u64> {
self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Combine<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Predicate<K> for Combine<A, B>
where
A: Predicate<K>,
B: Predicate<K>,
{
fn hash_property(&self, obj: &K) -> Option<u64> {
match (self.0.hash_property(obj), self.1.hash_property(obj)) {
(None, None) => None,
(a, b) => Some(hash(&(a, b))),
}
}
}
#[derive(Debug, Clone)]
pub struct Config {
ttl: Duration,
}
impl Config {
#[must_use]
pub fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
}
impl Default for Config {
fn default() -> Self {
Self {
ttl: Duration::from_secs(3600),
}
}
}
#[derive(Debug, Clone)]
struct CacheEntry {
hash: u64,
last_seen: Instant,
}
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
#[pin]
stream: St,
predicate: P,
cache: HashMap<PredicateCacheKey, CacheEntry>,
config: Config,
_phantom: PhantomData<K>,
}
impl<St, K, P> PredicateFilter<St, K, P>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
P: Predicate<K>,
{
pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
Self {
stream,
predicate,
cache: HashMap::new(),
config,
_phantom: PhantomData,
}
}
}
impl<St, K, P> Stream for PredicateFilter<St, K, P>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
K::DynamicType: Default + Eq + Hash,
P: Predicate<K>,
{
type Item = Result<K, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
let now = Instant::now();
let ttl = me.config.ttl;
me.cache
.retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
Poll::Ready(loop {
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = PredicateCacheKey::from(obj.meta());
let now = Instant::now();
let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
me.cache.insert(key, CacheEntry {
hash: val,
last_seen: now,
});
if changed {
Some(Ok(obj))
} else {
continue;
}
} else {
Some(Ok(obj))
}
}
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
};
})
}
}
pub mod predicates {
use super::hash;
use kube_client::{Resource, ResourceExt};
pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
obj.meta().generation.map(|g| hash(&g))
}
pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
obj.meta().resource_version.as_ref().map(hash)
}
pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
Some(hash(obj.labels()))
}
pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
Some(hash(obj.annotations()))
}
pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
Some(hash(obj.finalizers()))
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::{pin::pin, task::Poll};
use super::{Config, Error, PredicateFilter, predicates};
use futures::{FutureExt, StreamExt, poll, stream};
use kube_client::Resource;
use serde_json::json;
#[tokio::test]
async fn predicate_filtering_hides_equal_predicate_values() {
use k8s_openapi::api::core::v1::Pod;
let mkobj = |g: i32| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"generation": Some(g),
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};
let data = stream::iter([
Ok(mkobj(1)),
Err(Error::NoResourceVersion),
Ok(mkobj(1)),
Ok(mkobj(2)),
]);
let mut rx = pin!(PredicateFilter::new(
data,
predicates::generation,
Config::default()
));
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
assert!(matches!(
poll!(rx.next()),
Poll::Ready(Some(Err(Error::NoResourceVersion)))
));
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
#[tokio::test]
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
use k8s_openapi::api::core::v1::Pod;
let mkobj = |g: i32, uid: &str| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"namespace": "default",
"generation": Some(g),
"uid": uid,
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};
let data = stream::iter([
Ok(mkobj(1, "uid-1")),
Ok(mkobj(1, "uid-1")),
Ok(mkobj(1, "uid-2")),
Ok(mkobj(2, "uid-3")),
]);
let mut rx = pin!(PredicateFilter::new(
data,
predicates::generation,
Config::default()
));
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(1));
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));
let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(third.meta().generation, Some(2));
assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
#[tokio::test]
async fn predicate_cache_ttl_evicts_expired_entries() {
use futures::{SinkExt, channel::mpsc};
use k8s_openapi::api::core::v1::Pod;
use std::time::Duration;
let mkobj = |g: i32, uid: &str| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"namespace": "default",
"generation": Some(g),
"uid": uid,
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};
let config = Config::default().ttl(Duration::from_millis(50));
let (mut tx, rx) = mpsc::unbounded();
let mut filtered = pin!(PredicateFilter::new(
rx.map(Ok::<_, Error>),
predicates::generation,
config
));
tx.send(mkobj(1, "uid-1")).await.unwrap();
let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
tx.send(mkobj(1, "uid-1")).await.unwrap();
assert!(matches!(poll!(filtered.next()), Poll::Pending));
tokio::time::sleep(Duration::from_millis(100)).await;
tx.send(mkobj(1, "uid-1")).await.unwrap();
let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(1));
}
}