kube_runtime/
watcher.rs

1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::{Backoff, ResetTimerBackoff};
6
7use backon::BackoffBuilder;
8use educe::Educe;
9use futures::{Stream, StreamExt, stream::BoxStream};
10use kube_client::{
11    Api, Error as ClientErr,
12    api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
13    core::{ObjectList, Selector, metadata::PartialObjectMeta},
14    error::Status,
15};
16use serde::de::DeserializeOwned;
17use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
18use thiserror::Error;
19use tracing::{debug, error, warn};
20
/// Errors yielded by the [`watcher`] stream
///
/// Every variant is produced by the internal state machine; after yielding one, the
/// stream keeps going and retries from whatever state the failure left it in.
#[derive(Debug, Error)]
pub enum Error {
    /// The LIST request issued while paging through the initial object list failed
    #[error("failed to perform initial object list: {0}")]
    InitialListFailed(#[source] kube_client::Error),
    /// The WATCH request itself could not be started
    #[error("failed to start watching object: {0}")]
    WatchStartFailed(#[source] kube_client::Error),
    /// The watch stream delivered an error event (a `Status`) from the apiserver
    #[error("error returned by apiserver during watch: {0}")]
    WatchError(#[source] Box<Status>),
    /// Reading the next item from an established watch stream failed
    #[error("watch stream failed: {0}")]
    WatchFailed(#[source] kube_client::Error),
    /// A list result carried neither a resource version nor a continue token, or a
    /// watched object had an empty `metadata.resourceVersion`
    #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
    NoResourceVersion,
}
/// Result alias whose error type defaults to this module's [`Error`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
35
#[derive(Debug, Clone)]
/// Watch events returned from the [`watcher`]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the `StreamingList` strategy,
    /// or from pages using the `ListWatch` strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously [`Applied`](Event::Apply) but are not listed in any of
    /// the `InitApply` events should be assumed to have been [`Deleted`](Event::Delete).
    InitDone,
}
68
69impl<K> Event<K> {
70    /// Map each object in an event through a mutator fn
71    ///
72    /// This allows for memory optimizations in watch streams.
73    /// If you are chaining a watch stream into a reflector as an in memory state store,
74    /// you can control the space used by each object by dropping fields.
75    ///
76    /// ```no_run
77    /// use k8s_openapi::api::core::v1::Pod;
78    /// use kube::ResourceExt;
79    /// # use kube::runtime::watcher::Event;
80    /// # let event: Event<Pod> = todo!();
81    /// event.modify(|pod| {
82    ///     pod.managed_fields_mut().clear();
83    ///     pod.annotations_mut().clear();
84    ///     pod.status = None;
85    /// });
86    /// ```
87    #[must_use]
88    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
89        match &mut self {
90            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => (f)(obj),
91            Self::Init | Self::InitDone => {} // markers, nothing to modify
92        }
93        self
94    }
95}
96
#[derive(Educe, Default)]
#[educe(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
    #[default]
    Empty,
    /// The Watcher is in the process of paginating through the initial LIST
    InitPage {
        /// Continue token of the most recent page; sent with the next LIST request
        continue_token: Option<String>,
        /// Objects of the current page that have not been emitted yet
        objects: VecDeque<K>,
        /// Resource version reported by the most recent page, if any
        last_bookmark: Option<String>,
    },
    /// Kubernetes 1.27 Streaming Lists
    /// The initial watch is in progress
    InitialWatch {
        /// The watch stream delivering the initial set of objects (excluded from `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
    /// The initial LIST was successful, so we should move on to starting the actual watch.
    InitListed {
        /// Resource version to start (or resume) the watch from
        resource_version: String,
    },
    /// The watch is in progress, from this point we just return events from the server.
    ///
    /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
    /// returning to the `InitListed` state.
    /// If we fall out of the K8s watch window then we propagate the error and fall back doing a re-list
    /// with `Empty`.
    Watching {
        /// Resource version of the last object or bookmark seen on the stream
        resource_version: String,
        /// The live watch stream (excluded from `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
}
130
/// Used to control whether the watcher receives the full object, or only the
/// metadata
///
/// Implemented by [`FullObject`] and [`MetaOnly`]; the state machine is generic over it.
trait ApiMode {
    /// The object representation yielded by list and watch calls in this mode
    type Value: Clone;

    /// Fetch one page of objects according to `lp`
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
    /// Open a watch described by `wp`, starting from resource version `version`
    ///
    /// On success the returned stream is boxed so that both modes share one stream type.
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}
143
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
    /// Borrowed API handle that list and watch calls are forwarded to
    api: &'a Api<K>,
}
149
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for the most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but a relist can return
    /// results that are much older than what was previously observed,
    /// particularly in HA configurations, due to partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and being sent to the reconciler.
    Any,
}
175
/// Configurable watcher listwatch semantics
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
    /// List first, then watch from the given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
192
/// Accumulates all options that can be used on the watcher invocation.
///
/// Construct one with [`Config::default`] and adjust it through the builder methods.
#[derive(Clone, Debug, PartialEq)]
pub struct Config {
    /// A selector to restrict the list of returned objects by their labels.
    ///
    /// Defaults to everything if `None`.
    pub label_selector: Option<String>,

