// kube_runtime/controller/mod.rs
1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4use crate::{
5 reflector::{
6 self, ObjectRef, reflector,
7 store::{Store, Writer},
8 },
9 scheduler::{ScheduleRequest, debounced_scheduler},
10 utils::{
11 Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
12 },
13 watcher::{self, DefaultBackoff, metadata_watcher, watcher},
14};
15use educe::Educe;
16use futures::{
17 FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, channel,
18 future::{self, BoxFuture},
19 stream,
20};
21use kube_client::api::{Api, DynamicObject, Resource};
22use pin_project::pin_project;
23use serde::de::DeserializeOwned;
24use std::{
25 fmt::{Debug, Display},
26 hash::Hash,
27 sync::Arc,
28 task::{Poll, ready},
29 time::Duration,
30};
31use stream::BoxStream;
32use thiserror::Error;
33use tokio::{runtime::Handle, time::Instant};
34use tracing::{Instrument, info_span};
35
36mod future_hash_map;
37mod runner;
38
/// The reasons the internal runner can fail
///
/// Surfaced to users through the [`Error::RunnerError`] variant of the applier stream.
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
41
42/// Errors returned by the applier and visible in a controller stream if inspecting it
43///
44/// WARNING: These errors do not terminate `Controller::run`, and are not passed to the `reconcile` fn
45/// as they exist primarily for diagnostics.
46///
47/// To inspect these errors, you can run a `for_each` on the run stream:
48///
49/// ```compile_fail
50/// Controller::new(api, watcher_config)
51/// .run(reconcile, error_policy, context)
52/// .for_each(|res| async move {
53/// match res {
54/// Ok(o) => info!("reconciled {:?}", o),
55/// /// Reconciler errors visible here:
56/// Err(e) => warn!("reconcile failed: {}", e),
57/// }
58/// })
59/// .await;
60/// ```
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A scheduled reconcile for an object refers to an object that no longer exists
    ///
    /// This is usually not a problem and often expected with certain relations.
    /// See <https://github.com/kube-rs/kube/issues/1167#issuecomment-1636773541>
    /// for a more detailed explanation of how/why this happens.
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),

    /// User's reconcile fn failed for the object
    ///
    /// The object reference is type-erased (`DynamicObject`) so the error type is uniform.
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),

    /// The queue stream contained an error
    #[error("event queue error")]
    QueueError(#[source] QueueErr),

    /// The internal runner returned an error
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}
83
84/// Results of the reconciliation attempt
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
    ///
    /// For example, use this to query external systems for updates, expire time-limited resources, or
    /// (in your `error_policy`) retry after errors.
    ///
    /// `None` means: do not requeue; wait for the next watch event (see [`Action::await_change`]).
    requeue_after: Option<Duration>,
}
93
impl Action {
    /// Action to requeue the reconciliation at this time even if no external watch triggers hit
    ///
    /// This is the best-practice action that ensures eventual consistency of your controller
    /// even in the case of missed changes (which can happen).
    ///
    /// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
    #[must_use]
    pub const fn requeue(duration: Duration) -> Self {
        Self {
            requeue_after: Some(duration),
        }
    }

    /// Do nothing until a change is detected
    ///
    /// This stops the controller periodically reconciling this object until a relevant watch event
    /// was **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// It is therefore not recommended to disable requeuing this way, unless you have
    /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
    #[must_use]
    pub const fn await_change() -> Self {
        Self { requeue_after: None }
    }
}
121
122/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
123pub fn trigger_with<T, K, I, S>(
124 stream: S,
125 mapper: impl Fn(T) -> I,
126) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
127where
128 S: TryStream<Ok = T>,
129 I: IntoIterator,
130 I::Item: Into<ReconcileRequest<K>>,
131 K: Resource,
132{
133 stream
134 .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
135 .try_flatten()
136}
137
138/// Enqueues the object itself for reconciliation
139pub fn trigger_self<K, S>(
140 stream: S,
141 dyntype: K::DynamicType,
142) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
143where
144 S: TryStream<Ok = K>,
145 K: Resource,
146 K::DynamicType: Clone,
147{
148 trigger_with(stream, move |obj| {
149 Some(ReconcileRequest {
150 obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
151 reason: ReconcileReason::ObjectUpdated,
152 })
153 })
154}
155
/// Enqueues the object itself for reconciliation when the object is behind a
/// shared pointer
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input stream has item as some Arc'd Resource (via
    // Controller::for_shared_stream)
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    // Same as `trigger_self`, but borrows through the Arc instead of owning K.
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone());
        Some(ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::ObjectUpdated,
        })
    })
}
177
178/// Enqueues any mapper returned `K` types for reconciliation
179fn trigger_others<S, K, I>(
180 stream: S,
181 mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
182 dyntype: <S::Ok as Resource>::DynamicType,
183) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
184where
185 // Input stream has items as some Resource (via Controller::watches)
186 S: TryStream,
187 S::Ok: Resource,
188 <S::Ok as Resource>::DynamicType: Clone,
189 // Output stream is requests for the root type K
190 K: Resource,
191 K::DynamicType: Clone,
192 // but the mapper can produce many of them
193 I: 'static + IntoIterator<Item = ObjectRef<K>>,
194 I::IntoIter: Send,
195{
196 trigger_with(stream, move |obj| {
197 let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
198 mapper(obj)
199 .into_iter()
200 .map(move |mapped_obj_ref| ReconcileRequest {
201 obj_ref: mapped_obj_ref,
202 reason: ReconcileReason::RelatedObjectUpdated {
203 obj_ref: Box::new(watch_ref.clone()),
204 },
205 })
206 })
207}
208
/// Enqueues any mapper returned `Arc<K>` types for reconciliation
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input is some shared resource (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // Output stream is requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // but the mapper can produce many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        // Remember which watched object caused the trigger, type-erased for diagnostics
        let source = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
        let targets = mapper(obj).into_iter();
        targets.map(move |obj_ref| ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(source.clone()),
            },
        })
    })
}
240
241/// Enqueues any owners of type `KOwner` for reconciliation
242pub fn trigger_owners<KOwner, S>(
243 stream: S,
244 owner_type: KOwner::DynamicType,
245 child_type: <S::Ok as Resource>::DynamicType,
246) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
247where
248 S: TryStream,
249 S::Ok: Resource,
250 <S::Ok as Resource>::DynamicType: Clone,
251 KOwner: Resource,
252 KOwner::DynamicType: Clone,
253{
254 let mapper = move |obj: S::Ok| {
255 let meta = obj.meta().clone();
256 let ns = meta.namespace;
257 let owner_type = owner_type.clone();
258 meta.owner_references
259 .into_iter()
260 .flatten()
261 .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
262 };
263 trigger_others(stream, mapper, child_type)
264}
265
// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    trigger_others_shared(
        stream,
        move |obj: S::Ok| {
            // OwnerReferences carry no namespace of their own, so the child's
            // namespace is passed along when building the owner ObjectRef.
            let meta = obj.meta().clone();
            let ns = meta.namespace;
            let owner_type = owner_type.clone();
            meta.owner_references.into_iter().flatten().filter_map(move |owner| {
                ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone())
            })
        },
        child_type,
    )
}
294
295/// A request to reconcile an object, annotated with why that request was made.
296///
297/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
298/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
299/// In this case, only *the first* reason is stored.
#[derive(Educe)]
#[educe(
    Debug(bound("K::DynamicType: Debug")),
    Clone(bound("K::DynamicType: Clone")),
    PartialEq(bound("K::DynamicType: PartialEq")),
    Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
    /// A reference to the object to be reconciled
    pub obj_ref: ObjectRef<K>,
    /// The reason for why reconciliation was requested
    ///
    /// Deliberately excluded from `PartialEq`/`Hash` so that requests
    /// deduplicate on `obj_ref` alone (see the type-level note above).
    #[educe(PartialEq(ignore), Hash(ignore))]
    pub reason: ReconcileReason,
}
314
// Manual `Eq` marker impl: `PartialEq` is generated by educe with `reason` ignored,
// so `Eq` cannot be derived directly.
impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
316
317impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
318 fn from(obj_ref: ObjectRef<K>) -> Self {
319 ReconcileRequest {
320 obj_ref,
321 reason: ReconcileReason::Unknown,
322 }
323 }
324}
325
326/// The reason a reconcile was requested
327///
328/// Note that this reason is deliberately hidden from the reconciler.
329/// See <https://kube.rs/controllers/reconciler/#reasons-for-reconciliation>.
#[derive(Debug, Clone)]
pub enum ReconcileReason {
    /// A custom reconcile triggered via `reconcile_on`
    Unknown,

