use std::error::Error;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use futures::future;
use kube::Resource;
use serde::de::DeserializeOwned;
use crate::event_handler::{DynEventHandler, EventHandler, EventHandlerWrapper};
use crate::{Context, Result};
pub struct Operator<S, E>
where
S: Clone,
{
context: Arc<Context<S>>,
handlers: Vec<Box<dyn DynEventHandler<E>>>,
}
impl<S, E> Operator<S, E>
where
S: Clone + Send + Sync + 'static,
E: Error + Send + Sync + 'static,
{
pub fn new(context: Arc<Context<S>>) -> Self {
Self {
context,
handlers: Default::default(),
}
}
#[must_use]
pub fn handler<H, K>(mut self, _: H) -> Self
where
H: EventHandler<K, S, E> + Send + Sync + 'static,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
{
let wrapper = EventHandlerWrapper::<H, K, S, E>::new(self.context.clone());
self.handlers.push(Box::new(wrapper));
self
}
pub async fn run(self) -> Result<()> {
tracing::info!(
"starting kubus operator with {} handlers",
self.handlers.len()
);
let tasks: Vec<_> = self
.handlers
.into_iter()
.map(|handler| {
let client = self.context.client.clone();
tokio::spawn(async move {
tracing::info!("starting handler");
let client = client.clone();
if let Err(e) = handler.run(client).await {
tracing::error!("restarting handler failed: {}", e);
}
})
})
.collect();
future::join_all(tasks).await;
Ok(())
}
}
pub struct OperatorBuilder;
impl OperatorBuilder {
pub fn with_context<S>(self, context: impl Into<Context<S>>) -> OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
let context = Arc::new(context.into());
OperatorBuilderWithContext { context }
}
}
impl Operator<(), ()> {
#[must_use]
pub const fn builder() -> OperatorBuilder {
OperatorBuilder
}
}
pub struct OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
context: Arc<Context<S>>,
}
impl<S> OperatorBuilderWithContext<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn handler<H, K, E>(self, _: H) -> Operator<S, E>
where
H: EventHandler<K, S, E> + Send + Sync + 'static,
E: Error + Send + Sync + 'static,
K: Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Clone + Debug + Default + Hash + Unpin + Eq,
{
let wrapper = EventHandlerWrapper::<H, K, S, E>::new(self.context.clone());
let mut operator = Operator::<S, E>::new(self.context);
operator.handlers.push(Box::new(wrapper));
operator
}
}