use crate::utils::{Backoff, ResetTimerBackoff};
use async_trait::async_trait;
use backon::BackoffBuilder;
use educe::Educe;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
core::{metadata::PartialObjectMeta, ObjectList, Selector},
error::ErrorResponse,
Api, Error as ClientErr,
};
use serde::de::DeserializeOwned;
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
use thiserror::Error;
use tracing::{debug, error, warn};
/// Errors that the watcher stream can yield to its consumer.
#[derive(Debug, Error)]
pub enum Error {
/// A list request issued while building the initial object list failed.
#[error("failed to perform initial object list: {0}")]
InitialListFailed(#[source] kube_client::Error),
/// The request that opens a watch stream failed.
#[error("failed to start watching object: {0}")]
WatchStartFailed(#[source] kube_client::Error),
/// An open watch stream delivered an error event sent by the apiserver.
#[error("error returned by apiserver during watch: {0}")]
WatchError(#[source] ErrorResponse),
/// Reading the next item from an open watch stream failed on the client side.
#[error("watch stream failed: {0}")]
WatchFailed(#[source] kube_client::Error),
/// A list response or a watched object carried no usable `metadata.resourceVersion`.
#[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
NoResourceVersion,
}
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A single item produced by the watcher stream.
#[derive(Debug, Clone)]
pub enum Event<K> {
    /// An object was added or modified while the watch was running.
    Apply(K),
    /// An object was deleted while the watch was running.
    Delete(K),
    /// A (re)list using the list-then-watch strategy is starting.
    Init,
    /// One object that is part of the initial listing.
    InitApply(K),
    /// The initial listing has been fully delivered.
    InitDone,
}

impl<K> Event<K> {
    /// Yields the carried object for `Apply` / `InitApply`, and nothing for every other variant.
    #[deprecated(
        since = "0.92.0",
        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
    )]
    pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
        let applied = match self {
            Self::Init | Self::InitDone | Self::Delete(_) => None,
            Self::InitApply(obj) | Self::Apply(obj) => Some(obj),
        };
        applied.into_iter()
    }

    /// Yields the carried object for any object-bearing variant (`Apply`, `Delete`,
    /// `InitApply`), and nothing for the `Init` / `InitDone` markers.
    #[deprecated(
        since = "0.92.0",
        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
    )]
    pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
        let touched = match self {
            Self::Init | Self::InitDone => None,
            Self::InitApply(obj) | Self::Delete(obj) | Self::Apply(obj) => Some(obj),
        };
        touched.into_iter()
    }

    /// Runs `f` on the carried object, if there is one, and hands the event back.
    ///
    /// Marker variants (`Init`, `InitDone`) are returned untouched and `f` is not called.
    #[must_use]
    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
        match self {
            Self::Init | Self::InitDone => {}
            Self::Apply(ref mut obj) | Self::Delete(ref mut obj) | Self::InitApply(ref mut obj) => {
                f(obj);
            }
        }
        self
    }
}
/// Internal state of the watcher state machine driven by `step_trampolined`.
#[derive(Educe, Default)]
#[educe(Debug)]
enum State<K> {
/// Nothing is in flight; the next step begins a fresh initial listing.
#[default]
Empty,
/// A paged initial list is in progress.
InitPage {
/// Continuation token for the next page, if the last response carried one.
continue_token: Option<String>,
/// Objects of the current page that have not yet been emitted.
objects: VecDeque<K>,
/// Resource version reported by the most recent list response, if any.
last_bookmark: Option<String>,
},
/// A watch stream is delivering the initial objects (streaming-list strategy).
InitialWatch {
/// The open watch stream (excluded from `Debug` output).
#[educe(Debug(ignore))]
stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
},
/// The initial list is complete; a watch still has to be opened from `resource_version`.
InitListed { resource_version: String },
/// A watch stream is open and delivering changes.
Watching {
/// Most recent resource version observed; used to reopen the watch when the stream ends.
resource_version: String,
/// The open watch stream (excluded from `Debug` output).
#[educe(Debug(ignore))]
stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
},
}
/// Abstraction over the list and watch calls the state machine needs, so the same
/// stepping logic can run against different object representations.
#[async_trait]
trait ApiMode {
/// Object type returned by `list` and carried by the watch events.
type Value: Clone;
/// Performs one list request with the given parameters.
async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
/// Opens a watch stream starting at resource version `version`.
async fn watch(
&self,
wp: &WatchParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}
/// [`ApiMode`] implementation that lists and watches complete `K` objects.
struct FullObject<'a, K> {
/// Borrowed API handle that the requests are issued through.
api: &'a Api<K>,
}
/// Selects how the initial list request is versioned.
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
/// Default. The list request carries no resource version and no version match.
#[default]
MostRecent,
/// The list request carries resource version `"0"` with `VersionMatch::NotOlderThan`.
Any,
}
/// Selects how the watcher obtains the initial set of objects.
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
/// Default. Fetch the objects with paged list requests, then open a watch.
#[default]
ListWatch,
/// Open a watch at resource version `"0"` with `send_initial_events` enabled and read the
/// initial objects from that stream.
StreamingList,
}
/// Settings that shape the list and watch requests issued by the watcher.
#[derive(Clone, Debug, PartialEq)]
pub struct Config {
/// Label selector copied into both list and watch parameters.
pub label_selector: Option<String>,
/// Field selector copied into both list and watch parameters.
pub field_selector: Option<String>,
/// Timeout copied into both list and watch parameters (seconds, per the `timeout` builder).
pub timeout: Option<u32>,
/// Versioning semantic applied to the initial list request.
pub list_semantic: ListSemantic,
/// How the initial set of objects is obtained.
pub initial_list_strategy: InitialListStrategy,
/// Page size used as the `limit` of each list request.
pub page_size: Option<u32>,
/// Whether watch requests ask for bookmark events.
pub bookmarks: bool,
}
impl Default for Config {
    /// Builds the default configuration: no selectors, no timeout, default list semantic,
    /// list-then-watch initial listing, pages of 500 objects, and bookmarks enabled.
    fn default() -> Self {
        let default_page_size = 500;
        Self {
            label_selector: None,
            field_selector: None,
            timeout: None,
            list_semantic: ListSemantic::default(),
            initial_list_strategy: InitialListStrategy::ListWatch,
            page_size: Some(default_page_size),
            bookmarks: true,
        }
    }
}
impl Config {
    /// Sets the timeout, in seconds, that is sent with list and watch requests.
    #[must_use]
    pub fn timeout(self, timeout_secs: u32) -> Self {
        Self {
            timeout: Some(timeout_secs),
            ..self
        }
    }

