pub fn reflector<K, W>(
writer: Writer<K>,
stream: W,
) -> impl Stream<Item = <W as Stream>::Item>
runtime
only.Expand description
Cache objects from a watcher()
stream into a local Store
Observes the raw Stream
of watcher::Event
objects, and modifies the cache.
It passes the raw watcher()
stream through unmodified.
§Usage
Create a Store
through e.g. store::store()
. The writer
part is not-clonable,
and must be moved into the reflector. The reader
part is the Store
interface
that you can send to other parts of your program as state.
The cache contains the last-seen state of objects, which may lag slightly behind the actual state.
§Example
Infinite watch of Node
resources with a certain label.
The reader
part being passed around to a webserver is omitted.
For examples see version-rs for integration with axum,
or controller-rs for the similar controller integration with actix-web.
use std::future::ready;
use k8s_openapi::api::core::v1::Node;
use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
use futures::StreamExt;
let nodes: Api<Node> = Api::all(client);
let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
let (reader, writer) = reflector::store();
// Create the infinite reflector stream
let rf = reflector(writer, watcher(nodes, node_filter));
// !!! pass reader to your webserver/manager as state !!!
// Poll the stream (needed to keep the store up-to-date)
let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
infinite_watch.await;
§Memory Usage
A reflector often constitutes one of the biggest components of a controller’s memory use. Given a ~2000 pods cluster, a reflector saving everything (including injected sidecars, managed fields) can quickly consume a couple of hundred megabytes or more, depending on how much of this you are storing.
While generally acceptable, there are techniques you can leverage to reduce the memory usage depending on your use case.
- Reflect a
PartialObjectMeta<K>
stream rather than a stream ofK
You can send in a metadata_watcher()
for a type rather than a watcher()
,
and this can drop your memory usage by more than a factor of two,
depending on the size of K
. 60% reduction seen for Pod
. Usage is otherwise identical.
- Use
modify
the rawwatcher::Event
object stream to clear unneeded properties
For instance, managed fields typically constitutes around half the size of ObjectMeta
and can often be dropped:
let stream = watcher(api, Default::default()).map_ok(|ev| {
ev.modify(|pod| {
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
})
});
The stream
can then be passed to reflector
causing smaller objects to be written to its store.
Note that you cannot drop everything; you minimally need the spec properties your app relies on.
Additionally, only labels
, annotations
and managed_fields
are safe to drop from ObjectMeta
.
For more information check out: https://kube.rs/controllers/optimization/ for graphs and techniques.
§Stream sharing
reflector()
as an interface may optionally create a stream that can be
shared with other components to help with resource usage.
To share a stream, the Writer<K>
consumed by reflector()
must be
created through an interface that allows a store to be subscribed on, such
as store_shared()
. When the store supports being subscribed on, it will
broadcast an event to all active listeners after caching any object
contained in the event.
Creating subscribers requires an
unstable
feature