Skip to main content

kube_runtime/controller/
mod.rs

1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4use crate::{
5    reflector::{
6        self, ObjectRef, reflector,
7        store::{Store, Writer},
8    },
9    scheduler::{ScheduleRequest, debounced_scheduler},
10    utils::{
11        Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
12    },
13    watcher::{self, DefaultBackoff, metadata_watcher, watcher},
14};
15use educe::Educe;
16use futures::{
17    FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, channel,
18    future::{self, BoxFuture},
19    stream,
20};
21use kube_client::api::{Api, DynamicObject, Resource};
22use pin_project::pin_project;
23use serde::de::DeserializeOwned;
24use std::{
25    fmt::{Debug, Display},
26    hash::Hash,
27    sync::Arc,
28    task::{Poll, ready},
29    time::Duration,
30};
31use stream::BoxStream;
32use thiserror::Error;
33use tokio::{runtime::Handle, time::Instant};
34use tracing::{Instrument, info_span};
35
36mod future_hash_map;
37mod runner;
38
/// The reasons the internal runner can fail
///
/// This is the runner's error type instantiated with the store's
/// [`WriterDropped`](reflector::store::WriterDropped) error. It surfaces to users wrapped in
/// [`Error::RunnerError`].
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
41
/// Errors returned by the applier and visible in a controller stream if inspecting it
///
/// WARNING: These errors do not terminate `Controller::run`, and are not passed to the `reconcile` fn
/// as they exist primarily for diagnostics.
///
/// The type parameters are the error type of the user's reconciler (`ReconcilerErr`) and the
/// error type of the queue stream driving the applier (`QueueErr`).
///
/// To inspect these errors, you can run a `for_each` on the run stream:
///
/// ```compile_fail
///    Controller::new(api, watcher_config)
///        .run(reconcile, error_policy, context)
///        .for_each(|res| async move {
///            match res {
///                Ok(o) => info!("reconciled {:?}", o),
///                // Reconciler errors visible here:
///                Err(e) => warn!("reconcile failed: {}", e),
///            }
///        })
///        .await;
/// ```
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A scheduled reconcile for an object refers to an object that no longer exists
    ///
    /// This is usually not a problem and often expected with certain relations.
    /// See <https://github.com/kube-rs/kube/issues/1167#issuecomment-1636773541>
    /// for a more detailed explanation of how/why this happens.
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),

    /// User's reconcile fn failed for the object
    ///
    /// Carries the reconciler's own error (as the error source) together with a
    /// type-erased reference to the object that was being reconciled.
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),

    /// The queue stream contained an error
    #[error("event queue error")]
    QueueError(#[source] QueueErr),

    /// The internal runner returned an error
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}
83
/// Outcome of a reconciliation attempt, telling the controller what to do next
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// Delay after which reconciliation is triggered again if no external watch trigger hits first
    ///
    /// `None` means no timed requeue is requested. A `Some` delay is useful to poll external
    /// systems for updates, to expire time-limited resources, or (in your `error_policy`) to
    /// retry after errors.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Action to requeue the reconciliation after `duration`, even if no external watch triggers hit
    ///
    /// This is the best-practice action that ensures eventual consistency of your controller
    /// even in the case of missed changes (which can happen).
    ///
    /// Watch events are not normally missed, so an infrequent fallback (for example once per hour)
    /// is reasonable.
    #[must_use]
    pub const fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Self { requeue_after }
    }

    /// Action to do nothing until a change is detected
    ///
    /// This stops the controller from periodically reconciling this object until a relevant
    /// watch event was **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// It is therefore not recommended to disable requeuing this way, unless you have
    /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
    #[must_use]
    pub const fn await_change() -> Self {
        let requeue_after = None;
        Self { requeue_after }
    }
}
121
122/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
123pub fn trigger_with<T, K, I, S>(
124    stream: S,
125    mapper: impl Fn(T) -> I,
126) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
127where
128    S: TryStream<Ok = T>,
129    I: IntoIterator,
130    I::Item: Into<ReconcileRequest<K>>,
131    K: Resource,
132{
133    stream
134        .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
135        .try_flatten()
136}
137
138/// Enqueues the object itself for reconciliation
139pub fn trigger_self<K, S>(
140    stream: S,
141    dyntype: K::DynamicType,
142) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
143where
144    S: TryStream<Ok = K>,
145    K: Resource,
146    K::DynamicType: Clone,
147{
148    trigger_with(stream, move |obj| {
149        Some(ReconcileRequest {
150            obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
151            reason: ReconcileReason::ObjectUpdated,
152        })
153    })
154}
155
/// Enqueues the object itself for reconciliation when the object is behind a
/// shared pointer
///
/// Behaves like [`trigger_self`], except that the stream yields `Arc<K>` rather than `K`.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // The input stream carries Arc'd resources (via Controller::for_shared_stream)
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone());
        let reason = ReconcileReason::ObjectUpdated;
        Some(ReconcileRequest { obj_ref, reason })
    })
}
177
178/// Enqueues any mapper returned `K` types for reconciliation
179fn trigger_others<S, K, I>(
180    stream: S,
181    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
182    dyntype: <S::Ok as Resource>::DynamicType,
183) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
184where
185    // Input stream has items as some Resource (via Controller::watches)
186    S: TryStream,
187    S::Ok: Resource,
188    <S::Ok as Resource>::DynamicType: Clone,
189    // Output stream is requests for the root type K
190    K: Resource,
191    K::DynamicType: Clone,
192    // but the mapper can produce many of them
193    I: 'static + IntoIterator<Item = ObjectRef<K>>,
194    I::IntoIter: Send,
195{
196    trigger_with(stream, move |obj| {
197        let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
198        mapper(obj)
199            .into_iter()
200            .map(move |mapped_obj_ref| ReconcileRequest {
201                obj_ref: mapped_obj_ref,
202                reason: ReconcileReason::RelatedObjectUpdated {
203                    obj_ref: Box::new(watch_ref.clone()),
204                },
205            })
206    })
207}
208
/// Enqueues any mapper returned `Arc<K>` types for reconciliation
///
/// Shared-pointer counterpart of `trigger_others`: the watched objects arrive as `Arc<O>`,
/// and every reference returned by `mapper` becomes a request tagged with
/// [`ReconcileReason::RelatedObjectUpdated`].
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // The input is some shared resource (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // The output stream carries requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // A single watched object may map to many root objects
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        // Capture the (type-erased) origin before `obj` is consumed by the mapper
        let origin = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
        let targets = mapper(obj).into_iter();
        targets.map(move |target| {
            let reason = ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(origin.clone()),
            };
            ReconcileRequest {
                obj_ref: target,
                reason,
            }
        })
    })
}
240
241/// Enqueues any owners of type `KOwner` for reconciliation
242pub fn trigger_owners<KOwner, S>(
243    stream: S,
244    owner_type: KOwner::DynamicType,
245    child_type: <S::Ok as Resource>::DynamicType,
246) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
247where
248    S: TryStream,
249    S::Ok: Resource,
250    <S::Ok as Resource>::DynamicType: Clone,
251    KOwner: Resource,
252    KOwner::DynamicType: Clone,
253{
254    let mapper = move |obj: S::Ok| {
255        let meta = obj.meta().clone();
256        let ns = meta.namespace;
257        let owner_type = owner_type.clone();
258        meta.owner_references
259            .into_iter()
260            .flatten()
261            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
262    };
263    trigger_others(stream, mapper, child_type)
264}
265
// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects
///
/// Shared-pointer counterpart of [`trigger_owners`]: the child objects arrive as `Arc<K>`.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let owners_of = move |child: S::Ok| {
        // Copy out what we need so the returned iterator does not borrow from `child`
        let meta = child.meta();
        let namespace = meta.namespace.clone();
        let owner_refs = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owner_refs.into_iter().flatten().filter_map(move |owner_ref| {
            ObjectRef::from_owner_ref(namespace.as_deref(), &owner_ref, owner_type.clone())
        })
    };
    trigger_others_shared(stream, owners_of, child_type)
}
294
/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
/// In this case, only *the first* reason is stored.
#[derive(Educe)]
#[educe(
    Debug(bound("K::DynamicType: Debug")),
    Clone(bound("K::DynamicType: Clone")),
    PartialEq(bound("K::DynamicType: PartialEq")),
    Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
    /// A reference to the object to be reconciled
    pub obj_ref: ObjectRef<K>,
    /// The reason for why reconciliation was requested
    ///
    /// Excluded from the `PartialEq` and `Hash` implementations, so two requests for the
    /// same object compare equal regardless of their reasons.
    #[educe(PartialEq(ignore), Hash(ignore))]
    pub reason: ReconcileReason,
}

