use crate::utils::ResetTimerBackoff;
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
Api,
};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
use std::{clone::Clone, fmt::Debug, time::Duration};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to perform initial object list: {0}")]
InitialListFailed(#[source] kube_client::Error),
#[error("failed to start watching object: {0}")]
WatchStartFailed(#[source] kube_client::Error),
#[error("error returned by apiserver during watch: {0}")]
WatchError(#[source] kube_client::error::ErrorResponse),
#[error("watch stream failed: {0}")]
WatchFailed(#[source] kube_client::Error),
#[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
NoResourceVersion,
#[error("too many objects matched search criteria")]
TooManyObjects,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub enum Event<K> {
Applied(K),
Deleted(K),
Restarted(Vec<K>),
}
impl<K> Event<K> {
pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) => SmallVec::from_buf([obj]),
Event::Deleted(_) => SmallVec::new(),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) | Event::Deleted(obj) => SmallVec::from_buf([obj]),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
#[must_use]
pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
match &mut self {
Event::Applied(obj) | Event::Deleted(obj) => (f)(obj),
Event::Restarted(objs) => {
for k in objs {
(f)(k)
}
}
}
self
}
}
#[derive(Derivative)]
#[derivative(Debug)]
enum State<K: Resource + Clone> {
Empty,
InitListed { resource_version: String },
Watching {
resource_version: String,
#[derivative(Debug = "ignore")]
stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
},
}
async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
list_params: &ListParams,
state: State<K>,
) -> (Option<Result<Event<K>>>, State<K>) {
match state {
State::Empty => match api.list(list_params).await {
Ok(list) => {
if let Some(resource_version) = list.metadata.resource_version {
(Some(Ok(Event::Restarted(list.items))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::Empty)
}
}
Err(err) => (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty),
},
State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream: stream.boxed(),
}),
Err(err) => (
Some(Err(err).map_err(Error::WatchStartFailed)),
State::InitListed { resource_version },
),
},
State::Watching {
resource_version,
mut stream,
} => match stream.next().await {
Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
let resource_version = obj.resource_version().unwrap();
(Some(Ok(Event::Applied(obj))), State::Watching {
resource_version,
stream,
})
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_version().unwrap();
(Some(Ok(Event::Deleted(obj))), State::Watching {
resource_version,
stream,
})
}
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
let new_state = if err.code == 410 {
State::Empty
} else {
State::Watching {
resource_version,
stream,
}
};
(Some(Err(err).map_err(Error::WatchError)), new_state)
}
Some(Err(err)) => (Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
resource_version,
stream,
}),
None => (None, State::InitListed { resource_version }),
},
}
}
async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
list_params: &ListParams,
mut state: State<K>,
) -> (Result<Event<K>>, State<K>) {
loop {
match step_trampolined(api, list_params, state).await {
(Some(result), new_state) => return (result, new_state),
(None, new_state) => state = new_state,
}
}
}
pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<K>>> + Send {
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&api, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
}
pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
name: &str,
) -> impl Stream<Item = Result<Option<K>>> + Send {
watcher(api, ListParams {
field_selector: Some(format!("metadata.name={name}")),
..Default::default()
})
.map(|event| match event? {
Event::Deleted(_) => Ok(None),
Event::Restarted(objs) if objs.len() > 1 => Err(Error::TooManyObjects),
Event::Restarted(mut objs) => Ok(objs.pop()),
Event::Applied(obj) => Ok(Some(obj)),
})
}
#[must_use]
pub fn default_backoff() -> impl Backoff + Send + Sync {
let expo = backoff::ExponentialBackoff {
initial_interval: Duration::from_millis(800),
max_interval: Duration::from_secs(30),
randomization_factor: 1.0,
multiplier: 2.0,
max_elapsed_time: None,
..ExponentialBackoff::default()
};
ResetTimerBackoff::new(expo, Duration::from_secs(120))
}