Struct kube::runtime::Controller

source ·
pub struct Controller<K>
where K: Clone + Resource + Debug + 'static, <K as Resource>::DynamicType: Eq + Hash,
{ /* private fields */ }
Available on crate feature runtime only.
Expand description

Controller for a Resource K

A controller is an infinite stream of objects to be reconciled.

Once run and continuously awaited, it continuously calls out to user provided reconcile and error_policy callbacks whenever relevant changes are detected or if errors are seen from reconcile.

Reconciles are generally requested for all changes on your root objects. Changes to managed child resources will also trigger the reconciler for the managing object by traversing owner references (for Controller::owns), or traverse a custom mapping (for Controller::watches).

This mapping mechanism ultimately hides the reason for the reconciliation request, and forces you to write an idempotent reconciler.

General setup:

use kube::{Api, Client, CustomResource};
use kube::runtime::{controller::{Controller, Action}, watcher};
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use schemars::JsonSchema;
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {}

/// A custom resource
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
    content: String,
}

/// The reconciler that will be called when either object change
async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
    // .. use api here to reconcile a child ConfigMap with ownerreferences
    // see configmapgen_controller example for full info
    Ok(Action::requeue(Duration::from_secs(300)))
}
/// an error handler that will be called when the reconciler fails with access to both the
/// object that caused the failure and the actual error
fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

