kube_runtime/controller/mod.rs
1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4#[allow(deprecated)] use crate::watcher::metadata_watcher;
5use crate::{
6 reflector::{
7 self, ObjectRef, reflector,
8 store::{Store, Writer},
9 },
10 scheduler::{ScheduleRequest, debounced_scheduler},
11 utils::{
12 Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
13 },
14 watcher::{self, DefaultBackoff, watcher},
15};
16use educe::Educe;
17use futures::{
18 FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, channel,
19 future::{self, BoxFuture},
20 stream,
21};
22use kube_client::api::{Api, DynamicObject, Resource};
23use pin_project::pin_project;
24use serde::de::DeserializeOwned;
25use std::{
26 fmt::{Debug, Display},
27 hash::Hash,
28 sync::Arc,
29 task::{Poll, ready},
30 time::Duration,
31};
32use stream::BoxStream;
33use thiserror::Error;
34use tokio::{runtime::Handle, time::Instant};
35use tracing::{Instrument, info_span};
36
37mod future_hash_map;
38mod runner;
39
/// The reasons the internal runner can fail
///
/// This is the runner's error type instantiated with `reflector::store::WriterDropped`.
/// It reaches users wrapped in [`Error::RunnerError`].
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
42
/// Errors returned by the applier and visible in a controller stream if inspecting it
///
/// WARNING: These errors do not terminate `Controller::run`, and are not passed to the `reconcile` fn
/// as they exist primarily for diagnostics.
///
/// To inspect these errors, you can run a `for_each` on the run stream:
///
/// ```compile_fail
/// Controller::new(api, watcher_config)
///     .run(reconcile, error_policy, context)
///     .for_each(|res| async move {
///         match res {
///             Ok(o) => info!("reconciled {:?}", o),
///             // Reconciler errors visible here:
///             Err(e) => warn!("reconcile failed: {}", e),
///         }
///     })
///     .await;
/// ```
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A scheduled reconcile for an object refers to an object that no longer exists
    ///
    /// This is usually not a problem and often expected with certain relations.
    /// See <https://github.com/kube-rs/kube/issues/1167#issuecomment-1636773541>
    /// for a more detailed explanation of how/why this happens.
    ///
    /// Carries a type-erased reference to the object that could not be found.
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(Box<ObjectRef<DynamicObject>>),

    /// User's reconcile fn failed for the object
    ///
    /// The first field is the reconciler's own error (exposed as the error source),
    /// the second is a type-erased reference to the object being reconciled.
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, Box<ObjectRef<DynamicObject>>),

    /// The queue stream contained an error
    ///
    /// The wrapped queue error is exposed as the error source.
    #[error("event queue error")]
    QueueError(#[source] QueueErr),

    /// The internal runner returned an error
    ///
    /// The wrapped [`RunnerError`] is exposed as the error source.
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}
84
/// Results of the reconciliation attempt
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Action {
    /// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
    ///
    /// For example, use this to query external systems for updates, expire time-limited resources, or
    /// (in your `error_policy`) retry after errors.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Action to requeue the reconciliation after `duration`, even if no external watch triggers hit
    ///
    /// This is the best-practice action that ensures eventual consistency of your controller
    /// even in the case of missed changes (which can happen).
    ///
    /// Watch events are not normally missed, so requeuing on a long interval
    /// (for example once per hour) as a fallback is reasonable.
    #[must_use]
    pub const fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Action { requeue_after }
    }

    /// Do nothing until a change is detected
    ///
    /// This stops the controller periodically reconciling this object until a relevant watch event
    /// was **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// It is therefore not recommended to disable requeuing this way, unless you have
    /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
    #[must_use]
    pub const fn await_change() -> Self {
        let requeue_after = None;
        Action { requeue_after }
    }
}
122
123/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
124pub fn trigger_with<T, K, I, S>(
125 stream: S,
126 mapper: impl Fn(T) -> I,
127) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
128where
129 S: TryStream<Ok = T>,
130 I: IntoIterator,
131 I::Item: Into<ReconcileRequest<K>>,
132 K: Resource,
133{
134 stream
135 .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
136 .try_flatten()
137}
138
139/// Enqueues the object itself for reconciliation
140pub fn trigger_self<K, S>(
141 stream: S,
142 dyntype: K::DynamicType,
143) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
144where
145 S: TryStream<Ok = K>,
146 K: Resource,
147 K::DynamicType: Clone,
148{
149 trigger_with(stream, move |obj| {
150 Some(ReconcileRequest {
151 obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
152 reason: ReconcileReason::ObjectUpdated,
153 })
154 })
155}
156
/// Enqueues the object itself for reconciliation when the object is behind a
/// shared pointer
///
/// Each `Ok` item produces exactly one [`ReconcileRequest`] referencing the pointed-to object,
/// with the reason [`ReconcileReason::ObjectUpdated`].
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input stream has item as some Arc'd Resource (via
    // Controller::for_shared_stream)
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    let request_for = move |shared: Arc<K>| {
        let obj_ref = ObjectRef::from_obj_with(shared.as_ref(), dyntype.clone());
        let reason = ReconcileReason::ObjectUpdated;
        Some(ReconcileRequest { obj_ref, reason })
    };
    trigger_with(stream, request_for)
}
178
179/// Enqueues any mapper returned `K` types for reconciliation
180fn trigger_others<S, K, I>(
181 stream: S,
182 mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
183 dyntype: <S::Ok as Resource>::DynamicType,
184) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
185where
186 // Input stream has items as some Resource (via Controller::watches)
187 S: TryStream,
188 S::Ok: Resource,
189 <S::Ok as Resource>::DynamicType: Clone,
190 // Output stream is requests for the root type K
191 K: Resource,
192 K::DynamicType: Clone,
193 // but the mapper can produce many of them
194 I: 'static + IntoIterator<Item = ObjectRef<K>>,
195 I::IntoIter: Send,
196{
197 trigger_with(stream, move |obj| {
198 let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
199 mapper(obj)
200 .into_iter()
201 .map(move |mapped_obj_ref| ReconcileRequest {
202 obj_ref: mapped_obj_ref,
203 reason: ReconcileReason::RelatedObjectUpdated {
204 obj_ref: Box::new(watch_ref.clone()),
205 },
206 })
207 })
208}
209
/// Enqueues any mapper returned `Arc<K>` types for reconciliation
///
/// Every request produced for a watched object carries
/// [`ReconcileReason::RelatedObjectUpdated`] with a type-erased reference to that watched object.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input is some shared resource (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // Output stream is requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // but the mapper can produce many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |watched| {
        // Capture the reference before the mapper consumes the shared pointer
        let source_ref = ObjectRef::from_obj_with(watched.as_ref(), dyntype.clone()).erase();
        mapper(watched).into_iter().map(move |target_ref| {
            let reason = ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(source_ref.clone()),
            };
            ReconcileRequest {
                obj_ref: target_ref,
                reason,
            }
        })
    })
}
241
242/// Enqueues any owners of type `KOwner` for reconciliation
243pub fn trigger_owners<KOwner, S>(
244 stream: S,
245 owner_type: KOwner::DynamicType,
246 child_type: <S::Ok as Resource>::DynamicType,
247) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
248where
249 S: TryStream,
250 S::Ok: Resource,
251 <S::Ok as Resource>::DynamicType: Clone,
252 KOwner: Resource,
253 KOwner::DynamicType: Clone,
254{
255 let mapper = move |obj: S::Ok| {
256 let meta = obj.meta().clone();
257 let ns = meta.namespace;
258 let owner_type = owner_type.clone();
259 meta.owner_references
260 .into_iter()
261 .flatten()
262 .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
263 };
264 trigger_others(stream, mapper, child_type)
265}
266
// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects
///
/// For every `Ok` object in `stream`, each of its owner references is passed to
/// [`ObjectRef::from_owner_ref`] together with the object's namespace and `owner_type`;
/// every reference that conversion accepts becomes a [`ReconcileRequest`] for the owner.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        // Only the namespace and the owner references are needed, so clone just those
        // two fields rather than the whole metadata of every observed object.
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owners
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others_shared(stream, mapper, child_type)
}
295
/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
/// In this case, only *the first* reason is stored.
///
/// The derives below are bounded on `K::DynamicType` rather than on `K` itself.
#[derive(Educe)]
#[educe(
    Debug(bound("K::DynamicType: Debug")),
    Clone(bound("K::DynamicType: Clone")),
    PartialEq(bound("K::DynamicType: PartialEq")),
    Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
    /// A reference to the object to be reconciled
    pub obj_ref: ObjectRef<K>,
    /// The reason for why reconciliation was requested
    ///
    /// Excluded from both equality and hashing, so two requests for the same
    /// object compare equal regardless of their reasons.
    #[educe(PartialEq(ignore), Hash(ignore))]
    pub reason: ReconcileReason,
}
315
// `Eq` is a marker on top of the derived `PartialEq`; it is only available when the
// dynamic type itself is `Eq`.
impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
317
318impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
319 fn from(obj_ref: ObjectRef<K>) -> Self {
320 ReconcileRequest {
321 obj_ref,
322 reason: ReconcileReason::Unknown,
323 }
324 }
325}
326
/// The reason a reconcile was requested
///
/// Note that this reason is deliberately hidden from the reconciler.
/// See <https://kube.rs/controllers/reconciler/#reasons-for-reconciliation>.
#[derive(Debug, Clone)]
pub enum ReconcileReason {
    /// A custom reconcile triggered via `reconcile_on`
    ///
    /// This is also the reason assigned when an `ObjectRef` is converted into a
    /// [`ReconcileRequest`] through its `From` implementation.
    Unknown,

