1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use crate::{
    api::{Api, ListParams, Meta, WatchEvent},
    Result,
};

use futures::{lock::Mutex, Stream, StreamExt};
use serde::de::DeserializeOwned;
use std::{sync::Arc, time::Duration};

/// An event informer for a Kubernetes [`Api`] resource
///
/// This observes events on an `Api<K>` and tracks last seen versions.
/// As per the kubernetes documentation, this is an abstraction that can
/// [efficiently detect changes](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
///
/// In the case where kubernetes returns 410 Gone (desynced / watched for too long)
/// this object will reset the informer ensuring that it always keeps running.
///
/// This means that you might occasionally get some duplicate added events,
/// but we have configured timeouts such that this should not happen frequently.
///
/// On boot, the initial watch causes added events for every currently live object.
///
/// Because of <https://github.com/clux/kube-rs/issues/219> we recommend you use this
/// with kubernetes >= 1.16 and watch bookmarks enabled.
#[derive(Clone)]
pub struct Informer<K>
where
    K: Clone + DeserializeOwned + Meta,
{
    /// Last seen `resourceVersion`, used as the starting point of the next watch.
    /// Held behind an `Arc`, so clones of the informer share the same value.
    version: Arc<Mutex<String>>,
    /// The api resource whose events are being watched
    api: Api<K>,
    /// Parameters passed along to the underlying watch call
    params: ListParams,
    /// Set when a watch reports 410 Gone; the next poll then backs off and
    /// resets `version`. Held behind an `Arc`, so clones share the same flag.
    needs_resync: Arc<Mutex<bool>>,
}

impl<K> Informer<K>
where
    K: Clone + DeserializeOwned + Meta,
{
    /// Create an informer on an api resource
    ///
    /// The informer starts out with default [`ListParams`], a `resourceVersion`
    /// of `"0"` and no pending resync.
    pub fn new(api: Api<K>) -> Self {
        Informer {
            api,
            params: ListParams::default(),
            version: Arc::new(Mutex::new(0.to_string())),
            needs_resync: Arc::new(Mutex::new(false)),
        }
    }

    /// Modify the default watch parameters for the underlying watch
    pub fn params(mut self, lp: ListParams) -> Self {
        self.params = lp;
        self
    }

    /// Override the version to an externally tracked version
    ///
    /// Prefer not using this. Even if you track previous resource versions,
    /// you will miss deleted events if you have any downtime.
    ///
    /// Controllers/finalizers/ownerReferences are the preferred ways
    /// to garbage collect related resources.
    ///
    /// If the version lock is currently held elsewhere, this blocks the
    /// calling thread until it is released.
    pub fn set_version(self, v: String) -> Self {
        debug!("Setting Informer version for {} to {}", self.api.resource.kind, v);

        // Fast path: an uncontended lock can be taken without an executor.
        // NOTE(review): `futures::executor::block_on` is understood to refuse
        // nesting inside another `futures` executor, so skipping it whenever
        // possible keeps this sync setter usable from such contexts.
        if let Some(mut current) = self.version.try_lock() {
            *current = v;
        } else {
            // Contended: our mutex is async, so we have to block on acquiring it
            futures::executor::block_on(async {
                *self.version.lock().await = v;
            });
        }
        self
    }

    /// Reset the resourceVersion to 0
    ///
    /// This will trigger new Added events for all existing resources
    pub async fn reset(&self) {
        *self.version.lock().await = 0.to_string();
    }

    /// Return the current version
    ///
    /// If the version lock is currently held elsewhere, this blocks the
    /// calling thread until it is released.
    pub fn version(&self) -> String {
        // Fast path: read the version directly when nobody holds the lock
        if let Some(current) = self.version.try_lock() {
            return current.clone();
        }
        // Contended: block on the async lock just long enough to copy the version
        futures::executor::block_on(async { self.version.lock().await.clone() })
    }

    /// Start a single watch stream
    ///
    /// Opens a long polling GET and returns a stream of WatchEvents.
    /// You should always poll. When this call ends, call it again.
    /// Do not call it from more than one context.
    ///
    /// All real errors are bubbled up, as are `WatchEvent::Error` instances.
    /// If we are desynced we force a 10s wait before starting the poll.
    ///
    /// If you need to track the `resourceVersion` you can use `Informer::version()`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying watch call fails to start. Errors that
    /// occur later are yielded as items of the returned stream.
    pub async fn poll(&self) -> Result<impl Stream<Item = Result<WatchEvent<K>>>> {
        trace!("Watching {}", self.api.resource.kind);

        // First check if we need to back off and reset our resourceVersion from last time
        {
            let mut needs_resync = self.needs_resync.lock().await;
            if *needs_resync {
                // Try again in a bit
                let dur = Duration::from_secs(10);
                tokio::time::delay_for(dur).await;
                // We are outside history, so start over from latest.
                // The guard is held for this whole branch, so the flag cannot
                // have changed during the delay and needs no second check.
                self.reset().await;
                *needs_resync = false;
            }
        }

        // Clone Arcs for stream handling
        let version = self.version.clone();
        let needs_resync = self.needs_resync.clone();

        // Start watching from our previous watch point
        let resource_version = self.version.lock().await.clone();
        let stream = self.api.watch(&self.params, &resource_version).await?;

        // Intercept stream elements to update internal resourceVersion
        let newstream = stream.then(move |event| {
            // Clone our Arcs for each event
            let needs_resync = needs_resync.clone();
            let version = version.clone();
            async move {
                // Check if we need to update our version based on the incoming events
                match &event {
                    Ok(WatchEvent::Added(o))
                    | Ok(WatchEvent::Modified(o))
                    | Ok(WatchEvent::Deleted(o))
                    | Ok(WatchEvent::Bookmark(o)) => {
                        // always store the last seen resourceVersion
                        if let Some(nv) = Meta::resource_ver(o) {
                            *version.lock().await = nv.clone();
                        }
                    }
                    Ok(WatchEvent::Error(e)) => {
                        // 410 Gone => we need to restart from latest next call
                        if e.code == 410 {
                            warn!("Stream desynced: {:?}", e);
                            *needs_resync.lock().await = true;
                        }
                    }
                    Err(e) => {
                        // All we seem to get here are:
                        // - EOFs (mostly solved with timeout enforcement + resyncs)
                        // - serde errors (bad struct use, on app side)
                        // Not much we can do about these here.
                        warn!("Unexpected watch error: {:?}", e);
                    }
                };
                // The event itself is always passed through untouched
                event
            }
        });
        Ok(newstream)
    }
}