/// something to drive the controller
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let context = Arc::new(()); // bad empty context - put client in here
    let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cmgs, watcher::Config::default())
        .owns(cms, watcher::Config::default())
        .run(reconcile, error_policy, context)
        .for_each(|res| async move {
            match res {
                Ok(o) => println!("reconciled {:?}", o),
                Err(e) => println!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
    Ok(())
}

Implementations§

source§

impl<K> Controller<K>
where K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static, <K as Resource>::DynamicType: Eq + Hash + Clone,

source

pub fn new(main_api: Api<K>, wc: Config) -> Controller<K>
where <K as Resource>::DynamicType: Default,

Create a Controller for a resource K

Takes an Api object that determines how the Controller listens for changes to the K.

The watcher::Config controls to the possible subset of objects of K that you want to manage and receive reconcile events for. For the full set of objects K in the given Api scope, you can use watcher::Config::default.

source

pub fn new_with( main_api: Api<K>, wc: Config, dyntype: <K as Resource>::DynamicType ) -> Controller<K>

Create a Controller for a resource K

Takes an Api object that determines how the Controller listens for changes to the K.

The watcher::Config lets you define a possible subset of objects of K that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. For the full set of objects K in the given Api scope, you can use Config::default.

This variant constructor is for dynamic types found through discovery. Prefer Controller::new for static types.

source

pub fn for_stream( trigger: impl Stream<Item = Result<K, Error>> + Send + 'static, reader: Store<K> ) -> Controller<K>
where <K as Resource>::DynamicType: Default,

Create a Controller for a resource K from a stream of K objects

Same as Controller::new, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

§Example:
let api: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store();
let deploys = watcher(api, watcher::Config::default())
    .default_backoff()
    .reflect(writer)
    .applied_objects()
    .predicate_filter(predicates::generation);

Controller::for_stream(deploys, reader)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;

Prefer Controller::new if you do not need to share the stream, or do not need pre-filtering.

source

pub fn for_stream_with( trigger: impl Stream<Item = Result<K, Error>> + Send + 'static, reader: Store<K>, dyntype: <K as Resource>::DynamicType ) -> Controller<K>

Create a Controller for a resource K from a stream of K objects

Same as Controller::new, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

Prefer Controller::new if you do not need to share the stream, or do not need pre-filtering.

This variant constructor is for dynamic types found through discovery. Prefer Controller::for_stream for static types.

source

pub fn with_config(self, config: Config) -> Controller<K>

Specify the configuration for the controller’s behavior.

source

pub fn trigger_backoff( self, backoff: impl Backoff + Send + 'static ) -> Controller<K>

Specify the backoff policy for “trigger” watches

This includes the core watch, as well as auxilary watches introduced by Self::owns and Self::watches.

The default_backoff follows client-go conventions, but can be overridden by calling this method.

source

pub fn store(&self) -> Store<K>

Retrieve a copy of the reader before starting the controller

source

pub fn owns<Child>(self, api: Api<Child>, wc: Config) -> Controller<K>
where Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static,

Specify Child objects which K owns and should be watched

Takes an Api object that determines how the Controller listens for changes to the Child. All owned Child objects must contain an OwnerReference pointing back to a K.

The watcher::Config controls the subset of Child objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. To watch the full set of Child objects in the given Api scope, you can use watcher::Config::default.

source

pub fn owns_with<Child>( self, api: Api<Child>, dyntype: <Child as Resource>::DynamicType, wc: Config ) -> Controller<K>
where Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static, <Child as Resource>::DynamicType: Debug + Eq + Hash + Clone,

Specify Child objects which K owns and should be watched

Same as Controller::owns, but accepts a DynamicType so it can be used with dynamic resources.

source

pub fn owns_stream<Child>( self, trigger: impl Stream<Item = Result<Child, Error>> + Send + 'static ) -> Controller<K>
where Child: Resource<DynamicType = ()> + Send + 'static,

Trigger the reconciliation process for a stream of Child objects of the owner K

Same as Controller::owns, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

Watcher streams passed in here should be filtered first through touched_objects.

§Example:
let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
    .touched_objects()
    .predicate_filter(predicates::generation);

Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
    .owns_stream(sts_stream)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;
source

pub fn owns_stream_with<Child>( self, trigger: impl Stream<Item = Result<Child, Error>> + Send + 'static, dyntype: <Child as Resource>::DynamicType ) -> Controller<K>
where Child: Resource + Send + 'static, <Child as Resource>::DynamicType: Debug + Eq + Hash + Clone,

Trigger the reconciliation process for a stream of Child objects of the owner K

Same as Controller::owns, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

Same as Controller::owns_stream, but accepts a DynamicType so it can be used with dynamic resources.

source

pub fn watches<Other, I>( self, api: Api<Other>, wc: Config, mapper: impl Fn(Other) -> I + Sync + Send + 'static ) -> Controller<K>
where Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, <Other as Resource>::DynamicType: Default + Debug + Clone + Eq + Hash, I: 'static + IntoIterator<Item = ObjectRef<K>>, <I as IntoIterator>::IntoIter: Send,

Specify Watched object which K has a custom relation to and should be watched

To define the Watched relation with K, you must define a custom relation mapper, which, when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.

If the relation K has to Watched is that K owns Watched, consider using Controller::owns.

Takes an Api object that determines how the Controller listens for changes to the Watched.

The watcher::Config controls the subset of Watched objects that you want the Api to watch - in the Api’s configured scope - and run through the custom mapper. To watch the full set of Watched objects in given the Api scope, you can use watcher::Config::default.

§Example

Tracking cross cluster references using the Operator-SDK annotations.

Controller::new(memcached, watcher::Config::default())
    .watches(
        Api::<WatchedResource>::all(client.clone()),
        watcher::Config::default(),
        |ar| {
            let prt = ar
                .annotations()
                .get("operator-sdk/primary-resource-type")
                .map(String::as_str);

            if prt != Some("Memcached.cache.example.com") {
                return None;
            }

            let (namespace, name) = ar
                .annotations()
                .get("operator-sdk/primary-resource")?
                .split_once('/')?;

            Some(ObjectRef::new(name).within(namespace))
        }
    )
    .run(reconcile, error_policy, context)
    .for_each(|_| futures::future::ready(()))
    .await;
source

pub fn watches_with<Other, I>( self, api: Api<Other>, dyntype: <Other as Resource>::DynamicType, wc: Config, mapper: impl Fn(Other) -> I + Sync + Send + 'static ) -> Controller<K>
where Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>, <I as IntoIterator>::IntoIter: Send, <Other as Resource>::DynamicType: Debug + Clone + Eq + Hash,

Specify Watched object which K has a custom relation to and should be watched

Same as Controller::watches, but accepts a DynamicType so it can be used with dynamic resources.

source

pub fn watches_stream<Other, I>( self, trigger: impl Stream<Item = Result<Other, Error>> + Send + 'static, mapper: impl Fn(Other) -> I + Sync + Send + 'static ) -> Controller<K>
where Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, <Other as Resource>::DynamicType: Default + Debug + Clone, I: 'static + IntoIterator<Item = ObjectRef<K>>, <I as IntoIterator>::IntoIter: Send,

Trigger the reconciliation process for a stream of Other objects related to a K

Same as Controller::watches, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

Watcher streams passed in here should be filtered first through touched_objects.

§Example:
fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
let api: Api<DaemonSet> = Api::all(client.clone());
let cr: Api<CustomResource> = Api::all(client.clone());
let daemons = watcher(api, watcher::Config::default())
    .touched_objects()
    .predicate_filter(predicates::generation);

Controller::new(cr, watcher::Config::default())
    .watches_stream(daemons, mapper)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;
source

pub fn watches_stream_with<Other, I>( self, trigger: impl Stream<Item = Result<Other, Error>> + Send + 'static, mapper: impl Fn(Other) -> I + Sync + Send + 'static, dyntype: <Other as Resource>::DynamicType ) -> Controller<K>
where Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, <Other as Resource>::DynamicType: Debug + Clone, I: 'static + IntoIterator<Item = ObjectRef<K>>, <I as IntoIterator>::IntoIter: Send,

Trigger the reconciliation process for a stream of Other objects related to a K

Same as Controller::owns, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.

NB: This is constructor requires an unstable feature.

Same as Controller::watches_stream, but accepts a DynamicType so it can be used with dynamic resources.

source

pub fn reconcile_all_on( self, trigger: impl Stream<Item = ()> + Send + Sync + 'static ) -> Controller<K>

Trigger a reconciliation for all managed objects whenever trigger emits a value

For example, this can be used to reconcile all objects whenever the controller’s configuration changes.

To reconcile all objects when a new line is entered:

use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    Client,
    api::{Api, ResourceExt},
    runtime::{
        controller::{Controller, Action},
        watcher,
    },
};
use std::{convert::Infallible, io::BufRead, sync::Arc};
let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
    for _ in std::io::BufReader::new(std::io::stdin()).lines() {
        let _ = reload_tx.try_send(());
    }
});
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    watcher::Config::default(),
)
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
);

This can be called multiple times, in which case they are additive; reconciles are scheduled whenever any [Stream] emits a new item.

If a [Stream] is terminated (by emitting None) then the Controller keeps running, but the [Stream] stops being polled.

source

pub fn reconcile_on( self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static ) -> Controller<K>

Trigger the reconciliation process for a managed object ObjectRef<K> whenever trigger emits a value

This can be used to inject reconciliations for specific objects from an external resource.

§Example:
struct ExternalObject {
    name: String,
}
let external_stream = watch_external_objects().map(|ext| {
    ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
});

Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
    .reconcile_on(external_stream)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| future::ready(()))
    .await;
source

pub fn graceful_shutdown_on( self, trigger: impl Future<Output = ()> + Send + Sync + 'static ) -> Controller<K>

Start a graceful shutdown when trigger resolves. Once a graceful shutdown has been initiated:

  • No new reconciliations are started from the scheduler
  • The underlying Kubernetes watch is terminated
  • All running reconciliations are allowed to finish
  • Controller::run’s [Stream] terminates once all running reconciliations are done.

For example, to stop the reconciler whenever the user presses Ctrl+C:

use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client, ResourceExt};
use kube_runtime::{
    controller::{Controller, Action},
    watcher,  
};
use std::{convert::Infallible, sync::Arc};
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    watcher::Config::default(),
)
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
);

This can be called multiple times, in which case they are additive; the Controller starts to terminate as soon as any Future resolves.

source

pub fn shutdown_on_signal(self) -> Controller<K>

Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.

Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).

NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller has terminated. If you run this in a process containing more tasks than just the Controller, ensure that all other tasks either terminate when the Controller does, that they have their own signal handlers, or use Controller::graceful_shutdown_on to manage your own shutdown strategy.

NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into Controller::graceful_shutdown_on.

NOTE: Controller::run terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running in the background while they terminate. This will block [tokio::runtime::Runtime] termination until they actually terminate, unless you run std::process::exit afterwards.

source

pub fn run<ReconcilerFut, Ctx>( self, reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut, error_policy: impl Fn(Arc<K>, &<ReconcilerFut as TryFuture>::Error, Arc<Ctx>) -> Action, context: Arc<Ctx> ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<<ReconcilerFut as TryFuture>::Error, Error>>>
where <K as Resource>::DynamicType: Debug + Unpin, ReconcilerFut: TryFuture<Ok = Action> + Send + 'static, <ReconcilerFut as TryFuture>::Error: Error + Send + 'static,

Consume all the parameters of the Controller and start the applier stream

This creates a stream from all builder calls and starts an applier with a specified reconciler and error_policy callbacks. Each of these will be called with a configurable context.

Auto Trait Implementations§

§

impl<K> !RefUnwindSafe for Controller<K>

§

impl<K> Send for Controller<K>
where K: Sync + Send,

§

impl<K> !Sync for Controller<K>

§

impl<K> Unpin for Controller<K>
where <K as Resource>::DynamicType: Unpin,

§

impl<K> !UnwindSafe for Controller<K>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more