use self::runner::Runner;
use crate::{
reflector::{
reflector,
store::{Store, Writer},
ErasedResource, ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle},
watcher::{self, watcher},
};
use derivative::Derivative;
use futures::{
channel, future,
stream::{self, SelectAll},
FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube::api::{Api, ListParams, Meta};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, ResultExt, Snafu};
use std::{sync::Arc, time::Duration};
use stream::BoxStream;
use tokio::{runtime::Handle, time::Instant};
mod future_hash_map;
mod runner;
/// Errors yielded by the stream returned from [`applier`] (and therefore [`Controller::run`]).
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler future and
/// `QueueErr` is the error type of the stream of object references feeding the applier.
// NOTE(review): the per-variant notes below are deliberately plain `//` comments.
// Confirm how the Snafu derive in use treats variant doc comments (it may use them
// as the `Display` text) before converting them to `///`.
#[derive(Snafu, Debug)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
// Built by `applier` when a scheduled object reference has no matching entry in the store.
ObjectNotFound {
obj_ref: ObjectRef<ErasedResource>,
backtrace: Backtrace,
},
// Wraps the error returned by the reconciler itself.
ReconcilerFailed {
source: ReconcilerErr,
backtrace: Backtrace,
},
// Wraps a `scheduler::Error` surfaced by the scheduler-driven runner stream in `applier`;
// the backtrace is delegated to the source error.
SchedulerDequeueFailed {
#[snafu(backtrace)]
source: scheduler::Error,
},
// Wraps an error produced by the input queue stream handed to `applier`.
QueueError {
source: QueueErr,
backtrace: Backtrace,
},
}
/// The follow-up requested after a reconciliation attempt.
///
/// Returned by the reconciler on success and by the error policy on failure.
/// Implements `PartialEq`/`Eq` so that callers and tests can compare actions directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReconcilerAction {
    /// When `Some`, the object is scheduled for another reconciliation once this
    /// delay has elapsed; when `None`, no follow-up run is scheduled by the action.
    pub requeue_after: Option<Duration>,
}
/// Turns a stream of objects into a stream of the [`ObjectRef`]s that `mapper`
/// derives from each of them.
///
/// Every successful item of `stream` is passed to `mapper`, and each reference it
/// returns is emitted as its own `Ok` item. Errors from `stream` are forwarded as-is.
pub fn trigger_with<T, K, I, S>(
    stream: S,
    mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ObjectRef<K>, S::Error>>
where
    S: TryStream<Ok = T>,
    I: IntoIterator<Item = ObjectRef<K>>,
    K: Meta,
{
    // Expand one object into a (never failing) sub-stream of the references it triggers.
    let expand = move |obj: T| {
        let refs = mapper(obj).into_iter();
        stream::iter(refs.map(Ok))
    };
    stream.map_ok(expand).try_flatten()
}
/// Maps every object in `stream` to a single [`ObjectRef`] pointing at that object itself.
///
/// Errors from `stream` are forwarded unchanged.
pub fn trigger_self<S>(stream: S) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
where
    S: TryStream,
    S::Ok: Meta,
{
    // Exactly one reference per incoming object.
    let own_ref = |obj: S::Ok| std::iter::once(ObjectRef::from_obj(&obj));
    trigger_with(stream, own_ref)
}
/// Maps every object in `stream` to references built from its owner references.
///
/// Each entry in the object's `owner_references` is offered to
/// [`ObjectRef::from_owner_ref`] together with the object's own namespace, and
/// whatever that returns is emitted. Objects without owner references emit nothing.
/// Errors from `stream` are forwarded unchanged.
pub fn trigger_owners<KOwner, S>(stream: S) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
where
    S: TryStream,
    S::Ok: Meta,
    KOwner: Meta,
{
    let owner_refs = |obj: S::Ok| {
        let meta = obj.meta();
        // Take owned copies so the returned iterator does not borrow from `obj`.
        let namespace = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        owners
            .into_iter()
            .flatten()
            .flat_map(move |owner| ObjectRef::<KOwner>::from_owner_ref(namespace.as_deref(), &owner))
    };
    trigger_with(stream, owner_refs)
}
/// Shared state handed to the reconciler and to the error policy.
///
/// The state lives behind an [`Arc`], so cloning a `Context` never requires
/// `T: Clone` (hence the empty `Clone` bound).
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Context<T>(Arc<T>);

impl<T> Context<T> {
    /// Wraps `state` in a new context.
    #[must_use]
    pub fn new(state: T) -> Context<T> {
        Self(Arc::new(state))
    }

    /// Borrows the wrapped state.
    #[must_use]
    pub fn get_ref(&self) -> &T {
        &self.0
    }

    /// Consumes the context and returns the underlying [`Arc`].
    #[must_use]
    pub fn into_inner(self) -> Arc<T> {
        let Self(shared) = self;
        shared
    }
}
/// Runs `reconciler` for every object reference arriving on `queue`.
///
/// Incoming references are merged with internally generated requeue requests and
/// passed through the scheduler into a [`Runner`]. For each scheduled reference the
/// object is looked up in `store`:
///
/// * if present, `reconciler` is invoked with the object and a clone of `context`;
/// * if absent, an `ObjectNotFound` error is produced instead.
///
/// After a reconciler run, the requeue delay comes from the returned
/// [`ReconcilerAction`] on success, or from `error_policy` on failure. If a delay is
/// set, the same reference is scheduled again after it. The resulting stream yields
/// the reference and action on success, or the wrapped reconciler error on failure.
///
/// # Panics
///
/// Panics if a requeue request cannot be sent to the internal scheduler channel.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
    mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
    mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
    context: Context<T>,
    store: Store<K>,
    queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Meta + 'static,
    ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream<Ok = ObjectRef<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Channel used to feed requeue requests back into the scheduler's input.
    let (requeue_tx, requeue_rx) = channel::mpsc::channel::<ScheduleRequest<ObjectRef<K>>>(100);
    // `context` itself moves into the runner closure; the error policy gets its own handle.
    let policy_context = context.clone();

    // Requests coming from the caller's queue are scheduled to run (almost) immediately.
    let external_requests = queue.context(QueueError).map_ok(|obj_ref| ScheduleRequest {
        message: obj_ref,
        run_at: Instant::now() + Duration::from_millis(1),
    });
    let requests = Box::pin(stream::select(external_requests, requeue_rx.map(Ok)));

    let reconciled = trystream_try_via(requests, move |pending| {
        Runner::new(scheduler(pending), move |obj_ref| {
            let obj_ref = obj_ref.clone();
            if let Some(obj) = store.get(&obj_ref) {
                reconciler(obj, context.clone())
                    .into_future()
                    // The reconciler's own result is handled downstream, so it is
                    // carried along inside an `Ok` together with the reference.
                    .map(|res| Ok((obj_ref, res)))
                    .left_future()
            } else {
                future::err(ObjectNotFound { obj_ref }.build()).right_future()
            }
        })
        .context(SchedulerDequeueFailed)
        // Collapse the scheduler-level and lookup-level results into one.
        .map(|res| res.and_then(|x| x))
    });

    reconciled.and_then(move |(obj_ref, reconciler_result)| {
        // Successful runs pick their own requeue delay; failed runs defer to the policy.
        let requeue_after = match &reconciler_result {
            Ok(action) => action.requeue_after,
            Err(err) => error_policy(err, policy_context.clone()).requeue_after,
        };
        let mut requeue_tx = requeue_tx.clone();
        async move {
            if let Some(delay) = requeue_after {
                let request = ScheduleRequest {
                    message: obj_ref.clone(),
                    run_at: Instant::now() + delay,
                };
                requeue_tx
                    .send(request)
                    .await
                    .expect("Message could not be sent to scheduler_rx");
            }
            reconciler_result
                .map(|action| (obj_ref, action))
                .context(ReconcilerFailed)
        }
    })
}
/// Builder for a controller that reconciles objects of kind `K`.
///
/// Created with `Controller::new`, optionally extended with `owns` / `watches`
/// to add further trigger streams, and finally consumed by `run`.
pub struct Controller<K>
where
K: Clone + Meta + 'static,
{
/// Merged set of boxed streams, each yielding references to `K` objects that
/// should be reconciled (or a watcher error).
selector: SelectAll<BoxStream<'static, Result<ObjectRef<K>, watcher::Error>>>,
/// Reader side of the store populated by the reflector set up in `new`;
/// handed to `applier` for object lookups and exposed via `store`.
reader: Store<K>,
}
impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Send + Sync + 'static,
{
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
let writer = Writer::<K>::default();
let reader = writer.as_reader();
let mut selector = stream::SelectAll::new();
let self_watcher =
trigger_self(try_flatten_applied(reflector(writer, watcher(owned_api, lp)))).boxed();
selector.push(self_watcher);
Self { selector, reader }
}
pub fn store(&self) -> Store<K> {
self.reader.clone()
}
pub fn owns<Child: Clone + Meta + DeserializeOwned + Send + 'static>(
mut self,
api: Api<Child>,
lp: ListParams,
) -> Self {
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)));
self.selector.push(child_watcher.boxed());
self
}
pub fn watches<
Other: Clone + Meta + DeserializeOwned + Send + 'static,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
>(
mut self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + Send + 'static,
) -> Self
where
I::IntoIter: Send,
{
let other_watcher = trigger_with(try_flatten_touched(watcher(api, lp)), mapper);
self.selector.push(other_watcher.boxed());
self
}
pub fn run<ReconcilerFut, T>(
self,
mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
applier(
move |obj, ctx| {
CancelableJoinHandle::spawn(reconciler(obj, ctx).into_future(), &Handle::current())
},
error_policy,
context,
self.reader,
self.selector,
)
}
}
#[cfg(test)]
mod tests {
    use super::{Context, ReconcilerAction};
    use crate::Controller;
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube::Api;

    /// Compiles only when `T` is `Send`; hands the value straight back.
    fn assert_send<T: Send>(value: T) -> T {
        value
    }

    /// Stand-in for a value of any type, used purely so type-level assertions can
    /// be written without constructing real values.
    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // Compile-time check only: this function is never run, it just has to type-check.
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        let controller = Controller::new(mock_type::<Api<ConfigMap>>(), Default::default());
        let reconciliations = controller.run(
            |_, _| async { Ok(mock_type::<ReconcilerAction>()) },
            |_: &std::io::Error, _| mock_type::<ReconcilerAction>(),
            Context::new(()),
        );
        assert_send(reconciliations);
    }
}