    /// Restricts list and watch requests with the given field selector.
    #[must_use]
    pub fn fields(self, field_selector: &str) -> Self {
        Self {
            field_selector: Some(field_selector.to_owned()),
            ..self
        }
    }

    /// Restricts list and watch requests with the given label selector string.
    #[must_use]
    pub fn labels(self, label_selector: &str) -> Self {
        Self {
            label_selector: Some(label_selector.to_owned()),
            ..self
        }
    }

    /// Restricts list and watch requests with the string form of `selector`.
    #[must_use]
    pub fn labels_from(self, selector: &Selector) -> Self {
        Self {
            label_selector: Some(selector.to_string()),
            ..self
        }
    }

    /// Chooses the versioning semantic of the initial list request.
    #[must_use]
    pub fn list_semantic(self, semantic: ListSemantic) -> Self {
        Self {
            list_semantic: semantic,
            ..self
        }
    }

    /// Shorthand for `list_semantic(ListSemantic::Any)`.
    #[must_use]
    pub fn any_semantic(self) -> Self {
        self.list_semantic(ListSemantic::Any)
    }

    /// Stops watch requests from asking for bookmark events.
    #[must_use]
    pub fn disable_bookmarks(self) -> Self {
        Self {
            bookmarks: false,
            ..self
        }
    }

    /// Sets the page size used as the `limit` of each list request.
    #[must_use]
    pub fn page_size(self, page_size: u32) -> Self {
        Self {
            page_size: Some(page_size),
            ..self
        }
    }

    /// Switches the initial listing to the streaming-list strategy.
    #[must_use]
    pub fn streaming_lists(self) -> Self {
        Self {
            initial_list_strategy: InitialListStrategy::StreamingList,
            ..self
        }
    }

    /// Builds the parameters for the first page of a list request.
    ///
    /// The continuation token is left unset; the caller fills it in for follow-up pages.
    fn to_list_params(&self) -> ListParams {
        let mut params = ListParams {
            label_selector: self.label_selector.clone(),
            field_selector: self.field_selector.clone(),
            timeout: self.timeout,
            version_match: None,
            resource_version: None,
            limit: self.page_size,
            continue_token: None,
        };
        match self.list_semantic {
            // Nothing to add: the request goes out without any version constraint.
            ListSemantic::MostRecent => {}
            ListSemantic::Any => {
                params.resource_version = Some("0".into());
                params.version_match = Some(VersionMatch::NotOlderThan);
            }
        }
        params
    }