    /// The main object was updated.
    ObjectUpdated,

    /// A related object was updated through a mapper
    ///
    /// The related object traversed its relation up to the object kind you are reconciling.
    /// Your object may not have changed, but you may need to update child objects.
    RelatedObjectUpdated {
        /// An object ref to the related object (type-erased, for diagnostics)
        obj_ref: Box<ObjectRef<DynamicObject>>,
    },

    /// The users `reconcile` scheduled a reconciliation via an `Action`
    ReconcilerRequestedRetry,

    /// The users `error_policy` scheduled a reconciliation via an `Action`
    ErrorPolicyRequestedRetry,

    /// A bulk reconcile was requested via `reconcile_all_on`
    BulkReconcile,

    /// A custom reconcile reason for custom integrations.
    ///
    /// Can be used when injecting elements into the queue stream directly.
    Custom {
        /// A user provided reason through a custom integration
        reason: String,
    },
}
364
365impl Display for ReconcileReason {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 match self {
368 ReconcileReason::Unknown => f.write_str("unknown"),
369 ReconcileReason::ObjectUpdated => f.write_str("object updated"),
370 ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
371 f.write_fmt(format_args!("related object updated: {object}"))
372 }
373 ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
374 ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
375 ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
376 ReconcileReason::Custom { reason } => f.write_str(reason),
377 }
378 }
379}
380
// Capacity of the channel that feeds reconciler-requested requeues back into the scheduler
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
382
383/// Apply a reconciler to an input stream, with a given retry policy
384///
385/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
386///
387/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
388/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
389/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
390/// with a [`watcher()`] or [`reflector()`] for the subobject.
391///
392/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
393/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
#[allow(clippy::result_large_err)] // see #1880 as an alt; https://github.com/kube-rs/kube/pull/1880
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Channel through which finished reconciliations requeue themselves (`Action::requeue`)
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    let error_policy = Arc::new(error_policy);
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user specified inputs event
        Box::pin(stream::select(
            // 1. inputs from users queue stream
            queue
                .map_err(Error::QueueError)
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    // Queue-driven requests are due immediately
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select gets passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            TryFutureExt::into_future(
                                reconciler_span.in_scope(|| reconciler(Arc::clone(&obj), context.clone())),
                            )
                            .then(move |res| {
                                let error_policy = error_policy;
                                // Requeue per the Action (or error policy) before yielding the result
                                RescheduleReconciliation::new(
                                    res,
                                    |err| error_policy(obj, err, error_policy_ctx),
                                    request.obj_ref.clone(),
                                    scheduler_tx,
                                )
                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
                                // to them separately
                                .map(|res| Ok((request.obj_ref, res)))
                            })
                            .instrument(reconciler_span)
                            .left_future()
                        }
                        // The object vanished from the local store between scheduling and running
                        None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
                            .right_future(),
                    }
                },
            )
            // Hold off running reconciliations until the store has completed its initial sync
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
501
502/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
503///
504/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    // Channel back into the applier's scheduler
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

    // Pending requeue (if the Action requested one); taken once it has been sent
    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    // The reconciler's result, yielded when this future completes
    result: Option<Result<Action, ReconcilerErr>>,
}
513
514impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
515where
516 K: Resource,
517{
518 fn new(
519 result: Result<Action, ReconcilerErr>,
520 error_policy: impl FnOnce(&ReconcilerErr) -> Action,
521 obj_ref: ObjectRef<K>,
522 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
523 ) -> Self {
524 let reconciler_finished_at = Instant::now();
525
526 let (action, reschedule_reason) = result.as_ref().map_or_else(
527 |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
528 |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
529 );
530
531 Self {
532 reschedule_tx,
533 reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
534 message: ReconcileRequest {
535 obj_ref,
536 reason: reschedule_reason,
537 },
538 run_at: reconciler_finished_at
539 .checked_add(requeue_after)
540 .unwrap_or_else(crate::scheduler::max_schedule_time),
541 }),
542 result: Some(result),
543 }
544 }
545}
546
impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    type Output = Result<Action, ReconcilerErr>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // First flush any pending requeue into the scheduler channel; only after that
        // attempt do we yield the reconciler's result.
        if this.reschedule_request.is_some() {
            // `ready!` may return Pending *before* the request is taken, so the
            // request survives intact for the next poll.
            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
            let reschedule_request = this
                .reschedule_request
                .take()
                .expect("PostReconciler::reschedule_request was taken during processing");
            // Failure to schedule item = in graceful shutdown mode, ignore
            if let Ok(()) = rescheduler_ready {
                let _ = this.reschedule_tx.start_send(reschedule_request);
            }
        }

        Poll::Ready(
            this.result
                .take()
                .expect("PostReconciler::result was already taken"),
        )
    }
}
575
576/// Accumulates all options that can be used on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
    // Trailing-edge debounce window for the scheduler; zero (the default) disables debouncing
    debounce: Duration,
    // Max concurrent reconciliations; 0 (the default) means unbounded
    concurrency: u16,
}
582
583impl Config {
584 /// The debounce duration used to deduplicate reconciliation requests.
585 ///
586 /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
587 /// resulting in __trailing edge debouncing__ of reconciler requests.
588 /// This option can help to reduce the amount of unnecessary reconciler calls
589 /// when using multiple controller relations, or during rapid phase transitions.
590 ///
591 /// ## Warning
592 /// This option delays (and keeps delaying) reconcile requests for objects while
593 /// the object is updated. It can **permanently hide** updates from your reconciler
594 /// if set too high on objects that are updated frequently (like nodes).
595 #[must_use]
596 pub fn debounce(mut self, debounce: Duration) -> Self {
597 self.debounce = debounce;
598 self
599 }
600
601 /// The number of concurrent reconciliations of that are allowed to run at an given moment.
602 ///
603 /// This can be adjusted to the controller's needs to increase
604 /// performance and/or make performance predictable. By default, its 0 meaning
605 /// the controller runs with unbounded concurrency.
606 ///
607 /// Note that despite concurrency, a controller never schedules concurrent reconciles
608 /// on the same object.
609 #[must_use]
610 pub fn concurrency(mut self, concurrency: u16) -> Self {
611 self.concurrency = concurrency;
612 self
613 }
614}
615
616/// Controller for a Resource `K`
617///
618/// A controller is an infinite stream of objects to be reconciled.
619///
620/// Once `run` and continuously awaited, it continuously calls out to user provided
621/// `reconcile` and `error_policy` callbacks whenever relevant changes are detected
622/// or if errors are seen from `reconcile`.
623///
624/// Reconciles are generally requested for all changes on your root objects.
625/// Changes to managed child resources will also trigger the reconciler for the
626/// managing object by traversing owner references (for `Controller::owns`),
627/// or traverse a custom mapping (for `Controller::watches`).
628///
629/// This mapping mechanism ultimately hides the reason for the reconciliation request,
630/// and forces you to write an idempotent reconciler.
631///
632/// General setup:
633/// ```no_run
634/// use kube::{Api, Client, CustomResource};
635/// use kube::runtime::{controller::{Controller, Action}, watcher};
636/// # use serde::{Deserialize, Serialize};
637/// # use tokio::time::Duration;
638/// use futures::StreamExt;
639/// use k8s_openapi::api::core::v1::ConfigMap;
640/// use schemars::JsonSchema;
641/// # use std::sync::Arc;
642/// use thiserror::Error;
643///
644/// #[derive(Debug, Error)]
645/// enum Error {}
646///
647/// /// A custom resource
648/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
649/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
650/// struct ConfigMapGeneratorSpec {
651/// content: String,
652/// }
653///
654/// /// The reconciler that will be called when either object change
655/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
656/// // .. use api here to reconcile a child ConfigMap with ownerreferences
657/// // see configmapgen_controller example for full info
658/// Ok(Action::requeue(Duration::from_secs(300)))
659/// }
660/// /// an error handler that will be called when the reconciler fails with access to both the
661/// /// object that caused the failure and the actual error
662/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
663/// Action::requeue(Duration::from_secs(60))
664/// }
665///
666/// /// something to drive the controller
667///
668/// async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
669/// # let client: Client = todo!();
670/// let context = Arc::new(()); // bad empty context - put client in here
671/// let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
672/// let cms = Api::<ConfigMap>::all(client.clone());
673/// Controller::new(cmgs, watcher::Config::default())
674/// .owns(cms, watcher::Config::default())
675/// .run(reconcile, error_policy, context)
676/// .for_each(|res| async move {
677/// match res {
678/// Ok(o) => println!("reconciled {:?}", o),
679/// Err(e) => println!("reconcile failed: {:?}", e),
680/// }
681/// })
682/// .await; // controller does nothing unless polled
683/// # Ok(())
684/// # }
685/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    // NB: Need to Unpin for stream::select_all
    // Merged stream of all registered trigger sources (self, owns, watches, ...)
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    // Backoff applied to the trigger watch streams (defaults to `DefaultBackoff`)
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    // The DynamicType of `K`, used to build ObjectRefs for the main resource
    dyntype: K::DynamicType,
    // Read handle into the reflector store for the main resource `K`
    reader: Store<K>,
    // Scheduler/concurrency options (see [`Config`])
    config: Config,
}
706
707impl<K> Controller<K>
708where
709 K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
710 K::DynamicType: Eq + Hash + Clone,
711{
712 /// Create a Controller for a resource `K`
713 ///
714 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
715 ///
716 /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
717 /// and receive reconcile events for.
718 /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
719 #[must_use]
720 pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
721 where
722 K::DynamicType: Default,
723 {
724 Self::new_with(main_api, wc, Default::default())
725 }
726
727 /// Create a Controller for a resource `K`
728 ///
729 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
730 ///
731 /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
732 /// to watch - in the Api's configured scope - and receive reconcile events for.
733 /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
734 ///
735 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
736 ///
737 /// [`watcher::Config`]: crate::watcher::Config
738 /// [`Api`]: kube_client::Api
739 /// [`dynamic`]: kube_client::core::dynamic
740 /// [`Config::default`]: crate::watcher::Config::default
741 pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
742 let writer = Writer::<K>::new(dyntype.clone());
743 let reader = writer.as_reader();
744 let mut trigger_selector = stream::SelectAll::new();
745 let self_watcher = trigger_self(
746 reflector(writer, watcher(main_api, wc)).applied_objects(),
747 dyntype.clone(),
748 )
749 .boxed();
750 trigger_selector.push(self_watcher);
751 Self {
752 trigger_selector,
753 trigger_backoff: Box::<DefaultBackoff>::default(),
754 graceful_shutdown_selector: vec![
755 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
756 future::pending().boxed(),
757 ],
758 forceful_shutdown_selector: vec![
759 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
760 future::pending().boxed(),
761 ],
762 dyntype,
763 reader,
764 config: Default::default(),
765 }
766 }
767
768 /// Create a Controller for a resource `K` from a stream of `K` objects
769 ///
770 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
771 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
772 /// as well as sharing input streams between multiple controllers.
773 ///
774 /// # Example:
775 ///
776 /// ```no_run
777 /// # use futures::StreamExt;
778 /// # use k8s_openapi::api::apps::v1::Deployment;
779 /// # use kube::runtime::controller::{Action, Controller};
780 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
781 /// # use kube::{Api, Client, Error, ResourceExt};
782 /// # use std::sync::Arc;
783 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
784 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
785 /// # async fn doc(client: kube::Client) {
786 /// let api: Api<Deployment> = Api::default_namespaced(client);
787 /// let (reader, writer) = reflector::store();
788 /// let deploys = watcher(api, watcher::Config::default())
789 /// .default_backoff()
790 /// .reflect(writer)
791 /// .applied_objects()
792 /// .predicate_filter(predicates::generation, Default::default());
793 ///
794 /// Controller::for_stream(deploys, reader)
795 /// .run(reconcile, error_policy, Arc::new(()))
796 /// .for_each(|_| std::future::ready(()))
797 /// .await;
798 /// # }
799 /// ```
800 ///
801 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
802 #[cfg(feature = "unstable-runtime-stream-control")]
803 pub fn for_stream(
804 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
805 reader: Store<K>,
806 ) -> Self
807 where
808 K::DynamicType: Default,
809 {
810 Self::for_stream_with(trigger, reader, Default::default())
811 }
812
813 /// Create a Controller for a resource `K` from a stream of `K` objects
814 ///
815 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
816 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
817 /// as well as sharing input streams between multiple controllers.
818 ///
819 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
820 ///
821 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
822 ///
823 /// [`dynamic`]: kube_client::core::dynamic
824 #[cfg(feature = "unstable-runtime-stream-control")]
825 pub fn for_stream_with(
826 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
827 reader: Store<K>,
828 dyntype: K::DynamicType,
829 ) -> Self {
830 let mut trigger_selector = stream::SelectAll::new();
831 let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
832 trigger_selector.push(self_watcher);
833 Self {
834 trigger_selector,
835 trigger_backoff: Box::<DefaultBackoff>::default(),
836 graceful_shutdown_selector: vec![
837 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
838 future::pending().boxed(),
839 ],
840 forceful_shutdown_selector: vec![
841 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
842 future::pending().boxed(),
843 ],
844 dyntype,
845 reader,
846 config: Default::default(),
847 }
848 }
849
    /// This is the same as [`Controller::for_stream`]. Instead of taking an
    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
    /// streams can be created out-of-band by subscribing on a store `Writer`.
    /// Through this interface, multiple controllers can use the same root
    /// (shared) input stream of resources to keep memory overheads smaller.
    ///
    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::apps::v1::Deployment;
    /// # use kube::runtime::controller::{Action, Controller};
    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let api: Api<Deployment> = Api::default_namespaced(client);
    /// let (reader, writer) = reflector::store_shared(128);
    /// let subscriber = writer
    ///     .subscribe()
    ///     .expect("subscribers can only be created from shared stores");
    /// let deploys = watcher(api, watcher::Config::default())
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
    ///     .for_each(|ev| async move {
    ///         match ev {
    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
    ///             Err(error) => tracing::error!(%error, "received error")
    ///         }
    ///     });
    ///
    /// let controller = Controller::for_shared_stream(subscriber, reader)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|ev| async move {
    ///         tracing::info!("reconciled {ev:?}")
    ///     });
    ///
    /// // Drive streams using a select statement
    /// tokio::select! {
    ///     _ = deploys => {},
    ///     _ = controller => {},
    /// }
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-subscribe")]
    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
    where
        K::DynamicType: Default,
    {
        Self::for_shared_stream_with(trigger, reader, Default::default())
    }