    /// The main object was updated.
    ObjectUpdated,

    /// A related object was updated through a mapper
    ///
    /// The related object traversed its relation up to the object kind you are reconciling.
    /// Your object may not have changed, but you may need to update child objects.
    RelatedObjectUpdated {
        /// An object ref to the related object
        obj_ref: Box<ObjectRef<DynamicObject>>,
    },

    /// The user's `reconcile` scheduled a reconciliation via an `Action`
    ReconcilerRequestedRetry,

    /// The user's `error_policy` scheduled a reconciliation via an `Action`
    ErrorPolicyRequestedRetry,

    /// A bulk reconcile was requested via `reconcile_all_on`
    BulkReconcile,

    /// A custom reconcile reason for custom integrations.
    ///
    /// Can be used when injecting elements into the queue stream directly.
    Custom {
        /// A user provided reason through a custom integration
        reason: String,
    },
}
365
366impl Display for ReconcileReason {
367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 match self {
369 ReconcileReason::Unknown => f.write_str("unknown"),
370 ReconcileReason::ObjectUpdated => f.write_str("object updated"),
371 ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
372 f.write_fmt(format_args!("related object updated: {object}"))
373 }
374 ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
375 ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
376 ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
377 ReconcileReason::Custom { reason } => f.write_str(reason),
378 }
379 }
380}
381
/// Capacity passed to the bounded channel that [`applier`] creates for its
/// internally generated schedule requests.
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
383
/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
///
/// # Parameters
///
/// - `reconciler`: called with the object looked up in `store` and a clone of `context`.
/// - `error_policy`: receives the object, the reconciler's error and `context`, and returns the
///   [`Action`] used to decide on rescheduling after a failure.
/// - `config`: supplies the debounce duration for the scheduler and the concurrency for the runner.
///
/// # Output
///
/// Yields `Ok((obj_ref, action))` for each successful reconcile. Failures surface as
/// [`Error::ReconcilerFailed`], [`Error::ObjectNotFound`] (the requested object was not in `store`),
/// [`Error::QueueError`] (an error item from `queue`) or [`Error::RunnerError`].
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Signals the internal requeue stream to stop once the user's queue has terminated
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Bounded channel feeding reschedule requests back into the scheduler input
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    // Shared so that each reconcile invocation can hold its own handle to the policy
    let error_policy = Arc::new(error_policy);
    // Separate handle for the readiness wait below; `store` itself moves into the runner closure
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user specified inputs event
        Box::pin(stream::select(
            // 1. inputs from users queue stream
            queue
                .map_err(Error::QueueError)
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    // Requests from the user queue are scheduled for "now"
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select gets passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            // Per-invocation handles that are moved into the continuation below
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            TryFutureExt::into_future(
                                // The reconciler is *called* inside the span; the resulting future is
                                // instrumented with the same span further down
                                reconciler_span.in_scope(|| reconciler(Arc::clone(&obj), context.clone())),
                            )
                            .then(move |res| {
                                let error_policy = error_policy;
                                RescheduleReconciliation::new(
                                    res,
                                    |err| error_policy(obj, err, error_policy_ctx),
                                    request.obj_ref.clone(),
                                    scheduler_tx,
                                )
                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
                                // to them separately
                                .map(|res| Ok((request.obj_ref, res)))
                            })
                            .instrument(reconciler_span)
                            .left_future()
                        }
                        None => {
                            // The object is not in the store: report it without calling the reconciler
                            std::future::ready(Err(Error::ObjectNotFound(Box::new(request.obj_ref.erase()))))
                                .right_future()
                        }
                    }
                },
            )
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            // Fold runner failures into the applier's own error type
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, Box::new(obj_ref.erase()))),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
503
/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    /// Sending half of the channel that reschedule requests are pushed into
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

    /// The request still to be sent; `None` when no requeue was asked for, or once it has been taken
    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    /// The reconciler's result, handed back as this future's output when it completes
    result: Option<Result<Action, ReconcilerErr>>,
}
515
516impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
517where
518 K: Resource,
519{
520 fn new(
521 result: Result<Action, ReconcilerErr>,
522 error_policy: impl FnOnce(&ReconcilerErr) -> Action,
523 obj_ref: ObjectRef<K>,
524 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
525 ) -> Self {
526 let reconciler_finished_at = Instant::now();
527
528 let (action, reschedule_reason) = result.as_ref().map_or_else(
529 |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
530 |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
531 );
532
533 Self {
534 reschedule_tx,
535 reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
536 message: ReconcileRequest {
537 obj_ref,
538 reason: reschedule_reason,
539 },
540 run_at: reconciler_finished_at
541 .checked_add(requeue_after)
542 .unwrap_or_else(crate::scheduler::max_schedule_time),
543 }),
544 result: Some(result),
545 }
546 }
547}
548
549impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
550where
551 K: Resource,
552{
553 type Output = Result<Action, ReconcilerErr>;
554
555 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
556 let this = self.get_mut();
557
558 if this.reschedule_request.is_some() {
559 let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
560 let reschedule_request = this
561 .reschedule_request
562 .take()
563 .expect("PostReconciler::reschedule_request was taken during processing");
564 // Failure to schedule item = in graceful shutdown mode, ignore
565 if let Ok(()) = rescheduler_ready {
566 let _ = this.reschedule_tx.start_send(reschedule_request);
567 }
568 }
569
570 Poll::Ready(
571 this.result
572 .take()
573 .expect("PostReconciler::result was already taken"),
574 )
575 }
576}
577
/// Accumulates all options that can be used on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
    debounce: Duration,
    concurrency: u16,
}

