#![warn(clippy::pedantic)]
#![warn(clippy::nursery)]
#![warn(clippy::cargo)]
#![warn(missing_docs)]
#![warn(rust_2018_idioms)]
#![warn(unused)]
#![forbid(unsafe_code)]
#![deny(warnings)]
#![warn(clippy::unwrap_used)]
#![warn(clippy::expect_used)]
#![warn(clippy::panic)]
#![warn(clippy::todo)]
#![warn(clippy::unimplemented)]
#![warn(clippy::missing_errors_doc)]
#![warn(clippy::missing_panics_doc)]
#![warn(clippy::missing_safety_doc)]
use std::{fmt::Debug, hash::Hash, str::FromStr, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::{
StreamExt,
future::{self, BoxFuture},
};
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
use kube::{
Api, Client, Resource,
runtime::{Controller, controller::Action, watcher::Config as ControllerConfig},
};
pub use kube;
pub use finalizer::update_finalizer;
pub use kubus_derive::kubus;
use serde::de::DeserializeOwned;
mod finalizer;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("SerializationError: {0}")]
SerializationError(#[source] serde_json::Error),
#[error("Kube Error: {0}")]
KubeError(#[from] kube::Error),
#[error("Finalizer Error: {0}")]
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<Error>>),
}
pub trait ScopeExt<K>
where
K: Resource<Scope = Self>,
{
fn api(client: Client, namespace: Option<impl AsRef<str>>) -> Api<K>;
}
impl<K> ScopeExt<K> for NamespaceResourceScope
where
K: Resource<Scope = Self>,
K::DynamicType: Default,
{
fn api(client: Client, namespace: Option<impl AsRef<str>>) -> Api<K> {
if let Some(namespace) = namespace {
Api::namespaced(client, namespace.as_ref())
} else {
Api::all(client)
}
}
}
impl<K> ScopeExt<K> for ClusterResourceScope
where
K: Resource<Scope = Self>,
K::DynamicType: Default,
{
fn api(client: Client, _: Option<impl AsRef<str>>) -> Api<K> {
Api::all(client)
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
trait DynEventHandler: Send + Sync {
fn run(&self, client: Client) -> BoxFuture<'static, Result<()>>;
}
#[async_trait]
pub trait EventHandler<K, Ctx>
where
K: Resource + Clone + Debug + DeserializeOwned + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
Ctx: Send + Sync + 'static,
{
async fn handler(resource: Arc<K>, context: Arc<Ctx>) -> Result<Action>;
fn error_policy(_resource: Arc<K>, err: &Error, _ctx: Arc<Ctx>) -> Action {
tracing::warn!("Reconciliation error: {:?}", err);
Action::requeue(Duration::from_secs(5))
}
async fn watch(client: Client, context: Arc<Ctx>) -> Result<()>
where
Self: Sized,
{
println!("starting controller");
let api = Api::<K>::all(client);
Controller::new(api, ControllerConfig::default())
.shutdown_on_signal()
.run(Self::handler, Self::error_policy, context)
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;
Ok(())
}
}
struct EventHandlerWrapper<H, K, Ctx>
where
H: EventHandler<K, Ctx>,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
Ctx: Send + Sync + 'static,
{
context: Arc<Ctx>,
_phantom: std::marker::PhantomData<(H, K)>,
}
impl<H, K, Ctx> EventHandlerWrapper<H, K, Ctx>
where
H: EventHandler<K, Ctx>,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
Ctx: Send + Sync + 'static,
{
const fn new(context: Arc<Ctx>) -> Self {
Self {
context,
_phantom: std::marker::PhantomData,
}
}
}
impl<H, K, Ctx> DynEventHandler for EventHandlerWrapper<H, K, Ctx>
where
H: EventHandler<K, Ctx> + Send + Sync + 'static,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
Ctx: Send + Sync + 'static,
{
fn run(&self, client: Client) -> BoxFuture<'static, Result<()>> {
let context = self.context.clone();
H::watch(client, context)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum EventType {
Apply,
Delete,
}
impl FromStr for EventType {
type Err = std::io::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"Apply" => Ok(Self::Apply),
"Delete" => Ok(Self::Delete),
_ => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"no such event type",
)),
}
}
}
pub struct Operator<Ctx> {
client: Client,
context: Arc<Ctx>,
handlers: Vec<Box<dyn DynEventHandler>>,
}
impl<Ctx> Operator<Ctx>
where
Ctx: Send + Sync + 'static,
{
pub async fn new(context: Ctx) -> Result<Self> {
let client = Client::try_default().await?;
Ok(Self {
client,
context: Arc::new(context),
handlers: Default::default(),
})
}
pub fn with_client(client: Client, context: Ctx) -> Self {
Self {
client,
context: Arc::new(context),
handlers: Default::default(),
}
}
#[must_use]
pub fn handler<H, K>(mut self, _: H) -> Self
where
H: EventHandler<K, Ctx> + Send + Sync + 'static,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
{
let wrapper = EventHandlerWrapper::<H, K, Ctx>::new(self.context.clone());
self.handlers.push(Box::new(wrapper));
self
}
pub async fn run(self) -> Result<()> {
tracing::info!(
"starting kubus operator with {} handlers",
self.handlers.len()
);
let tasks: Vec<_> = self
.handlers
.into_iter()
.map(|handler| {
let client = self.client.clone();
tokio::spawn(async move {
tracing::info!("starting handler");
let client = client.clone();
if let Err(e) = handler.run(client).await {
tracing::error!("restarting handler failed: {}", e);
}
})
})
.collect();
future::join_all(tasks).await;
Ok(())
}
}