912
913 /// This is the same as [`Controller::for_stream`]. Instead of taking an
914 /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
915 /// streams can be created out-of-band by subscribing on a store `Writer`.
916 /// Through this interface, multiple controllers can use the same root
917 /// (shared) input stream of resources to keep memory overheads smaller.
918 ///
919 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
920 /// need to share the stream.
921 ///
922 /// This variant constructor is used for [`dynamic`] types found through
923 /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
924 /// known at compile time).
925 ///
926 /// [`dynamic`]: kube_client::core::dynamic
927 #[cfg(feature = "unstable-runtime-subscribe")]
928 pub fn for_shared_stream_with(
929 trigger: impl Stream<Item = Arc<K>> + Send + 'static,
930 reader: Store<K>,
931 dyntype: K::DynamicType,
932 ) -> Self {
933 let mut trigger_selector = stream::SelectAll::new();
934 let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
935 trigger_selector.push(self_watcher);
936 Self {
937 trigger_selector,
938 trigger_backoff: Box::<DefaultBackoff>::default(),
939 graceful_shutdown_selector: vec![
940 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
941 future::pending().boxed(),
942 ],
943 forceful_shutdown_selector: vec![
944 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
945 future::pending().boxed(),
946 ],
947 dyntype,
948 reader,
949 config: Default::default(),
950 }
951 }
952
953 /// Specify the configuration for the controller's behavior.
954 #[must_use]
955 pub fn with_config(mut self, config: Config) -> Self {
956 self.config = config;
957 self
958 }
959
960 /// Specify the backoff policy for "trigger" watches
961 ///
962 /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
963 ///
964 /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
965 /// but can be overridden by calling this method.
966 #[must_use]
967 pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
968 self.trigger_backoff = Box::new(backoff);
969 self
970 }
971
972 /// Retrieve a copy of the reader before starting the controller
973 pub fn store(&self) -> Store<K> {
974 self.reader.clone()
975 }
976
977 /// Specify `Child` objects which `K` owns and should be watched
978 ///
979 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
980 /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
981 ///
982 /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
983 /// to watch - in the Api's configured scope - and receive reconcile events for.
984 /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
985 ///
986 /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
987 #[must_use]
988 pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
989 self,
990 api: Api<Child>,
991 wc: watcher::Config,
992 ) -> Self {
993 self.owns_with(api, (), wc)
994 }
995
996 /// Specify `Child` objects which `K` owns and should be watched
997 ///
998 /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
999 #[must_use]
1000 pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
1001 mut self,
1002 api: Api<Child>,
1003 dyntype: Child::DynamicType,
1004 wc: watcher::Config,
1005 ) -> Self
1006 where
1007 Child::DynamicType: Debug + Eq + Hash + Clone,
1008 {
1009 // TODO: call owns_stream_with when it's stable
1010 let child_watcher = trigger_owners(
1011 metadata_watcher(api, wc).touched_objects(),
1012 self.dyntype.clone(),
1013 dyntype,
1014 );
1015 self.trigger_selector.push(child_watcher.boxed());
1016 self
1017 }
1018
1019 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1020 ///
1021 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1022 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1023 /// as well as sharing input streams between multiple controllers.
1024 ///
1025 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1026 ///
1027 /// # Example:
1028 ///
1029 /// ```no_run
1030 /// # use futures::StreamExt;
1031 /// # use k8s_openapi::api::core::v1::ConfigMap;
1032 /// # use k8s_openapi::api::apps::v1::StatefulSet;
1033 /// # use kube::runtime::controller::Action;
1034 /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
1035 /// # use kube::{Api, Client, Error, ResourceExt};
1036 /// # use std::sync::Arc;
1037 /// # type CustomResource = ConfigMap;
1038 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1039 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1040 /// # async fn doc(client: kube::Client) {
1041 /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
1042 /// .touched_objects()
1043 /// .predicate_filter(predicates::generation, Default::default());
1044 ///
1045 /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
1046 /// .owns_stream(sts_stream)
1047 /// .run(reconcile, error_policy, Arc::new(()))
1048 /// .for_each(|_| std::future::ready(()))
1049 /// .await;
1050 /// # }
1051 /// ```
1052 #[cfg(feature = "unstable-runtime-stream-control")]
1053 #[must_use]
1054 pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1055 self,
1056 trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1057 ) -> Self {
1058 self.owns_stream_with(trigger, ())
1059 }
1060
1061 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1062 ///
1063 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1064 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1065 /// as well as sharing input streams between multiple controllers.
1066 ///
1067 /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1068 #[cfg(feature = "unstable-runtime-stream-control")]
1069 #[must_use]
1070 pub fn owns_stream_with<Child: Resource + Send + 'static>(
1071 mut self,
1072 trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1073 dyntype: Child::DynamicType,
1074 ) -> Self
1075 where
1076 Child::DynamicType: Debug + Eq + Hash + Clone,
1077 {
1078 let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
1079 self.trigger_selector.push(child_watcher.boxed());
1080 self
1081 }
1082
    /// This is the same as [`Controller::for_stream`]. Instead of taking an
    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
    /// streams can be created out-of-band by subscribing on a store `Writer`.
    /// Through this interface, multiple controllers can use the same root
    /// (shared) input stream of resources to keep memory overheads smaller.
    ///
    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
    ///
    ///
    /// Trigger the reconciliation process for a shared stream of `Child`
    /// objects of the owner `K`
    ///
    /// Conceptually the same as [`Controller::owns`], but a stream is used
    /// instead of an `Api`. This interface behaves similarly to its non-shared
    /// counterpart [`Controller::owns_stream`].
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
    /// # use kube::runtime::controller::{Action, Controller};
    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
    /// let pod_api: Api<Pod> = Api::default_namespaced(client);
    ///
    /// let (reader, writer) = reflector::store_shared(128);
    /// let subscriber = writer
    ///     .subscribe()
    ///     .expect("subscribers can only be created from shared stores");
    /// let pods = watcher(pod_api, watcher::Config::default())
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
    ///     .for_each(|ev| async move {
    ///         match ev {
    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
    ///             Err(error) => tracing::error!(%error, "received error")
    ///         }
    ///     });
    ///
    /// let controller = Controller::new(deploys, Default::default())
    ///     .owns_shared_stream(subscriber)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|ev| async move {
    ///         tracing::info!("reconciled {ev:?}")
    ///     });
    ///
    /// // Drive streams using a select statement
    /// tokio::select! {
    ///     _ = pods => {},
    ///     _ = controller => {},
    /// }
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
        self,
        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
    ) -> Self {
        self.owns_shared_stream_with(trigger, ())
    }
