use std::error::Error as StdError;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::Instrument;
use warp::Filter;
#[cfg(feature = "admission")]
use crate::admission::{MutatingAdmissionHandler, ValidatingAdmissionHandler};
use crate::context::Context;
use crate::event_handler::Runnable;
pub struct Operator<S, E>
where
S: Clone,
{
context: Arc<Context<S>>,
handlers: Vec<Box<dyn Runnable<S, E>>>,
#[cfg(feature = "admission")]
mutating_handlers: Vec<Box<dyn MutatingAdmissionHandler<Err = E>>>,
#[cfg(feature = "admission")]
validating_handlers: Vec<Box<dyn ValidatingAdmissionHandler<Err = E>>>,
#[cfg(feature = "admission")]
tls_certs_path: Option<PathBuf>,
}
impl<S, E> Operator<S, E>
where
S: Clone + Send + Sync + 'static,
E: StdError + Send + Sync + 'static,
{
pub fn new(context: Arc<Context<S>>) -> Self {
Self {
context,
handlers: Vec::new(),
#[cfg(feature = "admission")]
mutating_handlers: Vec::new(),
#[cfg(feature = "admission")]
validating_handlers: Vec::new(),
#[cfg(feature = "admission")]
tls_certs_path: None,
}
}
#[must_use]
pub fn handler<H>(mut self, handler: H) -> Self
where
H: Runnable<S, E> + Send + Sync + 'static,
{
self.handlers.push(Box::new(handler));
self
}
#[cfg(feature = "admission")]
#[must_use]
pub fn mutator<H>(mut self, handler: H) -> Self
where
H: MutatingAdmissionHandler<Err = E> + 'static,
{
self.mutating_handlers.push(Box::new(handler));
self
}
#[cfg(feature = "admission")]
#[must_use]
pub fn validator<H>(mut self, handler: H) -> Self
where
H: ValidatingAdmissionHandler<Err = E> + 'static,
{
self.validating_handlers.push(Box::new(handler));
self
}
#[cfg(feature = "admission")]
#[must_use]
pub fn with_tls_certs(mut self, path: impl Into<PathBuf>) -> Self {
self.tls_certs_path = Some(path.into());
self
}
pub async fn run(self) -> crate::Result<()> {
tracing::info!("starting operator with {} handlers", self.handlers.len());
let mut set = JoinSet::new();
for handler in self.handlers {
let client = self.context.client.clone();
let context = self.context.clone();
let name = handler.name();
let span = tracing::info_span!("handler", name);
set.spawn(
async move {
if let Err(err) = handler.run(client, context).await {
tracing::error!({ err = &err as &dyn StdError }, "handler error");
}
}
.instrument(span),
);
}
#[cfg(feature = "admission")]
if !self.mutating_handlers.is_empty() || !self.validating_handlers.is_empty() {
tracing::info!(
mutating = self.mutating_handlers.len(),
validating = self.validating_handlers.len(),
"starting admission webhook server"
);
let validate_handler =
crate::admission::create_validating_route(self.validating_handlers);
let mutate_handler = crate::admission::create_mutating_route(self.mutating_handlers);
let shutdown_signal = async {
tokio::signal::ctrl_c().await.ok();
};
let addr = ([0, 0, 0, 0], 8443);
let routes = warp::post()
.and(
warp::path("mutate")
.and(warp::body::json())
.and_then(mutate_handler)
.with(warp::trace::request()),
)
.or(warp::path("validate")
.and(warp::body::json())
.and_then(validate_handler)
.with(warp::trace::request()));
if let Some(path) = self.tls_certs_path {
let (_, task) = warp::serve(routes)
.tls()
.cert_path(path.join("tls.crt"))
.key_path(path.join("tls.key"))
.bind_with_graceful_shutdown(addr, shutdown_signal);
set.spawn(task);
} else {
tracing::warn!(
"admission webhook server running without TLS - not suitable for production"
);
let (_, task) =
warp::serve(routes).bind_with_graceful_shutdown(addr, shutdown_signal);
set.spawn(task);
}
}
set.join_all().await;
Ok(())
}
}
#[doc(hidden)]
pub struct OperatorBuilder;
impl OperatorBuilder {
pub fn with_context<S>(self, context: impl Into<Context<S>>) -> OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
let context = Arc::new(context.into());
OperatorBuilderWithContext { context }
}
}
impl Operator<(), ()> {
#[must_use]
pub const fn builder() -> OperatorBuilder {
OperatorBuilder
}
}
#[doc(hidden)]
pub struct OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
context: Arc<Context<S>>,
}
impl<S> OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn handler<H, E>(self, handler: H) -> Operator<S, E>
where
H: Runnable<S, E> + Send + Sync + 'static,
E: StdError + Send + Sync + 'static,
{
let mut operator = Operator::<S, E>::new(self.context);
operator.handlers.push(Box::new(handler));
operator
}
#[cfg(feature = "admission")]
#[must_use]
pub fn mutator<H, E>(self, handler: H) -> Operator<S, E>
where
H: MutatingAdmissionHandler<Err = E> + 'static,
E: StdError + Send + Sync + 'static,
{
let mut operator = Operator::<S, E>::new(self.context);
operator.mutating_handlers.push(Box::new(handler));
operator
}
#[cfg(feature = "admission")]
#[must_use]
pub fn validator<H, E>(self, handler: H) -> Operator<S, E>
where
H: ValidatingAdmissionHandler<Err = E> + 'static,
E: StdError + Send + Sync + 'static,
{
let mut operator = Operator::<S, E>::new(self.context);
operator.validating_handlers.push(Box::new(handler));
operator
}
#[cfg(feature = "admission")]
#[must_use]
pub fn with_tls_certs<H, E>(
self,
path: impl Into<PathBuf>,
) -> OperatorBuilderWithContextAndTls<S>
where
E: StdError + Send + Sync + 'static,
{
OperatorBuilderWithContextAndTls {
context: self.context,
tls_certs_path: Some(path.into()),
}
}
}
#[doc(hidden)]
#[cfg(feature = "admission")]
pub struct OperatorBuilderWithContextAndTls<S>
where
S: Clone + Send + Sync + 'static,
{
context: Arc<Context<S>>,
tls_certs_path: Option<PathBuf>,
}
#[cfg(feature = "admission")]
impl<S> OperatorBuilderWithContextAndTls<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn handler<H, E>(self, handler: H) -> Operator<S, E>
where
H: Runnable<S, E> + Send + Sync + 'static,
E: StdError + Send + Sync + 'static,
{
let mut operator = Operator::<S, E>::new(self.context);
operator.handlers.push(Box::new(handler));
operator.tls_certs_path = self.tls_certs_path;
operator
}
}