impl Config {
    /// The debounce duration used to deduplicate reconciliation requests.
    ///
    /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
    /// resulting in __trailing edge debouncing__ of reconciler requests.
    /// This option can help to reduce the amount of unnecessary reconciler calls
    /// when using multiple controller relations, or during rapid phase transitions.
    ///
    /// ## Warning
    /// This option delays (and keeps delaying) reconcile requests for objects while
    /// the object is updated. It can **permanently hide** updates from your reconciler
    /// if set too high on objects that are updated frequently (like nodes).
    #[must_use]
    pub fn debounce(self, debounce: Duration) -> Self {
        Self { debounce, ..self }
    }

    /// The number of concurrent reconciliations that are allowed to run at any given moment.
    ///
    /// This can be adjusted to the controller's needs to increase
    /// performance and/or make performance predictable. By default, it is 0, meaning
    /// the controller runs with unbounded concurrency.
    ///
    /// Note that despite concurrency, a controller never schedules concurrent reconciles
    /// on the same object.
    #[must_use]
    pub fn concurrency(self, concurrency: u16) -> Self {
        Self { concurrency, ..self }
    }
}
617
618/// Controller for a Resource `K`
619///
620/// A controller is an infinite stream of objects to be reconciled.
621///
/// Once `run` has been called and the resulting stream is continuously awaited, the controller
/// calls out to the user-provided `reconcile` and `error_policy` callbacks whenever relevant
/// changes are detected, or when `reconcile` returns an error.
625///
626/// Reconciles are generally requested for all changes on your root objects.
627/// Changes to managed child resources will also trigger the reconciler for the
628/// managing object by traversing owner references (for `Controller::owns`),
629/// or traverse a custom mapping (for `Controller::watches`).
630///
631/// This mapping mechanism ultimately hides the reason for the reconciliation request,
632/// and forces you to write an idempotent reconciler.
633///
634/// General setup:
635/// ```no_run
636/// use kube::{Api, Client, CustomResource};
637/// use kube::runtime::{controller::{Controller, Action}, watcher};
638/// # use serde::{Deserialize, Serialize};
639/// # use tokio::time::Duration;
640/// use futures::StreamExt;
641/// use k8s_openapi::api::core::v1::ConfigMap;
642/// use schemars::JsonSchema;
643/// # use std::sync::Arc;
644/// use thiserror::Error;
645///
646/// #[derive(Debug, Error)]
647/// enum Error {}
648///
649/// /// A custom resource
650/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
651/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
652/// struct ConfigMapGeneratorSpec {
653/// content: String,
654/// }
655///
656/// /// The reconciler that will be called when either object change
657/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
658/// // .. use api here to reconcile a child ConfigMap with ownerreferences
659/// // see configmapgen_controller example for full info
660/// Ok(Action::requeue(Duration::from_secs(300)))
661/// }
662/// /// an error handler that will be called when the reconciler fails with access to both the
663/// /// object that caused the failure and the actual error
664/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
665/// Action::requeue(Duration::from_secs(60))
666/// }
667///
668/// /// something to drive the controller
669///
670/// async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
671/// # let client: Client = todo!();
672/// let context = Arc::new(()); // bad empty context - put client in here
673/// let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
674/// let cms = Api::<ConfigMap>::all(client.clone());
675/// Controller::new(cmgs, watcher::Config::default())
676/// .owns(cms, watcher::Config::default())
677/// .run(reconcile, error_policy, context)
678/// .for_each(|res| async move {
679/// match res {
680/// Ok(o) => println!("reconciled {:?}", o),
681/// Err(e) => println!("reconcile failed: {:?}", e),
682/// }
683/// })
684/// .await; // controller does nothing unless polled
685/// # Ok(())
686/// # }
687/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    /// Merged set of trigger streams; each item is a [`ReconcileRequest`] for `K` or a [`watcher::Error`]
    // NB: Need to Unpin for stream::select_all
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    /// Boxed backoff strategy; the constructors initialise it to [`DefaultBackoff`]
    // NOTE(review): presumably applied to `trigger_selector` when the controller runs - confirm against `run`
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// Dynamic type information for `K`, as supplied to (or defaulted by) the constructor
    dyntype: K::DynamicType,
    /// Read handle to the store holding the `K` objects
    reader: Store<K>,
    /// Controller options (debounce and concurrency); the constructors start from [`Config::default`]
    config: Config,
}
708
709impl<K> Controller<K>
710where
711 K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
712 K::DynamicType: Eq + Hash + Clone,
713{
714 /// Create a Controller for a resource `K`
715 ///
716 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
717 ///
718 /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
719 /// and receive reconcile events for.
720 /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
721 #[must_use]
722 pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
723 where
724 K::DynamicType: Default,
725 {
726 Self::new_with(main_api, wc, Default::default())
727 }
728
729 /// Create a Controller for a resource `K`
730 ///
731 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
732 ///
733 /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
734 /// to watch - in the Api's configured scope - and receive reconcile events for.
735 /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
736 ///
737 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
738 ///
739 /// [`watcher::Config`]: crate::watcher::Config
740 /// [`Api`]: kube_client::Api
741 /// [`dynamic`]: kube_client::core::dynamic
742 /// [`Config::default`]: crate::watcher::Config::default
743 pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
744 let writer = Writer::<K>::new(dyntype.clone());
745 let reader = writer.as_reader();
746 let mut trigger_selector = stream::SelectAll::new();
747 let self_watcher = trigger_self(
748 reflector(writer, watcher(main_api, wc)).applied_objects(),
749 dyntype.clone(),
750 )
751 .boxed();
752 trigger_selector.push(self_watcher);
753 Self {
754 trigger_selector,
755 trigger_backoff: Box::<DefaultBackoff>::default(),
756 graceful_shutdown_selector: vec![
757 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
758 future::pending().boxed(),
759 ],
760 forceful_shutdown_selector: vec![
761 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
762 future::pending().boxed(),
763 ],
764 dyntype,
765 reader,
766 config: Default::default(),
767 }
768 }
769
770 /// Create a Controller for a resource `K` from a stream of `K` objects
771 ///
772 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
773 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
774 /// as well as sharing input streams between multiple controllers.
775 ///
776 /// # Example:
777 ///
778 /// ```no_run
779 /// # use futures::StreamExt;
780 /// # use k8s_openapi::api::apps::v1::Deployment;
781 /// # use kube::runtime::controller::{Action, Controller};
782 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
783 /// # use kube::{Api, Client, Error, ResourceExt};
784 /// # use std::sync::Arc;
785 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
786 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
787 /// # async fn doc(client: kube::Client) {
788 /// let api: Api<Deployment> = Api::default_namespaced(client);
789 /// let (reader, writer) = reflector::store();
790 /// let deploys = watcher(api, watcher::Config::default())
791 /// .default_backoff()
792 /// .reflect(writer)
793 /// .applied_objects()
794 /// .predicate_filter(predicates::generation, Default::default());
795 ///
796 /// Controller::for_stream(deploys, reader)
797 /// .run(reconcile, error_policy, Arc::new(()))
798 /// .for_each(|_| std::future::ready(()))
799 /// .await;
800 /// # }
801 /// ```
802 ///
803 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
804 #[cfg(feature = "unstable-runtime-stream-control")]
805 pub fn for_stream(
806 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
807 reader: Store<K>,
808 ) -> Self
809 where
810 K::DynamicType: Default,
811 {
812 Self::for_stream_with(trigger, reader, Default::default())
813 }
814
/// Build a Controller for a resource `K`, triggered by a caller-supplied stream of `K` objects
///
/// Works like [`Controller::new`], except that a stream of resources takes the place of the `Api`.
/// The stream can therefore be customized or pre-filtered before it is used as a trigger,
/// and a single input stream can be shared between multiple controllers.
///
/// If you neither share the stream nor pre-filter it, prefer [`Controller::new`].
///
/// This constructor takes an explicit `DynamicType` and is intended for [`dynamic`] types found
/// through discovery. For static types, prefer [`Controller::for_stream`].
///
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream_with(
    trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    // Route the caller's stream through `trigger_self` and register it as the first trigger.
    let self_trigger = trigger_self(trigger, dyntype.clone()).boxed();
    let mut trigger_selector = stream::SelectAll::new();
    trigger_selector.push(self_trigger);

    // Each shutdown selector is seeded with a fallback future, ensuring that we never
    // terminate if no additional futures are added to the selector.
    let graceful_shutdown_selector = vec![future::pending().boxed()];
    let forceful_shutdown_selector = vec![future::pending().boxed()];

    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        graceful_shutdown_selector,
        forceful_shutdown_selector,
        dyntype,
        reader,
        config: Default::default(),
    }
}
851
/// Create a Controller for a resource `K` from a shared stream of `K` objects
///
/// This is the shared-stream counterpart of [`Controller::for_stream`]: rather than taking
/// an `Api` (as [`Controller::new`] does), a stream of resources is used. Shared
/// streams can be created out-of-band by subscribing on a store `Writer`.
/// Through this interface, multiple controllers can use the same root
/// (shared) input stream of resources to keep memory overheads smaller.
///
/// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
/// need to share the stream.
///
/// ## Warning:
///
/// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
/// is driven to readiness independently of this controller to ensure the
/// watcher never deadlocks.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::apps::v1::Deployment;
/// # use kube::runtime::controller::{Action, Controller};
/// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<Deployment> = Api::default_namespaced(client);
/// let (reader, writer) = reflector::store_shared(128);
/// let subscriber = writer
///     .subscribe()
///     .expect("subscribers can only be created from shared stores");
/// let deploys = watcher(api, watcher::Config::default())
///     .default_backoff()
///     .reflect(writer)
///     .applied_objects()
///     .for_each(|ev| async move {
///         match ev {
///             Ok(obj) => tracing::info!("got obj {obj:?}"),
///             Err(error) => tracing::error!(%error, "received error")
///         }
///     });
///
/// let controller = Controller::for_shared_stream(subscriber, reader)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|ev| async move {
///         tracing::info!("reconciled {ev:?}")
///     });
///
/// // Drive streams using a select statement
/// tokio::select! {
///   _ = deploys => {},
///   _ = controller => {},
/// }
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
    K::DynamicType: Default,
{
    // Delegate to the `DynamicType`-aware constructor with the default-constructed dynamic type.
    let dyntype = <K::DynamicType as Default>::default();
    Self::for_shared_stream_with(trigger, reader, dyntype)
}
914
/// Create a Controller for a resource `K` from a shared stream of `K` objects,
/// with an explicit `DynamicType`
///
/// Like [`Controller::for_stream`], a stream of resources is used instead of
/// an `Api` (as taken by [`Controller::new`]). Shared streams can be created
/// out-of-band by subscribing on a store `Writer`. Through this interface,
/// multiple controllers can use the same root (shared) input stream of
/// resources to keep memory overheads smaller.
///
/// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
/// need to share the stream.
///
/// This variant constructor is used for [`dynamic`] types found through
/// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
/// known at compile time).
///
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream_with(
    trigger: impl Stream<Item = Arc<K>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    // The shared stream yields bare `Arc<K>` items; wrap each in `Ok` before handing the
    // stream to `trigger_self_shared`, then register the result as the first trigger.
    let self_trigger = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
    let mut trigger_selector = stream::SelectAll::new();
    trigger_selector.push(self_trigger);

    // Each shutdown selector is seeded with a fallback future, ensuring that we never
    // terminate if no additional futures are added to the selector.
    let graceful_shutdown_selector = vec![future::pending().boxed()];
    let forceful_shutdown_selector = vec![future::pending().boxed()];

    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        graceful_shutdown_selector,
        forceful_shutdown_selector,
        dyntype,
        reader,
        config: Default::default(),
    }
}
954
955 /// Specify the configuration for the controller's behavior.
956 #[must_use]
957 pub fn with_config(mut self, config: Config) -> Self {
958 self.config = config;
959 self
960 }
961
962 /// Specify the backoff policy for "trigger" watches
963 ///
964 /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
965 ///
966 /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
967 /// but can be overridden by calling this method.
968 #[must_use]
969 pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
970 self.trigger_backoff = Box::new(backoff);
971 self
972 }
973
974 /// Retrieve a copy of the reader before starting the controller
975 pub fn store(&self) -> Store<K> {
976 self.reader.clone()
977 }
978
979 /// Specify `Child` objects which `K` owns and should be watched
980 ///
981 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
982 /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
983 ///
984 /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
985 /// to watch - in the Api's configured scope - and receive reconcile events for.
986 /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
987 ///
988 /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
989 #[must_use]
990 pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
991 self,
992 api: Api<Child>,
993 wc: watcher::Config,
994 ) -> Self {
995 self.owns_with(api, (), wc)
996 }
997
998 /// Specify `Child` objects which `K` owns and should be watched
999 ///
1000 /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
1001 #[must_use]
1002 pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
1003 mut self,
1004 api: Api<Child>,
1005 dyntype: Child::DynamicType,
1006 wc: watcher::Config,
1007 ) -> Self
1008 where
1009 Child::DynamicType: Debug + Eq + Hash + Clone,
1010 {
1011 // TODO: call owns_stream_with when it's stable
1012 #[allow(deprecated)]
1013 let child_watcher = trigger_owners(
1014 metadata_watcher(api, wc).touched_objects(),
1015 self.dyntype.clone(),
1016 dyntype,
1017 );
1018 self.trigger_selector.push(child_watcher.boxed());
1019 self
1020 }
1021
/// Trigger reconciliation of the owner `K` from a stream of its `Child` objects
///
/// Behaves like [`Controller::owns`], except that a stream of resources takes the place of the `Api`.
/// The stream can therefore be customized or pre-filtered before it is used as a trigger,
/// and a single input stream can be shared between multiple controllers.
///
/// Watcher streams passed in here should be filtered first through `touched_objects`.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use k8s_openapi::api::apps::v1::StatefulSet;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # async fn doc(client: kube::Client) {
/// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
///     .touched_objects()
///     .predicate_filter(predicates::generation, Default::default());
///
/// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
///     .owns_stream(sts_stream)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|_| std::future::ready(()))
///     .await;
/// # }
/// ```
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
    self,
    trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
) -> Self {
    // `Child` has a unit `DynamicType`, so `()` is passed as the dynamic type.
    Self::owns_stream_with(self, trigger, ())
}
1063
/// Trigger reconciliation of the owner `K` from a stream of its `Child` objects,
/// with an explicit `DynamicType`
///
/// Behaves like [`Controller::owns`], except that a stream of resources takes the place of the `Api`.
/// The stream can therefore be customized or pre-filtered before it is used as a trigger,
/// and a single input stream can be shared between multiple controllers.
///
/// Equivalent to [`Controller::owns_stream`], but takes the child's `DynamicType` as an argument
/// so that it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
    mut self,
    trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    let owner_type = self.dyntype.clone();
    let owner_trigger = trigger_owners(trigger, owner_type, dyntype).boxed();
    self.trigger_selector.push(owner_trigger);
    self
}
1085
/// Trigger the reconciliation process for a shared stream of `Child`
/// objects of the owner `K`
///
/// Conceptually the same as [`Controller::owns`], but a stream is used
/// instead of an `Api`. This interface behaves similarly to its non-shared
/// counterpart [`Controller::owns_stream`].
///
/// Shared streams can be created out-of-band by subscribing on a store `Writer`.
/// Through this interface, multiple controllers can use the same root
/// (shared) input stream of resources to keep memory overheads smaller.
///
/// Prefer [`Controller::owns`] or [`Controller::owns_stream`] if you do not
/// need to share the stream.
///
/// ## Warning:
///
/// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
/// is driven to readiness independently of this controller to ensure the
/// watcher never deadlocks.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
/// # use kube::runtime::controller::{Action, Controller};
/// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # async fn doc(client: kube::Client) {
/// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
/// let pod_api: Api<Pod> = Api::default_namespaced(client);
///
/// let (reader, writer) = reflector::store_shared(128);
/// let subscriber = writer
///     .subscribe()
///     .expect("subscribers can only be created from shared stores");
/// let pods = watcher(pod_api, watcher::Config::default())
///     .default_backoff()
///     .reflect(writer)
///     .applied_objects()
///     .for_each(|ev| async move {
///         match ev {
///             Ok(obj) => tracing::info!("got obj {obj:?}"),
///             Err(error) => tracing::error!(%error, "received error")
///         }
///     });
///
/// let controller = Controller::new(deploys, Default::default())
///     .owns_shared_stream(subscriber)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|ev| async move {
///         tracing::info!("reconciled {ev:?}")
///     });
///
/// // Drive streams using a select statement
/// tokio::select! {
///   _ = pods => {},
///   _ = controller => {},
/// }
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
    self,
    trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
) -> Self {
    // `Child` has a unit `DynamicType`, so `()` is passed as the dynamic type.
    Self::owns_shared_stream_with(self, trigger, ())
}
1160
/// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
///
/// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
/// The source stream can be shared between multiple controllers, optimising
/// resource usage.
///
/// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with
/// dynamic resources. `Child` is therefore only required to implement [`Resource`]; its
/// `DynamicType` is not restricted to `()`, matching [`Controller::owns_stream_with`].
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream_with<Child: Resource + Send + 'static>(
    mut self,
    trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    // The shared stream yields bare `Arc<Child>` items; wrap each in `Ok` so the stream has
    // the fallible shape expected by `trigger_owners_shared`.
    let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
    self.trigger_selector.push(child_watcher.boxed());
    self
}
1182
1183 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1184 ///
1185 /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1186 /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1187 ///
1188 /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1189 ///
1190 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1191 ///
1192 /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1193 /// to watch - in the Api's configured scope - and run through the custom mapper.
1194 /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
1195 ///
1196 /// # Example
1197 ///
1198 /// Tracking cross cluster references using the [Operator-SDK] annotations.
1199 ///
1200 /// ```
1201 /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1202 /// # use kube::{Api, ResourceExt};
1203 /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1204 /// # use futures::StreamExt;
1205 /// # use std::sync::Arc;
1206 /// # type WatchedResource = Namespace;
1207 /// # struct Context;
1208 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1209 /// # Ok(Action::await_change())
1210 /// # };
1211 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1212 /// # Action::await_change()
1213 /// # }
1214 /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1215 /// # let memcached = Api::<ConfigMap>::all(client.clone());
1216 /// # let context = Arc::new(Context);
1217 /// Controller::new(memcached, watcher::Config::default())
1218 /// .watches(
1219 /// Api::<WatchedResource>::all(client.clone()),
1220 /// watcher::Config::default(),
1221 /// |ar| {
1222 /// let prt = ar
1223 /// .annotations()
1224 /// .get("operator-sdk/primary-resource-type")
1225 /// .map(String::as_str);
1226 ///
1227 /// if prt != Some("Memcached.cache.example.com") {
1228 /// return None;
1229 /// }
1230 ///
1231 /// let (namespace, name) = ar
1232 /// .annotations()
1233 /// .get("operator-sdk/primary-resource")?
1234 /// .split_once('/')?;
1235 ///
1236 /// Some(ObjectRef::new(name).within(namespace))
1237 /// }
1238 /// )
1239 /// .run(reconcile, error_policy, context)
1240 /// .for_each(|_| futures::future::ready(()))
1241 /// .await;
1242 /// # Ok(())
1243 /// # }
1244 /// ```
1245 ///
1246 /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1247 #[must_use]
1248 pub fn watches<Other, I>(
1249 self,
1250 api: Api<Other>,
1251 wc: watcher::Config,
1252 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1253 ) -> Self
1254 where
1255 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1256 Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1257 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1258 I::IntoIter: Send,
1259 {
1260 self.watches_with(api, Default::default(), wc, mapper)
1261 }
1262
1263 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1264 ///
1265 /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1266 #[must_use]
1267 pub fn watches_with<Other, I>(
1268 mut self,
1269 api: Api<Other>,
1270 dyntype: Other::DynamicType,
1271 wc: watcher::Config,
1272 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1273 ) -> Self
1274 where
1275 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1276 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1277 I::IntoIter: Send,
1278 Other::DynamicType: Debug + Clone + Eq + Hash,
1279 {
1280 let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1281 self.trigger_selector.push(other_watcher.boxed());
1282 self
1283 }
1284
/// Trigger reconciliation of `K` from a stream of related `Other` objects
///
/// Behaves like [`Controller::watches`], except that a stream of resources takes the place of the `Api`.
/// The stream can therefore be customized or pre-filtered before it is used as a trigger,
/// and a single input stream can be shared between multiple controllers.
///
/// Watcher streams passed in here should be filtered first through `touched_objects`.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use k8s_openapi::api::apps::v1::DaemonSet;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<DaemonSet> = Api::all(client.clone());
/// let cr: Api<CustomResource> = Api::all(client.clone());
/// let daemons = watcher(api, watcher::Config::default())
///     .touched_objects()
///     .predicate_filter(predicates::generation, Default::default());
///
/// Controller::new(cr, watcher::Config::default())
///     .watches_stream(daemons, mapper)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|_| std::future::ready(()))
///     .await;
/// # }
/// ```
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // Delegate to the `DynamicType`-aware variant with the default-constructed dynamic type.
    let dyntype = <Other::DynamicType as Default>::default();
    self.watches_stream_with(trigger, mapper, dyntype)
}
1336
/// Trigger reconciliation of `K` from a stream of related `Other` objects,
/// with an explicit `DynamicType`
///
/// Behaves like [`Controller::watches`], except that a stream of resources takes the place of the `Api`.
/// The stream can therefore be customized or pre-filtered before it is used as a trigger,
/// and a single input stream can be shared between multiple controllers.
///
/// Equivalent to [`Controller::watches_stream`], but takes the watched type's `DynamicType` as an
/// argument so that it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    let related_trigger = trigger_others(trigger, mapper, dyntype).boxed();
    self.trigger_selector.push(related_trigger);
    self
}
1362
/// Trigger reconciliation of `K` from a shared stream of related `Other` objects
///
/// Behaves like [`Controller::watches`], except that a shared stream of
/// resources takes the place of the `Api`, so that one input stream can be
/// shared between multiple controllers.
///
/// Watcher streams passed in here should be filtered first through `touched_objects`.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use k8s_openapi::api::apps::v1::DaemonSet;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<DaemonSet> = Api::all(client.clone());
/// let cr: Api<CustomResource> = Api::all(client.clone());
/// let (reader, writer) = kube_runtime::reflector::store_shared(128);
/// let subscriber = writer
///     .subscribe()
///     .expect("subscribers can only be created from shared stores");
/// let daemons = watcher(api, watcher::Config::default())
///     .reflect(writer)
///     .touched_objects()
///     .for_each(|ev| async move {
///         match ev {
///             Ok(obj) => {},
///             Err(error) => tracing::error!(%error, "received err")
///         }
///     });
///
/// let controller = Controller::new(cr, watcher::Config::default())
///     .watches_shared_stream(subscriber, mapper)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|_| std::future::ready(()));
///
/// // Drive streams using a select statement
/// tokio::select! {
///   _ = daemons => {},
///   _ = controller => {},
/// }
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // Delegate to the `DynamicType`-aware variant with the default-constructed dynamic type.
    let dyntype = <Other::DynamicType as Default>::default();
    self.watches_shared_stream_with(trigger, mapper, dyntype)
}
1430
/// Trigger reconciliation of `K` from a shared stream of related `Other` objects,
/// with an explicit `DynamicType`
///
/// Behaves like [`Controller::watches`], except that a shared stream of
/// resources takes the place of the `Api`, so that streams can be shared
/// between multiple controllers.
///
/// Equivalent to [`Controller::watches_shared_stream`], but takes the watched type's `DynamicType`
/// as an argument so that it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // The shared stream yields bare `Arc<Other>` items; wrap each in `Ok` before handing the
    // stream to `trigger_others_shared`.
    let related_trigger = trigger_others_shared(trigger.map(Ok), mapper, dyntype).boxed();
    self.trigger_selector.push(related_trigger);
    self
}
1456
1457 /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1458 ///
1459 /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1460 ///
1461 /// To reconcile all objects when a new line is entered:
1462 ///
1463 /// ```
1464 /// # async {
1465 /// use futures::stream::StreamExt;
1466 /// use k8s_openapi::api::core::v1::ConfigMap;
1467 /// use kube::{
1468 /// Client,
1469 /// api::{Api, ResourceExt},
1470 /// runtime::{
1471 /// controller::{Controller, Action},
1472 /// watcher,
1473 /// },
1474 /// };
1475 /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1476 /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1477 /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1478 /// // and its worker prevents the Tokio runtime from shutting down.
1479 /// std::thread::spawn(move || {
1480 /// for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1481 /// let _ = reload_tx.try_send(());
1482 /// }
1483 /// });
1484 /// Controller::new(
1485 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1486 /// watcher::Config::default(),
1487 /// )
1488 /// .reconcile_all_on(reload_rx.map(|_| ()))
1489 /// .run(
1490 /// |o, _| async move {
1491 /// println!("Reconciling {}", o.name_any());
1492 /// Ok(Action::await_change())
1493 /// },
1494 /// |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1495 /// Arc::new(()),
1496 /// );
1497 /// # };
1498 /// ```
1499 ///
1500 /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1501 ///
1502 /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1503 #[must_use]
1504 pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1505 let store = self.store();
1506 let dyntype = self.dyntype.clone();
1507 self.trigger_selector.push(
1508 trigger
1509 .flat_map(move |()| {
1510 let dyntype = dyntype.clone();
1511 stream::iter(store.state().into_iter().map(move |obj| {
1512 Ok(ReconcileRequest {
1513 obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1514 reason: ReconcileReason::BulkReconcile,
1515 })
1516 }))
1517 })
1518 .boxed(),
1519 );
1520 self
1521 }
1522
/// Schedule a reconciliation of the managed object named by each `ObjectRef<K>` that `trigger` emits
///
/// This makes it possible to inject reconciliations for specific objects from an external resource.
///
/// # Example:
///
/// ```no_run
/// # async {
/// # use futures::{StreamExt, Stream, stream, TryStreamExt};
/// # use k8s_openapi::api::core::v1::{ConfigMap};
/// # use kube::api::Api;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::reflector::{ObjectRef, Store};
/// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
/// # use kube::runtime::watcher::Config;
/// # use kube::{Client, Error, ResourceExt};
/// # use std::future;
/// # use std::sync::Arc;
/// #
/// # let client: Client = todo!();
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
/// # let ns = "controller-ns".to_string();
/// struct ExternalObject {
///     name: String,
/// }
/// let external_stream = watch_external_objects().map(|ext| {
///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
/// });
///
/// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
///     .reconcile_on(external_stream)
///     .run(reconcile, error_policy, Arc::new(()))
///     .for_each(|_| future::ready(()))
///     .await;
/// # };
/// ```
#[cfg(feature = "unstable-runtime-reconcile-on")]
#[must_use]
pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
    // Each emitted reference becomes a reconcile request with reason `Unknown`.
    let external_requests = trigger.map(|obj_ref| {
        Ok(ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::Unknown,
        })
    });
    self.trigger_selector.push(external_requests.boxed());
    self
}
1576
1577 /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1578 ///
1579 /// - No new reconciliations are started from the scheduler
1580 /// - The underlying Kubernetes watch is terminated
1581 /// - All running reconciliations are allowed to finish
1582 /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1583 ///
1584 /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1585 ///
1586 /// ```rust
1587 /// # async {
1588 /// use futures::future::FutureExt;
1589 /// use k8s_openapi::api::core::v1::ConfigMap;
1590 /// use kube::{Api, Client, ResourceExt};
1591 /// use kube_runtime::{
1592 /// controller::{Controller, Action},
1593 /// watcher,
1594 /// };
1595 /// use std::{convert::Infallible, sync::Arc};
1596 /// Controller::new(
1597 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1598 /// watcher::Config::default(),
1599 /// )
1600 /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1601 /// .run(
1602 /// |o, _| async move {
1603 /// println!("Reconciling {}", o.name_any());
1604 /// Ok(Action::await_change())
1605 /// },
1606 /// |_, err: &Infallible, _| Err(err).unwrap(),
1607 /// Arc::new(()),
1608 /// );
1609 /// # };
1610 /// ```
1611 ///
1612 /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1613 /// as soon as *any* [`Future`] resolves.
1614 #[must_use]
1615 pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1616 self.graceful_shutdown_selector.push(trigger.boxed());
1617 self
1618 }
1619
1620 /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1621 ///
1622 /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1623 /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1624 ///
1625 /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1626 /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1627 /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1628 /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1629 ///
1630 /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1631 /// [`Controller::graceful_shutdown_on`].
1632 ///
1633 /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1634 /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1635 /// unless you run [`std::process::exit`] afterwards.
1636 #[must_use]
1637 pub fn shutdown_on_signal(mut self) -> Self {
1638 async fn shutdown_signal() {
1639 futures::future::select(
1640 tokio::signal::ctrl_c().map(|_| ()).boxed(),
1641 #[cfg(unix)]
1642 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1643 .unwrap()
1644 .recv()
1645 .map(|_| ())
1646 .boxed(),
1647 // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1648 #[cfg(not(unix))]
1649 futures::future::pending::<()>(),
1650 )
1651 .await;
1652 }
1653
1654 let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1655 self.graceful_shutdown_selector
1656 .push(graceful_rx.map(|_| ()).boxed());
1657 self.forceful_shutdown_selector.push(
1658 async {
1659 tracing::info!("press ctrl+c to shut down gracefully");
1660 shutdown_signal().await;
1661 if let Ok(()) = graceful_tx.send(()) {
1662 tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1663 } else {
1664 tracing::info!(
1665 "graceful shutdown already requested, press ctrl+c again to force shutdown"
1666 );
1667 }
1668 shutdown_signal().await;
1669 tracing::info!("forced shutdown requested");
1670 }
1671 .boxed(),
1672 );
1673 self
1674 }
1675
1676 /// Consume all the parameters of the Controller and start the applier stream
1677 ///
1678 /// This creates a stream from all builder calls and starts an applier with
1679 /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1680 /// with a configurable `context`.
1681 pub fn run<ReconcilerFut, Ctx>(
1682 self,
1683 mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1684 error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1685 context: Arc<Ctx>,
1686 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1687 where
1688 K::DynamicType: Debug + Unpin,
1689 ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1690 ReconcilerFut::Error: std::error::Error + Send + 'static,
1691 {
1692 applier(
1693 move |obj, ctx| {
1694 CancelableJoinHandle::spawn(
1695 TryFutureExt::into_future(reconciler(obj, ctx)).in_current_span(),
1696 &Handle::current(),
1697 )
1698 },
1699 error_policy,
1700 context,
1701 self.reader,
1702 StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1703 .take_until(future::select_all(self.graceful_shutdown_selector)),
1704 self.config,
1705 )
1706 .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1707 }
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712 use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
1713
1714 use super::{APPLIER_REQUEUE_BUF_SIZE, Action};
1715 use crate::{
1716 Config, Controller, applier,
1717 reflector::{self, ObjectRef},
1718 watcher::{self, Event, watcher},
1719 };
1720 use futures::{Stream, StreamExt, TryStreamExt};
1721 use k8s_openapi::api::core::v1::ConfigMap;
1722 use kube_client::{
1723 Api, Resource,
1724 core::{ObjectMeta, PartialObjectMeta},
1725 };
1726 use serde::de::DeserializeOwned;
1727 use tokio::time::timeout;
1728
// Type-level check: only compiles when `T` is `Send`; hands the value back untouched.
fn assert_send<T>(value: T) -> T
where
    T: Send,
{
    value
}
1732
1733 // Used to typecheck that a type T is a generic type that implements Stream
1734 // and returns a WatchEvent generic over a resource `K`
1735 fn assert_stream<T, K>(x: T) -> T
1736 where
1737 T: Stream<Item = watcher::Result<Event<K>>> + Send,
1738 K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
1739 {
1740 x
1741 }
1742
// Placeholder "constructor" for any type `T`, used to fill holes in the type assertions
// of this module. It never produces a value: calling it panics.
fn mock_type<T>() -> T {
    let reason = "mock_type is not supposed to be called, only used for filling holes in type assertions";
    unimplemented!("{}", reason)
}
1748
1749 // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
1750 #[allow(dead_code, unused_must_use)]
1751 fn test_controller_should_be_send() {
1752 assert_send(
1753 Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
1754 |_, _| async { Ok(mock_type::<Action>()) },
1755 |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
1756 Arc::new(()),
1757 ),
1758 );
1759 }
1760
1761 // not #[test] because we don't want to actually run it, we just want to
1762 // assert that it typechecks
1763 //
1764 // will check return types for `watcher` and `watcher with PartialObjectMeta` do not drift
1765 // given an arbitrary K that implements `Resource` (e.g ConfigMap)
1766 #[allow(dead_code, unused_must_use)]
1767 fn test_watcher_stream_type_drift() {
1768 assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
1769 assert_stream(watcher(
1770 mock_type::<Api<PartialObjectMeta<ConfigMap>>>(),
1771 Default::default(),
1772 ));
1773 }
1774
1775 #[tokio::test]
1776 async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
1777 // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
1778 // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926
1779
1780 // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation"
1781 // On my (@nightkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
1782 let items = APPLIER_REQUEUE_BUF_SIZE * 50;
1783 // Assume that everything's OK if we can reconcile every object 3 times on average
1784 let reconciles = items * 3;
1785
1786 let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
1787 let (store_rx, mut store_tx) = reflector::store();
1788 let mut applier = pin!(applier(
1789 |_obj, _| {
1790 Box::pin(async move {
1791 // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
1792 //println!("reconciling {:?}", obj.metadata.name);
1793 Ok(Action::requeue(Duration::ZERO))
1794 })
1795 },
1796 |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
1797 Arc::new(()),
1798 store_rx,
1799 queue_rx.map(Result::<_, Infallible>::Ok),
1800 Config::default(),
1801 ));
1802 store_tx.apply_watcher_event(&watcher::Event::InitDone);
1803 for i in 0..items {
1804 let obj = ConfigMap {
1805 metadata: ObjectMeta {
1806 name: Some(format!("cm-{i}")),
1807 namespace: Some("default".to_string()),
1808 ..Default::default()
1809 },
1810 ..Default::default()
1811 };
1812 store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
1813 queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
1814 }
1815
1816 timeout(
1817 Duration::from_secs(10),
1818 applier
1819 .as_mut()
1820 .take(reconciles)
1821 .try_for_each(|_| async { Ok(()) }),
1822 )
1823 .await
1824 .expect("test timeout expired, applier likely deadlocked")
1825 .unwrap();
1826
1827 // Do an orderly shutdown to ensure that no individual reconcilers are stuck
1828 drop(queue_tx);
1829 timeout(
1830 Duration::from_secs(10),
1831 applier.try_for_each(|_| async { Ok(()) }),
1832 )
1833 .await
1834 .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
1835 .unwrap();
1836 }
1837}