1use crate::watcher::Error;
2use core::{
3 pin::Pin,
4 task::{Context, Poll, ready},
5};
6use futures::Stream;
7use kube_client::{Resource, api::ObjectMeta};
8use pin_project::pin_project;
9use std::{
10 collections::{HashMap, hash_map::DefaultHasher},
11 hash::{Hash, Hasher},
12 marker::PhantomData,
13 time::{Duration, Instant},
14};
15
/// Feeds `t` into a freshly created [`DefaultHasher`] and returns the
/// resulting 64-bit digest.
fn hash<T>(t: &T) -> u64
where
    T: Hash + ?Sized,
{
    let mut state = DefaultHasher::new();
    Hash::hash(t, &mut state);
    state.finish()
}
21
/// Identity under which a predicate hash is cached: the object's name,
/// namespace and uid.
///
/// Because the uid is part of the key, two objects that share a name and
/// namespace but carry different uids occupy separate cache slots.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct PredicateCacheKey {
    /// Object name.
    name: String,
    /// Namespace of the object, if it has one.
    namespace: Option<String>,
    /// Uid of the object, if it has one.
    uid: Option<String>,
}
29
30impl From<&ObjectMeta> for PredicateCacheKey {
31 fn from(meta: &ObjectMeta) -> Self {
32 Self {
33 name: meta.name.clone().unwrap_or_default(),
34 namespace: meta.namespace.clone(),
35 uid: meta.uid.clone(),
36 }
37 }
38}
39
40pub trait Predicate<K> {
42 fn hash_property(&self, obj: &K) -> Option<u64>;
44
45 fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
57 where
58 Self: Sized,
59 {
60 Fallback(self, f)
61 }
62
63 fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
75 where
76 Self: Sized,
77 {
78 Combine(self, f)
79 }
80}
81
82impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
83 fn hash_property(&self, obj: &K) -> Option<u64> {
84 (self)(obj)
85 }
86}
87
88#[derive(Copy, Clone, Debug, PartialEq, Eq)]
90pub struct Fallback<A, B>(pub(super) A, pub(super) B);
91impl<A, B, K> Predicate<K> for Fallback<A, B>
92where
93 A: Predicate<K>,
94 B: Predicate<K>,
95{
96 fn hash_property(&self, obj: &K) -> Option<u64> {
97 self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
98 }
99}
100#[derive(Copy, Clone, Debug, PartialEq, Eq)]
102pub struct Combine<A, B>(pub(super) A, pub(super) B);
103impl<A, B, K> Predicate<K> for Combine<A, B>
104where
105 A: Predicate<K>,
106 B: Predicate<K>,
107{
108 fn hash_property(&self, obj: &K) -> Option<u64> {
109 match (self.0.hash_property(obj), self.1.hash_property(obj)) {
110 (None, None) => None,
112 (a, b) => Some(hash(&(a, b))),
114 }
115 }
116}
117
/// Settings for the predicate cache.
#[derive(Clone, Debug)]
pub struct Config {
    /// Maximum time a cache entry may go without being refreshed before it is
    /// dropped from the cache.
    ttl: Duration,
}

impl Config {
    /// Time-to-live applied when none is configured: one hour.
    const DEFAULT_TTL: Duration = Duration::from_secs(60 * 60);

    /// Returns this configuration with its time-to-live replaced by `ttl`.
    #[must_use]
    pub fn ttl(mut self, ttl: Duration) -> Self {
        self.ttl = ttl;
        self
    }
}

impl Default for Config {
    /// Creates a configuration with a one hour time-to-live.
    fn default() -> Self {
        Self {
            ttl: Self::DEFAULT_TTL,
        }
    }
}
148
/// Value stored in the predicate cache for one object.
#[derive(Clone, Debug)]
struct CacheEntry {
    /// Predicate hash most recently recorded for the object.
    hash: u64,
    /// When the entry was last written; compared against the configured
    /// time-to-live to decide eviction.
    last_seen: Instant,
}
155
156#[allow(clippy::pedantic)]
157#[pin_project]
158#[must_use = "streams do nothing unless polled"]
160pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
161 #[pin]
162 stream: St,
163 predicate: P,
164 cache: HashMap<PredicateCacheKey, CacheEntry>,
165 config: Config,
166 _phantom: PhantomData<K>,
168}
169impl<St, K, P> PredicateFilter<St, K, P>
170where
171 St: Stream<Item = Result<K, Error>>,
172 K: Resource,
173 P: Predicate<K>,
174{
175 pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
176 Self {
177 stream,
178 predicate,
179 cache: HashMap::new(),
180 config,
181 _phantom: PhantomData,
182 }
183 }
184}
185impl<St, K, P> Stream for PredicateFilter<St, K, P>
186where
187 St: Stream<Item = Result<K, Error>>,
188 K: Resource,
189 K::DynamicType: Default + Eq + Hash,
190 P: Predicate<K>,
191{
192 type Item = Result<K, Error>;
193
194 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 let mut me = self.project();
196
197 let now = Instant::now();
199 let ttl = me.config.ttl;
200 me.cache
201 .retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
202
203 Poll::Ready(loop {
204 break match ready!(me.stream.as_mut().poll_next(cx)) {
205 Some(Ok(obj)) => {
206 if let Some(val) = me.predicate.hash_property(&obj) {
207 let key = PredicateCacheKey::from(obj.meta());
208 let now = Instant::now();
209
210 let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
212
213 me.cache.insert(key, CacheEntry {
215 hash: val,
216 last_seen: now,
217 });
218
219 if changed {
220 Some(Ok(obj))
221 } else {
222 continue;
223 }
224 } else {
225 Some(Ok(obj))
227 }
228 }
229 Some(Err(err)) => Some(Err(err)),
230 None => return Poll::Ready(None),
231 };
232 })
233 }
234}
235
236pub mod predicates {
243 use super::hash;
244 use kube_client::{Resource, ResourceExt};
245
246 pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
248 obj.meta().generation.map(|g| hash(&g))
249 }
250
251 pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
253 obj.meta().resource_version.as_ref().map(hash)
254 }
255
256 pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
258 Some(hash(obj.labels()))
259 }
260
261 pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
263 Some(hash(obj.annotations()))
264 }
265
266 pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
268 Some(hash(obj.finalizers()))
269 }
270}
271
#[cfg(test)]
pub(crate) mod tests {
    use std::{pin::pin, task::Poll, time::Duration};