    /// A selector to restrict the list of returned objects by their fields.
    ///
    /// Defaults to everything if `None`.
    pub field_selector: Option<String>,

    /// Timeout for the list/watch call, in seconds.
    ///
    /// This limits the duration of the call, regardless of any activity or inactivity.
    /// If unset for a watch call, we will use 290s.
    /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
    pub timeout: Option<u32>,

    /// Semantics for list calls.
    ///
    /// Configures re-list for performance vs. consistency.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub list_semantic: ListSemantic,

    /// Control how the watcher fetches the initial list of objects.
    ///
    /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
    /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
    ///
    /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
    /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    pub initial_list_strategy: InitialListStrategy,

    /// Maximum number of objects retrieved per list call during (re)syncs.
    ///
    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
    /// API roundtrips to complete.
    ///
    /// Defaults to 500. Note that `None` represents unbounded.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub page_size: Option<u32>,

    /// Enables watch events with type "BOOKMARK".
    ///
    /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
    /// This is enabled by default and should generally not be turned off.
    pub bookmarks: bool,
}
248
249impl Default for Config {
250    fn default() -> Self {
251        Self {
252            bookmarks: true,
253            label_selector: None,
254            field_selector: None,
255            timeout: None,
256            list_semantic: ListSemantic::default(),
257            // same default page size limit as client-go
258            // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
259            page_size: Some(500),
260            initial_list_strategy: InitialListStrategy::ListWatch,
261        }
262    }
263}
264
265/// Builder interface to Config
266///
267/// Usage:
268/// ```
269/// use kube::runtime::watcher::Config;
270/// let wc = Config::default()
271///     .timeout(60)
272///     .labels("kubernetes.io/lifecycle=spot");
273/// ```
274impl Config {
275    /// Configure the timeout for list/watch calls
276    ///
277    /// This limits the duration of the call, regardless of any activity or inactivity.
278    /// Defaults to 290s
279    #[must_use]
280    pub fn timeout(mut self, timeout_secs: u32) -> Self {
281        self.timeout = Some(timeout_secs);
282        self
283    }
284
285    /// Configure the selector to restrict the list of returned objects by their fields.
286    ///
287    /// Defaults to everything.
288    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
289    /// The server only supports a limited number of field queries per type.
290    #[must_use]
291    pub fn fields(mut self, field_selector: &str) -> Self {
292        self.field_selector = Some(field_selector.to_string());
293        self
294    }
295
296    /// Configure the selector to restrict the list of returned objects by their labels.
297    ///
298    /// Defaults to everything.
299    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
300    #[must_use]
301    pub fn labels(mut self, label_selector: &str) -> Self {
302        self.label_selector = Some(label_selector.to_string());
303        self
304    }
305
306    /// Configure typed label selectors
307    ///
308    /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
309    ///
310    /// ```
311    /// use kube_runtime::watcher::Config;
312    /// use kube_client::core::{Expression, Selector, ParseExpressionError};
313    /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
314    /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
315    /// let cfg = Config::default().labels_from(&selector);
316    /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
317    /// let selector: Selector = LabelSelector::default().try_into()?;
318    /// let cfg = Config::default().labels_from(&selector);
319    /// # Ok::<(), ParseExpressionError>(())
320    ///```
321    #[must_use]
322    pub fn labels_from(mut self, selector: &Selector) -> Self {
323        self.label_selector = Some(selector.to_string());
324        self
325    }
326
327    /// Sets list semantic to configure re-list performance and consistency
328    ///
329    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
330    #[must_use]
331    pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
332        self.list_semantic = semantic;
333        self
334    }
335
336    /// Sets list semantic to `Any` to improve list performance
337    ///
338    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
339    #[must_use]
340    pub fn any_semantic(self) -> Self {
341        self.list_semantic(ListSemantic::Any)
342    }
343
344    /// Disables watch bookmarks to simplify watch handling
345    ///
346    /// This is not recommended to use with production watchers as it can cause desyncs.
347    /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
348    #[must_use]
349    pub fn disable_bookmarks(mut self) -> Self {
350        self.bookmarks = false;
351        self
352    }
353
354    /// Limits the number of objects retrieved in each list operation during resync.
355    ///
356    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
357    /// API roundtrips to complete.
358    ///
359    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
360    #[must_use]
361    pub fn page_size(mut self, page_size: u32) -> Self {
362        self.page_size = Some(page_size);
363        self
364    }
365
366    /// Kubernetes 1.27 Streaming Lists
367    /// Sets list semantic to `Stream` to make use of watch bookmarks
368    #[must_use]
369    pub fn streaming_lists(mut self) -> Self {
370        self.initial_list_strategy = InitialListStrategy::StreamingList;
371        self
372    }
373
374    /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
375    fn to_list_params(&self) -> ListParams {
376        let (resource_version, version_match) = match self.list_semantic {
377            ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
378            ListSemantic::MostRecent => (None, None),
379        };
380        ListParams {
381            label_selector: self.label_selector.clone(),
382            field_selector: self.field_selector.clone(),
383            timeout: self.timeout,
384            version_match,
385            resource_version,
386            // The watcher handles pagination internally.
387            limit: self.page_size,
388            continue_token: None,
389        }
390    }
391
392    /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
393    fn to_watch_params(&self, phase: WatchPhase) -> WatchParams {
394        WatchParams {
395            label_selector: self.label_selector.clone(),
396            field_selector: self.field_selector.clone(),
397            timeout: self.timeout,
398            bookmarks: self.bookmarks,
399            send_initial_events: phase == WatchPhase::Initial
400                && self.initial_list_strategy == InitialListStrategy::StreamingList,
401        }
402    }
403}
404
/// Distinguishes between initial watch and resumed watch for streaming lists.
///
/// This is used to determine whether to set `sendInitialEvents=true` in watch requests.
/// Only initial watches should request initial events; reconnections should not.
///
/// Passed to `Config::to_watch_params`, which combines it with the configured
/// [`InitialListStrategy`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WatchPhase {
    /// Initial watch from `State::Empty` - requests initial events for streaming lists
    Initial,
    /// Resumed watch from `State::InitListed` - does not request initial events
    Resumed,
}
416
417impl<K> ApiMode for FullObject<'_, K>
418where
419    K: Clone + Debug + DeserializeOwned + Send + 'static,
420{
421    type Value = K;
422
423    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
424        self.api.list(lp).await
425    }
426
427    async fn watch(
428        &self,
429        wp: &WatchParams,
430        version: &str,
431    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
432        self.api.watch(wp, version).await.map(StreamExt::boxed)
433    }
434}
435
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
    /// Borrowed API handle that metadata list and watch calls are forwarded to
    api: &'a Api<K>,
}
441
442impl<K> ApiMode for MetaOnly<'_, K>
443where
444    K: Clone + Debug + DeserializeOwned + Send + 'static,
445{
446    type Value = PartialObjectMeta<K>;
447
448    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
449        self.api.list_metadata(lp).await
450    }
451
452    async fn watch(
453        &self,
454        wp: &WatchParams,
455        version: &str,
456    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
457        self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
458    }
459}
460
/// Progresses the watcher a single step, returning (event, state)
///
/// This function should be trampolined: if event == `None`
/// then the function should be called again until it returns a Some.
///
/// - `api`: the list/watch backend (full objects or metadata only)
/// - `wc`: the watcher configuration used to build list and watch parameters
/// - `state`: the current state, consumed and replaced by the returned one
///
/// Errors are returned as events; the accompanying state decides whether the next
/// step retries the same operation or starts over from `State::Empty`.
#[allow(clippy::too_many_lines)] // for now
async fn step_trampolined<A>(
    api: &A,
    wc: &Config,
    state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
    A: ApiMode,
    A::Value: Resource + 'static,
{
    match state {
        // Nothing known yet: begin initialisation according to the configured strategy.
        State::Empty => match wc.initial_list_strategy {
            // Announce the (re)start right away; the first page is fetched on the next step.
            InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
                continue_token: None,
                objects: VecDeque::default(),
                last_bookmark: None,
            }),
            InitialListStrategy::StreamingList => {
                // The initial objects arrive over a watch started from resource version "0".
                // NOTE(review): no `Event::Init` is emitted on this path - confirm that
                // consumers which prepare a buffer on `Init` do not depend on it.
                match api.watch(&wc.to_watch_params(WatchPhase::Initial), "0").await {
                    Ok(stream) => (None, State::InitialWatch { stream }),
                    Err(err) => {
                        // 403s are logged at warn level, everything else at debug.
                        if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
                            warn!("watch initlist error with 403: {err:?}");
                        } else {
                            debug!("watch initlist error: {err:?}");
                        }
                        // `State::default()` is `Empty`, so the next step retries from scratch.
                        (Some(Err(Error::WatchStartFailed(err))), State::default())
                    }
                }
            }
        },
        State::InitPage {
            continue_token,
            mut objects,
            last_bookmark,
        } => {
            // Emit buffered objects one per step before doing anything else.
            if let Some(next) = objects.pop_front() {
                return (Some(Ok(Event::InitApply(next))), State::InitPage {
                    continue_token,
                    objects,
                    last_bookmark,
                });
            }
            // check if we need to perform more pages
            if continue_token.is_none()
                && let Some(resource_version) = last_bookmark
            {
                // we have drained the last page - move on to next stage
                return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
            }
            // Either this is the very first page or the previous page left a continue token.
            let mut lp = wc.to_list_params();
            lp.continue_token = continue_token;
            match api.list(&lp).await {
                Ok(list) => {
                    // Empty strings are treated the same as missing values.
                    let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
                    let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
                    // With neither value there is no way to continue paging or to start a watch.
                    if last_bookmark.is_none() && continue_token.is_none() {
                        return (Some(Err(Error::NoResourceVersion)), State::Empty);
                    }
                    // Buffer page here, causing us to return to this enum branch (State::InitPage)
                    // until the objects buffer has drained
                    (None, State::InitPage {
                        continue_token,
                        objects: list.items.into_iter().collect(),
                        last_bookmark,
                    })
                }
                Err(err) => {
                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
                        warn!("watch list error with 403: {err:?}");
                    } else {
                        debug!("watch list error: {err:?}");
                    }
                    // Pages fetched so far are abandoned; the next step restarts the list.
                    (Some(Err(Error::InitialListFailed(err))), State::Empty)
                }
            }
        }
        State::InitialWatch { mut stream } => {
            match stream.next().await {
                // Everything received before the end-of-initial-events bookmark is an initial object.
                Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
                    (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
                }
                Some(Ok(WatchEvent::Deleted(_obj))) => {
                    // Kubernetes claims these events are impossible
                    // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
                    error!("got deleted event during initial watch. this is a bug");
                    (None, State::InitialWatch { stream })
                }
                Some(Ok(WatchEvent::Bookmark(bm))) => {
                    // A bookmark carrying this annotation marks the end of the initial events;
                    // only the presence of the key is checked.
                    let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
                    if marks_initial_end {
                        // Keep using the same stream for the regular watch phase.
                        (Some(Ok(Event::InitDone)), State::Watching {
                            resource_version: bm.metadata.resource_version,
                            stream,
                        })
                    } else {
                        (None, State::InitialWatch { stream })
                    }
                }
                Some(Ok(WatchEvent::Error(err))) => {
                    // HTTP GONE, means we have desynced and need to start over and re-list :(
                    let new_state = if err.code == 410 {
                        State::default()
                    } else {
                        State::InitialWatch { stream }
                    };
                    if err.code == 403 {
                        warn!("watcher watchevent error 403: {err:?}");
                    } else {
                        debug!("error watchevent error: {err:?}");
                    }
                    (Some(Err(Error::WatchError(err.boxed()))), new_state)
                }
                Some(Err(err)) => {
                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
                        warn!("watcher error 403: {err:?}");
                    } else {
                        debug!("watcher error: {err:?}");
                    }
                    // The stream is kept; it is polled again on the next step.
                    (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
                }
                // Stream ended before initialisation completed: start over.
                None => (None, State::default()),
            }
        }
        State::InitListed { resource_version } => {
            // Start (or resume) the regular watch from the last known resource version.
            match api
                .watch(&wc.to_watch_params(WatchPhase::Resumed), &resource_version)
                .await
            {
                Ok(stream) => (None, State::Watching {
                    resource_version,
                    stream,
                }),
                Err(err) => {
                    if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
                        warn!("watch initlist error with 403: {err:?}");
                    } else {
                        debug!("watch initlist error: {err:?}");
                    }
                    // Stay in `InitListed` so the next step retries with the same resource version.
                    (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
                        resource_version,
                    })
                }
            }
        }
        State::Watching {
            resource_version,
            mut stream,
        } => match stream.next().await {
            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
                // Track the object's resource version so a later resume starts after it.
                let resource_version = obj.resource_version().unwrap_or_default();
                if resource_version.is_empty() {
                    // Without a resource version the watch cannot be resumed: start over.
                    (Some(Err(Error::NoResourceVersion)), State::default())
                } else {
                    (Some(Ok(Event::Apply(obj))), State::Watching {
                        resource_version,
                        stream,
                    })
                }
            }
            Some(Ok(WatchEvent::Deleted(obj))) => {
                let resource_version = obj.resource_version().unwrap_or_default();
                if resource_version.is_empty() {
                    (Some(Err(Error::NoResourceVersion)), State::default())
                } else {
                    (Some(Ok(Event::Delete(obj))), State::Watching {
                        resource_version,
                        stream,
                    })
                }
            }
            // Bookmarks only advance the tracked resource version; nothing is emitted.
            Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
                resource_version: bm.metadata.resource_version,
                stream,
            }),
            Some(Ok(WatchEvent::Error(err))) => {
                // HTTP GONE, means we have desynced and need to start over and re-list :(
                let new_state = if err.code == 410 {
                    State::default()
                } else {
                    State::Watching {
                        resource_version,
                        stream,
                    }
                };
                if err.code == 403 {
                    warn!("watcher watchevent error 403: {err:?}");
                } else {
                    debug!("error watchevent error: {err:?}");
                }
                (Some(Err(Error::WatchError(err.boxed()))), new_state)
            }
            Some(Err(err)) => {
                if std::matches!(err, ClientErr::Api(ref status) if status.is_forbidden()) {
                    warn!("watcher error 403: {err:?}");
                } else {
                    debug!("watcher error: {err:?}");
                }
                // The stream is kept; it is polled again on the next step.
                (Some(Err(Error::WatchFailed(err))), State::Watching {
                    resource_version,
                    stream,
                })
            }
            // Stream ended: re-establish the watch from the last seen resource version.
            None => (None, State::InitListed { resource_version }),
        },
    }
}
672
673/// Trampoline helper for `step_trampolined`
674async fn step<A>(
675    api: &A,
676    config: &Config,
677    mut state: State<A::Value>,
678) -> (Result<Event<A::Value>>, State<A::Value>)
679where
680    A: ApiMode,
681    A::Value: Resource + 'static,
682{
683    loop {
684        match step_trampolined(api, config, state).await {
685            (Some(result), new_state) => return (result, new_state),
686            (None, new_state) => state = new_state,
687        }
688    }
689}
690
691/// Watches a Kubernetes Resource for changes continuously
692///
693/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
694///
695/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
696/// You can apply your own backoff by not polling the stream for a duration after errors.
697/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
698/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
699/// will terminate eagerly as soon as they receive an [`Err`].
700///
701/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
702/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
703///
704/// ```
705/// use kube::{
706///   api::{Api, ResourceExt}, Client,
707///   runtime::{watcher, WatchStreamExt}
708/// };
709/// use k8s_openapi::api::core::v1::Pod;
710/// use futures::TryStreamExt;
711///
712/// # async fn wrapper() -> Result<(), watcher::Error> {
713/// #   let client: Client = todo!();
714/// let pods: Api<Pod> = Api::namespaced(client, "apps");
715///
716/// watcher(pods, watcher::Config::default()).applied_objects()
717///     .try_for_each(|p| async move {
718///         println!("Applied: {}", p.name_any());
719///        Ok(())
720///     })
721///     .await?;
722/// # Ok(())
723/// # }
724/// ```
725/// [`WatchStreamExt`]: super::WatchStreamExt
726/// [`reflector`]: super::reflector::reflector
727/// [`Api::watch`]: kube_client::Api::watch
728///
729/// # Recovery
730///
731/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
732/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
733/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
734///
735/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
736/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
737/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
738/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
739/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
740#[doc(alias = "informer")]
741pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
742    api: Api<K>,
743    watcher_config: Config,
744) -> impl Stream<Item = Result<Event<K>>> + Send {
745    futures::stream::unfold(
746        (api, watcher_config, State::default()),
747        |(api, watcher_config, state)| async {
748            let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
749            Some((event, (api, watcher_config, state)))
750        },
751    )
752}
753
754/// Watches a Kubernetes Resource for changes continuously and receives only the
755/// metadata
756///
757/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
758///
759/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
760/// You can apply your own backoff by not polling the stream for a duration after errors.
761/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
762/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
763/// will terminate eagerly as soon as they receive an [`Err`].
764///
765/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
766/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
767///
768/// ```
769/// use kube::{
770///   api::{Api, ResourceExt}, Client,
771///   runtime::{watcher, metadata_watcher, WatchStreamExt}
772/// };
773/// use k8s_openapi::api::core::v1::Pod;
774/// use futures::TryStreamExt;
775///
776/// # async fn wrapper() -> Result<(), watcher::Error> {
777/// #   let client: Client = todo!();
778/// let pods: Api<Pod> = Api::namespaced(client, "apps");
779///
780/// metadata_watcher(pods, watcher::Config::default()).applied_objects()
///     .try_for_each(|p| async move {
///         println!("Applied: {}", p.name_any());
///         Ok(())
///     })
///     .await?;
786/// #   Ok(())
787/// # }
788/// ```
789/// [`WatchStreamExt`]: super::WatchStreamExt
790/// [`reflector`]: super::reflector::reflector
/// [`Api::watch_metadata`]: kube_client::Api::watch_metadata
792///
793/// # Recovery
794///
795/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
796/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
797/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
798///
799/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
800/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
801/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
802/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
/// an [`Event::Init`]. The internal mechanics of recovery should be considered an implementation detail.
804#[allow(clippy::module_name_repetitions)]
805pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
806    api: Api<K>,
807    watcher_config: Config,
808) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
809    futures::stream::unfold(
810        (api, watcher_config, State::default()),
811        |(api, watcher_config, state)| async {
812            let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
813            Some((event, (api, watcher_config, state)))
814        },
815    )
816}
817
818/// Watch a single named object for updates
819///
820/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
821///
822/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
823///
824/// ## Scope Warning
825///
826/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
827/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
828/// when watching for transitions to namespaced objects.
829pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
830    api: Api<K>,
831    name: &str,
832) -> impl Stream<Item = Result<Option<K>>> + Send + use<K> {
833    // filtering by object name in given scope, so there's at most one matching object
834    // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
835    let fields = format!("metadata.name={name}");
836    watcher(api, Config::default().fields(&fields))
837        // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
838        // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
839        // `None` is emitted when the InitDone event is received.
840        //
841        // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
842        // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
843        // could happen given K8S events aren't guaranteed delivery.
844        .scan(false, |obj_seen, event| {
845            if matches!(event, Ok(Event::Init)) {
846                *obj_seen = false;
847            } else if matches!(event, Ok(Event::InitApply(_))) {
848                *obj_seen = true;
849            }
850            future::ready(Some((*obj_seen, event)))
851        })
852        .filter_map(|(obj_seen, event)| async move {
853            match event {
854                // Pass up `Some` for Found / Updated
855                Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
856                // Pass up `None` for Deleted
857                Ok(Event::Delete(_)) => Some(Ok(None)),
858                // Pass up `None` if the object wasn't seen in the initial list
859                Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
860                // Ignore marker events
861                Ok(Event::Init | Event::InitDone) => None,
862                // Bubble up errors
863                Err(err) => Some(Err(err)),
864            }
865        })
866}
867
868pub struct ExponentialBackoff {
869    inner: backon::ExponentialBackoff,
870    builder: backon::ExponentialBuilder,
871}
872
873impl ExponentialBackoff {
874    fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
875        let builder = backon::ExponentialBuilder::default()
876            .with_min_delay(min_delay)
877            .with_max_delay(max_delay)
878            .with_factor(factor)
879            .without_max_times();
880
881        let builder = if enable_jitter {
882            builder.with_jitter()
883        } else {
884            builder
885        };
886
887        Self {
888            inner: builder.build(),
889            builder,
890        }
891    }
892}
893
894impl Backoff for ExponentialBackoff {
895    fn reset(&mut self) {
896        self.inner = self.builder.build();
897    }
898}
899
900impl Iterator for ExponentialBackoff {
901    type Item = Duration;
902
903    fn next(&mut self) -> Option<Self::Item> {
904        self.inner.next()
905    }
906}
907
908impl From<backon::ExponentialBuilder> for ExponentialBackoff {
909    fn from(builder: backon::ExponentialBuilder) -> Self {
910        Self {
911            inner: builder.build(),
912            builder,
913        }
914    }
915}
916
917/// Default watcher backoff inspired by Kubernetes' client-go.
918///
919/// The parameters currently optimize for being kind to struggling apiservers.
920/// The exact parameters are taken from
921/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
922/// and should not be considered stable.
923///
924/// This struct implements [`Backoff`] and is the default strategy used
925/// when calling `WatchStreamExt::default_backoff`. If you need to create
926/// this manually then [`DefaultBackoff::default`] can be used.
927pub struct DefaultBackoff(Strategy);
928type Strategy = ResetTimerBackoff<ExponentialBackoff>;
929
930impl Default for DefaultBackoff {
931    fn default() -> Self {
932        Self(ResetTimerBackoff::new(
933            ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true),
934            Duration::from_secs(120),
935        ))
936    }
937}
938
939impl Iterator for DefaultBackoff {
940    type Item = Duration;
941
942    fn next(&mut self) -> Option<Self::Item> {
943        self.0.next()
944    }
945}
946
947impl Backoff for DefaultBackoff {
948    fn reset(&mut self) {
949        self.0.reset();
950    }
951}
952
#[cfg(test)]
mod tests {
    use super::*;

