use crate::{
    api::{Api, ListParams, Meta, WatchEvent},
    Result,
};
use futures::{lock::Mutex, Stream, StreamExt};
use log::{debug, trace, warn};
use serde::de::DeserializeOwned;
use std::{sync::Arc, time::Duration};

/// An event informer for a Kubernetes [`Api`] resource
///
/// This observes events on an `Api<K>` and tracks last seen versions.
/// As per the Kubernetes documentation, this is an abstraction that can
/// [efficiently detect changes](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes).
///
/// In the case where Kubernetes returns 410 Gone (desynced / watched for too long),
/// this object will reset the informer, ensuring that it always keeps running.
///
/// This means that you might occasionally get some duplicate Added events,
/// but we have configured timeouts such that this should not happen frequently.
///
/// On boot, the initial watch causes Added events for every currently live object.
///
/// Because of <https://github.com/clux/kube-rs/issues/219> we recommend you use this
/// with Kubernetes >= 1.16 and watch bookmarks enabled.
#[derive(Clone)]
pub struct Informer<K>
where
    K: Clone + DeserializeOwned + Meta,
{
    version: Arc<Mutex<String>>,
    api: Api<K>,
    params: ListParams,
    needs_resync: Arc<Mutex<bool>>,
}

impl<K> Informer<K>
where
    K: Clone + DeserializeOwned + Meta,
{
    /// Create an informer on an api resource
    pub fn new(api: Api<K>) -> Self {
        Informer {
            api,
            params: ListParams::default(),
            version: Arc::new(Mutex::new(0.to_string())),
            needs_resync: Arc::new(Mutex::new(false)),
        }
    }

    /// Modify the default watch parameters for the underlying watch
    pub fn params(mut self, lp: ListParams) -> Self {
        self.params = lp;
        self
    }

    /// Override the version to an externally tracked version
    ///
    /// Prefer not using this. Even if you track previous resource versions,
    /// you will miss deleted events if you have any downtime.
    ///
    /// Controllers/finalizers/ownerReferences are the preferred ways
    /// to garbage collect related resources.
    pub fn set_version(self, v: String) -> Self {
        debug!("Setting Informer version for {} to {}", self.api.resource.kind, v);
        // We need to block on this as our mutex needs to be async compatible
        futures::executor::block_on(async {
            *self.version.lock().await = v;
        });
        self
    }

    /// Reset the resourceVersion to 0
    ///
    /// This will trigger new Added events for all existing resources
    pub async fn reset(&self) {
        *self.version.lock().await = 0.to_string();
    }

    /// Return the current version
    pub fn version(&self) -> String {
        // We need to block on a future here quickly
        // to get a lock on our version
        futures::executor::block_on(async { self.version.lock().await.clone() })
    }

    /// Start a single watch stream
    ///
    /// Opens a long polling GET and returns a stream of WatchEvents.
    /// You should always poll. When this call ends, call it again.
    /// Do not call it from more than one context.
    ///
    /// All real errors are bubbled up, as are `WatchEvent::Error` instances.
    /// If we are desynced, we wait 10s before starting the poll.
    ///
    /// If you need to track the `resourceVersion` you can use `Informer::version()`.
    pub async fn poll(&self) -> Result<impl Stream<Item = Result<WatchEvent<K>>>> {
        trace!("Watching {}", self.api.resource.kind);
        // First check if we need to back off or reset our resourceVersion from last time
        {
            let mut needs_resync = self.needs_resync.lock().await;
            if *needs_resync {
                // Try again in a bit
                let dur = Duration::from_secs(10);
                tokio::time::delay_for(dur).await;
                // We are outside history, so start over from latest
                self.reset().await;
                *needs_resync = false;
            }
        }

        // Clone Arcs for stream handling
        let version = self.version.clone();
        let needs_resync = self.needs_resync.clone();

        // Start watching from our previous watch point
        let resource_version = self.version.lock().await.clone();
        let stream = self.api.watch(&self.params, &resource_version).await?;

        // Intercept stream elements to update internal resourceVersion
        let newstream = stream.then(move |event| {
            // Clone our Arcs for each event
            let needs_resync = needs_resync.clone();
            let version = version.clone();
            async move {
                // Check if we need to update our version based on the incoming events
                match &event {
                    Ok(WatchEvent::Added(o))
                    | Ok(WatchEvent::Modified(o))
                    | Ok(WatchEvent::Deleted(o))
                    | Ok(WatchEvent::Bookmark(o)) => {
                        // Always store the last seen resourceVersion
                        if let Some(nv) = Meta::resource_ver(o) {
                            *version.lock().await = nv.clone();
                        }
                    }
                    Ok(WatchEvent::Error(e)) => {
                        // 410 Gone => we need to restart from latest next call
                        if e.code == 410 {
                            warn!("Stream desynced: {:?}", e);
                            *needs_resync.lock().await = true;
                        }
                    }
                    Err(e) => {
                        // All we seem to get here are:
                        // - EOFs (mostly solved with timeout enforcement + resyncs)
                        // - serde errors (bad struct use, on app side)
                        // Not much we can do about these here.
                        warn!("Unexpected watch error: {:?}", e);
                    }
                };
                event
            }
        });
        Ok(newstream)
    }
}
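The resourceVersion / needs_resync bookkeeping that `poll` performs can be sketched with std-only synchronous types (a sketch: `VersionState`, `observe`, `mark_desynced`, and `prepare_poll` are illustrative names, not part of this crate, and the async mutexes, watch stream, and 10s backoff are elided):

```rust
use std::sync::{Arc, Mutex};

// Shared state, cloned into each watch stream like the Arcs in `poll`.
#[derive(Clone)]
struct VersionState {
    version: Arc<Mutex<String>>,
    needs_resync: Arc<Mutex<bool>>,
}

impl VersionState {
    fn new() -> Self {
        VersionState {
            version: Arc::new(Mutex::new("0".to_string())),
            needs_resync: Arc::new(Mutex::new(false)),
        }
    }

    // Called for Added/Modified/Deleted/Bookmark events carrying a version.
    fn observe(&self, resource_version: &str) {
        *self.version.lock().unwrap() = resource_version.to_string();
    }

    // Called when the server reports 410 Gone (WatchEvent::Error).
    fn mark_desynced(&self) {
        *self.needs_resync.lock().unwrap() = true;
    }

    // Called at the top of the next poll: reset to "0" if we desynced,
    // then return the version to watch from.
    fn prepare_poll(&self) -> String {
        let mut needs_resync = self.needs_resync.lock().unwrap();
        if *needs_resync {
            *self.version.lock().unwrap() = "0".to_string();
            *needs_resync = false;
        }
        self.version.lock().unwrap().clone()
    }
}

fn main() {
    let state = VersionState::new();
    state.observe("12345");
    assert_eq!(state.prepare_poll(), "12345"); // normal case: resume from last seen
    state.mark_desynced();
    assert_eq!(state.prepare_poll(), "0"); // after 410 Gone: restart from scratch
}
```

The real `poll` does the same bookkeeping across `.await` points, which is why it keeps the state in `futures::lock::Mutex` behind `Arc` rather than `std::sync::Mutex`: a std mutex guard held across an `.await` could deadlock the executor.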