kube_runtime/utils/
predicate.rs

1use crate::watcher::Error;
2use core::{
3    pin::Pin,
4    task::{Context, Poll, ready},
5};
6use futures::Stream;
7use kube_client::{Resource, api::ObjectMeta};
8use pin_project::pin_project;
9use std::{
10    collections::{HashMap, hash_map::DefaultHasher},
11    hash::{Hash, Hasher},
12    marker::PhantomData,
13    time::{Duration, Instant},
14};
15
16fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
17    let mut hasher = DefaultHasher::new();
18    t.hash(&mut hasher);
19    hasher.finish()
20}
21
22/// Private cache key that includes UID in equality for predicate filtering
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24struct PredicateCacheKey {
25    name: String,
26    namespace: Option<String>,
27    uid: Option<String>,
28}
29
30impl From<&ObjectMeta> for PredicateCacheKey {
31    fn from(meta: &ObjectMeta) -> Self {
32        Self {
33            name: meta.name.clone().unwrap_or_default(),
34            namespace: meta.namespace.clone(),
35            uid: meta.uid.clone(),
36        }
37    }
38}
39
40/// A predicate is a hasher of Kubernetes objects stream filtering
41pub trait Predicate<K> {
42    /// A predicate only needs to implement optional hashing when keys exist
43    fn hash_property(&self, obj: &K) -> Option<u64>;
44
45    /// Returns a `Predicate` that falls back to an alternate property if the first does not exist
46    ///
47    /// # Usage
48    ///
49    /// ```
50    /// # use k8s_openapi::api::core::v1::Pod;
51    /// use kube::runtime::{predicates, Predicate};
52    /// # fn blah<K>(a: impl Predicate<K>) {}
53    /// let pred = predicates::generation.fallback(predicates::resource_version);
54    /// blah::<Pod>(pred);
55    /// ```
56    fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
57    where
58        Self: Sized,
59    {
60        Fallback(self, f)
61    }
62
63    /// Returns a `Predicate` that combines all available hashes
64    ///
65    /// # Usage
66    ///
67    /// ```
68    /// # use k8s_openapi::api::core::v1::Pod;
69    /// use kube::runtime::{predicates, Predicate};
70    /// # fn blah<K>(a: impl Predicate<K>) {}
71    /// let pred = predicates::labels.combine(predicates::annotations);
72    /// blah::<Pod>(pred);
73    /// ```
74    fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
75    where
76        Self: Sized,
77    {
78        Combine(self, f)
79    }
80}
81
82impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
83    fn hash_property(&self, obj: &K) -> Option<u64> {
84        (self)(obj)
85    }
86}
87
88/// See [`Predicate::fallback`]
89#[derive(Copy, Clone, Debug, PartialEq, Eq)]
90pub struct Fallback<A, B>(pub(super) A, pub(super) B);
91impl<A, B, K> Predicate<K> for Fallback<A, B>
92where
93    A: Predicate<K>,
94    B: Predicate<K>,
95{
96    fn hash_property(&self, obj: &K) -> Option<u64> {
97        self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
98    }
99}
100/// See [`Predicate::combine`]
101#[derive(Copy, Clone, Debug, PartialEq, Eq)]
102pub struct Combine<A, B>(pub(super) A, pub(super) B);
103impl<A, B, K> Predicate<K> for Combine<A, B>
104where
105    A: Predicate<K>,
106    B: Predicate<K>,
107{
108    fn hash_property(&self, obj: &K) -> Option<u64> {
109        match (self.0.hash_property(obj), self.1.hash_property(obj)) {
110            // pass on both missing properties so people can chain .fallback
111            (None, None) => None,
112            // but any other combination of properties are hashed together
113            (a, b) => Some(hash(&(a, b))),
114        }
115    }
116}
117
118/// Configuration for predicate filtering with cache TTL
119#[derive(Debug, Clone)]
120pub struct Config {
121    /// Time-to-live for cache entries
122    ///
123    /// Entries not seen for at least this long will be evicted from the cache.
124    /// Default is 1 hour.
125    ttl: Duration,
126}
127
128impl Config {
129    /// Set the time-to-live for cache entries
130    ///
131    /// Entries not seen for at least this long will be evicted from the cache.
132    #[must_use]
133    pub fn ttl(mut self, ttl: Duration) -> Self {
134        self.ttl = ttl;
135        self
136    }
137}
138
139impl Default for Config {
140    fn default() -> Self {
141        Self {
142            // Default to 1 hour TTL - long enough to avoid unnecessary reconciles
143            // but short enough to prevent unbounded memory growth
144            ttl: Duration::from_secs(3600),
145        }
146    }
147}
148
149/// Cache entry storing predicate hash and last access time
150#[derive(Debug, Clone)]
151struct CacheEntry {
152    hash: u64,
153    last_seen: Instant,
154}
155
156#[allow(clippy::pedantic)]
157#[pin_project]
158/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
159#[must_use = "streams do nothing unless polled"]
160pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
161    #[pin]
162    stream: St,
163    predicate: P,
164    cache: HashMap<PredicateCacheKey, CacheEntry>,
165    config: Config,
166    // K: Resource necessary to get .meta() of an object during polling
167    _phantom: PhantomData<K>,
168}
169impl<St, K, P> PredicateFilter<St, K, P>
170where
171    St: Stream<Item = Result<K, Error>>,
172    K: Resource,
173    P: Predicate<K>,
174{
175    pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
176        Self {
177            stream,
178            predicate,
179            cache: HashMap::new(),
180            config,
181            _phantom: PhantomData,
182        }
183    }
184}
185impl<St, K, P> Stream for PredicateFilter<St, K, P>
186where
187    St: Stream<Item = Result<K, Error>>,
188    K: Resource,
189    K::DynamicType: Default + Eq + Hash,
190    P: Predicate<K>,
191{
192    type Item = Result<K, Error>;
193
194    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195        let mut me = self.project();
196
197        // Evict expired entries before processing new events
198        let now = Instant::now();
199        let ttl = me.config.ttl;
200        me.cache
201            .retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
202
203        Poll::Ready(loop {
204            break match ready!(me.stream.as_mut().poll_next(cx)) {
205                Some(Ok(obj)) => {
206                    if let Some(val) = me.predicate.hash_property(&obj) {
207                        let key = PredicateCacheKey::from(obj.meta());
208                        let now = Instant::now();
209
210                        // Check if the predicate value changed or entry doesn't exist
211                        let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
212
213                        // Upsert the cache entry with new hash and timestamp
214                        me.cache.insert(key, CacheEntry {
215                            hash: val,
216                            last_seen: now,
217                        });
218
219                        if changed {
220                            Some(Ok(obj))
221                        } else {
222                            continue;
223                        }
224                    } else {
225                        // if we can't evaluate predicate, always emit K
226                        Some(Ok(obj))
227                    }
228                }
229                Some(Err(err)) => Some(Err(err)),
230                None => return Poll::Ready(None),
231            };
232        })
233    }
234}
235
236/// Predicate functions for [`WatchStreamExt::predicate_filter`](crate::WatchStreamExt::predicate_filter)
237///
238/// These functions just return a hash of commonly compared values,
239/// to help decide whether to pass a watch event along or not.
240///
241/// Functional rewrite of the [controller-runtime/predicate module](https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/predicate/predicate.go).
242pub mod predicates {
243    use super::hash;
244    use kube_client::{Resource, ResourceExt};
245
246    /// Hash the generation of a Resource K
247    pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
248        obj.meta().generation.map(|g| hash(&g))
249    }
250
251    /// Hash the resource version of a Resource K
252    pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
253        obj.meta().resource_version.as_ref().map(hash)
254    }
255
256    /// Hash the labels of a Resource K
257    pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
258        Some(hash(obj.labels()))
259    }
260
261    /// Hash the annotations of a Resource K
262    pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
263        Some(hash(obj.annotations()))
264    }
265
266    /// Hash the finalizers of a Resource K
267    pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
268        Some(hash(obj.finalizers()))
269    }
270}
271
272#[cfg(test)]
273pub(crate) mod tests {
274    use std::{pin::pin, task::Poll};
275
276    use super::{Config, Error, PredicateFilter, predicates};
277    use futures::{FutureExt, StreamExt, poll, stream};
278    use kube_client::Resource;
279    use serde_json::json;
280
281    #[tokio::test]
282    async fn predicate_filtering_hides_equal_predicate_values() {
283        use k8s_openapi::api::core::v1::Pod;
284        let mkobj = |g: i32| {
285            let p: Pod = serde_json::from_value(json!({
286                "apiVersion": "v1",
287                "kind": "Pod",
288                "metadata": {
289                    "name": "blog",
290                    "generation": Some(g),
291                },
292                "spec": {
293                    "containers": [{
294                      "name": "blog",
295                      "image": "clux/blog:0.1.0"
296                    }],
297                }
298            }))
299            .unwrap();
300            p
301        };
302        let data = stream::iter([
303            Ok(mkobj(1)),
304            Err(Error::NoResourceVersion),
305            Ok(mkobj(1)),
306            Ok(mkobj(2)),
307        ]);
308        let mut rx = pin!(PredicateFilter::new(
309            data,
310            predicates::generation,
311            Config::default()
312        ));
313
314        // mkobj(1) passed through
315        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
316        assert_eq!(first.meta().generation, Some(1));
317
318        // Error passed through
319        assert!(matches!(
320            poll!(rx.next()),
321            Poll::Ready(Some(Err(Error::NoResourceVersion)))
322        ));
323        // (no repeat mkobj(1) - same generation)
324        // mkobj(2) next
325        let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
326        assert_eq!(second.meta().generation, Some(2));
327        assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
328    }
329
330    #[tokio::test]
331    async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
332        use k8s_openapi::api::core::v1::Pod;
333
334        let mkobj = |g: i32, uid: &str| {
335            let p: Pod = serde_json::from_value(json!({
336                "apiVersion": "v1",
337                "kind": "Pod",
338                "metadata": {
339                    "name": "blog",
340                    "namespace": "default",
341                    "generation": Some(g),
342                    "uid": uid,
343                },
344                "spec": {
345                    "containers": [{
346                      "name": "blog",
347                      "image": "clux/blog:0.1.0"
348                    }],
349                }
350            }))
351            .unwrap();
352            p
353        };
354
355        // Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete ->
356        // create (gen=1, uid=2) -> delete -> create (gen=2, uid=3)
357        let data = stream::iter([
358            Ok(mkobj(1, "uid-1")),
359            Ok(mkobj(1, "uid-1")),
360            Ok(mkobj(1, "uid-2")),
361            Ok(mkobj(2, "uid-3")),
362        ]);
363        let mut rx = pin!(PredicateFilter::new(
364            data,
365            predicates::generation,
366            Config::default()
367        ));
368
369        // mkobj(1, uid-1) passed through
370        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
371        assert_eq!(first.meta().generation, Some(1));
372        assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));
373
374        // (no repeat mkobj(1, uid-1) - same UID and generation)
375        // mkobj(1, uid-2) next - different UID detected
376        let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
377        assert_eq!(second.meta().generation, Some(1));
378        assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));
379
380        // mkobj(2, uid-3) next
381        let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
382        assert_eq!(third.meta().generation, Some(2));
383        assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));
384
385        assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
386    }
387
388    #[tokio::test]
389    async fn predicate_cache_ttl_evicts_expired_entries() {
390        use futures::{SinkExt, channel::mpsc};
391        use k8s_openapi::api::core::v1::Pod;
392        use std::time::Duration;
393
394        let mkobj = |g: i32, uid: &str| {
395            let p: Pod = serde_json::from_value(json!({
396                "apiVersion": "v1",
397                "kind": "Pod",
398                "metadata": {
399                    "name": "blog",
400                    "namespace": "default",
401                    "generation": Some(g),
402                    "uid": uid,
403                },
404                "spec": {
405                    "containers": [{
406                      "name": "blog",
407                      "image": "clux/blog:0.1.0"
408                    }],
409                }
410            }))
411            .unwrap();
412            p
413        };
414
415        // Use a very short TTL for testing
416        let config = Config::default().ttl(Duration::from_millis(50));
417
418        // Use a channel so we can send items with delays
419        let (mut tx, rx) = mpsc::unbounded();
420        let mut filtered = pin!(PredicateFilter::new(
421            rx.map(Ok::<_, Error>),
422            predicates::generation,
423            config
424        ));
425
426        // Send first object
427        tx.send(mkobj(1, "uid-1")).await.unwrap();
428        let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
429        assert_eq!(first.meta().generation, Some(1));
430
431        // Send same object immediately - should be filtered
432        tx.send(mkobj(1, "uid-1")).await.unwrap();
433        assert!(matches!(poll!(filtered.next()), Poll::Pending));
434
435        // Wait for TTL to expire
436        tokio::time::sleep(Duration::from_millis(100)).await;
437
438        // Send same object after TTL - should pass through due to eviction
439        tx.send(mkobj(1, "uid-1")).await.unwrap();
440        let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
441        assert_eq!(second.meta().generation, Some(1));
442    }
443}