    // Streaming lists request the initial events only when the watch is first started.
    #[test]
    fn to_watch_params_initial_phase_with_streaming_list_sets_send_initial_events() {
        let streaming = Config::default().streaming_lists();
        let initial = streaming.to_watch_params(WatchPhase::Initial);
        assert!(initial.send_initial_events);
    }

    // A resumed streaming-list watch must not ask for the initial events again.
    #[test]
    fn to_watch_params_resumed_phase_with_streaming_list_does_not_set_send_initial_events() {
        let streaming = Config::default().streaming_lists();
        let resumed = streaming.to_watch_params(WatchPhase::Resumed);
        assert!(!resumed.send_initial_events);
    }

    // The default config (ListWatch mode) never requests initial events, whatever the phase.
    #[test]
    fn to_watch_params_listwatch_mode_does_not_set_send_initial_events() {
        let list_watch = Config::default(); // ListWatch mode
        let initial = list_watch.to_watch_params(WatchPhase::Initial);
        let resumed = list_watch.to_watch_params(WatchPhase::Resumed);
        assert!(!initial.send_initial_events);
        assert!(!resumed.send_initial_events);
    }

    /// Returns `true` when the two durations are less than 100 microseconds apart.
    fn approx_eq(a: Duration, b: Duration) -> bool {
        a.abs_diff(b) < Duration::from_micros(100)
    }

