Skip to main content

kube_runtime/controller/
mod.rs

1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4#[allow(deprecated)] use crate::watcher::metadata_watcher;
5use crate::{
6    reflector::{
7        self, ObjectRef, reflector,
8        store::{Store, Writer},
9    },
10    scheduler::{ScheduleRequest, debounced_scheduler},
11    utils::{
12        Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
13    },
14    watcher::{self, DefaultBackoff, watcher},
15};
16use educe::Educe;
17use futures::{
18    FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, channel,
19    future::{self, BoxFuture},
20    stream,
21};
22use kube_client::api::{Api, DynamicObject, Resource};
23use pin_project::pin_project;
24use serde::de::DeserializeOwned;
25use std::{
26    fmt::{Debug, Display},
27    hash::Hash,
28    sync::Arc,
29    task::{Poll, ready},
30    time::Duration,
31};
32use stream::BoxStream;
33use thiserror::Error;
34use tokio::{runtime::Handle, time::Instant};
35use tracing::{Instrument, info_span};
36
37mod future_hash_map;
38mod runner;
39
40/// The reasons the internal runner can fail
41pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
42
43/// Errors returned by the applier and visible in a controller stream if inspecting it
44///
45/// WARNING: These errors do not terminate `Controller::run`, and are not passed to the `reconcile` fn
46/// as they exist primarily for diagnostics.
47///
48/// To inspect these errors, you can run a `for_each` on the run stream:
49///
50/// ```compile_fail
51///    Controller::new(api, watcher_config)
52///        .run(reconcile, error_policy, context)
53///        .for_each(|res| async move {
54///            match res {
55///                Ok(o) => info!("reconciled {:?}", o),
56///                /// Reconciler errors visible here:
57///                Err(e) => warn!("reconcile failed: {}", e),
58///            }
59///        })
60///        .await;
61/// ```
62#[derive(Debug, Error)]
63pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
64    /// A scheduled reconcile for an object refers to an object that no longer exists
65    ///
66    /// This is usually not a problem and often expected with certain relations.
67    /// See <https://github.com/kube-rs/kube/issues/1167#issuecomment-1636773541>
68    /// for a more detailed explanation of how/why this happens.
69    #[error("tried to reconcile object {0} that was not found in local store")]
70    ObjectNotFound(Box<ObjectRef<DynamicObject>>),
71
72    /// User's reconcile fn failed for the object
73    #[error("reconciler for object {1} failed")]
74    ReconcilerFailed(#[source] ReconcilerErr, Box<ObjectRef<DynamicObject>>),
75
76    /// The queue stream contained an error
77    #[error("event queue error")]
78    QueueError(#[source] QueueErr),
79
80    /// The internal runner returned an error
81    #[error("runner error")]
82    RunnerError(#[source] RunnerError),
83}
84
85/// Results of the reconciliation attempt
86#[derive(Debug, Clone, Eq, PartialEq)]
87pub struct Action {
88    /// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
89    ///
90    /// For example, use this to query external systems for updates, expire time-limited resources, or
91    /// (in your `error_policy`) retry after errors.
92    requeue_after: Option<Duration>,
93}
94
95impl Action {
96    /// Action to the reconciliation at this time even if no external watch triggers hit
97    ///
98    /// This is the best-practice action that ensures eventual consistency of your controller
99    /// even in the case of missed changes (which can happen).
100    ///
101    /// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
102    #[must_use]
103    pub const fn requeue(duration: Duration) -> Self {
104        Self {
105            requeue_after: Some(duration),
106        }
107    }
108
109    /// Do nothing until a change is detected
110    ///
111    /// This stops the controller periodically reconciling this object until a relevant watch event
112    /// was **detected**.
113    ///
114    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
115    /// It is therefore not recommended to disable requeuing this way, unless you have
116    /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
117    #[must_use]
118    pub const fn await_change() -> Self {
119        Self { requeue_after: None }
120    }
121}
122
123/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
124pub fn trigger_with<T, K, I, S>(
125    stream: S,
126    mapper: impl Fn(T) -> I,
127) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
128where
129    S: TryStream<Ok = T>,
130    I: IntoIterator,
131    I::Item: Into<ReconcileRequest<K>>,
132    K: Resource,
133{
134    stream
135        .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
136        .try_flatten()
137}
138
139/// Enqueues the object itself for reconciliation
140pub fn trigger_self<K, S>(
141    stream: S,
142    dyntype: K::DynamicType,
143) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
144where
145    S: TryStream<Ok = K>,
146    K: Resource,
147    K::DynamicType: Clone,
148{
149    trigger_with(stream, move |obj| {
150        Some(ReconcileRequest {
151            obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
152            reason: ReconcileReason::ObjectUpdated,
153        })
154    })
155}
156
157/// Enqueues the object itself for reconciliation when the object is behind a
158/// shared pointer
159#[cfg(feature = "unstable-runtime-subscribe")]
160fn trigger_self_shared<K, S>(
161    stream: S,
162    dyntype: K::DynamicType,
163) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
164where
165    // Input stream has item as some Arc'd Resource (via
166    // Controller::for_shared_stream)
167    S: TryStream<Ok = Arc<K>>,
168    K: Resource,
169    K::DynamicType: Clone,
170{
171    trigger_with(stream, move |obj| {
172        Some(ReconcileRequest {
173            obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
174            reason: ReconcileReason::ObjectUpdated,
175        })
176    })
177}
178
179/// Enqueues any mapper returned `K` types for reconciliation
180fn trigger_others<S, K, I>(
181    stream: S,
182    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
183    dyntype: <S::Ok as Resource>::DynamicType,
184) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
185where
186    // Input stream has items as some Resource (via Controller::watches)
187    S: TryStream,
188    S::Ok: Resource,
189    <S::Ok as Resource>::DynamicType: Clone,
190    // Output stream is requests for the root type K
191    K: Resource,
192    K::DynamicType: Clone,
193    // but the mapper can produce many of them
194    I: 'static + IntoIterator<Item = ObjectRef<K>>,
195    I::IntoIter: Send,
196{
197    trigger_with(stream, move |obj| {
198        let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
199        mapper(obj)
200            .into_iter()
201            .map(move |mapped_obj_ref| ReconcileRequest {
202                obj_ref: mapped_obj_ref,
203                reason: ReconcileReason::RelatedObjectUpdated {
204                    obj_ref: Box::new(watch_ref.clone()),
205                },
206            })
207    })
208}
209
210/// Enqueues any mapper returned `Arc<K>` types for reconciliation
211#[cfg(feature = "unstable-runtime-subscribe")]
212fn trigger_others_shared<S, O, K, I>(
213    stream: S,
214    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
215    dyntype: O::DynamicType,
216) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
217where
218    // Input is some shared resource (`Arc<O>`) obtained via `reflect`
219    S: TryStream<Ok = Arc<O>>,
220    O: Resource,
221    O::DynamicType: Clone,
222    // Output stream is requests for the root type K
223    K: Resource,
224    K::DynamicType: Clone,
225    // but the mapper can produce many of them
226    I: 'static + IntoIterator<Item = ObjectRef<K>>,
227    I::IntoIter: Send,
228{
229    trigger_with(stream, move |obj| {
230        let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
231        mapper(obj)
232            .into_iter()
233            .map(move |mapped_obj_ref| ReconcileRequest {
234                obj_ref: mapped_obj_ref,
235                reason: ReconcileReason::RelatedObjectUpdated {
236                    obj_ref: Box::new(watch_ref.clone()),
237                },
238            })
239    })
240}
241
242/// Enqueues any owners of type `KOwner` for reconciliation
243pub fn trigger_owners<KOwner, S>(
244    stream: S,
245    owner_type: KOwner::DynamicType,
246    child_type: <S::Ok as Resource>::DynamicType,
247) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
248where
249    S: TryStream,
250    S::Ok: Resource,
251    <S::Ok as Resource>::DynamicType: Clone,
252    KOwner: Resource,
253    KOwner::DynamicType: Clone,
254{
255    let mapper = move |obj: S::Ok| {
256        let meta = obj.meta().clone();
257        let ns = meta.namespace;
258        let owner_type = owner_type.clone();
259        meta.owner_references
260            .into_iter()
261            .flatten()
262            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
263    };
264    trigger_others(stream, mapper, child_type)
265}
266
267// TODO: do we really need to deal with a trystream? can we simplify this at
268// all?
269/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
270/// owned `K` objects
271#[cfg(feature = "unstable-runtime-subscribe")]
272fn trigger_owners_shared<KOwner, S, K>(
273    stream: S,
274    owner_type: KOwner::DynamicType,
275    child_type: K::DynamicType,
276) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
277where
278    S: TryStream<Ok = Arc<K>>,
279    K: Resource,
280    K::DynamicType: Clone,
281    KOwner: Resource,
282    KOwner::DynamicType: Clone,
283{
284    let mapper = move |obj: S::Ok| {
285        let meta = obj.meta().clone();
286        let ns = meta.namespace;
287        let owner_type = owner_type.clone();
288        meta.owner_references
289            .into_iter()
290            .flatten()
291            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
292    };
293    trigger_others_shared(stream, mapper, child_type)
294}
295
296/// A request to reconcile an object, annotated with why that request was made.
297///
298/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
299/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
300/// In this case, only *the first* reason is stored.
301#[derive(Educe)]
302#[educe(
303    Debug(bound("K::DynamicType: Debug")),
304    Clone(bound("K::DynamicType: Clone")),
305    PartialEq(bound("K::DynamicType: PartialEq")),
306    Hash(bound("K::DynamicType: Hash"))
307)]
308pub struct ReconcileRequest<K: Resource> {
309    /// A reference to the object to be reconciled
310    pub obj_ref: ObjectRef<K>,
311    /// The reason for why reconciliation was requested
312    #[educe(PartialEq(ignore), Hash(ignore))]
313    pub reason: ReconcileReason,
314}
315
316impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
317
318impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
319    fn from(obj_ref: ObjectRef<K>) -> Self {
320        ReconcileRequest {
321            obj_ref,
322            reason: ReconcileReason::Unknown,
323        }
324    }
325}
326
327/// The reason a reconcile was requested
328///
329/// Note that this reason is deliberately hidden from the reconciler.
330/// See <https://kube.rs/controllers/reconciler/#reasons-for-reconciliation>.
331#[derive(Debug, Clone)]
332pub enum ReconcileReason {
333    /// A custom reconcile triggered via `reconcile_on`
334    Unknown,
335
336    /// The main object was updated.
337    ObjectUpdated,
338
339    /// A related object was updated through a mapper
340    ///
341    /// The related object traversed its relation up to the object kind you are reconciling.
342    /// Your object may not have changed, but you may need to update child objects.
343    RelatedObjectUpdated {
344        /// An object ref to the related object
345        obj_ref: Box<ObjectRef<DynamicObject>>,
346    },
347
348    /// The users `reconcile` scheduled a reconciliation via an `Action`
349    ReconcilerRequestedRetry,
350
351    /// The users `error_policy` scheduled a reconciliation via an `Action`
352    ErrorPolicyRequestedRetry,
353
354    /// A bulk reconcile was requested via `reconcile_all_on`
355    BulkReconcile,
356
357    /// A custom reconcile reason for custom integrations.
358    ///
359    /// Can be used when injecting elements into the queue stream directly.
360    Custom {
361        /// A user provided reason through a custom integration
362        reason: String,
363    },
364}
365
366impl Display for ReconcileReason {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        match self {
369            ReconcileReason::Unknown => f.write_str("unknown"),
370            ReconcileReason::ObjectUpdated => f.write_str("object updated"),
371            ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
372                f.write_fmt(format_args!("related object updated: {object}"))
373            }
374            ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
375            ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
376            ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
377            ReconcileReason::Custom { reason } => f.write_str(reason),
378        }
379    }
380}
381
382const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
383
384/// Apply a reconciler to an input stream, with a given retry policy
385///
386/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
387///
388/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
389/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
390/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
391/// with a [`watcher()`] or [`reflector()`] for the subobject.
392///
393/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
394/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
395#[allow(clippy::needless_pass_by_value)]
396#[allow(clippy::type_complexity)]
397pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
398    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
399    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
400    context: Arc<Ctx>,
401    store: Store<K>,
402    queue: QueueStream,
403    config: Config,
404) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
405where
406    K: Clone + Resource + 'static,
407    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
408    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
409    ReconcilerFut::Error: std::error::Error + 'static,
410    QueueStream: TryStream,
411    QueueStream::Ok: Into<ReconcileRequest<K>>,
412    QueueStream::Error: std::error::Error + 'static,
413{
414    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
415    let (scheduler_tx, scheduler_rx) =
416        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
417    let error_policy = Arc::new(error_policy);
418    let delay_store = store.clone();
419    // Create a stream of ObjectRefs that need to be reconciled
420    trystream_try_via(
421        // input: stream combining scheduled tasks and user specified inputs event
422        Box::pin(stream::select(
423            // 1. inputs from users queue stream
424            queue
425                .map_err(Error::QueueError)
426                .map_ok(|request| ScheduleRequest {
427                    message: request.into(),
428                    run_at: Instant::now(),
429                })
430                .on_complete(async move {
431                    // On error: scheduler has already been shut down and there is nothing for us to do
432                    let _ = scheduler_shutdown_tx.send(());
433                    tracing::debug!("applier queue terminated, starting graceful shutdown")
434                }),
435            // 2. requests sent to scheduler_tx
436            scheduler_rx
437                .map(Ok)
438                .take_until(scheduler_shutdown_rx)
439                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
440        )),
441        // all the Oks from the select gets passed through the scheduler stream, and are then executed
442        move |s| {
443            Runner::new(
444                debounced_scheduler(s, config.debounce),
445                config.concurrency,
446                move |request| {
447                    let request = request.clone();
448                    match store.get(&request.obj_ref) {
449                        Some(obj) => {
450                            let scheduler_tx = scheduler_tx.clone();
451                            let error_policy_ctx = context.clone();
452                            let error_policy = error_policy.clone();
453                            let reconciler_span = info_span!(
454                                "reconciling object",
455                                "object.ref" = %request.obj_ref,
456                                object.reason = %request.reason
457                            );
458                            TryFutureExt::into_future(
459                                reconciler_span.in_scope(|| reconciler(Arc::clone(&obj), context.clone())),
460                            )
461                            .then(move |res| {
462                                let error_policy = error_policy;
463                                RescheduleReconciliation::new(
464                                    res,
465                                    |err| error_policy(obj, err, error_policy_ctx),
466                                    request.obj_ref.clone(),
467                                    scheduler_tx,
468                                )
469                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
470                                // to them separately
471                                .map(|res| Ok((request.obj_ref, res)))
472                            })
473                            .instrument(reconciler_span)
474                            .left_future()
475                        }
476                        None => {
477                            std::future::ready(Err(Error::ObjectNotFound(Box::new(request.obj_ref.erase()))))
478                                .right_future()
479                        }
480                    }
481                },
482            )
483            .delay_tasks_until(async move {
484                tracing::debug!("applier runner held until store is ready");
485                let res = delay_store.wait_until_ready().await;
486                tracing::debug!("store is ready, starting runner");
487                res
488            })
489            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
490            .on_complete(async { tracing::debug!("applier runner terminated") })
491        },
492    )
493    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
494    // finally, for each completed reconcile call:
495    .and_then(move |(obj_ref, reconciler_result)| async move {
496        match reconciler_result {
497            Ok(action) => Ok((obj_ref, action)),
498            Err(err) => Err(Error::ReconcilerFailed(err, Box::new(obj_ref.erase()))),
499        }
500    })
501    .on_complete(async { tracing::debug!("applier terminated") })
502}
503
504/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
505///
506/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
507#[pin_project]
508#[must_use]
509struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
510    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
511
512    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
513    result: Option<Result<Action, ReconcilerErr>>,
514}
515
516impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
517where
518    K: Resource,
519{
520    fn new(
521        result: Result<Action, ReconcilerErr>,
522        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
523        obj_ref: ObjectRef<K>,
524        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
525    ) -> Self {
526        let reconciler_finished_at = Instant::now();
527
528        let (action, reschedule_reason) = result.as_ref().map_or_else(
529            |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
530            |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
531        );
532
533        Self {
534            reschedule_tx,
535            reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
536                message: ReconcileRequest {
537                    obj_ref,
538                    reason: reschedule_reason,
539                },
540                run_at: reconciler_finished_at
541                    .checked_add(requeue_after)
542                    .unwrap_or_else(crate::scheduler::max_schedule_time),
543            }),
544            result: Some(result),
545        }
546    }
547}
548
549impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
550where
551    K: Resource,
552{
553    type Output = Result<Action, ReconcilerErr>;
554
555    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
556        let this = self.get_mut();
557
558        if this.reschedule_request.is_some() {
559            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
560            let reschedule_request = this
561                .reschedule_request
562                .take()
563                .expect("PostReconciler::reschedule_request was taken during processing");
564            // Failure to schedule item = in graceful shutdown mode, ignore
565            if let Ok(()) = rescheduler_ready {
566                let _ = this.reschedule_tx.start_send(reschedule_request);
567            }
568        }
569
570        Poll::Ready(
571            this.result
572                .take()
573                .expect("PostReconciler::result was already taken"),
574        )
575    }
576}
577
578/// Accumulates all options that can be used on a [`Controller`] invocation.
579#[derive(Clone, Debug, Default)]
580pub struct Config {
581    debounce: Duration,
582    concurrency: u16,
583}
584
585impl Config {
586    /// The debounce duration used to deduplicate reconciliation requests.
587    ///
588    /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
589    /// resulting in __trailing edge debouncing__ of reconciler requests.
590    /// This option can help to reduce the amount of unnecessary reconciler calls
591    /// when using multiple controller relations, or during rapid phase transitions.
592    ///
593    /// ## Warning
594    /// This option delays (and keeps delaying) reconcile requests for objects while
595    /// the object is updated. It can **permanently hide** updates from your reconciler
596    /// if set too high on objects that are updated frequently (like nodes).
597    #[must_use]
598    pub fn debounce(mut self, debounce: Duration) -> Self {
599        self.debounce = debounce;
600        self
601    }
602
603    /// The number of concurrent reconciliations of that are allowed to run at an given moment.
604    ///
605    /// This can be adjusted to the controller's needs to increase
606    /// performance and/or make performance predictable. By default, its 0 meaning
607    /// the controller runs with unbounded concurrency.
608    ///
609    /// Note that despite concurrency, a controller never schedules concurrent reconciles
610    /// on the same object.
611    #[must_use]
612    pub fn concurrency(mut self, concurrency: u16) -> Self {
613        self.concurrency = concurrency;
614        self
615    }
616}
617
618/// Controller for a Resource `K`
619///
620/// A controller is an infinite stream of objects to be reconciled.
621///
622/// Once `run` and continuously awaited, it continuously calls out to user provided
623/// `reconcile` and `error_policy` callbacks whenever relevant changes are detected
624/// or if errors are seen from `reconcile`.
625///
626/// Reconciles are generally requested for all changes on your root objects.
627/// Changes to managed child resources will also trigger the reconciler for the
628/// managing object by traversing owner references (for `Controller::owns`),
629/// or traverse a custom mapping (for `Controller::watches`).
630///
631/// This mapping mechanism ultimately hides the reason for the reconciliation request,
632/// and forces you to write an idempotent reconciler.
633///
634/// General setup:
635/// ```no_run
636/// use kube::{Api, Client, CustomResource};
637/// use kube::runtime::{controller::{Controller, Action}, watcher};
638/// # use serde::{Deserialize, Serialize};
639/// # use tokio::time::Duration;
640/// use futures::StreamExt;
641/// use k8s_openapi::api::core::v1::ConfigMap;
642/// use schemars::JsonSchema;
643/// # use std::sync::Arc;
644/// use thiserror::Error;
645///
646/// #[derive(Debug, Error)]
647/// enum Error {}
648///
649/// /// A custom resource
650/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
651/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
652/// struct ConfigMapGeneratorSpec {
653///     content: String,
654/// }
655///
656/// /// The reconciler that will be called when either object change
657/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
658///     // .. use api here to reconcile a child ConfigMap with ownerreferences
659///     // see configmapgen_controller example for full info
660///     Ok(Action::requeue(Duration::from_secs(300)))
661/// }
662/// /// an error handler that will be called when the reconciler fails with access to both the
663/// /// object that caused the failure and the actual error
664/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
665///     Action::requeue(Duration::from_secs(60))
666/// }
667///
668/// /// something to drive the controller
669///
670/// async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
671/// #   let client: Client = todo!();
672///     let context = Arc::new(()); // bad empty context - put client in here
673///     let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
674///     let cms = Api::<ConfigMap>::all(client.clone());
675///     Controller::new(cmgs, watcher::Config::default())
676///         .owns(cms, watcher::Config::default())
677///         .run(reconcile, error_policy, context)
678///         .for_each(|res| async move {
679///             match res {
680///                 Ok(o) => println!("reconciled {:?}", o),
681///                 Err(e) => println!("reconcile failed: {:?}", e),
682///             }
683///         })
684///         .await; // controller does nothing unless polled
685/// #    Ok(())
686/// # }
687/// ```
688pub struct Controller<K>
689where
690    K: Clone + Resource + Debug + 'static,
691    K::DynamicType: Eq + Hash,
692{
693    // NB: Need to Unpin for stream::select_all
694    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
695    trigger_backoff: Box<dyn Backoff + Send>,
696    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
697    /// refusing to start any new reconciliations but letting any existing ones finish.
698    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
699    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
700    /// requesting that all running reconciliations be aborted.
701    /// However, note that they *will* keep running until their next yield point (`.await`),
702    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
703    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
704    dyntype: K::DynamicType,
705    reader: Store<K>,
706    config: Config,
707}
708
709impl<K> Controller<K>
710where
711    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
712    K::DynamicType: Eq + Hash + Clone,
713{
714    /// Create a Controller for a resource `K`
715    ///
716    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
717    ///
718    /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
719    /// and receive reconcile events for.
720    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
721    #[must_use]
722    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
723    where
724        K::DynamicType: Default,
725    {
726        Self::new_with(main_api, wc, Default::default())
727    }
728
729    /// Create a Controller for a resource `K`
730    ///
731    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
732    ///
733    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
734    /// to watch - in the Api's  configured scope - and receive reconcile events for.
735    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
736    ///
737    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
738    ///
739    /// [`watcher::Config`]: crate::watcher::Config
740    /// [`Api`]: kube_client::Api
741    /// [`dynamic`]: kube_client::core::dynamic
742    /// [`Config::default`]: crate::watcher::Config::default
743    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
744        let writer = Writer::<K>::new(dyntype.clone());
745        let reader = writer.as_reader();
746        let mut trigger_selector = stream::SelectAll::new();
747        let self_watcher = trigger_self(
748            reflector(writer, watcher(main_api, wc)).applied_objects(),
749            dyntype.clone(),
750        )
751        .boxed();
752        trigger_selector.push(self_watcher);
753        Self {
754            trigger_selector,
755            trigger_backoff: Box::<DefaultBackoff>::default(),
756            graceful_shutdown_selector: vec![
757                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
758                future::pending().boxed(),
759            ],
760            forceful_shutdown_selector: vec![
761                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
762                future::pending().boxed(),
763            ],
764            dyntype,
765            reader,
766            config: Default::default(),
767        }
768    }
769
770    /// Create a Controller for a resource `K` from a stream of `K` objects
771    ///
772    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
773    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
774    /// as well as sharing input streams between multiple controllers.
775    ///
776    /// # Example:
777    ///
778    /// ```no_run
779    /// # use futures::StreamExt;
780    /// # use k8s_openapi::api::apps::v1::Deployment;
781    /// # use kube::runtime::controller::{Action, Controller};
782    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
783    /// # use kube::{Api, Client, Error, ResourceExt};
784    /// # use std::sync::Arc;
785    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
786    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
787    /// # async fn doc(client: kube::Client) {
788    /// let api: Api<Deployment> = Api::default_namespaced(client);
789    /// let (reader, writer) = reflector::store();
790    /// let deploys = watcher(api, watcher::Config::default())
791    ///     .default_backoff()
792    ///     .reflect(writer)
793    ///     .applied_objects()
794    ///     .predicate_filter(predicates::generation, Default::default());
795    ///
796    /// Controller::for_stream(deploys, reader)
797    ///     .run(reconcile, error_policy, Arc::new(()))
798    ///     .for_each(|_| std::future::ready(()))
799    ///     .await;
800    /// # }
801    /// ```
802    ///
803    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
804    #[cfg(feature = "unstable-runtime-stream-control")]
805    pub fn for_stream(
806        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
807        reader: Store<K>,
808    ) -> Self
809    where
810        K::DynamicType: Default,
811    {
812        Self::for_stream_with(trigger, reader, Default::default())
813    }
814
815    /// Create a Controller for a resource `K` from a stream of `K` objects
816    ///
817    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
818    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
819    /// as well as sharing input streams between multiple controllers.
820    ///
821    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
822    ///
823    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
824    ///
825    /// [`dynamic`]: kube_client::core::dynamic
826    #[cfg(feature = "unstable-runtime-stream-control")]
827    pub fn for_stream_with(
828        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
829        reader: Store<K>,
830        dyntype: K::DynamicType,
831    ) -> Self {
832        let mut trigger_selector = stream::SelectAll::new();
833        let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
834        trigger_selector.push(self_watcher);
835        Self {
836            trigger_selector,
837            trigger_backoff: Box::<DefaultBackoff>::default(),
838            graceful_shutdown_selector: vec![
839                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
840                future::pending().boxed(),
841            ],
842            forceful_shutdown_selector: vec![
843                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
844                future::pending().boxed(),
845            ],
846            dyntype,
847            reader,
848            config: Default::default(),
849        }
850    }
851
852    /// This is the same as [`Controller::for_stream`]. Instead of taking an
853    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
854    /// streams can be created out-of-band by subscribing on a store `Writer`.
855    /// Through this interface, multiple controllers can use the same root
856    /// (shared) input stream of resources to keep memory overheads smaller.
857    ///
858    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
859    /// need to share the stream.
860    ///
861    /// ## Warning:
862    ///
863    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
864    /// is driven to readiness independently of this controller to ensure the
865    /// watcher never deadlocks.
866    ///
867    /// # Example:
868    ///
869    /// ```no_run
870    /// # use futures::StreamExt;
871    /// # use k8s_openapi::api::apps::v1::Deployment;
872    /// # use kube::runtime::controller::{Action, Controller};
873    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
874    /// # use kube::{Api, Client, Error, ResourceExt};
875    /// # use std::sync::Arc;
876    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
877    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
878    /// # async fn doc(client: kube::Client) {
879    /// let api: Api<Deployment> = Api::default_namespaced(client);
880    /// let (reader, writer) = reflector::store_shared(128);
881    /// let subscriber = writer
882    ///     .subscribe()
883    ///     .expect("subscribers can only be created from shared stores");
884    /// let deploys = watcher(api, watcher::Config::default())
885    ///     .default_backoff()
886    ///     .reflect(writer)
887    ///     .applied_objects()
888    ///     .for_each(|ev| async move {
889    ///         match ev {
890    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
891    ///             Err(error) => tracing::error!(%error, "received error")
892    ///         }
893    ///     });
894    ///
895    /// let controller = Controller::for_shared_stream(subscriber, reader)
896    ///     .run(reconcile, error_policy, Arc::new(()))
897    ///     .for_each(|ev| async move {
898    ///         tracing::info!("reconciled {ev:?}")
899    ///     });
900    ///
901    /// // Drive streams using a select statement
902    /// tokio::select! {
903    ///   _ = deploys => {},
904    ///   _ = controller => {},
905    /// }
906    /// # }
907    #[cfg(feature = "unstable-runtime-subscribe")]
908    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
909    where
910        K::DynamicType: Default,
911    {
912        Self::for_shared_stream_with(trigger, reader, Default::default())
913    }
914
915    /// This is the same as [`Controller::for_stream`]. Instead of taking an
916    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
917    /// streams can be created out-of-band by subscribing on a store `Writer`.
918    /// Through this interface, multiple controllers can use the same root
919    /// (shared) input stream of resources to keep memory overheads smaller.
920    ///
921    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
922    /// need to share the stream.
923    ///
924    /// This variant constructor is used for [`dynamic`] types found through
925    /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
926    /// known at compile time).
927    ///
928    /// [`dynamic`]: kube_client::core::dynamic
929    #[cfg(feature = "unstable-runtime-subscribe")]
930    pub fn for_shared_stream_with(
931        trigger: impl Stream<Item = Arc<K>> + Send + 'static,
932        reader: Store<K>,
933        dyntype: K::DynamicType,
934    ) -> Self {
935        let mut trigger_selector = stream::SelectAll::new();
936        let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
937        trigger_selector.push(self_watcher);
938        Self {
939            trigger_selector,
940            trigger_backoff: Box::<DefaultBackoff>::default(),
941            graceful_shutdown_selector: vec![
942                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
943                future::pending().boxed(),
944            ],
945            forceful_shutdown_selector: vec![
946                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
947                future::pending().boxed(),
948            ],
949            dyntype,
950            reader,
951            config: Default::default(),
952        }
953    }
954
955    /// Specify the configuration for the controller's behavior.
956    #[must_use]
957    pub fn with_config(mut self, config: Config) -> Self {
958        self.config = config;
959        self
960    }
961
962    /// Specify the backoff policy for "trigger" watches
963    ///
964    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
965    ///
966    /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
967    /// but can be overridden by calling this method.
968    #[must_use]
969    pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
970        self.trigger_backoff = Box::new(backoff);
971        self
972    }
973
974    /// Retrieve a copy of the reader before starting the controller
975    pub fn store(&self) -> Store<K> {
976        self.reader.clone()
977    }
978
979    /// Specify `Child` objects which `K` owns and should be watched
980    ///
981    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
982    /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
983    ///
984    /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
985    /// to watch - in the Api's configured scope - and receive reconcile events for.
986    /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
987    ///
988    /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
989    #[must_use]
990    pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
991        self,
992        api: Api<Child>,
993        wc: watcher::Config,
994    ) -> Self {
995        self.owns_with(api, (), wc)
996    }
997
998    /// Specify `Child` objects which `K` owns and should be watched
999    ///
1000    /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
1001    #[must_use]
1002    pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
1003        mut self,
1004        api: Api<Child>,
1005        dyntype: Child::DynamicType,
1006        wc: watcher::Config,
1007    ) -> Self
1008    where
1009        Child::DynamicType: Debug + Eq + Hash + Clone,
1010    {
1011        // TODO: call owns_stream_with when it's stable
1012        #[allow(deprecated)]
1013        let child_watcher = trigger_owners(
1014            metadata_watcher(api, wc).touched_objects(),
1015            self.dyntype.clone(),
1016            dyntype,
1017        );
1018        self.trigger_selector.push(child_watcher.boxed());
1019        self
1020    }
1021
1022    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1023    ///
1024    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1025    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1026    /// as well as sharing input streams between multiple controllers.
1027    ///
1028    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1029    ///
1030    /// # Example:
1031    ///
1032    /// ```no_run
1033    /// # use futures::StreamExt;
1034    /// # use k8s_openapi::api::core::v1::ConfigMap;
1035    /// # use k8s_openapi::api::apps::v1::StatefulSet;
1036    /// # use kube::runtime::controller::Action;
1037    /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
1038    /// # use kube::{Api, Client, Error, ResourceExt};
1039    /// # use std::sync::Arc;
1040    /// # type CustomResource = ConfigMap;
1041    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1042    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1043    /// # async fn doc(client: kube::Client) {
1044    /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
1045    ///     .touched_objects()
1046    ///     .predicate_filter(predicates::generation, Default::default());
1047    ///
1048    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
1049    ///     .owns_stream(sts_stream)
1050    ///     .run(reconcile, error_policy, Arc::new(()))
1051    ///     .for_each(|_| std::future::ready(()))
1052    ///     .await;
1053    /// # }
1054    /// ```
1055    #[cfg(feature = "unstable-runtime-stream-control")]
1056    #[must_use]
1057    pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1058        self,
1059        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1060    ) -> Self {
1061        self.owns_stream_with(trigger, ())
1062    }
1063
1064    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1065    ///
1066    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1067    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1068    /// as well as sharing input streams between multiple controllers.
1069    ///
1070    /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1071    #[cfg(feature = "unstable-runtime-stream-control")]
1072    #[must_use]
1073    pub fn owns_stream_with<Child: Resource + Send + 'static>(
1074        mut self,
1075        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1076        dyntype: Child::DynamicType,
1077    ) -> Self
1078    where
1079        Child::DynamicType: Debug + Eq + Hash + Clone,
1080    {
1081        let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
1082        self.trigger_selector.push(child_watcher.boxed());
1083        self
1084    }
1085
1086    /// This is the same as [`Controller::for_stream`]. Instead of taking an
1087    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
1088    /// streams can be created out-of-band by subscribing on a store `Writer`.
1089    /// Through this interface, multiple controllers can use the same root
1090    /// (shared) input stream of resources to keep memory overheads smaller.
1091    ///
1092    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
1093    /// need to share the stream.
1094    ///
1095    /// ## Warning:
1096    ///
1097    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
1098    /// is driven to readiness independently of this controller to ensure the
1099    /// watcher never deadlocks.
1100    ///
1101    ///
1102    /// Trigger the reconciliation process for a shared stream of `Child`
1103    /// objects of the owner `K`
1104    ///
1105    /// Conceptually the same as [`Controller::owns`], but a stream is used
1106    /// instead of an `Api`. This interface behaves similarly to its non-shared
1107    /// counterpart [`Controller::owns_stream`].
1108    ///
1109    /// # Example:
1110    ///
1111    /// ```no_run
1112    /// # use futures::StreamExt;
1113    /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
1114    /// # use kube::runtime::controller::{Action, Controller};
1115    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
1116    /// # use kube::{Api, Client, Error, ResourceExt};
1117    /// # use std::sync::Arc;
1118    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1119    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1120    /// # async fn doc(client: kube::Client) {
1121    /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1122    /// let pod_api: Api<Pod> = Api::default_namespaced(client);
1123    ///
1124    /// let (reader, writer) = reflector::store_shared(128);
1125    /// let subscriber = writer
1126    ///     .subscribe()
1127    ///     .expect("subscribers can only be created from shared stores");
1128    /// let pods = watcher(pod_api, watcher::Config::default())
1129    ///     .default_backoff()
1130    ///     .reflect(writer)
1131    ///     .applied_objects()
1132    ///     .for_each(|ev| async move {
1133    ///         match ev {
1134    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
1135    ///             Err(error) => tracing::error!(%error, "received error")
1136    ///         }
1137    ///     });
1138    ///
1139    /// let controller = Controller::new(deploys, Default::default())
1140    ///     .owns_shared_stream(subscriber)
1141    ///     .run(reconcile, error_policy, Arc::new(()))
1142    ///     .for_each(|ev| async move {
1143    ///         tracing::info!("reconciled {ev:?}")
1144    ///     });
1145    ///
1146    /// // Drive streams using a select statement
1147    /// tokio::select! {
1148    ///   _ = pods => {},
1149    ///   _ = controller => {},
1150    /// }
1151    /// # }
1152    #[cfg(feature = "unstable-runtime-subscribe")]
1153    #[must_use]
1154    pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1155        self,
1156        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1157    ) -> Self {
1158        self.owns_shared_stream_with(trigger, ())
1159    }
1160
1161    /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
1162    ///
1163    /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
1164    /// The source stream can be shared between multiple controllers, optimising
1165    /// resource usage.
1166    ///
1167    /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1168    #[cfg(feature = "unstable-runtime-subscribe")]
1169    #[must_use]
1170    pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
1171        mut self,
1172        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1173        dyntype: Child::DynamicType,
1174    ) -> Self
1175    where
1176        Child::DynamicType: Debug + Eq + Hash + Clone,
1177    {
1178        let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
1179        self.trigger_selector.push(child_watcher.boxed());
1180        self
1181    }
1182
1183    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1184    ///
1185    /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1186    /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1187    ///
1188    /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1189    ///
1190    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1191    ///
1192    /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1193    /// to watch - in the Api's configured scope - and run through the custom mapper.
1194    /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
1195    ///
1196    /// # Example
1197    ///
1198    /// Tracking cross cluster references using the [Operator-SDK] annotations.
1199    ///
1200    /// ```
1201    /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1202    /// # use kube::{Api, ResourceExt};
1203    /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1204    /// # use futures::StreamExt;
1205    /// # use std::sync::Arc;
1206    /// # type WatchedResource = Namespace;
1207    /// # struct Context;
1208    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1209    /// #     Ok(Action::await_change())
1210    /// # };
1211    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1212    /// #     Action::await_change()
1213    /// # }
1214    /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1215    /// # let memcached = Api::<ConfigMap>::all(client.clone());
1216    /// # let context = Arc::new(Context);
1217    /// Controller::new(memcached, watcher::Config::default())
1218    ///     .watches(
1219    ///         Api::<WatchedResource>::all(client.clone()),
1220    ///         watcher::Config::default(),
1221    ///         |ar| {
1222    ///             let prt = ar
1223    ///                 .annotations()
1224    ///                 .get("operator-sdk/primary-resource-type")
1225    ///                 .map(String::as_str);
1226    ///
1227    ///             if prt != Some("Memcached.cache.example.com") {
1228    ///                 return None;
1229    ///             }
1230    ///
1231    ///             let (namespace, name) = ar
1232    ///                 .annotations()
1233    ///                 .get("operator-sdk/primary-resource")?
1234    ///                 .split_once('/')?;
1235    ///
1236    ///             Some(ObjectRef::new(name).within(namespace))
1237    ///         }
1238    ///     )
1239    ///     .run(reconcile, error_policy, context)
1240    ///     .for_each(|_| futures::future::ready(()))
1241    ///     .await;
1242    /// # Ok(())
1243    /// # }
1244    /// ```
1245    ///
1246    /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1247    #[must_use]
1248    pub fn watches<Other, I>(
1249        self,
1250        api: Api<Other>,
1251        wc: watcher::Config,
1252        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1253    ) -> Self
1254    where
1255        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1256        Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1257        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1258        I::IntoIter: Send,
1259    {
1260        self.watches_with(api, Default::default(), wc, mapper)
1261    }
1262
1263    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1264    ///
1265    /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1266    #[must_use]
1267    pub fn watches_with<Other, I>(
1268        mut self,
1269        api: Api<Other>,
1270        dyntype: Other::DynamicType,
1271        wc: watcher::Config,
1272        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1273    ) -> Self
1274    where
1275        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1276        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1277        I::IntoIter: Send,
1278        Other::DynamicType: Debug + Clone + Eq + Hash,
1279    {
1280        let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1281        self.trigger_selector.push(other_watcher.boxed());
1282        self
1283    }
1284
1285    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1286    ///
1287    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1288    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1289    /// as well as sharing input streams between multiple controllers.
1290    ///
1291    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1292    ///
1293    /// # Example:
1294    ///
1295    /// ```no_run
1296    /// # use futures::StreamExt;
1297    /// # use k8s_openapi::api::core::v1::ConfigMap;
1298    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1299    /// # use kube::runtime::controller::Action;
1300    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1301    /// # use kube::{Api, Client, Error, ResourceExt};
1302    /// # use std::sync::Arc;
1303    /// # type CustomResource = ConfigMap;
1304    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1305    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1306    /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1307    /// # async fn doc(client: kube::Client) {
1308    /// let api: Api<DaemonSet> = Api::all(client.clone());
1309    /// let cr: Api<CustomResource> = Api::all(client.clone());
1310    /// let daemons = watcher(api, watcher::Config::default())
1311    ///     .touched_objects()
1312    ///     .predicate_filter(predicates::generation, Default::default());
1313    ///
1314    /// Controller::new(cr, watcher::Config::default())
1315    ///     .watches_stream(daemons, mapper)
1316    ///     .run(reconcile, error_policy, Arc::new(()))
1317    ///     .for_each(|_| std::future::ready(()))
1318    ///     .await;
1319    /// # }
1320    /// ```
1321    #[cfg(feature = "unstable-runtime-stream-control")]
1322    #[must_use]
1323    pub fn watches_stream<Other, I>(
1324        self,
1325        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1326        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1327    ) -> Self
1328    where
1329        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1330        Other::DynamicType: Default + Debug + Clone,
1331        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1332        I::IntoIter: Send,
1333    {
1334        self.watches_stream_with(trigger, mapper, Default::default())
1335    }
1336
1337    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1338    ///
1339    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1340    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1341    /// as well as sharing input streams between multiple controllers.
1342    ///
1343    /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1344    #[cfg(feature = "unstable-runtime-stream-control")]
1345    #[must_use]
1346    pub fn watches_stream_with<Other, I>(
1347        mut self,
1348        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1349        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1350        dyntype: Other::DynamicType,
1351    ) -> Self
1352    where
1353        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1354        Other::DynamicType: Debug + Clone,
1355        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1356        I::IntoIter: Send,
1357    {
1358        let other_watcher = trigger_others(trigger, mapper, dyntype);
1359        self.trigger_selector.push(other_watcher.boxed());
1360        self
1361    }
1362
1363    /// Trigger the reconciliation process for a shared stream of `Other`
1364    /// objects related to a `K`
1365    ///
1366    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1367    /// stream of resources is used. This allows for sharing input streams
1368    /// between multiple controllers.
1369    ///
1370    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1371    ///
1372    /// # Example:
1373    ///
1374    /// ```no_run
1375    /// # use futures::StreamExt;
1376    /// # use k8s_openapi::api::core::v1::ConfigMap;
1377    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1378    /// # use kube::runtime::controller::Action;
1379    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1380    /// # use kube::{Api, Client, Error, ResourceExt};
1381    /// # use std::sync::Arc;
1382    /// # type CustomResource = ConfigMap;
1383    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1384    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1385    /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1386    /// # async fn doc(client: kube::Client) {
1387    /// let api: Api<DaemonSet> = Api::all(client.clone());
1388    /// let cr: Api<CustomResource> = Api::all(client.clone());
1389    /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1390    /// let subscriber = writer
1391    ///     .subscribe()
1392    ///     .expect("subscribers can only be created from shared stores");
1393    /// let daemons = watcher(api, watcher::Config::default())
1394    ///     .reflect(writer)
1395    ///     .touched_objects()
1396    ///     .for_each(|ev| async move {
1397    ///         match ev {
1398    ///             Ok(obj) => {},
1399    ///             Err(error) => tracing::error!(%error, "received err")
1400    ///         }
1401    ///     });
1402    ///
1403    /// let controller = Controller::new(cr, watcher::Config::default())
1404    ///     .watches_shared_stream(subscriber, mapper)
1405    ///     .run(reconcile, error_policy, Arc::new(()))
1406    ///     .for_each(|_| std::future::ready(()));
1407    ///
1408    /// // Drive streams using a select statement
1409    /// tokio::select! {
1410    ///   _ = daemons => {},
1411    ///   _ = controller => {},
1412    /// }
1413    /// # }
1414    /// ```
1415    #[cfg(feature = "unstable-runtime-subscribe")]
1416    #[must_use]
1417    pub fn watches_shared_stream<Other, I>(
1418        self,
1419        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1420        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1421    ) -> Self
1422    where
1423        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1424        Other::DynamicType: Default + Debug + Clone,
1425        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1426        I::IntoIter: Send,
1427    {
1428        self.watches_shared_stream_with(trigger, mapper, Default::default())
1429    }
1430
1431    /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1432    ///
1433    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1434    /// stream of resources is used. This allows for sharing of streams between
1435    /// multiple controllers.
1436    ///
1437    /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1438    #[cfg(feature = "unstable-runtime-subscribe")]
1439    #[must_use]
1440    pub fn watches_shared_stream_with<Other, I>(
1441        mut self,
1442        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1443        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1444        dyntype: Other::DynamicType,
1445    ) -> Self
1446    where
1447        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1448        Other::DynamicType: Debug + Clone,
1449        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1450        I::IntoIter: Send,
1451    {
1452        let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
1453        self.trigger_selector.push(other_watcher.boxed());
1454        self
1455    }
1456
1457    /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1458    ///
1459    /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1460    ///
1461    /// To reconcile all objects when a new line is entered:
1462    ///
1463    /// ```
1464    /// # async {
1465    /// use futures::stream::StreamExt;
1466    /// use k8s_openapi::api::core::v1::ConfigMap;
1467    /// use kube::{
1468    ///     Client,
1469    ///     api::{Api, ResourceExt},
1470    ///     runtime::{
1471    ///         controller::{Controller, Action},
1472    ///         watcher,
1473    ///     },
1474    /// };
1475    /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1476    /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1477    /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1478    /// // and its worker prevents the Tokio runtime from shutting down.
1479    /// std::thread::spawn(move || {
1480    ///     for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1481    ///         let _ = reload_tx.try_send(());
1482    ///     }
1483    /// });
1484    /// Controller::new(
1485    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1486    ///     watcher::Config::default(),
1487    /// )
1488    /// .reconcile_all_on(reload_rx.map(|_| ()))
1489    /// .run(
1490    ///     |o, _| async move {
1491    ///         println!("Reconciling {}", o.name_any());
1492    ///         Ok(Action::await_change())
1493    ///     },
1494    ///     |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1495    ///     Arc::new(()),
1496    /// );
1497    /// # };
1498    /// ```
1499    ///
1500    /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1501    ///
1502    /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1503    #[must_use]
1504    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1505        let store = self.store();
1506        let dyntype = self.dyntype.clone();
1507        self.trigger_selector.push(
1508            trigger
1509                .flat_map(move |()| {
1510                    let dyntype = dyntype.clone();
1511                    stream::iter(store.state().into_iter().map(move |obj| {
1512                        Ok(ReconcileRequest {
1513                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1514                            reason: ReconcileReason::BulkReconcile,
1515                        })
1516                    }))
1517                })
1518                .boxed(),
1519        );
1520        self
1521    }
1522
1523    /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
1524    ///
1525    /// This can be used to inject reconciliations for specific objects from an external resource.
1526    ///
1527    /// # Example:
1528    ///
1529    /// ```no_run
1530    /// # async {
1531    /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
1532    /// # use k8s_openapi::api::core::v1::{ConfigMap};
1533    /// # use kube::api::Api;
1534    /// # use kube::runtime::controller::Action;
1535    /// # use kube::runtime::reflector::{ObjectRef, Store};
1536    /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
1537    /// # use kube::runtime::watcher::Config;
1538    /// # use kube::{Client, Error, ResourceExt};
1539    /// # use std::future;
1540    /// # use std::sync::Arc;
1541    /// #
1542    /// # let client: Client = todo!();
1543    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1544    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1545    /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
1546    /// # let ns = "controller-ns".to_string();
1547    /// struct ExternalObject {
1548    ///     name: String,
1549    /// }
1550    /// let external_stream = watch_external_objects().map(|ext| {
1551    ///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
1552    /// });
1553    ///
1554    /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
1555    ///     .reconcile_on(external_stream)
1556    ///     .run(reconcile, error_policy, Arc::new(()))
1557    ///     .for_each(|_| future::ready(()))
1558    ///     .await;
1559    /// # };
1560    /// ```
1561    #[cfg(feature = "unstable-runtime-reconcile-on")]
1562    #[must_use]
1563    pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
1564        self.trigger_selector.push(
1565            trigger
1566                .map(move |obj| {
1567                    Ok(ReconcileRequest {
1568                        obj_ref: obj,
1569                        reason: ReconcileReason::Unknown,
1570                    })
1571                })
1572                .boxed(),
1573        );
1574        self
1575    }
1576
1577    /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1578    ///
1579    /// - No new reconciliations are started from the scheduler
1580    /// - The underlying Kubernetes watch is terminated
1581    /// - All running reconciliations are allowed to finish
1582    /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1583    ///
1584    /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1585    ///
1586    /// ```rust
1587    /// # async {
1588    /// use futures::future::FutureExt;
1589    /// use k8s_openapi::api::core::v1::ConfigMap;
1590    /// use kube::{Api, Client, ResourceExt};
1591    /// use kube_runtime::{
1592    ///     controller::{Controller, Action},
1593    ///     watcher,
1594    /// };
1595    /// use std::{convert::Infallible, sync::Arc};
1596    /// Controller::new(
1597    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1598    ///     watcher::Config::default(),
1599    /// )
1600    /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1601    /// .run(
1602    ///     |o, _| async move {
1603    ///         println!("Reconciling {}", o.name_any());
1604    ///         Ok(Action::await_change())
1605    ///     },
1606    ///     |_, err: &Infallible, _| Err(err).unwrap(),
1607    ///     Arc::new(()),
1608    /// );
1609    /// # };
1610    /// ```
1611    ///
1612    /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1613    /// as soon as *any* [`Future`] resolves.
1614    #[must_use]
1615    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1616        self.graceful_shutdown_selector.push(trigger.boxed());
1617        self
1618    }
1619
1620    /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1621    ///
1622    /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1623    /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1624    ///
1625    /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1626    /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1627    /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1628    /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1629    ///
1630    /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1631    /// [`Controller::graceful_shutdown_on`].
1632    ///
1633    /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1634    /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1635    /// unless you run [`std::process::exit`] afterwards.
1636    #[must_use]
1637    pub fn shutdown_on_signal(mut self) -> Self {
1638        async fn shutdown_signal() {
1639            futures::future::select(
1640                tokio::signal::ctrl_c().map(|_| ()).boxed(),
1641                #[cfg(unix)]
1642                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1643                    .unwrap()
1644                    .recv()
1645                    .map(|_| ())
1646                    .boxed(),
1647                // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1648                #[cfg(not(unix))]
1649                futures::future::pending::<()>(),
1650            )
1651            .await;
1652        }
1653
1654        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1655        self.graceful_shutdown_selector
1656            .push(graceful_rx.map(|_| ()).boxed());
1657        self.forceful_shutdown_selector.push(
1658            async {
1659                tracing::info!("press ctrl+c to shut down gracefully");
1660                shutdown_signal().await;
1661                if let Ok(()) = graceful_tx.send(()) {
1662                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1663                } else {
1664                    tracing::info!(
1665                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
1666                    );
1667                }
1668                shutdown_signal().await;
1669                tracing::info!("forced shutdown requested");
1670            }
1671            .boxed(),
1672        );
1673        self
1674    }
1675
1676    /// Consume all the parameters of the Controller and start the applier stream
1677    ///
1678    /// This creates a stream from all builder calls and starts an applier with
1679    /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1680    /// with a configurable `context`.
1681    pub fn run<ReconcilerFut, Ctx>(
1682        self,
1683        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1684        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1685        context: Arc<Ctx>,
1686    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1687    where
1688        K::DynamicType: Debug + Unpin,
1689        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1690        ReconcilerFut::Error: std::error::Error + Send + 'static,
1691    {
1692        applier(
1693            move |obj, ctx| {
1694                CancelableJoinHandle::spawn(
1695                    TryFutureExt::into_future(reconciler(obj, ctx)).in_current_span(),
1696                    &Handle::current(),
1697                )
1698            },
1699            error_policy,
1700            context,
1701            self.reader,
1702            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1703                .take_until(future::select_all(self.graceful_shutdown_selector)),
1704            self.config,
1705        )
1706        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1707    }
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
1713
1714    use super::{APPLIER_REQUEUE_BUF_SIZE, Action};
1715    use crate::{
1716        Config, Controller, applier,
1717        reflector::{self, ObjectRef},
1718        watcher::{self, Event, watcher},
1719    };
1720    use futures::{Stream, StreamExt, TryStreamExt};
1721    use k8s_openapi::api::core::v1::ConfigMap;
1722    use kube_client::{
1723        Api, Resource,
1724        core::{ObjectMeta, PartialObjectMeta},
1725    };
1726    use serde::de::DeserializeOwned;
1727    use tokio::time::timeout;
1728
1729    fn assert_send<T: Send>(x: T) -> T {
1730        x
1731    }
1732
1733    // Used to typecheck that a type T is a generic type that implements Stream
1734    // and returns a WatchEvent generic over a resource `K`
1735    fn assert_stream<T, K>(x: T) -> T
1736    where
1737        T: Stream<Item = watcher::Result<Event<K>>> + Send,
1738        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
1739    {
1740        x
1741    }
1742
1743    fn mock_type<T>() -> T {
1744        unimplemented!(
1745            "mock_type is not supposed to be called, only used for filling holes in type assertions"
1746        )
1747    }
1748
1749    // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
1750    #[allow(dead_code, unused_must_use)]
1751    fn test_controller_should_be_send() {
1752        assert_send(
1753            Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
1754                |_, _| async { Ok(mock_type::<Action>()) },
1755                |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
1756                Arc::new(()),
1757            ),
1758        );
1759    }
1760
1761    // not #[test] because we don't want to actually run it, we just want to
1762    // assert that it typechecks
1763    //
1764    // will check return types for `watcher` and `watcher with PartialObjectMeta` do not drift
1765    // given an arbitrary K that implements `Resource` (e.g ConfigMap)
1766    #[allow(dead_code, unused_must_use)]
1767    fn test_watcher_stream_type_drift() {
1768        assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
1769        assert_stream(watcher(
1770            mock_type::<Api<PartialObjectMeta<ConfigMap>>>(),
1771            Default::default(),
1772        ));
1773    }
1774
1775    #[tokio::test]
1776    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
1777        // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
1778        // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926
1779
1780        // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation"
1781        // On my (@nightkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
1782        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
1783        // Assume that everything's OK if we can reconcile every object 3 times on average
1784        let reconciles = items * 3;
1785
1786        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
1787        let (store_rx, mut store_tx) = reflector::store();
1788        let mut applier = pin!(applier(
1789            |_obj, _| {
1790                Box::pin(async move {
1791                    // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
1792                    //println!("reconciling {:?}", obj.metadata.name);
1793                    Ok(Action::requeue(Duration::ZERO))
1794                })
1795            },
1796            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
1797            Arc::new(()),
1798            store_rx,
1799            queue_rx.map(Result::<_, Infallible>::Ok),
1800            Config::default(),
1801        ));
1802        store_tx.apply_watcher_event(&watcher::Event::InitDone);
1803        for i in 0..items {
1804            let obj = ConfigMap {
1805                metadata: ObjectMeta {
1806                    name: Some(format!("cm-{i}")),
1807                    namespace: Some("default".to_string()),
1808                    ..Default::default()
1809                },
1810                ..Default::default()
1811            };
1812            store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
1813            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
1814        }
1815
1816        timeout(
1817            Duration::from_secs(10),
1818            applier
1819                .as_mut()
1820                .take(reconciles)
1821                .try_for_each(|_| async { Ok(()) }),
1822        )
1823        .await
1824        .expect("test timeout expired, applier likely deadlocked")
1825        .unwrap();
1826
1827        // Do an orderly shutdown to ensure that no individual reconcilers are stuck
1828        drop(queue_tx);
1829        timeout(
1830            Duration::from_secs(10),
1831            applier.try_for_each(|_| async { Ok(()) }),
1832        )
1833        .await
1834        .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
1835        .unwrap();
1836    }
1837}