pub trait WatchStreamExt: Stream {
    // Provided methods
    fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
       where Self: TryStream + Sized { ... }
    fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
       where B: Backoff,
             Self: TryStream + Sized { ... }
    fn applied_objects<K>(self) -> EventFlatten<Self, K>
       where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
    fn touched_objects<K>(self) -> EventFlatten<Self, K>
       where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
    fn modify<F, K>(self, f: F) -> EventModify<Self, F>
       where Self: Stream<Item = Result<Event<K>, Error>> + Sized,
             F: FnMut(&mut K) { ... }
    fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
       where Self: Stream<Item = Result<K, Error>> + Sized,
             K: Resource + 'static,
             P: Predicate<K> + 'static { ... }
    fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
       where Self: Stream<Item = Result<Event<K>, Error>> + Send + Sized + 'static { ... }
}
Available on crate feature runtime only.
Expand description

Extension trait for streams returned by watcher or reflector

Provided Methods§

source

fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff> where Self: TryStream + Sized,

Apply the DefaultBackoff watcher Backoff policy.

This is recommended for controllers that want to play nicely with the apiserver.

source

fn backoff<B>(self, b: B) -> StreamBackoff<Self, B> where B: Backoff, Self: TryStream + Sized,

Apply a specific Backoff policy to a Stream using StreamBackoff.

source

fn applied_objects<K>(self) -> EventFlatten<Self, K> where Self: Stream<Item = Result<Event<K>, Error>> + Sized,

Flatten a watcher() stream into a stream of applied objects

All Added/Modified events are passed through, and critical errors bubble up.

source

fn touched_objects<K>(self) -> EventFlatten<Self, K> where Self: Stream<Item = Result<Event<K>, Error>> + Sized,

Flatten a watcher() stream into a stream of touched objects

All Added/Modified/Deleted events are passed through, and critical errors bubble up.

source

fn modify<F, K>(self, f: F) -> EventModify<Self, F> where Self: Stream<Item = Result<Event<K>, Error>> + Sized, F: FnMut(&mut K),

Modify elements of a watcher() stream.

Calls watcher::Event::modify() on every element. Stream shorthand for stream.map_ok(|event| { event.modify(f) }).

let deploys: Api<Deployment> = Api::all(client);
let truncated_deploy_stream = watcher(deploys, watcher::Config::default())
    .modify(|deploy| {
        deploy.managed_fields_mut().clear();
        deploy.status = None;
    })
    .applied_objects();
pin_mut!(truncated_deploy_stream);

while let Some(d) = truncated_deploy_stream.try_next().await? {
   println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
}
source

fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P> where Self: Stream<Item = Result<K, Error>> + Sized, K: Resource + 'static, P: Predicate<K> + 'static,

Filter out a flattened stream on predicates.

This will filter out repeat calls where the predicate returns the same result. A common use case is to avoid repeat events for status updates by filtering on predicates::generation.

NB: This constructor requires an unstable feature.

Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, predicates};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let changed_deploys = watcher(deploys, watcher::Config::default())
    .applied_objects()
    .predicate_filter(predicates::generation);
pin_mut!(changed_deploys);

while let Some(d) = changed_deploys.try_next().await? {
   println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
}
source

fn stream_subscribe<K>(self) -> StreamSubscribe<Self> where Self: Stream<Item = Result<Event<K>, Error>> + Send + Sized + 'static,

Create a StreamSubscribe from a watcher() stream.

The StreamSubscribe::subscribe() method allows additional consumers to receive events from a stream without consuming the stream itself.

If a subscriber begins to lag behind the stream, it will receive an Error::Lagged error. The subscriber can then decide to abort its task or tolerate the lost events.

If the Stream is dropped or ends, any StreamSubscribe::subscribe() streams will also end.

NB: This constructor requires an unstable feature.

Warning

If the primary Stream is not polled, the StreamSubscribe::subscribe() streams will never receive any events.

Usage
use futures::{Stream, StreamExt};
use std::{fmt::Debug, sync::Arc};
use kube_runtime::{watcher, WatchStreamExt};

fn explain_events<K, S>(
    stream: S,
) -> (
    impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
    impl Stream<Item = String> + Send + Sized + 'static,
)
where
    K: Debug + Send + Sync + 'static,
    S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
{
    // Create a stream that can be subscribed to
    let stream_subscribe = stream.stream_subscribe();
    // Create a subscription to that stream
    let subscription = stream_subscribe.subscribe();

    // Create a stream of descriptions of the events
    let explain_stream = subscription.filter_map(|event| async move {
        // We don't care about lagged events so we can throw that error away
        match event.ok()?.as_ref() {
            Ok(watcher::Event::Applied(event)) => {
                Some(format!("An object was added or modified: {event:?}"))
            }
            Ok(_) => todo!("explain other events"),
            // We don't care about watcher errors either
            Err(_) => None,
        }
    });

    // We now still have the original stream, and a secondary stream of explanations
    (stream_subscribe, explain_stream)
}

Implementors§

source§

impl<St> WatchStreamExt for St where St: Stream + ?Sized,