kube_runtime/utils/watch_ext.rs
1use crate::{
2 utils::{
3 event_decode::EventDecode,
4 event_modify::EventModify,
5 predicate::{Config as PredicateConfig, Predicate, PredicateFilter},
6 stream_backoff::StreamBackoff,
7 },
8 watcher,
9};
10use kube_client::Resource;
11
12use crate::{
13 reflector::store::Writer,
14 utils::{Backoff, Reflect},
15};
16
17use crate::watcher::DefaultBackoff;
18use futures::{Stream, TryStream};
19
20/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
21pub trait WatchStreamExt: Stream {
22 /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy
23 ///
24 /// This is recommended for controllers that want to play nicely with the apiserver.
25 fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
26 where
27 Self: TryStream + Sized,
28 {
29 StreamBackoff::new(self, DefaultBackoff::default())
30 }
31
32 /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
33 fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
34 where
35 B: Backoff,
36 Self: TryStream + Sized,
37 {
38 StreamBackoff::new(self, b)
39 }
40
41 /// Decode a [`watcher()`] stream into a stream of applied objects
42 ///
43 /// All Added/Modified events are passed through, and critical errors bubble up.
44 fn applied_objects<K>(self) -> EventDecode<Self>
45 where
46 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
47 {
48 EventDecode::new(self, false)
49 }
50
51 /// Decode a [`watcher()`] stream into a stream of touched objects
52 ///
53 /// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
54 fn touched_objects<K>(self) -> EventDecode<Self>
55 where
56 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
57 {
58 EventDecode::new(self, true)
59 }
60
61 /// Modify elements of a [`watcher()`] stream.
62 ///
63 /// Calls [`watcher::Event::modify()`] on every element.
64 /// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`.
65 ///
66 /// ```no_run
67 /// # use std::pin::pin;
68 /// # use futures::{Stream, StreamExt, TryStreamExt};
69 /// # use kube::{Api, Client, ResourceExt};
70 /// # use kube_runtime::{watcher, WatchStreamExt};
71 /// # use k8s_openapi::api::apps::v1::Deployment;
72 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
73 /// # let client: kube::Client = todo!();
74 /// let deploys: Api<Deployment> = Api::all(client);
75 /// let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
76 /// .modify(|deploy| {
77 /// deploy.managed_fields_mut().clear();
78 /// deploy.status = None;
79 /// })
80 /// .applied_objects());
81 ///
82 /// while let Some(d) = truncated_deploy_stream.try_next().await? {
83 /// println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
84 /// }
85 /// # Ok(())
86 /// # }
87 /// ```
88 fn modify<F, K>(self, f: F) -> EventModify<Self, F>
89 where
90 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
91 F: FnMut(&mut K),
92 {
93 EventModify::new(self, f)
94 }
95
96 /// Filter a stream based on on [`predicates`](crate::predicates).
97 ///
98 /// This will filter out repeat calls where the predicate returns the same result.
99 /// Common use case for this is to avoid repeat events for status updates
100 /// by filtering on [`predicates::generation`](crate::predicates::generation).
101 ///
102 /// The cache entries have a configurable time-to-live (TTL) to prevent unbounded
103 /// memory growth. By default, entries expire after 1 hour.
104 ///
105 /// ## Usage
106 /// ```no_run
107 /// # use std::pin::pin;
108 /// # use futures::{Stream, StreamExt, TryStreamExt};
109 /// use kube::{Api, Client, ResourceExt};
110 /// use kube_runtime::{watcher, WatchStreamExt, predicates};
111 /// use k8s_openapi::api::apps::v1::Deployment;
112 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
113 /// # let client: kube::Client = todo!();
114 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
115 /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
116 /// .applied_objects()
117 /// .predicate_filter(predicates::generation, Default::default()));
118 ///
119 /// while let Some(d) = changed_deploys.try_next().await? {
120 /// println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
121 /// }
122 /// # Ok(())
123 /// # }
124 /// ```
125 fn predicate_filter<K, P>(self, predicate: P, config: PredicateConfig) -> PredicateFilter<Self, K, P>
126 where
127 Self: Stream<Item = Result<K, watcher::Error>> + Sized,
128 K: Resource + 'static,
129 P: Predicate<K> + 'static,
130 {
131 PredicateFilter::new(self, predicate, config)
132 }
133
134 /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
135 ///
136 /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
137 /// This populates a [`Store`] as the stream is polled.
138 ///
139 /// ## Usage
140 /// ```no_run
141 /// # use futures::{Stream, StreamExt, TryStreamExt};
142 /// # use std::time::Duration;
143 /// # use tracing::{info, warn};
144 /// use kube::{Api, Client, ResourceExt};
145 /// use kube_runtime::{watcher, WatchStreamExt, reflector};
146 /// use k8s_openapi::api::apps::v1::Deployment;
147 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
148 /// # let client: kube::Client = todo!();
149 ///
150 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
151 /// let (reader, writer) = reflector::store::<Deployment>();
152 ///
153 /// tokio::spawn(async move {
154 /// // start polling the store once the reader is ready
155 /// reader.wait_until_ready().await.unwrap();
156 /// loop {
157 /// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
158 /// info!("Current {} deploys: {:?}", names.len(), names);
159 /// tokio::time::sleep(Duration::from_secs(10)).await;
160 /// }
161 /// });
162 ///
163 /// // configure the watcher stream and populate the store while polling
164 /// watcher(deploys, watcher::Config::default())
165 /// .reflect(writer)
166 /// .applied_objects()
167 /// .for_each(|res| async move {
168 /// match res {
169 /// Ok(o) => info!("saw {}", o.name_any()),
170 /// Err(e) => warn!("watcher error: {}", e),
171 /// }
172 /// })
173 /// .await;
174 ///
175 /// # Ok(())
176 /// # }
177 /// ```
178 ///
179 /// [`Store`]: crate::reflector::Store
180 fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
181 where
182 Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
183 K: Resource + Clone + 'static,
184 K::DynamicType: Eq + std::hash::Hash + Clone,
185 {
186 Reflect::new(self, writer)
187 }
188
189 /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`]
190 ///
191 /// Returns the stream unmodified, but passes every [`watcher::Event`]
192 /// through a [`Writer`]. This populates a [`Store`] as the stream is
193 /// polled. When the [`watcher::Event`] is not an error or a
194 /// [`watcher::Event::Deleted`] then its inner object will also be
195 /// propagated to subscribers.
196 ///
197 /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`].
198 /// This will return a [`ReflectHandle`] stream that should be polled
199 /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s
200 /// subscribed to the stream will also terminate after all events yielded by
201 /// the root stream have been observed. This means [`ReflectHandle`] streams
202 /// can still be polled after the root stream has been dropped.
203 ///
204 /// **NB**: This adapter requires an
205 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
206 /// feature
207 ///
208 /// ## Warning
209 ///
210 /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will
211 /// never receive any events. This will cause the streams to deadlock since
212 /// the root stream will apply backpressure when downstream readers are not
213 /// consuming events.
214 ///
215 ///
216 /// [`Store`]: crate::reflector::Store
217 /// [`subscribe()`]: crate::reflector::store::Writer::subscribe()
218 /// [`Stream`]: futures::stream::Stream
219 /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle
220 /// ## Usage
221 /// ```no_run
222 /// # use futures::StreamExt;
223 /// # use std::time::Duration;
224 /// # use tracing::{info, warn};
225 /// use kube::{Api, ResourceExt};
226 /// use kube_runtime::{watcher, WatchStreamExt, reflector};
227 /// use k8s_openapi::api::apps::v1::Deployment;
228 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
229 /// # let client: kube::Client = todo!();
230 ///
231 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
232 /// let subscriber_buf_sz = 100;
233 /// let (reader, writer) = reflector::store_shared::<Deployment>(subscriber_buf_sz);
234 /// let subscriber = writer.subscribe().unwrap();
235 ///
236 /// tokio::spawn(async move {
237 /// // start polling the store once the reader is ready
238 /// reader.wait_until_ready().await.unwrap();
239 /// loop {
240 /// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
241 /// info!("Current {} deploys: {:?}", names.len(), names);
242 /// tokio::time::sleep(Duration::from_secs(10)).await;
243 /// }
244 /// });
245 ///
246 /// tokio::spawn(async move {
247 /// // subscriber can be used to receive applied_objects
248 /// subscriber.for_each(|obj| async move {
249 /// info!("saw in subscriber {}", &obj.name_any())
250 /// }).await;
251 /// });
252 ///
253 /// // configure the watcher stream and populate the store while polling
254 /// watcher(deploys, watcher::Config::default())
255 /// .reflect_shared(writer)
256 /// .applied_objects()
257 /// .for_each(|res| async move {
258 /// match res {
259 /// Ok(o) => info!("saw in root stream {}", o.name_any()),
260 /// Err(e) => warn!("watcher error in root stream: {}", e),
261 /// }
262 /// })
263 /// .await;
264 ///
265 /// # Ok(())
266 /// # }
267 /// ```
268 #[cfg(feature = "unstable-runtime-subscribe")]
269 fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item>
270 where
271 Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
272 K: Resource + Clone + 'static,
273 K::DynamicType: Eq + std::hash::Hash + Clone,
274 {
275 crate::reflector(writer, self)
276 }
277}
278
279impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
280
281// Compile tests
282#[cfg(test)]
283pub(crate) mod tests {
284 use super::watcher;
285 use crate::{WatchStreamExt as _, predicates};
286 use futures::prelude::*;
287 use k8s_openapi::api::core::v1::Pod;
288 use kube_client::{Api, Resource};
289
290 fn compile_type<T>() -> T {
291 unimplemented!("not called - compile test only")
292 }
293
294 pub fn assert_stream<T, K>(x: T) -> T
295 where
296 T: Stream<Item = watcher::Result<K>> + Send,
297 K: Resource + Clone + Send + 'static,
298 {
299 x
300 }
301
302 // not #[test] because this is only a compile check verification
303 #[allow(dead_code, unused_must_use)]
304 fn test_watcher_stream_type_drift() {
305 let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
306 .touched_objects()
307 .predicate_filter(predicates::generation, Default::default())
308 .boxed();
309 assert_stream(pred_watch);
310 }
311}