// Marker impl: equality only considers `obj_ref`, so `Eq` holds whenever the dynamic type is `Eq`.
impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
316
317impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
318    fn from(obj_ref: ObjectRef<K>) -> Self {
319        ReconcileRequest {
320            obj_ref,
321            reason: ReconcileReason::Unknown,
322        }
323    }
324}
325
/// The reason a reconcile was requested
///
/// Note that this reason is deliberately hidden from the reconciler.
/// See <https://kube.rs/controllers/reconciler/#reasons-for-reconciliation>.
#[derive(Debug, Clone)]
pub enum ReconcileReason {
    /// A custom reconcile triggered via `reconcile_on`
    ///
    /// Also the reason assigned when a bare `ObjectRef` is converted into a [`ReconcileRequest`].
    Unknown,

    /// The main object was updated.
    ObjectUpdated,

    /// A related object was updated through a mapper
    ///
    /// The related object traversed its relation up to the object kind you are reconciling.
    /// Your object may not have changed, but you may need to update child objects.
    RelatedObjectUpdated {
        /// An object ref to the related object
        obj_ref: Box<ObjectRef<DynamicObject>>,
    },

    /// The user's `reconcile` scheduled a reconciliation via an `Action`
    ReconcilerRequestedRetry,

    /// The user's `error_policy` scheduled a reconciliation via an `Action`
    ErrorPolicyRequestedRetry,

    /// A bulk reconcile was requested via `reconcile_all_on`
    BulkReconcile,

    /// A custom reconcile reason for custom integrations.
    ///
    /// Can be used when injecting elements into the queue stream directly.
    Custom {
        /// A user provided reason through a custom integration
        reason: String,
    },
}
364
365impl Display for ReconcileReason {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        match self {
368            ReconcileReason::Unknown => f.write_str("unknown"),
369            ReconcileReason::ObjectUpdated => f.write_str("object updated"),
370            ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
371                f.write_fmt(format_args!("related object updated: {object}"))
372            }
373            ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
374            ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
375            ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
376            ReconcileReason::Custom { reason } => f.write_str(reason),
377        }
378    }
379}
380
/// Buffer size of the bounded channel through which `applier` feeds requeue requests
/// back into its scheduler.
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
382
/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
///
/// # Arguments
///
/// - `reconciler`: called with the object looked up from `store` and a clone of `context`
///   for every scheduled request whose object is present in the store.
/// - `error_policy`: called with the object, the reconciler's error, and a clone of `context`
///   when the reconciler fails; the returned [`Action`] decides whether the object is requeued.
/// - `context`: shared state handed to both callbacks.
/// - `store`: where requested objects are looked up; tasks are held back until it reports ready.
/// - `queue`: stream of reconcile requests (or anything convertible into them).
/// - `config`: supplies the debounce duration for the scheduler and the concurrency for the runner.
///
/// # Output
///
/// The returned stream yields `Ok((obj_ref, action))` for every successful reconcile, and an
/// [`Error`] otherwise: [`Error::ObjectNotFound`] when the object is missing from `store`,
/// [`Error::ReconcilerFailed`] when the reconciler returned an error, [`Error::QueueError`] for
/// errors from `queue`, and [`Error::RunnerError`] for errors from the internal runner.
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
#[allow(clippy::result_large_err)] // see #1880 as an alt; https://github.com/kube-rs/kube/pull/1880
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Signals the requeue stream (2. below) to stop once the user's queue stream has ended
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Bounded channel through which finished reconciles request their own requeue
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    // Shared so that every reconcile future can hold its own handle to the error policy
    let error_policy = Arc::new(error_policy);
    // Second handle to the store; `store` itself is moved into the runner closure below
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user specified inputs event
        Box::pin(stream::select(
            // 1. inputs from users queue stream
            queue
                .map_err(Error::QueueError)
                // Requests from the user's queue are scheduled to run immediately
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select gets passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    // Own the request so that it can be moved into the reconcile future
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            TryFutureExt::into_future(
                                // The reconciler is called inside the span; the resulting
                                // future is also instrumented with it below
                                reconciler_span.in_scope(|| reconciler(Arc::clone(&obj), context.clone())),
                            )
                            .then(move |res| {
                                let error_policy = error_policy;
                                // Applies the error policy (on failure) and sends any requested
                                // requeue back into the scheduler via `scheduler_tx`
                                RescheduleReconciliation::new(
                                    res,
                                    |err| error_policy(obj, err, error_policy_ctx),
                                    request.obj_ref.clone(),
                                    scheduler_tx,
                                )
                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
                                // to them separately
                                .map(|res| Ok((request.obj_ref, res)))
                            })
                            .instrument(reconciler_span)
                            .left_future()
                        }
                        // The object is not in the store: report it without calling the reconciler
                        None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
                            .right_future(),
                    }
                },
            )
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            // Collapse runner-level failures into the applier's own error type
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
501
/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
///
/// Resolves to the reconciler's original result once any requested requeue has been handed
/// to the scheduler channel (or sending has failed).
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    /// Sender half of the channel that feeds requeue requests back to the scheduler
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

    /// The requeue request still to be sent; `None` if no requeue was requested or once it has been taken
    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    /// The reconciler's result; taken (and returned) when the future completes
    result: Option<Result<Action, ReconcilerErr>>,
}
513
514impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
515where
516    K: Resource,
517{
518    fn new(
519        result: Result<Action, ReconcilerErr>,
520        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
521        obj_ref: ObjectRef<K>,
522        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
523    ) -> Self {
524        let reconciler_finished_at = Instant::now();
525
526        let (action, reschedule_reason) = result.as_ref().map_or_else(
527            |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
528            |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
529        );
530
531        Self {
532            reschedule_tx,
533            reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
534                message: ReconcileRequest {
535                    obj_ref,
536                    reason: reschedule_reason,
537                },
538                run_at: reconciler_finished_at
539                    .checked_add(requeue_after)
540                    .unwrap_or_else(crate::scheduler::max_schedule_time),
541            }),
542            result: Some(result),
543        }
544    }
545}
546
547impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
548where
549    K: Resource,
550{
551    type Output = Result<Action, ReconcilerErr>;
552
553    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
554        let this = self.get_mut();
555
556        if this.reschedule_request.is_some() {
557            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
558            let reschedule_request = this
559                .reschedule_request
560                .take()
561                .expect("PostReconciler::reschedule_request was taken during processing");
562            // Failure to schedule item = in graceful shutdown mode, ignore
563            if let Ok(()) = rescheduler_ready {
564                let _ = this.reschedule_tx.start_send(reschedule_request);
565            }
566        }
567
568        Poll::Ready(
569            this.result
570                .take()
571                .expect("PostReconciler::result was already taken"),
572        )
573    }
574}
575
/// Accumulates all options that can be used on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Debounce duration handed to the scheduler
    debounce: Duration,
    /// Concurrency handed to the runner
    concurrency: u16,
}