    use super::{Config, Error, PredicateFilter, predicates};
    use futures::{FutureExt, SinkExt, StreamExt, channel::mpsc, poll, stream};
    use k8s_openapi::api::core::v1::Pod;
    use kube_client::Resource;
    use serde_json::json;

    /// Builds a `blog` pod with the given generation and neither namespace
    /// nor uid.
    fn pod(generation: i32) -> Pod {
        serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "blog",
                "generation": Some(generation),
            },
            "spec": {
                "containers": [{
                    "name": "blog",
                    "image": "clux/blog:0.1.0"
                }],
            }
        }))
        .unwrap()
    }

    /// Builds a `blog` pod in the `default` namespace with the given
    /// generation and uid.
    fn pod_with_uid(generation: i32, uid: &str) -> Pod {
        serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "blog",
                "namespace": "default",
                "generation": Some(generation),
                "uid": uid,
            },
            "spec": {
                "containers": [{
                    "name": "blog",
                    "image": "clux/blog:0.1.0"
                }],
            }
        }))
        .unwrap()
    }

    /// A repeated generation is hidden, while errors and new generations
    /// pass through.
    #[tokio::test]
    async fn predicate_filtering_hides_equal_predicate_values() {
        let events = stream::iter([
            Ok(pod(1)),
            Err(Error::NoResourceVersion),
            Ok(pod(1)),
            Ok(pod(2)),
        ]);
        let mut filtered = pin!(PredicateFilter::new(
            events,
            predicates::generation,
            Config::default()
        ));

        // The first object is new to the cache and is emitted.
        let initial = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(initial.meta().generation, Some(1));

        // The error is forwarded untouched.
        assert!(matches!(
            poll!(filtered.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));

        // The duplicate generation 1 is skipped, so generation 2 comes next.
        let bumped = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(bumped.meta().generation, Some(2));

        assert!(matches!(poll!(filtered.next()), Poll::Ready(None)));
    }

    /// An object with a new uid is emitted even when its generation matches
    /// an already cached object of the same name and namespace.
    #[tokio::test]
    async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
        let events = stream::iter([
            Ok(pod_with_uid(1, "uid-1")),
            Ok(pod_with_uid(1, "uid-1")),
            Ok(pod_with_uid(1, "uid-2")),
            Ok(pod_with_uid(2, "uid-3")),
        ]);
        let mut filtered = pin!(PredicateFilter::new(
            events,
            predicates::generation,
            Config::default()
        ));

        let original = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(original.meta().generation, Some(1));
        assert_eq!(original.meta().uid.as_deref(), Some("uid-1"));

        // The exact duplicate (same uid, same generation) is skipped.
        let recreated = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(recreated.meta().generation, Some(1));
        assert_eq!(recreated.meta().uid.as_deref(), Some("uid-2"));

        let latest = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(latest.meta().generation, Some(2));
        assert_eq!(latest.meta().uid.as_deref(), Some("uid-3"));

        assert!(matches!(poll!(filtered.next()), Poll::Ready(None)));
    }

    /// Once an entry is older than the TTL, the same object is emitted again.
    #[tokio::test]
    async fn predicate_cache_ttl_evicts_expired_entries() {
        let short_lived = Config::default().ttl(Duration::from_millis(50));

        let (mut sender, receiver) = mpsc::unbounded();
        let mut filtered = pin!(PredicateFilter::new(
            receiver.map(Ok::<_, Error>),
            predicates::generation,
            short_lived
        ));

        // First sighting: emitted and cached.
        sender.send(pod_with_uid(1, "uid-1")).await.unwrap();
        let initial = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(initial.meta().generation, Some(1));

        // Same object within the TTL: filtered, so the stream stays pending.
        sender.send(pod_with_uid(1, "uid-1")).await.unwrap();
        assert!(matches!(poll!(filtered.next()), Poll::Pending));

        // Wait past the TTL so the cached entry expires.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Same object after expiry: emitted again.
        sender.send(pod_with_uid(1, "uid-1")).await.unwrap();
        let after_expiry = filtered.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(after_expiry.meta().generation, Some(1));
    }
}