Skip to main content

kube_runtime/
watcher.rs

1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::{Backoff, ResetTimerBackoff};
6
7use backon::BackoffBuilder;
8use educe::Educe;
9use futures::{Stream, StreamExt, stream::BoxStream};
10use kube_client::{
11    Api, Error as ClientErr,
12    api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
13    core::{ObjectList, Selector, metadata::PartialObjectMeta},
14    error::Status,
15};
16use serde::de::DeserializeOwned;
17use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
18use thiserror::Error;
19use tracing::{debug, error, warn};
20
21/// Errors that a watcher can emit
22///
23/// These are all considered retryable from a watcher's point of view,
24/// even though they may require patching of rbac/netpols in the background to fix.
25///
26/// To avoid constantly looping errors, make sure backoff is applied.
27#[derive(Debug, Error)]
28pub enum Error {
29    /// Received a raw error while performing an api.list
30    #[error("failed to perform initial object list: {0}")]
31    InitialListFailed(#[source] kube_client::Error),
32
33    /// Received a raw error while starting an api.watch
34    #[error("failed to start watching object: {0}")]
35    WatchStartFailed(#[source] kube_client::Error),
36
37    /// Received a `WatchEvent::Error` from the apiserver
38    #[error("error returned by apiserver during watch: {0}")]
39    WatchError(#[source] Box<Status>),
40
41    /// Received a raw error while watching
42    #[error("watch stream failed: {0}")]
43    WatchFailed(#[source] kube_client::Error),
44
45    /// Missing resource version field from api server
46    #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
47    NoResourceVersion,
48}
49
50/// Type alias for Result with a `watcher::Error` as default.
51pub type Result<T, E = Error> = std::result::Result<T, E>;
52
/// Watch events returned from the [`watcher`]
#[derive(Debug, Clone)]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the
    /// [`StreamingList`](InitialListStrategy::StreamingList) strategy, or from pages using the
    /// [`ListWatch`](InitialListStrategy::ListWatch) strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously [`Apply`](Event::Apply)-ed but are not listed in any of
    /// the `InitApply` events should be assumed to have been [`Delete`](Event::Delete)-d.
    InitDone,
}

impl<K> Event<K> {
    /// Map each object in an event through a mutator fn
    ///
    /// This allows for memory optimizations in watch streams.
    /// If you are chaining a watch stream into a reflector as an in memory state store,
    /// you can control the space used by each object by dropping fields.
    ///
    /// The marker events [`Event::Init`] and [`Event::InitDone`] carry no object, so `f`
    /// is not called for them.
    ///
    /// ```no_run
    /// use k8s_openapi::api::core::v1::Pod;
    /// use kube::ResourceExt;
    /// # use kube::runtime::watcher::Event;
    /// # let event: Event<Pod> = todo!();
    /// event.modify(|pod| {
    ///     pod.managed_fields_mut().clear();
    ///     pod.annotations_mut().clear();
    ///     pod.status = None;
    /// });
    /// ```
    #[must_use]
    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
        match &mut self {
            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => (f)(obj),
            Self::Init | Self::InitDone => {} // markers, nothing to modify
        }
        self
    }
}
113
114#[derive(Educe, Default)]
115#[educe(Debug)]
116/// The internal finite state machine driving the [`watcher`]
117enum State<K> {
118    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
119    #[default]
120    Empty,
121    /// The Watcher is in the process of paginating through the initial LIST
122    InitPage {
123        continue_token: Option<String>,
124        objects: VecDeque<K>,
125        last_bookmark: Option<String>,
126    },
127    /// Kubernetes 1.27 Streaming Lists
128    /// The initial watch is in progress
129    InitialWatch {
130        #[educe(Debug(ignore))]
131        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
132    },
133    /// The initial LIST was successful, so we should move on to starting the actual watch.
134    InitListed { resource_version: String },
135    /// The watch is in progress, from this point we just return events from the server.
136    ///
137    /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
138    /// returning to the `InitListed` state.
139    /// If we fall out of the K8s watch window then we propagate the error and fall back doing a re-list
140    /// with `Empty`.
141    Watching {
142        resource_version: String,
143        #[educe(Debug(ignore))]
144        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
145    },
146}
147
148/// Used to control whether the watcher receives the full object, or only the
149/// metadata
150trait ApiMode {
151    type Value: Clone;
152
153    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
154    async fn watch(
155        &self,
156        wp: &WatchParams,
157        version: &str,
158    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
159}
160
161/// A wrapper around the `Api` of a `Resource` type that when used by the
162/// watcher will return the entire (full) object
163struct FullObject<'a, K> {
164    api: &'a Api<K>,
165}
166
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for the most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but during a relist (the
    /// `InitApply` events following an [`Event::Init`]) it can return much older results
    /// than have previously been observed, particularly in HA configurations, due to
    /// partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and it being sent to the reconciler.
    Any,
}
192
/// Configurable watcher listwatch semantics
///
/// Controls how the watcher fetches the initial set of objects before it starts
/// following changes, and how it re-fetches them after a desync.
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
    /// List first, then watch from given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// The initial objects are delivered by a watch call, whose end is signalled by a bookmark.
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
209
210/// Accumulates all options that can be used on the watcher invocation.
211#[derive(Clone, Debug, PartialEq)]
212pub struct Config {
213    /// A selector to restrict the list of returned objects by their labels.
214    ///
215    /// Defaults to everything if `None`.
216    pub label_selector: Option<String>,
217
218    /// A selector to restrict the list of returned objects by their fields.
219    ///
220    /// Defaults to everything if `None`.
221    pub field_selector: Option<String>,
222
223    /// Timeout for the list/watch call.
224    ///
225    /// This limits the duration of the call, regardless of any activity or inactivity.
226    /// If unset for a watch call, we will use 290s.
227    /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
228    pub timeout: Option<u32>,
229
230    /// Semantics for list calls.
231    ///
232    /// Configures re-list for performance vs. consistency.
233    ///
234    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
235    pub list_semantic: ListSemantic,
236
237    /// Control how the watcher fetches the initial list of objects.
238    ///
239    /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
240    /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
241    ///
242    /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
243    /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
244    ///
245    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
246    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
247    pub initial_list_strategy: InitialListStrategy,
248
249    /// Maximum number of objects retrieved per list operation resyncs.
250    ///
251    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
252    /// API roundtrips to complete.
253    ///
254    /// Defaults to 500. Note that `None` represents unbounded.
255    ///
256    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
257    pub page_size: Option<u32>,
258
259    /// Enables watch events with type "BOOKMARK".
260    ///
261    /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
262    /// This is default enabled and should generally not be turned off.
263    pub bookmarks: bool,
264}
265
266impl Default for Config {
267    fn default() -> Self {
268        Self {
269            bookmarks: true,
270            label_selector: None,
271            field_selector: None,
272            timeout: None,
273            list_semantic: ListSemantic::default(),
274            // same default page size limit as client-go
275            // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
276            page_size: Some(500),
277            initial_list_strategy: InitialListStrategy::ListWatch,
278        }
279    }
280}
281
282/// Builder interface to Config
283///
284/// Usage:
285/// ```
286/// use kube::runtime::watcher::Config;
287/// let wc = Config::default()
288///     .timeout(60)
289///     .labels("kubernetes.io/lifecycle=spot");
290/// ```
291impl Config {
292    /// Configure the timeout for list/watch calls
293    ///
294    /// This limits the duration of the call, regardless of any activity or inactivity.
295    /// Defaults to 290s
296    #[must_use]
297    pub fn timeout(mut self, timeout_secs: u32) -> Self {
298        self.timeout = Some(timeout_secs);
299        self
300    }
301
302    /// Configure the selector to restrict the list of returned objects by their fields.
303    ///
304    /// Defaults to everything.
305    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
306    /// The server only supports a limited number of field queries per type.
307    #[must_use]
308    pub fn fields(mut self, field_selector: &str) -> Self {
309        self.field_selector = Some(field_selector.to_string());
310        self
311    }
312
313    /// Configure the selector to restrict the list of returned objects by their labels.
314    ///
315    /// Defaults to everything.
316    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
317    #[must_use]
318    pub fn labels(mut self, label_selector: &str) -> Self {
319        self.label_selector = Some(label_selector.to_string());
320        self
321    }
322
323    /// Configure typed label selectors
324    ///
325    /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
326    ///
327    /// ```
328    /// use kube_runtime::watcher::Config;
329    /// use kube_client::core::{Expression, Selector, ParseExpressionError};
330    /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
331    /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
332    /// let cfg = Config::default().labels_from(&selector);
333    /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
334    /// let selector: Selector = LabelSelector::default().try_into()?;
335    /// let cfg = Config::default().labels_from(&selector);
336    /// # Ok::<(), ParseExpressionError>(())
337    ///```
338    #[must_use]
339    pub fn labels_from(mut self, selector: &Selector) -> Self {
340        self.label_selector = Some(selector.to_string());
341        self
342    }
343
344    /// Sets list semantic to configure re-list performance and consistency
345    ///
346    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
347    #[must_use]
348    pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
349        self.list_semantic = semantic;
350        self
351    }
352
353    /// Sets list semantic to `Any` to improve list performance
354    ///
355    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
356    #[must_use]
357    pub fn any_semantic(self) -> Self {
358        self.list_semantic(ListSemantic::Any)
359    }
360
361    /// Disables watch bookmarks to simplify watch handling
362    ///
363    /// This is not recommended to use with production watchers as it can cause desyncs.
364    /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
365    #[must_use]
366    pub fn disable_bookmarks(mut self) -> Self {
367        self.bookmarks = false;
368        self
369    }
370
371    /// Limits the number of objects retrieved in each list operation during resync.
372    ///
373    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
374    /// API roundtrips to complete.
375    ///
376    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
377    #[must_use]
378    pub fn page_size(mut self, page_size: u32) -> Self {
379        self.page_size = Some(page_size);
380        self
381    }
382
383    /// Kubernetes 1.27 Streaming Lists
384    /// Sets list semantic to `Stream` to make use of watch bookmarks
385    #[must_use]
386    pub fn streaming_lists(mut self) -> Self {
387        self.initial_list_strategy = InitialListStrategy::StreamingList;
388        self
389    }
390
391    /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
392    fn to_list_params(&self) -> ListParams {
393        let (resource_version, version_match) = match self.list_semantic {
394            ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
395            ListSemantic::MostRecent => (None, None),
396        };
397        ListParams {
398            label_selector: self.label_selector.clone(),
399            field_selector: self.field_selector.clone(),
400            timeout: self.timeout,
401            version_match,
402            resource_version,
403            // The watcher handles pagination internally.
404            limit: self.page_size,
405            continue_token: None,
406        }
407    }
408
409    /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
410    fn to_watch_params(&self, phase: WatchPhase) -> WatchParams {
411        WatchParams {
412            label_selector: self.label_selector.clone(),
413            field_selector: self.field_selector.clone(),
414            timeout: self.timeout,
415            bookmarks: self.bookmarks,
416            send_initial_events: phase == WatchPhase::Initial
417                && self.initial_list_strategy == InitialListStrategy::StreamingList,
418        }
419    }
420}
421
/// Whether a watch request is the first of an initialisation cycle or a reconnection.
///
/// Only an [`Initial`](WatchPhase::Initial) watch may ask the apiserver for
/// `sendInitialEvents=true` (and only with the `StreamingList` strategy); a
/// [`Resumed`](WatchPhase::Resumed) watch continues from a known resource version and
/// must not replay the initial events.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WatchPhase {
    /// First watch, opened from `State::Empty`; requests initial events for streaming lists
    Initial,
    /// Reconnecting watch, opened from `State::InitListed`; never requests initial events
    Resumed,
}
433
434impl<K> ApiMode for FullObject<'_, K>
435where
436    K: Clone + Debug + DeserializeOwned + Send + 'static,
437{
438    type Value = K;
439
440    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
441        self.api.list(lp).await
442    }
443
444    async fn watch(
445        &self,
446        wp: &WatchParams,
447        version: &str,
448    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
449        self.api.watch(wp, version).await.map(StreamExt::boxed)
450    }
451}
452
453/// A wrapper around the `Api` of a `Resource` type that when used by the
454/// watcher will return only the metadata associated with an object
455struct MetaOnly<'a, K> {
456    api: &'a Api<K>,
457}
458
459impl<K> ApiMode for MetaOnly<'_, K>
460where
461    K: Clone + Debug + DeserializeOwned + Send + 'static,
462{
463    type Value = PartialObjectMeta<K>;
464
465    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
466        self.api.list_metadata(lp).await
467    }
468
469    async fn watch(
470        &self,
471        wp: &WatchParams,
472        version: &str,
473    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
474        self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
475    }
476}
477
478/// Client-side idle timeout margin added on top of the server-side watch timeout.
479///
480/// The server closes the watch stream after the configured timeout (default 290s).
481/// We add a small margin so the client detects dead connections where the
482/// server's close never arrives (e.g. network failure).
483const WATCH_IDLE_TIMEOUT_MARGIN: Duration = Duration::from_secs(5);
484
485/// Poll the next item from a watch stream with an idle timeout.
486///
487/// Returns `None` when the stream ends **or** when no item arrives within
488/// `timeout + WATCH_IDLE_TIMEOUT_MARGIN`, causing the watcher to
489/// treat the connection as dead and reconnect.
490async fn next_with_idle_timeout<S, T>(stream: &mut S, timeout: Option<u32>) -> Option<T>
491where
492    S: Stream<Item = T> + Unpin,
493{
494    let idle_timeout = Duration::from_secs(u64::from(timeout.unwrap_or(290))) + WATCH_IDLE_TIMEOUT_MARGIN;
495    match tokio::time::timeout(idle_timeout, stream.next()).await {
496        Ok(item) => item,
497        Err(_elapsed) => {
498            debug!(
499                timeout_secs = idle_timeout.as_secs(),
500                "watch stream idle timeout, reconnecting"
501            );
502            None
503        }
504    }
505}
506
507/// Progresses the watcher a single step, returning (event, state)
508///
509/// This function should be trampolined: if event == `None`
510/// then the function should be called again until it returns a Some.
511#[allow(clippy::too_many_lines)] // for now
512async fn step_trampolined<A>(
513    api: &A,
514    wc: &Config,
515    state: State<A::Value>,
516) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
517where
518    A: ApiMode,
519    A::Value: Resource + 'static,
520{
521    match state {
522        State::Empty => match wc.initial_list_strategy {
523            InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
524                continue_token: None,
525                objects: VecDeque::default(),
526                last_bookmark: None,
527            }),
528            InitialListStrategy::StreamingList => {
529                match api.watch(&wc.to_watch_params(WatchPhase::Initial), "0").await {
530                    Ok(stream) => (None, State::InitialWatch { stream }),
531                    Err(err) => {
532                        if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
533                            warn!("watch initlist error with 403: {err:?}");
534                        } else {
535                            debug!("watch initlist error: {err:?}");
536                        }
537                        (Some(Err(Error::WatchStartFailed(err))), State::default())
538                    }
539                }
540            }
541        },
542        State::InitPage {
543            continue_token,
544            mut objects,
545            last_bookmark,
546        } => {
547            if let Some(next) = objects.pop_front() {
548                return (Some(Ok(Event::InitApply(next))), State::InitPage {
549                    continue_token,
550                    objects,
551                    last_bookmark,
552                });
553            }
554            // check if we need to perform more pages
555            if continue_token.is_none()
556                && let Some(resource_version) = last_bookmark
557            {
558                // we have drained the last page - move on to next stage
559                return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
560            }
561            let mut lp = wc.to_list_params();
562            lp.continue_token = continue_token;
563            match api.list(&lp).await {
564                Ok(list) => {
565                    let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
566                    let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
567                    if last_bookmark.is_none() && continue_token.is_none() {
568                        return (Some(Err(Error::NoResourceVersion)), State::Empty);
569                    }
570                    // Buffer page here, causing us to return to this enum branch (State::InitPage)
571                    // until the objects buffer has drained
572                    (None, State::InitPage {
573                        continue_token,
574                        objects: list.items.into_iter().collect(),
575                        last_bookmark,
576                    })
577                }
578                Err(err) => {
579                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
580                        warn!("watch list error with 403: {err:?}");
581                    } else {
582                        debug!("watch list error: {err:?}");
583                    }
584                    (Some(Err(Error::InitialListFailed(err))), State::Empty)
585                }
586            }
587        }
588        State::InitialWatch { mut stream } => {
589            match next_with_idle_timeout(&mut stream, wc.timeout).await {
590                Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
591                    (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
592                }
593                Some(Ok(WatchEvent::Deleted(_obj))) => {
594                    // Kubernetes claims these events are impossible
595                    // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
596                    error!("got deleted event during initial watch. this is a bug");
597                    (None, State::InitialWatch { stream })
598                }
599                Some(Ok(WatchEvent::Bookmark(bm))) => {
600                    let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
601                    if marks_initial_end {
602                        (Some(Ok(Event::InitDone)), State::Watching {
603                            resource_version: bm.metadata.resource_version,
604                            stream,
605                        })
606                    } else {
607                        (None, State::InitialWatch { stream })
608                    }
609                }
610                Some(Ok(WatchEvent::Error(err))) => {
611                    // HTTP GONE, means we have desynced and need to start over and re-list :(
612                    let new_state = if err.code == 410 {
613                        State::default()
614                    } else {
615                        State::InitialWatch { stream }
616                    };
617                    if err.code == 403 {
618                        warn!("watcher watchevent error 403: {err:?}");
619                    } else {
620                        debug!("error watchevent error: {err:?}");
621                    }
622                    (Some(Err(Error::WatchError(err.boxed()))), new_state)
623                }
624                Some(Err(err)) => {
625                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
626                        warn!("watcher error 403: {err:?}");
627                    } else {
628                        debug!("watcher error: {err:?}");
629                    }
630                    (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
631                }
632                None => (None, State::default()),
633            }
634        }
635        State::InitListed { resource_version } => {
636            match api
637                .watch(&wc.to_watch_params(WatchPhase::Resumed), &resource_version)
638                .await
639            {
640                Ok(stream) => (None, State::Watching {
641                    resource_version,
642                    stream,
643                }),
644                Err(err) => {
645                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
646                        warn!("watch initlist error with 403: {err:?}");
647                    } else {
648                        debug!("watch initlist error: {err:?}");
649                    }
650                    (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
651                        resource_version,
652                    })
653                }
654            }
655        }
656        State::Watching {
657            resource_version,
658            mut stream,
659        } => match next_with_idle_timeout(&mut stream, wc.timeout).await {
660            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
661                let resource_version = obj.resource_version().unwrap_or_default();
662                if resource_version.is_empty() {
663                    (Some(Err(Error::NoResourceVersion)), State::default())
664                } else {
665                    (Some(Ok(Event::Apply(obj))), State::Watching {
666                        resource_version,
667                        stream,
668                    })
669                }
670            }
671            Some(Ok(WatchEvent::Deleted(obj))) => {
672                let resource_version = obj.resource_version().unwrap_or_default();
673                if resource_version.is_empty() {
674                    (Some(Err(Error::NoResourceVersion)), State::default())
675                } else {
676                    (Some(Ok(Event::Delete(obj))), State::Watching {
677                        resource_version,
678                        stream,
679                    })
680                }
681            }
682            Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
683                resource_version: bm.metadata.resource_version,
684                stream,
685            }),
686            Some(Ok(WatchEvent::Error(err))) => {
687                // HTTP GONE, means we have desynced and need to start over and re-list :(
688                let new_state = if err.code == 410 {
689                    State::default()
690                } else {
691                    State::Watching {
692                        resource_version,
693                        stream,
694                    }
695                };
696                if err.code == 403 {
697                    warn!("watcher watchevent error 403: {err:?}");
698                } else {
699                    debug!("error watchevent error: {err:?}");
700                }
701                (Some(Err(Error::WatchError(err.boxed()))), new_state)
702            }
703            Some(Err(err)) => {
704                if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
705                    warn!("watcher error 403: {err:?}");
706                } else {
707                    debug!("watcher error: {err:?}");
708                }
709                (Some(Err(Error::WatchFailed(err))), State::Watching {
710                    resource_version,
711                    stream,
712                })
713            }
714            None => (None, State::InitListed { resource_version }),
715        },
716    }
717}
718
719/// Trampoline helper for `step_trampolined`
720async fn step<A>(
721    api: &A,
722    config: &Config,
723    mut state: State<A::Value>,
724) -> (Result<Event<A::Value>>, State<A::Value>)
725where
726    A: ApiMode,
727    A::Value: Resource + 'static,
728{
729    loop {
730        match step_trampolined(api, config, state).await {
731            (Some(result), new_state) => return (result, new_state),
732            (None, new_state) => state = new_state,
733        }
734    }
735}
736
737/// Watches a Kubernetes Resource for changes continuously
738///
739/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
740///
741/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
742/// You can apply your own backoff by not polling the stream for a duration after errors.
743/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
744/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
745/// will terminate eagerly as soon as they receive an [`Err`].
746///
747/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
748/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
749///
750/// ```
751/// use kube::{
752///   api::{Api, ResourceExt}, Client,
753///   runtime::{watcher, WatchStreamExt}
754/// };
755/// use k8s_openapi::api::core::v1::Pod;
756/// use futures::TryStreamExt;
757///
758/// # async fn wrapper() -> Result<(), watcher::Error> {
759/// #   let client: Client = todo!();
760/// let pods: Api<Pod> = Api::namespaced(client, "apps");
761///
762/// watcher(pods, watcher::Config::default()).applied_objects()
763///     .try_for_each(|p| async move {
764///         println!("Applied: {}", p.name_any());
765///        Ok(())
766///     })
767///     .await?;
768/// # Ok(())
769/// # }
770/// ```
771/// [`WatchStreamExt`]: super::WatchStreamExt
772/// [`reflector`]: super::reflector::reflector
773/// [`Api::watch`]: kube_client::Api::watch
774///
775/// # Recovery
776///
777/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
778/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
779/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
780///
781/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
782/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
783/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
784/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
785/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
786#[doc(alias = "informer")]
787pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
788    api: Api<K>,
789    watcher_config: Config,
790) -> impl Stream<Item = Result<Event<K>>> + Send {
791    futures::stream::unfold(
792        (api, watcher_config, State::default()),
793        |(api, watcher_config, state)| async {
794            let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
795            Some((event, (api, watcher_config, state)))
796        },
797    )
798}
799
800/// Watches a Kubernetes Resource for changes continuously and receives only the
801/// metadata
802///
803/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
804///
805/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
806/// You can apply your own backoff by not polling the stream for a duration after errors.
807/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
808/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
809/// will terminate eagerly as soon as they receive an [`Err`].
810///
811/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
812/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
813///
814/// ```
815/// use kube::{
816///   api::{Api, ResourceExt}, Client,
817///   runtime::{watcher, metadata_watcher, WatchStreamExt}
818/// };
819/// use k8s_openapi::api::core::v1::Pod;
820/// use futures::TryStreamExt;
821///
822/// # async fn wrapper() -> Result<(), watcher::Error> {
823/// #   let client: Client = todo!();
824/// let pods: Api<Pod> = Api::namespaced(client, "apps");
825///
826/// metadata_watcher(pods, watcher::Config::default()).applied_objects()
827///         .try_for_each(|p| async move {
828///          println!("Applied: {}", p.name_any());
829///             Ok(())
830///         })
831///         .await?;
832/// #   Ok(())
833/// # }
834/// ```
835/// [`WatchStreamExt`]: super::WatchStreamExt
836/// [`reflector`]: super::reflector::reflector
837/// [`Api::watch`]: kube_client::Api::watch
838///
839/// # Recovery
840///
841/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
842/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
843/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
844///
845/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
846/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
847/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
848/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
849/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
850#[deprecated(
851    since = "3.1.0",
852    note = "Use `watcher(Api::<PartialObjectMeta<K>>::all(client), config)` instead. \
853            `Api<PartialObjectMeta<K>>` now automatically uses metadata-only requests."
854)]
855pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
856    api: Api<K>,
857    watcher_config: Config,
858) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
859    futures::stream::unfold(
860        (api, watcher_config, State::default()),
861        |(api, watcher_config, state)| async {
862            let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
863            Some((event, (api, watcher_config, state)))
864        },
865    )
866}
867
868/// Watch a single named object for updates
869///
870/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
871///
872/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
873///
874/// ## Scope Warning
875///
876/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
877/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
878/// when watching for transitions to namespaced objects.
879pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
880    api: Api<K>,
881    name: &str,
882) -> impl Stream<Item = Result<Option<K>>> + Send + use<K> {
883    // filtering by object name in given scope, so there's at most one matching object
884    // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
885    let fields = format!("metadata.name={name}");
886    watcher(api, Config::default().fields(&fields))
887        // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
888        // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
889        // `None` is emitted when the InitDone event is received.
890        //
891        // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
892        // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
893        // could happen given K8S events aren't guaranteed delivery.
894        .scan(false, |obj_seen, event| {
895            if matches!(event, Ok(Event::Init)) {
896                *obj_seen = false;
897            } else if matches!(event, Ok(Event::InitApply(_))) {
898                *obj_seen = true;
899            }
900            future::ready(Some((*obj_seen, event)))
901        })
902        .filter_map(|(obj_seen, event)| async move {
903            match event {
904                // Pass up `Some` for Found / Updated
905                Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
906                // Pass up `None` for Deleted
907                Ok(Event::Delete(_)) => Some(Ok(None)),
908                // Pass up `None` if the object wasn't seen in the initial list
909                Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
910                // Ignore marker events
911                Ok(Event::Init | Event::InitDone) => None,
912                // Bubble up errors
913                Err(err) => Some(Err(err)),
914            }
915        })
916}
917
918/// A struct with a manually configured exponential backoff
919pub struct ExponentialBackoff {
920    inner: backon::ExponentialBackoff,
921    builder: backon::ExponentialBuilder,
922}
923
924impl ExponentialBackoff {
925    fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
926        let builder = backon::ExponentialBuilder::default()
927            .with_min_delay(min_delay)
928            .with_max_delay(max_delay)
929            .with_factor(factor)
930            .without_max_times();
931
932        let builder = if enable_jitter {
933            builder.with_jitter()
934        } else {
935            builder
936        };
937
938        Self {
939            inner: builder.build(),
940            builder,
941        }
942    }
943}
944
945impl Backoff for ExponentialBackoff {
946    fn reset(&mut self) {
947        self.inner = self.builder.build();
948    }
949}
950
951impl Iterator for ExponentialBackoff {
952    type Item = Duration;
953
954    fn next(&mut self) -> Option<Self::Item> {
955        self.inner.next()
956    }
957}
958
959impl From<backon::ExponentialBuilder> for ExponentialBackoff {
960    fn from(builder: backon::ExponentialBuilder) -> Self {
961        Self {
962            inner: builder.build(),
963            builder,
964        }
965    }
966}
967
968/// Default watcher backoff inspired by Kubernetes' client-go.
969///
970/// The parameters currently optimize for being kind to struggling apiservers.
971/// The exact parameters are taken from
972/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
973/// and should not be considered stable.
974///
975/// This struct implements [`Backoff`] and is the default strategy used
976/// when calling `WatchStreamExt::default_backoff`. If you need to create
977/// this manually then [`DefaultBackoff::default`] can be used.
978pub struct DefaultBackoff(Strategy);
979type Strategy = ResetTimerBackoff<ExponentialBackoff>;
980
981impl Default for DefaultBackoff {
982    fn default() -> Self {
983        Self(ResetTimerBackoff::new(
984            ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true),
985            Duration::from_secs(120),
986        ))
987    }
988}
989
990impl Iterator for DefaultBackoff {
991    type Item = Duration;
992
993    fn next(&mut self) -> Option<Self::Item> {
994        self.0.next()
995    }
996}
997
998impl Backoff for DefaultBackoff {
999    fn reset(&mut self) {
1000        self.0.reset();
1001    }
1002}
1003
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn to_watch_params_initial_phase_with_streaming_list_sets_send_initial_events() {
        let params = Config::default()
            .streaming_lists()
            .to_watch_params(WatchPhase::Initial);
        assert!(params.send_initial_events);
    }

    #[test]
    fn to_watch_params_resumed_phase_with_streaming_list_does_not_set_send_initial_events() {
        let params = Config::default()
            .streaming_lists()
            .to_watch_params(WatchPhase::Resumed);
        assert!(!params.send_initial_events);
    }

    #[test]
    fn to_watch_params_listwatch_mode_does_not_set_send_initial_events() {
        // The default Config is plain ListWatch: no phase may request initial events
        let config = Config::default();
        for phase in [WatchPhase::Initial, WatchPhase::Resumed] {
            assert!(!config.to_watch_params(phase).send_initial_events);
        }
    }

    /// Durations closer than 100µs are treated as equal.
    fn approx_eq(a: Duration, b: Duration) -> bool {
        let tolerance = Duration::from_micros(100);
        a.abs_diff(b) < tolerance
    }

    #[test]
    fn exponential_backoff_without_jitter() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, false);

        let expected = [100, 200, 400].map(Duration::from_millis);
        for want in expected {
            let got = backoff.next().unwrap();
            assert!(approx_eq(got, want));
        }
    }

    #[test]
    fn exponential_backoff_with_jitter_applies_randomness() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, true);

        // Poll exactly five times, keeping only the delays actually produced
        let delays: Vec<Duration> = std::iter::repeat_with(|| backoff.next())
            .take(5)
            .flatten()
            .collect();

        // Every delay must be strictly positive
        assert!(delays.iter().all(|d| !d.is_zero()));

        // Reference sequence without jitter: doubling from 100ms, capped at 1s
        let unjittered = [100, 200, 400, 800, 1000].map(Duration::from_millis);

        let matches_unjittered = delays
            .iter()
            .zip(&unjittered)
            .all(|(d, e)| approx_eq(*d, *e));

        assert!(
            !matches_unjittered,
            "With jitter enabled, delays should not all match exact exponential values"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn idle_timeout_returns_item_when_stream_has_data() {
        let mut stream = futures::stream::iter(vec![1, 2, 3]);
        assert_eq!(next_with_idle_timeout(&mut stream, Some(290)).await, Some(1));
    }

    #[tokio::test(start_paused = true)]
    async fn idle_timeout_returns_none_on_dead_connection() {
        let mut stream = futures::stream::pending::<i32>();
        // NB tokio auto-advances virtual time when the runtime is idle so next_with_idle_timeout resolves immediately
        assert_eq!(next_with_idle_timeout(&mut stream, Some(290)).await, None);
    }

    #[tokio::test(start_paused = true)]
    async fn idle_timeout_returns_none_when_stream_ends() {
        let mut stream = futures::stream::empty::<i32>();
        assert_eq!(next_with_idle_timeout(&mut stream, Some(290)).await, None);
    }
}