    /// Builds the parameters for a watch request.
    ///
    /// `send_initial_events` is enabled exactly when the streaming-list strategy is selected.
    fn to_watch_params(&self) -> WatchParams {
        let streaming = matches!(self.initial_list_strategy, InitialListStrategy::StreamingList);
        WatchParams {
            label_selector: self.label_selector.clone(),
            field_selector: self.field_selector.clone(),
            timeout: self.timeout,
            bookmarks: self.bookmarks,
            send_initial_events: streaming,
        }
    }
}
#[async_trait]
impl<K> ApiMode for FullObject<'_, K>
where
    K: Clone + Debug + DeserializeOwned + Send + 'static,
{
    type Value = K;

    /// Lists complete objects through the wrapped API handle.
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
        let api = self.api;
        api.list(lp).await
    }

    /// Opens a watch for complete objects at `version` and boxes the resulting stream.
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
        let events = self.api.watch(wp, version).await?;
        Ok(events.boxed())
    }
}
/// [`ApiMode`] implementation that lists and watches only object metadata
/// (`PartialObjectMeta<K>`).
struct MetaOnly<'a, K> {
/// Borrowed API handle that the requests are issued through.
api: &'a Api<K>,
}
#[async_trait]
impl<K> ApiMode for MetaOnly<'_, K>
where
    K: Clone + Debug + DeserializeOwned + Send + 'static,
{
    type Value = PartialObjectMeta<K>;

    /// Lists object metadata through the wrapped API handle.
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
        let api = self.api;
        api.list_metadata(lp).await
    }

    /// Opens a metadata-only watch at `version` and boxes the resulting stream.
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
        let events = self.api.watch_metadata(wp, version).await?;
        Ok(events.boxed())
    }
}
/// Advances the watcher state machine by a single transition.
///
/// Returns the item to emit, if any, together with the state to continue from. A `None`
/// item means the transition produced nothing for the consumer and the caller should step
/// again from the returned state.
// The function is one large `match` over every state, hence the lint exemption.
#[allow(clippy::too_many_lines)] async fn step_trampolined<A>(
api: &A,
wc: &Config,
state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
A: ApiMode,
A::Value: Resource + 'static,
{
match state {
// Fresh start: pick the initial-listing strategy from the config.
State::Empty => match wc.initial_list_strategy {
// List-then-watch: announce the (re)list and start paging with an empty buffer.
InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
continue_token: None,
objects: VecDeque::default(),
last_bookmark: None,
}),
// Streaming list: open a watch from resource version "0".
// NOTE(review): unlike the ListWatch path, this path does not emit `Event::Init` before
// the `InitApply` events — confirm downstream consumers do not rely on it.
InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
Ok(stream) => (None, State::InitialWatch { stream }),
Err(err) => {
// 403 responses are logged at warn level, everything else at debug level.
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
} else {
debug!("watch initlist error: {err:?}");
}
(Some(Err(Error::WatchStartFailed(err))), State::default())
}
},
},
State::InitPage {
continue_token,
mut objects,
last_bookmark,
} => {
// Drain the buffered page one object per step before doing anything else.
if let Some(next) = objects.pop_front() {
return (Some(Ok(Event::InitApply(next))), State::InitPage {
continue_token,
objects,
last_bookmark,
});
}
// Buffer is empty and there is no further page: the initial list is complete.
if continue_token.is_none() {
if let Some(resource_version) = last_bookmark {
return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
}
}
// Otherwise fetch the first page (no token yet) or the next page (token set).
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
match api.list(&lp).await {
Ok(list) => {
// Empty strings are treated the same as missing values.
let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
// Without a resource version or a continuation token there is no way to proceed.
if last_bookmark.is_none() && continue_token.is_none() {
return (Some(Err(Error::NoResourceVersion)), State::Empty);
}
// Buffer the page; the next steps re-enter this arm and drain it.
(None, State::InitPage {
continue_token,
objects: list.items.into_iter().collect(),
last_bookmark,
})
}
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
}
// A failed page restarts the whole listing from scratch.
(Some(Err(Error::InitialListFailed(err))), State::Empty)
}
}
}
State::InitialWatch { mut stream } => {
match stream.next().await {
// Every added/modified object on this stream is part of the initial listing.
Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
(Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
}
// Deletions are not expected here; log and skip the event.
Some(Ok(WatchEvent::Deleted(_obj))) => {
error!("got deleted event during initial watch. this is a bug");
(None, State::InitialWatch { stream })
}
Some(Ok(WatchEvent::Bookmark(bm))) => {
// Only a bookmark carrying this annotation ends the initial listing; the same stream is
// then reused for regular watching.
let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
if marks_initial_end {
(Some(Ok(Event::InitDone)), State::Watching {
resource_version: bm.metadata.resource_version,
stream,
})
} else {
(None, State::InitialWatch { stream })
}
}
Some(Ok(WatchEvent::Error(err))) => {
// Code 410 restarts from scratch; any other error event keeps the current stream.
let new_state = if err.code == 410 {
State::default()
} else {
State::InitialWatch { stream }
};
if err.code == 403 {
warn!("watcher watchevent error 403: {err:?}");
} else {
debug!("error watchevent error: {err:?}");
}
(Some(Err(Error::WatchError(err))), new_state)
}
// Client-side stream errors are reported and the same stream is polled again.
Some(Err(err)) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watcher error 403: {err:?}");
} else {
debug!("watcher error: {err:?}");
}
(Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
}
// Stream ended before the initial listing finished: start over.
None => (None, State::default()),
}
}
// Initial list done: open the watch from the resource version the list reported.
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
} else {
debug!("watch initlist error: {err:?}");
}
// Stay in this state so the next step retries from the same resource version.
(Some(Err(Error::WatchStartFailed(err))), State::InitListed {
resource_version,
})
}
}
}
State::Watching {
resource_version,
mut stream,
} => match stream.next().await {
Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
// Track the object's resource version; a missing or empty one forces a full restart.
let resource_version = obj.resource_version().unwrap_or_default();
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(Some(Ok(Event::Apply(obj))), State::Watching {
resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Deleted(obj))) => {
// Same resource-version bookkeeping as for added/modified objects.
let resource_version = obj.resource_version().unwrap_or_default();
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(Some(Ok(Event::Delete(obj))), State::Watching {
resource_version,
stream,
})
}
}
// Bookmarks only advance the tracked resource version; nothing is emitted.
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
// Code 410 restarts from scratch; any other error event keeps watching on this stream.
let new_state = if err.code == 410 {
State::default()
} else {
State::Watching {
resource_version,
stream,
}
};
if err.code == 403 {
warn!("watcher watchevent error 403: {err:?}");
} else {
debug!("error watchevent error: {err:?}");
}
(Some(Err(Error::WatchError(err))), new_state)
}
// Client-side stream errors are reported and the same stream is polled again.
Some(Err(err)) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watcher error 403: {err:?}");
} else {
debug!("watcher error: {err:?}");
}
(Some(Err(Error::WatchFailed(err))), State::Watching {
resource_version,
stream,
})
}
// Stream ended: reopen the watch from the last tracked resource version.
None => (None, State::InitListed { resource_version }),
},
}
}
/// Runs the state machine until a transition produces something to emit.
///
/// Transitions that yield no item are followed immediately; the first item (event or
/// error) is returned together with the state to resume from.
async fn step<A>(
    api: &A,
    config: &Config,
    mut state: State<A::Value>,
) -> (Result<Event<A::Value>>, State<A::Value>)
where
    A: ApiMode,
    A::Value: Resource + 'static,
{
    loop {
        let (outcome, next_state) = step_trampolined(api, config, state).await;
        state = next_state;
        if let Some(item) = outcome {
            return (item, state);
        }
    }
}
/// Creates a stream of [`Event`]s for complete `K` objects reachable through `api`.
///
/// The stream never terminates on its own: every poll advances the internal state machine
/// until it yields either an event or an error.
pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    watcher_config: Config,
) -> impl Stream<Item = Result<Event<K>>> + Send {
    let seed = (api, watcher_config, State::default());
    futures::stream::unfold(seed, |(client, config, state)| async move {
        let (item, next_state) = step(&FullObject { api: &client }, &config, state).await;
        Some((item, (client, config, next_state)))
    })
}
/// Creates a stream of [`Event`]s carrying only object metadata (`PartialObjectMeta<K>`).
///
/// Works like [`watcher`], but issues metadata-only list and watch requests.
// The name intentionally mirrors `watcher`, so the repetition lint is silenced.
#[allow(clippy::module_name_repetitions)]
pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    watcher_config: Config,
) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
    let seed = (api, watcher_config, State::default());
    futures::stream::unfold(seed, |(client, config, state)| async move {
        let (item, next_state) = step(&MetaOnly { api: &client }, &config, state).await;
        Some((item, (client, config, next_state)))
    })
}
/// Watches the single object called `name` and reports its current state.
///
/// The stream yields `Ok(Some(obj))` whenever the object is listed or applied, and
/// `Ok(None)` when it is deleted or when an initial listing completes without having
/// seen it. Errors from the underlying watcher are passed through.
pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    name: &str,
) -> impl Stream<Item = Result<Option<K>>> + Send {
    let selector = format!("metadata.name={name}");
    let events = watcher(api, Config::default().fields(&selector));
    events
        // Track whether the current initial listing has produced the object yet.
        .scan(false, |seen, event| {
            match &event {
                Ok(Event::Init) => *seen = false,
                Ok(Event::InitApply(_)) => *seen = true,
                _ => {}
            }
            future::ready(Some((*seen, event)))
        })
        .filter_map(|(seen, event)| {
            let forwarded = match event {
                Err(err) => Some(Err(err)),
                Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
                Ok(Event::Delete(_)) => Some(Ok(None)),
                // The listing finished: report absence only if the object never showed up.
                Ok(Event::InitDone) => {
                    if seen {
                        None
                    } else {
                        Some(Ok(None))
                    }
                }
                Ok(Event::Init) => None,
            };
            future::ready(forwarded)
        })
}
/// Exponential backoff that can be restarted from its initial configuration.
pub struct ExponentialBackoff {
/// Backoff sequence currently being consumed.
inner: backon::ExponentialBackoff,
/// Builder kept so that a fresh `inner` can be produced on reset.
builder: backon::ExponentialBuilder,
}
impl ExponentialBackoff {
    /// Creates an exponential backoff with no limit on the number of retries.
    ///
    /// * `min_delay` - minimum delay handed to the builder.
    /// * `max_delay` - maximum delay handed to the builder.
    /// * `factor` - growth factor handed to the builder.
    /// * `enable_jitter` - when `true`, jitter is enabled on the builder.
    ///
    /// The configured builder is stored alongside the built backoff so that the sequence
    /// can be rebuilt later with the same settings.
    fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
        let builder = backon::ExponentialBuilder::default()
            .with_min_delay(min_delay)
            .with_max_delay(max_delay)
            .with_factor(factor)
            .without_max_times();
        // The builder methods take the builder by value and return the updated one, so the
        // result has to be rebound. Calling `with_jitter()` as a bare statement only changes
        // a discarded copy and leaves jitter disabled.
        let builder = if enable_jitter {
            builder.with_jitter()
        } else {
            builder
        };
        Self {
            inner: builder.build(),
            builder,
        }
    }
}
impl Backoff for ExponentialBackoff {
/// Restarts the sequence by rebuilding it from the stored builder.
fn reset(&mut self) {
self.inner = self.builder.build();
}
}
impl Iterator for ExponentialBackoff {
type Item = Duration;
/// Returns the next delay from the wrapped backoff sequence.
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
impl From<backon::ExponentialBuilder> for ExponentialBackoff {
    /// Wraps a pre-configured builder, building the initial sequence from it and keeping
    /// the builder for later resets.
    fn from(builder: backon::ExponentialBuilder) -> Self {
        let inner = builder.build();
        Self { inner, builder }
    }
}
/// Default backoff for watchers: a newtype around [`Strategy`].
pub struct DefaultBackoff(Strategy);
/// Concrete backoff wrapped by [`DefaultBackoff`]: an [`ExponentialBackoff`] inside a
/// `ResetTimerBackoff`.
type Strategy = ResetTimerBackoff<ExponentialBackoff>;
impl Default for DefaultBackoff {
    /// Builds the default strategy: exponential backoff from 800 ms up to 30 s with a
    /// factor of 2 and jitter requested, wrapped in a `ResetTimerBackoff` with a 120 s
    /// duration.
    fn default() -> Self {
        let min_delay = Duration::from_millis(800);
        let max_delay = Duration::from_secs(30);
        // Presumably the period after which `ResetTimerBackoff` resets the inner backoff —
        // confirm against `crate::utils`.
        let reset_duration = Duration::from_secs(120);
        let exponential = ExponentialBackoff::new(min_delay, max_delay, 2.0, true);
        Self(ResetTimerBackoff::new(exponential, reset_duration))
    }
}
impl Iterator for DefaultBackoff {
type Item = Duration;
/// Returns the next delay from the wrapped strategy.
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
impl Backoff for DefaultBackoff {
/// Resets the wrapped strategy.
fn reset(&mut self) {
self.0.reset();
}
}