kube_runtime/watcher.rs
1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::{Backoff, ResetTimerBackoff};
6
7use backon::BackoffBuilder;
8use educe::Educe;
9use futures::{Stream, StreamExt, stream::BoxStream};
10use kube_client::{
11 Api, Error as ClientErr,
12 api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
13 core::{ObjectList, Selector, metadata::PartialObjectMeta},
14 error::Status,
15};
16use serde::de::DeserializeOwned;
17use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
18use thiserror::Error;
19use tracing::{debug, error, warn};
20
21/// Errors that a watcher can emit
22///
23/// These are all considered retryable from a watcher's point of view,
24/// even though they may require patching of rbac/netpols in the background to fix.
25///
26/// To avoid constantly looping errors, make sure backoff is applied.
27#[derive(Debug, Error)]
28pub enum Error {
29 /// Received a raw error while performing an api.list
30 #[error("failed to perform initial object list: {0}")]
31 InitialListFailed(#[source] kube_client::Error),
32
33 /// Received a raw error while starting an api.watch
34 #[error("failed to start watching object: {0}")]
35 WatchStartFailed(#[source] kube_client::Error),
36
37 /// Received a `WatchEvent::Error` from the apiserver
38 #[error("error returned by apiserver during watch: {0}")]
39 WatchError(#[source] Box<Status>),
40
41 /// Received a raw error while watching
42 #[error("watch stream failed: {0}")]
43 WatchFailed(#[source] kube_client::Error),
44
45 /// Missing resource version field from api server
46 #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
47 NoResourceVersion,
48}
49
50/// Type alias for Result with a `watcher::Error` as default.
51pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Watch events returned from the [`watcher`]
#[derive(Debug, Clone)]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the `StreamingList` strategy,
    /// or from pages using the `ListWatch` strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously [applied](Event::Apply) but are not listed in any of
    /// the `InitApply` events should be assumed to have been [deleted](Event::Delete).
    InitDone,
}

impl<K> Event<K> {
    /// Map each object in an event through a mutator fn
    ///
    /// The mutator runs on the object carried by `Apply`, `Delete` and `InitApply`;
    /// the `Init` and `InitDone` markers carry no object and are returned unchanged.
    ///
    /// This allows for memory optimizations in watch streams.
    /// If you are chaining a watch stream into a reflector as an in memory state store,
    /// you can control the space used by each object by dropping fields.
    ///
    /// ```no_run
    /// use k8s_openapi::api::core::v1::Pod;
    /// use kube::ResourceExt;
    /// # use kube::runtime::watcher::Event;
    /// # let event: Event<Pod> = todo!();
    /// event.modify(|pod| {
    ///     pod.managed_fields_mut().clear();
    ///     pod.annotations_mut().clear();
    ///     pod.status = None;
    /// });
    /// ```
    #[must_use]
    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
        match &mut self {
            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => f(obj),
            // markers, nothing to modify
            Self::Init | Self::InitDone => {}
        }
        self
    }
}
113
114#[derive(Educe, Default)]
115#[educe(Debug)]
116/// The internal finite state machine driving the [`watcher`]
117enum State<K> {
118 /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
119 #[default]
120 Empty,
121 /// The Watcher is in the process of paginating through the initial LIST
122 InitPage {
123 continue_token: Option<String>,
124 objects: VecDeque<K>,
125 last_bookmark: Option<String>,
126 },
127 /// Kubernetes 1.27 Streaming Lists
128 /// The initial watch is in progress
129 InitialWatch {
130 #[educe(Debug(ignore))]
131 stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
132 },
133 /// The initial LIST was successful, so we should move on to starting the actual watch.
134 InitListed { resource_version: String },
135 /// The watch is in progress, from this point we just return events from the server.
136 ///
137 /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
138 /// returning to the `InitListed` state.
139 /// If we fall out of the K8s watch window then we propagate the error and fall back doing a re-list
140 /// with `Empty`.
141 Watching {
142 resource_version: String,
143 #[educe(Debug(ignore))]
144 stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
145 },
146}
147
148/// Used to control whether the watcher receives the full object, or only the
149/// metadata
150trait ApiMode {
151 type Value: Clone;
152
153 async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
154 async fn watch(
155 &self,
156 wp: &WatchParams,
157 version: &str,
158 ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
159}
160
161/// A wrapper around the `Api` of a `Resource` type that when used by the
162/// watcher will return the entire (full) object
163struct FullObject<'a, K> {
164 api: &'a Api<K>,
165}
166
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but a re-list (the events
    /// following an [`Event::Init`]) can return much older results than have previously
    /// been observed, particularly in HA configurations, due to partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and it being sent to the reconciler.
    Any,
}
192
/// Configurable watcher listwatch semantics
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub enum InitialListStrategy {
    /// List first, then watch from given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
209
210/// Accumulates all options that can be used on the watcher invocation.
211#[derive(Clone, Debug, PartialEq)]
212pub struct Config {
213 /// A selector to restrict the list of returned objects by their labels.
214 ///
215 /// Defaults to everything if `None`.
216 pub label_selector: Option<String>,
217
218 /// A selector to restrict the list of returned objects by their fields.
219 ///
220 /// Defaults to everything if `None`.
221 pub field_selector: Option<String>,
222
223 /// Timeout for the list/watch call.
224 ///
225 /// This limits the duration of the call, regardless of any activity or inactivity.
226 /// If unset for a watch call, we will use 290s.
227 /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
228 pub timeout: Option<u32>,
229
230 /// Semantics for list calls.
231 ///
232 /// Configures re-list for performance vs. consistency.
233 ///
234 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
235 pub list_semantic: ListSemantic,
236
237 /// Control how the watcher fetches the initial list of objects.
238 ///
239 /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
240 /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
241 ///
242 /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
243 /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
244 ///
245 /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
246 /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
247 pub initial_list_strategy: InitialListStrategy,
248
249 /// Maximum number of objects retrieved per list operation resyncs.
250 ///
251 /// This can reduce the memory consumption during resyncs, at the cost of requiring more
252 /// API roundtrips to complete.
253 ///
254 /// Defaults to 500. Note that `None` represents unbounded.
255 ///
256 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
257 pub page_size: Option<u32>,
258
259 /// Enables watch events with type "BOOKMARK".
260 ///
261 /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
262 /// This is default enabled and should generally not be turned off.
263 pub bookmarks: bool,
264}
265
266impl Default for Config {
267 fn default() -> Self {
268 Self {
269 bookmarks: true,
270 label_selector: None,
271 field_selector: None,
272 timeout: None,
273 list_semantic: ListSemantic::default(),
274 // same default page size limit as client-go
275 // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
276 page_size: Some(500),
277 initial_list_strategy: InitialListStrategy::ListWatch,
278 }
279 }
280}
281
282/// Builder interface to Config
283///
284/// Usage:
285/// ```
286/// use kube::runtime::watcher::Config;
287/// let wc = Config::default()
288/// .timeout(60)
289/// .labels("kubernetes.io/lifecycle=spot");
290/// ```
291impl Config {
292 /// Configure the timeout for list/watch calls
293 ///
294 /// This limits the duration of the call, regardless of any activity or inactivity.
295 /// Defaults to 290s
296 #[must_use]
297 pub fn timeout(mut self, timeout_secs: u32) -> Self {
298 self.timeout = Some(timeout_secs);
299 self
300 }
301
302 /// Configure the selector to restrict the list of returned objects by their fields.
303 ///
304 /// Defaults to everything.
305 /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
306 /// The server only supports a limited number of field queries per type.
307 #[must_use]
308 pub fn fields(mut self, field_selector: &str) -> Self {
309 self.field_selector = Some(field_selector.to_string());
310 self
311 }
312
313 /// Configure the selector to restrict the list of returned objects by their labels.
314 ///
315 /// Defaults to everything.
316 /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
317 #[must_use]
318 pub fn labels(mut self, label_selector: &str) -> Self {
319 self.label_selector = Some(label_selector.to_string());
320 self
321 }
322
323 /// Configure typed label selectors
324 ///
325 /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
326 ///
327 /// ```
328 /// use kube_runtime::watcher::Config;
329 /// use kube_client::core::{Expression, Selector, ParseExpressionError};
330 /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
331 /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
332 /// let cfg = Config::default().labels_from(&selector);
333 /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
334 /// let selector: Selector = LabelSelector::default().try_into()?;
335 /// let cfg = Config::default().labels_from(&selector);
336 /// # Ok::<(), ParseExpressionError>(())
337 ///```
338 #[must_use]
339 pub fn labels_from(mut self, selector: &Selector) -> Self {
340 self.label_selector = Some(selector.to_string());
341 self
342 }
343
344 /// Sets list semantic to configure re-list performance and consistency
345 ///
346 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
347 #[must_use]
348 pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
349 self.list_semantic = semantic;
350 self
351 }
352
353 /// Sets list semantic to `Any` to improve list performance
354 ///
355 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
356 #[must_use]
357 pub fn any_semantic(self) -> Self {
358 self.list_semantic(ListSemantic::Any)
359 }
360
361 /// Disables watch bookmarks to simplify watch handling
362 ///
363 /// This is not recommended to use with production watchers as it can cause desyncs.
364 /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
365 #[must_use]
366 pub fn disable_bookmarks(mut self) -> Self {
367 self.bookmarks = false;
368 self
369 }
370
371 /// Limits the number of objects retrieved in each list operation during resync.
372 ///
373 /// This can reduce the memory consumption during resyncs, at the cost of requiring more
374 /// API roundtrips to complete.
375 ///
376 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
377 #[must_use]
378 pub fn page_size(mut self, page_size: u32) -> Self {
379 self.page_size = Some(page_size);
380 self
381 }
382
383 /// Kubernetes 1.27 Streaming Lists
384 /// Sets list semantic to `Stream` to make use of watch bookmarks
385 #[must_use]
386 pub fn streaming_lists(mut self) -> Self {
387 self.initial_list_strategy = InitialListStrategy::StreamingList;
388 self
389 }
390
391 /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
392 fn to_list_params(&self) -> ListParams {
393 let (resource_version, version_match) = match self.list_semantic {
394 ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
395 ListSemantic::MostRecent => (None, None),
396 };
397 ListParams {
398 label_selector: self.label_selector.clone(),
399 field_selector: self.field_selector.clone(),
400 timeout: self.timeout,
401 version_match,
402 resource_version,
403 // The watcher handles pagination internally.
404 limit: self.page_size,
405 continue_token: None,
406 }
407 }
408
409 /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
410 fn to_watch_params(&self, phase: WatchPhase) -> WatchParams {
411 WatchParams {
412 label_selector: self.label_selector.clone(),
413 field_selector: self.field_selector.clone(),
414 timeout: self.timeout,
415 bookmarks: self.bookmarks,
416 send_initial_events: phase == WatchPhase::Initial
417 && self.initial_list_strategy == InitialListStrategy::StreamingList,
418 }
419 }
420}
421
/// Tells an initial watch apart from a resumed watch for streaming lists.
///
/// The phase decides whether `sendInitialEvents=true` is set in a watch request:
/// only initial watches should request initial events; reconnections should not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WatchPhase {
    /// The watch started from `State::Empty`; requests initial events for streaming lists
    Initial,
    /// The watch (re)started from `State::InitListed`; does not request initial events
    Resumed,
}
433
434impl<K> ApiMode for FullObject<'_, K>
435where
436 K: Clone + Debug + DeserializeOwned + Send + 'static,
437{
438 type Value = K;
439
440 async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
441 self.api.list(lp).await
442 }
443
444 async fn watch(
445 &self,
446 wp: &WatchParams,
447 version: &str,
448 ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
449 self.api.watch(wp, version).await.map(StreamExt::boxed)
450 }
451}
452
453/// A wrapper around the `Api` of a `Resource` type that when used by the
454/// watcher will return only the metadata associated with an object
455struct MetaOnly<'a, K> {
456 api: &'a Api<K>,
457}
458
459impl<K> ApiMode for MetaOnly<'_, K>
460where
461 K: Clone + Debug + DeserializeOwned + Send + 'static,
462{
463 type Value = PartialObjectMeta<K>;
464
465 async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
466 self.api.list_metadata(lp).await
467 }
468
469 async fn watch(
470 &self,
471 wp: &WatchParams,
472 version: &str,
473 ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
474 self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
475 }
476}
477
478/// Progresses the watcher a single step, returning (event, state)
479///
480/// This function should be trampolined: if event == `None`
481/// then the function should be called again until it returns a Some.
482#[allow(clippy::too_many_lines)] // for now
483async fn step_trampolined<A>(
484 api: &A,
485 wc: &Config,
486 state: State<A::Value>,
487) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
488where
489 A: ApiMode,
490 A::Value: Resource + 'static,
491{
492 match state {
493 State::Empty => match wc.initial_list_strategy {
494 InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
495 continue_token: None,
496 objects: VecDeque::default(),
497 last_bookmark: None,
498 }),
499 InitialListStrategy::StreamingList => {
500 match api.watch(&wc.to_watch_params(WatchPhase::Initial), "0").await {
501 Ok(stream) => (None, State::InitialWatch { stream }),
502 Err(err) => {
503 if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
504 warn!("watch initlist error with 403: {err:?}");
505 } else {
506 debug!("watch initlist error: {err:?}");
507 }
508 (Some(Err(Error::WatchStartFailed(err))), State::default())
509 }
510 }
511 }
512 },
513 State::InitPage {
514 continue_token,
515 mut objects,
516 last_bookmark,
517 } => {
518 if let Some(next) = objects.pop_front() {
519 return (Some(Ok(Event::InitApply(next))), State::InitPage {
520 continue_token,
521 objects,
522 last_bookmark,
523 });
524 }
525 // check if we need to perform more pages
526 if continue_token.is_none()
527 && let Some(resource_version) = last_bookmark
528 {
529 // we have drained the last page - move on to next stage
530 return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
531 }
532 let mut lp = wc.to_list_params();
533 lp.continue_token = continue_token;
534 match api.list(&lp).await {
535 Ok(list) => {
536 let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
537 let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
538 if last_bookmark.is_none() && continue_token.is_none() {
539 return (Some(Err(Error::NoResourceVersion)), State::Empty);
540 }
541 // Buffer page here, causing us to return to this enum branch (State::InitPage)
542 // until the objects buffer has drained
543 (None, State::InitPage {
544 continue_token,
545 objects: list.items.into_iter().collect(),
546 last_bookmark,
547 })
548 }
549 Err(err) => {
550 if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
551 warn!("watch list error with 403: {err:?}");
552 } else {
553 debug!("watch list error: {err:?}");
554 }
555 (Some(Err(Error::InitialListFailed(err))), State::Empty)
556 }
557 }
558 }
559 State::InitialWatch { mut stream } => {
560 match stream.next().await {
561 Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
562 (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
563 }
564 Some(Ok(WatchEvent::Deleted(_obj))) => {
565 // Kubernetes claims these events are impossible
566 // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
567 error!("got deleted event during initial watch. this is a bug");
568 (None, State::InitialWatch { stream })
569 }
570 Some(Ok(WatchEvent::Bookmark(bm))) => {
571 let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
572 if marks_initial_end {
573 (Some(Ok(Event::InitDone)), State::Watching {
574 resource_version: bm.metadata.resource_version,
575 stream,
576 })
577 } else {
578 (None, State::InitialWatch { stream })
579 }
580 }
581 Some(Ok(WatchEvent::Error(err))) => {
582 // HTTP GONE, means we have desynced and need to start over and re-list :(
583 let new_state = if err.code == 410 {
584 State::default()
585 } else {
586 State::InitialWatch { stream }
587 };
588 if err.code == 403 {
589 warn!("watcher watchevent error 403: {err:?}");
590 } else {
591 debug!("error watchevent error: {err:?}");
592 }
593 (Some(Err(Error::WatchError(err.boxed()))), new_state)
594 }
595 Some(Err(err)) => {
596 if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
597 warn!("watcher error 403: {err:?}");
598 } else {
599 debug!("watcher error: {err:?}");
600 }
601 (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
602 }
603 None => (None, State::default()),
604 }
605 }
606 State::InitListed { resource_version } => {
607 match api
608 .watch(&wc.to_watch_params(WatchPhase::Resumed), &resource_version)
609 .await
610 {
611 Ok(stream) => (None, State::Watching {
612 resource_version,
613 stream,
614 }),
615 Err(err) => {
616 if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
617 warn!("watch initlist error with 403: {err:?}");
618 } else {
619 debug!("watch initlist error: {err:?}");
620 }
621 (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
622 resource_version,
623 })
624 }
625 }
626 }
627 State::Watching {
628 resource_version,
629 mut stream,
630 } => match stream.next().await {
631 Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
632 let resource_version = obj.resource_version().unwrap_or_default();
633 if resource_version.is_empty() {
634 (Some(Err(Error::NoResourceVersion)), State::default())
635 } else {
636 (Some(Ok(Event::Apply(obj))), State::Watching {
637 resource_version,
638 stream,
639 })
640 }
641 }
642 Some(Ok(WatchEvent::Deleted(obj))) => {
643 let resource_version = obj.resource_version().unwrap_or_default();
644 if resource_version.is_empty() {
645 (Some(Err(Error::NoResourceVersion)), State::default())
646 } else {
647 (Some(Ok(Event::Delete(obj))), State::Watching {
648 resource_version,
649 stream,
650 })
651 }
652 }
653 Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
654 resource_version: bm.metadata.resource_version,
655 stream,
656 }),
657 Some(Ok(WatchEvent::Error(err))) => {
658 // HTTP GONE, means we have desynced and need to start over and re-list :(
659 let new_state = if err.code == 410 {
660 State::default()
661 } else {
662 State::Watching {
663 resource_version,
664 stream,
665 }
666 };
667 if err.code == 403 {
668 warn!("watcher watchevent error 403: {err:?}");
669 } else {
670 debug!("error watchevent error: {err:?}");
671 }
672 (Some(Err(Error::WatchError(err.boxed()))), new_state)
673 }
674 Some(Err(err)) => {
675 if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
676 warn!("watcher error 403: {err:?}");
677 } else {
678 debug!("watcher error: {err:?}");
679 }
680 (Some(Err(Error::WatchFailed(err))), State::Watching {
681 resource_version,
682 stream,
683 })
684 }
685 None => (None, State::InitListed { resource_version }),
686 },
687 }
688}
689
690/// Trampoline helper for `step_trampolined`
691async fn step<A>(
692 api: &A,
693 config: &Config,
694 mut state: State<A::Value>,
695) -> (Result<Event<A::Value>>, State<A::Value>)
696where
697 A: ApiMode,
698 A::Value: Resource + 'static,
699{
700 loop {
701 match step_trampolined(api, config, state).await {
702 (Some(result), new_state) => return (result, new_state),
703 (None, new_state) => state = new_state,
704 }
705 }
706}
707
708/// Watches a Kubernetes Resource for changes continuously
709///
710/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
711///
712/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
713/// You can apply your own backoff by not polling the stream for a duration after errors.
714/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
715/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
716/// will terminate eagerly as soon as they receive an [`Err`].
717///
718/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
719/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
720///
721/// ```
722/// use kube::{
723/// api::{Api, ResourceExt}, Client,
724/// runtime::{watcher, WatchStreamExt}
725/// };
726/// use k8s_openapi::api::core::v1::Pod;
727/// use futures::TryStreamExt;
728///
729/// # async fn wrapper() -> Result<(), watcher::Error> {
730/// # let client: Client = todo!();
731/// let pods: Api<Pod> = Api::namespaced(client, "apps");
732///
733/// watcher(pods, watcher::Config::default()).applied_objects()
734/// .try_for_each(|p| async move {
735/// println!("Applied: {}", p.name_any());
736/// Ok(())
737/// })
738/// .await?;
739/// # Ok(())
740/// # }
741/// ```
742/// [`WatchStreamExt`]: super::WatchStreamExt
743/// [`reflector`]: super::reflector::reflector
744/// [`Api::watch`]: kube_client::Api::watch
745///
746/// # Recovery
747///
748/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
749/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
750/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
751///
752/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
753/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
754/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
755/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
756/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
757#[doc(alias = "informer")]
758pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
759 api: Api<K>,
760 watcher_config: Config,
761) -> impl Stream<Item = Result<Event<K>>> + Send {
762 futures::stream::unfold(
763 (api, watcher_config, State::default()),
764 |(api, watcher_config, state)| async {
765 let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
766 Some((event, (api, watcher_config, state)))
767 },
768 )
769}
770
771/// Watches a Kubernetes Resource for changes continuously and receives only the
772/// metadata
773///
774/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
775///
776/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
777/// You can apply your own backoff by not polling the stream for a duration after errors.
778/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
779/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
780/// will terminate eagerly as soon as they receive an [`Err`].
781///
782/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
783/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
784///
785/// ```
786/// use kube::{
787/// api::{Api, ResourceExt}, Client,
788/// runtime::{watcher, metadata_watcher, WatchStreamExt}
789/// };
790/// use k8s_openapi::api::core::v1::Pod;
791/// use futures::TryStreamExt;
792///
793/// # async fn wrapper() -> Result<(), watcher::Error> {
794/// # let client: Client = todo!();
795/// let pods: Api<Pod> = Api::namespaced(client, "apps");
796///
797/// metadata_watcher(pods, watcher::Config::default()).applied_objects()
798/// .try_for_each(|p| async move {
799/// println!("Applied: {}", p.name_any());
800/// Ok(())
801/// })
802/// .await?;
803/// # Ok(())
804/// # }
805/// ```
806/// [`WatchStreamExt`]: super::WatchStreamExt
807/// [`reflector`]: super::reflector::reflector
808/// [`Api::watch`]: kube_client::Api::watch
809///
810/// # Recovery
811///
812/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
813/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
814/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
815///
816/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
817/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
818/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
819/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
820/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
821pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
822 api: Api<K>,
823 watcher_config: Config,
824) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
825 futures::stream::unfold(
826 (api, watcher_config, State::default()),
827 |(api, watcher_config, state)| async {
828 let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
829 Some((event, (api, watcher_config, state)))
830 },
831 )
832}
833
834/// Watch a single named object for updates
835///
836/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
837///
838/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
839///
840/// ## Scope Warning
841///
842/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
843/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
844/// when watching for transitions to namespaced objects.
845pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
846 api: Api<K>,
847 name: &str,
848) -> impl Stream<Item = Result<Option<K>>> + Send + use<K> {
849 // filtering by object name in given scope, so there's at most one matching object
850 // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
851 let fields = format!("metadata.name={name}");
852 watcher(api, Config::default().fields(&fields))
853 // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
854 // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
855 // `None` is emitted when the InitDone event is received.
856 //
857 // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
858 // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
859 // could happen given K8S events aren't guaranteed delivery.
860 .scan(false, |obj_seen, event| {
861 if matches!(event, Ok(Event::Init)) {
862 *obj_seen = false;
863 } else if matches!(event, Ok(Event::InitApply(_))) {
864 *obj_seen = true;
865 }
866 future::ready(Some((*obj_seen, event)))
867 })
868 .filter_map(|(obj_seen, event)| async move {
869 match event {
870 // Pass up `Some` for Found / Updated
871 Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
872 // Pass up `None` for Deleted
873 Ok(Event::Delete(_)) => Some(Ok(None)),
874 // Pass up `None` if the object wasn't seen in the initial list
875 Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
876 // Ignore marker events
877 Ok(Event::Init | Event::InitDone) => None,
878 // Bubble up errors
879 Err(err) => Some(Err(err)),
880 }
881 })
882}
883
884/// A struct with a manually configured exponential backoff
885pub struct ExponentialBackoff {
886 inner: backon::ExponentialBackoff,
887 builder: backon::ExponentialBuilder,
888}
889
890impl ExponentialBackoff {
891 fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
892 let builder = backon::ExponentialBuilder::default()
893 .with_min_delay(min_delay)
894 .with_max_delay(max_delay)
895 .with_factor(factor)
896 .without_max_times();
897
898 let builder = if enable_jitter {
899 builder.with_jitter()
900 } else {
901 builder
902 };
903
904 Self {
905 inner: builder.build(),
906 builder,
907 }
908 }
909}
910
911impl Backoff for ExponentialBackoff {
912 fn reset(&mut self) {
913 self.inner = self.builder.build();
914 }
915}
916
917impl Iterator for ExponentialBackoff {
918 type Item = Duration;
919
920 fn next(&mut self) -> Option<Self::Item> {
921 self.inner.next()
922 }
923}
924
925impl From<backon::ExponentialBuilder> for ExponentialBackoff {
926 fn from(builder: backon::ExponentialBuilder) -> Self {
927 Self {
928 inner: builder.build(),
929 builder,
930 }
931 }
932}
933
934/// Default watcher backoff inspired by Kubernetes' client-go.
935///
936/// The parameters currently optimize for being kind to struggling apiservers.
937/// The exact parameters are taken from
938/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
939/// and should not be considered stable.
940///
941/// This struct implements [`Backoff`] and is the default strategy used
942/// when calling `WatchStreamExt::default_backoff`. If you need to create
943/// this manually then [`DefaultBackoff::default`] can be used.
944pub struct DefaultBackoff(Strategy);
945type Strategy = ResetTimerBackoff<ExponentialBackoff>;
946
947impl Default for DefaultBackoff {
948 fn default() -> Self {
949 Self(ResetTimerBackoff::new(
950 ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true),
951 Duration::from_secs(120),
952 ))
953 }
954}
955
956impl Iterator for DefaultBackoff {
957 type Item = Duration;
958
959 fn next(&mut self) -> Option<Self::Item> {
960 self.0.next()
961 }
962}
963
964impl Backoff for DefaultBackoff {
965 fn reset(&mut self) {
966 self.0.reset();
967 }
968}
969
#[cfg(test)]
mod tests {
    use super::*;

