use self::runner::Runner;
use crate::{
reflector::{
reflector,
store::{Store, Writer},
ObjectRef,
},
scheduler::{scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, watcher},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, ListParams, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
task::Poll,
time::Duration,
};
use stream::BoxStream;
use thiserror::Error;
use tokio::{runtime::Handle, time::Instant};
use tracing::{info_span, Instrument};
mod future_hash_map;
mod runner;
/// Errors yielded by [`applier`] (and therefore by [`Controller::run`]).
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler future, `QueueErr` is the
/// error type of the stream of incoming reconcile requests.
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A reconcile was requested for an object that is not present in the local [`Store`].
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),
    /// The reconciler returned an error for the referenced object.
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
    /// The stream of incoming reconcile requests yielded an error.
    #[error("event queue error")]
    QueueError(#[source] QueueErr),
}
/// Follow-up decision returned by a reconciler (or by the error policy).
///
/// The applier turns `requeue_after: Some(delay)` into a new reconcile request for the same
/// object, due `delay` after the reconciliation finished. `None` schedules no follow-up.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// Delay before the object should be reconciled again, if any.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Asks for the object to be reconciled again once `duration` has elapsed.
    #[must_use]
    pub fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Self { requeue_after }
    }

    /// Asks for no scheduled follow-up reconcile.
    #[must_use]
    pub fn await_change() -> Self {
        let requeue_after: Option<Duration> = None;
        Action { requeue_after }
    }
}
pub fn trigger_with<T, K, I, S>(
stream: S,
mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = T>,
I: IntoIterator,
I::Item: Into<ReconcileRequest<K>>,
K: Resource,
{
stream
.map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
.try_flatten()
}
/// Emits one reconcile request per object yielded by `stream`, targeting that object itself.
///
/// Each request carries [`ReconcileReason::ObjectUpdated`]. `dyntype` is used to build the
/// object reference.
pub fn trigger_self<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = K>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone());
        std::iter::once(ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::ObjectUpdated,
        })
    })
}
/// Emits a reconcile request for every owner listed in the `ownerReferences` of each yielded object.
///
/// Owner references that `ObjectRef::from_owner_ref` rejects (returns `None` for) are skipped.
/// Each request carries [`ReconcileReason::RelatedObjectUpdated`] pointing at the child that
/// changed.
pub fn trigger_owners<KOwner, S>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream,
    S::Ok: Resource,
    <S::Ok as Resource>::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    trigger_with(stream, move |child| {
        let child_meta = child.meta();
        // Owners are looked up in the child's namespace.
        let namespace = child_meta.namespace.clone();
        // A missing `ownerReferences` list is treated like an empty one.
        let owner_refs = child_meta.owner_references.clone().unwrap_or_default();
        let owner_dyntype = owner_type.clone();
        let changed_child = ObjectRef::from_obj_with(&child, child_type.clone()).erase();
        owner_refs
            .into_iter()
            .filter_map(move |owner| {
                ObjectRef::from_owner_ref(namespace.as_deref(), &owner, owner_dyntype.clone())
            })
            .map(move |owner_ref| ReconcileRequest {
                obj_ref: owner_ref,
                reason: ReconcileReason::RelatedObjectUpdated {
                    obj_ref: Box::new(changed_child.clone()),
                },
            })
    })
}
/// A request to reconcile one object, together with the reason it was requested.
///
/// Equality and hashing consider only `obj_ref`; `reason` is ignored. Two requests for the same
/// object therefore compare equal even if their reasons differ.
#[derive(Derivative)]
#[derivative(
    Debug(bound = "K::DynamicType: Debug"),
    Clone(bound = "K::DynamicType: Clone"),
    PartialEq(bound = "K::DynamicType: PartialEq"),
    Eq(bound = "K::DynamicType: Eq"),
    Hash(bound = "K::DynamicType: Hash")
)]
pub struct ReconcileRequest<K: Resource> {
    /// The object to reconcile.
    pub obj_ref: ObjectRef<K>,
    /// Why the reconcile was requested (excluded from `PartialEq`/`Hash`).
    #[derivative(PartialEq = "ignore", Hash = "ignore")]
    pub reason: ReconcileReason,
}
/// Wraps a bare object reference in a request whose reason is [`ReconcileReason::Unknown`].
impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
    fn from(obj_ref: ObjectRef<K>) -> Self {
        let reason = ReconcileReason::Unknown;
        Self { obj_ref, reason }
    }
}
/// Why a [`ReconcileRequest`] was issued; used for tracing and display.
#[derive(Debug, Clone)]
pub enum ReconcileReason {
    /// No reason recorded (e.g. the request was built from a bare `ObjectRef`).
    Unknown,
    /// The object itself was yielded by the controller's own watcher (see `trigger_self`).
    ObjectUpdated,
    /// A related object (an owned child or a watched object) changed; `obj_ref` identifies it.
    RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> },
    /// The reconciler succeeded and returned an [`Action`] with a requeue delay.
    ReconcilerRequestedRetry,
    /// The reconciler failed and the error policy returned an [`Action`] with a requeue delay.
    ErrorPolicyRequestedRetry,
    /// Issued for every cached object by `Controller::reconcile_all_on`.
    BulkReconcile,
    /// Free-form reason; displayed verbatim.
    Custom { reason: String },
}
impl Display for ReconcileReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReconcileReason::Unknown => f.write_str("unknown"),
ReconcileReason::ObjectUpdated => f.write_str("object updated"),
ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
f.write_fmt(format_args!("related object updated: {object}"))
}
ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
ReconcileReason::Custom { reason } => f.write_str(reason),
}
}
}
/// Capacity of the bounded channel through which [`applier`] feeds follow-up (requeue) requests
/// back into its scheduler. When it is full, finished reconciliations wait for free capacity
/// before completing.
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
/// Runs `reconciler` for every request yielded by `queue`, using objects looked up in `store`.
///
/// Incoming requests and follow-up requests produced by finished reconciliations are merged and
/// passed through [`scheduler`] into a [`Runner`]. For each request:
///
/// - If the object is missing from `store`, the stream yields [`Error::ObjectNotFound`].
/// - Otherwise `reconciler` is called with the object and `context` inside a
///   `"reconciling object"` tracing span.
/// - On success the resulting [`Action`] is yielded together with the object reference.
/// - On failure `error_policy` is consulted to pick the follow-up [`Action`], and the
///   reconciler's error is yielded as [`Error::ReconcilerFailed`].
///
/// In both cases, an action with a requeue delay schedules another reconcile of the same object.
/// Errors of `queue` are yielded as [`Error::QueueError`].
///
/// When `queue` terminates, a shutdown signal cuts off the consumer of follow-up requests
/// (`take_until(scheduler_shutdown_rx)`), starting a graceful shutdown.
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Fired once `queue` ends, to stop consuming follow-up requests.
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Bounded channel carrying follow-up requests from finished reconciliations back into the scheduler.
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    // Shared so that every in-flight reconciliation can hold its own handle to the policy.
    let error_policy = Arc::new(error_policy);
    trystream_try_via(
        Box::pin(stream::select(
            queue
                .map_err(Error::QueueError)
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    // NOTE(review): fresh requests are scheduled 1ms in the future; the rationale
                    // is not visible in this file.
                    run_at: Instant::now() + Duration::from_millis(1),
                })
                .on_complete(async move {
                    // The receiver may already be gone; that is fine during shutdown.
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        move |s| {
            Runner::new(scheduler(s), move |request| {
                // Take an owned copy so it can be moved into the returned future
                // (presumably the runner only lends us the request).
                let request = request.clone();
                match store.get(&request.obj_ref) {
                    Some(obj) => {
                        let scheduler_tx = scheduler_tx.clone();
                        let error_policy_ctx = context.clone();
                        let error_policy = error_policy.clone();
                        let reconciler_span = info_span!(
                            "reconciling object",
                            "object.ref" = %request.obj_ref,
                            object.reason = %request.reason
                        );
                        reconciler_span
                            // Construct the reconciler future inside the span as well, not only poll it there.
                            .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
                            .into_future()
                            .then(move |res| {
                                let error_policy = error_policy;
                                // Apply the error policy (on failure) and push any follow-up request
                                // into the scheduler before yielding the result.
                                RescheduleReconciliation::new(
                                    res,
                                    |err| error_policy(obj, err, error_policy_ctx),
                                    request.obj_ref.clone(),
                                    scheduler_tx,
                                )
                                // Reconciler errors are not stream errors at this point; they are
                                // converted into `Error::ReconcilerFailed` further down.
                                .map(|res| Ok((request.obj_ref, res)))
                            })
                            .instrument(reconciler_span)
                            .left_future()
                    }
                    None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
                }
            })
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // Surface the reconciler's own error (the error policy's action is only used for requeueing).
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
/// Future that finishes a reconciliation.
///
/// It first pushes the pending follow-up request (if the resulting [`Action`] asked for a
/// requeue) into `reschedule_tx`, then resolves to the reconciler's original result.
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    // Channel feeding follow-up requests back into the applier's scheduler.
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
    // Follow-up request still to be sent; `None` if no requeue was requested or it was already handled.
    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    // Reconciler result to resolve to; taken when the future completes.
    result: Option<Result<Action, ReconcilerErr>>,
}
impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    /// Builds the post-reconcile future from a finished reconciliation.
    ///
    /// The follow-up action depends on `result`:
    ///
    /// - On `Ok(action)`, `action` is used as-is, with reason `ReconcilerRequestedRetry`.
    /// - On `Err(err)`, `error_policy(&err)` decides the action, with reason
    ///   `ErrorPolicyRequestedRetry`.
    ///
    /// If the chosen action has a `requeue_after` delay, a request for `obj_ref` is prepared, due
    /// that long after this constructor was entered. `result` itself is kept unchanged as the
    /// future's output.
    fn new(
        result: Result<Action, ReconcilerErr>,
        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
        obj_ref: ObjectRef<K>,
        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
    ) -> Self {
        // Requeue delays count from when the reconciler finished, not from when the request is sent.
        let finished_at = Instant::now();
        let (next_action, reason) = match &result {
            Ok(action) => (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
            Err(err) => (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
        };
        let reschedule_request = next_action.requeue_after.map(|delay| ScheduleRequest {
            message: ReconcileRequest { obj_ref, reason },
            run_at: finished_at + delay,
        });
        Self {
            reschedule_tx,
            reschedule_request,
            result: Some(result),
        }
    }
}
impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    type Output = Result<Action, ReconcilerErr>;

    /// Sends the pending follow-up request (if any), then resolves to the reconciler's result.
    ///
    /// Returns `Poll::Pending` while the reschedule channel has no free capacity, which applies
    /// backpressure instead of buffering without bound. If the channel's receiver is gone,
    /// `poll_ready` returns `Err` and the follow-up request is dropped silently.
    ///
    /// # Panics
    ///
    /// Panics if polled again after returning `Poll::Ready`.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // No field is structurally pinned, so the struct is `Unpin` and `get_mut` is fine.
        let this = self.get_mut();
        if this.reschedule_request.is_some() {
            // Keep the request in place until there is capacity, so a `Pending` return loses nothing.
            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
            let reschedule_request = this
                .reschedule_request
                .take()
                .expect("RescheduleReconciliation::reschedule_request was taken during processing");
            // `Err` means the receiver was dropped (e.g. the applier's reschedule consumer has
            // terminated during shutdown); the follow-up request is deliberately discarded.
            if let Ok(()) = rescheduler_ready {
                let _ = this.reschedule_tx.start_send(reschedule_request);
            }
        }
        Poll::Ready(
            this.result
                .take()
                .expect("RescheduleReconciliation::result was already taken"),
        )
    }
}
/// Builder and runner for a controller that reconciles objects of type `K`.
///
/// Reconciles are triggered by the merged trigger streams. Objects are read from the cache
/// filled by the controller's own watcher. See [`Controller::run`] to start it.
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    /// All trigger streams (own watcher, owned/watched objects, bulk triggers), merged together.
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    /// Backoff wrapped around the merged trigger streams via `StreamBackoff` in `run`.
    trigger_backoff: Box<dyn Backoff + Send>,
    /// When any of these resolves, the trigger stream is cut off (`take_until` in `run`).
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// When any of these resolves, the controller's output stream ends (`take_until` in `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// Dynamic type information for `K`, used to build object references.
    dyntype: K::DynamicType,
    /// Read handle to the cache populated by the controller's own watcher.
    reader: Store<K>,
}
impl<K> Controller<K>
where
    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
    K::DynamicType: Eq + Hash + Clone,
{
    /// Creates a controller for the objects of `owned_api` matching `lp`.
    ///
    /// Shorthand for [`Controller::new_with`] using `K`'s default dynamic type.
    #[must_use]
    pub fn new(owned_api: Api<K>, lp: ListParams) -> Self
    where
        K::DynamicType: Default,
    {
        Self::new_with(owned_api, lp, Default::default())
    }

    /// Creates a controller for the objects of `owned_api` matching `lp`, with an explicit `dyntype`.
    ///
    /// The objects are watched through a reflector whose cache is exposed by [`Controller::store`].
    /// Every object yielded by the watcher's `applied_objects()` triggers a reconcile with
    /// [`ReconcileReason::ObjectUpdated`]. Trigger streams use `watcher::default_backoff()` unless
    /// [`Controller::trigger_backoff`] overrides it.
    pub fn new_with(owned_api: Api<K>, lp: ListParams, dyntype: K::DynamicType) -> Self {
        let writer = Writer::<K>::new(dyntype.clone());
        let reader = writer.as_reader();
        let mut trigger_selector = stream::SelectAll::new();
        let self_watcher = trigger_self(
            reflector(writer, watcher(owned_api, lp)).applied_objects(),
            dyntype.clone(),
        )
        .boxed();
        trigger_selector.push(self_watcher);
        Self {
            trigger_selector,
            trigger_backoff: Box::new(watcher::default_backoff()),
            graceful_shutdown_selector: vec![
                // Never resolves. It keeps `future::select_all` (which panics on an empty list)
                // valid in `run`, and means no shutdown happens unless another future is added.
                future::pending().boxed(),
            ],
            forceful_shutdown_selector: vec![
                // Same never-resolving fallback, for the forceful shutdown selector.
                future::pending().boxed(),
            ],
            dyntype,
            reader,
        }
    }

    /// Replaces the backoff applied to the trigger streams (default: `watcher::default_backoff()`).
    #[must_use]
    pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
        self.trigger_backoff = Box::new(backoff);
        self
    }

    /// Returns a handle to the cache of `K` objects filled by the controller's own watcher.
    pub fn store(&self) -> Store<K> {
        self.reader.clone()
    }

    /// Reconciles the owner (a `K`) whenever a `Child` it owns changes.
    ///
    /// Shorthand for [`Controller::owns_with`] for children whose dynamic type is `()`.
    #[must_use]
    pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
        self,
        api: Api<Child>,
        lp: ListParams,
    ) -> Self {
        self.owns_with(api, (), lp)
    }

    /// Reconciles the owner (a `K`) whenever a `Child` matching `lp` changes.
    ///
    /// Owners are resolved from the child's `ownerReferences` (see `trigger_owners`).
    #[must_use]
    pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
        mut self,
        api: Api<Child>,
        dyntype: Child::DynamicType,
        lp: ListParams,
    ) -> Self
    where
        Child::DynamicType: Debug + Eq + Hash + Clone,
    {
        let child_watcher = trigger_owners(watcher(api, lp).touched_objects(), self.dyntype.clone(), dyntype);
        self.trigger_selector.push(child_watcher.boxed());
        self
    }

    /// Reconciles the objects returned by `mapper` whenever an `Other` changes.
    ///
    /// Shorthand for [`Controller::watches_with`] for types whose dynamic type is `()`.
    #[must_use]
    pub fn watches<
        Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
    >(
        self,
        api: Api<Other>,
        lp: ListParams,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    ) -> Self
    where
        I::IntoIter: Send,
    {
        self.watches_with(api, (), lp, mapper)
    }

    /// Reconciles the objects returned by `mapper` whenever an `Other` matching `lp` changes.
    ///
    /// Each request carries [`ReconcileReason::RelatedObjectUpdated`] pointing at the changed `Other`.
    #[must_use]
    pub fn watches_with<
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
    >(
        mut self,
        api: Api<Other>,
        dyntype: Other::DynamicType,
        lp: ListParams,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    ) -> Self
    where
        I::IntoIter: Send,
        Other::DynamicType: Clone,
    {
        let other_watcher = trigger_with(watcher(api, lp).touched_objects(), move |obj| {
            // Compute the reference before `mapper` consumes `obj`.
            let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
            mapper(obj)
                .into_iter()
                .map(move |mapped_obj_ref| ReconcileRequest {
                    obj_ref: mapped_obj_ref,
                    reason: ReconcileReason::RelatedObjectUpdated {
                        obj_ref: Box::new(watched_obj_ref.clone()),
                    },
                })
        });
        self.trigger_selector.push(other_watcher.boxed());
        self
    }

    /// Reconciles every cached object each time `trigger` yields an item.
    ///
    /// The set of objects is the store's contents at the moment each item is yielded, and each
    /// request carries [`ReconcileReason::BulkReconcile`].
    #[must_use]
    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
        let store = self.store();
        let dyntype = self.dyntype.clone();
        self.trigger_selector.push(
            trigger
                .flat_map(move |()| {
                    let dyntype = dyntype.clone();
                    stream::iter(store.state().into_iter().map(move |obj| {
                        Ok(ReconcileRequest {
                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
                            reason: ReconcileReason::BulkReconcile,
                        })
                    }))
                })
                .boxed(),
        );
        self
    }

    /// Stops accepting new triggers once `trigger` resolves.
    ///
    /// The trigger stream is cut off with `take_until` in [`Controller::run`].
    #[must_use]
    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
        self.graceful_shutdown_selector.push(trigger.boxed());
        self
    }

    /// Shuts down on ctrl+c (and, on Unix, SIGTERM).
    ///
    /// The first signal requests a graceful shutdown; a second signal forces shutdown.
    ///
    /// # Panics
    ///
    /// On Unix, the shutdown future `unwrap`s the result of registering the SIGTERM handler, so it
    /// panics when polled if registration fails.
    #[must_use]
    pub fn shutdown_on_signal(mut self) -> Self {
        // Resolves when ctrl+c (or, on Unix, SIGTERM) is received.
        async fn shutdown_signal() {
            futures::future::select(
                tokio::signal::ctrl_c().map(|_| ()).boxed(),
                #[cfg(unix)]
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                    .unwrap()
                    .recv()
                    .map(|_| ())
                    .boxed(),
                // On non-Unix platforms only ctrl+c is listened for.
                #[cfg(not(unix))]
                futures::future::pending::<()>(),
            )
            .await;
        }
        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
        // Graceful shutdown fires when the forceful future below sends on `graceful_tx`.
        // It also fires if `graceful_tx` is dropped, since the receiver's result is ignored.
        self.graceful_shutdown_selector
            .push(graceful_rx.map(|_| ()).boxed());
        self.forceful_shutdown_selector.push(
            async {
                tracing::info!("press ctrl+c to shut down gracefully");
                shutdown_signal().await;
                // `send` only fails if `graceful_rx` has already been dropped.
                if let Ok(()) = graceful_tx.send(()) {
                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
                } else {
                    tracing::info!(
                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
                    );
                }
                shutdown_signal().await;
                tracing::info!("forced shutdown requested");
            }
            .boxed(),
        );
        self
    }

    /// Consumes the builder and returns the stream of reconciliation results.
    ///
    /// Each reconciler future is spawned as a task via `CancelableJoinHandle::spawn` on the runtime
    /// returned by `Handle::current()`. Graceful shutdown cuts off the trigger stream, and forceful
    /// shutdown ends the returned stream.
    ///
    /// # Panics
    ///
    /// `Handle::current()` is called each time a reconciler is started and panics outside a Tokio
    /// runtime context, so the returned stream must be polled from within one.
    pub fn run<ReconcilerFut, Ctx>(
        self,
        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
        context: Arc<Ctx>,
    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
    where
        K::DynamicType: Debug + Unpin,
        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
        ReconcilerFut::Error: std::error::Error + Send + 'static,
    {
        applier(
            move |obj, ctx| {
                CancelableJoinHandle::spawn(
                    reconciler(obj, ctx).into_future().in_current_span(),
                    &Handle::current(),
                )
            },
            error_policy,
            context,
            self.reader,
            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                .take_until(future::select_all(self.graceful_shutdown_selector)),
        )
        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
    }
}
#[cfg(test)]
mod tests {
    use std::{convert::Infallible, sync::Arc, time::Duration};
    use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
    use crate::{
        applier,
        reflector::{self, ObjectRef},
        watcher, Controller,
    };
    use futures::{pin_mut, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{core::ObjectMeta, Api};
    use tokio::time::timeout;

    // Compile-time helper: only accepts `Send` values.
    fn assert_send<T: Send>(x: T) -> T {
        x
    }

    // Produces a value of any type for type-level assertions; panics if actually called.
    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // Never run: only checks at compile time that the stream returned by `Controller::run` is `Send`.
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        assert_send(
            Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
                |_, _| async { Ok(mock_type::<Action>()) },
                |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
                Arc::new(()),
            ),
        );
    }

    // Floods the reschedule channel: every reconcile immediately requeues itself with a zero delay,
    // using 50x more objects than the channel capacity. The applier must keep making progress
    // (no deadlock on a full reschedule buffer) and then shut down cleanly when the queue closes.
    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Treat the applier as healthy if it completes about 3 reconciles per object.
        let reconciles = items * 3;
        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let applier = applier(
            |obj, _| {
                Box::pin(async move {
                    println!("reconciling {:?}", obj.metadata.name);
                    Ok(Action::requeue(Duration::ZERO))
                })
            },
            // The reconciler never fails, so the error policy is never called.
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
        );
        pin_mut!(applier);
        for i in 0..items {
            let obj = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(format!("cm-{i}")),
                    namespace: Some("default".to_string()),
                    ..Default::default()
                },
                ..Default::default()
            };
            // Put the object into the store first, so the applier can find it when the request arrives.
            store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
        }
        timeout(
            Duration::from_secs(10),
            applier
                .as_mut()
                .take(reconciles)
                .try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("test timeout expired, applier likely deadlocked")
        .unwrap();
        // Closing the queue starts the applier's graceful shutdown; the stream must then end.
        drop(queue_tx);
        timeout(
            Duration::from_secs(10),
            applier.try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
        .unwrap();
    }
}