1use crate::watcher::Error;
2use core::{
3 pin::Pin,
4 task::{Context, Poll, ready},
5};
6use futures::Stream;
7use kube_client::{Resource, api::ObjectMeta};
8use pin_project::pin_project;
9use std::{
10 collections::{HashMap, hash_map::DefaultHasher},
11 hash::{Hash, Hasher},
12 marker::PhantomData,
13 time::{Duration, Instant},
14};
15
16fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
17 let mut hasher = DefaultHasher::new();
18 t.hash(&mut hasher);
19 hasher.finish()
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24struct PredicateCacheKey {
25 name: String,
26 namespace: Option<String>,
27 uid: Option<String>,
28}
29
30impl From<&ObjectMeta> for PredicateCacheKey {
31 fn from(meta: &ObjectMeta) -> Self {
32 Self {
33 name: meta.name.clone().unwrap_or_default(),
34 namespace: meta.namespace.clone(),
35 uid: meta.uid.clone(),
36 }
37 }
38}
39
40pub trait Predicate<K> {
42 fn hash_property(&self, obj: &K) -> Option<u64>;
44
45 fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
57 where
58 Self: Sized,
59 {
60 Fallback(self, f)
61 }
62
63 fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
75 where
76 Self: Sized,
77 {
78 Combine(self, f)
79 }
80}
81
82impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
83 fn hash_property(&self, obj: &K) -> Option<u64> {
84 (self)(obj)
85 }
86}
87
88#[derive(Copy, Clone, Debug, PartialEq, Eq)]
90pub struct Fallback<A, B>(pub(super) A, pub(super) B);
91impl<A, B, K> Predicate<K> for Fallback<A, B>
92where
93 A: Predicate<K>,
94 B: Predicate<K>,
95{
96 fn hash_property(&self, obj: &K) -> Option<u64> {
97 self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
98 }
99}
100#[derive(Copy, Clone, Debug, PartialEq, Eq)]
102pub struct Combine<A, B>(pub(super) A, pub(super) B);
103impl<A, B, K> Predicate<K> for Combine<A, B>
104where
105 A: Predicate<K>,
106 B: Predicate<K>,
107{
108 fn hash_property(&self, obj: &K) -> Option<u64> {
109 match (self.0.hash_property(obj), self.1.hash_property(obj)) {
110 (None, None) => None,
112 (a, b) => Some(hash(&(a, b))),
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct Config {
121 ttl: Duration,
126}
127
128impl Config {
129 #[must_use]
133 pub fn ttl(mut self, ttl: Duration) -> Self {
134 self.ttl = ttl;
135 self
136 }
137}
138
139impl Default for Config {
140 fn default() -> Self {
141 Self {
142 ttl: Duration::from_secs(3600),
145 }
146 }
147}
148
149#[derive(Debug, Clone)]
151struct CacheEntry {
152 hash: u64,
153 last_seen: Instant,
154}
155
156#[pin_project]
157#[must_use = "streams do nothing unless polled"]
159pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
160 #[pin]
161 stream: St,
162 predicate: P,
163 cache: HashMap<PredicateCacheKey, CacheEntry>,
164 config: Config,
165 _phantom: PhantomData<K>,
167}
168impl<St, K, P> PredicateFilter<St, K, P>
169where
170 St: Stream<Item = Result<K, Error>>,
171 K: Resource,
172 P: Predicate<K>,
173{
174 pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
175 Self {
176 stream,
177 predicate,
178 cache: HashMap::new(),
179 config,
180 _phantom: PhantomData,
181 }
182 }
183}
184impl<St, K, P> Stream for PredicateFilter<St, K, P>
185where
186 St: Stream<Item = Result<K, Error>>,
187 K: Resource,
188 K::DynamicType: Default + Eq + Hash,
189 P: Predicate<K>,
190{
191 type Item = Result<K, Error>;
192
193 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194 let mut me = self.project();
195
196 let now = Instant::now();
198 let ttl = me.config.ttl;
199 me.cache
200 .retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
201
202 Poll::Ready(loop {
203 break match ready!(me.stream.as_mut().poll_next(cx)) {
204 Some(Ok(obj)) => {
205 if let Some(val) = me.predicate.hash_property(&obj) {
206 let key = PredicateCacheKey::from(obj.meta());
207 let now = Instant::now();
208
209 let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
211
212 me.cache.insert(key, CacheEntry {
214 hash: val,
215 last_seen: now,
216 });
217
218 if changed {
219 Some(Ok(obj))
220 } else {
221 continue;
222 }
223 } else {
224 Some(Ok(obj))
226 }
227 }
228 Some(Err(err)) => Some(Err(err)),
229 None => return Poll::Ready(None),
230 };
231 })
232 }
233}
234
235pub mod predicates {
242 use super::hash;
243 use kube_client::{Resource, ResourceExt};
244
245 pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
247 obj.meta().generation.map(|g| hash(&g))
248 }
249
250 pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
252 obj.meta().resource_version.as_ref().map(hash)
253 }
254
255 pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
257 Some(hash(obj.labels()))
258 }
259
260 pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
262 Some(hash(obj.annotations()))
263 }
264
265 pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
267 Some(hash(obj.finalizers()))
268 }
269}
270
271#[cfg(test)]
272pub(crate) mod tests {
273 use std::{pin::pin, task::Poll};
274
275 use super::{Config, Error, PredicateFilter, predicates};
276 use futures::{FutureExt, StreamExt, poll, stream};
277 use kube_client::Resource;
278 use serde_json::json;
279
280 #[tokio::test]
281 async fn predicate_filtering_hides_equal_predicate_values() {
282 use k8s_openapi::api::core::v1::Pod;
283 let mkobj = |g: i32| {
284 let p: Pod = serde_json::from_value(json!({
285 "apiVersion": "v1",
286 "kind": "Pod",
287 "metadata": {
288 "name": "blog",
289 "generation": Some(g),
290 },
291 "spec": {
292 "containers": [{
293 "name": "blog",
294 "image": "clux/blog:0.1.0"
295 }],
296 }
297 }))
298 .unwrap();
299 p
300 };
301 let data = stream::iter([
302 Ok(mkobj(1)),
303 Err(Error::NoResourceVersion),
304 Ok(mkobj(1)),
305 Ok(mkobj(2)),
306 ]);
307 let mut rx = pin!(PredicateFilter::new(
308 data,
309 predicates::generation,
310 Config::default()
311 ));
312
313 let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
315 assert_eq!(first.meta().generation, Some(1));
316
317 assert!(matches!(
319 poll!(rx.next()),
320 Poll::Ready(Some(Err(Error::NoResourceVersion)))
321 ));
322 let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
325 assert_eq!(second.meta().generation, Some(2));
326 assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
327 }
328
329 #[tokio::test]
330 async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
331 use k8s_openapi::api::core::v1::Pod;
332
333 let mkobj = |g: i32, uid: &str| {
334 let p: Pod = serde_json::from_value(json!({
335 "apiVersion": "v1",
336 "kind": "Pod",
337 "metadata": {
338 "name": "blog",
339 "namespace": "default",
340 "generation": Some(g),
341 "uid": uid,
342 },
343 "spec": {
344 "containers": [{
345 "name": "blog",
346 "image": "clux/blog:0.1.0"
347 }],
348 }
349 }))
350 .unwrap();
351 p
352 };
353
354 let data = stream::iter([
357 Ok(mkobj(1, "uid-1")),
358 Ok(mkobj(1, "uid-1")),
359 Ok(mkobj(1, "uid-2")),
360 Ok(mkobj(2, "uid-3")),
361 ]);
362 let mut rx = pin!(PredicateFilter::new(
363 data,
364 predicates::generation,
365 Config::default()
366 ));
367
368 let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
370 assert_eq!(first.meta().generation, Some(1));
371 assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));
372
373 let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
376 assert_eq!(second.meta().generation, Some(1));
377 assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));
378
379 let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
381 assert_eq!(third.meta().generation, Some(2));
382 assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));
383
384 assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
385 }
386
387 #[tokio::test]
388 async fn predicate_cache_ttl_evicts_expired_entries() {
389 use futures::{SinkExt, channel::mpsc};
390 use k8s_openapi::api::core::v1::Pod;
391 use std::time::Duration;
392
393 let mkobj = |g: i32, uid: &str| {
394 let p: Pod = serde_json::from_value(json!({
395 "apiVersion": "v1",
396 "kind": "Pod",
397 "metadata": {
398 "name": "blog",
399 "namespace": "default",
400 "generation": Some(g),
401 "uid": uid,
402 },
403 "spec": {
404 "containers": [{
405 "name": "blog",
406 "image": "clux/blog:0.1.0"
407 }],
408 }
409 }))
410 .unwrap();
411 p
412 };
413
414 let config = Config::default().ttl(Duration::from_millis(50));
416
417 let (mut tx, rx) = mpsc::unbounded();
419 let mut filtered = pin!(PredicateFilter::new(
420 rx.map(Ok::<_, Error>),
421 predicates::generation,
422 config
423 ));
424
425 tx.send(mkobj(1, "uid-1")).await.unwrap();
427 let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
428 assert_eq!(first.meta().generation, Some(1));
429
430 tx.send(mkobj(1, "uid-1")).await.unwrap();
432 assert!(matches!(poll!(filtered.next()), Poll::Pending));
433
434 tokio::time::sleep(Duration::from_millis(100)).await;
436
437 tx.send(mkobj(1, "uid-1")).await.unwrap();
439 let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
440 assert_eq!(second.meta().generation, Some(1));
441 }
442}