#[cfg(feature = "server")]
use crate::server::{self, ServerArgs};
use crate::{
admin::{self, Readiness},
client::{self, Client, ClientArgs},
errors,
initialized::{self, Initialized},
shutdown, LogFilter, LogFormat, LogInitError,
};
use futures_core::Stream;
use kube_core::{NamespaceResourceScope, Resource};
use kube_runtime::{reflector, watcher};
use serde::de::DeserializeOwned;
use std::{fmt::Debug, future::Future, hash::Hash, time::Duration};
#[cfg(feature = "server")]
use tower::Service;
pub use kube_client::Api;
pub use reflector::Store;
#[cfg(feature = "prometheus-client")]
mod metrics;
#[derive(Debug, Default)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
#[must_use]
pub struct Builder<S = NoServer> {
admin: admin::Builder,
client: Option<ClientArgs>,
error_delay: Option<Duration>,
log: Option<LogSettings>,
#[cfg(feature = "server")]
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,
#[cfg(feature = "prometheus-client")]
metrics: Option<RuntimeMetrics>,
}
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
#[must_use]
pub struct Runtime<S = NoServer> {
admin: admin::Bound,
client: Client,
error_delay: Duration,
initialized: Initialized,
shutdown_rx: drain::Watch,
shutdown: shutdown::Shutdown,
#[cfg(feature = "server")]
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,
#[cfg(feature = "prometheus-client")]
metrics: Option<RuntimeMetrics>,
}
#[derive(Debug, Default)]
pub struct NoServer(());
#[cfg(feature = "prometheus-client")]
#[must_use = "RuntimeMetrics must be passed to `Builder::with_metrics`"]
#[derive(Debug)]
pub struct RuntimeMetrics {
watch: metrics::ResourceWatchMetrics,
}
#[derive(Debug, thiserror::Error)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
pub enum BuildError {
#[error(transparent)]
LogInit(#[from] LogInitError),
#[error(transparent)]
Admin(#[from] admin::Error),
#[error(transparent)]
Client(#[from] client::ConfigError),
#[cfg(feature = "server")]
#[error(transparent)]
Server(#[from] server::Error),
#[error(transparent)]
Signal(#[from] shutdown::RegisterError),
}
#[derive(Debug)]
struct LogSettings {
filter: LogFilter,
format: LogFormat,
}
impl<S> Builder<S> {
const DEFAULT_ERROR_DELAY: Duration = Duration::from_secs(5);
pub fn with_admin(mut self, admin: impl Into<admin::Builder>) -> Self {
self.admin = admin.into();
self
}
pub fn with_client(mut self, client: ClientArgs) -> Self {
self.client = Some(client);
self
}
pub fn with_log(mut self, filter: LogFilter, format: LogFormat) -> Self {
self.log = Some(LogSettings { filter, format });
self
}
pub fn with_fixed_delay_on_error(mut self, delay: Duration) -> Self {
self.error_delay = Some(delay);
self
}
#[cfg(feature = "prometheus-client")]
pub fn with_metrics(mut self, metrics: RuntimeMetrics) -> Self {
self.metrics = Some(metrics);
self
}
#[inline]
async fn build_inner<F>(
self,
mk_client: impl FnOnce(ClientArgs) -> F,
) -> Result<Runtime<S>, BuildError>
where
F: Future<Output = Result<Client, client::ConfigError>>,
{
self.log.unwrap_or_default().try_init()?;
let client = mk_client(self.client.unwrap_or_default()).await?;
let (shutdown, shutdown_rx) = shutdown::sigint_or_sigterm()?;
let admin = self.admin.bind()?;
Ok(Runtime {
client,
shutdown_rx,
shutdown,
admin,
error_delay: self.error_delay.unwrap_or(Self::DEFAULT_ERROR_DELAY),
initialized: Initialized::default(),
server: self.server,
#[cfg(feature = "prometheus-client")]
metrics: self.metrics,
})
}
}
#[cfg(feature = "server")]
impl Builder<NoServer> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub fn with_server(self, server: ServerArgs) -> Builder<ServerArgs> {
Builder {
server,
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
log: self.log,
metrics: self.metrics,
}
}
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub fn with_optional_server(self, server: Option<ServerArgs>) -> Builder<Option<ServerArgs>> {
Builder {
server,
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
log: self.log,
metrics: self.metrics,
}
}
}
impl Builder<NoServer> {
pub async fn build(self) -> Result<Runtime<NoServer>, BuildError> {
self.build_inner(ClientArgs::try_client).await
}
}
#[cfg(feature = "server")]
impl Builder<ServerArgs> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub async fn build(self) -> Result<Runtime<server::Bound>, BuildError> {
self.build_inner(ClientArgs::try_client)
.await?
.bind_server(|args| async move {
let srv = args.bind().await?;
Ok(srv)
})
.await
}
}
#[cfg(feature = "server")]
impl Builder<Option<ServerArgs>> {
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "server"))))]
pub async fn build(self) -> Result<Runtime<Option<server::Bound>>, BuildError> {
self.build_inner(ClientArgs::try_client)
.await?
.bind_server(|args| async move {
match args {
Some(args) => {
let srv = args.bind().await?;
Ok(Some(srv))
}
None => Ok(None),
}
})
.await
}
}
impl<S> Runtime<S> {
#[inline]
pub fn client(&self) -> Client {
self.client.clone()
}
#[inline]
pub fn initialized_handle(&mut self) -> initialized::Handle {
self.initialized.add_handle()
}
#[inline]
pub fn readiness(&self) -> Readiness {
self.admin.readiness()
}
#[inline]
pub fn shutdown_handle(&self) -> shutdown::Watch {
self.shutdown_rx.clone()
}
pub fn cancel_on_shutdown<T>(&self, inner: T) -> shutdown::CancelOnShutdown<T> {
shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), inner)
}
#[cfg(feature = "requeue")]
#[cfg_attr(docsrs, doc(cfg(all(features = "runtime", feature = "requeue"))))]
pub fn requeue<T>(
&self,
capacity: usize,
) -> (
crate::requeue::Sender<T>,
shutdown::CancelOnShutdown<crate::requeue::Receiver<T>>,
)
where
T: Eq + std::hash::Hash,
{
let (tx, rx) = crate::requeue::channel(capacity);
let rx = shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), rx);
(tx, rx)
}
pub fn watch<T>(
&mut self,
api: Api<T>,
watcher_config: watcher::Config,
) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
let watch = watcher::watcher(api, watcher_config);
#[cfg(feature = "prometheus-client")]
let watch = metrics::ResourceWatchMetrics::instrument_watch(
self.metrics.as_ref().map(|m| m.watch.clone()),
watch,
);
let successful = errors::LogAndSleep::fixed_delay(self.error_delay, watch);
let initialized = self.initialized.add_handle().release_on_ready(successful);
shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), initialized)
}
#[inline]
pub fn watch_all<T>(
&mut self,
watcher_config: watcher::Config,
) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
self.watch(Api::all(self.client()), watcher_config)
}
#[inline]
pub fn watch_namespaced<T>(
&mut self,
ns: impl AsRef<str>,
watcher_config: watcher::Config,
) -> impl Stream<Item = watcher::Event<T>>
where
T: Resource<Scope = NamespaceResourceScope>,
T: DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Default,
{
let api = Api::namespaced(self.client(), ns.as_ref());
self.watch(api, watcher_config)
}
pub fn cache<T>(
&mut self,
api: Api<T>,
watcher_config: watcher::Config,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
let writer = reflector::store::Writer::<T>::default();
let store = writer.as_reader();
let watch = watcher::watcher(api, watcher_config);
let cached = reflector::reflector(writer, watch);
let successful = errors::LogAndSleep::fixed_delay(self.error_delay, cached);
let initialized = self.initialized.add_handle().release_on_ready(successful);
let graceful = shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), initialized);
(store, graceful)
}
#[inline]
pub fn cache_all<T>(
&mut self,
watcher_config: watcher::Config,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource + DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
self.cache(Api::all(self.client()), watcher_config)
}
#[inline]
pub fn cache_namespaced<T>(
&mut self,
ns: impl AsRef<str>,
watcher_config: watcher::Config,
) -> (Store<T>, impl Stream<Item = watcher::Event<T>>)
where
T: Resource<Scope = NamespaceResourceScope>,
T: DeserializeOwned + Clone + Debug + Send + 'static,
T::DynamicType: Clone + Default + Eq + Hash + Clone,
{
let api = Api::namespaced(self.client(), ns.as_ref());
self.cache(api, watcher_config)
}
#[cfg(feature = "server")]
async fn bind_server<F, T>(self, bind: impl Fn(S) -> F) -> Result<Runtime<T>, BuildError>
where
F: std::future::Future<Output = Result<T, BuildError>>,
{
let server = bind(self.server).await?;
Ok(Runtime {
server,
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
initialized: self.initialized,
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
metrics: self.metrics,
})
}
#[cfg(feature = "server")]
fn spawn_server_inner(self, spawn: impl FnOnce(S)) -> Runtime<NoServer> {
spawn(self.server);
Runtime {
server: NoServer(()),
admin: self.admin,
client: self.client,
error_delay: self.error_delay,
initialized: self.initialized,
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
metrics: self.metrics,
}
}
}
#[cfg(feature = "server")]
impl Runtime<server::Bound> {
pub fn server_addr(&self) -> std::net::SocketAddr {
self.server.local_addr()
}
pub fn spawn_server<S, B>(self, service: S) -> Runtime<NoServer>
where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>>
+ Clone
+ Send
+ 'static,
S::Error: std::error::Error + Send + Sync,
S::Future: Send,
B: hyper::body::HttpBody + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
let shutdown = self.shutdown_rx.clone();
self.spawn_server_inner(move |s| {
s.spawn(service, shutdown);
})
}
}
#[cfg(feature = "server")]
impl Runtime<Option<server::Bound>> {
pub fn server_addr(&self) -> Option<std::net::SocketAddr> {
self.server.as_ref().map(|s| s.local_addr())
}
pub fn spawn_server<S, B, F>(self, mk: F) -> Runtime<NoServer>
where
F: FnOnce() -> S,
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>>
+ Clone
+ Send
+ 'static,
S::Error: std::error::Error + Send + Sync,
S::Future: Send,
B: hyper::body::HttpBody + Send + 'static,
B::Data: Send,
B::Error: std::error::Error + Send + Sync,
{
let shutdown = self.shutdown_rx.clone();
self.spawn_server_inner(move |s| match s {
Some(s) => {
s.spawn(mk(), shutdown);
}
None => {
tracing::debug!("No server is configured");
}
})
}
}
impl Runtime<NoServer> {
pub fn builder() -> Builder<NoServer> {
Builder::default()
}
pub async fn run(self) -> Result<(), shutdown::Aborted> {
let Self {
admin,
initialized,
shutdown,
shutdown_rx,
..
} = self;
let admin = admin.spawn();
let ready = admin.readiness();
tokio::spawn(async move {
initialized.initialized().await;
ready.set(true);
tracing::debug!("initialized");
drop(shutdown_rx.signaled().await);
ready.set(false);
tracing::debug!("shutdown");
});
shutdown.signaled().await?;
Ok(())
}
}
impl Default for LogSettings {
fn default() -> Self {
Self {
filter: LogFilter::from_default_env(),
format: LogFormat::default(),
}
}
}
impl LogSettings {
fn try_init(self) -> Result<(), LogInitError> {
self.format.try_init(self.filter)
}
}
#[cfg(feature = "prometheus-client")]
impl RuntimeMetrics {
pub fn register(registry: &mut prometheus_client::registry::Registry) -> Self {
let watch =
metrics::ResourceWatchMetrics::register(registry.sub_registry_with_prefix("watch"));
Self { watch }
}
}