1157
1158 /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
1159 ///
1160 /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
1161 /// The source stream can be shared between multiple controllers, optimising
1162 /// resource usage.
1163 ///
1164 /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1165 #[cfg(feature = "unstable-runtime-subscribe")]
1166 #[must_use]
1167 pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
1168 mut self,
1169 trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1170 dyntype: Child::DynamicType,
1171 ) -> Self
1172 where
1173 Child::DynamicType: Debug + Eq + Hash + Clone,
1174 {
1175 let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
1176 self.trigger_selector.push(child_watcher.boxed());
1177 self
1178 }
1179
1180 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1181 ///
1182 /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1183 /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1184 ///
1185 /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1186 ///
1187 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1188 ///
1189 /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1190 /// to watch - in the Api's configured scope - and run through the custom mapper.
1191 /// To watch the full set of `Watched` objects in given the `Api` scope, you can use [`watcher::Config::default`].
1192 ///
1193 /// # Example
1194 ///
1195 /// Tracking cross cluster references using the [Operator-SDK] annotations.
1196 ///
1197 /// ```
1198 /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1199 /// # use kube::{Api, ResourceExt};
1200 /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1201 /// # use futures::StreamExt;
1202 /// # use std::sync::Arc;
1203 /// # type WatchedResource = Namespace;
1204 /// # struct Context;
1205 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1206 /// # Ok(Action::await_change())
1207 /// # };
1208 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1209 /// # Action::await_change()
1210 /// # }
1211 /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1212 /// # let memcached = Api::<ConfigMap>::all(client.clone());
1213 /// # let context = Arc::new(Context);
1214 /// Controller::new(memcached, watcher::Config::default())
1215 /// .watches(
1216 /// Api::<WatchedResource>::all(client.clone()),
1217 /// watcher::Config::default(),
1218 /// |ar| {
1219 /// let prt = ar
1220 /// .annotations()
1221 /// .get("operator-sdk/primary-resource-type")
1222 /// .map(String::as_str);
1223 ///
1224 /// if prt != Some("Memcached.cache.example.com") {
1225 /// return None;
1226 /// }
1227 ///
1228 /// let (namespace, name) = ar
1229 /// .annotations()
1230 /// .get("operator-sdk/primary-resource")?
1231 /// .split_once('/')?;
1232 ///
1233 /// Some(ObjectRef::new(name).within(namespace))
1234 /// }
1235 /// )
1236 /// .run(reconcile, error_policy, context)
1237 /// .for_each(|_| futures::future::ready(()))
1238 /// .await;
1239 /// # Ok(())
1240 /// # }
1241 /// ```
1242 ///
1243 /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1244 #[must_use]
1245 pub fn watches<Other, I>(
1246 self,
1247 api: Api<Other>,
1248 wc: watcher::Config,
1249 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1250 ) -> Self
1251 where
1252 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1253 Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1254 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1255 I::IntoIter: Send,
1256 {
1257 self.watches_with(api, Default::default(), wc, mapper)
1258 }
1259
1260 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1261 ///
1262 /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1263 #[must_use]
1264 pub fn watches_with<Other, I>(
1265 mut self,
1266 api: Api<Other>,
1267 dyntype: Other::DynamicType,
1268 wc: watcher::Config,
1269 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1270 ) -> Self
1271 where
1272 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1273 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1274 I::IntoIter: Send,
1275 Other::DynamicType: Debug + Clone + Eq + Hash,
1276 {
1277 let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1278 self.trigger_selector.push(other_watcher.boxed());
1279 self
1280 }
1281
1282 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1283 ///
1284 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1285 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1286 /// as well as sharing input streams between multiple controllers.
1287 ///
1288 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1289 ///
1290 /// # Example:
1291 ///
1292 /// ```no_run
1293 /// # use futures::StreamExt;
1294 /// # use k8s_openapi::api::core::v1::ConfigMap;
1295 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1296 /// # use kube::runtime::controller::Action;
1297 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1298 /// # use kube::{Api, Client, Error, ResourceExt};
1299 /// # use std::sync::Arc;
1300 /// # type CustomResource = ConfigMap;
1301 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1302 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1303 /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1304 /// # async fn doc(client: kube::Client) {
1305 /// let api: Api<DaemonSet> = Api::all(client.clone());
1306 /// let cr: Api<CustomResource> = Api::all(client.clone());
1307 /// let daemons = watcher(api, watcher::Config::default())
1308 /// .touched_objects()
1309 /// .predicate_filter(predicates::generation, Default::default());
1310 ///
1311 /// Controller::new(cr, watcher::Config::default())
1312 /// .watches_stream(daemons, mapper)
1313 /// .run(reconcile, error_policy, Arc::new(()))
1314 /// .for_each(|_| std::future::ready(()))
1315 /// .await;
1316 /// # }
1317 /// ```
1318 #[cfg(feature = "unstable-runtime-stream-control")]
1319 #[must_use]
1320 pub fn watches_stream<Other, I>(
1321 self,
1322 trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1323 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1324 ) -> Self
1325 where
1326 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1327 Other::DynamicType: Default + Debug + Clone,
1328 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1329 I::IntoIter: Send,
1330 {
1331 self.watches_stream_with(trigger, mapper, Default::default())
1332 }
1333
1334 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1335 ///
1336 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1337 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1338 /// as well as sharing input streams between multiple controllers.
1339 ///
1340 /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1341 #[cfg(feature = "unstable-runtime-stream-control")]
1342 #[must_use]
1343 pub fn watches_stream_with<Other, I>(
1344 mut self,
1345 trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1346 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1347 dyntype: Other::DynamicType,
1348 ) -> Self
1349 where
1350 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1351 Other::DynamicType: Debug + Clone,
1352 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1353 I::IntoIter: Send,
1354 {
1355 let other_watcher = trigger_others(trigger, mapper, dyntype);
1356 self.trigger_selector.push(other_watcher.boxed());
1357 self
1358 }
1359
1360 /// Trigger the reconciliation process for a shared stream of `Other`
1361 /// objects related to a `K`
1362 ///
1363 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1364 /// stream of resources is used. This allows for sharing input streams
1365 /// between multiple controllers.
1366 ///
1367 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1368 ///
1369 /// # Example:
1370 ///
1371 /// ```no_run
1372 /// # use futures::StreamExt;
1373 /// # use k8s_openapi::api::core::v1::ConfigMap;
1374 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1375 /// # use kube::runtime::controller::Action;
1376 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1377 /// # use kube::{Api, Client, Error, ResourceExt};
1378 /// # use std::sync::Arc;
1379 /// # type CustomResource = ConfigMap;
1380 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1381 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1382 /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1383 /// # async fn doc(client: kube::Client) {
1384 /// let api: Api<DaemonSet> = Api::all(client.clone());
1385 /// let cr: Api<CustomResource> = Api::all(client.clone());
1386 /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1387 /// let subscriber = writer
1388 /// .subscribe()
1389 /// .expect("subscribers can only be created from shared stores");
1390 /// let daemons = watcher(api, watcher::Config::default())
1391 /// .reflect(writer)
1392 /// .touched_objects()
1393 /// .for_each(|ev| async move {
1394 /// match ev {
1395 /// Ok(obj) => {},
1396 /// Err(error) => tracing::error!(%error, "received err")
1397 /// }
1398 /// });
1399 ///
1400 /// let controller = Controller::new(cr, watcher::Config::default())
1401 /// .watches_shared_stream(subscriber, mapper)
1402 /// .run(reconcile, error_policy, Arc::new(()))
1403 /// .for_each(|_| std::future::ready(()));
1404 ///
1405 /// // Drive streams using a select statement
1406 /// tokio::select! {
1407 /// _ = daemons => {},
1408 /// _ = controller => {},
1409 /// }
1410 /// # }
1411 /// ```
1412 #[cfg(feature = "unstable-runtime-subscribe")]
1413 #[must_use]
1414 pub fn watches_shared_stream<Other, I>(
1415 self,
1416 trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1417 mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1418 ) -> Self
1419 where
1420 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1421 Other::DynamicType: Default + Debug + Clone,
1422 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1423 I::IntoIter: Send,
1424 {
1425 self.watches_shared_stream_with(trigger, mapper, Default::default())
1426 }
1427
1428 /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1429 ///
1430 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1431 /// stream of resources is used. This allows for sharing of streams between
1432 /// multiple controllers.
1433 ///
1434 /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1435 #[cfg(feature = "unstable-runtime-subscribe")]
1436 #[must_use]
1437 pub fn watches_shared_stream_with<Other, I>(
1438 mut self,
1439 trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1440 mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1441 dyntype: Other::DynamicType,
1442 ) -> Self
1443 where
1444 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1445 Other::DynamicType: Debug + Clone,
1446 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1447 I::IntoIter: Send,
1448 {
1449 let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
1450 self.trigger_selector.push(other_watcher.boxed());
1451 self
1452 }
1453
1454 /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1455 ///
1456 /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1457 ///
1458 /// To reconcile all objects when a new line is entered:
1459 ///
1460 /// ```
1461 /// # async {
1462 /// use futures::stream::StreamExt;
1463 /// use k8s_openapi::api::core::v1::ConfigMap;
1464 /// use kube::{
1465 /// Client,
1466 /// api::{Api, ResourceExt},
1467 /// runtime::{
1468 /// controller::{Controller, Action},
1469 /// watcher,
1470 /// },
1471 /// };
1472 /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1473 /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1474 /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1475 /// // and its worker prevents the Tokio runtime from shutting down.
1476 /// std::thread::spawn(move || {
1477 /// for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1478 /// let _ = reload_tx.try_send(());
1479 /// }
1480 /// });
1481 /// Controller::new(
1482 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1483 /// watcher::Config::default(),
1484 /// )
1485 /// .reconcile_all_on(reload_rx.map(|_| ()))
1486 /// .run(
1487 /// |o, _| async move {
1488 /// println!("Reconciling {}", o.name_any());
1489 /// Ok(Action::await_change())
1490 /// },
1491 /// |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1492 /// Arc::new(()),
1493 /// );
1494 /// # };
1495 /// ```
1496 ///
1497 /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1498 ///
1499 /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1500 #[must_use]
1501 pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1502 let store = self.store();
1503 let dyntype = self.dyntype.clone();
1504 self.trigger_selector.push(
1505 trigger
1506 .flat_map(move |()| {
1507 let dyntype = dyntype.clone();
1508 stream::iter(store.state().into_iter().map(move |obj| {
1509 Ok(ReconcileRequest {
1510 obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1511 reason: ReconcileReason::BulkReconcile,
1512 })
1513 }))
1514 })
1515 .boxed(),
1516 );
1517 self
1518 }
1519
1520 /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
1521 ///
1522 /// This can be used to inject reconciliations for specific objects from an external resource.
1523 ///
1524 /// # Example:
1525 ///
1526 /// ```no_run
1527 /// # async {
1528 /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
1529 /// # use k8s_openapi::api::core::v1::{ConfigMap};
1530 /// # use kube::api::Api;
1531 /// # use kube::runtime::controller::Action;
1532 /// # use kube::runtime::reflector::{ObjectRef, Store};
1533 /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
1534 /// # use kube::runtime::watcher::Config;
1535 /// # use kube::{Client, Error, ResourceExt};
1536 /// # use std::future;
1537 /// # use std::sync::Arc;
1538 /// #
1539 /// # let client: Client = todo!();
1540 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1541 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1542 /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
1543 /// # let ns = "controller-ns".to_string();
1544 /// struct ExternalObject {
1545 /// name: String,
1546 /// }
1547 /// let external_stream = watch_external_objects().map(|ext| {
1548 /// ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
1549 /// });
1550 ///
1551 /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
1552 /// .reconcile_on(external_stream)
1553 /// .run(reconcile, error_policy, Arc::new(()))
1554 /// .for_each(|_| future::ready(()))
1555 /// .await;
1556 /// # };
1557 /// ```
1558 #[cfg(feature = "unstable-runtime-reconcile-on")]
1559 #[must_use]
1560 pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
1561 self.trigger_selector.push(
1562 trigger
1563 .map(move |obj| {
1564 Ok(ReconcileRequest {
1565 obj_ref: obj,
1566 reason: ReconcileReason::Unknown,
1567 })
1568 })
1569 .boxed(),
1570 );
1571 self
1572 }
1573
1574 /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1575 ///
1576 /// - No new reconciliations are started from the scheduler
1577 /// - The underlying Kubernetes watch is terminated
1578 /// - All running reconciliations are allowed to finish
1579 /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1580 ///
1581 /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1582 ///
1583 /// ```rust
1584 /// # async {
1585 /// use futures::future::FutureExt;
1586 /// use k8s_openapi::api::core::v1::ConfigMap;
1587 /// use kube::{Api, Client, ResourceExt};
1588 /// use kube_runtime::{
1589 /// controller::{Controller, Action},
1590 /// watcher,
1591 /// };
1592 /// use std::{convert::Infallible, sync::Arc};
1593 /// Controller::new(
1594 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1595 /// watcher::Config::default(),
1596 /// )
1597 /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1598 /// .run(
1599 /// |o, _| async move {
1600 /// println!("Reconciling {}", o.name_any());
1601 /// Ok(Action::await_change())
1602 /// },
1603 /// |_, err: &Infallible, _| Err(err).unwrap(),
1604 /// Arc::new(()),
1605 /// );
1606 /// # };
1607 /// ```
1608 ///
1609 /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1610 /// as soon as *any* [`Future`] resolves.
1611 #[must_use]
1612 pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1613 self.graceful_shutdown_selector.push(trigger.boxed());
1614 self
1615 }
1616
1617 /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1618 ///
1619 /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1620 /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1621 ///
1622 /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1623 /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1624 /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1625 /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1626 ///
1627 /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1628 /// [`Controller::graceful_shutdown_on`].
1629 ///
1630 /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1631 /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1632 /// unless you run [`std::process::exit`] afterwards.
1633 #[must_use]
1634 pub fn shutdown_on_signal(mut self) -> Self {
1635 async fn shutdown_signal() {
1636 futures::future::select(
1637 tokio::signal::ctrl_c().map(|_| ()).boxed(),
1638 #[cfg(unix)]
1639 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1640 .unwrap()
1641 .recv()
1642 .map(|_| ())
1643 .boxed(),
1644 // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1645 #[cfg(not(unix))]
1646 futures::future::pending::<()>(),
1647 )
1648 .await;
1649 }
1650
1651 let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1652 self.graceful_shutdown_selector
1653 .push(graceful_rx.map(|_| ()).boxed());
1654 self.forceful_shutdown_selector.push(
1655 async {
1656 tracing::info!("press ctrl+c to shut down gracefully");
1657 shutdown_signal().await;
1658 if let Ok(()) = graceful_tx.send(()) {
1659 tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1660 } else {
1661 tracing::info!(
1662 "graceful shutdown already requested, press ctrl+c again to force shutdown"
1663 );
1664 }
1665 shutdown_signal().await;
1666 tracing::info!("forced shutdown requested");
1667 }
1668 .boxed(),
1669 );
1670 self
1671 }
1672
1673 /// Consume all the parameters of the Controller and start the applier stream
1674 ///
1675 /// This creates a stream from all builder calls and starts an applier with
1676 /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1677 /// with a configurable `context`.
1678 pub fn run<ReconcilerFut, Ctx>(
1679 self,
1680 mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1681 error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1682 context: Arc<Ctx>,
1683 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1684 where
1685 K::DynamicType: Debug + Unpin,
1686 ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1687 ReconcilerFut::Error: std::error::Error + Send + 'static,
1688 {
1689 applier(
1690 move |obj, ctx| {
1691 CancelableJoinHandle::spawn(
1692 TryFutureExt::into_future(reconciler(obj, ctx)).in_current_span(),
1693 &Handle::current(),
1694 )
1695 },
1696 error_policy,
1697 context,
1698 self.reader,
1699 StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1700 .take_until(future::select_all(self.graceful_shutdown_selector)),
1701 self.config,
1702 )
1703 .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1704 }
1705}
1706
#[cfg(test)]
mod tests {
    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