    // Without jitter the first delays are the exact exponential progression.
    #[test]
    fn exponential_backoff_without_jitter() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, false);

        for expected_ms in [100, 200, 400] {
            let delay = backoff.next().unwrap();
            assert!(approx_eq(delay, Duration::from_millis(expected_ms)));
        }
    }

    // With jitter the delays must deviate from the exact exponential progression somewhere.
    #[test]
    fn exponential_backoff_with_jitter_applies_randomness() {
        let mut backoff =
            ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(1), 2.0, true);

        // Ask for five delays, keeping whichever ones are actually produced
        let mut delays = Vec::with_capacity(5);
        for _ in 0..5 {
            if let Some(delay) = backoff.next() {
                delays.push(delay);
            }
        }

        // All delays should be positive
        for delay in delays.iter().copied() {
            assert!(delay > Duration::ZERO);
        }

        // With jitter, at least one delay should differ from exact values
        let exact_values = [
            Duration::from_millis(100),
            Duration::from_millis(200),
            Duration::from_millis(400),
            Duration::from_millis(800),
            Duration::from_secs(1),
        ];

        let any_jittered = delays
            .iter()
            .zip(exact_values)
            .any(|(delay, exact)| !approx_eq(*delay, exact));

        assert!(
            any_jittered,
            "With jitter enabled, delays should not all match exact exponential values"
        );
    }
}
1024        );
1025    }
1026}