use self::runner::Runner;
use crate::{
reflector::{
self, ObjectRef, reflector,
store::{Store, Writer},
},
scheduler::{ScheduleRequest, debounced_scheduler},
utils::{
Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
},
watcher::{self, DefaultBackoff, metadata_watcher, watcher},
};
use educe::Educe;
use futures::{
FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, channel,
future::{self, BoxFuture},
stream,
};
use kube_client::api::{Api, DynamicObject, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
task::{Poll, ready},
time::Duration,
};
use stream::BoxStream;
use thiserror::Error;
use tokio::{runtime::Handle, time::Instant};
use tracing::{Instrument, info_span};
mod future_hash_map;
mod runner;
/// Error type of the internal [`Runner`], specialised to [`reflector::store::WriterDropped`].
///
/// [`applier`] wraps this in [`Error::RunnerError`] when the runner itself fails.
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
/// Errors yielded by the stream returned from [`applier`] / [`Controller::run`].
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler future and `QueueErr` is
/// the error type of the stream that feeds reconcile requests into the applier.
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
/// A reconcile was requested for an object that the local store did not contain.
#[error("tried to reconcile object {0} that was not found in local store")]
ObjectNotFound(ObjectRef<DynamicObject>),
/// The reconciler returned an error for the referenced object.
#[error("reconciler for object {1} failed")]
ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
/// The input request stream yielded an error.
#[error("event queue error")]
QueueError(#[source] QueueErr),
/// The internal runner failed.
#[error("runner error")]
RunnerError(#[source] RunnerError),
}
/// Result of a reconciliation (or of an error policy), telling the controller whether the
/// object should be scheduled for another reconciliation.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// Delay after which the object is requeued; `None` means no requeue is scheduled.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Builds an action that asks for the object to be reconciled again once `duration`
    /// has elapsed.
    #[must_use]
    pub const fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Self { requeue_after }
    }

    /// Builds an action that schedules no requeue; the object is only reconciled again when
    /// some trigger requests it.
    #[must_use]
    pub const fn await_change() -> Self {
        let requeue_after = None;
        Self { requeue_after }
    }
}
/// Turns every successful item of `stream` into zero or more reconcile requests.
///
/// `mapper` is applied to each `Ok` item; everything it yields is converted with `Into` and
/// emitted as its own `Ok` element. Errors from `stream` are passed through unchanged.
pub fn trigger_with<T, K, I, S>(
    stream: S,
    mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = T>,
    I: IntoIterator,
    I::Item: Into<ReconcileRequest<K>>,
    K: Resource,
{
    stream
        .map_ok(move |obj| {
            // One inner stream per input object; it can never fail by itself.
            let requests = mapper(obj).into_iter().map(Into::into).map(Ok);
            stream::iter(requests)
        })
        .try_flatten()
}
/// Requests a reconcile of every object that appears in `stream`, with reason
/// [`ReconcileReason::ObjectUpdated`].
///
/// `dyntype` is cloned for each object to build its [`ObjectRef`].
pub fn trigger_self<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = K>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone());
        let reason = ReconcileReason::ObjectUpdated;
        Some(ReconcileRequest { obj_ref, reason })
    })
}
/// Variant of [`trigger_self`] for streams whose items are shared as `Arc<K>`.
///
/// Every object in `stream` produces one request with reason
/// [`ReconcileReason::ObjectUpdated`].
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(&*obj, dyntype.clone());
        let reason = ReconcileReason::ObjectUpdated;
        Some(ReconcileRequest { obj_ref, reason })
    })
}
/// Requests reconciles of the `K` objects that `mapper` derives from each watched object.
///
/// Each produced request carries [`ReconcileReason::RelatedObjectUpdated`] with a type-erased
/// reference to the watched object that caused it.
fn trigger_others<S, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream,
    S::Ok: Resource,
    <S::Ok as Resource>::DynamicType: Clone,
    K: Resource,
    K::DynamicType: Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        // Capture the reference first: `mapper` takes ownership of `obj`.
        let source_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
        let targets = mapper(obj).into_iter();
        targets.map(move |target_ref| {
            let reason = ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(source_ref.clone()),
            };
            ReconcileRequest {
                obj_ref: target_ref,
                reason,
            }
        })
    })
}
/// Variant of [`trigger_others`] for streams whose items are shared as `Arc<O>`.
///
/// Each request produced by `mapper` carries [`ReconcileReason::RelatedObjectUpdated`] with a
/// type-erased reference to the watched object that caused it.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    K: Resource,
    K::DynamicType: Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        // Capture the reference first: `mapper` takes ownership of the `Arc`.
        let source_ref = ObjectRef::from_obj_with(&*obj, dyntype.clone()).erase();
        let targets = mapper(obj).into_iter();
        targets.map(move |target_ref| {
            let reason = ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(source_ref.clone()),
            };
            ReconcileRequest {
                obj_ref: target_ref,
                reason,
            }
        })
    })
}
/// Requests reconciles of the owners (of kind `KOwner`) of every object in `stream`.
///
/// For each object, its `owner_references` are turned into [`ObjectRef<KOwner>`]s via
/// [`ObjectRef::from_owner_ref`], using the object's own namespace; owner references for which
/// that conversion returns `None` are skipped. `child_type` is the dynamic type of the watched
/// objects and `owner_type` the dynamic type of the owners.
pub fn trigger_owners<KOwner, S>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream,
    S::Ok: Resource,
    <S::Ok as Resource>::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        // Only the namespace and owner references are needed, so clone just those two fields
        // instead of the whole metadata (labels, annotations, managed fields, ...).
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owners
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others(stream, mapper, child_type)
}
/// Variant of [`trigger_owners`] for streams whose items are shared as `Arc<K>`.
///
/// For each object, its `owner_references` are turned into [`ObjectRef<KOwner>`]s via
/// [`ObjectRef::from_owner_ref`], using the object's own namespace; owner references for which
/// that conversion returns `None` are skipped.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        // Only the namespace and owner references are needed, so clone just those two fields
        // instead of the whole metadata (labels, annotations, managed fields, ...).
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owners
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others_shared(stream, mapper, child_type)
}
/// A request to reconcile the object referenced by `obj_ref`.
///
/// Equality and hashing consider only `obj_ref`; `reason` is explicitly ignored by both, so two
/// requests for the same object compare equal regardless of why they were issued.
#[derive(Educe)]
#[educe(
Debug(bound("K::DynamicType: Debug")),
Clone(bound("K::DynamicType: Clone")),
PartialEq(bound("K::DynamicType: PartialEq")),
Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
/// The object to reconcile.
pub obj_ref: ObjectRef<K>,
/// Why the reconcile was requested (ignored by `PartialEq` and `Hash`).
#[educe(PartialEq(ignore), Hash(ignore))]
pub reason: ReconcileReason,
}
// `Eq` is implemented by hand (rather than derived) so that it is bounded on `K::DynamicType: Eq`.
impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
/// Wraps a bare object reference into a request with [`ReconcileReason::Unknown`].
impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
    fn from(obj_ref: ObjectRef<K>) -> Self {
        let reason = ReconcileReason::Unknown;
        Self { obj_ref, reason }
    }
}
/// The reason a [`ReconcileRequest`] was issued.
#[derive(Debug, Clone)]
pub enum ReconcileReason {
/// No reason was recorded (used when a request is built from a bare `ObjectRef`).
Unknown,
/// The object itself appeared in the controller's main trigger stream.
ObjectUpdated,
/// A related (watched or owned) object appeared in one of the other trigger streams.
RelatedObjectUpdated {
/// Type-erased reference to the related object that caused the request.
obj_ref: Box<ObjectRef<DynamicObject>>,
},
/// The reconciler returned an [`Action`] with a requeue delay.
ReconcilerRequestedRetry,
/// The reconciler failed and the error policy returned an [`Action`] with a requeue delay.
ErrorPolicyRequestedRetry,
/// A reconcile of every object in the store was requested (see `Controller::reconcile_all_on`).
BulkReconcile,
/// A free-form reason; displayed verbatim.
Custom {
/// The text shown by the `Display` implementation.
reason: String,
},
}
/// Human-readable description of the reason, used for example in the reconcile tracing span.
impl Display for ReconcileReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants that carry data.
            Self::RelatedObjectUpdated { obj_ref: object } => {
                write!(f, "related object updated: {object}")
            }
            Self::Custom { reason } => f.write_str(reason),
            // Fixed descriptions.
            Self::Unknown => f.write_str("unknown"),
            Self::ObjectUpdated => f.write_str("object updated"),
            Self::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
            Self::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
            Self::BulkReconcile => f.write_str("bulk reconcile requested"),
        }
    }
}
/// Capacity of the bounded channel through which finished reconciliations hand their requeue
/// requests back to the scheduler inside [`applier`].
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
/// Runs `reconciler` for every object requested through `queue` and yields the outcome of each
/// reconciliation.
///
/// * `reconciler` - called with the object (looked up in `store`) and a clone of `context`;
///   returns a future resolving to an [`Action`] or an error.
/// * `error_policy` - called when the reconciler fails, to decide the follow-up [`Action`].
/// * `context` - shared value handed to both callbacks.
/// * `store` - local cache the requested objects are read from; no reconcile is started until
///   `store.wait_until_ready()` has completed.
/// * `queue` - stream of reconcile requests; when it ends, the requeue channel is closed too.
/// * `config` - supplies the debounce period and the concurrency setting for the runner.
///
/// The returned stream yields `Ok((object, action))` for successful reconciliations and an
/// [`Error`] otherwise (`ObjectNotFound` when the object is missing from `store`,
/// `ReconcilerFailed` when the reconciler returned an error, `QueueError` for errors from
/// `queue`, `RunnerError` for runner failures).
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
#[allow(clippy::result_large_err)] pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream,
QueueStream::Ok: Into<ReconcileRequest<K>>,
QueueStream::Error: std::error::Error + 'static,
{
// Fired once `queue` completes, so that the requeue side of the merged stream stops as well.
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
// Bounded channel carrying requeue requests produced after each reconciliation.
let (scheduler_tx, scheduler_rx) =
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
// Shared so that every reconciliation task can hold its own handle to the policy.
let error_policy = Arc::new(error_policy);
// Separate handle for the readiness gate; `store` itself is moved into the task closure.
let delay_store = store.clone();
trystream_try_via(
// Input side: external requests (scheduled for "now") merged with internal requeues.
Box::pin(stream::select(
queue
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now(),
})
.on_complete(async move {
// The send result is ignored: the receiver may already be gone.
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
scheduler_rx
.map(Ok)
.take_until(scheduler_shutdown_rx)
.on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
)),
move |s| {
Runner::new(
debounced_scheduler(s, config.debounce),
config.concurrency,
move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
// The reconciler is invoked inside the span; the resulting future is instrumented below.
TryFutureExt::into_future(
reconciler_span.in_scope(|| reconciler(Arc::clone(&obj), context.clone())),
)
.then(move |res| {
let error_policy = error_policy;
// Sends the requeue request (if any) before handing the result on.
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
// The object is not in the store: report it instead of calling the reconciler.
None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
.right_future(),
}
},
)
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
tracing::debug!("store is ready, starting runner");
res
})
// Runner-level failures become `Error::RunnerError`; task results pass through.
.map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// Split the reconciler's own result into the final `Ok` / `ReconcilerFailed` item.
.and_then(move |(obj_ref, reconciler_result)| async move {
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}
/// Future that forwards a finished reconciliation's requeue request (if any) to the scheduler
/// channel and then resolves to the reconciliation result.
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
/// Sending half of the requeue channel.
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
/// The request still to be sent; `None` when no requeue was asked for or once it was taken.
reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
/// The reconciliation result, taken when the future completes.
result: Option<Result<Action, ReconcilerErr>>,
}
impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    /// Prepares the follow-up for a finished reconciliation.
    ///
    /// On success the reconciler's own [`Action`] is used; on failure `error_policy` is asked
    /// for one. If that action carries a requeue delay, a request for `obj_ref` is prepared
    /// with `run_at` set to "now + delay" (falling back to
    /// `crate::scheduler::max_schedule_time()` if the addition overflows).
    fn new(
        result: Result<Action, ReconcilerErr>,
        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
        obj_ref: ObjectRef<K>,
        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
    ) -> Self {
        // Taken before the error policy runs, so its runtime does not shift the schedule.
        let reconciler_finished_at = Instant::now();

        let (action, reason) = match &result {
            Ok(action) => (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
            Err(err) => (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
        };

        let reschedule_request = action.requeue_after.map(|requeue_after| {
            let run_at = reconciler_finished_at
                .checked_add(requeue_after)
                .unwrap_or_else(crate::scheduler::max_schedule_time);
            ScheduleRequest {
                message: ReconcileRequest { obj_ref, reason },
                run_at,
            }
        });

        Self {
            reschedule_tx,
            reschedule_request,
            result: Some(result),
        }
    }
}
impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    type Output = Result<Action, ReconcilerErr>;

    /// Sends the pending requeue request (if any) and then resolves to the stored result.
    ///
    /// Returns `Poll::Pending` while the requeue channel has no capacity. If the channel
    /// reports an error, the request is dropped and the result is still returned: requeueing
    /// is best-effort.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        if this.reschedule_request.is_some() {
            // May return `Pending`; the request stays in place until the channel is ready.
            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
            // The request is taken regardless of the outcome, so it is never sent twice.
            if let (Some(reschedule_request), Ok(())) = (this.reschedule_request.take(), rescheduler_ready) {
                // A failed send is ignored on purpose, see above.
                let _ = this.reschedule_tx.start_send(reschedule_request);
            }
        }

        Poll::Ready(
            this.result
                .take()
                .expect("RescheduleReconciliation polled again after it already completed"),
        )
    }
}
/// Tunables for a controller / [`applier`]; the default leaves both values at zero.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Debounce period handed to the scheduler.
    debounce: Duration,
    /// Concurrency setting handed to the runner.
    // NOTE(review): what `0` means is decided by the runner, presumably "unbounded" - confirm.
    concurrency: u16,
}