impl Config {
    /// The debounce duration used to deduplicate reconciliation requests.
    ///
    /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
    /// resulting in __trailing edge debouncing__ of reconciler requests.
    /// This option can help to reduce the amount of unnecessary reconciler calls
    /// when using multiple controller relations, or during rapid phase transitions.
    ///
    /// ## Warning
    /// This option delays (and keeps delaying) reconcile requests for objects while
    /// the object is updated. It can **permanently hide** updates from your reconciler
    /// if set too high on objects that are updated frequently (like nodes).
    #[must_use]
    pub fn debounce(self, debounce: Duration) -> Self {
        Self { debounce, ..self }
    }

    /// The number of concurrent reconciliations that are allowed to run at any given moment.
    ///
    /// This can be adjusted to the controller's needs to increase
    /// performance and/or make performance predictable. By default, it's 0, meaning
    /// the controller runs with unbounded concurrency.
    ///
    /// Note that despite concurrency, a controller never schedules concurrent reconciles
    /// on the same object.
    #[must_use]
    pub fn concurrency(self, concurrency: u16) -> Self {
        Self { concurrency, ..self }
    }
}
615
/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
///
/// Once `run` has been called and the resulting stream is continuously awaited, it continuously
/// calls out to user provided `reconcile` and `error_policy` callbacks whenever relevant changes
/// are detected or if errors are seen from `reconcile`.
///
/// Reconciles are generally requested for all changes on your root objects.
/// Changes to managed child resources will also trigger the reconciler for the
/// managing object by traversing owner references (for `Controller::owns`),
/// or traverse a custom mapping (for `Controller::watches`).
///
/// This mapping mechanism ultimately hides the reason for the reconciliation request,
/// and forces you to write an idempotent reconciler.
///
/// General setup:
/// ```no_run
/// use kube::{Api, Client, CustomResource};
/// use kube::runtime::{controller::{Controller, Action}, watcher};
/// # use serde::{Deserialize, Serialize};
/// # use tokio::time::Duration;
/// use futures::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use schemars::JsonSchema;
/// # use std::sync::Arc;
/// use thiserror::Error;
///
/// #[derive(Debug, Error)]
/// enum Error {}
///
/// /// A custom resource
/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
/// struct ConfigMapGeneratorSpec {
///     content: String,
/// }
///
/// /// The reconciler that will be called when either object change
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
///     // .. use api here to reconcile a child ConfigMap with ownerreferences
///     // see configmapgen_controller example for full info
///     Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails with access to both the
/// /// object that caused the failure and the actual error
/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
///     Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
///
/// async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// #   let client: Client = todo!();
///     let context = Arc::new(()); // bad empty context - put client in here
///     let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
///     let cms = Api::<ConfigMap>::all(client.clone());
///     Controller::new(cmgs, watcher::Config::default())
///         .owns(cms, watcher::Config::default())
///         .run(reconcile, error_policy, context)
///         .for_each(|res| async move {
///             match res {
///                 Ok(o) => println!("reconciled {:?}", o),
///                 Err(e) => println!("reconcile failed: {:?}", e),
///             }
///         })
///         .await; // controller does nothing unless polled
/// #    Ok(())
/// # }
/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    // All trigger streams, merged; each yields reconcile requests for `K`.
    // NB: Need to Unpin for stream::select_all
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    // Backoff strategy for the trigger streams
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    // Dynamic type information for `K`
    dyntype: K::DynamicType,
    // Read handle to the store of `K` objects
    reader: Store<K>,
    // Debounce/concurrency options; see [`Config`]
    config: Config,
}
706
707impl<K> Controller<K>
708where
709    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
710    K::DynamicType: Eq + Hash + Clone,
711{
712    /// Create a Controller for a resource `K`
713    ///
714    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
715    ///
716    /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
717    /// and receive reconcile events for.
718    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
719    #[must_use]
720    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
721    where
722        K::DynamicType: Default,
723    {
724        Self::new_with(main_api, wc, Default::default())
725    }
726
727    /// Create a Controller for a resource `K`
728    ///
729    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
730    ///
731    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
732    /// to watch - in the Api's  configured scope - and receive reconcile events for.
733    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
734    ///
735    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
736    ///
737    /// [`watcher::Config`]: crate::watcher::Config
738    /// [`Api`]: kube_client::Api
739    /// [`dynamic`]: kube_client::core::dynamic
740    /// [`Config::default`]: crate::watcher::Config::default
741    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
742        let writer = Writer::<K>::new(dyntype.clone());
743        let reader = writer.as_reader();
744        let mut trigger_selector = stream::SelectAll::new();
745        let self_watcher = trigger_self(
746            reflector(writer, watcher(main_api, wc)).applied_objects(),
747            dyntype.clone(),
748        )
749        .boxed();
750        trigger_selector.push(self_watcher);
751        Self {
752            trigger_selector,
753            trigger_backoff: Box::<DefaultBackoff>::default(),
754            graceful_shutdown_selector: vec![
755                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
756                future::pending().boxed(),
757            ],
758            forceful_shutdown_selector: vec![
759                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
760                future::pending().boxed(),
761            ],
762            dyntype,
763            reader,
764            config: Default::default(),
765        }
766    }
767
768    /// Create a Controller for a resource `K` from a stream of `K` objects
769    ///
770    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
771    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
772    /// as well as sharing input streams between multiple controllers.
773    ///
774    /// # Example:
775    ///
776    /// ```no_run
777    /// # use futures::StreamExt;
778    /// # use k8s_openapi::api::apps::v1::Deployment;
779    /// # use kube::runtime::controller::{Action, Controller};
780    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
781    /// # use kube::{Api, Client, Error, ResourceExt};
782    /// # use std::sync::Arc;
783    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
784    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
785    /// # async fn doc(client: kube::Client) {
786    /// let api: Api<Deployment> = Api::default_namespaced(client);
787    /// let (reader, writer) = reflector::store();
788    /// let deploys = watcher(api, watcher::Config::default())
789    ///     .default_backoff()
790    ///     .reflect(writer)
791    ///     .applied_objects()
792    ///     .predicate_filter(predicates::generation, Default::default());
793    ///
794    /// Controller::for_stream(deploys, reader)
795    ///     .run(reconcile, error_policy, Arc::new(()))
796    ///     .for_each(|_| std::future::ready(()))
797    ///     .await;
798    /// # }
799    /// ```
800    ///
801    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
802    #[cfg(feature = "unstable-runtime-stream-control")]
803    pub fn for_stream(
804        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
805        reader: Store<K>,
806    ) -> Self
807    where
808        K::DynamicType: Default,
809    {
810        Self::for_stream_with(trigger, reader, Default::default())
811    }
812
813    /// Create a Controller for a resource `K` from a stream of `K` objects
814    ///
815    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
816    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
817    /// as well as sharing input streams between multiple controllers.
818    ///
819    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
820    ///
821    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
822    ///
823    /// [`dynamic`]: kube_client::core::dynamic
824    #[cfg(feature = "unstable-runtime-stream-control")]
825    pub fn for_stream_with(
826        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
827        reader: Store<K>,
828        dyntype: K::DynamicType,
829    ) -> Self {
830        let mut trigger_selector = stream::SelectAll::new();
831        let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
832        trigger_selector.push(self_watcher);
833        Self {
834            trigger_selector,
835            trigger_backoff: Box::<DefaultBackoff>::default(),
836            graceful_shutdown_selector: vec![
837                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
838                future::pending().boxed(),
839            ],
840            forceful_shutdown_selector: vec![
841                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
842                future::pending().boxed(),
843            ],
844            dyntype,
845            reader,
846            config: Default::default(),
847        }
848    }
849
850    /// This is the same as [`Controller::for_stream`]. Instead of taking an
851    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
852    /// streams can be created out-of-band by subscribing on a store `Writer`.
853    /// Through this interface, multiple controllers can use the same root
854    /// (shared) input stream of resources to keep memory overheads smaller.
855    ///
856    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
857    /// need to share the stream.
858    ///
859    /// ## Warning:
860    ///
861    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
862    /// is driven to readiness independently of this controller to ensure the
863    /// watcher never deadlocks.
864    ///
865    /// # Example:
866    ///
867    /// ```no_run
868    /// # use futures::StreamExt;
869    /// # use k8s_openapi::api::apps::v1::Deployment;
870    /// # use kube::runtime::controller::{Action, Controller};
871    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
872    /// # use kube::{Api, Client, Error, ResourceExt};
873    /// # use std::sync::Arc;
874    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
875    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
876    /// # async fn doc(client: kube::Client) {
877    /// let api: Api<Deployment> = Api::default_namespaced(client);
878    /// let (reader, writer) = reflector::store_shared(128);
879    /// let subscriber = writer
880    ///     .subscribe()
881    ///     .expect("subscribers can only be created from shared stores");
882    /// let deploys = watcher(api, watcher::Config::default())
883    ///     .default_backoff()
884    ///     .reflect(writer)
885    ///     .applied_objects()
886    ///     .for_each(|ev| async move {
887    ///         match ev {
888    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
889    ///             Err(error) => tracing::error!(%error, "received error")
890    ///         }
891    ///     });
892    ///
893    /// let controller = Controller::for_shared_stream(subscriber, reader)
894    ///     .run(reconcile, error_policy, Arc::new(()))
895    ///     .for_each(|ev| async move {
896    ///         tracing::info!("reconciled {ev:?}")
897    ///     });
898    ///
899    /// // Drive streams using a select statement
900    /// tokio::select! {
901    ///   _ = deploys => {},
902    ///   _ = controller => {},
903    /// }
904    /// # }
905    #[cfg(feature = "unstable-runtime-subscribe")]
906    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
907    where
908        K::DynamicType: Default,
909    {
910        Self::for_shared_stream_with(trigger, reader, Default::default())
911    }
912
913    /// This is the same as [`Controller::for_stream`]. Instead of taking an
914    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
915    /// streams can be created out-of-band by subscribing on a store `Writer`.
916    /// Through this interface, multiple controllers can use the same root
917    /// (shared) input stream of resources to keep memory overheads smaller.
918    ///
919    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
920    /// need to share the stream.
921    ///
922    /// This variant constructor is used for [`dynamic`] types found through
923    /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
924    /// known at compile time).
925    ///
926    /// [`dynamic`]: kube_client::core::dynamic
927    #[cfg(feature = "unstable-runtime-subscribe")]
928    pub fn for_shared_stream_with(
929        trigger: impl Stream<Item = Arc<K>> + Send + 'static,
930        reader: Store<K>,
931        dyntype: K::DynamicType,
932    ) -> Self {
933        let mut trigger_selector = stream::SelectAll::new();
934        let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
935        trigger_selector.push(self_watcher);
936        Self {
937            trigger_selector,
938            trigger_backoff: Box::<DefaultBackoff>::default(),
939            graceful_shutdown_selector: vec![
940                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
941                future::pending().boxed(),
942            ],
943            forceful_shutdown_selector: vec![
944                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
945                future::pending().boxed(),
946            ],
947            dyntype,
948            reader,
949            config: Default::default(),
950        }
951    }
952
953    /// Specify the configuration for the controller's behavior.
954    #[must_use]
955    pub fn with_config(mut self, config: Config) -> Self {
956        self.config = config;
957        self
958    }
959
960    /// Specify the backoff policy for "trigger" watches
961    ///
962    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
963    ///
964    /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
965    /// but can be overridden by calling this method.
966    #[must_use]
967    pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
968        self.trigger_backoff = Box::new(backoff);
969        self
970    }
971
972    /// Retrieve a copy of the reader before starting the controller
973    pub fn store(&self) -> Store<K> {
974        self.reader.clone()
975    }
976
977    /// Specify `Child` objects which `K` owns and should be watched
978    ///
979    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
980    /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
981    ///
982    /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
983    /// to watch - in the Api's configured scope - and receive reconcile events for.
984    /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
985    ///
986    /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
987    #[must_use]
988    pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
989        self,
990        api: Api<Child>,
991        wc: watcher::Config,
992    ) -> Self {
993        self.owns_with(api, (), wc)
994    }
995
996    /// Specify `Child` objects which `K` owns and should be watched
997    ///
998    /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
999    #[must_use]
1000    pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
1001        mut self,
1002        api: Api<Child>,
1003        dyntype: Child::DynamicType,
1004        wc: watcher::Config,
1005    ) -> Self
1006    where
1007        Child::DynamicType: Debug + Eq + Hash + Clone,
1008    {
1009        // TODO: call owns_stream_with when it's stable
1010        let child_watcher = trigger_owners(
1011            metadata_watcher(api, wc).touched_objects(),
1012            self.dyntype.clone(),
1013            dyntype,
1014        );
1015        self.trigger_selector.push(child_watcher.boxed());
1016        self
1017    }
1018
1019    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1020    ///
1021    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1022    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1023    /// as well as sharing input streams between multiple controllers.
1024    ///
1025    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1026    ///
1027    /// # Example:
1028    ///
1029    /// ```no_run
1030    /// # use futures::StreamExt;
1031    /// # use k8s_openapi::api::core::v1::ConfigMap;
1032    /// # use k8s_openapi::api::apps::v1::StatefulSet;
1033    /// # use kube::runtime::controller::Action;
1034    /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
1035    /// # use kube::{Api, Client, Error, ResourceExt};
1036    /// # use std::sync::Arc;
1037    /// # type CustomResource = ConfigMap;
1038    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1039    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1040    /// # async fn doc(client: kube::Client) {
1041    /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
1042    ///     .touched_objects()
1043    ///     .predicate_filter(predicates::generation, Default::default());
1044    ///
1045    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
1046    ///     .owns_stream(sts_stream)
1047    ///     .run(reconcile, error_policy, Arc::new(()))
1048    ///     .for_each(|_| std::future::ready(()))
1049    ///     .await;
1050    /// # }
1051    /// ```
1052    #[cfg(feature = "unstable-runtime-stream-control")]
1053    #[must_use]
1054    pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1055        self,
1056        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1057    ) -> Self {
1058        self.owns_stream_with(trigger, ())
1059    }
1060
1061    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1062    ///
1063    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1064    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1065    /// as well as sharing input streams between multiple controllers.
1066    ///
1067    /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1068    #[cfg(feature = "unstable-runtime-stream-control")]
1069    #[must_use]
1070    pub fn owns_stream_with<Child: Resource + Send + 'static>(
1071        mut self,
1072        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1073        dyntype: Child::DynamicType,
1074    ) -> Self
1075    where
1076        Child::DynamicType: Debug + Eq + Hash + Clone,
1077    {
1078        let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
1079        self.trigger_selector.push(child_watcher.boxed());
1080        self
1081    }
1082
1083    /// This is the same as [`Controller::for_stream`]. Instead of taking an
1084    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
1085    /// streams can be created out-of-band by subscribing on a store `Writer`.
1086    /// Through this interface, multiple controllers can use the same root
1087    /// (shared) input stream of resources to keep memory overheads smaller.
1088    ///
1089    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
1090    /// need to share the stream.
1091    ///
1092    /// ## Warning:
1093    ///
1094    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
1095    /// is driven to readiness independently of this controller to ensure the
1096    /// watcher never deadlocks.
1097    ///
1098    ///
1099    /// Trigger the reconciliation process for a shared stream of `Child`
1100    /// objects of the owner `K`
1101    ///
1102    /// Conceptually the same as [`Controller::owns`], but a stream is used
1103    /// instead of an `Api`. This interface behaves similarly to its non-shared
1104    /// counterpart [`Controller::owns_stream`].
1105    ///
1106    /// # Example:
1107    ///
1108    /// ```no_run
1109    /// # use futures::StreamExt;
1110    /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
1111    /// # use kube::runtime::controller::{Action, Controller};
1112    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
1113    /// # use kube::{Api, Client, Error, ResourceExt};
1114    /// # use std::sync::Arc;
1115    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1116    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1117    /// # async fn doc(client: kube::Client) {
1118    /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1119    /// let pod_api: Api<Pod> = Api::default_namespaced(client);
1120    ///
1121    /// let (reader, writer) = reflector::store_shared(128);
1122    /// let subscriber = writer
1123    ///     .subscribe()
1124    ///     .expect("subscribers can only be created from shared stores");
1125    /// let pods = watcher(pod_api, watcher::Config::default())
1126    ///     .default_backoff()
1127    ///     .reflect(writer)
1128    ///     .applied_objects()
1129    ///     .for_each(|ev| async move {
1130    ///         match ev {
1131    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
1132    ///             Err(error) => tracing::error!(%error, "received error")
1133    ///         }
1134    ///     });
1135    ///
1136    /// let controller = Controller::new(deploys, Default::default())
1137    ///     .owns_shared_stream(subscriber)
1138    ///     .run(reconcile, error_policy, Arc::new(()))
1139    ///     .for_each(|ev| async move {
1140    ///         tracing::info!("reconciled {ev:?}")
1141    ///     });
1142    ///
1143    /// // Drive streams using a select statement
1144    /// tokio::select! {
1145    ///   _ = pods => {},
1146    ///   _ = controller => {},
1147    /// }
1148    /// # }
1149    #[cfg(feature = "unstable-runtime-subscribe")]
1150    #[must_use]
1151    pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1152        self,
1153        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1154    ) -> Self {
1155        self.owns_shared_stream_with(trigger, ())
1156    }
1157
1158    /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
1159    ///
1160    /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
1161    /// The source stream can be shared between multiple controllers, optimising
1162    /// resource usage.
1163    ///
1164    /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1165    #[cfg(feature = "unstable-runtime-subscribe")]
1166    #[must_use]
1167    pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
1168        mut self,
1169        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1170        dyntype: Child::DynamicType,
1171    ) -> Self
1172    where
1173        Child::DynamicType: Debug + Eq + Hash + Clone,
1174    {
1175        let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
1176        self.trigger_selector.push(child_watcher.boxed());
1177        self
1178    }
1179
1180    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1181    ///
1182    /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1183    /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1184    ///
1185    /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1186    ///
1187    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1188    ///
1189    /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1190    /// to watch - in the Api's configured scope - and run through the custom mapper.
1191    /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
1192    ///
1193    /// # Example
1194    ///
1195    /// Tracking cross cluster references using the [Operator-SDK] annotations.
1196    ///
1197    /// ```
1198    /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1199    /// # use kube::{Api, ResourceExt};
1200    /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1201    /// # use futures::StreamExt;
1202    /// # use std::sync::Arc;
1203    /// # type WatchedResource = Namespace;
1204    /// # struct Context;
1205    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1206    /// #     Ok(Action::await_change())
1207    /// # };
1208    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1209    /// #     Action::await_change()
1210    /// # }
1211    /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1212    /// # let memcached = Api::<ConfigMap>::all(client.clone());
1213    /// # let context = Arc::new(Context);
1214    /// Controller::new(memcached, watcher::Config::default())
1215    ///     .watches(
1216    ///         Api::<WatchedResource>::all(client.clone()),
1217    ///         watcher::Config::default(),
1218    ///         |ar| {
1219    ///             let prt = ar
1220    ///                 .annotations()
1221    ///                 .get("operator-sdk/primary-resource-type")
1222    ///                 .map(String::as_str);
1223    ///
1224    ///             if prt != Some("Memcached.cache.example.com") {
1225    ///                 return None;
1226    ///             }
1227    ///
1228    ///             let (namespace, name) = ar
1229    ///                 .annotations()
1230    ///                 .get("operator-sdk/primary-resource")?
1231    ///                 .split_once('/')?;
1232    ///
1233    ///             Some(ObjectRef::new(name).within(namespace))
1234    ///         }
1235    ///     )
1236    ///     .run(reconcile, error_policy, context)
1237    ///     .for_each(|_| futures::future::ready(()))
1238    ///     .await;
1239    /// # Ok(())
1240    /// # }
1241    /// ```
1242    ///
1243    /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1244    #[must_use]
1245    pub fn watches<Other, I>(
1246        self,
1247        api: Api<Other>,
1248        wc: watcher::Config,
1249        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1250    ) -> Self
1251    where
1252        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1253        Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1254        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1255        I::IntoIter: Send,
1256    {
1257        self.watches_with(api, Default::default(), wc, mapper)
1258    }
1259
1260    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1261    ///
1262    /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1263    #[must_use]
1264    pub fn watches_with<Other, I>(
1265        mut self,
1266        api: Api<Other>,
1267        dyntype: Other::DynamicType,
1268        wc: watcher::Config,
1269        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1270    ) -> Self
1271    where
1272        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1273        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1274        I::IntoIter: Send,
1275        Other::DynamicType: Debug + Clone + Eq + Hash,
1276    {
1277        let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1278        self.trigger_selector.push(other_watcher.boxed());
1279        self
1280    }
1281
1282    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1283    ///
1284    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1285    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1286    /// as well as sharing input streams between multiple controllers.
1287    ///
1288    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1289    ///
1290    /// # Example:
1291    ///
1292    /// ```no_run
1293    /// # use futures::StreamExt;
1294    /// # use k8s_openapi::api::core::v1::ConfigMap;
1295    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1296    /// # use kube::runtime::controller::Action;
1297    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1298    /// # use kube::{Api, Client, Error, ResourceExt};
1299    /// # use std::sync::Arc;
1300    /// # type CustomResource = ConfigMap;
1301    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1302    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1303    /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1304    /// # async fn doc(client: kube::Client) {
1305    /// let api: Api<DaemonSet> = Api::all(client.clone());
1306    /// let cr: Api<CustomResource> = Api::all(client.clone());
1307    /// let daemons = watcher(api, watcher::Config::default())
1308    ///     .touched_objects()
1309    ///     .predicate_filter(predicates::generation, Default::default());
1310    ///
1311    /// Controller::new(cr, watcher::Config::default())
1312    ///     .watches_stream(daemons, mapper)
1313    ///     .run(reconcile, error_policy, Arc::new(()))
1314    ///     .for_each(|_| std::future::ready(()))
1315    ///     .await;
1316    /// # }
1317    /// ```
1318    #[cfg(feature = "unstable-runtime-stream-control")]
1319    #[must_use]
1320    pub fn watches_stream<Other, I>(
1321        self,
1322        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1323        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1324    ) -> Self
1325    where
1326        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1327        Other::DynamicType: Default + Debug + Clone,
1328        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1329        I::IntoIter: Send,
1330    {
1331        self.watches_stream_with(trigger, mapper, Default::default())
1332    }
1333
1334    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1335    ///
1336    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1337    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1338    /// as well as sharing input streams between multiple controllers.
1339    ///
1340    /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1341    #[cfg(feature = "unstable-runtime-stream-control")]
1342    #[must_use]
1343    pub fn watches_stream_with<Other, I>(
1344        mut self,
1345        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1346        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1347        dyntype: Other::DynamicType,
1348    ) -> Self
1349    where
1350        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1351        Other::DynamicType: Debug + Clone,
1352        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1353        I::IntoIter: Send,
1354    {
1355        let other_watcher = trigger_others(trigger, mapper, dyntype);
1356        self.trigger_selector.push(other_watcher.boxed());
1357        self
1358    }
1359
1360    /// Trigger the reconciliation process for a shared stream of `Other`
1361    /// objects related to a `K`
1362    ///
1363    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1364    /// stream of resources is used. This allows for sharing input streams
1365    /// between multiple controllers.
1366    ///
1367    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1368    ///
1369    /// # Example:
1370    ///
1371    /// ```no_run
1372    /// # use futures::StreamExt;
1373    /// # use k8s_openapi::api::core::v1::ConfigMap;
1374    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1375    /// # use kube::runtime::controller::Action;
1376    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1377    /// # use kube::{Api, Client, Error, ResourceExt};
1378    /// # use std::sync::Arc;
1379    /// # type CustomResource = ConfigMap;
1380    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1381    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1382    /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1383    /// # async fn doc(client: kube::Client) {
1384    /// let api: Api<DaemonSet> = Api::all(client.clone());
1385    /// let cr: Api<CustomResource> = Api::all(client.clone());
1386    /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1387    /// let subscriber = writer
1388    ///     .subscribe()
1389    ///     .expect("subscribers can only be created from shared stores");
1390    /// let daemons = watcher(api, watcher::Config::default())
1391    ///     .reflect(writer)
1392    ///     .touched_objects()
1393    ///     .for_each(|ev| async move {
1394    ///         match ev {
1395    ///             Ok(obj) => {},
1396    ///             Err(error) => tracing::error!(%error, "received err")
1397    ///         }
1398    ///     });
1399    ///
1400    /// let controller = Controller::new(cr, watcher::Config::default())
1401    ///     .watches_shared_stream(subscriber, mapper)
1402    ///     .run(reconcile, error_policy, Arc::new(()))
1403    ///     .for_each(|_| std::future::ready(()));
1404    ///
1405    /// // Drive streams using a select statement
1406    /// tokio::select! {
1407    ///   _ = daemons => {},
1408    ///   _ = controller => {},
1409    /// }
1410    /// # }
1411    /// ```
1412    #[cfg(feature = "unstable-runtime-subscribe")]
1413    #[must_use]
1414    pub fn watches_shared_stream<Other, I>(
1415        self,
1416        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1417        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1418    ) -> Self
1419    where
1420        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1421        Other::DynamicType: Default + Debug + Clone,
1422        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1423        I::IntoIter: Send,
1424    {
1425        self.watches_shared_stream_with(trigger, mapper, Default::default())
1426    }
1427
1428    /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1429    ///
1430    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1431    /// stream of resources is used. This allows for sharing of streams between
1432    /// multiple controllers.
1433    ///
1434    /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1435    #[cfg(feature = "unstable-runtime-subscribe")]
1436    #[must_use]
1437    pub fn watches_shared_stream_with<Other, I>(
1438        mut self,
1439        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1440        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1441        dyntype: Other::DynamicType,
1442    ) -> Self
1443    where
1444        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1445        Other::DynamicType: Debug + Clone,
1446        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1447        I::IntoIter: Send,
1448    {
1449        let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
1450        self.trigger_selector.push(other_watcher.boxed());
1451        self
1452    }
1453
1454    /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1455    ///
1456    /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1457    ///
1458    /// To reconcile all objects when a new line is entered:
1459    ///
1460    /// ```
1461    /// # async {
1462    /// use futures::stream::StreamExt;
1463    /// use k8s_openapi::api::core::v1::ConfigMap;
1464    /// use kube::{
1465    ///     Client,
1466    ///     api::{Api, ResourceExt},
1467    ///     runtime::{
1468    ///         controller::{Controller, Action},
1469    ///         watcher,
1470    ///     },
1471    /// };
1472    /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1473    /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1474    /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1475    /// // and its worker prevents the Tokio runtime from shutting down.
1476    /// std::thread::spawn(move || {
1477    ///     for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1478    ///         let _ = reload_tx.try_send(());
1479    ///     }
1480    /// });
1481    /// Controller::new(
1482    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1483    ///     watcher::Config::default(),
1484    /// )
1485    /// .reconcile_all_on(reload_rx.map(|_| ()))
1486    /// .run(
1487    ///     |o, _| async move {
1488    ///         println!("Reconciling {}", o.name_any());
1489    ///         Ok(Action::await_change())
1490    ///     },
1491    ///     |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1492    ///     Arc::new(()),
1493    /// );
1494    /// # };
1495    /// ```
1496    ///
1497    /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1498    ///
1499    /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1500    #[must_use]
1501    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1502        let store = self.store();
1503        let dyntype = self.dyntype.clone();
1504        self.trigger_selector.push(
1505            trigger
1506                .flat_map(move |()| {
1507                    let dyntype = dyntype.clone();
1508                    stream::iter(store.state().into_iter().map(move |obj| {
1509                        Ok(ReconcileRequest {
1510                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1511                            reason: ReconcileReason::BulkReconcile,
1512                        })
1513                    }))
1514                })
1515                .boxed(),
1516        );
1517        self
1518    }
1519
1520    /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
1521    ///
1522    /// This can be used to inject reconciliations for specific objects from an external resource.
1523    ///
1524    /// # Example:
1525    ///
1526    /// ```no_run
1527    /// # async {
1528    /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
1529    /// # use k8s_openapi::api::core::v1::{ConfigMap};
1530    /// # use kube::api::Api;
1531    /// # use kube::runtime::controller::Action;
1532    /// # use kube::runtime::reflector::{ObjectRef, Store};
1533    /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
1534    /// # use kube::runtime::watcher::Config;
1535    /// # use kube::{Client, Error, ResourceExt};
1536    /// # use std::future;
1537    /// # use std::sync::Arc;
1538    /// #
1539    /// # let client: Client = todo!();
1540    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1541    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1542    /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
1543    /// # let ns = "controller-ns".to_string();
1544    /// struct ExternalObject {
1545    ///     name: String,
1546    /// }
1547    /// let external_stream = watch_external_objects().map(|ext| {
1548    ///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
1549    /// });
1550    ///
1551    /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
1552    ///     .reconcile_on(external_stream)
1553    ///     .run(reconcile, error_policy, Arc::new(()))
1554    ///     .for_each(|_| future::ready(()))
1555    ///     .await;
1556    /// # };
1557    /// ```
1558    #[cfg(feature = "unstable-runtime-reconcile-on")]
1559    #[must_use]
1560    pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
1561        self.trigger_selector.push(
1562            trigger
1563                .map(move |obj| {
1564                    Ok(ReconcileRequest {
1565                        obj_ref: obj,
1566                        reason: ReconcileReason::Unknown,
1567                    })
1568                })
1569                .boxed(),
1570        );
1571        self
1572    }
1573
1574    /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1575    ///
1576    /// - No new reconciliations are started from the scheduler
1577    /// - The underlying Kubernetes watch is terminated
1578    /// - All running reconciliations are allowed to finish
1579    /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1580    ///
1581    /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1582    ///
1583    /// ```rust
1584    /// # async {
1585    /// use futures::future::FutureExt;
1586    /// use k8s_openapi::api::core::v1::ConfigMap;
1587    /// use kube::{Api, Client, ResourceExt};
1588    /// use kube_runtime::{
1589    ///     controller::{Controller, Action},
1590    ///     watcher,
1591    /// };
1592    /// use std::{convert::Infallible, sync::Arc};
1593    /// Controller::new(
1594    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1595    ///     watcher::Config::default(),
1596    /// )
1597    /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1598    /// .run(
1599    ///     |o, _| async move {
1600    ///         println!("Reconciling {}", o.name_any());
1601    ///         Ok(Action::await_change())
1602    ///     },
1603    ///     |_, err: &Infallible, _| Err(err).unwrap(),
1604    ///     Arc::new(()),
1605    /// );
1606    /// # };
1607    /// ```
1608    ///
1609    /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1610    /// as soon as *any* [`Future`] resolves.
1611    #[must_use]
1612    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1613        self.graceful_shutdown_selector.push(trigger.boxed());
1614        self
1615    }
1616
1617    /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1618    ///
1619    /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1620    /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1621    ///
1622    /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1623    /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1624    /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1625    /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1626    ///
1627    /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1628    /// [`Controller::graceful_shutdown_on`].
1629    ///
1630    /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1631    /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1632    /// unless you run [`std::process::exit`] afterwards.
1633    #[must_use]
1634    pub fn shutdown_on_signal(mut self) -> Self {
1635        async fn shutdown_signal() {
1636            futures::future::select(
1637                tokio::signal::ctrl_c().map(|_| ()).boxed(),
1638                #[cfg(unix)]
1639                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1640                    .unwrap()
1641                    .recv()
1642                    .map(|_| ())
1643                    .boxed(),
1644                // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1645                #[cfg(not(unix))]
1646                futures::future::pending::<()>(),
1647            )
1648            .await;
1649        }
1650
1651        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1652        self.graceful_shutdown_selector
1653            .push(graceful_rx.map(|_| ()).boxed());
1654        self.forceful_shutdown_selector.push(
1655            async {
1656                tracing::info!("press ctrl+c to shut down gracefully");
1657                shutdown_signal().await;
1658                if let Ok(()) = graceful_tx.send(()) {
1659                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1660                } else {
1661                    tracing::info!(
1662                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
1663                    );
1664                }
1665                shutdown_signal().await;
1666                tracing::info!("forced shutdown requested");
1667            }
1668            .boxed(),
1669        );
1670        self
1671    }
1672
1673    /// Consume all the parameters of the Controller and start the applier stream
1674    ///
1675    /// This creates a stream from all builder calls and starts an applier with
1676    /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1677    /// with a configurable `context`.
1678    pub fn run<ReconcilerFut, Ctx>(
1679        self,
1680        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1681        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1682        context: Arc<Ctx>,
1683    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1684    where
1685        K::DynamicType: Debug + Unpin,
1686        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1687        ReconcilerFut::Error: std::error::Error + Send + 'static,
1688    {
1689        applier(
1690            move |obj, ctx| {
1691                CancelableJoinHandle::spawn(
1692                    TryFutureExt::into_future(reconciler(obj, ctx)).in_current_span(),
1693                    &Handle::current(),
1694                )
1695            },
1696            error_policy,
1697            context,
1698            self.reader,
1699            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1700                .take_until(future::select_all(self.graceful_shutdown_selector)),
1701            self.config,
1702        )
1703        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1704    }
1705}
1706
#[cfg(test)]
mod tests {
    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

