Function kube_runtime::watcher::watcher

pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    list_params: ListParams
) -> impl Stream<Item = Result<Event<K>>> + Send

Watches a Kubernetes Resource for changes continuously

Compared to Api::watch, this automatically tries to recover the stream upon errors.

Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. You can apply your own backoff by not polling the stream for a duration after errors. Keep in mind that some TryStream combinators (such as try_for_each and try_concat) will terminate eagerly as soon as they receive an Err.

This is intended to provide a safe and atomic input interface for a state store like a reflector. Direct users may want to flatten composite events with try_flatten_applied:

use kube::{
    api::{Api, ListParams, ResourceExt}, Client,
    runtime::{utils::try_flatten_applied, watcher}
};
use k8s_openapi::api::core::v1::Pod;
use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), watcher::Error> {
    let client = Client::try_default().await.unwrap();
    let pods: Api<Pod> = Api::namespaced(client, "apps");
    let watcher = watcher(pods, ListParams::default());
    try_flatten_applied(watcher)
        .try_for_each(|p| async move {
            println!("Applied: {}", p.name());
            Ok(())
        })
        .await?;
    Ok(())
}

Recovery

(The details of recovery are considered an implementation detail and should not be relied on to be stable, but are documented here for posterity.)

If the watch connection is interrupted, we attempt to restart the watch from the last resource version seen on the stream. If this succeeds, the stream simply resumes where it left off. If it fails because the resource version is no longer valid, we start over with a new stream, beginning with an Event::Restarted.