use self::runner::Runner;
use crate::{
reflector::{
reflector,
store::{Store, Writer},
ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{
try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
KubeRuntimeStreamExt, StreamBackoff,
},
watcher::{self, watcher},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, ListParams, Resource};
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
time::Duration,
};
use stream::BoxStream;
use thiserror::Error;
use tokio::{runtime::Handle, time::Instant};
use tracing::{info_span, Instrument};
mod future_hash_map;
mod runner;
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
#[error("tried to reconcile object {0} that was not found in local store")]
ObjectNotFound(ObjectRef<DynamicObject>),
#[error("reconciler for object {1} failed")]
ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
#[error("scheduler dequeue failed")]
SchedulerDequeueFailed(#[source] scheduler::Error),
#[error("event queue error")]
QueueError(#[source] QueueErr),
}
#[derive(Debug, Clone)]
pub struct ReconcilerAction {
pub requeue_after: Option<Duration>,
}
pub fn trigger_with<T, K, I, S>(
stream: S,
mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = T>,
I: IntoIterator,
I::Item: Into<ReconcileRequest<K>>,
K: Resource,
{
stream
.map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
.try_flatten()
}
pub fn trigger_self<K, S>(
stream: S,
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = K>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,
})
})
}
pub fn trigger_owners<KOwner, S>(
stream: S,
owner_type: KOwner::DynamicType,
child_type: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
KOwner: Resource,
KOwner::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
let child_ref = ObjectRef::from_obj_with(&obj, child_type.clone()).erase();
meta.owner_references
.into_iter()
.flatten()
.filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
.map(move |owner_ref| ReconcileRequest {
obj_ref: owner_ref,
reason: ReconcileReason::RelatedObjectUpdated {
obj_ref: child_ref.clone(),
},
})
})
}
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Context<T>(Arc<T>);
impl<T> Context<T> {
#[must_use]
pub fn new(state: T) -> Context<T> {
Context(Arc::new(state))
}
#[must_use]
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}
#[must_use]
pub fn into_inner(self) -> Arc<T> {
self.0
}
}
#[derive(Derivative)]
#[derivative(
Debug(bound = "K::DynamicType: Debug"),
Clone(bound = "K::DynamicType: Clone"),
PartialEq(bound = "K::DynamicType: PartialEq"),
Eq(bound = "K::DynamicType: Eq"),
Hash(bound = "K::DynamicType: Hash")
)]
pub struct ReconcileRequest<K: Resource> {
pub obj_ref: ObjectRef<K>,
#[derivative(PartialEq = "ignore", Hash = "ignore")]
pub reason: ReconcileReason,
}
impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
fn from(obj_ref: ObjectRef<K>) -> Self {
ReconcileRequest {
obj_ref,
reason: ReconcileReason::Unknown,
}
}
}
#[derive(Debug, Clone)]
pub enum ReconcileReason {
Unknown,
ObjectUpdated,
RelatedObjectUpdated { obj_ref: ObjectRef<DynamicObject> },
ReconcilerRequestedRetry,
ErrorPolicyRequestedRetry,
BulkReconcile,
Custom { reason: String },
}
impl Display for ReconcileReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReconcileReason::Unknown => f.write_str("unknown"),
ReconcileReason::ObjectUpdated => f.write_str("object updated"),
ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
f.write_fmt(format_args!("related object updated: {}", object))
}
ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
ReconcileReason::Custom { reason } => f.write_str(reason),
}
}
}
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream,
QueueStream::Ok: Into<ReconcileRequest<K>>,
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) = channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(100);
trystream_try_via(
Box::pin(stream::select(
queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
scheduler_rx
.map(Ok)
.take_until(scheduler_shutdown_rx)
.on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
)),
move |s| {
Runner::new(scheduler(s), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason);
reconciler_span.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.instrument(reconciler_span.clone())
.map(|res| Ok((request.obj_ref, res, reconciler_span)))
.left_future()
},
None => future::err(
Error::ObjectNotFound(request.obj_ref.erase())
)
.right_future(),
}
})
.map_err(Error::SchedulerDequeueFailed)
.map(|res| res.and_then(|x| x))
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (ReconcilerAction { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Err(err) =>
(reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry),
};
let mut scheduler_tx = scheduler_tx.clone();
async move {
if let Some(delay) = requeue_after {
let _ = scheduler_tx
.send(ScheduleRequest {
message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
run_at: Instant::now() + delay,
})
.await;
}
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase()))
}
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}
pub struct Controller<K>
where
K: Clone + Resource + Debug + 'static,
K::DynamicType: Eq + Hash,
{
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
}
impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
{
#[must_use]
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self
where
K::DynamicType: Default,
{
Self::new_with(owned_api, lp, Default::default())
}
pub fn new_with(owned_api: Api<K>, lp: ListParams, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self(
try_flatten_applied(reflector(writer, watcher(owned_api, lp))),
dyntype.clone(),
)
.boxed();
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::new(watcher::default_backoff()),
graceful_shutdown_selector: vec![
future::pending().boxed(),
],
forceful_shutdown_selector: vec![
future::pending().boxed(),
],
dyntype,
reader,
}
}
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}
pub fn store(&self) -> Store<K> {
self.reader.clone()
}
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
lp: ListParams,
) -> Self {
self.owns_with(api, (), lp)
}
#[must_use]
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
lp: ListParams,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(
try_flatten_touched(watcher(api, lp)),
self.dyntype.clone(),
dyntype,
);
self.trigger_selector.push(child_watcher.boxed());
self
}
#[must_use]
pub fn watches<
Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
>(
self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
I::IntoIter: Send,
{
self.watches_with(api, (), lp, mapper)
}
#[must_use]
pub fn watches_with<
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
>(
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
I::IntoIter: Send,
Other::DynamicType: Clone,
{
let other_watcher = trigger_with(try_flatten_touched(watcher(api, lp)), move |obj| {
let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
mapper(obj)
.into_iter()
.map(move |mapped_obj_ref| ReconcileRequest {
obj_ref: mapped_obj_ref,
reason: ReconcileReason::RelatedObjectUpdated {
obj_ref: watched_obj_ref.clone(),
},
})
});
self.trigger_selector.push(other_watcher.boxed());
self
}
#[must_use]
pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
let store = self.store();
let dyntype = self.dyntype.clone();
self.trigger_selector.push(
trigger
.flat_map(move |()| {
let dyntype = dyntype.clone();
stream::iter(store.state().into_iter().map(move |obj| {
Ok(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
reason: ReconcileReason::BulkReconcile,
})
}))
})
.boxed(),
);
self
}
#[must_use]
pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
self.graceful_shutdown_selector.push(trigger.boxed());
self
}
#[must_use]
pub fn shutdown_on_signal(mut self) -> Self {
async fn shutdown_signal() {
futures::future::select(
tokio::signal::ctrl_c().map(|_| ()).boxed(),
#[cfg(unix)]
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap()
.recv()
.map(|_| ())
.boxed(),
#[cfg(not(unix))]
futures::future::pending::<()>(),
)
.await;
}
let (graceful_tx, graceful_rx) = channel::oneshot::channel();
self.graceful_shutdown_selector
.push(graceful_rx.map(|_| ()).boxed());
self.forceful_shutdown_selector.push(
async {
tracing::info!("press ctrl+c to shut down gracefully");
shutdown_signal().await;
if let Ok(()) = graceful_tx.send(()) {
tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
} else {
tracing::info!(
"graceful shutdown already requested, press ctrl+c again to force shutdown"
);
}
shutdown_signal().await;
tracing::info!("forced shutdown requested");
}
.boxed(),
);
self
}
pub fn run<ReconcilerFut, T>(
self,
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
applier(
move |obj, ctx| {
CancelableJoinHandle::spawn(
reconciler(obj, ctx).into_future().in_current_span(),
&Handle::current(),
)
},
error_policy,
context,
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
}
#[cfg(test)]
mod tests {
use super::{Context, ReconcilerAction};
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
fn assert_send<T: Send>(x: T) -> T {
x
}
fn mock_type<T>() -> T {
unimplemented!(
"mock_type is not supposed to be called, only used for filling holes in type assertions"
)
}
#[allow(dead_code, unused_must_use)]
fn test_controller_should_be_send() {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<ReconcilerAction>()) },
|_: &std::io::Error, _| mock_type::<ReconcilerAction>(),
Context::new(()),
),
);
}
}