    // Two delays closer than this are treated as equal by `approx_eq`.
    const TOLERANCE: Duration = Duration::from_micros(100);

    // Compares delays with a small tolerance instead of exact equality.
    fn approx_eq(a: Duration, b: Duration) -> bool {
        a.abs_diff(b) < TOLERANCE
    }

    #[test]
    fn to_watch_params_initial_phase_with_streaming_list_sets_send_initial_events() {
        let streaming = Config::default().streaming_lists();
        let initial = streaming.to_watch_params(WatchPhase::Initial);
        assert!(initial.send_initial_events);
    }

    #[test]
    fn to_watch_params_resumed_phase_with_streaming_list_does_not_set_send_initial_events() {
        let streaming = Config::default().streaming_lists();
        let resumed = streaming.to_watch_params(WatchPhase::Resumed);
        assert!(!resumed.send_initial_events);
    }

    #[test]
    fn to_watch_params_listwatch_mode_does_not_set_send_initial_events() {
        // The default config is ListWatch mode
        let list_watch = Config::default();
        for phase in [WatchPhase::Initial, WatchPhase::Resumed] {
            let params = list_watch.to_watch_params(phase);
            assert!(!params.send_initial_events);
        }
    }

    #[test]
    fn exponential_backoff_without_jitter() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, false);

        // Without jitter each delay is the previous one multiplied by the factor.
        for expected_millis in [100, 200, 400] {
            let delay = backoff.next().unwrap();
            assert!(approx_eq(delay, Duration::from_millis(expected_millis)));
        }
    }

    #[test]
    fn exponential_backoff_with_jitter_applies_randomness() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, true);

        // Poll five times, keeping whatever delays were produced
        let mut delays = Vec::with_capacity(5);
        for _ in 0..5 {
            if let Some(delay) = backoff.next() {
                delays.push(delay);
            }
        }

        // Every delay must be strictly positive
        for delay in &delays {
            assert!(!delay.is_zero());
        }

        // The values the same backoff would produce with jitter disabled
        let exact_values = [
            Duration::from_millis(100),
            Duration::from_millis(200),
            Duration::from_millis(400),
            Duration::from_millis(800),
            Duration::from_secs(1),
        ];

        // Jitter must move at least one delay away from its exact value
        let any_jittered = delays
            .iter()
            .zip(exact_values.iter())
            .any(|(delay, exact)| !approx_eq(*delay, *exact));

        assert!(
            any_jittered,
            "With jitter enabled, delays should not all match exact exponential values"
        );
    }
}