impl Config {
    /// Returns the configuration with its debounce period replaced by `debounce`.
    #[must_use]
    pub fn debounce(self, debounce: Duration) -> Self {
        Self { debounce, ..self }
    }

    /// Returns the configuration with its concurrency setting replaced by `concurrency`.
    #[must_use]
    pub fn concurrency(self, concurrency: u16) -> Self {
        Self { concurrency, ..self }
    }
}
/// Builder for a controller that reconciles objects of kind `K`.
///
/// Trigger streams and shutdown signals are accumulated through the builder methods and
/// consumed by `run`.
pub struct Controller<K>
where
K: Clone + Resource + Debug + 'static,
K::DynamicType: Eq + Hash,
{
/// All streams of reconcile requests; merged into the queue handed to `applier`.
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
/// Backoff applied to the merged trigger stream.
trigger_backoff: Box<dyn Backoff + Send>,
/// Futures that, when any completes, stop the trigger queue.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
/// Futures that, when any completes, stop the output stream of `run`.
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
/// Dynamic type information for `K`, used to build `ObjectRef`s.
dyntype: K::DynamicType,
/// Read handle of the store that reconciled objects are looked up in.
reader: Store<K>,
/// Debounce / concurrency settings forwarded to `applier`.
config: Config,
}
impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
{
/// Creates a controller for the objects served by `main_api`, using the default
/// `K::DynamicType`. See [`Controller::new_with`].
#[must_use]
pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
where
    K::DynamicType: Default,
{
    let dyntype = K::DynamicType::default();
    Self::new_with(main_api, wc, dyntype)
}
pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self(
reflector(writer, watcher(main_api, wc)).applied_objects(),
dyntype.clone(),
)
.boxed();
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
graceful_shutdown_selector: vec![
future::pending().boxed(),
],
forceful_shutdown_selector: vec![
future::pending().boxed(),
],
dyntype,
reader,
config: Default::default(),
}
}
/// Creates a controller from an externally managed `trigger` stream and its `reader`, using
/// the default `K::DynamicType`. See [`Controller::for_stream_with`].
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream(
    trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
    reader: Store<K>,
) -> Self
where
    K::DynamicType: Default,
{
    let dyntype = K::DynamicType::default();
    Self::for_stream_with(trigger, reader, dyntype)
}
/// Creates a controller from an externally managed `trigger` stream, with an explicit
/// `dyntype`.
///
/// Every object yielded by `trigger` requests a reconcile of itself; `reader` is the store
/// that objects are looked up in when they are reconciled.
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream_with(
    trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    let mut trigger_selector = stream::SelectAll::new();
    trigger_selector.push(trigger_self(trigger, dyntype.clone()).boxed());

    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        // Seeded with a never-completing future so the list handed to `select_all` in `run`
        // is never empty.
        graceful_shutdown_selector: vec![future::pending().boxed()],
        forceful_shutdown_selector: vec![future::pending().boxed()],
        dyntype,
        reader,
        config: Config::default(),
    }
}
/// Creates a controller from a shared (`Arc<K>`) `trigger` stream and its `reader`, using the
/// default `K::DynamicType`. See [`Controller::for_shared_stream_with`].
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
    K::DynamicType: Default,
{
    let dyntype = K::DynamicType::default();
    Self::for_shared_stream_with(trigger, reader, dyntype)
}
/// Creates a controller from a shared (`Arc<K>`) `trigger` stream, with an explicit `dyntype`.
///
/// Every object yielded by `trigger` requests a reconcile of itself; `reader` is the store
/// that objects are looked up in when they are reconciled.
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream_with(
    trigger: impl Stream<Item = Arc<K>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    // The shared stream is infallible; wrap its items so it fits the fallible trigger type.
    let fallible_trigger = trigger.map(Ok);
    let mut trigger_selector = stream::SelectAll::new();
    trigger_selector.push(trigger_self_shared(fallible_trigger, dyntype.clone()).boxed());

    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        // Seeded with a never-completing future so the list handed to `select_all` in `run`
        // is never empty.
        graceful_shutdown_selector: vec![future::pending().boxed()],
        forceful_shutdown_selector: vec![future::pending().boxed()],
        dyntype,
        reader,
        config: Config::default(),
    }
}
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
self.config = config;
self
}
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}
/// Returns a clone of the read handle to the store the controller reconciles objects from.
pub fn store(&self) -> Store<K> {
self.reader.clone()
}
/// Watches `Child` objects through `api` and reconciles their owners of kind `K`.
///
/// Shorthand for [`Controller::owns_with`] for child kinds whose `DynamicType` is `()`.
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: watcher::Config,
) -> Self {
self.owns_with(api, (), wc)
}
/// Watches `Child` objects through `api` and reconciles their owners of kind `K`.
///
/// A metadata-only watcher (configured by `wc`) is started on `api`; for every touched child,
/// its owner references are mapped to `K` objects, which are then scheduled for reconciliation.
/// `dyntype` is the dynamic type information of `Child`.
#[must_use]
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
    mut self,
    api: Api<Child>,
    dyntype: Child::DynamicType,
    wc: watcher::Config,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    let child_events = metadata_watcher(api, wc).touched_objects();
    let owner_requests = trigger_owners(child_events, self.dyntype.clone(), dyntype);
    self.trigger_selector.push(owner_requests.boxed());
    self
}
/// Reconciles the owners (of kind `K`) of every `Child` yielded by the externally managed
/// `trigger` stream.
///
/// Shorthand for [`Controller::owns_stream_with`] for child kinds whose `DynamicType` is `()`.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
) -> Self {
self.owns_stream_with(trigger, ())
}
/// Reconciles the owners (of kind `K`) of every `Child` yielded by the externally managed
/// `trigger` stream.
///
/// `dyntype` is the dynamic type information of `Child`.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
    mut self,
    trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    let owner_type = self.dyntype.clone();
    let owner_requests = trigger_owners(trigger, owner_type, dyntype);
    self.trigger_selector.push(owner_requests.boxed());
    self
}
/// Reconciles the owners (of kind `K`) of every `Child` yielded by the shared (`Arc<Child>`)
/// `trigger` stream.
///
/// Shorthand for [`Controller::owns_shared_stream_with`] for child kinds whose `DynamicType`
/// is `()`.
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
self,
trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
) -> Self {
self.owns_shared_stream_with(trigger, ())
}
/// Reconciles the owners (of kind `K`) of every `Child` yielded by the shared (`Arc<Child>`)
/// `trigger` stream.
///
/// `dyntype` is the dynamic type information of `Child`. Unlike
/// [`Controller::owns_shared_stream`], this variant accepts child kinds with any
/// `DynamicType`, matching [`Controller::owns_stream_with`].
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream_with<Child: Resource + Send + 'static>(
    mut self,
    trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    // The shared stream is infallible; wrap its items so it fits the fallible trigger type.
    let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
    self.trigger_selector.push(child_watcher.boxed());
    self
}
/// Watches `Other` objects through `api` and reconciles the `K` objects that `mapper` returns
/// for each of them, using the default `Other::DynamicType`.
/// See [`Controller::watches_with`].
#[must_use]
pub fn watches<Other, I>(
    self,
    api: Api<Other>,
    wc: watcher::Config,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone + Eq + Hash,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    let dyntype = Other::DynamicType::default();
    self.watches_with(api, dyntype, wc, mapper)
}
/// Watches `Other` objects through `api` and reconciles the `K` objects that `mapper` returns
/// for each of them.
///
/// A watcher (configured by `wc`) is started on `api`; every touched object is passed to
/// `mapper`, and each returned [`ObjectRef`] is scheduled for reconciliation. `dyntype` is the
/// dynamic type information of `Other`.
#[must_use]
pub fn watches_with<Other, I>(
    mut self,
    api: Api<Other>,
    dyntype: Other::DynamicType,
    wc: watcher::Config,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
    Other::DynamicType: Debug + Clone + Eq + Hash,
{
    let other_events = watcher(api, wc).touched_objects();
    let mapped_requests = trigger_others(other_events, mapper, dyntype);
    self.trigger_selector.push(mapped_requests.boxed());
    self
}
/// Reconciles the `K` objects that `mapper` returns for every `Other` yielded by the externally
/// managed `trigger` stream, using the default `Other::DynamicType`.
/// See [`Controller::watches_stream_with`].
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    let dyntype = Other::DynamicType::default();
    self.watches_stream_with(trigger, mapper, dyntype)
}
/// Reconciles the `K` objects that `mapper` returns for every `Other` yielded by the externally
/// managed `trigger` stream.
///
/// `dyntype` is the dynamic type information of `Other`.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    let mapped_requests = trigger_others(trigger, mapper, dyntype);
    self.trigger_selector.push(mapped_requests.boxed());
    self
}
/// Reconciles the `K` objects that `mapper` returns for every `Other` yielded by the shared
/// (`Arc<Other>`) `trigger` stream, using the default `Other::DynamicType`.
/// See [`Controller::watches_shared_stream_with`].
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    let dyntype = Other::DynamicType::default();
    self.watches_shared_stream_with(trigger, mapper, dyntype)
}
/// Reconciles the `K` objects that `mapper` returns for every `Other` yielded by the shared
/// (`Arc<Other>`) `trigger` stream.
///
/// `dyntype` is the dynamic type information of `Other`.
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // The shared stream is infallible; wrap its items so it fits the fallible trigger type.
    let fallible_trigger = trigger.map(Ok);
    let mapped_requests = trigger_others_shared(fallible_trigger, mapper, dyntype);
    self.trigger_selector.push(mapped_requests.boxed());
    self
}
/// Requests a reconcile of every object currently in the store each time `trigger` yields.
///
/// The store's state is read at the moment of each trigger item; every request carries
/// [`ReconcileReason::BulkReconcile`].
#[must_use]
pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
    let store = self.store();
    let dyntype = self.dyntype.clone();

    let bulk_requests = trigger.flat_map(move |()| {
        let dyntype = dyntype.clone();
        let requests = store.state().into_iter().map(move |obj| {
            let obj_ref = ObjectRef::from_obj_with(&*obj, dyntype.clone());
            Ok(ReconcileRequest {
                obj_ref,
                reason: ReconcileReason::BulkReconcile,
            })
        });
        stream::iter(requests)
    });

    self.trigger_selector.push(bulk_requests.boxed());
    self
}
/// Requests a reconcile of each object reference yielded by `trigger`.
///
/// The requests carry [`ReconcileReason::Unknown`].
#[cfg(feature = "unstable-runtime-reconcile-on")]
#[must_use]
pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
    // `ReconcileRequest::from` fills in `ReconcileReason::Unknown`.
    let requests = trigger.map(|obj_ref| Ok(ReconcileRequest::from(obj_ref)));
    self.trigger_selector.push(requests.boxed());
    self
}
/// Registers `trigger` as a graceful shutdown signal.
///
/// When any registered graceful shutdown future completes, `run` stops taking new items from
/// the trigger queue.
#[must_use]
pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
self.graceful_shutdown_selector.push(trigger.boxed());
self
}
/// Shuts the controller down on ctrl+c (and, on unix, on SIGTERM).
///
/// The first signal requests a graceful shutdown; a second signal requests a forced shutdown.
///
/// # Panics
///
/// On unix, the signal-waiting future panics if the SIGTERM handler cannot be installed.
#[must_use]
pub fn shutdown_on_signal(mut self) -> Self {
// Completes when either ctrl+c or (on unix) SIGTERM is received.
async fn shutdown_signal() {
futures::future::select(
tokio::signal::ctrl_c().map(|_| ()).boxed(),
// Unix: also listen for SIGTERM.
#[cfg(unix)]
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap()
.recv()
.map(|_| ())
.boxed(),
// Elsewhere: no second signal, so this side never completes.
#[cfg(not(unix))]
futures::future::pending::<()>(),
)
.await;
}
// Lets the forceful-shutdown future below trigger the graceful shutdown after the first signal.
let (graceful_tx, graceful_rx) = channel::oneshot::channel();
self.graceful_shutdown_selector
.push(graceful_rx.map(|_| ()).boxed());
self.forceful_shutdown_selector.push(
async {
tracing::info!("press ctrl+c to shut down gracefully");
shutdown_signal().await;
// The send fails if the receiving side has already been dropped.
if let Ok(()) = graceful_tx.send(()) {
tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
} else {
tracing::info!(
"graceful shutdown already requested, press ctrl+c again to force shutdown"
);
}
// Completing this future after the second signal is what forces the shutdown.
shutdown_signal().await;
tracing::info!("forced shutdown requested");
}
.boxed(),
);
self
}
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
applier(
move |obj, ctx| {
CancelableJoinHandle::spawn(
TryFutureExt::into_future(reconciler(obj, ctx)).in_current_span(),
&Handle::current(),
)
},
error_policy,
context,
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config,
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
}
#[cfg(test)]
mod tests {
use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
use super::{APPLIER_REQUEUE_BUF_SIZE, Action};
use crate::{
Config, Controller, applier,
reflector::{self, ObjectRef},
watcher::{self, Event, metadata_watcher, watcher},
};
use futures::{Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{Api, Resource, core::ObjectMeta};
use serde::de::DeserializeOwned;
use tokio::time::timeout;
// Compile-time check helper: only accepts values that are `Send`.
fn assert_send<T: Send>(x: T) -> T {
x
}
// Compile-time check helper: only accepts `Send` streams of watcher events.
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
}
// Produces a value of any type for type-level assertions; panics if actually called.
fn mock_type<T>() -> T {
unimplemented!(
"mock_type is not supposed to be called, only used for filling holes in type assertions"
)
}
// Never run (no `#[test]`): it only has to type-check, proving the stream from `run` is `Send`.
#[allow(dead_code, unused_must_use)]
fn test_controller_should_be_send() {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<Action>()) },
|_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
Arc::new(()),
),
);
}
// Never run (no `#[test]`): type-checks that both watcher flavours yield the expected stream type.
#[allow(dead_code, unused_must_use)]
fn test_watcher_stream_type_drift() {
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
assert_stream(metadata_watcher(
mock_type::<Api<ConfigMap>>(),
Default::default(),
));
}
// Floods the applier with far more objects than the requeue buffer holds, with a reconciler
// that always requeues immediately, and checks that it neither stalls nor fails to shut down.
#[tokio::test]
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
let items = APPLIER_REQUEUE_BUF_SIZE * 50;
let reconciles = items * 3;
let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let mut applier = pin!(applier(
|_obj, _| {
Box::pin(async move {
// Zero-delay requeue keeps the requeue channel under constant pressure.
Ok(Action::requeue(Duration::ZERO))
})
},
// The reconciler never fails, so the error policy must never be invoked.
|_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
));
// Mark the store as initialised so the applier's readiness gate can open.
store_tx.apply_watcher_event(&watcher::Event::InitDone);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}
// Phase 1: the applier must get through `reconciles` results within the timeout.
timeout(
Duration::from_secs(10),
applier
.as_mut()
.take(reconciles)
.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("test timeout expired, applier likely deadlocked")
.unwrap();
// Phase 2: closing the queue must let the applier drain and terminate.
drop(queue_tx);
timeout(
Duration::from_secs(10),
applier.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
.unwrap();
}
}