    use super::{APPLIER_REQUEUE_BUF_SIZE, Action};
    use crate::{
        Config, Controller, applier,
        reflector::{self, ObjectRef},
        watcher::{self, Event, metadata_watcher, watcher},
    };
    use futures::{Stream, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{Api, Resource, core::ObjectMeta};
    use serde::de::DeserializeOwned;
    use tokio::time::timeout;

    // Identity helper that only compiles if `T: Send`; used for compile-time assertions
    fn assert_send<T: Send>(x: T) -> T {
        x
    }

    // Used to typecheck that a type T is a generic type that implements Stream
    // and returns a WatchEvent generic over a resource `K`
    fn assert_stream<T, K>(x: T) -> T
    where
        T: Stream<Item = watcher::Result<Event<K>>> + Send,
        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
    {
        x
    }

    // Produces a value of an arbitrary type for type-checking only; panics if actually called
    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        assert_send(
            Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
                |_, _| async { Ok(mock_type::<Action>()) },
                |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
                Arc::new(()),
            ),
        );
    }

    // not #[test] because we don't want to actually run it, we just want to
    // assert that it typechecks
    //
    // will check return types for `watcher` and `watch_metadata` do not drift
    // given an arbitrary K that implements `Resource` (e.g ConfigMap)
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
        assert_stream(metadata_watcher(
            mock_type::<Api<ConfigMap>>(),
            Default::default(),
        ));
    }

    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
        // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926

        // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation"
        // On my (@nightkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Assume that everything's OK if we can reconcile every object 3 times on average
        let reconciles = items * 3;

        // External trigger queue feeding the applier, and the backing object store
        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let mut applier = pin!(applier(
            |_obj, _| {
                Box::pin(async move {
                    // Try to flood the rescheduling buffer by just putting the object back in the queue immediately
                    //println!("reconciling {:?}", obj.metadata.name);
                    Ok(Action::requeue(Duration::ZERO))
                })
            },
            // The reconciler is infallible, so the error policy must be unreachable
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
            Config::default(),
        ));
        // Mark the store as initialized so reconciles are not blocked waiting for the initial sync
        store_tx.apply_watcher_event(&watcher::Event::InitDone);
        for i in 0..items {
            let obj = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(format!("cm-{i}")),
                    namespace: Some("default".to_string()),
                    ..Default::default()
                },
                ..Default::default()
            };
            // Register the object in the store, then immediately schedule it for reconciliation
            store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
        }

        // If the applier deadlocks under backpressure, the stream stalls and the timeout fires
        timeout(
            Duration::from_secs(10),
            applier
                .as_mut()
                .take(reconciles)
                .try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("test timeout expired, applier likely deadlocked")
        .unwrap();

        // Do an orderly shutdown to ensure that no individual reconcilers are stuck
        drop(queue_tx);
        timeout(
            Duration::from_secs(10),
            applier.try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
        .unwrap();
    }
}