kube_runtime/utils/
watch_ext.rs

1use crate::{
2    utils::{
3        event_decode::EventDecode,
4        event_modify::EventModify,
5        predicate::{Config as PredicateConfig, Predicate, PredicateFilter},
6        stream_backoff::StreamBackoff,
7    },
8    watcher,
9};
10use kube_client::Resource;
11
12use crate::{
13    reflector::store::Writer,
14    utils::{Backoff, Reflect},
15};
16
17use crate::watcher::DefaultBackoff;
18use futures::{Stream, TryStream};
19
20/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
21pub trait WatchStreamExt: Stream {
22    /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy
23    ///
24    /// This is recommended for controllers that want to play nicely with the apiserver.
25    fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
26    where
27        Self: TryStream + Sized,
28    {
29        StreamBackoff::new(self, DefaultBackoff::default())
30    }
31
32    /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
33    fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
34    where
35        B: Backoff,
36        Self: TryStream + Sized,
37    {
38        StreamBackoff::new(self, b)
39    }
40
41    /// Decode a [`watcher()`] stream into a stream of applied objects
42    ///
43    /// All Added/Modified events are passed through, and critical errors bubble up.
44    fn applied_objects<K>(self) -> EventDecode<Self>
45    where
46        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
47    {
48        EventDecode::new(self, false)
49    }
50
51    /// Decode a [`watcher()`] stream into a stream of touched objects
52    ///
53    /// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
54    fn touched_objects<K>(self) -> EventDecode<Self>
55    where
56        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
57    {
58        EventDecode::new(self, true)
59    }
60
61    /// Modify elements of a [`watcher()`] stream.
62    ///
63    /// Calls [`watcher::Event::modify()`] on every element.
64    /// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`.
65    ///
66    /// ```no_run
67    /// # use std::pin::pin;
68    /// # use futures::{Stream, StreamExt, TryStreamExt};
69    /// # use kube::{Api, Client, ResourceExt};
70    /// # use kube_runtime::{watcher, WatchStreamExt};
71    /// # use k8s_openapi::api::apps::v1::Deployment;
72    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
73    /// # let client: kube::Client = todo!();
74    /// let deploys: Api<Deployment> = Api::all(client);
75    /// let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
76    ///     .modify(|deploy| {
77    ///         deploy.managed_fields_mut().clear();
78    ///         deploy.status = None;
79    ///     })
80    ///     .applied_objects());
81    ///
82    /// while let Some(d) = truncated_deploy_stream.try_next().await? {
83    ///    println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
84    /// }
85    /// # Ok(())
86    /// # }
87    /// ```
88    fn modify<F, K>(self, f: F) -> EventModify<Self, F>
89    where
90        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
91        F: FnMut(&mut K),
92    {
93        EventModify::new(self, f)
94    }
95
96    /// Filter a stream based on on [`predicates`](crate::predicates).
97    ///
98    /// This will filter out repeat calls where the predicate returns the same result.
99    /// Common use case for this is to avoid repeat events for status updates
100    /// by filtering on [`predicates::generation`](crate::predicates::generation).
101    ///
102    /// The cache entries have a configurable time-to-live (TTL) to prevent unbounded
103    /// memory growth. By default, entries expire after 1 hour.
104    ///
105    /// ## Usage
106    /// ```no_run
107    /// # use std::pin::pin;
108    /// # use futures::{Stream, StreamExt, TryStreamExt};
109    /// use kube::{Api, Client, ResourceExt};
110    /// use kube_runtime::{watcher, WatchStreamExt, predicates};
111    /// use k8s_openapi::api::apps::v1::Deployment;
112    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
113    /// # let client: kube::Client = todo!();
114    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
115    /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
116    ///     .applied_objects()
117    ///     .predicate_filter(predicates::generation, Default::default()));
118    ///
119    /// while let Some(d) = changed_deploys.try_next().await? {
120    ///    println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
121    /// }
122    /// # Ok(())
123    /// # }
124    /// ```
125    fn predicate_filter<K, P>(self, predicate: P, config: PredicateConfig) -> PredicateFilter<Self, K, P>
126    where
127        Self: Stream<Item = Result<K, watcher::Error>> + Sized,
128        K: Resource + 'static,
129        P: Predicate<K> + 'static,
130    {
131        PredicateFilter::new(self, predicate, config)
132    }
133
134    /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
135    ///
136    /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
137    /// This populates a [`Store`] as the stream is polled.
138    ///
139    /// ## Usage
140    /// ```no_run
141    /// # use futures::{Stream, StreamExt, TryStreamExt};
142    /// # use std::time::Duration;
143    /// # use tracing::{info, warn};
144    /// use kube::{Api, Client, ResourceExt};
145    /// use kube_runtime::{watcher, WatchStreamExt, reflector};
146    /// use k8s_openapi::api::apps::v1::Deployment;
147    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
148    /// # let client: kube::Client = todo!();
149    ///
150    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
151    /// let (reader, writer) = reflector::store::<Deployment>();
152    ///
153    /// tokio::spawn(async move {
154    ///     // start polling the store once the reader is ready
155    ///     reader.wait_until_ready().await.unwrap();
156    ///     loop {
157    ///         let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
158    ///         info!("Current {} deploys: {:?}", names.len(), names);
159    ///         tokio::time::sleep(Duration::from_secs(10)).await;
160    ///     }
161    /// });
162    ///
163    /// // configure the watcher stream and populate the store while polling
164    /// watcher(deploys, watcher::Config::default())
165    ///     .reflect(writer)
166    ///     .applied_objects()
167    ///     .for_each(|res| async move {
168    ///         match res {
169    ///             Ok(o) => info!("saw {}", o.name_any()),
170    ///             Err(e) => warn!("watcher error: {}", e),
171    ///         }
172    ///     })
173    ///     .await;
174    ///
175    /// # Ok(())
176    /// # }
177    /// ```
178    ///
179    /// [`Store`]: crate::reflector::Store
180    fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
181    where
182        Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
183        K: Resource + Clone + 'static,
184        K::DynamicType: Eq + std::hash::Hash + Clone,
185    {
186        Reflect::new(self, writer)
187    }
188
189    /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`]
190    ///
191    /// Returns the stream unmodified, but passes every [`watcher::Event`]
192    /// through a [`Writer`]. This populates a [`Store`] as the stream is
193    /// polled. When the [`watcher::Event`] is not an error or a
194    /// [`watcher::Event::Deleted`] then its inner object will also be
195    /// propagated to subscribers.
196    ///
197    /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`].
198    /// This will return a [`ReflectHandle`] stream that should be polled
199    /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s
200    /// subscribed to the stream will also terminate after all events yielded by
201    /// the root stream have been observed. This means [`ReflectHandle`] streams
202    /// can still be polled after the root stream has been dropped.
203    ///
204    /// **NB**: This adapter requires an
205    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
206    /// feature
207    ///
208    /// ## Warning
209    ///
210    /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will
211    /// never receive any events. This will cause the streams to deadlock since
212    /// the root stream will apply backpressure when downstream readers are not
213    /// consuming events.
214    ///
215    ///
216    /// [`Store`]: crate::reflector::Store
217    /// [`subscribe()`]: crate::reflector::store::Writer::subscribe()
218    /// [`Stream`]: futures::stream::Stream
219    /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle
220    /// ## Usage
221    /// ```no_run
222    /// # use futures::StreamExt;
223    /// # use std::time::Duration;
224    /// # use tracing::{info, warn};
225    /// use kube::{Api, ResourceExt};
226    /// use kube_runtime::{watcher, WatchStreamExt, reflector};
227    /// use k8s_openapi::api::apps::v1::Deployment;
228    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
229    /// # let client: kube::Client = todo!();
230    ///
231    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
232    /// let subscriber_buf_sz = 100;
233    /// let (reader, writer) = reflector::store_shared::<Deployment>(subscriber_buf_sz);
234    /// let subscriber = writer.subscribe().unwrap();
235    ///
236    /// tokio::spawn(async move {
237    ///     // start polling the store once the reader is ready
238    ///     reader.wait_until_ready().await.unwrap();
239    ///     loop {
240    ///         let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
241    ///         info!("Current {} deploys: {:?}", names.len(), names);
242    ///         tokio::time::sleep(Duration::from_secs(10)).await;
243    ///     }
244    /// });
245    ///
246    /// tokio::spawn(async move {
247    ///     // subscriber can be used to receive applied_objects
248    ///     subscriber.for_each(|obj| async move {
249    ///         info!("saw in subscriber {}", &obj.name_any())
250    ///     }).await;
251    /// });
252    ///
253    /// // configure the watcher stream and populate the store while polling
254    /// watcher(deploys, watcher::Config::default())
255    ///     .reflect_shared(writer)
256    ///     .applied_objects()
257    ///     .for_each(|res| async move {
258    ///         match res {
259    ///             Ok(o) => info!("saw in root stream {}", o.name_any()),
260    ///             Err(e) => warn!("watcher error in root stream: {}", e),
261    ///         }
262    ///     })
263    ///     .await;
264    ///
265    /// # Ok(())
266    /// # }
267    /// ```
268    #[cfg(feature = "unstable-runtime-subscribe")]
269    fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item>
270    where
271        Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
272        K: Resource + Clone + 'static,
273        K::DynamicType: Eq + std::hash::Hash + Clone,
274    {
275        crate::reflector(writer, self)
276    }
277}
278
279impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
280
281// Compile tests
282#[cfg(test)]
283pub(crate) mod tests {
284    use super::watcher;
285    use crate::{WatchStreamExt as _, predicates};
286    use futures::prelude::*;
287    use k8s_openapi::api::core::v1::Pod;
288    use kube_client::{Api, Resource};
289
290    fn compile_type<T>() -> T {
291        unimplemented!("not called - compile test only")
292    }
293
294    pub fn assert_stream<T, K>(x: T) -> T
295    where
296        T: Stream<Item = watcher::Result<K>> + Send,
297        K: Resource + Clone + Send + 'static,
298    {
299        x
300    }
301
302    // not #[test] because this is only a compile check verification
303    #[allow(dead_code, unused_must_use)]
304    fn test_watcher_stream_type_drift() {
305        let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
306            .touched_objects()
307            .predicate_filter(predicates::generation, Default::default())
308            .boxed();
309        assert_stream(pred_watch);
310    }
311}