    use super::{APPLIER_REQUEUE_BUF_SIZE, Action};
    use crate::{
        Config, Controller, applier,
        reflector::{self, ObjectRef},
        watcher::{self, Event, metadata_watcher, watcher},
    };
    use futures::{Stream, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{Api, Resource, core::ObjectMeta};
    use serde::de::DeserializeOwned;
    use tokio::time::timeout;

    /// Identity function that only typechecks when `T` is `Send`.
    fn assert_send<T>(value: T) -> T
    where
        T: Send,
    {
        value
    }

    /// Identity function that only typechecks when `T` is a `Send` stream of watcher
    /// events over some resource `K`.
    fn assert_stream<T, K>(events: T) -> T
    where
        T: Stream<Item = watcher::Result<Event<K>>> + Send,
        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
    {
        events
    }

    /// Stands in for a value of any type in code that is only typechecked and never executed.
    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // Deliberately not a #[test]: this must never run, it only has to typecheck
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        let controller = Controller::new(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_send(controller.run(
            |_, _| async { Ok(mock_type::<Action>()) },
            |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
            Arc::new(()),
        ));
    }

    // Deliberately not a #[test]: this must never run, it only has to typecheck
    //
    // Checks that the return types of `watcher` and `metadata_watcher` do not drift
    // for an arbitrary K that implements `Resource` (e.g. ConfigMap)
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        let full_objects = watcher(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_stream(full_objects);

        let metadata_only = metadata_watcher(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_stream(metadata_only);
    }

    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        // Checks that `applier` copes with backpressure on the reschedule queue, by flooding it with no-op reconciles
        // Guards against regressing on https://github.com/kube-rs/kube/issues/926

        // Assume that APPLIER_REQUEUE_BUF_SIZE stays flooded if we have 50x the number of objects "in rotation"
        // On @nightkr's 3900X this triggers reliably with 10x, so this leaves some safety margin against false negatives
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Assume that everything's OK if we can reconcile every object 3 times on average
        let reconciles = items * 3;

        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let mut applier = pin!(applier(
            |_obj, _| {
                // Flood the rescheduling buffer by asking for an immediate requeue every time
                Box::pin(async move { Ok(Action::requeue(Duration::ZERO)) })
            },
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
            Config::default(),
        ));

        store_tx.apply_watcher_event(&Event::InitDone);
        for i in 0..items {
            let metadata = ObjectMeta {
                name: Some(format!("cm-{i}")),
                namespace: Some("default".to_string()),
                ..Default::default()
            };
            let obj = ConfigMap {
                metadata,
                ..Default::default()
            };
            // Take the reference first so that the object itself can be moved into the store event
            let obj_ref = ObjectRef::from_obj(&obj);
            store_tx.apply_watcher_event(&Event::Apply(obj));
            queue_tx.unbounded_send(obj_ref).unwrap();
        }

        let flood = applier
            .as_mut()
            .take(reconciles)
            .try_for_each(|_| std::future::ready(Ok(())));
        timeout(Duration::from_secs(10), flood)
            .await
            .expect("test timeout expired, applier likely deadlocked")
            .unwrap();

        // Do an orderly shutdown to ensure that no individual reconcilers are stuck
        drop(queue_tx);
        let drain = applier.try_for_each(|_| std::future::ready(Ok(())));
        timeout(Duration::from_secs(10), drain)
            .await
            .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
            .unwrap();
    }
}