#[cfg(feature = "server")]
use crate::server::{self, ServerArgs};
use crate::{
admin::{self, AdminArgs, Readiness},
client::{self, Client, ClientArgs},
errors,
initialized::{self, Initialized},
shutdown, LogFilter, LogFormat, LogInitError,
};
use futures_core::Stream;
use kube_core::{params::ListParams, Resource};
use kube_runtime::{reflector, watcher};
use serde::de::DeserializeOwned;
use std::{fmt::Debug, hash::Hash, time::Duration};
#[cfg(feature = "server")]
use tower_service::Service;
pub use kube_client::Api;
pub use reflector::Store;
#[derive(Debug, Default)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
#[must_use]
pub struct Builder<S = NoServer> {
admin: Option<AdminArgs>,
client: Option<ClientArgs>,
error_delay: Option<Duration>,
log: Option<LogSettings>,
#[cfg(feature = "server")]
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,
}
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
#[must_use]
pub struct Runtime<S = NoServer> {
admin: admin::Bound,
client: Client,
error_delay: Duration,
initialized: Initialized,
shutdown_rx: drain::Watch,
shutdown: shutdown::Shutdown,
#[cfg(feature = "server")]
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,
}
#[derive(Debug, Default)]
pub struct NoServer(());
#[derive(Debug, thiserror::Error)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
pub enum BuildError {
#[error(transparent)]
LogInit(#[from] LogInitError),
#[error(transparent)]
Admin(#[from] admin::Error),
#[error(transparent)]
Client(#[from] client::ConfigError),
#[cfg(feature = "server")]
#[error(transparent)]
Server(#[from] server::Error),
#[error(transparent)]
Signal(#[from] shutdown::RegisterError),
}
#[derive(Debug)]
struct LogSettings {
filter: LogFilter,
format: LogFormat,
}
impl<S> Builder<S> {
const DEFAULT_ERROR_DELAY: Duration = Duration::from_secs(5);
pub fn with_admin(mut self, admin: AdminArgs) -> Self {
self.admin = Some(admin);
self
}
pub fn with_client(mut self, client: ClientArgs) -> Self {
self.client = Some(client);
self
}
pub fn with_log(mut self, filter: LogFilter, format: LogFormat) -> Self {
self.log = Some(LogSettings { filter, format });
self
}
pub fn with_fixed_delay_on_error(mut self, delay: Duration) -> Self {
self.error_delay = Some(delay);
self
}
#[inline]
async fn build_inner(self) -> Result<Runtime<S>, BuildError> {
self.log.unwrap_or_default().try_init()?;
let client = self.client.unwrap_or_default().try_client().await?;
let (shutdown, shutdown_rx) = shutdown::sigint_or_sigterm()?;
let admin = self.admin.unwrap_or_default().into_builder().bind()?;
Ok(Runtime {
client,
shutdown_rx,
shutdown,
admin,
error_delay: self.error_delay.unwrap_or(Self::DEFAULT_ERROR_DELAY),
initialized: Initialized::default(),
server: self.server,
})
}
}
#[cfg(feature = "server")]
impl Builder<NoServer> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub fn with_server(self, server: ServerArgs) -> Builder<ServerArgs> {
Builder {
server,
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
log: self.log,
}
}
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub fn with_optional_server(self, server: Option<ServerArgs>) -> Builder<Option<ServerArgs>> {
Builder {
server,
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
log: self.log,
}
}
}
impl Builder<NoServer> {
pub async fn build(self) -> Result<Runtime<NoServer>, BuildError> {
self.build_inner().await
}
}
#[cfg(feature = "server")]
impl Builder<ServerArgs> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub async fn build(self) -> Result<Runtime<server::Bound>, BuildError> {
let rt = self.build_inner().await?;
let server = rt.server.bind().await?;
Ok(Runtime {
server,
admin: rt.admin,
client: rt.client,
error_delay: rt.error_delay,
initialized: rt.initialized,
shutdown_rx: rt.shutdown_rx,
shutdown: rt.shutdown,
})
}
}
#[cfg(feature = "server")]
impl Builder<Option<ServerArgs>> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub async fn build(self) -> Result<Runtime<Option<server::Bound>>, BuildError> {
let rt = self.build_inner().await?;
let server = match rt.server {
Some(s) => Some(s.bind().await?),
None => None,
};
Ok(Runtime {
server,
admin: rt.admin,
client: rt.client,
error_delay: rt.error_delay,
initialized: rt.initialized,
shutdown_rx: rt.shutdown_rx,
shutdown: rt.shutdown,
})
}
}
impl<S> Runtime<S> {
#[inline]
pub fn client(&self) -> Client {
self.client.clone()
}
#[inline]
pub fn initialized_handle(&mut self) -> initialized::Handle {
self.initialized.add_handle()
}
#[inline]
pub fn readiness(&self) -> Readiness {
self.admin.readiness()
}
#[inline]
pub fn shutdown_handle(&self) -> shutdown::Watch {
self.shutdown_rx.clone()
}
pub fn cancel_on_shutdown<T>(&self, inner: T) -> shutdown::CancelOnShutdown<T> {
shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), inner)
}
#[cfg(feature = "requeue")]
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "requeue"))))]
pub fn requeue<T>(
&self,
capacity: usize,
) -> (
crate::requeue::Sender<T>,
shutdown::CancelOnShutdown<crate::requeue::Receiver<T>>,
)
where
T: Eq + std::hash::Hash,
{
let (tx, rx) = crate::requeue::channel(capacity);
let rx = shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), rx);
(tx, rx)
}
pub fn watch<T>(
&mut self,
api: Api<T>,
params: ListParams,
) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
let watch = watcher::watcher(api, params);
let successful = errors::LogAndSleep::fixed_delay(self.error_delay, watch);
let initialized = self.initialized.add_handle().release_on_ready(successful);
shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), initialized)
}
#[inline]
pub fn watch_all<T>(&mut self, params: ListParams) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
self.watch(Api::all(self.client()), params)
}
#[inline]
pub fn watch_namespaced<T>(
&mut self,
ns: impl AsRef<str>,
params: ListParams,
) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
let api = Api::namespaced(self.client(), ns.as_ref());
self.watch(api, params)
}
pub fn cache<T>(
&mut self,
api: Api<T>,
params: ListParams,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
let writer = reflector::store::Writer::<T>::default();
let store = writer.as_reader();
let watch = watcher::watcher(api, params);
let cached = reflector::reflector(writer, watch);
let successful = errors::LogAndSleep::fixed_delay(self.error_delay, cached);
let initialized = self.initialized.add_handle().release_on_ready(successful);
let graceful = shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), initialized);
(store, graceful)
}
#[inline]
pub fn cache_all<T>(
&mut self,
params: ListParams,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
self.cache(Api::all(self.client()), params)
}
#[inline]
pub fn cache_namespaced<T>(
&mut self,
ns: impl AsRef<str>,
params: ListParams,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
let api = Api::namespaced(self.client(), ns.as_ref());
self.cache(api, params)
}
}
#[cfg(feature = "server")]
impl Runtime<server::Bound> {
pub fn server_addr(&self) -> std::net::SocketAddr {
self.server.local_addr()
}
pub fn spawn_server<S, B>(self, service: S) -> Runtime<NoServer>
where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>>
+ Clone
+ Send
+ 'static,
S::Error: std::error::Error + Send + Sync,
S::Future: Send,
B: hyper::body::HttpBody + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
self.server.spawn(service, self.shutdown_rx.clone());
Runtime {
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
initialized: self.initialized,
server: NoServer(()),
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
}
}
}
#[cfg(feature = "server")]
impl Runtime<Option<server::Bound>> {
pub fn server_addr(&self) -> Option<std::net::SocketAddr> {
self.server.as_ref().map(|s| s.local_addr())
}
pub fn spawn_server<S, B, F>(self, mk: F) -> Runtime<NoServer>
where
F: FnOnce() -> S,
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>>
+ Clone
+ Send
+ 'static,
S::Error: std::error::Error + Send + Sync,
S::Future: Send,
B: hyper::body::HttpBody + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
if let Some(s) = self.server {
s.spawn(mk(), self.shutdown_rx.clone());
} else {
tracing::debug!("No server is configured")
}
Runtime {
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
initialized: self.initialized,
server: NoServer(()),
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
}
}
}
impl Runtime<NoServer> {
pub fn builder() -> Builder<NoServer> {
Builder::default()
}
pub async fn run(self) -> Result<(), shutdown::Aborted> {
let Self {
admin,
initialized,
shutdown,
shutdown_rx,
..
} = self;
let admin = admin.spawn();
let ready = admin.readiness();
tokio::spawn(async move {
initialized.initialized().await;
ready.set(true);
tracing::debug!("initialized");
drop(shutdown_rx.signaled().await);
ready.set(false);
tracing::debug!("shutdown");
});
shutdown.signaled().await?;
Ok(())
}
}
impl Default for LogSettings {
fn default() -> Self {
Self {
filter: LogFilter::from_default_env(),
format: LogFormat::default(),
}
}
}
impl LogSettings {
fn try_init(self) -> Result<(), LogInitError> {
self.format.try_init(self.filter)
}
}