Struct kube_runtime::controller::Controller
pub struct Controller<K> where
K: Clone + Resource + Debug + 'static,
K::DynamicType: Eq + Hash, { /* private fields */ }
Controller
A controller is made up of:
- 1 reflector (for the core object)
- N watcher objects for each child object
- user-defined reconcile + error_policy callbacks
- a generated input stream considering all sources
All reconcile requests pass through an internal scheduler.
Pieces:
use kube::{
    Client, CustomResource,
    api::{Api, ListParams},
    runtime::controller::{Context, Controller, Action}
};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use schemars::JsonSchema;
use std::sync::Arc;
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {}

/// A custom resource
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
    content: String,
}

/// The reconciler that will be called when either object changes
async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<Action, Error> {
    // .. use api here to reconcile a child ConfigMap with ownerreferences
    // see configmapgen_controller example for full info
    Ok(Action::requeue(Duration::from_secs(300)))
}

/// an error handler that will be called when the reconciler fails
fn error_policy(_error: &Error, _ctx: Context<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

/// something to drive the controller
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let context = Context::new(()); // bad empty context - put client in here
    let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cmgs, ListParams::default())
        .owns(cms, ListParams::default())
        .run(reconcile, error_policy, context)
        .for_each(|res| async move {
            match res {
                Ok(o) => println!("reconciled {:?}", o),
                Err(e) => println!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
    Ok(())
}
Implementations
impl<K> Controller<K> where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self where
K::DynamicType: Default,
Create a Controller on a type K
Takes an Api object that determines how the Controller listens for changes to the K.
The ListParams controls the possible subset of objects of K that you want to manage
and receive reconcile events for.
For the full set of objects K in the given Api scope, you can use ListParams::default.
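One common way to narrow such a subset in Kubernetes is an equality-based label selector. The following is a minimal, std-only sketch of how a selector string narrows a set of objects. It is a conceptual model only: the `Labels` alias and the `parse_selector`, `matches`, and `labels` helpers are hypothetical names, not kube APIs, and in a real cluster the filtering is done by the API server.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an object's labels (not a kube type).
type Labels = BTreeMap<String, String>;

/// Parse an equality-based selector such as "app=web,tier=frontend"
/// into (key, value) pairs. Malformed terms are skipped in this sketch.
fn parse_selector(selector: &str) -> Vec<(String, String)> {
    selector
        .split(',')
        .filter_map(|term| {
            let (k, v) = term.split_once('=')?;
            Some((k.trim().to_string(), v.trim().to_string()))
        })
        .collect()
}

/// True when every selector term is present in `labels` with the same value.
/// An empty selector has no terms and therefore matches every object.
fn matches(selector: &str, labels: &Labels) -> bool {
    parse_selector(selector)
        .iter()
        .all(|(k, v)| labels.get(k) == Some(v))
}

/// Small helper to build a label map from string pairs.
fn labels(pairs: &[(&str, &str)]) -> Labels {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

In this model the empty selector plays the role of the default parameters: it selects the full set of objects in scope.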
pub fn new_with(
owned_api: Api<K>,
lp: ListParams,
dyntype: K::DynamicType
) -> Self
Create a Controller on a type K
Takes an Api object that determines how the Controller listens for changes to the K.
The ListParams lets you define a possible subset of objects of K that you want the Api
to watch - in the Api’s configured scope - and receive reconcile events for.
For the full set of objects K in the given Api scope, you can use ListParams::default.
This variant constructor is for dynamic types found through discovery. Prefer Controller::new for static types.
pub fn trigger_backoff(self, backoff: impl Backoff + Send + 'static) -> Self
Specify the backoff policy for “trigger” watches
This includes the core watch, as well as auxiliary watches introduced by Self::owns and Self::watches.
The default_backoff follows client-go conventions,
but can be overridden by calling this method.
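To illustrate what a backoff policy produces, here is a minimal, std-only sketch of a capped exponential backoff expressed as an iterator of delays. The `ExponentialBackoff` type and the parameter values used with it are assumptions for illustration; they are not the library's Backoff trait or the actual settings of default_backoff, and the sketch omits jitter and reset behaviour.

```rust
use std::time::Duration;

/// Illustrative exponential backoff: starts at `current`, multiplies by
/// `factor` after each failure, and never exceeds `max`.
struct ExponentialBackoff {
    current: Duration,
    factor: u32,
    max: Duration,
}

impl ExponentialBackoff {
    fn new(initial: Duration, factor: u32, max: Duration) -> Self {
        Self {
            current: initial.min(max),
            factor,
            max,
        }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Return the delay to wait before the next retry, then grow it.
    fn next(&mut self) -> Option<Duration> {
        let delay = self.current;
        self.current = self.current.saturating_mul(self.factor).min(self.max);
        Some(delay)
    }
}
```

A watch that keeps failing would sleep for each yielded delay in turn, so retries become less frequent until the cap is reached.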
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
lp: ListParams
) -> Self
Specify Child objects which K owns and should be watched
Takes an Api object that determines how the Controller listens for changes to the Child.
All owned Child objects must contain an OwnerReference pointing back to a K.
The ListParams refers to the possible subset of Child objects that you want the Api
to watch - in the Api’s configured scope - and receive reconcile events for.
To watch the full set of Child objects in the given Api scope, you can use ListParams::default.
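The owner-reference requirement can be pictured as a mapping from a changed child back to the parents that should be reconciled. The sketch below is a simplified, std-only model of that idea. `OwnerRef`, `Child`, `ParentRef`, and `owners_to_reconcile` are hypothetical stand-ins rather than kube types, and the sketch assumes the owner lives in the child's namespace.

```rust
/// Hypothetical, simplified stand-in for an owner reference (not a kube type).
#[derive(Debug, Clone, PartialEq)]
struct OwnerRef {
    api_version: String,
    kind: String,
    name: String,
}

/// Hypothetical, simplified stand-in for a child object's metadata.
#[derive(Debug, Clone)]
struct Child {
    namespace: Option<String>,
    owner_references: Vec<OwnerRef>,
}

/// Simplified reference to a parent object that should be reconciled.
#[derive(Debug, Clone, PartialEq)]
struct ParentRef {
    namespace: Option<String>,
    name: String,
}

/// Map a changed child to the parents of the given kind that own it.
fn owners_to_reconcile(child: &Child, api_version: &str, kind: &str) -> Vec<ParentRef> {
    child
        .owner_references
        .iter()
        .filter(|o| o.api_version == api_version && o.kind == kind)
        .map(|o| ParentRef {
            // Owner references carry no namespace of their own, so this sketch
            // assumes the owner shares the child's namespace.
            namespace: child.namespace.clone(),
            name: o.name.clone(),
        })
        .collect()
}
```

A child without a matching owner reference maps to no parents, which is why such a child would not trigger any reconcile in this model.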
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
dyntype: Child::DynamicType,
lp: ListParams
) -> Self where
Child::DynamicType: Debug + Eq + Hash + Clone,
Specify Child objects which K owns and should be watched
Same as Controller::owns, but accepts a DynamicType so it can be used with dynamic resources.
pub fn watches<Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Specify a Watched object type (the Other type parameter) which K has a custom relation to and which should be watched
To define the Watched relation with K, you must define a custom relation mapper, which,
when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.
If the relation K has to Watched is that K owns Watched, consider using Controller::owns.
Takes an Api object that determines how the Controller listens for changes to the Watched.
The ListParams refers to the possible subset of Watched objects that you want the Api
to watch - in the Api’s configured scope - and run through the custom mapper.
To watch the full set of Watched objects in the given Api scope, you can use ListParams::default.
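As a rough illustration of the mapper's shape, the following std-only sketch uses an annotation to relate a watched object to the object that should be reconciled. Everything here is hypothetical: `Watched` and `TargetRef` stand in for the real object and ObjectRef<K> types, the `MANAGED_BY` annotation key is invented, and `collect_targets` only mimics how a mapper's output could be flattened.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a watched object (not a kube type).
#[derive(Debug, Clone)]
struct Watched {
    namespace: String,
    annotations: BTreeMap<String, String>,
}

/// Hypothetical stand-in for ObjectRef<K>.
#[derive(Debug, Clone, PartialEq)]
struct TargetRef {
    namespace: String,
    name: String,
}

/// Invented annotation key naming the object that manages a watched object.
const MANAGED_BY: &str = "example.invalid/managed-by";

/// Custom relation: a watched object points at its manager via an annotation.
/// Returning Option works because Option implements IntoIterator.
fn mapper(obj: Watched) -> Option<TargetRef> {
    let name = obj.annotations.get(MANAGED_BY)?.clone();
    Some(TargetRef {
        namespace: obj.namespace,
        name,
    })
}

/// Run every watched object through the mapper and flatten the results,
/// accepting any mapper that returns something iterable.
fn collect_targets<I>(objs: Vec<Watched>, mapper: impl Fn(Watched) -> I) -> Vec<TargetRef>
where
    I: IntoIterator<Item = TargetRef>,
{
    objs.into_iter().flat_map(mapper).collect()
}
```

Objects without the annotation map to nothing, so only related objects lead to a reconcile request in this model.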
pub fn watches_with<Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
dyntype: Other::DynamicType,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Other::DynamicType: Clone,
Specify a Watched object type (the Other type parameter) which K has a custom relation to and which should be watched
Same as Controller::watches, but accepts a DynamicType so it can be used with dynamic resources.
pub fn reconcile_all_on(
self,
trigger: impl Stream<Item = ()> + Send + Sync + 'static
) -> Self
Trigger a reconciliation for all managed objects whenever trigger emits a value
For example, this can be used to reconcile all objects whenever the controller’s configuration changes.
To reconcile all objects when a new line is entered:
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    Client,
    api::{ListParams, Api, ResourceExt},
    runtime::{controller::{Context, Controller, Action}},
};
use std::{convert::Infallible, io::BufRead};

// The code below runs inside an async context.
let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
    for _ in std::io::BufReader::new(std::io::stdin()).lines() {
        let _ = reload_tx.try_send(());
    }
});
// The stream returned by run must still be polled for the controller to do anything.
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name());
        Ok(Action::await_change())
    },
    |err: &Infallible, _| Err(err).unwrap(),
    Context::new(()),
);
This can be called multiple times, in which case the triggers are additive; reconciles are scheduled whenever any Stream emits a new item.
If a Stream is terminated (by emitting None) then the Controller keeps running, but the Stream stops being polled.
pub fn graceful_shutdown_on(
self,
trigger: impl Future<Output = ()> + Send + Sync + 'static
) -> Self
Start a graceful shutdown when trigger resolves. Once a graceful shutdown has been initiated:
- No new reconciliations are started from the scheduler
- The underlying Kubernetes watch is terminated
- All running reconciliations are allowed to finish
Controller::run’s Stream terminates once all running reconciliations are done.
For example, to stop the reconciler whenever the user presses Ctrl+C:
use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube_runtime::controller::{Context, Controller, Action};
use std::convert::Infallible;

Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name());
        Ok(Action::await_change())
    },
    |err: &Infallible, _| Err(err).unwrap(),
    Context::new(()),
);
This can be called multiple times, in which case the triggers are additive; the Controller starts to terminate
as soon as any Future resolves.
pub fn shutdown_on_signal(self) -> Self
Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller has
terminated. If you run this in a process containing more tasks than just the Controller, ensure that
all other tasks either terminate when the Controller does, that they have their own signal handlers,
or use Controller::graceful_shutdown_on to manage your own shutdown strategy.
NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
Controller::graceful_shutdown_on.
NOTE: Controller::run terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
in the background while they terminate. This will block tokio::runtime::Runtime termination until they actually terminate,
unless you run std::process::exit afterwards.
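The two-stage behaviour described above (first signal: graceful, second signal: forceful) can be modelled as a small state machine. This is a std-only sketch under that reading of the text; `ShutdownState` and its methods are hypothetical names and do not reflect how the Controller implements signal handling internally.

```rust
/// Hypothetical model of the controller's shutdown phases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Normal operation.
    Running,
    /// First signal received: wait for running reconcilers to finish.
    Graceful,
    /// Second signal received: reconcilers are asked to abort.
    Forceful,
}

impl ShutdownState {
    /// State after receiving one more Ctrl+C or SIGTERM.
    fn on_signal(self) -> Self {
        match self {
            ShutdownState::Running => ShutdownState::Graceful,
            ShutdownState::Graceful | ShutdownState::Forceful => ShutdownState::Forceful,
        }
    }

    /// New reconciliations are only started during normal operation.
    fn accepts_new_reconciliations(self) -> bool {
        self == ShutdownState::Running
    }

    /// Running reconcilers may finish unless a forceful shutdown was requested.
    fn lets_running_reconcilers_finish(self) -> bool {
        self != ShutdownState::Forceful
    }
}
```

Further signals after the forceful stage change nothing in this model, since there is no stronger state to escalate to.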
pub fn run<ReconcilerFut, T>(
self,
reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, Error>>> where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: Error + Send + 'static,
Consume all the parameters of the Controller and start the applier stream
This creates a stream from all builder calls and starts an applier with
the specified reconciler and error_policy callbacks. Each of these will be called
with a configurable Context.
The returned stream does nothing unless it is polled.
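The relationship between the two callbacks can be sketched as follows: a successful reconcile supplies the next action itself, while a failed one hands the error to the error policy, which supplies the action instead. This is a simplified, std-only model. The local `Action` enum and `next_action` function are hypothetical stand-ins, and the real callbacks are asynchronous and also receive a Context.

```rust
use std::time::Duration;

/// Simplified stand-in for the controller's Action type.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    /// Run the reconciler again after the given delay.
    Requeue(Duration),
    /// Wait until the object changes before reconciling again.
    AwaitChange,
}

/// Decide what to do after one reconcile attempt: keep the reconciler's
/// own action on success, otherwise ask the error policy.
fn next_action<E>(result: Result<Action, E>, error_policy: impl Fn(&E) -> Action) -> Action {
    match result {
        Ok(action) => action,
        Err(err) => error_policy(&err),
    }
}
```

With the main example's callbacks, this model would yield a requeue after 300 seconds on success and after 60 seconds on failure.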
Auto Trait Implementations
impl<K> !RefUnwindSafe for Controller<K>
impl<K> Send for Controller<K> where
K: Send + Sync,
impl<K> !Sync for Controller<K>
impl<K> Unpin for Controller<K> where
<K as Resource>::DynamicType: Unpin,
impl<K> !UnwindSafe for Controller<K>
Blanket Implementations
impl<T> BorrowMut<T> for T where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<V, T> VZip<V> for T where
V: MultiLane<T>,
fn vzip(self) -> V
impl<T> WithSubscriber for T
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
Attaches the provided Subscriber to this type, returning a
WithDispatch wrapper.
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default Subscriber to this type, returning a
WithDispatch wrapper.